DataFrame

本文仅做学习总结,如有侵权立删

DataFrame

一、Spark SQL

Spark SQL用于对结构化数据进行处理,它提供了DataFrame的抽象,作为分布式平台数据查询引擎,可以在此组件上构建大数据仓库。

DataFrame是一个分布式数据集,在概念上类似于传统数据库的表结构,数据被组织成命名的列,DataFrame的数据源可以是结构化的数据文件,也可以是Hive中的表或外部数据库,也还可以是现有的RDD。

DataFrame的一个主要优点是,Spark引擎一开始就构建了一个逻辑执行计划,而且执行生成的代码是基于成本优化程序确定的物理计划。与Java或者Scala相比,Python中的RDD是非常慢的,而DataFrame的引入则使性能在各种语言中都保持稳定。

二、初始化

在过去,你可能会使用SparkConf、SparkContext、SQLContext和HiveContext来分别执行配置、Spark环境、SQL环境和Hive环境的各种Spark查询。

SparkSession现在是读取数据、处理元数据、配置会话和管理集群资源的入口。SparkSession本质上是这些环境的组合,包括StreamingContext。

1
2
3
4
5
6
from pyspark.sql import SparkSession
spark=SparkSession \
.builder \
.appName('test') \
.config('master','yarn') \
.getOrCreate()

Spark 交互式环境下,默认已经创建了名为 spark 的 SparkSession 对象,不需要自行创建。

从RDD创建DataFrame

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 推断schema
from pyspark.sql import Row
lines = sc.textFile("users.txt")
parts = lines.map(lambda l: l.split(","))
data = parts.map(lambda p: Row(name=p[0],age=p[1],city=p[2]))
df=createDataFrame(data)
# 指定schema
data = parts.map(lambda p: Row(name=p[0],age=int(p[1]),city=p[2]))
df=createDataFrame(data)
# StructType指定schema
from pyspark.sql.types import *
schema = StructType([
StructField('name',StringType(),True),
StructField('age',LongType(),True),
StructField('city',StringType(),True)
])
df=createDataFrame(parts, schema)

StructField包括以下方面的内容:
name:字段名
dataType:数据类型
nullable:此字段的值是否为空

从文件系统创建DataFrame

1
2
3
4
df = spark.read.json("customer.json")
df = spark.read.load("customer.json", format="json")
df = spark.read.load("users.parquet")
df = spark.read.text("users.txt")

输出和保存

1
2
3
4
5
6
7
df.rdd # df转化为RDD
df.toJSON() # df转化为RDD字符串
df.toPandas() # df转化为pandas
df.write.save("customer.json", format="json")
df.write.save("users.parquet")
df.write.json("users.json")
df.write.text("users.txt")

数据库读写

1
2
3
df = spark.sql('select name,age,city from users') 
df.createOrReplaceTempView(name) # 创建临时视图
df.write.saveAsTable(name,mode='overwrite',partitionBy=None)

操作hive表
df.write 有两种方法操作hive表

  • saveAsTable()
    如果hive中不存在该表,则spark会自动创建此表匹。
    如果表已存在,则匹配插入数据和原表 schema(数据格式,分区等),只要有区别就会报错
    若是分区表可以调用partitionBy指定分区,使用mode方法调整数据插入方式:

Specifies the behavior when data or table already exists. Options include:

  • overwrite: 覆盖原始数据(包括原表的格式,注释等)
  • append: 追加数据(需要严格匹配)
  • ignore: ignore the operation (i.e. no-op).
  • error or errorifexists: default option, throw an exception at runtime.
  • df.write.partitionBy('dt').mode('append').saveAsTable('tb2')
  • insertInto()

无关schema,只按数据的顺序插入,如果原表不存在则会报错
对于分区表,先开启Hive动态分区,则不需要指定分区字段,如果有一个分区,那么默认为数据中最后一列为分区字段,有两个分区则为最后两列为分区字段,以此类推

1
2
3
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
df.write.insertInto('tb2')
  • 同样也可以先开启Hive动态分区,用SQL语句直接运行
    sql("insert into tb2 select * from tb1")
DataFrame信息 说明
df.show(n) 预览前 n 行数据
df.collect() 列表形式返回
df.dtypes 列名与数据类型
df.head(n) 返回前 n 行数据
df.first() 返回第 1 行数据
df.take(n) 返回前 n 行数据
df.printSchema() 打印模式信息
df.columns 列名
查询语句 说明
df.select(*cols) SELECT in SQL
df.union(other) UNION ALL in SQL
df.when(condition,value) CASE WHEN in SQL
df.alias(*alias,**kwargs) as in SQL
F.cast(dataType) 数据类型转换(函数)
F.lit(col) 常数列(函数)
selectExpr 表查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from pyspark.sql import functions as F
df.select('*')
df.select('name','age') # 字段名查询
df.select(['name','age']) # 字段列表查询
df.select(df['name'],df['age']+1) # 表达式查询
df.select('name',df.mobile.alias('phone')) # 重命名列
df.select('name','age',F.lit('2020').alias('update')) # 常数
df.select('name',
F.when(df.age > 100,100)
.when(df.age < 0,-1)
.otherwise(df.age)
).show()
from pyspark.sql.types import *
df.select('name',df.age.cast('float'))
df.select('name',df.age.cast(FloatType()))
# selectExpr接口支持并行计算
expr=['count({})'.format(i) for i in df.columns]
df.selectExpr(*expr).collect()[0]

