本文仅做学习总结,如有侵权立删
[TOC]
一、分组类型
!!!注意null值,如果不过滤空值,则可能会得到不正确的结果。尤其是grouping set、rollup和cube!
group by:指定一个或多个key和一个或多个聚合函数,对列进行转换操作
window:跟group by类似功能, 但能保留当前行
- grouping set:SQL中的用法
rollup:跟groupby类似功能,并会针对指定的多个key进行分级分组汇总。
cube:跟rollup相同功能。
二、聚合函数
- count:统计个数
1 | # 可用expr表达式来处理聚合函数 |
countDistinct:去重统计
approx_count_distinct:近似统计个数, pyspark.sql.functions.approx_count_distinct(col, rsd=None)
- 允许的最大相对标准偏差(默认 = 0.05)。对于 rsd
count_distinct()
效率更高 - 对于大数据即,某种精度的近似值可以接受,但比countDistinct()耗时更少,数据越大,效率提升越明显。
1
2'distinct_ages')).collect() df.agg(approx_count_distinct(df.age).alias(
[Row(distinct_ages=2)]- 允许的最大相对标准偏差(默认 = 0.05)。对于 rsd
first和last:
min和max
sum
sumDistinct
avg
variance和stddev:方差和标准差
- var_samp:样本方差
- var_pop:总体方差
skewness和kurtosis:偏度和峰度
cov和corr:协方差和相关性
- covar_samp:样本协方差
- covar_pop:总体协方差
collect_list和collect_set:聚合成一个list或者set
1 | df.agg(F.collect_set("a"), F.collect_list("a")) |
用户自定义聚合函数:UDAF:仅在Scala和Java中使用
UDF:
UDF(User-defined functions)用户自定义函数,简单说就是输入一行输出一行的自定义算子。
是大多数 SQL 环境的关键特性,用于扩展系统的内置功能。(一对一)UDAF
UDAF(User Defined Aggregate Function),即用户定义的聚合函数,聚合函数和普通函数的区别是什么呢,普通函数是接受一行输入产生一个输出,聚合函数是接受一组(一般是多行)输入然后产生一个输出,即将一组的值想办法聚合一下。(多对一)
UDAF可以跟group by一起使用,也可以不跟group by一起使用,这个其实比较好理解,联想到mysql中的max、min等函数,可以:
select max(foo) from foobar group by bar;
表示根据bar字段分组,然后求每个分组的最大值,这时候的分组有很多个,使用这个函数对每个分组进行处理,也可以:
select max(foo) from foobar;
这种情况可以将整张表看做是一个分组,然后在这个分组(实际上就是一整张表)中求最大值。所以聚合函数实际上是对分组做处理,而不关心分组中记录的具体数量。UDTF
UDTF(User-Defined Table-Generating Functions),用户自定义生成函数。它就是输入一行输出多行的自定义算子,可输出多行多列,又被称为 “表生成函数”。(一对多)
三、例子
创建数据
1 | from pyspark.sql import functions as F |
1、group by:分组
1 | # 按照department, year计算工资之和。 |
1 | >>> output Data: |
- pivot:透视转换
使用pivot函数进行透视,透视过程中可以提供第二个参数来明确指定使用哪些数据项
注意:pivot只能跟在groupby之后
利用pivot实现行转列
pivot的第一个参数指定原数据列,第二个参数指定要新生成的数据列
2. rollup:分级分组
1 | ''' |
3. cube
:分级分组,比rollup组合的维度更全。
1 | ''' |
3.1 grouping
指示 GROUP BY
列表中的指定列是否为空,在结果集中返回 1 表示空或 0 表示未空。
grouping(xx) = 0:表示维度出现在组合里面
grouping(xx) = 1: 表示维度不出现在组合里面
CUBE 操作所生成的空值带来一个问题:如何区分 CUBE 操作所生成的 NULL 值和从实际数据中返回的 NULL 值?这个问题可用 GROUPING 函数解决。如果列中的值来自事实数据,则 GROUPING 函数返回 0;如果列中的值是 CUBE 操作所生成的 NULL,则返回 1。在 CUBE 操作中,所生成的 NULL 代表全体值。
1 | df.cube("department").agg( |
3.2 grouping_id
: 帮助轻松找到自己想要的信息。
返回分组级别: (分組(c1) << (n-1)) + (分組(c2) << (n-2)) + … + 分組(cn)
1 | df.cube('department', 'employee_name', 'year').agg( |
4、grouping set:分组集,跨多个组的聚合操作,仅SQL
希望获得所有用户的各种股票的数量
sql:
1 | SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull |
grouping set也可以实现完全相同的操作:
1 | SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull |
如果想不区分客户和股票来统计股票总数
1 | SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull |
四、窗口函数
1. 概念:
能返回整个dataframe,也能进行聚合运算。
对于一个数据集,
map
是对每行进行操作,为每行得到一个结果;
reduce
则是对多行进行操作,得到一个结果;
window
函数则是对多行进行操作,得到多个结果(每行一个)。窗口函数使用时由“窗口函数”和over从句组成;
其中,over从句分为三部分:
- 分组(partition by)、
- 排序(order by)、
- frame选取(rangeBetween 和 rowsBetween)。
Window函数分类为三种:
- 排名函数
ranking functions
包括:- row_number():连续不重复。
- rank():连续重复
- dense_rank():重复不连续
- percent_rank():对dense_rank归一化
- ntile():n等份
- 解析函数
analytic functions
包括:- cume_dist():对rank归一化
- lag()
- lead()
- 聚合函数
aggregate functions
包括:- sum()
- first()
- last()
- max()
- min()
- mean()
- stddev()
例子:找到每年当中最冷那一天的温度 / 最冷那一天的日期。
- 窗口函数的过程
- 根据某个条件对数据进行分组,PartitionBy
- 根据需求计算聚合函数
- 将计算结果Join回一个大dataframe
1 | from pyspark.sql.window import Window |
2.三种函数的例子
(1)创建一个 PySpark DataFrame
1 | from pyspark.sql.window import Window |
(2)排名函数 ranking functions
2.1 row_number()
row_number()
窗口函数用于给出从1开始到每个窗口分区的结果的连续行号。 与 groupBy
不同 Window
以 partitionBy
作为分组条件,orderBy
对 Window
分组内的数据进行排序。
同分数不并列。
1 | # 以 department 字段进行分组,以 salary 倒序排序 |
观察上面的数据,发现同样的薪水会有不同的排名(都是8000的薪水,有的第二有的第三),这是因为row_number()
是按照行来给定序号,其不关注实际数值的大小。由此我们可以引申出另一个用于给出排序数的函数rank。
使用场景
- 选取本部门工资收入第N高的记录
- (思考)选取某日第N笔交易记录
1 | print(df_part.where(F.col('row_number') == 2).toPandas().to_markdown()) |
2.2 rank()
同分数并列。下一个顺延。
rank()
用来给按照指定列排序的分组窗增加一个排序的序号,
如果有相同数值,则排序数相同,下一个序数顺延一位。来看如下代码:
1 | # 使用 rank 排序,都是8000的薪水,就同列第二 |
1 | window = Window.partitionBy(['a']).orderBy(['a']) |
2.3 dense_rank
观察 dense_rank
与 rank
的区别。
同分数并列且下一个不顺延。
1 | # 注意 rank 排序,8000虽然为同列第二,但7500属于第4名 |
2.4 percent_rank():百分比排序。(将 dense_rank()
的结果进行归一化)
一些业务场景下,我们需要计算不同数值的百分比排序数据,先来看一个例子吧。
1 | windowSpec = Window.partitionBy("department").orderBy(F.desc("salary")) |
上述结果可以理解为将 dense_rank()
的结果进行归一化, 即可得到0-1
以内的百分数。percent_rank()
与 SQL
中的 PERCENT_RANK
函数效果一致。
2.5 ntile()
:分组切分n均等数据。
ntile()
可将分组的数据按照指定数值n
切分为n
个部分, 每一部分按照行的先后给定相同的序数。例如n指定为2,则将组内数据分为两个部分, 第一部分序号为1,第二部分序号为2。理论上两部分数据行数是均等的, 但当数据为奇数行时,中间的那一行归到前一部分。
按照部门对数据进行分组,然后在组内按照薪水高低进行排序, 再使用 ntile()
将组内数据切分为两个部分。结果如下:
1 | # 按照部门对数据进行分组,然后在组内按照薪水高低进行排序 |
(3). Analytic functions
3.1 cume_dist()
:(将 rank()
的结果进行归一化),和percent_rank很像。
cume_dist()
函数用来获取数值的累进分布值,看如下例子:
1 | windowSpec = Window.partitionBy("department").orderBy(F.desc("salary")) |
结果好像和前面的percent_rank()
很类似对不对,于是我们联想到这个其实也是一种归一化结果, 其按照 rank()
的结果进行归一化处理。回想一下前面讲过的 rank()
函数,并列排序会影响后续排序, 于是序号中间可能存在隔断。这样Sales组的排序数就是1、2、2、4、5, 归一化以后就得到了0.2、0.6、0.6、0.8、1。这个统计结果按照实际业务来理解就是:
- 9000及以上的人占了20%,
- 8000及以上的人占了60%,
- 7500以上的人数占了80%,
- 7000以上的人数占了100%,
3.2 lag()
:排序后找上一个数值。
lag()
函数用于寻找按照指定列排好序的分组内每个数值的上一个数值,
通俗的说,就是数值排好序以后,寻找排在每个数值的上一个数值。代码如下:
1 | # 相当于滞后项 |
3.3 lead()
:排序后找下一个数值。
lead()
用于获取排序后的数值的下一个,代码如下:
1 | # 和滞后项相反,提前一位 |
- 实际业务场景中,假设我们获取了每个月的销售数据, 我们可能想要知道,某月份与上一个月或下一个月数据相比怎么样, 于是就可以使用
lag
和lead
来进行数据分析了。 - 思考差分如何做?增长率如何做(同比、环比)?
(4). Aggregate Functions
常见的聚合函数有avg, sum, min, max, count, approx_count_distinct()
等,我们用如下代码来同时使用这些函数:
1 | # 分组,并对组内数据排序 |
需要注意的是 approx_count_distinct()
函数适用与窗函数的统计, 而在groupby
中通常用countDistinct()
来代替该函数,用来求组内不重复的数值的条数。
从结果来看,统计值基本上是按照部门分组,统计组内的salary情况。 如果我们只想要保留部门的统计结果,而将每个人的实际情况去掉,可以采用如下代码:
1 | windowSpec = Window.partitionBy("department").orderBy(F.desc("salary")) |
3. frame函数
(1)Window.rangeBetween(start, end)
创建一个
WindowSpec
,定义了从start
(含)到end
(含)的帧边界。start和
end` 都是相对于当前行的。例如“0”表示“current row”,“-1”表示当前行前一关,“5”表示当前行后五关。
基准为当前行
行数选择
- rowsBetween(x, y)
- Window.unboundedPreceding 表示当前行之前的无限行
- Window.currentRow 表示当前行
- Window.unboundedFollowing 表示当前行之后的无限行
rowsBetween(-1,1):函数作用范围为当前行的上一行至下一行
1 | from pyspark.sql import Window |
(2)rowsBetween
ROWS BETWEEN
不关心确切的值.它只关心行的顺序,并在计算帧时采用固定数量的前后行. (比较行的顺序)RANGE BETWEEN
在计算帧时考虑值 .(比较值的大小)
行范围设置 rangeBetween(x,y)
基准为当前行的值
- rangeBetween(20,50)
例如当前值为18
则选取的值范围为[-2,68]
4. 例子
(1)小例子
需求:组内按分数排序。
1 | df.select($"uid", $"date", $"score", row_number().over(Window.partitionBy("uid").orderBy($"score".desc)).as("rank")) |
(2)udf + 窗口函数
1 | from pyspark.sql import Window |
(3)同时groupby 两个key
1 | # 生成首次付费时间(FirstPayDate), 首次付费当天的付费金额(FirstPayPrice), 总付费金额(AllPayPrice). 首次单天付费频次(FirstPayFreq), 总付费频次(AllPayFreq) |
https://sparkbyexamples.com/pyspark/pyspark-select-first-row-of-each-group/
https://stackoverflow.com/questions/45946349/python-spark-cumulative-sum-by-group-using-dataframe (相加)
(4)groupby + sort + list
1 | group_df = qds_com.groupby(['company']) \ |
(5)求众数
1 | df_com_cookie.groupBy(['company','price']).agg(F.count('price').alias('count_price')).orderBy(['company','count_price'], ascending=False).drop_duplicates(subset=['company']).show() |