8. 聚合操作


本文仅做学习总结,如有侵权立删

[TOC]

一、分组类型

!!!注意null值,如果不过滤空值,则可能会得到不正确的结果。尤其是grouping set、rollup和cube!

  • group by:指定一个或多个key和一个或多个聚合函数,对列进行转换操作

  • window:跟group by类似功能, 但能保留当前行

  • grouping set:SQL中的用法
  • rollup:跟groupby类似功能,并会针对指定的多个key进行分级分组汇总。

  • cube:跟rollup相同功能。

二、聚合函数

  • count:统计个数
1
2
3
4
5
# 可用expr表达式来处理聚合函数
df.groupBy("a").agg(
count("aa").alias("aa"),
expr("count(aa)")
)
  • countDistinct:去重统计

  • approx_count_distinct:近似统计个数, pyspark.sql.functions.approx_count_distinct(col, rsd=None)

    • 允许的最大相对标准偏差(默认 = 0.05)。对于 rsd count_distinct() 效率更高
    • 对于大数据即,某种精度的近似值可以接受,但比countDistinct()耗时更少,数据越大,效率提升越明显。
    1
    2
    >>> df.agg(approx_count_distinct(df.age).alias('distinct_ages')).collect()
    [Row(distinct_ages=2)]
  • first和last:

  • min和max

  • sum

  • sumDistinct

  • avg

  • variance和stddev:方差和标准差

    • var_samp:样本方差
    • var_pop:总体方差
  • skewness和kurtosis:偏度和峰度

  • cov和corr:协方差和相关性

    • covar_samp:样本协方差
    • covar_pop:总体协方差
  • collect_list和collect_set:聚合成一个list或者set

1
df.agg(F.collect_set("a"), F.collect_list("a"))
  • 用户自定义聚合函数:UDAF:仅在Scala和Java中使用

    • UDF:

      UDF(User-defined functions)用户自定义函数,简单说就是输入一行输出一行的自定义算子。
      是大多数 SQL 环境的关键特性,用于扩展系统的内置功能。(一对一)

    • UDAF

      UDAF(User Defined Aggregate Function),即用户定义的聚合函数,聚合函数和普通函数的区别是什么呢,普通函数是接受一行输入产生一个输出,聚合函数是接受一组(一般是多行)输入然后产生一个输出,即将一组的值想办法聚合一下。(多对一)

      UDAF可以跟group by一起使用,也可以不跟group by一起使用,这个其实比较好理解,联想到mysql中的max、min等函数,可以:
      select max(foo) from foobar group by bar;
      表示根据bar字段分组,然后求每个分组的最大值,这时候的分组有很多个,使用这个函数对每个分组进行处理,也可以:
      select max(foo) from foobar;
      这种情况可以将整张表看做是一个分组,然后在这个分组(实际上就是一整张表)中求最大值。所以聚合函数实际上是对分组做处理,而不关心分组中记录的具体数量。

    • UDTF

      UDTF(User-Defined Table-Generating Functions),用户自定义生成函数。它就是输入一行输出多行的自定义算子,可输出多行多列,又被称为 “表生成函数”。(一对多)

三、例子

创建数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from pyspark.sql import functions as F
data = [
("James", "Sales", 3000, '2020'),
("Michael", "Sales", 4600, '2020'),
("Robert", "Sales", 4100, '2020'),
("Maria", "Finance", 3000, '2020'),
("James", "Sales", 3000, '2019'),
("Scott", "Finance", 3300, '2020'),
("Jen", "Finance", 3900, '2020'),
("Jeff", "Marketing", 3000, '2020'),
("Kumar", "Marketing", 2000, '2020'),
("Saif", "Sales", 4100, '2020')
]
schema = ["employee_name", "department", "salary", 'year']

df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df.show(truncate=False)

>>> output Data:
>>>
root
|-- employee_name: string (nullable = true)
|-- department: string (nullable = true)
|-- salary: long (nullable = true)
|-- year: string (nullable = true)