#表查询selectExpr,可以使用UDF函数,指定别名等
import datetime
spark.udf.register("getBirthYear",lambda age:datetime.datetime.now().year-age)
dftest = df.selectExpr("name", "getBirthYear(age) as birth_year" , "UPPER(gender) as gender" )
dftest.show()

---------------------------
#窗口函数

df = spark.createDataFrame([("LiLei",78,"class1"),("HanMeiMei",87,"class1"),
("DaChui",65,"class2"),("RuHua",55,"class2")]) \
.toDF("name","score","class")

df.show()
dforder = df.selectExpr("name","score","class",
"row_number() over (partition by class order by score desc) as order")

dforder.show()
+---------+-----+------+
| name|score| class|
+---------+-----+------+
| LiLei| 78|class1|
|HanMeiMei| 87|class1|
| DaChui| 65|class2|
| RuHua| 55|class2|
+---------+-----+------+

+---------+-----+------+-----+
| name|score| class|order|
+---------+-----+------+-----+
| DaChui| 65|class2| 1|
| RuHua| 55|class2| 2|
|HanMeiMei| 87|class1| 1|
| LiLei| 78|class1| 2|
+---------+-----+------+-----+
排序 说明
df.sort(*col,**kwargs) 排序
df.orderBy(*col,**kwargs) 排序(用法同sort)
1
2
3
4
df.sort(df.age.desc()).show()
df.sort('age',ascending=True).show()
df.sort(desc('age'),'name').show()
df.sort(['age','name'],ascending=[0,1]).show()
筛选方法 说明
df.filter(condition) 筛选
column.isin(*cols) in (...)
column.like(pattern) SQL通配符匹配
column.rlike(pattern) 正则表达式匹配
column.startswith(pattern) 匹配开始
column.endswith(pattern) 匹配结尾
column.substr(start,length) 截取字符串
column.between(lower,upper) between ... and ...
column.where
1
2
3
4
5
6
7
8
df.filter("age = 22").show()
df.filter(df.age == 22).show()
df.select(df['age'] == 22).show()
df.select(df.name.isin('Bill','Elon')).show()
df.filter("name like Elon%").show()
df.filter(df.name.rlike("Musk$").show()

df.where("gender='male' and age > 15").show()
统计信息 说明
df.describe() 描述性统计
df.count() 行数
df.approxQuantile(col,prob,relativeError) 百分位数
df.corr(col1,col2,method=None) 相关系数
1
2
3
4
5
6
7
8
# 异常值处理
bounds = {}
for col in df.columns:
quantiles = df.approxQuantile(col,[0.25,0.75],0.05)
# 第三个参数relativeError代表可接受的错误程度,越小精度越高
IQR = quantiles[1] - quantiles[0]
bounds[col] = [quantiles[0]-1.5*IQR, quantiles[1]+1.5*IQR]
# bounds保存了每个特征的上下限
分组和聚合 说明
df.groupBy(*cols) 分组,返回GroupedData
groupedData.count() 计数
groupedData.sum(*cols) 求和
groupedData.avg(*cols) 平均值
groupedData.mean(*cols) 平均值
groupedData.max(*cols) 最大值
groupedData.min(*cols) 最小值
groupedData.agg(*exprs) 应用表达式

聚合函数还包括 countDistinct, kurtosis, skewness, stddev, sumDistinct, variance 等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
df.groupBy('city').count().collect()
df.groupBy(df.city).avg('age').collect()
df.groupBy('city',df.age).count().collect()
df.groupBy('city').agg({'age':'mean'}).collect() # 字典形式给出
df.groupBy('city').agg({'*':'count'}).collect()
df.groupBy('city').agg(F.mean(df.age)).collect()
# groupBy + collect_list
df.groupBy("gender").agg(F.expr("avg(age)"),F.expr("collect_list(name)")).show()
+------+--------+------------------+
|gender|avg(age)|collect_list(name)|
+------+--------+------------------+
| null| 16.0| [RuHua]|
|female| 16.0| [HanMeiMei]|
| male| 16.0| [LiLei, DaChui]|
+------+--------+------------------+
去重 说明
df.distinct() 唯一值(整行去重)
df.dropDuplicates(subset=None) 删除重复项(可以指定字段)
添加、修改、删除列 说明
df.withColumnRenamed(existing,new) 重命名
df.withColumn(colname,new) 修改列
df.drop(*cols) 删除列
1
2
3
df=df.withColumn('age',df.age+1)
df=df.drop('age')
df=df.drop(df.age)
缺失值处理 说明
df.na.fill(value,subset=None) 缺失值填充
df.na.drop(how='any',thresh=None,subset=None) 缺失值删除
df.na.replace(to_teplace,value,subset=None) 替换
1
2
3
4
5
df=df.na.fill(0)
df=df.na.fill({'age':50,'name':'unknow'})
df=df.na.drop()
df = df.dropna() # 跟上面那种方式是一样的
df=df.na.replace(['Alice','Bob'],['A','B'],'name')
分区和缓存 说明
df.repartition(n) 将df拆分为10个分区
df.coalesce(n) 将df合并为n个分区
df.cache() 缓存