窗口函数

本文仅做学习总结,如有侵权立删

[TOC]

一、窗口函数的概念

能返回整个dataframe,也能进行聚合运算。Spark支持三种窗口函数:排名函数、解析函数和聚合函数。

例子:找到每年当中最冷那一天的温度 / 最冷那一天的日期。

  • groupBy实现

【每年当中最冷的那一天的温度】:

1
2
a = gsod.groupby('year').agg(F.min('temp').alias('temp'))
a.orderBy('temp')

将上面的结果join回原来的dataframe得到【最冷那一天的日期】:

1
gsod.join(a, how = 'left', on = ["year", "temp"]).select('year', 'month', 'day', 'temp')

但是上面做法影响效率,left join!

  • 窗口函数的过程
  1. 根据某个条件对数据进行分组,PartitionBy
  2. 根据需求计算聚合函数
  3. 将计算结果Join回一个大dataframe
1
2
3
4
5
from pyspark.sql.window import Window
each_year = Window.partitionBy("year")
gsod.withColumn('min_temp',F.min("temp").over(each_year))\
.where("temp=min_temp")\
.select("year", "month", "day")

二、窗口函数

对于一个数据集,map 是对每行进行操作,为每行得到一个结果;reduce 则是对多行进行操作,得到一个结果;而 window 函数则是对多行进行操作,得到多个结果(每行一个)。

窗口函数是什么?来源于数据库,窗口函数是用与当前行有关的数据行参与计算。Mysql中:

  • partition by:用于对全量数据表进行切分(与SQL中的groupby功能类似,但功能完全不同),直接体现的是前面窗口函数定义中的“有关”,即切分到同一组的即为有关,否则就是无关;
  • order by:用于指定对partition后各组内的数据进行排序;
  • rows between:用于对切分后的数据进一步限定“有关”行的数量,此种情景下即使partition后分到一组,也可能是跟当前行的计算无关。

img

需求:组内按分数排序。

1
df.select($"uid", $"date", $"score", row_number().over(Window.partitionBy("uid").orderBy($"score".desc)).as("rank"))

img

udf + 窗口函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from pyspark.sql import Window

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()

df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
w = Window.partitionBy('id').orderBy('v').rowsBetween(-1, 0)
df.withColumn('mean_v', mean_udf("v").over(w)).show()

+---+----+------+
| id| v|mean_v|
+---+----+------+
| 1| 1.0| 1.0|
| 1| 2.0| 1.5|
| 2| 3.0| 3.0|
| 2| 5.0| 4.0|
| 2|10.0| 7.5|
+---+----+------+

三、 同时groupby 两个key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 生成首次付费时间(FirstPayDate), 首次付费当天的付费金额(FirstPayPrice), 总付费金额(AllPayPrice). 首次单天付费频次(FirstPayFreq), 总付费频次(AllPayFreq)
w1 = Window.partitionBy("OrderId").orderBy(col("PayDate"))
w2 = Window.partitionBy("OrderId")
df_pay_data = df_order_all.filter((F.col("OrderType") == 0) & ((F.col("IsPay") == 1)))\
.withColumnRenamed("OrderTime", "PayTime") \
.withColumnRenamed("OrderPrice", "PayPrice") \
.withColumn("PayDate", date_trunc('day', to_timestamp(F.col("PayTime")/1000)))\
.withColumn("row",row_number().over(w1)) \
.withColumn("AllPayPrice", sum(col("PayPrice")).over(w2)) \
.withColumn("FirstPayDate", min(col("PayDate")).over(w2)) \
.withColumn("FirstPayPrice", sum(col("PayPrice")).over(w1)) \
.withColumn("FirstPayFreq", count(col("IsPay")).over(w1)) \
.withColumn("AllPayFreq", count(col("IsPay")).over(w2)) \
.where(col("row") == 1).select("AllPayPrice", "FirstPayPrice", "FirstPayDate","OrderId", "FirstPayFreq", "AllPayFreq")

https://sparkbyexamples.com/pyspark/pyspark-select-first-row-of-each-group/

https://stackoverflow.com/questions/45946349/python-spark-cumulative-sum-by-group-using-dataframe (相加)

四. groupby + sort + list

1
2
3
group_df = qds_com.groupby(['company']) \
.agg(F.sort_array(F.collect_list(F.struct("features", "label", "samples_count"))) \
.alias("pair"))

五、求众数

1
df_com_cookie.groupBy(['company','price']).agg(F.count('price').alias('count_price')).orderBy(['company','count_price'], ascending=False).drop_duplicates(subset=['company']).show()
1
2
window = Window.partitionBy(['a']).orderBy(['a'])
df.withColumn('rank',F.rank().over(window)).filter("rank = '1'").drop('rank')