+-------------+----------+------+----+
|employee_name|department|salary|year|
+-------------+----------+------+----+
|James |Sales |3000 |2020|
|Michael |Sales |4600 |2020|
|Robert |Sales |4100 |2020|
|Maria |Finance |3000 |2020|
|James |Sales |3000 |2019|
|Scott |Finance |3300 |2020|
|Jen |Finance |3900 |2020|
|Jeff |Marketing |3000 |2020|
|Kumar |Marketing |2000 |2020|
|Saif |Sales |4100 |2020|
+-------------+----------+------+----+

1、group by:分组

1
2
3
4
# 按照department, year计算工资之和。
df.groupBy('department', 'year').agg(
F.sum('salary').alias('salary')
).orderBy('department', 'year').show()
1
2
3
4
5
6
7
8
9
10
>>> output Data:
>>>
+----------+----+------+
|department|year|salary|
+----------+----+------+
| Finance|2020| 10200|
| Marketing|2020| 5000|
| Sales|2019| 3000|
| Sales|2020| 15800|
+----------+----+------+
  • pivot:透视转换

使用pivot函数进行透视,透视过程中可以提供第二个参数来明确指定使用哪些数据项
注意:pivot只能跟在groupby之后

利用pivot实现行转列
pivot的第一个参数指定原数据列,第二个参数指定要新生成的数据列

2. rollup:分级分组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
'''
- 先按照 `department`、`employee_name`、`year`分组;
- 然后按照`department`、`employee_name`分组;
- 然后再按照 `department` 分组;
- 最后进行全表分组。
- 后面接聚合函数,此处使用的是`sum`。
'''

df.rollup('department', 'employee_name', 'year').agg(
F.sum('salary').alias('salary')
).orderBy('department', 'employee_name', 'year').show()
>>> output Data:
>>>
+----------+-------------+----+------+
|department|employee_name|year|salary|
+----------+-------------+----+------+
| null| null|null| 34000|
| Finance| null|null| 10200|
| Finance| Jen|null| 3900|
| Finance| Jen|2020| 3900|
| Finance| Maria|null| 3000|
| Finance| Maria|2020| 3000|
| Finance| Scott|null| 3300|
| Finance| Scott|2020| 3300|
| Marketing| null|null| 5000|
| Marketing| Jeff|null| 3000|
| Marketing| Jeff|2020| 3000|
| Marketing| Kumar|null| 2000|
| Marketing| Kumar|2020| 2000|
| Sales| null|null| 18800|
| Sales| James|null| 6000|
| Sales| James|2019| 3000|
| Sales| James|2020| 3000|
| Sales| Michael|null| 4600|
| Sales| Michael|2020| 4600|
| Sales| Robert|null| 4100|
+----------+-------------+----+------+
only showing top 20 rows

3. cube:分级分组,比rollup组合的维度更全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
'''
1. cube` 先按照`department、employee_name、year`分组;
2. 然后按照`(department, employee_name)`、`(department, year)`、`(year, employee_name)`分组;
3. 然后按照`department`、`employee_name`、`year`分别分组;
4. 最后进行全表分组。
'''
df.cube('department', 'employee_name', 'year').agg(
F.sum('salary').alias('salary')
).orderBy('department', 'employee_name', 'year').show()
>>> output Data:
>>>
+----------+-------------+----+------+
|department|employee_name|year|salary|
+----------+-------------+----+------+
| null| null|null| 34000|
| null| null|2019| 3000|
| null| null|2020| 31000|
| null| James|null| 6000|
| null| James|2019| 3000|
| null| James|2020| 3000|
| null| Jeff|null| 3000|
| null| Jeff|2020| 3000|
| null| Jen|null| 3900|
| null| Jen|2020| 3900|
| null| Kumar|null| 2000|
| null| Kumar|2020| 2000|
| null| Maria|null| 3000|
| null| Maria|2020| 3000|
| null| Michael|null| 4600|
| null| Michael|2020| 4600|
| null| Robert|null| 4100|
| null| Robert|2020| 4100|
| null| Saif|null| 4100|
| null| Saif|2020| 4100|
+----------+-------------+----+------+
only showing top 20 rows

3.1 grouping

指示 GROUP BY 列表中的指定列是否为空,在结果集中返回 1 表示空或 0 表示未空。

grouping(xx) = 0:表示维度出现在组合里面

grouping(xx) = 1: 表示维度不出现在组合里面

