7. 处理不同的数据类型

本文仅做学习总结,如有侵权立删

[TOC]

处理不同的数据类型

1. 数据类型

  • 布尔型
  • 数值型
  • 字符串型
  • 日期和时间戳类型
  • 空值处理
  • 复杂类型
  • 自定义函数

2. 处理布尔类型

(1)and、or、true 和false

(2)&、|、1、0

1
2
3
4
import pyspark.functions as F
filter1 = F.col('aa') > 600
filter2 = F.col('bb') > 2
df.where(filter1 & filter2).where(F.col('cc').isin("DOT")).show()

注意:创建布尔表达式时注意空值处理!!!

将df.where(F.col(‘a’) != ‘hello’)改写成df.where (F.col(‘a’).eqNullSafe(“hello”))可以保证空值安全。

3. 处理数值类型

  • pow:平方
1
2
3
4
5
6
7
# 例子
from pyspark.sql.functions import pow
f = pow(F.col('a') * F.col('b'), 2) + 5 # (a * b)^2 + 5
df.select(f.alias('c'))

# selectExpr
df.selectExpr('a', 'pow(F.col('a') * F.col('b'), 2) + 5')
  • round:向上取整,bround:向下取整
1
2
# from pyspark.sql.functions import lit, round, bround
df.select(round(lit("2.5")), bround(lit("2.5")))
  • corr:计算两列的相关性
1
2
3
from pyspark.sql.functions import corr
df.stat.corr('a', 'b')
df.select(corr('a', 'b'))
  • describe:计算一列或一组列的汇总统计信息,可以用describe来实现。
  • count:计数
  • mean:平均值
  • stddev_pop:标准差
  • min:最小值
  • max:最大值

  • StatFunctions包中封装了许多可使用的统计函数

1
2
3
quantileProbs = [0.5]
relError = 0.05
df.stat.approxQuantile('a', quantileProbs, relError)
  • monotonically_increasing_id函数:从0开始,为每行添加一个唯一的ID。
1
2
from pyspark.sql.functions import monotonically_increasing_id
df.select(monotonically_increasing_id())

4. 处理字符串类型

  • initcap:将给定字符串中空格分隔的每个单词首字母大写。
  • lower:全部小写
  • upper:全部大写
1
2
from pyspark.sql.functions import initcap
df.select(initcap(F.col('aaa')), lower(F.col('bbb')), upper(F.col('ccc')))
  • lpad:从左边对字符串使用指定的字符进行填充。
  • ltrim:从左边对字符串使用指定的字符进行删除空格。
  • rpad: 从右边对字符串使用指定的字符进行填充。
  • trim:从右边对字符串使用指定的字符进行删除空格。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from pyspark.sql.functions import lit, ltrim, rtim, rpad, lpad, trim
df.select(ltrim(lit(" HELLO ")).alias("ltrim"),
rtrim(lit(" HELLO ")).alias("rtrim"),
trim(lit(" HELLO ")).alias("trim"),
lpad(lit("HELLO", 3, " ")).alias("lpad"),
rpad(lit("HELLO", 10, " ")).alias("rpad")
)
'''
ltrim: "HELLO "
rtrim:" HELLO"
trim:"HELLO"
lpad:"HEL"
rpad: "HELLO "
注意 lpad或rpad输入的数值小于字符串长度,它将从字符串的右侧删除字符
'''

5、正则表达式

  • regexp_extract(列名,正则表达式,第几个):提取值
1
2
3
4
from pyspark.sql.functions import regexp_extract
regex_string = "BLACK|WHITE|RED|GREEN|BLUE"
df.select(regexp_extract(F.col('a'), regex_string, 1)) # 将列名为a的字段中出现包含在正则表达式的第一个单词取出来。
# 如 df['a'] = "WHITE HANGING HEA", 结果为WHITE
  • regexp_replace:替换值
1
2
3
4
5
from pyspark.sql.functions import regexp_replace
regex_string = "BLACK|WHITE|RED|GREEN|BLUE" # |在正则中表示或的意思
df.select(
regexp_replace(F.col('a'), regex_string, 'color').alias('color_clean'),
) # 将字段a中包含regex_string这些字段换成color。
  • translate:替换,将给定字符串替换掉所有出现的某字符串。
