本文仅做学习总结,如有侵权立删
一、核心数据源
- CSV
- JSON
- Parquet
- ORC
- JDBC
纯文本
Hbase
- MongoDB
- AWS Redshift
- XML
。。。
二、Read API结构
读取数据的核心结构如下:
1 | DataFrameReader.format(...).option("key", "value").schema(...).load() |
使用以上格式来读取所有数据源,format是可选的,默认是parquet格式。
Spark数据读取使用DataFrameReader,通过SparkSession的read属性得到:
Spark.read
需指定:
- format
- schema
- read模式
- 一系列option选项
1 | spark.read.format("csv") |
(1) 读取模式mode
从外部源读取数据很容易会遇到错误格式的数据,指定读取模式可以当Spark遇到错误格式的记录时应采取什么操作。
- permissive:当遇到错误格式的记录时,将所有字段设置为null并将所有错误格式的记录放在名为_corrupt_record字符串列中
- dropMalformed:删除包含错误格式记录的行
- failFast:遇到错误格式的记录后立即返回失败
默认是permissive.
三、Write API结构
写数据的核心结构如下:
1 | DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save() |
(1) 保存模式mode
.option(“mode”, “OVERWRITE”)
- append:将输出内容追加到目标文件中
- overwrite:将输出内容重写到目标文件中
- errorIfExists:如果目标路径已存在数据或文件,则抛出错误并返回写入操作失败
- ignore:如果目标路径已存在数据或文件,则不执行任何操作
默认值为errorIfExists.
四、CSV/json/parquet等文件
1 | spark.read.format("csv") # json/ parquet/orc/ |
五、文本文件
1 | # 读文件 |
六、高级I/O概念
可分割的文件类型和压缩(gzip压缩格式的Parquet等)
并行读写数据(分区概念)
repartitions:减少分区数量
1
df.repartitions(5).write.format("csv").save("file.csv")
partitionBy:按什么字段来分区
1
df.write.mode("overwrite").partitionBy("a").save("file.csv")
数据分桶:bucketBy
具有相同桶ID(哈希分桶的ID)的数据将放置到一个物理分区中,这样就可以避免在稍后读取数据时进行shuffle。
1 | DataFrameWriter.bucketBy(numBuckets, col, *cols) |
1 | 'parquet') (df.write.format( |
- 管理文件
!!!写数据对文件大小不那么重要,但读取很重要!管理大量小文件产生很大的元数据开销,Spark特别不适合处理小文件。
maxRecordsPerFile选项来指定每个文件的最大记录数,这使得你可以通过控制写入每个文件的记录数来控制文件大小。
1 | df.write.option('maxRecordsPerFile', 5000) # Spark确保每个文件最多包含5000条记录 |