CUBE 操作所生成的空值带来一个问题:如何区分 CUBE 操作所生成的 NULL 值和从实际数据中返回的 NULL 值?这个问题可用 GROUPING 函数解决。如果列中的值来自事实数据,则 GROUPING 函数返回 0;如果列中的值是 CUBE 操作所生成的 NULL,则返回 1。在 CUBE 操作中,所生成的 NULL 代表全体值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
df.cube("department").agg(
F.grouping("department").alias('department'),
F.sum("salary").alias('salary')
).orderBy("salary").show()
>>> output Data:
>>>
+----------+----------+------+
|department|department|salary|
+----------+----------+------+
| Marketing| 0| 5000|
| Finance| 0| 10200|
| Sales| 0| 18800|
| null| 1| 34000|
+----------+----------+------+

3.2 grouping_id: 帮助轻松找到自己想要的信息。

返回分组级别: (分組(c1) << (n-1)) + (分組(c2) << (n-2)) + … + 分組(cn)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
df.cube('department', 'employee_name', 'year').agg(
F.grouping_id().alias('group_level'),
F.sum('salary').alias('salary')
).orderBy(F.desc('group_level')).show()
>>> output Data:
>>>
+----------+-------------+----+-----------+------+
|department|employee_name|year|group_level|salary|
+----------+-------------+----+-----------+------+
| null| null|null| 7| 34000|
| null| null|2020| 6| 31000|
| null| null|2019| 6| 3000|
| null| Jeff|null| 5| 3000|
| null| Michael|null| 5| 4600|
| null| James|null| 5| 6000|
| null| Kumar|null| 5| 2000|
| null| Maria|null| 5| 3000|
| null| Saif|null| 5| 4100|
| null| Robert|null| 5| 4100|
| null| Scott|null| 5| 3300|
| null| Jen|null| 5| 3900|
| null| Michael|2020| 4| 4600|
| null| Robert|2020| 4| 4100|
| null| Kumar|2020| 4| 2000|
| null| James|2019| 4| 3000|
| null| Saif|2020| 4| 4100|
| null| James|2020| 4| 3000|
| null| Maria|2020| 4| 3000|
| null| Scott|2020| 4| 3300|
+----------+-------------+----+-----------+------+
only showing top 20 rows

4、grouping set:分组集,跨多个组的聚合操作,仅SQL

希望获得所有用户的各种股票的数量

sql:

1
2
3
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY CustomerId, stockCode
ORDER BY CustomerId DESC, stockCode DESC

grouping set也可以实现完全相同的操作:

1
2
3
4
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY CustomerId, stockCode
GROUPING SETS ((CustomerId, stockCode))
ORDER BY CustomerId DESC, stockCode DESC

如果想不区分客户和股票来统计股票总数

1
2
3
4
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY CustomerId, stockCode
GROUPING SETS ((CustomerId, stockCode), ())
ORDER BY CustomerId DESC, stockCode DESC

四、窗口函数

1. 概念:

能返回整个dataframe,也能进行聚合运算。

对于一个数据集,

  • map 是对每行进行操作,为每行得到一个结果;

  • reduce 则是对多行进行操作,得到一个结果;

  • window 函数则是对多行进行操作,得到多个结果(每行一个)。

窗口函数使用时由“窗口函数”和over从句组成;

其中,over从句分为三部分:

  • 分组(partition by)、
  • 排序(order by)、
  • frame选取(rangeBetween 和 rowsBetween)。

Window函数分类为三种:

  • 排名函数 ranking functions包括:
    • row_number():连续不重复。
    • rank():连续重复
    • dense_rank():重复不连续
    • percent_rank():对dense_rank归一化
    • ntile():n等份
  • 解析函数 analytic functions包括:
    • cume_dist():对rank归一化
    • lag()
    • lead()
  • 聚合函数 aggregate functions包括:
    • sum()
    • first()
    • last()
    • max()
    • min()
    • mean()
    • stddev()

例子:找到每年当中最冷那一天的温度 / 最冷那一天的日期。

  • 窗口函数的过程
  1. 根据某个条件对数据进行分组,PartitionBy
  2. 根据需求计算聚合函数
  3. 将计算结果Join回一个大dataframe