1
2
3
from pyspark.sql.functions import translate
df.select(translate(F.col('a'), 'LEET', '1337')) # L替换成1, E替换成3, T替换成7.
# 所以WHITE 会被替换成WHI73.
  • instr:是否存在(类似contains)
1
2
3
from pyspark.sql.functions import instr
containsBlack = instr(F.col('a'), 'BLACK') >= 1
df.withColumn('b', containsBlack)
  • locate(substr, str, pos=1):在位置 pos 之后定位字符串列中第一次出现 substr 的位置。
    • 该位置不是基于零的,而是基于 1 的索引。如果在 str 中找不到 substr,则返回 0。
1
2
3
df = spark.createDataFrame([('abcd',)], ['s',])
df.select(locate('b', df.s, 1).alias('s')).collect()
[Row(s=2)]

6、处理日期和时间戳类型

https://zhuanlan.zhihu.com/p/450636026

1
from pyspark.sql import functions as F

(1) 示例数据

1
2
3
4
5
6
7
8
9
10
11
12
data=[["1","2020-02-01"],["2","2019-03-01"],["3","2021-03-01"]]
df=spark.createDataFrame(data, ["id","time"])
df.show()
>>> output Data:
>>>
+---+----------+
| id| time|
+---+----------+
| 1|2020-02-01|
| 2|2019-03-01|
| 3|2021-03-01|
+---+----------+

(2). 日期

2.1 当前日期 current_date()

  • 获取当前系统日期。默认情况下,数据将以yyyy-dd-mm格式返回。
1
2
3
4
5
6
7
8
9
df.select(F.current_date().alias("current_date")).show(1)
>>> output Data:
>>>
+------------+
|current_date|
+------------+
| 2021-12-28|
+------------+
only showing top 1 row

2.2 日期格式 date_format()

  • 解析日期并转换yyyy-dd-mmMM-dd-yyyy格式。
1
2
3
4
5
6
7
8
9
10
11
df.select(F.col("time"), 
F.date_format(F.col("time"), "MM-dd-yyyy").alias("date_format")).show()
>>> output Data:
>>>
+----------+-----------+
| time|date_format|
+----------+-----------+
|2020-02-01| 02-01-2020|
|2019-03-01| 03-01-2019|
|2021-03-01| 03-01-2021|
+----------+-----------+

2.3 使用to_date()将日期格式字符串yyyy-MM-dd转换为DateType yyyy-MM-dd

1
2
3
4
5
6
7
8
9
10
11
df.select(F.col("time"), 
F.to_date(F.col("time"), "yyy-MM-dd").alias("to_date")).show()
>>> output Data:
>>>
+----------+----------+
| time| to_date|
+----------+----------+
|2020-02-01|2020-02-01|
|2019-03-01|2019-03-01|
|2021-03-01|2021-03-01|
+----------+----------+

2.4 两个日期之间的日差datediff()

1
2
3
4
5
6
7
8
9
10
11
12
df.select(F.col("time"), 
F.datediff(F.current_date(), F.col("time")).alias("datediff")
).show()
>>> output Data:
>>>
+----------+--------+
| time|datediff|
+----------+--------+
|2020-02-01| 696|
|2019-03-01| 1033|
|2021-03-01| 302|
+----------+--------+

2.5 两个日期之间的月份months_between()

1
2
3
4
5
6
7
8
9
10
11
12
df.select(F.col("time"), 
F.months_between(F.current_date(),F.col("time")).alias("months_between")
).show()
>>> output Data:
>>>
+----------+--------------+
| time|months_between|
+----------+--------------+
|2020-02-01| 22.87096774|
|2019-03-01| 33.87096774|
|2021-03-01| 9.87096774|
+----------+--------------+

2.6 截断指定单位的日期trunc()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
df.select(F.col("time"), 
F.trunc(F.col("time"),"Month").alias("Month_Trunc"),
F.trunc(F.col("time"),"Year").alias("Month_Year"),
F.trunc(F.col("time"),"Month").alias("Month_Trunc")).show()
>>> output Data:
>>>
+----------+-----------+----------+-----------+
| time|Month_Trunc|Month_Year|Month_Trunc|
+----------+-----------+----------+-----------+
|2020-02-01| 2020-02-01|2020-01-01| 2020-02-01|
|2019-03-01| 2019-03-01|2019-01-01| 2019-03-01|
|2021-03-01| 2021-03-01|2021-01-01| 2021-03-01|
+----------+-----------+----------+-----------+


