9. 数据源

本文仅做学习总结,如有侵权立删

一、核心数据源

  • CSV
  • JSON
  • Parquet
  • ORC
  • JDBC
  • 纯文本

  • Hbase

  • MongoDB
  • AWS Redshift
  • XML

。。。

二、Read API结构

读取数据的核心结构如下:

1
DataFrameReader.format(...).option("key", "value").schema(...).load()

使用以上格式来读取所有数据源,format是可选的,默认是parquet格式。

Spark数据读取使用DataFrameReader,通过SparkSession的read属性得到:

​ Spark.read

需指定:

  • format
  • schema
  • read模式
  • 一系列option选项
1
2
3
4
5
spark.read.format("csv")
.option("mode", "FAILFAST")
.option("path", "path/to/file(s)")
.schema(someSchema)
.load()

(1) 读取模式mode

从外部源读取数据很容易会遇到错误格式的数据,指定读取模式可以当Spark遇到错误格式的记录时应采取什么操作。

  • permissive:当遇到错误格式的记录时,将所有字段设置为null并将所有错误格式的记录放在名为_corrupt_record字符串列中
  • dropMalformed:删除包含错误格式记录的行
  • failFast:遇到错误格式的记录后立即返回失败

默认是permissive.

三、Write API结构

写数据的核心结构如下:

1
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

(1) 保存模式mode

.option(“mode”, “OVERWRITE”)

  • append:将输出内容追加到目标文件中
  • overwrite:将输出内容重写到目标文件中
  • errorIfExists:如果目标路径已存在数据或文件,则抛出错误并返回写入操作失败
  • ignore:如果目标路径已存在数据或文件,则不执行任何操作

默认值为errorIfExists.

四、CSV/json/parquet等文件

1
2
3
4
5
spark.read.format("csv") # json/ parquet/orc/
.option("header", "true")
.load("file.csv")

df.write.format("csv").mode("overwrite").option("sep", "\t").save("file.csv")

五、文本文件

1
2
3
4
5
# 读文件
spark.read.textFile("file.csv").selectExpr("split(value, ',') as rows")

# 写文件
df.limit(10).select("a", "b").write..partitionBy('b').text('file.csv')

六、高级I/O概念

  • 可分割的文件类型和压缩(gzip压缩格式的Parquet等)

  • 并行读写数据(分区概念)

    • repartitions:减少分区数量

      1
      df.repartitions(5).write.format("csv").save("file.csv")
    • partitionBy:按什么字段来分区

      1
      df.write.mode("overwrite").partitionBy("a").save("file.csv")
  • 数据分桶:bucketBy

具有相同桶ID(哈希分桶的ID)的数据将放置到一个物理分区中,这样就可以避免在稍后读取数据时进行shuffle。

1
2
DataFrameWriter.bucketBy(numBuckets, col, *cols)
# 按给定列存储输出。如果指定,则输出布局在文件系统上,类似于 Hive 的分桶方案,但具有不同的桶哈希函数,并且与 Hive 的分桶不兼容。
1
2
3
4
>>> (df.write.format('parquet')  
.bucketBy(100, 'year', 'month')
.mode("overwrite")
.saveAsTable('bucketed_table'))
  • 管理文件

!!!写数据对文件大小不那么重要,但读取很重要!管理大量小文件产生很大的元数据开销,Spark特别不适合处理小文件。

​ maxRecordsPerFile选项来指定每个文件的最大记录数,这使得你可以通过控制写入每个文件的记录数来控制文件大小。

1
df.write.option('maxRecordsPerFile', 5000) # Spark确保每个文件最多包含5000条记录