1
2
3
4
5
from pyspark.sql.window import Window
each_year = Window.partitionBy("year")
gsod.withColumn('min_temp',F.min("temp").over(each_year))\
.where("temp=min_temp")\
.select("year", "month", "day")

2.三种函数的例子

(1)创建一个 PySpark DataFrame

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from pyspark.sql.window import Window
import pyspark.sql.functions as F
employee_salary = [
("Ali", "Sales", 8000),
("Bob", "Sales", 7000),
("Cindy", "Sales", 7500),
("Davd", "Finance", 10000),
("Elena", "Sales", 8000),
("Fancy", "Finance", 12000),
("George", "Finance", 11000),
("Haffman", "Marketing", 7000),
("Ilaja", "Marketing", 8000),
("Joey", "Sales", 9000)]

columns= ["name", "department", "salary"]
df = spark.createDataFrame(data = employee_salary, schema = columns)
df.show(truncate=False)
>>> output Data:
>>>
+-------+----------+------+
|name |department|salary|
+-------+----------+------+
|Ali |Sales |8000 |
|Bob |Sales |7000 |
|Cindy |Sales |7500 |
|Davd |Finance |10000 |
|Elena |Sales |8000 |
|Fancy |Finance |12000 |
|George |Finance |11000 |
|Haffman|Marketing |7000 |
|Ilaja |Marketing |8000 |
|Joey |Sales |9000 |
+-------+----------+------+

(2)排名函数 ranking functions

2.1 row_number()

row_number() 窗口函数用于给出从1开始到每个窗口分区的结果的连续行号。 与 groupBy 不同 WindowpartitionBy 作为分组条件,orderByWindow 分组内的数据进行排序。

同分数不并列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 以 department 字段进行分组,以 salary 倒序排序
# 按照部门对薪水排名,薪水最低的为第一名
windowSpec = Window.partitionBy("department").orderBy(F.asc("salary"))
# 分组内增加 row_number
df_part = df.withColumn(
"row_number",
F.row_number().over(windowSpec)
)
print(df_part.toPandas().to_markdown())
>>> output Data:
>>>
| | name | department | salary | row_number |
|---:|:--------|:-------------|---------:|-------------:|
| 0 | Bob | Sales | 7000 | 1 |
| 1 | Cindy | Sales | 7500 | 2 |
| 2 | Ali | Sales | 8000 | 3 |
| 3 | Elena | Sales | 8000 | 4 |
| 4 | Joey | Sales | 9000 | 5 |
| 5 | Davd | Finance | 10000 | 1 |
| 6 | George | Finance | 11000 | 2 |
| 7 | Fancy | Finance | 12000 | 3 |
| 8 | Haffman | Marketing | 7000 | 1 |
| 9 | Ilaja | Marketing | 8000 | 2 |

观察上面的数据,发现同样的薪水会有不同的排名(都是8000的薪水,有的第二有的第三),这是因为row_number()是按照行来给定序号,其不关注实际数值的大小。由此我们可以引申出另一个用于给出排序数的函数rank。

使用场景
  • 选取本部门工资收入第N高的记录
  • (思考)选取某日第N笔交易记录
1
2
3
4
5
6
7
8
print(df_part.where(F.col('row_number') == 2).toPandas().to_markdown())
>>> output Data:
>>>
| | name | department | salary | row_number |
|---:|:-------|:-------------|---------:|-------------:|
| 0 | Cindy | Sales | 7500 | 2 |
| 1 | George | Finance | 11000 | 2 |
| 2 | Ilaja | Marketing | 8000 | 2 |

2.2 rank()

同分数并列。下一个顺延。

rank()用来给按照指定列排序的分组窗增加一个排序的序号,