# pyspark.sql.functions.date_trunc(format, timestamp)
# 返回截断为格式指定单位的时间戳。
# 2.3.0 版中的新函数。

>>> df = spark.createDataFrame([('1997-02-28 05:02:11',)], ['t'])
>>> df.select(date_trunc('year', df.t).alias('year')).collect()
[Row(year=datetime.datetime(1997, 1, 1, 0, 0))]
>>> df.select(date_trunc('mon', df.t).alias('month')).collect()
[Row(month=datetime.datetime(1997, 2, 1, 0, 0))]

2.7 月、日加减法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
df.select(F.col("time"), 
F.add_months(F.col("time"),3).alias("add_months"),
F.add_months(F.col("time"),-3).alias("sub_months"),
F.date_add(F.col("time"),4).alias("date_add"),
F.date_sub(F.col("time"),4).alias("date_sub")
).show()
>>> output Data:
>>>
+----------+----------+----------+----------+----------+
| time|add_months|sub_months| date_add| date_sub|
+----------+----------+----------+----------+----------+
|2020-02-01|2020-05-01|2019-11-01|2020-02-05|2020-01-28|
|2019-03-01|2019-06-01|2018-12-01|2019-03-05|2019-02-25|
|2021-03-01|2021-06-01|2020-12-01|2021-03-05|2021-02-25|
+----------+----------+----------+----------+----------+

2.8 年、月、下一天、一年中第几个星期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
df.select(F.col("time"), 
F.year(F.col("time")).alias("year"),
F.month(F.col("time")).alias("month"),
F.next_day(F.col("time"),"Sunday").alias("next_day"),
F.weekofyear(F.col("time")).alias("weekofyear")
).show()
>>> output Data:
>>>
+----------+----+-----+----------+----------+
| time|year|month| next_day|weekofyear|
+----------+----+-----+----------+----------+
|2020-02-01|2020| 2|2020-02-02| 5|
|2019-03-01|2019| 3|2019-03-03| 9|
|2021-03-01|2021| 3|2021-03-07| 9|
+----------+----+-----+----------+----------+

2.9 星期几、月日、年日

  • 查询星期几
  • 一个月中的第几天
  • 一年中的第几天
1
2
3
4
5
6
7
8
9
10
11
12
13
14
df.select(F.col("time"),  
F.dayofweek(F.col("time")).alias("dayofweek"),
F.dayofmonth(F.col("time")).alias("dayofmonth"),
F.dayofyear(F.col("time")).alias("dayofyear"),
).show()
>>> output Data:
>>>
+----------+---------+----------+---------+
| time|dayofweek|dayofmonth|dayofyear|
+----------+---------+----------+---------+
|2020-02-01| 7| 1| 32|
|2019-03-01| 6| 1| 60|
|2021-03-01| 2| 1| 60|
+----------+---------+----------+---------+

(3). 时间

3.1 创建一个测试数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
data=[
["1","02-01-2020 11 01 19 06"],
["2","03-01-2019 12 01 19 406"],
["3","03-01-2021 12 01 19 406"]]
df2=spark.createDataFrame(data,["id","time"])
df2.show(truncate=False)
>>> output Data:
>>>
+---+-----------------------+
|id |time |
+---+-----------------------+
|1 |02-01-2020 11 01 19 06 |
|2 |03-01-2019 12 01 19 406|
|3 |03-01-2021 12 01 19 406|
+---+-----------------------+

3.2 以 spark 默认格式yyyy-MM-dd HH:mm:ss返回当前时间戳

1
2
3
4
5
6
7
8
9
10
df2.select(F.current_timestamp().alias("current_timestamp")).show()
>>> output Data:
>>>
+--------------------+
| current_timestamp|
+--------------------+
|2021-12-28 09:31:...|
|2021-12-28 09:31:...|
|2021-12-28 09:31:...|
+--------------------+

3.3 将字符串时间戳转换为时间戳类型格式 to_timestamp()

