本文仅做学习总结,如有侵权立删
[TOC]
一、窗口函数的概念
能返回整个dataframe,也能进行聚合运算。Spark支持三种窗口函数:排名函数、解析函数和聚合函数。
例子:找到每年当中最冷那一天的温度 / 最冷那一天的日期。
- groupBy实现
【每年当中最冷的那一天的温度】:
1 | a = gsod.groupby('year').agg(F.min('temp').alias('temp')) |
将上面的结果join回原来的dataframe得到【最冷那一天的日期】:
1 | gsod.join(a, how = 'left', on = ["year", "temp"]).select('year', 'month', 'day', 'temp') |
但是上面做法影响效率,left join!
- 窗口函数的过程
- 根据某个条件对数据进行分组,PartitionBy
- 根据需求计算聚合函数
- 将计算结果Join回一个大dataframe
1 | from pyspark.sql.window import Window |
二、窗口函数
对于一个数据集,map
是对每行进行操作,为每行得到一个结果;reduce
则是对多行进行操作,得到一个结果;而 window
函数则是对多行进行操作,得到多个结果(每行一个)。
窗口函数是什么?来源于数据库,窗口函数是用与当前行有关的数据行参与计算。Mysql中:
- partition by:用于对全量数据表进行切分(与SQL中的groupby功能类似,但功能完全不同),直接体现的是前面窗口函数定义中的“有关”,即切分到同一组的即为有关,否则就是无关;
- order by:用于指定对partition后各组内的数据进行排序;
- rows between:用于对切分后的数据进一步限定“有关”行的数量,此种情景下即使partition后分到一组,也可能是跟当前行的计算无关。
需求:组内按分数排序。
1 | df.select($"uid", $"date", $"score", row_number().over(Window.partitionBy("uid").orderBy($"score".desc)).as("rank")) |
udf + 窗口函数
1 | from pyspark.sql import Window |
三、 同时groupby 两个key
1 | # 生成首次付费时间(FirstPayDate), 首次付费当天的付费金额(FirstPayPrice), 总付费金额(AllPayPrice). 首次单天付费频次(FirstPayFreq), 总付费频次(AllPayFreq) |
https://sparkbyexamples.com/pyspark/pyspark-select-first-row-of-each-group/
https://stackoverflow.com/questions/45946349/python-spark-cumulative-sum-by-group-using-dataframe (相加)
四. groupby + sort + list
1 | group_df = qds_com.groupby(['company']) \ |
五、求众数
1 | df_com_cookie.groupBy(['company','price']).agg(F.count('price').alias('count_price')).orderBy(['company','count_price'], ascending=False).drop_duplicates(subset=['company']).show() |
1 | window = Window.partitionBy(['a']).orderBy(['a']) |