如果有相同数值,则排序数相同,下一个序数顺延一位。来看如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 使用 rank 排序,都是8000的薪水,就同列第二
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df_rank = df.withColumn("rank", F.rank().over(windowSpec))
print(df_rank.toPandas().to_markdown())
>>> output Data:
>>>
| | name | department | salary | rank |
|---:|:--------|:-------------|---------:|-------:|
| 0 | Joey | Sales | 9000 | 1 |
| 1 | Ali | Sales | 8000 | 2 |
| 2 | Elena | Sales | 8000 | 2 |
| 3 | Cindy | Sales | 7500 | 4 |
| 4 | Bob | Sales | 7000 | 5 |
| 5 | Fancy | Finance | 12000 | 1 |
| 6 | George | Finance | 11000 | 2 |
| 7 | Davd | Finance | 10000 | 3 |
| 8 | Ilaja | Marketing | 8000 | 1 |
| 9 | Haffman | Marketing | 7000 | 2 |
print(df_rank.where(F.col("rank")=="2").toPandas().to_markdown())
>>> output Data:
>>>
| | name | department | salary | rank |
|---:|:--------|:-------------|---------:|-------:|
| 0 | Ali | Sales | 8000 | 2 |
| 1 | Elena | Sales | 8000 | 2 |
| 2 | George | Finance | 11000 | 2 |
| 3 | Haffman | Marketing | 7000 | 2 |
1
2
window = Window.partitionBy(['a']).orderBy(['a'])
df.withColumn('rank',F.rank().over(window)).filter("rank = '1'").drop('rank')

2.3 dense_rank

观察 dense_rankrank 的区别。

同分数并列且下一个不顺延。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 注意 rank 排序,8000虽然为同列第二,但7500属于第4名
# dense_rank()中, 8000同列第二后,7500属于第3名
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("dense_rank", F.dense_rank().over(windowSpec)).show()
>>> output Data:
>>>
+-------+----------+------+----------+
| name|department|salary|dense_rank|
+-------+----------+------+----------+
| Joey| Sales| 9000| 1|
| Ali| Sales| 8000| 2|
| Elena| Sales| 8000| 2|
| Cindy| Sales| 7500| 3|
| Bob| Sales| 7000| 4|
| Fancy| Finance| 12000| 1|
| George| Finance| 11000| 2|
| Davd| Finance| 10000| 3|
| Ilaja| Marketing| 8000| 1|
|Haffman| Marketing| 7000| 2|
+-------+----------+------+----------+

2.4 percent_rank():百分比排序。(将 dense_rank() 的结果进行归一化)

一些业务场景下,我们需要计算不同数值的百分比排序数据,先来看一个例子吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
windowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("percent_rank",F.percent_rank().over(windowSpec)).show()
>>> output Data:
>>>
+-------+----------+------+------------+
| name|department|salary|percent_rank|
+-------+----------+------+------------+
| Joey| Sales| 9000| 0.0|
| Ali| Sales| 8000| 0.25|
| Elena| Sales| 8000| 0.25|
| Cindy| Sales| 7500| 0.75|
| Bob| Sales| 7000| 1.0|
| Fancy| Finance| 12000| 0.0|
| George| Finance| 11000| 0.5|
| Davd| Finance| 10000| 1.0|
| Ilaja| Marketing| 8000| 0.0|
|Haffman| Marketing| 7000| 1.0|
+-------+----------+------+------------+

上述结果可以理解为将 dense_rank() 的结果进行归一化, 即可得到0-1以内的百分数。percent_rank()SQL 中的 PERCENT_RANK 函数效果一致。

2.5 ntile():分组切分n均等数据。

ntile()可将分组的数据按照指定数值n切分为n个部分, 每一部分按照行的先后给定相同的序数。例如n指定为2,则将组内数据分为两个部分, 第一部分序号为1,第二部分序号为2。理论上两部分数据行数是均等的, 但当数据为奇数行时,中间的那一行归到前一部分。

按照部门对数据进行分组,然后在组内按照薪水高低进行排序, 再使用 ntile() 将组内数据切分为两个部分。结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 按照部门对数据进行分组,然后在组内按照薪水高低进行排序 
windowSpec = Window.partitionBy(
"department").orderBy(F.desc("salary"))
# 使用ntile() 将组内数据切分为两个部分
df.withColumn("ntile", F.ntile(2).over(windowSpec)).show()
>>> output Data:
>>>
+-------+----------+------+-----+
| name|department|salary|ntile|
+-------+----------+------+-----+
| Joey| Sales| 9000| 1|
| Ali| Sales| 8000| 1|
| Elena| Sales| 8000| 1|
| Cindy| Sales| 7500| 2|
| Bob| Sales| 7000| 2|
| Fancy| Finance| 12000| 1|
| George| Finance| 11000| 1|
| Davd| Finance| 10000| 2|
| Ilaja| Marketing| 8000| 1|
|Haffman| Marketing| 7000| 2|
+-------+----------+------+-----+

