本文仅做学习总结,如有侵权立删
[TOC]
处理不同的数据类型
1. 数据类型
- 布尔型
- 数值型
- 字符串型
- 日期和时间戳类型
- 空值处理
- 复杂类型
- 自定义函数
2. 处理布尔类型
(1)and、or、true 和false
(2)&、|、1、0
1 | import pyspark.functions as F |
注意:创建布尔表达式时注意空值处理!!!
将df.where(F.col(‘a’) != ‘hello’)改写成df.where (F.col(‘a’).eqNullSafe(“hello”))可以保证空值安全。
3. 处理数值类型
- pow:平方
1 | # 例子 |
- round:向上取整,bround:向下取整
1 | # from pyspark.sql.functions import lit, round, bround |
- corr:计算两列的相关性
1 | from pyspark.sql.functions import corr |
- describe:计算一列或一组列的汇总统计信息,可以用describe来实现。
- count:计数
- mean:平均值
- stddev_pop:标准差
- min:最小值
max:最大值
StatFunctions包中封装了许多可使用的统计函数
1 | quantileProbs = [0.5] |
- monotonically_increasing_id函数:从0开始,为每行添加一个唯一的ID。
1 | from pyspark.sql.functions import monotonically_increasing_id |
4. 处理字符串类型
- initcap:将给定字符串中空格分隔的每个单词首字母大写。
- lower:全部小写
- upper:全部大写
1 | from pyspark.sql.functions import initcap |
- lpad:从左边对字符串使用指定的字符进行填充。
- ltrim:从左边对字符串使用指定的字符进行删除空格。
- rpad: 从右边对字符串使用指定的字符进行填充。
- trim:从右边对字符串使用指定的字符进行删除空格。
1 | from pyspark.sql.functions import lit, ltrim, rtim, rpad, lpad, trim |
5、正则表达式
- regexp_extract(列名,正则表达式,第几个):提取值
1 | from pyspark.sql.functions import regexp_extract |
- regexp_replace:替换值
1 | from pyspark.sql.functions import regexp_replace |
- translate:替换,将给定字符串替换掉所有出现的某字符串。
1 | from pyspark.sql.functions import translate |
- instr:是否存在(类似contains)
1 | from pyspark.sql.functions import instr |
- locate(substr, str, pos=1):在位置 pos 之后定位字符串列中第一次出现 substr 的位置。
- 该位置不是基于零的,而是基于 1 的索引。如果在 str 中找不到 substr,则返回 0。
1 | df = spark.createDataFrame([('abcd',)], ['s',]) |
6、处理日期和时间戳类型
https://zhuanlan.zhihu.com/p/450636026
1 | from pyspark.sql import functions as F |
(1) 示例数据
1 | data=[["1","2020-02-01"],["2","2019-03-01"],["3","2021-03-01"]] |
(2). 日期
2.1 当前日期 current_date()
- 获取当前系统日期。默认情况下,数据将以
yyyy-dd-mm
格式返回。
1 | df.select(F.current_date().alias("current_date")).show(1) |
2.2 日期格式 date_format()
- 解析日期并转换
yyyy-dd-mm
为MM-dd-yyyy
格式。
1 | df.select(F.col("time"), |
2.3 使用to_date()
将日期格式字符串yyyy-MM-dd
转换为DateType yyyy-MM-dd
1 | df.select(F.col("time"), |
2.4 两个日期之间的日差datediff()
1 | df.select(F.col("time"), |
2.5 两个日期之间的月份months_between()
1 | df.select(F.col("time"), |
2.6 截断指定单位的日期trunc()
1 | df.select(F.col("time"), |
2.7 月、日加减法
1 | df.select(F.col("time"), |
2.8 年、月、下一天、一年中第几个星期
1 | df.select(F.col("time"), |
2.9 星期几、月日、年日
- 查询星期几
- 一个月中的第几天
- 一年中的第几天
1 | df.select(F.col("time"), |
(3). 时间
3.1 创建一个测试数据
1 | data=[ |
3.2 以 spark 默认格式yyyy-MM-dd HH:mm:ss
返回当前时间戳
1 | df2.select(F.current_timestamp().alias("current_timestamp")).show() |
3.3 将字符串时间戳转换为时间戳类型格式 to_timestamp()
1 | df2.select(F.col("time"), |
3.4 获取小时
、分钟
、秒
1 | # 数据 |
7. 处理数据中的空值
8. 复杂类型
- 结构体: struct
1 | from pyspark.sql.functions import struct |
- 数组
- split:指定分隔符
1 | from pyspark.sql.functions import split |
数组长度
1 | from pyspark.sql.functions import size |
array_contains:数组是否包含某个值
1 | from pyspark.sql.functions import array_contains |
explode:一行拆分成多行
1 |
|
将嵌套数组
DataFrame
列分解为行创建一个带有嵌套数组列的
DataFrame
。
1 | arrayArrayData = [ |
展平数组,请使用 flatten
函数
1 | from pyspark.sql.functions import flatten |
展平再分解
1 | df.select(df.name, explode(flatten(df.subjects))).show(truncate=False) |
Map
- create_map:键值对
1
2from pyspark.sql.functions import explode, split
df.select(create_map(F.col("a"), F.col("b")).alias("c_map")) 根据key值取value
1
2
3from pyspark.sql.functions import explode, split
df.select(create_map(F.col("a"), F.col("b")).alias("c_map"))\
.selectExpt("c_map['WHILE METAL LANTERN']") 展开map类型,将其转换成列:explode
1
2df.select(create_map(F.col("a"), F.col("b")).alias("c_map"))\
.selectExpt("explode('c_map')")
9. 处理Json类型
(1)创建一个Json类型的列:
1 | jsonDF = spark.range(1).selectExpr(""" |
(2) get_json_object:查询JSON对象
pyspark.sql.functions.get_json_object(col, path) : 根据指定的 json 路径从 json 字符串中提取 json 对象,并返回提取的 json 对象的 json 字符串。如果输入的 json 字符串无效,它将返回 null。
1 |
|
(3) 若此查询的JSON对象仅有一层嵌套,则可使用json_tuple
pyspark.sql.functions.json_tuple(col, *fields): 根据给定的字段名称为 json 列创建一个新行。
1 | "1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')] data = [( |
(4)to_json:将StructType转换成JSON字符串
1 | 'a', 'b'], ['c', 'd']], df = ps.DataFrame([[ |
(5) from_json:解析JSON数据,需指定模式
可以使用pyspark.sql.functions.from_json
函数将DataFrame中的字典列拆分为多列
10. UDF (自定义函数)
UDF允许使用多种不同的变成语言编写,这些UDF函数被注册为SparkSession或者Context的临时函数。
Spark将在Driver进程上序列化UDF函数,并将它通过网络传递到所有的executor进程。
如果用Scala或Java编写的,可以在JVM中使用它。除了不能使用Spark为内置函数提供的代码生成功能之外,会导致性能的下降。
如果函数是用Python编写的,Spark在worker上启动一个Python进程,将所有程序列化为Python可解释的格式(在此之前程序位于JVM中),在Python进程中对该程序逐行执行函数,最终将对每行的操作结果返回给JVM和Spark。
将程序序列化为Python可解释的格式这个过程代价很高!!!!
- 计算昂贵
- 程序进入Python后Spark无法管理worker内存。若某个worker因资源受限而失败(JVM和Python在同一台机器争夺内存),可能会导致该worker出现故障。———–建议用java/scala编写UDF。
1 | from pyspark.sql.functions import udf |