1
2
3
4
5
6
7
8
9
10
11
12
df2.select(F.col("time"), 
F.to_timestamp(F.col("time"), "MM-dd-yyyy HH mm ss SSS").alias("to_timestamp")
).show(truncate=False)
>>> output Data:
>>>
+-----------------------+-----------------------+
|time |to_timestamp |
+-----------------------+-----------------------+
|02-01-2020 11 01 19 06 |2020-02-01 11:01:19.06 |
|03-01-2019 12 01 19 406|2019-03-01 12:01:19.406|
|03-01-2021 12 01 19 406|2021-03-01 12:01:19.406|
+-----------------------+-----------------------+

3.4 获取小时分钟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 数据
data=[
["1","2020-02-01 11:01:19.06"],
["2","2019-03-01 12:01:19.406"],
["3","2021-03-01 12:01:19.406"]]
df3=spark.createDataFrame(data,["id","time"])

# 提取小时、分钟、秒
df3.select(
F.col("time"),
F.hour(F.col("time")).alias("hour"),
F.minute(F.col("time")).alias("minute"),
F.second(F.col("time")).alias("second")
).show(truncate=False)
>>> output Data:
>>>
+-----------------------+----+------+------+
|time |hour|minute|second|
+-----------------------+----+------+------+
|2020-02-01 11:01:19.06 |11 |1 |19 |
|2019-03-01 12:01:19.406|12 |1 |19 |
|2021-03-01 12:01:19.406|12 |1 |19 |
+-----------------------+----+------+------+

7. 处理数据中的空值

8. 复杂类型

  • 结构体: struct
1
2
3
4
5
6
7
from pyspark.sql.functions import struct
df1 = df.select(struct('a', 'b').alias('c'))
# 可以通过"."来访问或列方法getField来实现:
df1.select("c.a")
df1.select(F.col("c").getField("a"))
# 可以通过 ".*"来查询结构体中所有值
df1.select("c.*")
  • 数组
    • split:指定分隔符
1
2
3
4
5
6
7
8
9
10
11
12
13
from pyspark.sql.functions import split
df.select(split(F.col("a"), "\t"))
'''
列名:split(a,)
结果:[WHITE, HANGING, ...]
'''
df.select(split(F.col("a"), "\t").alias("array_col"))
.selectExpr("array_col[0]")
'''
结果
列名:array_col[0]
结果:WHITE
'''

​ 数组长度

1
2
from pyspark.sql.functions import size
df.select(size(split(F.col("a"), "\t")))
array_contains:数组是否包含某个值
1
2
3
from pyspark.sql.functions import array_contains
df.select(array_contains(split(F.col("a"), "\t"), "WHITE"))
# 结果为true

​ explode:一行拆分成多行

1
2
3
4
5
6
7
8
9
10
11
12
13

from pyspark.sql.functions import explode, split

df = df.withColumn("sub_str", explode(split(df["str_col"], "_")))
# 将str_col按-拆分成list,list中的每一个元素成为sub_str,与原行中的其他列一起组成新的行
'''
eg:
"hello world","other column"
split ===> ["hello", "world"], "other column"
explode ===>
"hello", "other column";
"world", "other column"
'''
  • 将嵌套数组 DataFrame 列分解为行

    创建一个带有嵌套数组列的 DataFrame

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
arrayArrayData = [
("James",[["Java","Scala","C++"],["Spark","Java"]]),
("Michael",[["Spark","Java","C++"],["Spark","Java"]]),
("Robert",[["CSharp","VB"],["Spark","Python"]])
]

df = spark.createDataFrame(data=arrayArrayData, schema = ['name','subjects'])
df.printSchema()
df.show(truncate=False)
>>> output Data:
>>>
root
|-- name: string (nullable = true)
|-- subjects: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

+-------+-----------------------------------+
|name |subjects |
+-------+-----------------------------------+
|James |[[Java, Scala, C++], [Spark, Java]]|
|Michael|[[Spark, Java, C++], [Spark, Java]]|
|Robert |[[CSharp, VB], [Spark, Python]] |
+-------+-----------------------------------+

​ 展平数组,请使用 flatten 函数