(3). Analytic functions

3.1 cume_dist():(将 rank() 的结果进行归一化),和percent_rank很像。

cume_dist()函数用来获取数值的累进分布值,看如下例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn(
"cume_dist", F.cume_dist().over(windowSpec)).show()
>>> output Data:
>>>
+-------+----------+------+------------------+
| name|department|salary| cume_dist|
+-------+----------+------+------------------+
| Joey| Sales| 9000| 0.2|
| Ali| Sales| 8000| 0.6|
| Elena| Sales| 8000| 0.6|
| Cindy| Sales| 7500| 0.8|
| Bob| Sales| 7000| 1.0|
| Fancy| Finance| 12000|0.3333333333333333|
| George| Finance| 11000|0.6666666666666666|
| Davd| Finance| 10000| 1.0|
| Ilaja| Marketing| 8000| 0.5|
|Haffman| Marketing| 7000| 1.0|
+-------+----------+------+------------------+
# 和 percent_rank 对比一下
df.withColumn(
'percent_rank',
F.percent_rank().over(windowSpec)).show()
>>> output Data:
>>>
+-------+----------+------+------------+
| name|department|salary|percent_rank|
+-------+----------+------+------------+
| Joey| Sales| 9000| 0.0|
| Ali| Sales| 8000| 0.25|
| Elena| Sales| 8000| 0.25|
| Cindy| Sales| 7500| 0.75|
| Bob| Sales| 7000| 1.0|
| Fancy| Finance| 12000| 0.0|
| George| Finance| 11000| 0.5|
| Davd| Finance| 10000| 1.0|
| Ilaja| Marketing| 8000| 0.0|
|Haffman| Marketing| 7000| 1.0|
+-------+----------+------+------------+

结果好像和前面的percent_rank()很类似对不对,于是我们联想到这个其实也是一种归一化结果, 其按照 rank() 的结果进行归一化处理。回想一下前面讲过的 rank() 函数,并列排序会影响后续排序, 于是序号中间可能存在隔断。这样Sales组的排序数就是1、2、2、4、5, 归一化以后就得到了0.2、0.6、0.6、0.8、1。这个统计结果按照实际业务来理解就是:

  • 9000及以上的人占了20%,
  • 8000及以上的人占了60%,
  • 7500以上的人数占了80%,
  • 7000以上的人数占了100%,

3.2 lag():排序后找上一个数值。

lag() 函数用于寻找按照指定列排好序的分组内每个数值的上一个数值,

通俗的说,就是数值排好序以后,寻找排在每个数值的上一个数值。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 相当于滞后项
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("lag", F.lag("salary",1).over(windowSpec)).show()
>>> output Data:
>>>
+-------+----------+------+-----+
| name|department|salary| lag|
+-------+----------+------+-----+
| Joey| Sales| 9000| null|
| Ali| Sales| 8000| 9000|
| Elena| Sales| 8000| 8000|
| Cindy| Sales| 7500| 8000|
| Bob| Sales| 7000| 7500|
| Fancy| Finance| 12000| null|
| George| Finance| 11000|12000|
| Davd| Finance| 10000|11000|
| Ilaja| Marketing| 8000| null|
|Haffman| Marketing| 7000| 8000|
+-------+----------+------+-----+

3.3 lead():排序后找下一个数值。

lead() 用于获取排序后的数值的下一个,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 和滞后项相反,提前一位
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("lead",F.lead("salary",1).over(windowSpec)).show()
>>> output Data:
>>>
+-------+----------+------+-----+
| name|department|salary| lead|
+-------+----------+------+-----+
| Joey| Sales| 9000| 8000|
| Ali| Sales| 8000| 8000|
| Elena| Sales| 8000| 7500|
| Cindy| Sales| 7500| 7000|
| Bob| Sales| 7000| null|
| Fancy| Finance| 12000|11000|
| George| Finance| 11000|10000|
| Davd| Finance| 10000| null|
| Ilaja| Marketing| 8000| 7000|
|Haffman| Marketing| 7000| null|
+-------+----------+------+-----+
  1. 实际业务场景中,假设我们获取了每个月的销售数据, 我们可能想要知道,某月份与上一个月或下一个月数据相比怎么样, 于是就可以使用laglead来进行数据分析了。
  2. 思考差分如何做?增长率如何做(同比、环比)?

