本文仅做学习总结,如有侵权立删
DataFrame
一、Spark SQL
Spark SQL用于对结构化数据进行处理,它提供了DataFrame的抽象,作为分布式平台数据查询引擎,可以在此组件上构建大数据仓库。
DataFrame是一个分布式数据集,在概念上类似于传统数据库的表结构,数据被组织成命名的列,DataFrame的数据源可以是结构化的数据文件,也可以是Hive中的表或外部数据库,也还可以是现有的RDD。
DataFrame的一个主要优点是,Spark引擎一开始就构建了一个逻辑执行计划,而且执行生成的代码是基于成本优化程序确定的物理计划。与Java或者Scala相比,Python中的RDD是非常慢的,而DataFrame的引入则使性能在各种语言中都保持稳定。
二、初始化
在过去,你可能会使用SparkConf、SparkContext、SQLContext和HiveContext来分别执行配置、Spark环境、SQL环境和Hive环境的各种Spark查询。
SparkSession现在是读取数据、处理元数据、配置会话和管理集群资源的入口。SparkSession本质上是这些环境的组合,包括StreamingContext。
1 | from pyspark.sql import SparkSession |
Spark 交互式环境下,默认已经创建了名为 spark 的 SparkSession 对象,不需要自行创建。
从RDD创建DataFrame
1 | # 推断schema |
StructField包括以下方面的内容:
name:字段名
dataType:数据类型
nullable:此字段的值是否为空
从文件系统创建DataFrame
1 | df = spark.read.json("customer.json") |
输出和保存
1 | df.rdd # df转化为RDD |
数据库读写
1 | df = spark.sql('select name,age,city from users') |
操作hive表df.write
有两种方法操作hive表
saveAsTable()
如果hive中不存在该表,则spark会自动创建此表匹。
如果表已存在,则匹配插入数据和原表 schema(数据格式,分区等),只要有区别就会报错
若是分区表可以调用partitionBy
指定分区,使用mode
方法调整数据插入方式:
Specifies the behavior when data or table already exists. Options include:
overwrite
: 覆盖原始数据(包括原表的格式,注释等)append
: 追加数据(需要严格匹配)ignore
: ignore the operation (i.e. no-op).error
orerrorifexists
: default option, throw an exception at runtime.
df.write.partitionBy('dt').mode('append').saveAsTable('tb2')
insertInto()
无关schema,只按数据的顺序插入,如果原表不存在则会报错
对于分区表,先开启Hive动态分区,则不需要指定分区字段,如果有一个分区,那么默认为数据中最后一列为分区字段,有两个分区则为最后两列为分区字段,以此类推
1 | sqlContext.setConf("hive.exec.dynamic.partition", "true") |
- 同样也可以先开启Hive动态分区,用SQL语句直接运行
sql("insert into tb2 select * from tb1")
DataFrame信息 | 说明 |
---|---|
df.show(n) |
预览前 n 行数据 |
df.collect() |
列表形式返回 |
df.dtypes |
列名与数据类型 |
df.head(n) |
返回前 n 行数据 |
df.first() |
返回第 1 行数据 |
df.take(n) |
返回前 n 行数据 |
df.printSchema() |
打印模式信息 |
df.columns |
列名 |
查询语句 | 说明 |
---|---|
df.select(*cols) |
SELECT in SQL |
df.union(other) |
UNION ALL in SQL |
df.when(condition,value) |
CASE WHEN in SQL |
df.alias(*alias,**kwargs) |
as in SQL |
F.cast(dataType) |
数据类型转换(函数) |
F.lit(col) |
常数列(函数) |
selectExpr | 表查询 |
1 | from pyspark.sql import functions as F |
排序 | 说明 |
---|---|
df.sort(*col,**kwargs) |
排序 |
df.orderBy(*col,**kwargs) |
排序(用法同sort) |
1 | df.sort(df.age.desc()).show() |
筛选方法 | 说明 |
---|---|
df.filter(condition) |
筛选 |
column.isin(*cols) |
in (...) |
column.like(pattern) |
SQL通配符匹配 |
column.rlike(pattern) |
正则表达式匹配 |
column.startswith(pattern) |
匹配开始 |
column.endswith(pattern) |
匹配结尾 |
column.substr(start,length) |
截取字符串 |
column.between(lower,upper) |
between ... and ... |
column.where |
1 | df.filter("age = 22").show() |
统计信息 | 说明 |
---|---|
df.describe() |
描述性统计 |
df.count() |
行数 |
df.approxQuantile(col,prob,relativeError) |
百分位数 |
df.corr(col1,col2,method=None) |
相关系数 |
1 | # 异常值处理 |
分组和聚合 | 说明 |
---|---|
df.groupBy(*cols) |
分组,返回GroupedData |
groupedData.count() |
计数 |
groupedData.sum(*cols) |
求和 |
groupedData.avg(*cols) |
平均值 |
groupedData.mean(*cols) |
平均值 |
groupedData.max(*cols) |
最大值 |
groupedData.min(*cols) |
最小值 |
groupedData.agg(*exprs) |
应用表达式 |
聚合函数还包括 countDistinct, kurtosis, skewness, stddev, sumDistinct, variance 等
1 | df.groupBy('city').count().collect() |
去重 | 说明 |
---|---|
df.distinct() |
唯一值(整行去重) |
df.dropDuplicates(subset=None) |
删除重复项(可以指定字段) |
添加、修改、删除列 | 说明 |
---|---|
df.withColumnRenamed(existing,new) |
重命名 |
df.withColumn(colname,new) |
修改列 |
df.drop(*cols) |
删除列 |
1 | df=df.withColumn('age',df.age+1) |
缺失值处理 | 说明 |
---|---|
df.na.fill(value,subset=None) |
缺失值填充 |
df.na.drop(how='any',thresh=None,subset=None) |
缺失值删除 |
df.na.replace(to_teplace,value,subset=None) |
替换 |
1 | df=df.na.fill(0) |
分区和缓存 | 说明 |
---|---|
df.repartition(n) |
将df拆分为10个分区 |
df.coalesce(n) |
将df合并为n个分区 |
df.cache() |
缓存 |