1
2
3
4
5
6
7
8
9
10
11
from pyspark.sql.functions import flatten
df.select(df.name, flatten(df.subjects)).show(truncate=False)
>>> output Data:
>>>
+-------+-------------------------------+
|name |flatten(subjects) |
+-------+-------------------------------+
|James |[Java, Scala, C++, Spark, Java]|
|Michael|[Spark, Java, C++, Spark, Java]|
|Robert |[CSharp, VB, Spark, Python] |
+-------+-------------------------------+
展平再分解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
df.select(df.name, explode(flatten(df.subjects))).show(truncate=False)
>>> output Data:
>>>
+-------+------+
|name |col |
+-------+------+
|James |Java |
|James |Scala |
|James |C++ |
|James |Spark |
|James |Java |
|Michael|Spark |
|Michael|Java |
|Michael|C++ |
|Michael|Spark |
|Michael|Java |
|Robert |CSharp|
|Robert |VB |
|Robert |Spark |
|Robert |Python|
+-------+------+
  • Map

    • create_map:键值对
    1
    2
    from pyspark.sql.functions import explode, split
    df.select(create_map(F.col("a"), F.col("b")).alias("c_map"))

    ​ 根据key值取value

    1
    2
    3
    from pyspark.sql.functions import explode, split
    df.select(create_map(F.col("a"), F.col("b")).alias("c_map"))\
    .selectExpt("c_map['WHILE METAL LANTERN']")

    ​ 展开map类型,将其转换成列:explode

    1
    2
    df.select(create_map(F.col("a"), F.col("b")).alias("c_map"))\
    .selectExpt("explode('c_map')")

9. 处理Json类型

(1)创建一个Json类型的列:

1
2
3
4
5
6
7
jsonDF = spark.range(1).selectExpr("""
'{
"a": {
"aa": [1,2,3]
}
}' as jsonString
""")

(2) get_json_object:查询JSON对象

pyspark.sql.functions.get_json_object(col, path) : 根据指定的 json 路径从 json 字符串中提取 json 对象,并返回提取的 json 对象的 json 字符串。如果输入的 json 字符串无效,它将返回 null。

1
2
3
4
5
6

>>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')]
>>> df = spark.createDataFrame(data, ("key", "jstring"))
>>> df.select(df.key, get_json_object(df.jstring, '$.f1').alias("c0"), \
... get_json_object(df.jstring, '$.f2').alias("c1") ).collect()
[Row(key='1', c0='value1', c1='value2'), Row(key='2', c0='value12', c1=None)]

(3) 若此查询的JSON对象仅有一层嵌套,则可使用json_tuple

pyspark.sql.functions.json_tuple(col, *fields): 根据给定的字段名称为 json 列创建一个新行。

1
2
3
4
>>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')]
>>> df = spark.createDataFrame(data, ("key", "jstring"))
>>> df.select(df.key, json_tuple(df.jstring, 'f1', 'f2')).collect()
[Row(key='1', c0='value1', c1='value2'), Row(key='2', c0='value12', c1=None)]

(4)to_json:将StructType转换成JSON字符串

1
2
3
4
5
6
7
8
>>> df = ps.DataFrame([['a', 'b'], ['c', 'd']],
... columns=['col 1', 'col 2'])
>>> df.to_json()
'[{"col 1":"a","col 2":"b"},{"col 1":"c","col 2":"d"}]'


>>> df['col 1'].to_json()
'[{"col 1":"a"},{"col 1":"c"}]'

(5) from_json:解析JSON数据,需指定模式

可以使用pyspark.sql.functions.from_json函数将DataFrame中的字典列拆分为多列

10. UDF (自定义函数)

UDF允许使用多种不同的变成语言编写,这些UDF函数被注册为SparkSession或者Context的临时函数。

Spark将在Driver进程上序列化UDF函数,并将它通过网络传递到所有的executor进程。

如果用Scala或Java编写的,可以在JVM中使用它。除了不能使用Spark为内置函数提供的代码生成功能之外,会导致性能的下降。

如果函数是用Python编写的,Spark在worker上启动一个Python进程将所有程序列化为Python可解释的格式(在此之前程序位于JVM中),在Python进程中对该程序逐行执行函数,最终将对每行的操作结果返回给JVM和Spark。

将程序序列化为Python可解释的格式这个过程代价很高!!!!

  1. 计算昂贵
  2. 程序进入Python后Spark无法管理worker内存。若某个worker因资源受限而失败(JVM和Python在同一台机器争夺内存),可能会导致该worker出现故障。———–建议用java/scala编写UDF。
1
2
3
4
5
from pyspark.sql.functions import udf
def power1(v):
return v**2
powerudf = udf(power1)
df.select(powerudf(F.col('a')))