(4). Aggregate Functions

常见的聚合函数有avg, sum, min, max, count, approx_count_distinct()等,我们用如下代码来同时使用这些函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 分组,并对组内数据排序
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
# 仅分组
windowSpecAgg = Window.partitionBy("department")

df.withColumn("row", F.row_number().over(windowSpec)) \
.withColumn("avg", F.avg("salary").over(windowSpecAgg)) \
.withColumn("sum", F.sum("salary").over(windowSpecAgg)) \
.withColumn("min", F.min("salary").over(windowSpecAgg)) \
.withColumn("max", F.max("salary").over(windowSpecAgg)) \
.withColumn("count", F.count("salary").over(windowSpecAgg)) \
.withColumn("distinct_count", F.approxCountDistinct("salary").over(windowSpecAgg)) \
.show()
>>> output Data:
>>>
+-------+----------+------+---+-------+-----+-----+-----+-----+--------------+
| name|department|salary|row| avg| sum| min| max|count|distinct_count|
+-------+----------+------+---+-------+-----+-----+-----+-----+--------------+
| Joey| Sales| 9000| 1| 7900.0|39500| 7000| 9000| 5| 4|
| Ali| Sales| 8000| 2| 7900.0|39500| 7000| 9000| 5| 4|
| Elena| Sales| 8000| 3| 7900.0|39500| 7000| 9000| 5| 4|
| Cindy| Sales| 7500| 4| 7900.0|39500| 7000| 9000| 5| 4|
| Bob| Sales| 7000| 5| 7900.0|39500| 7000| 9000| 5| 4|
| Fancy| Finance| 12000| 1|11000.0|33000|10000|12000| 3| 3|
| George| Finance| 11000| 2|11000.0|33000|10000|12000| 3| 3|
| Davd| Finance| 10000| 3|11000.0|33000|10000|12000| 3| 3|
| Ilaja| Marketing| 8000| 1| 7500.0|15000| 7000| 8000| 2| 2|
|Haffman| Marketing| 7000| 2| 7500.0|15000| 7000| 8000| 2| 2|
+-------+----------+------+---+-------+-----+-----+-----+-----+--------------+

需要注意的是 approx_count_distinct() 函数适用与窗函数的统计, 而在groupby中通常用countDistinct()来代替该函数,用来求组内不重复的数值的条数。

从结果来看,统计值基本上是按照部门分组,统计组内的salary情况。 如果我们只想要保留部门的统计结果,而将每个人的实际情况去掉,可以采用如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
windowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
windowSpecAgg = Window.partitionBy("department")

df = df.withColumn("row", F.row_number().over(windowSpec)) \
.withColumn("avg", F.avg("salary").over(windowSpecAgg)) \
.withColumn("sum", F.sum("salary").over(windowSpecAgg)) \
.withColumn("min", F.min("salary").over(windowSpecAgg)) \
.withColumn("max", F.max("salary").over(windowSpecAgg)) \
.withColumn("count", F.count("salary").over(windowSpecAgg)) \
.withColumn("distinct_count", F.approx_count_distinct("salary").over(windowSpecAgg))

# 仅选取分组第一行数据
# 用F.col 去选row 行,怪怪的
df_part = df.where(F.col("row")==1)
df_part.select("department","avg","sum","min","max","count","distinct_count").show()
>>> output Data:
>>>
+----------+-------+-----+-----+-----+-----+--------------+
|department| avg| sum| min| max|count|distinct_count|
+----------+-------+-----+-----+-----+-----+--------------+
| Sales| 7900.0|39500| 7000| 9000| 5| 4|
| Finance|11000.0|33000|10000|12000| 3| 3|
| Marketing| 7500.0|15000| 7000| 8000| 2| 2|
+----------+-------+-----+-----+-----+-----+--------------+

3. frame函数

(1)Window.rangeBetween(start, end)

创建一个WindowSpec,定义了从start(含)到end(含)的帧边界。startend` 都是相对于当前行的。例如“0”表示“current row”,“-1”表示当前行前一关,“5”表示当前行后五关。

基准为当前行

行数选择

  • rowsBetween(x, y)
  • Window.unboundedPreceding 表示当前行之前的无限行
  • Window.currentRow 表示当前行
  • Window.unboundedFollowing 表示当前行之后的无限行

rowsBetween(-1,1):函数作用范围为当前行的上一行至下一行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
>>> from pyspark.sql import Window
>>> from pyspark.sql import functions as func
>>> from pyspark.sql import SQLContext
>>> sc = SparkContext.getOrCreate()
>>> sqlContext = SQLContext(sc)
>>> tup = [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")]
>>> df = sqlContext.createDataFrame(tup, ["id", "category"])
>>> window = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)
>>> df.withColumn("sum", func.sum("id").over(window)).sort("id", "category").show()
+---+--------+---+
| id|category|sum|
+---+--------+---+
| 1| a| 4|
| 1| a| 4|
| 1| b| 3|
| 2| a| 2|
| 2| b| 5|
| 3| b| 3|
+---+--------+---+

(2)rowsBetween

  • ROWS BETWEEN不关心确切的值.它只关心行的顺序,并在计算帧时采用固定数量的前后行. (比较行的顺序)
  • RANGE BETWEEN 在计算帧时考虑值 .(比较值的大小)

行范围设置 rangeBetween(x,y)
基准为当前行的值

  • rangeBetween(20,50)
    例如当前值为18
    则选取的值范围为[-2,68]

4. 例子

(1)小例子

需求:组内按分数排序。

1
df.select($"uid", $"date", $"score", row_number().over(Window.partitionBy("uid").orderBy($"score".desc)).as("rank"))

img

(2)udf + 窗口函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from pyspark.sql import Window

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()

df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
w = Window.partitionBy('id').orderBy('v').rowsBetween(-1, 0)
df.withColumn('mean_v', mean_udf("v").over(w)).show()

+---+----+------+
| id| v|mean_v|
+---+----+------+
| 1| 1.0| 1.0|
| 1| 2.0| 1.5|
| 2| 3.0| 3.0|
| 2| 5.0| 4.0|
| 2|10.0| 7.5|
+---+----+------+

(3)同时groupby 两个key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 生成首次付费时间(FirstPayDate), 首次付费当天的付费金额(FirstPayPrice), 总付费金额(AllPayPrice). 首次单天付费频次(FirstPayFreq), 总付费频次(AllPayFreq)
w1 = Window.partitionBy("OrderId").orderBy(col("PayDate"))
w2 = Window.partitionBy("OrderId")
df_pay_data = df_order_all.filter((F.col("OrderType") == 0) & ((F.col("IsPay") == 1)))\
.withColumnRenamed("OrderTime", "PayTime") \
.withColumnRenamed("OrderPrice", "PayPrice") \
.withColumn("PayDate", date_trunc('day', to_timestamp(F.col("PayTime")/1000)))\
.withColumn("row",row_number().over(w1)) \
.withColumn("AllPayPrice", sum(col("PayPrice")).over(w2)) \
.withColumn("FirstPayDate", min(col("PayDate")).over(w2)) \
.withColumn("FirstPayPrice", sum(col("PayPrice")).over(w1)) \
.withColumn("FirstPayFreq", count(col("IsPay")).over(w1)) \
.withColumn("AllPayFreq", count(col("IsPay")).over(w2)) \
.where(col("row") == 1).select("AllPayPrice", "FirstPayPrice", "FirstPayDate","OrderId", "FirstPayFreq", "AllPayFreq")

https://sparkbyexamples.com/pyspark/pyspark-select-first-row-of-each-group/

https://stackoverflow.com/questions/45946349/python-spark-cumulative-sum-by-group-using-dataframe (相加)

(4)groupby + sort + list

1
2
3
group_df = qds_com.groupby(['company']) \
.agg(F.sort_array(F.collect_list(F.struct("features", "label", "samples_count"))) \
.alias("pair"))

(5)求众数

1
df_com_cookie.groupBy(['company','price']).agg(F.count('price').alias('count_price')).orderBy(['company','count_price'], ascending=False).drop_duplicates(subset=['company']).show()