Lee_yl's blog


  • 首页

  • 归档

TensorFlow1基础

发表于 2023-03-27 | 分类于 语言框架 , TensorFlow

本文仅做学习总结,如有侵权立删

一、设计理念

1. 图的定义和运行完全分开,采用符号式编程

  • 先定义各种变量
  • 建立一个数据流图
  • 在数据流图中规定各个变量之间的计算关系
  • 对数据流图进行编译
  • 直至真正的输入数据进入图中,才形成数据流输出值。

2. 运算都放在图中,图的运行只发生在session中。

  • 开启session后,可以用数据去填充节点,进行运算
  • 关闭会话后,就不能进行计算了

二、运行流程

  • 运行流程主要有2步:构造模型和训练

  • 模型图:但没发生实际运算。【Tensor,Variable,placeholder】

img

  • 训练:有实际数据输入,梯度计算等操作。【session】

三、编程模型

1
2
# 导包
import tensorflow as tf
1
2
3
4
5
6
7
8
9
10
# 创建常量vec1和vec2
vec1 = tf.constant([[1,2]])
vec2 = tf.constant([[3],[4]])

'''
vec1为1×2的向量
<tf.Tensor 'Const:0' shape=(1, 2) dtype=int32>
vec2为2×1的向量
<tf.Tensor 'Const_1:0' shape=(2, 1) dtype=int32>
'''
1
2
3
4
5
6
7
8
9
10
# 创建矩阵相乘的操作
product = tf.matmul(vec1, vec2)
'''
<tf.Tensor 'MatMul:0' shape=(1, 1) dtype=int32>
调换矩阵乘法的顺序,直接可以看出结果的形状不一样
'''
productv2 = tf.matmul(vec2, vec1)
'''
<tf.Tensor 'MatMul_1:0' shape=(2, 2) dtype=int32>
'''

In [10]:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 创建会话Session并输出计算的结果
with tf.Session() as sess:
product_res = sess.run(product)
productv2_res = sess.run(productv2)
print(product_res)
print(productv2_res)

'''
[[11]]
[[3 6]
[4 8]]

从结果可以看出`product`的shape为1×1
`productv2`的shape为2×2
符合矩阵乘法的法则

'''

11. Spark SQL

发表于 2023-03-25 | 分类于 大数据 , Spark

本文仅做学习总结,如有侵权立删

Spark SQL简介

1、什么是SQL

结构化查询语言:一种表示数据关系操作的特定领域语言。

Spark2.0发布了一个支持Hive操作的超集,并提供了一个能够同时支持ANSI SQL(标准SQL)和HiveSQL的原生SQL解析器。

  • SparkSession新的起始点
    在老的版本中,SparkSQL提供两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。
    SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的

  • 表存储两类重要的信息:表内数据和表的有关数据,也就是元数据。

  • 托管表: 让Spark管理一组文件和数据的元数据

当您从磁盘上的文件定义表时,您正在定义一个非托管表。

“拥有”其数据,表定义 (元数据) 以及表数据都通过元数据系统进行管理。 托管表在其架构和数据之间提供一致性,如果删除了托管表,则它包含的数据 也将被删除。

  • 非托管表: Spark只管理表的元数据

外部表是在元数据系统中提供表架构的表,但仅引用来自元数据系统控件的外部数据的表。 因此,在创建表后,无法保证表架构与数据一致。 无需通过外部表即可更改外部数据。 如果更改或删除外部表,基础数据不会受到影响或删除。

2. Dataframe和SQL互转

(1)createOrReplaceTempView:对DataFrame创建一个临时表

(2)df.createGlobalTempView:对于DataFrame创建一个全局表

1
2
3
4
5
6
7
8
9
# Dataframe转SQL,对DataFrame创建一个临时表
spark.read.json("data.json")\
.createOrReplaceTempView("some_sql_view")

# SQL转DataFrame
spark.sql("""
SELECT a,count FROM some_sql_view
""").where("a like 'S%'").where("`sum(count)` > 10").count()
# 注意:临时表是Session范围内的,Session退出后,表就失效了。如果想应用范围内有效,可以使用全局表。注意使用全局表时需要全路径访问,如:global_temp.some_sql_view

3. Catalog

catalog: 指的是所有的database目录

Spark SQL中的最高抽象是Catalog。Catalog是存储关于表中存储的数据以及其他有用的东西(如数据库、表、函数和视图)的元数据的抽象。

Catalog位于org.apache.spark.sql.catalog.Catalog包中,包含许多有用的函数,用于列出表、数据库和函数。我们将很快讨论所有这些事情。它对用户来说非常容易理解,因此我们将省略这里的代码示例,但它实际上只是另一个SparkSQL的编程接口。本章只显示正在执行的SQL;因此,如果您正在使用编程接口,请记住您需要将所有内容封装在一个spark.sql()函数中,执行相关代码。

4. 表

(1)创建表:create table

  • 从多种数据源创建表:

USING 和 STORED AS

前面示例中的USING语法规范非常重要。如果不指定格式,Spark将默认为Hive SerDe配置。这对将来的读取和写入有性能影响,因为Hive SerDes比Spark的本地序列化慢得多。Hive用户还可以使用STORED AS语法来指定这应该是一个Hive表。

  • 也可以从查询中创建一个表:
1
CREATE TABLE flights_from_select USING parquet AS SELECT * FROM flights

此外,只有在表当前不存在时,才可以指定创建表:

在本例中,我们创建了一个与hive兼容的表,因为我们没有通过USING显式指定格式。我们还可以这样做:

1
CREATE TABLE IF NOT EXISTS flights_from_select AS SELECT * FROM flights

根据分区的控制数据布局:

1
2
3
CREATE TABLE partitioned_flights USING parquet 
PARTITIONED BY (DEST_COUNTRY_NAME)
AS SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 5

这些表甚至可以通过会话在Spark中使用;Spark中目前不存在临时表。您必须创建一个临时视图.

(2)创建外部表:create EXTERNAL table

  • create EXTERNAL table:创建一个外部表

您可以通过运行以下命令来使用任何已经定义的文件创建外部表:

您还可以从select子句创建一个外部表:

(3)插入数据到表

标准SQL:INSERT INTO

1
2
INSERT INTO flights_from_select 
SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 20

如果只希望写入某个分区,可以选择提供分区规范。注意,写操作也会遵循分区模式(这可能导致上面的查询运行得非常慢);但是,它只会添加额外的文件到目的分区:

1
2
3
4
INSERT INTO partitioned_flights 
PARTITION (DEST_COUNTRY_NAME="UNITED STATES")
SELECT count, ORIGIN_COUNTRY_NAME FROM flights
WHERE DEST_COUNTRY_NAME='UNITED STATES' LIMIT 12

(4) 查看表元数据: DESCRIBE TABLE

我们在前面看到,可以在创建表时添加注释。你可以通过描述表元数据来查看,它会显示相关的注释:

1
DESCRIBE TABLE flights_csv

您还可以使用以下方法查看数据的分区方案(但是,请注意,这只适用于分区表):

1
SHOW PARTITIONS partitioned_flights

(5)刷新表元数据: REFRESH TABLE 、REPAIR TABLE

维护表元数据是确保从最新数据集读取数据的一项重要任务。有两个命令用于刷新表元数据。REFRESH TABLE刷新与该表关联的所有缓存条目(本质上是文件)。如果该表以前缓存过,那么下次扫描时它将被延迟缓存:

另一个相关命令是REPAIR TABLE,它刷新catalog中为给定表维护的分区。这个命令的重点是收集新的分区信息,例如手工写一个新的分区,需要相应地修复表:

1
MSCKREPAIR TABLE partitioned_flights

(6) 删除表:drop table

1
您不能delete表:您只能“drop”它们。可以使用drop关键字删除表。如果删除托管表(例如flights_csv),数据和表定义都将被删除:
1
DROP TABLE flights_csv;

如果您试图删除不存在的表,您将收到一个错误。若要仅删除已经存在的表,请使用DROP TABLE IF EXISTS:

1
DROP TABLE IF EXISTS flights_csv;
  • 删除非托管表
1
如果您正在删除一个非托管表(例如,hive_flights),则不会删除任何数据,但您将不再能够通过表名引用该数据。

(7)缓存表:cache和uncache

就像DataFrames一样,您可以缓存和不缓存表。您只需使用以下语法指定要使用哪个表:

1
2
3
4
# 缓存表
CACHE TABLE flights
# 不缓存表
UNCACHE TABLE flights

5、视图

创建了表之后,还可以定义视图。视图在现有表的之上定义一组转换(基本上就是保存的查询计划),这对于组织或重用查询逻辑非常方便。Spark有几个不同的视图概念。视图可以是全局级别的,可以设置为数据库级别,也可以是会话级别。

(1)创建视图: CREATE VIEW

对于最终用户来说,视图显示为表,只是在查询时对源数据执行转换,而不是将所有源数据重写到新的位置。这可能是一个filter、select,也可能是一个更大的group BY或ROLLUP。例如,在下面的例子中,我们创建了一个目的地为美国的视图,以便只看到这些航班:

  • 创建视图: CREATE VIEW
1
CREATE VIEW just_usa_view AS SELECT * FROM flights WHERE dest_country_name = 'United States'
  • 创建临时视图: CREATE TEMP VIEW

    这些视图只在当前会话期间可用,并且不注册到数据库:

1
CREATE TEMP VIEW just_usa_view_temp AS SELECT * FROM flights WHERE dest_country_name = 'United States'
  • 创建全局临时视图: CREATE GLOBAL TEMP VIEW
    全局临时视图的解析与数据库无关,可以在整个Spark应用程序中查看,但在会话结束时将它们删除:
1
CREATE GLOBAL TEMP VIEW just_usa_global_view_temp AS  SELECT * FROM flights WHERE dest_country_name = 'United States'SHOW TABLES
  • 覆盖视图:CREATE OR REPLACE TEMP VIEW

    还可以使用下面示例中所示的关键字指定,如果一个视图已经存在,则希望覆盖该视图。我们可以覆盖临时视图和常规视图:

1
CREATE OR REPLACE TEMP VIEW just_usa_view_temp AS SELECT * FROM flights WHERE dest_country_name = 'United States'
  • 查询视图

    1
    SELECT * FROM just_usa_view_temp

    视图实际上是一个转换,Spark只在查询时执行它。这意味着它只会在您实际查询表之后应用该过滤器(而不是更早)。

    实际上,视图相当于从现有DataFrame创建一个新的DataFrame。实际上,您可以通过比较Spark DataFrames和Spark SQL生成的查询计划来了解这一点。

  • 删除视图: DROP VIEW IF EXISTS

可以像删除表一样删除视图;只需指定要删除的是视图而不是表。删除视图和删除表的主要区别在于,对于视图,不删除底层数据,只删除视图定义本身:

1
DROP VIEW IF EXISTS just_usa_view;

6、数据库

  • 查看所有数据库:SHOW DATABASES

  • 创建数据库

    创建数据库的模式与您在本章前面看到的相同;但是,这里使用CREATE DATABASE关键字:

    1
    CREATE DATABASE some_db

    您可能想要设置一个数据库来执行某个查询。要做到这一点,使用use关键字后面跟着数据库名称:

    1
    USE some_db
  • 查看当前使用的数数据库

1
SELECT current_database()
  • 删除数据库: DROP DATABASE
1
DROP DATABASE IF EXISTS some_db;

7、高级主题

Spark SQL中有三种核心的复杂类型:struct、list和map。

(1)Struct

struct更类似于map。它们提供了在Spark中创建或查询嵌套数据的方法。要创建一个,你只需要用括号括起一组列(或表达式):

  • 查询Struct
1
2
select country.DEST_COUNTRY_NAME, count FROM nested_date
select country.*, count FROM nested_date # 全部列

(2)List

有几种方法可以创建数组或值列表。

  • 使用collect_list函数,它创建一个值列表。

  • 使用函数collect_set,它创建一个没有重复值的数组。
    这两个都是聚合函数,因此只能在聚合中指定:

  • 手动创建数组

  • 查询列表索引:[0]

  • 将数组转换回行。使用explode函数

    • 建视图

    • 拆行

    现在让我们将复杂类型分解为数组中每个值对应的结果中的一行。DEST_COUNTRY_NAME将对数组中的每个值进行复制,执行与原始collect完全相反的操作,并返回到原始DataFrame:

(3)查看Spark SQL中的函数列表:

SHOW FUNCTIONS: 查看Spark SQL中的函数列表

SHOW SYSTEM FUNCTIONS:指示是否希望查看系统函数(即Spark内置函数)以及用户函数:

SHOW USER FUNCTIONS: 用户函数是由您或其他共享Spark环境的人定义的。

SHOW FUNCTIONS “s*“: 通过传递带有通配符(*)的字符串来过滤所有SHOW命令。所有以“s”开头的函数:;

SHOW FUNCTIONS LIKE “collect*”: 选择包含LIKE关键字

  • 自定义函数

可以通过Hive CREATE TEMPORARY FUNCTION语法注册函数。

(4)子查询

如果你想看到你的航班是否会把你从你当前所在国家带回来,你可以通过检查是否有这样航班:以当前所在国家为起飞点,以带回国家为目的地:

EXISTS只检查子查询中的一些存在性,如果有值,返回true。你可以通过把NOT运算符放在它前面来翻转它。这就相当于找到了一架飞往你无法返回目的地的航班!

SQL配置

您可以在应用程序初始化时或在应用程序执行过程中设置这些参数(就像我们在本书中看到的shuffle分区一样)。

spark.sql.shuffle.partitions 200 配置在为连接或聚合shuffle数据时要使用的分区数。

您只能以这种方式设置Spark SQL配置,但是下面是如何设置shuffle分区:SETspark.sql.shuffle.partitions=20

10. Spark持久化

发表于 2023-03-24 | 分类于 大数据 , Spark

本文仅做学习总结,如有侵权立删

持久化

1. 引言

查看上面那段伪代码,两个count触发算子会产生两个job, 那么,这两个job会往回去找errors,lines这两个rdd,最后到磁盘上拿数据。也就是每个job都会去读一遍磁盘,这里可以做优化, 将errors这个rdd保存到内存中, 然后第一个count会去磁盘度数, 但第二个count直接可以从内存中读数据了。

控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。

cache、persist和checkpoint都是懒执行的。

必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系

RDD 可以使用 persist() 方法或 cache() 方法进行持久化。数据将会在第一次 action 操作时进行计算,并缓存在节点的内存中。Spark 的缓存具有容错机制,如果一个缓存的 RDD 的某个分区丢失了,Spark 将按照原来的计算过程,自动重新计算并进行缓存。

在 shuffle 操作中(例如 reduceByKey),即便是用户没有调用 persist 方法,Spark 也会自动缓存部分中间数据。这么做的目的是,在 shuffle 的过程中某个节点运行失败时,不需要重新计算所有的输入数据。如果用户想多次使用某个 RDD,强烈推荐在该 RDD 上调用 persist 方法。

存储级别

可以看到StorageLevel类的主构造器包含了5个参数:

  • useDisk:使用硬盘(外存)
  • useMemory:使用内存
  • useOffHeap:使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
  • deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象
  • replication:备份数(在多个节点上备份)

每个持久化的 RDD 可以使用不同的存储级别进行缓存,例如,持久化到磁盘、已序列化的 Java 对象形式持久化到内存(可以节省空间)、跨节点间复制、以 off-heap 的方式存储在 Tachyon。这些存储级别通过传递一个 StorageLevel 对象给 persist() 方法进行设置。
详细的存储级别介绍如下:

  • MEMORY_ONLY : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,部分数据分区将不再缓存,在每次需要用到这些数据时重新进行计算。这是默认的级别。
  • MEMORY_AND_DISK : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
  • MEMORY_ONLY_SER : 将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用 fast serializer时会节省更多的空间,但是在读取时会增加 CPU 的计算负担。
  • MEMORY_AND_DISK_SER : 类似于 MEMORY_ONLY_SER ,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。
  • DISK_ONLY : 只在磁盘上缓存 RDD。
  • MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等 : 与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。
  • OFF_HEAP): 类似于 MEMORY_ONLY_SER ,但是将数据存储在 off-heap memory,这需要启动 off-heap 内存。(Off-heap是指在堆外内存)
1
2
3
4
5
6
7
8
9
10
11
12
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

如何选择存储级别

Spark 的存储级别的选择,核心问题是在内存使用率和 CPU 效率之间进行权衡。建议按下面的过程进行存储级别的选择 :

  • 如果使用默认的存储级别(MEMORY_ONLY),存储在内存中的 RDD 没有发生溢出,那么就选择默认的存储级别。默认存储级别可以最大程度的提高 CPU 的效率,可以使在 RDD 上的操作以最快的速度运行。
  • 如果内存不能全部存储 RDD,那么使用 MEMORY_ONLY_SER,并挑选一个快速序列化库将对象序列化,以节省内存空间。使用这种存储级别,计算速度仍然很快。
  • 除了在计算该数据集的代价特别高,或者在需要过滤大量数据的情况下,尽量不要将溢出的数据存储到磁盘。因为,重新计算这个数据分区的耗时与从磁盘读取这些数据的耗时差不多。
  • 如果想快速还原故障,建议使用多副本存储级别(例如,使用 Spark 作为 web 应用的后台服务,在服务出故障时需要快速恢复的场景下)。所有的存储级别都通过重新计算丢失的数据的方式,提供了完全容错机制。但是多副本级别在发生数据丢失时,不需要重新计算对应的数据库,可以让任务继续运行。

2. persist

persist使用场景:

  • 某个步骤计算非常耗时,需要进行persist持久化
  • 计算链条非常长,重新恢复要算很多步骤
  • 需要checkpoint的RDD最好进行persist,checkpoint机制会在job执行完成之后根据DAG向前回溯,找到需要进行checkpoint的RDD,另起一个job来计算该RDD,将计算结果存储到HDFS,如果在job执行的过程中对该RDD进行了persist,那么进行checkpoint会非常快
  • shuffle之前进行persist,Spark默认将数据持久化到磁盘,自动完成,无需干预
  • shuffle之后为什么要persist,shuffle要进性网络传输,风险很大,数据丢失重来,恢复代价很大

3. cache

RDD的cache和persist的区别

cache()调用的persist(),是使用默认存储级别的快捷设置方法(MEMORY_ONLY)

通过源码可以看出cache()是persist()的简化方式,调用persist的无参版本,也就是调用persist(StorageLevel.MEMORY_ONLY),cache只有一个默认的缓存级别MEMORY_ONLY,即将数据持久化到内存中,而persist可以通过传递一个 StorageLevel 对象来设置其它的缓存级别。

4. unpersist 释放缓存:

Spark 自动监控各个节点上的缓存使用率,并以最近最少使用的方式(LRU)将旧数据块移除内存。如果想手动移除一个 RDD,而不是等待该 RDD 被 Spark 自动移除,可以使用 RDD.unpersist() 方法

注意:如果缓存的RDD之间有依赖关系,比如

val rdd_a = df.persist

val rdd_b = rdd_a.filter.persist

val rdd_c = rdd_b.map.persist

在用unpersist清理缓存时,当首先清理rdd_a时,会重建rdd_b和rdd_c的缓存,如果数据量巨大,这个过程可能花费很长时间,即使rddb和rdd_c后面也即将被清理,但是重建过程也会进行,可能会出现一个现象,所有job都以完成,但是任务长时间处于RUNNING状态,这可能就是因为最后再清理缓存时又会把依赖于它的RDD再重算一遍。这时可以只用使用spark.sharedState.cacheManager.uncacheQuery(df, cascade = true, blocking = false)来全部释放,参数cascade 表示是否清理所有引用此RDD的其他RDD,以下是unpersist的源码,可以一目了然

1
2
3
4
def unpersist(blocking: Boolean): this.type = {
sparkSession.sharedState.cacheManager.uncacheQuery(this, cascade = false, blocking)
this
}

5. checkpoint

  • checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系

  • checkpoint 的执行原理

    • 当RDD的job执行完毕后,会从finalRDD从后往前回溯。
    • 当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。
    • Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。
  • 优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步

  • 解释:cache 机制是每计算出一个要 cache 的 partition 就直接将其 cache 到内存了。但 checkpoint 没有使用这种第一次计算得到就存储的方法,而是等到 job 结束后另外启动专门的 job 去完成 checkpoint 。也就是说需要 checkpoint 的 RDD 会被计算两次。因此,在使用 rdd.checkpoint() 的时候,建议加上 rdd.cache(),这样第二次运行的 job 就不用再去计算该 rdd 了,直接读取 cache 写磁盘。

(1)cache 和 checkpoint 之间有一个重大的区别,

cache 将 RDD 以及 RDD 的血统(记录了这个RDD如何产生)缓存到内存中,当缓存的 RDD 失效的时候(如内存损坏),它们可以通过血统重新计算来进行恢复。但是 checkpoint 将 RDD 缓存到了 HDFS 中,同时忽略了它的血统(也就是RDD之前的那些依赖)。为什么要丢掉依赖?因为可以利用 HDFS 多副本特性保证容错!

(2)persist与checkpoint的区别

rdd.persist(StorageLevel.DISK_ONLY) 与 checkpoint 也有区别。前者虽然可以将 RDD 的 partition 持久化到磁盘,但该 partition 由 blockManager 管理。一旦 driver program 执行结束,也就是 executor 所在进程 CoarseGrainedExecutorBackend stop,blockManager 也会 stop,被 cache 到磁盘上的 RDD 也会被清空(整个 blockManager 使用的 local 文件夹被删除)。

而 checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉,是一直存在的,也就是说可以被下一个 driver program 使用,而 cached RDD 不能被其他 dirver program 使用。

总结

Spark相比Hadoop的优势在于尽量不去持久化,所以使用 pipeline,cache 等机制。用户如果感觉 job 可能会出错可以手动去 checkpoint 一些 critical 的 RDD,job 如果出错,下次运行时直接从 checkpoint 中读取数据。唯一不足的是,checkpoint 需要两次运行 job。

9. 数据源

发表于 2023-03-19 | 分类于 大数据 , Spark

本文仅做学习总结,如有侵权立删

一、核心数据源

  • CSV
  • JSON
  • Parquet
  • ORC
  • JDBC
  • 纯文本

  • Hbase

  • MongoDB
  • AWS Redshift
  • XML

。。。

二、Read API结构

读取数据的核心结构如下:

1
DataFrameReader.format(...).option("key", "value").schema(...).load()

使用以上格式来读取所有数据源,format是可选的,默认是parquet格式。

Spark数据读取使用DataFrameReader,通过SparkSession的read属性得到:

​ Spark.read

需指定:

  • format
  • schema
  • read模式
  • 一系列option选项
1
2
3
4
5
spark.read.format("csv")
.option("mode", "FAILFAST")
.option("path", "path/to/file(s)")
.schema(someSchema)
.load()

(1) 读取模式mode

从外部源读取数据很容易会遇到错误格式的数据,指定读取模式可以当Spark遇到错误格式的记录时应采取什么操作。

  • permissive:当遇到错误格式的记录时,将所有字段设置为null并将所有错误格式的记录放在名为_corrupt_record字符串列中
  • dropMalformed:删除包含错误格式记录的行
  • failFast:遇到错误格式的记录后立即返回失败

默认是permissive.

三、Write API结构

写数据的核心结构如下:

1
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

(1) 保存模式mode

.option(“mode”, “OVERWRITE”)

  • append:将输出内容追加到目标文件中
  • overwrite:将输出内容重写到目标文件中
  • errorIfExists:如果目标路径已存在数据或文件,则抛出错误并返回写入操作失败
  • ignore:如果目标路径已存在数据或文件,则不执行任何操作

默认值为errorIfExists.

四、CSV/json/parquet等文件

1
2
3
4
5
spark.read.format("csv") # json/ parquet/orc/
.option("header", "true")
.load("file.csv")

df.write.format("csv").mode("overwrite").option("sep", "\t").save("file.csv")

五、文本文件

1
2
3
4
5
# 读文件
spark.read.textFile("file.csv").selectExpr("split(value, ',') as rows")

# 写文件
df.limit(10).select("a", "b").write..partitionBy('b').text('file.csv')

六、高级I/O概念

  • 可分割的文件类型和压缩(gzip压缩格式的Parquet等)

  • 并行读写数据(分区概念)

    • repartitions:减少分区数量

      1
      df.repartitions(5).write.format("csv").save("file.csv")
    • partitionBy:按什么字段来分区

      1
      df.write.mode("overwrite").partitionBy("a").save("file.csv")
  • 数据分桶:bucketBy

具有相同桶ID(哈希分桶的ID)的数据将放置到一个物理分区中,这样就可以避免在稍后读取数据时进行shuffle。

1
2
DataFrameWriter.bucketBy(numBuckets, col, *cols)
# 按给定列存储输出。如果指定,则输出布局在文件系统上,类似于 Hive 的分桶方案,但具有不同的桶哈希函数,并且与 Hive 的分桶不兼容。
1
2
3
4
>>> (df.write.format('parquet')  
.bucketBy(100, 'year', 'month')
.mode("overwrite")
.saveAsTable('bucketed_table'))
  • 管理文件

!!!写数据对文件大小不那么重要,但读取很重要!管理大量小文件产生很大的元数据开销,Spark特别不适合处理小文件。

​ maxRecordsPerFile选项来指定每个文件的最大记录数,这使得你可以通过控制写入每个文件的记录数来控制文件大小。

1
df.write.option('maxRecordsPerFile', 5000) # Spark确保每个文件最多包含5000条记录

8. 聚合操作

发表于 2023-03-19 | 分类于 大数据 , Spark

本文仅做学习总结,如有侵权立删

[TOC]

一、分组类型

!!!注意null值,如果不过滤空值,则可能会得到不正确的结果。尤其是grouping set、rollup和cube!

  • group by:指定一个或多个key和一个或多个聚合函数,对列进行转换操作

  • window:跟group by类似功能, 但能保留当前行

  • grouping set:SQL中的用法
  • rollup:跟groupby类似功能,并会针对指定的多个key进行分级分组汇总。

  • cube:跟rollup相同功能。

二、聚合函数

  • count:统计个数
1
2
3
4
5
# 可用expr表达式来处理聚合函数
df.groupBy("a").agg(
count("aa").alias("aa"),
expr("count(aa)")
)
  • countDistinct:去重统计

  • approx_count_distinct:近似统计个数, pyspark.sql.functions.approx_count_distinct(col, rsd=None)

    • 允许的最大相对标准偏差(默认 = 0.05)。对于 rsd count_distinct() 效率更高
    • 对于大数据即,某种精度的近似值可以接受,但比countDistinct()耗时更少,数据越大,效率提升越明显。
    1
    2
    >>> df.agg(approx_count_distinct(df.age).alias('distinct_ages')).collect()
    [Row(distinct_ages=2)]
  • first和last:

  • min和max

  • sum

  • sumDistinct

  • avg

  • variance和stddev:方差和标准差

    • var_samp:样本方差
    • var_pop:总体方差
  • skewness和kurtosis:偏度和峰度

  • cov和corr:协方差和相关性

    • covar_samp:样本协方差
    • covar_pop:总体协方差
  • collect_list和collect_set:聚合成一个list或者set

1
df.agg(F.collect_set("a"), F.collect_list("a"))
  • 用户自定义聚合函数:UDAF:仅在Scala和Java中使用

    • UDF:

      UDF(User-defined functions)用户自定义函数,简单说就是输入一行输出一行的自定义算子。
      是大多数 SQL 环境的关键特性,用于扩展系统的内置功能。(一对一)

    • UDAF

      UDAF(User Defined Aggregate Function),即用户定义的聚合函数,聚合函数和普通函数的区别是什么呢,普通函数是接受一行输入产生一个输出,聚合函数是接受一组(一般是多行)输入然后产生一个输出,即将一组的值想办法聚合一下。(多对一)

      UDAF可以跟group by一起使用,也可以不跟group by一起使用,这个其实比较好理解,联想到mysql中的max、min等函数,可以:
      select max(foo) from foobar group by bar;
      表示根据bar字段分组,然后求每个分组的最大值,这时候的分组有很多个,使用这个函数对每个分组进行处理,也可以:
      select max(foo) from foobar;
      这种情况可以将整张表看做是一个分组,然后在这个分组(实际上就是一整张表)中求最大值。所以聚合函数实际上是对分组做处理,而不关心分组中记录的具体数量。

    • UDTF

      UDTF(User-Defined Table-Generating Functions),用户自定义生成函数。它就是输入一行输出多行的自定义算子,可输出多行多列,又被称为 “表生成函数”。(一对多)

三、例子

创建数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from pyspark.sql import functions as F
data = [
("James", "Sales", 3000, '2020'),
("Michael", "Sales", 4600, '2020'),
("Robert", "Sales", 4100, '2020'),
("Maria", "Finance", 3000, '2020'),
("James", "Sales", 3000, '2019'),
("Scott", "Finance", 3300, '2020'),
("Jen", "Finance", 3900, '2020'),
("Jeff", "Marketing", 3000, '2020'),
("Kumar", "Marketing", 2000, '2020'),
("Saif", "Sales", 4100, '2020')
]
schema = ["employee_name", "department", "salary", 'year']

df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df.show(truncate=False)

>>> output Data:
>>>
root
|-- employee_name: string (nullable = true)
|-- department: string (nullable = true)
|-- salary: long (nullable = true)
|-- year: string (nullable = true)

+-------------+----------+------+----+
|employee_name|department|salary|year|
+-------------+----------+------+----+
|James |Sales |3000 |2020|
|Michael |Sales |4600 |2020|
|Robert |Sales |4100 |2020|
|Maria |Finance |3000 |2020|
|James |Sales |3000 |2019|
|Scott |Finance |3300 |2020|
|Jen |Finance |3900 |2020|
|Jeff |Marketing |3000 |2020|
|Kumar |Marketing |2000 |2020|
|Saif |Sales |4100 |2020|
+-------------+----------+------+----+

1、group by:分组

1
2
3
4
# 按照department, year计算工资之和。
df.groupBy('department', 'year').agg(
F.sum('salary').alias('salary')
).orderBy('department', 'year').show()
1
2
3
4
5
6
7
8
9
10
>>> output Data:
>>>
+----------+----+------+
|department|year|salary|
+----------+----+------+
| Finance|2020| 10200|
| Marketing|2020| 5000|
| Sales|2019| 3000|
| Sales|2020| 15800|
+----------+----+------+
  • pivot:透视转换

使用pivot函数进行透视,透视过程中可以提供第二个参数来明确指定使用哪些数据项
注意:pivot只能跟在groupby之后

利用pivot实现行转列
pivot的第一个参数指定原数据列,第二个参数指定要新生成的数据列

2. rollup:分级分组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
'''
- 先按照 `department`、`employee_name`、`year`分组;
- 然后按照`department`、`employee_name`分组;
- 然后再按照 `department` 分组;
- 最后进行全表分组。
- 后面接聚合函数,此处使用的是`sum`。
'''

df.rollup('department', 'employee_name', 'year').agg(
F.sum('salary').alias('salary')
).orderBy('department', 'employee_name', 'year').show()
>>> output Data:
>>>
+----------+-------------+----+------+
|department|employee_name|year|salary|
+----------+-------------+----+------+
| null| null|null| 34000|
| Finance| null|null| 10200|
| Finance| Jen|null| 3900|
| Finance| Jen|2020| 3900|
| Finance| Maria|null| 3000|
| Finance| Maria|2020| 3000|
| Finance| Scott|null| 3300|
| Finance| Scott|2020| 3300|
| Marketing| null|null| 5000|
| Marketing| Jeff|null| 3000|
| Marketing| Jeff|2020| 3000|
| Marketing| Kumar|null| 2000|
| Marketing| Kumar|2020| 2000|
| Sales| null|null| 18800|
| Sales| James|null| 6000|
| Sales| James|2019| 3000|
| Sales| James|2020| 3000|
| Sales| Michael|null| 4600|
| Sales| Michael|2020| 4600|
| Sales| Robert|null| 4100|
+----------+-------------+----+------+
only showing top 20 rows

3. cube:分级分组,比rollup组合的维度更全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
'''
1. cube` 先按照`department、employee_name、year`分组;
2. 然后按照`(department, employee_name)`、`(department, year)`、`(year, employee_name)`分组;
3. 然后按照`department`、`employee_name`、`year`分别分组;
4. 最后进行全表分组。
'''
df.cube('department', 'employee_name', 'year').agg(
F.sum('salary').alias('salary')
).orderBy('department', 'employee_name', 'year').show()
>>> output Data:
>>>
+----------+-------------+----+------+
|department|employee_name|year|salary|
+----------+-------------+----+------+
| null| null|null| 34000|
| null| null|2019| 3000|
| null| null|2020| 31000|
| null| James|null| 6000|
| null| James|2019| 3000|
| null| James|2020| 3000|
| null| Jeff|null| 3000|
| null| Jeff|2020| 3000|
| null| Jen|null| 3900|
| null| Jen|2020| 3900|
| null| Kumar|null| 2000|
| null| Kumar|2020| 2000|
| null| Maria|null| 3000|
| null| Maria|2020| 3000|
| null| Michael|null| 4600|
| null| Michael|2020| 4600|
| null| Robert|null| 4100|
| null| Robert|2020| 4100|
| null| Saif|null| 4100|
| null| Saif|2020| 4100|
+----------+-------------+----+------+
only showing top 20 rows

3.1 grouping

指示 GROUP BY 列表中的指定列是否为空,在结果集中返回 1 表示空或 0 表示未空。

grouping(xx) = 0:表示维度出现在组合里面

grouping(xx) = 1: 表示维度不出现在组合里面

CUBE 操作所生成的空值带来一个问题:如何区分 CUBE 操作所生成的 NULL 值和从实际数据中返回的 NULL 值?这个问题可用 GROUPING 函数解决。如果列中的值来自事实数据,则 GROUPING 函数返回 0;如果列中的值是 CUBE 操作所生成的 NULL,则返回 1。在 CUBE 操作中,所生成的 NULL 代表全体值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
df.cube("department").agg(
F.grouping("department").alias('department'),
F.sum("salary").alias('salary')
).orderBy("salary").show()
>>> output Data:
>>>
+----------+----------+------+
|department|department|salary|
+----------+----------+------+
| Marketing| 0| 5000|
| Finance| 0| 10200|
| Sales| 0| 18800|
| null| 1| 34000|
+----------+----------+------+

3.2 grouping_id: 帮助轻松找到自己想要的信息。

返回分组级别: (分組(c1) << (n-1)) + (分組(c2) << (n-2)) + … + 分組(cn)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
df.cube('department', 'employee_name', 'year').agg(
F.grouping_id().alias('group_level'),
F.sum('salary').alias('salary')
).orderBy(F.desc('group_level')).show()
>>> output Data:
>>>
+----------+-------------+----+-----------+------+
|department|employee_name|year|group_level|salary|
+----------+-------------+----+-----------+------+
| null| null|null| 7| 34000|
| null| null|2020| 6| 31000|
| null| null|2019| 6| 3000|
| null| Jeff|null| 5| 3000|
| null| Michael|null| 5| 4600|
| null| James|null| 5| 6000|
| null| Kumar|null| 5| 2000|
| null| Maria|null| 5| 3000|
| null| Saif|null| 5| 4100|
| null| Robert|null| 5| 4100|
| null| Scott|null| 5| 3300|
| null| Jen|null| 5| 3900|
| null| Michael|2020| 4| 4600|
| null| Robert|2020| 4| 4100|
| null| Kumar|2020| 4| 2000|
| null| James|2019| 4| 3000|
| null| Saif|2020| 4| 4100|
| null| James|2020| 4| 3000|
| null| Maria|2020| 4| 3000|
| null| Scott|2020| 4| 3300|
+----------+-------------+----+-----------+------+
only showing top 20 rows

4、grouping set:分组集,跨多个组的聚合操作,仅SQL

希望获得所有用户的各种股票的数量

sql:

1
2
3
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY CustomerId, stockCode
ORDER BY CustomerId DESC, stockCode DESC

grouping set也可以实现完全相同的操作:

1
2
3
4
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY CustomerId, stockCode
GROUPING SETS ((CustomerId, stockCode))
ORDER BY CustomerId DESC, stockCode DESC

如果想不区分客户和股票来统计股票总数

1
2
3
4
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY CustomerId, stockCode
GROUPING SETS ((CustomerId, stockCode), ())
ORDER BY CustomerId DESC, stockCode DESC

四、窗口函数

1. 概念:

能返回整个dataframe,也能进行聚合运算。

对于一个数据集,

  • map 是对每行进行操作,为每行得到一个结果;

  • reduce 则是对多行进行操作,得到一个结果;

  • window 函数则是对多行进行操作,得到多个结果(每行一个)。

窗口函数使用时由“窗口函数”和over从句组成;

其中,over从句分为三部分:

  • 分组(partition by)、
  • 排序(order by)、
  • frame选取(rangeBetween 和 rowsBetween)。

Window函数分类为三种:

  • 排名函数 ranking functions包括:
    • row_number():连续不重复。
    • rank():连续重复
    • dense_rank():重复不连续
    • percent_rank():对dense_rank归一化
    • ntile():n等份
  • 解析函数 analytic functions包括:
    • cume_dist():对rank归一化
    • lag()
    • lead()
  • 聚合函数 aggregate functions包括:
    • sum()
    • first()
    • last()
    • max()
    • min()
    • mean()
    • stddev()

例子:找到每年当中最冷那一天的温度 / 最冷那一天的日期。

  • 窗口函数的过程
  1. 根据某个条件对数据进行分组,PartitionBy
  2. 根据需求计算聚合函数
  3. 将计算结果Join回一个大dataframe
1
2
3
4
5
from pyspark.sql.window import Window
each_year = Window.partitionBy("year")
gsod.withColumn('min_temp',F.min("temp").over(each_year))\
.where("temp=min_temp")\
.select("year", "month", "day")

2.三种函数的例子

(1)创建一个 PySpark DataFrame

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from pyspark.sql.window import Window
import pyspark.sql.functions as F
employee_salary = [
("Ali", "Sales", 8000),
("Bob", "Sales", 7000),
("Cindy", "Sales", 7500),
("Davd", "Finance", 10000),
("Elena", "Sales", 8000),
("Fancy", "Finance", 12000),
("George", "Finance", 11000),
("Haffman", "Marketing", 7000),
("Ilaja", "Marketing", 8000),
("Joey", "Sales", 9000)]

columns= ["name", "department", "salary"]
df = spark.createDataFrame(data = employee_salary, schema = columns)
df.show(truncate=False)
>>> output Data:
>>>
+-------+----------+------+
|name |department|salary|
+-------+----------+------+
|Ali |Sales |8000 |
|Bob |Sales |7000 |
|Cindy |Sales |7500 |
|Davd |Finance |10000 |
|Elena |Sales |8000 |
|Fancy |Finance |12000 |
|George |Finance |11000 |
|Haffman|Marketing |7000 |
|Ilaja |Marketing |8000 |
|Joey |Sales |9000 |
+-------+----------+------+

(2)排名函数 ranking functions

2.1 row_number()

row_number() 窗口函数用于给出从1开始到每个窗口分区的结果的连续行号。 与 groupBy 不同 Window 以 partitionBy 作为分组条件,orderBy 对 Window 分组内的数据进行排序。

同分数不并列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 以 department 字段进行分组,以 salary 倒序排序
# 按照部门对薪水排名,薪水最低的为第一名
windowSpec = Window.partitionBy("department").orderBy(F.asc("salary"))
# 分组内增加 row_number
df_part = df.withColumn(
"row_number",
F.row_number().over(windowSpec)
)
print(df_part.toPandas().to_markdown())
>>> output Data:
>>>
| | name | department | salary | row_number |
|---:|:--------|:-------------|---------:|-------------:|
| 0 | Bob | Sales | 7000 | 1 |
| 1 | Cindy | Sales | 7500 | 2 |
| 2 | Ali | Sales | 8000 | 3 |
| 3 | Elena | Sales | 8000 | 4 |
| 4 | Joey | Sales | 9000 | 5 |
| 5 | Davd | Finance | 10000 | 1 |
| 6 | George | Finance | 11000 | 2 |
| 7 | Fancy | Finance | 12000 | 3 |
| 8 | Haffman | Marketing | 7000 | 1 |
| 9 | Ilaja | Marketing | 8000 | 2 |

观察上面的数据,发现同样的薪水会有不同的排名(都是8000的薪水,有的第二有的第三),这是因为row_number()是按照行来给定序号,其不关注实际数值的大小。由此我们可以引申出另一个用于给出排序数的函数rank。

使用场景
  • 选取本部门工资收入第N高的记录
  • (思考)选取某日第N笔交易记录
1
2
3
4
5
6
7
8
print(df_part.where(F.col('row_number') == 2).toPandas().to_markdown())
>>> output Data:
>>>
| | name | department | salary | row_number |
|---:|:-------|:-------------|---------:|-------------:|
| 0 | Cindy | Sales | 7500 | 2 |
| 1 | George | Finance | 11000 | 2 |
| 2 | Ilaja | Marketing | 8000 | 2 |

2.2 rank()

同分数并列。下一个顺延。

rank()用来给按照指定列排序的分组窗增加一个排序的序号,

如果有相同数值,则排序数相同,下一个序数顺延一位。来看如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 使用 rank 排序,都是8000的薪水,就同列第二
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df_rank = df.withColumn("rank", F.rank().over(windowSpec))
print(df_rank.toPandas().to_markdown())
>>> output Data:
>>>
| | name | department | salary | rank |
|---:|:--------|:-------------|---------:|-------:|
| 0 | Joey | Sales | 9000 | 1 |
| 1 | Ali | Sales | 8000 | 2 |
| 2 | Elena | Sales | 8000 | 2 |
| 3 | Cindy | Sales | 7500 | 4 |
| 4 | Bob | Sales | 7000 | 5 |
| 5 | Fancy | Finance | 12000 | 1 |
| 6 | George | Finance | 11000 | 2 |
| 7 | Davd | Finance | 10000 | 3 |
| 8 | Ilaja | Marketing | 8000 | 1 |
| 9 | Haffman | Marketing | 7000 | 2 |
print(df_rank.where(F.col("rank")=="2").toPandas().to_markdown())
>>> output Data:
>>>
| | name | department | salary | rank |
|---:|:--------|:-------------|---------:|-------:|
| 0 | Ali | Sales | 8000 | 2 |
| 1 | Elena | Sales | 8000 | 2 |
| 2 | George | Finance | 11000 | 2 |
| 3 | Haffman | Marketing | 7000 | 2 |
1
2
window = Window.partitionBy(['a']).orderBy(['a'])
df.withColumn('rank',F.rank().over(window)).filter("rank = '1'").drop('rank')

2.3 dense_rank

观察 dense_rank 与 rank 的区别。

同分数并列且下一个不顺延。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 注意 rank 排序,8000虽然为同列第二,但7500属于第4名
# dense_rank()中, 8000同列第二后,7500属于第3名
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("dense_rank", F.dense_rank().over(windowSpec)).show()
>>> output Data:
>>>
+-------+----------+------+----------+
| name|department|salary|dense_rank|
+-------+----------+------+----------+
| Joey| Sales| 9000| 1|
| Ali| Sales| 8000| 2|
| Elena| Sales| 8000| 2|
| Cindy| Sales| 7500| 3|
| Bob| Sales| 7000| 4|
| Fancy| Finance| 12000| 1|
| George| Finance| 11000| 2|
| Davd| Finance| 10000| 3|
| Ilaja| Marketing| 8000| 1|
|Haffman| Marketing| 7000| 2|
+-------+----------+------+----------+

2.4 percent_rank():百分比排序。(将 dense_rank() 的结果进行归一化)

一些业务场景下,我们需要计算不同数值的百分比排序数据,先来看一个例子吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
windowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("percent_rank",F.percent_rank().over(windowSpec)).show()
>>> output Data:
>>>
+-------+----------+------+------------+
| name|department|salary|percent_rank|
+-------+----------+------+------------+
| Joey| Sales| 9000| 0.0|
| Ali| Sales| 8000| 0.25|
| Elena| Sales| 8000| 0.25|
| Cindy| Sales| 7500| 0.75|
| Bob| Sales| 7000| 1.0|
| Fancy| Finance| 12000| 0.0|
| George| Finance| 11000| 0.5|
| Davd| Finance| 10000| 1.0|
| Ilaja| Marketing| 8000| 0.0|
|Haffman| Marketing| 7000| 1.0|
+-------+----------+------+------------+

上述结果可以理解为将 dense_rank() 的结果进行归一化, 即可得到0-1以内的百分数。percent_rank() 与 SQL 中的 PERCENT_RANK 函数效果一致。

2.5 ntile():分组切分n均等数据。

ntile()可将分组的数据按照指定数值n切分为n个部分, 每一部分按照行的先后给定相同的序数。例如n指定为2,则将组内数据分为两个部分, 第一部分序号为1,第二部分序号为2。理论上两部分数据行数是均等的, 但当数据为奇数行时,中间的那一行归到前一部分。

按照部门对数据进行分组,然后在组内按照薪水高低进行排序, 再使用 ntile() 将组内数据切分为两个部分。结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 按照部门对数据进行分组,然后在组内按照薪水高低进行排序 
windowSpec = Window.partitionBy(
"department").orderBy(F.desc("salary"))
# 使用ntile() 将组内数据切分为两个部分
df.withColumn("ntile", F.ntile(2).over(windowSpec)).show()
>>> output Data:
>>>
+-------+----------+------+-----+
| name|department|salary|ntile|
+-------+----------+------+-----+
| Joey| Sales| 9000| 1|
| Ali| Sales| 8000| 1|
| Elena| Sales| 8000| 1|
| Cindy| Sales| 7500| 2|
| Bob| Sales| 7000| 2|
| Fancy| Finance| 12000| 1|
| George| Finance| 11000| 1|
| Davd| Finance| 10000| 2|
| Ilaja| Marketing| 8000| 1|
|Haffman| Marketing| 7000| 2|
+-------+----------+------+-----+

(3). Analytic functions

3.1 cume_dist():(将 rank() 的结果进行归一化),和percent_rank很像。

cume_dist()函数用来获取数值的累进分布值,看如下例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn(
"cume_dist", F.cume_dist().over(windowSpec)).show()
>>> output Data:
>>>
+-------+----------+------+------------------+
| name|department|salary| cume_dist|
+-------+----------+------+------------------+
| Joey| Sales| 9000| 0.2|
| Ali| Sales| 8000| 0.6|
| Elena| Sales| 8000| 0.6|
| Cindy| Sales| 7500| 0.8|
| Bob| Sales| 7000| 1.0|
| Fancy| Finance| 12000|0.3333333333333333|
| George| Finance| 11000|0.6666666666666666|
| Davd| Finance| 10000| 1.0|
| Ilaja| Marketing| 8000| 0.5|
|Haffman| Marketing| 7000| 1.0|
+-------+----------+------+------------------+
# 和 percent_rank 对比一下
df.withColumn(
'percent_rank',
F.percent_rank().over(windowSpec)).show()
>>> output Data:
>>>
+-------+----------+------+------------+
| name|department|salary|percent_rank|
+-------+----------+------+------------+
| Joey| Sales| 9000| 0.0|
| Ali| Sales| 8000| 0.25|
| Elena| Sales| 8000| 0.25|
| Cindy| Sales| 7500| 0.75|
| Bob| Sales| 7000| 1.0|
| Fancy| Finance| 12000| 0.0|
| George| Finance| 11000| 0.5|
| Davd| Finance| 10000| 1.0|
| Ilaja| Marketing| 8000| 0.0|
|Haffman| Marketing| 7000| 1.0|
+-------+----------+------+------------+

结果好像和前面的percent_rank()很类似对不对,于是我们联想到这个其实也是一种归一化结果, 其按照 rank() 的结果进行归一化处理。回想一下前面讲过的 rank() 函数,并列排序会影响后续排序, 于是序号中间可能存在隔断。这样Sales组的排序数就是1、2、2、4、5, 归一化以后就得到了0.2、0.6、0.6、0.8、1。这个统计结果按照实际业务来理解就是:

  • 9000及以上的人占了20%,
  • 8000及以上的人占了60%,
  • 7500以上的人数占了80%,
  • 7000以上的人数占了100%,

3.2 lag():排序后找上一个数值。

lag() 函数用于寻找按照指定列排好序的分组内每个数值的上一个数值,

通俗的说,就是数值排好序以后,寻找排在每个数值的上一个数值。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 相当于滞后项
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("lag", F.lag("salary",1).over(windowSpec)).show()
>>> output Data:
>>>
+-------+----------+------+-----+
| name|department|salary| lag|
+-------+----------+------+-----+
| Joey| Sales| 9000| null|
| Ali| Sales| 8000| 9000|
| Elena| Sales| 8000| 8000|
| Cindy| Sales| 7500| 8000|
| Bob| Sales| 7000| 7500|
| Fancy| Finance| 12000| null|
| George| Finance| 11000|12000|
| Davd| Finance| 10000|11000|
| Ilaja| Marketing| 8000| null|
|Haffman| Marketing| 7000| 8000|
+-------+----------+------+-----+

3.3 lead():排序后找下一个数值。

lead() 用于获取排序后的数值的下一个,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 和滞后项相反,提前一位
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("lead",F.lead("salary",1).over(windowSpec)).show()
>>> output Data:
>>>
+-------+----------+------+-----+
| name|department|salary| lead|
+-------+----------+------+-----+
| Joey| Sales| 9000| 8000|
| Ali| Sales| 8000| 8000|
| Elena| Sales| 8000| 7500|
| Cindy| Sales| 7500| 7000|
| Bob| Sales| 7000| null|
| Fancy| Finance| 12000|11000|
| George| Finance| 11000|10000|
| Davd| Finance| 10000| null|
| Ilaja| Marketing| 8000| 7000|
|Haffman| Marketing| 7000| null|
+-------+----------+------+-----+
  1. 实际业务场景中,假设我们获取了每个月的销售数据, 我们可能想要知道,某月份与上一个月或下一个月数据相比怎么样, 于是就可以使用lag和lead来进行数据分析了。
  2. 思考差分如何做?增长率如何做(同比、环比)?

(4). Aggregate Functions

常见的聚合函数有avg, sum, min, max, count, approx_count_distinct()等,我们用如下代码来同时使用这些函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 分组,并对组内数据排序
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
# 仅分组
windowSpecAgg = Window.partitionBy("department")

df.withColumn("row", F.row_number().over(windowSpec)) \
.withColumn("avg", F.avg("salary").over(windowSpecAgg)) \
.withColumn("sum", F.sum("salary").over(windowSpecAgg)) \
.withColumn("min", F.min("salary").over(windowSpecAgg)) \
.withColumn("max", F.max("salary").over(windowSpecAgg)) \
.withColumn("count", F.count("salary").over(windowSpecAgg)) \
.withColumn("distinct_count", F.approxCountDistinct("salary").over(windowSpecAgg)) \
.show()
>>> output Data:
>>>
+-------+----------+------+---+-------+-----+-----+-----+-----+--------------+
| name|department|salary|row| avg| sum| min| max|count|distinct_count|
+-------+----------+------+---+-------+-----+-----+-----+-----+--------------+
| Joey| Sales| 9000| 1| 7900.0|39500| 7000| 9000| 5| 4|
| Ali| Sales| 8000| 2| 7900.0|39500| 7000| 9000| 5| 4|
| Elena| Sales| 8000| 3| 7900.0|39500| 7000| 9000| 5| 4|
| Cindy| Sales| 7500| 4| 7900.0|39500| 7000| 9000| 5| 4|
| Bob| Sales| 7000| 5| 7900.0|39500| 7000| 9000| 5| 4|
| Fancy| Finance| 12000| 1|11000.0|33000|10000|12000| 3| 3|
| George| Finance| 11000| 2|11000.0|33000|10000|12000| 3| 3|
| Davd| Finance| 10000| 3|11000.0|33000|10000|12000| 3| 3|
| Ilaja| Marketing| 8000| 1| 7500.0|15000| 7000| 8000| 2| 2|
|Haffman| Marketing| 7000| 2| 7500.0|15000| 7000| 8000| 2| 2|
+-------+----------+------+---+-------+-----+-----+-----+-----+--------------+

需要注意的是 approx_count_distinct() 函数适用与窗函数的统计, 而在groupby中通常用countDistinct()来代替该函数,用来求组内不重复的数值的条数。

从结果来看,统计值基本上是按照部门分组,统计组内的salary情况。 如果我们只想要保留部门的统计结果,而将每个人的实际情况去掉,可以采用如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
windowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
windowSpecAgg = Window.partitionBy("department")

df = df.withColumn("row", F.row_number().over(windowSpec)) \
.withColumn("avg", F.avg("salary").over(windowSpecAgg)) \
.withColumn("sum", F.sum("salary").over(windowSpecAgg)) \
.withColumn("min", F.min("salary").over(windowSpecAgg)) \
.withColumn("max", F.max("salary").over(windowSpecAgg)) \
.withColumn("count", F.count("salary").over(windowSpecAgg)) \
.withColumn("distinct_count", F.approx_count_distinct("salary").over(windowSpecAgg))

# 仅选取分组第一行数据
# 用F.col 去选row 行,怪怪的
df_part = df.where(F.col("row")==1)
df_part.select("department","avg","sum","min","max","count","distinct_count").show()
>>> output Data:
>>>
+----------+-------+-----+-----+-----+-----+--------------+
|department| avg| sum| min| max|count|distinct_count|
+----------+-------+-----+-----+-----+-----+--------------+
| Sales| 7900.0|39500| 7000| 9000| 5| 4|
| Finance|11000.0|33000|10000|12000| 3| 3|
| Marketing| 7500.0|15000| 7000| 8000| 2| 2|
+----------+-------+-----+-----+-----+-----+--------------+

3. frame函数

(1)Window.rangeBetween(start, end)

创建一个WindowSpec,定义了从start(含)到end(含)的帧边界。start和end` 都是相对于当前行的。例如“0”表示“current row”,“-1”表示当前行前一关,“5”表示当前行后五关。

基准为当前行

行数选择

  • rowsBetween(x, y)
  • Window.unboundedPreceding 表示当前行之前的无限行
  • Window.currentRow 表示当前行
  • Window.unboundedFollowing 表示当前行之后的无限行

rowsBetween(-1,1):函数作用范围为当前行的上一行至下一行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
>>> from pyspark.sql import Window
>>> from pyspark.sql import functions as func
>>> from pyspark.sql import SQLContext
>>> sc = SparkContext.getOrCreate()
>>> sqlContext = SQLContext(sc)
>>> tup = [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")]
>>> df = sqlContext.createDataFrame(tup, ["id", "category"])
>>> window = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)
>>> df.withColumn("sum", func.sum("id").over(window)).sort("id", "category").show()
+---+--------+---+
| id|category|sum|
+---+--------+---+
| 1| a| 4|
| 1| a| 4|
| 1| b| 3|
| 2| a| 2|
| 2| b| 5|
| 3| b| 3|
+---+--------+---+

(2)rowsBetween

  • ROWS BETWEEN不关心确切的值.它只关心行的顺序,并在计算帧时采用固定数量的前后行. (比较行的顺序)
  • RANGE BETWEEN 在计算帧时考虑值 .(比较值的大小)

行范围设置 rangeBetween(x,y)
基准为当前行的值

  • rangeBetween(20,50)
    例如当前值为18
    则选取的值范围为[-2,68]

4. 例子

(1)小例子

需求:组内按分数排序。

1
df.select($"uid", $"date", $"score", row_number().over(Window.partitionBy("uid").orderBy($"score".desc)).as("rank"))

img

(2)udf + 窗口函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from pyspark.sql import Window

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()

df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
w = Window.partitionBy('id').orderBy('v').rowsBetween(-1, 0)
df.withColumn('mean_v', mean_udf("v").over(w)).show()

+---+----+------+
| id| v|mean_v|
+---+----+------+
| 1| 1.0| 1.0|
| 1| 2.0| 1.5|
| 2| 3.0| 3.0|
| 2| 5.0| 4.0|
| 2|10.0| 7.5|
+---+----+------+

(3)同时groupby 两个key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 生成首次付费时间(FirstPayDate), 首次付费当天的付费金额(FirstPayPrice), 总付费金额(AllPayPrice). 首次单天付费频次(FirstPayFreq), 总付费频次(AllPayFreq)
w1 = Window.partitionBy("OrderId").orderBy(col("PayDate"))
w2 = Window.partitionBy("OrderId")
df_pay_data = df_order_all.filter((F.col("OrderType") == 0) & ((F.col("IsPay") == 1)))\
.withColumnRenamed("OrderTime", "PayTime") \
.withColumnRenamed("OrderPrice", "PayPrice") \
.withColumn("PayDate", date_trunc('day', to_timestamp(F.col("PayTime")/1000)))\
.withColumn("row",row_number().over(w1)) \
.withColumn("AllPayPrice", sum(col("PayPrice")).over(w2)) \
.withColumn("FirstPayDate", min(col("PayDate")).over(w2)) \
.withColumn("FirstPayPrice", sum(col("PayPrice")).over(w1)) \
.withColumn("FirstPayFreq", count(col("IsPay")).over(w1)) \
.withColumn("AllPayFreq", count(col("IsPay")).over(w2)) \
.where(col("row") == 1).select("AllPayPrice", "FirstPayPrice", "FirstPayDate","OrderId", "FirstPayFreq", "AllPayFreq")

https://sparkbyexamples.com/pyspark/pyspark-select-first-row-of-each-group/

https://stackoverflow.com/questions/45946349/python-spark-cumulative-sum-by-group-using-dataframe (相加)

(4)groupby + sort + list

1
2
3
group_df = qds_com.groupby(['company']) \
.agg(F.sort_array(F.collect_list(F.struct("features", "label", "samples_count"))) \
.alias("pair"))

(5)求众数

1
df_com_cookie.groupBy(['company','price']).agg(F.count('price').alias('count_price')).orderBy(['company','count_price'], ascending=False).drop_duplicates(subset=['company']).show()

7. 处理不同的数据类型

发表于 2023-03-18 | 分类于 大数据 , Spark

本文仅做学习总结,如有侵权立删

[TOC]

处理不同的数据类型

1. 数据类型

  • 布尔型
  • 数值型
  • 字符串型
  • 日期和时间戳类型
  • 空值处理
  • 复杂类型
  • 自定义函数

2. 处理布尔类型

(1)and、or、true 和false

(2)&、|、1、0

1
2
3
4
import pyspark.functions as F
filter1 = F.col('aa') > 600
filter2 = F.col('bb') > 2
df.where(filter1 & filter2).where(F.col('cc').isin("DOT")).show()

注意:创建布尔表达式时注意空值处理!!!

将df.where(F.col(‘a’) != ‘hello’)改写成df.where (F.col(‘a’).eqNullSafe(“hello”))可以保证空值安全。

3. 处理数值类型

  • pow:平方
1
2
3
4
5
6
7
# 例子
from pyspark.sql.functions import pow
f = pow(F.col('a') * F.col('b'), 2) + 5 # (a * b)^2 + 5
df.select(f.alias('c'))

# selectExpr
df.selectExpr('a', 'pow(F.col('a') * F.col('b'), 2) + 5')
  • round:向上取整,bround:向下取整
1
2
# from pyspark.sql.functions import lit, round, bround
df.select(round(lit("2.5")), bround(lit("2.5")))
  • corr:计算两列的相关性
1
2
3
from pyspark.sql.functions import corr
df.stat.corr('a', 'b')
df.select(corr('a', 'b'))
  • describe:计算一列或一组列的汇总统计信息,可以用describe来实现。
  • count:计数
  • mean:平均值
  • stddev_pop:标准差
  • min:最小值
  • max:最大值

  • StatFunctions包中封装了许多可使用的统计函数

1
2
3
quantileProbs = [0.5]
relError = 0.05
df.stat.approxQuantile('a', quantileProbs, relError)
  • monotonically_increasing_id函数:从0开始,为每行添加一个唯一的ID。
1
2
from pyspark.sql.functions import monotonically_increasing_id
df.select(monotonically_increasing_id())

4. 处理字符串类型

  • initcap:将给定字符串中空格分隔的每个单词首字母大写。
  • lower:全部小写
  • upper:全部大写
1
2
from pyspark.sql.functions import initcap
df.select(initcap(F.col('aaa')), lower(F.col('bbb')), upper(F.col('ccc')))
  • lpad:从左边对字符串使用指定的字符进行填充。
  • ltrim:从左边对字符串使用指定的字符进行删除空格。
  • rpad: 从右边对字符串使用指定的字符进行填充。
  • trim:从右边对字符串使用指定的字符进行删除空格。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from pyspark.sql.functions import lit, ltrim, rtim, rpad, lpad, trim
df.select(ltrim(lit(" HELLO ")).alias("ltrim"),
rtrim(lit(" HELLO ")).alias("rtrim"),
trim(lit(" HELLO ")).alias("trim"),
lpad(lit("HELLO", 3, " ")).alias("lpad"),
rpad(lit("HELLO", 10, " ")).alias("rpad")
)
'''
ltrim: "HELLO "
rtrim:" HELLO"
trim:"HELLO"
lpad:"HEL"
rpad: "HELLO "
注意 lpad或rpad输入的数值小于字符串长度,它将从字符串的右侧删除字符
'''

5、正则表达式

  • regexp_extract(列名,正则表达式,第几个):提取值
1
2
3
4
from pyspark.sql.functions import regexp_extract
regex_string = "BLACK|WHITE|RED|GREEN|BLUE"
df.select(regexp_extract(F.col('a'), regex_string, 1)) # 将列名为a的字段中出现包含在正则表达式的第一个单词取出来。
# 如 df['a'] = "WHITE HANGING HEA", 结果为WHITE
  • regexp_replace:替换值
1
2
3
4
5
from pyspark.sql.functions import regexp_replace
regex_string = "BLACK|WHITE|RED|GREEN|BLUE" # |在正则中表示或的意思
df.select(
regexp_replace(F.col('a'), regex_string, 'color').alias('color_clean'),
) # 将字段a中包含regex_string这些字段换成color。
  • translate:替换,将给定字符串替换掉所有出现的某字符串。
1
2
3
from pyspark.sql.functions import translate
df.select(translate(F.col('a'), 'LEET', '1337')) # L替换成1, E替换成3, T替换成7.
# 所以WHITE 会被替换成WHI73.
  • instr:是否存在(类似contains)
1
2
3
from pyspark.sql.functions import instr
containsBlack = instr(F.col('a'), 'BLACK') >= 1
df.withColumn('b', containsBlack)
  • locate(substr, str, pos=1):在位置 pos 之后定位字符串列中第一次出现 substr 的位置。
    • 该位置不是基于零的,而是基于 1 的索引。如果在 str 中找不到 substr,则返回 0。
1
2
3
df = spark.createDataFrame([('abcd',)], ['s',])
df.select(locate('b', df.s, 1).alias('s')).collect()
[Row(s=2)]

6、处理日期和时间戳类型

https://zhuanlan.zhihu.com/p/450636026

1
from pyspark.sql import functions as F

(1) 示例数据

1
2
3
4
5
6
7
8
9
10
11
12
data=[["1","2020-02-01"],["2","2019-03-01"],["3","2021-03-01"]]
df=spark.createDataFrame(data, ["id","time"])
df.show()
>>> output Data:
>>>
+---+----------+
| id| time|
+---+----------+
| 1|2020-02-01|
| 2|2019-03-01|
| 3|2021-03-01|
+---+----------+

(2). 日期

2.1 当前日期 current_date()

  • 获取当前系统日期。默认情况下,数据将以yyyy-dd-mm格式返回。
1
2
3
4
5
6
7
8
9
df.select(F.current_date().alias("current_date")).show(1)
>>> output Data:
>>>
+------------+
|current_date|
+------------+
| 2021-12-28|
+------------+
only showing top 1 row

2.2 日期格式 date_format()

  • 解析日期并转换yyyy-dd-mm为MM-dd-yyyy格式。
1
2
3
4
5
6
7
8
9
10
11
df.select(F.col("time"), 
F.date_format(F.col("time"), "MM-dd-yyyy").alias("date_format")).show()
>>> output Data:
>>>
+----------+-----------+
| time|date_format|
+----------+-----------+
|2020-02-01| 02-01-2020|
|2019-03-01| 03-01-2019|
|2021-03-01| 03-01-2021|
+----------+-----------+

2.3 使用to_date()将日期格式字符串yyyy-MM-dd转换为DateType yyyy-MM-dd

1
2
3
4
5
6
7
8
9
10
11
df.select(F.col("time"), 
F.to_date(F.col("time"), "yyy-MM-dd").alias("to_date")).show()
>>> output Data:
>>>
+----------+----------+
| time| to_date|
+----------+----------+
|2020-02-01|2020-02-01|
|2019-03-01|2019-03-01|
|2021-03-01|2021-03-01|
+----------+----------+

2.4 两个日期之间的日差datediff()

1
2
3
4
5
6
7
8
9
10
11
12
df.select(F.col("time"), 
F.datediff(F.current_date(), F.col("time")).alias("datediff")
).show()
>>> output Data:
>>>
+----------+--------+
| time|datediff|
+----------+--------+
|2020-02-01| 696|
|2019-03-01| 1033|
|2021-03-01| 302|
+----------+--------+

2.5 两个日期之间的月份months_between()

1
2
3
4
5
6
7
8
9
10
11
12
df.select(F.col("time"), 
F.months_between(F.current_date(),F.col("time")).alias("months_between")
).show()
>>> output Data:
>>>
+----------+--------------+
| time|months_between|
+----------+--------------+
|2020-02-01| 22.87096774|
|2019-03-01| 33.87096774|
|2021-03-01| 9.87096774|
+----------+--------------+

2.6 截断指定单位的日期trunc()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
df.select(F.col("time"), 
F.trunc(F.col("time"),"Month").alias("Month_Trunc"),
F.trunc(F.col("time"),"Year").alias("Month_Year"),
F.trunc(F.col("time"),"Month").alias("Month_Trunc")).show()
>>> output Data:
>>>
+----------+-----------+----------+-----------+
| time|Month_Trunc|Month_Year|Month_Trunc|
+----------+-----------+----------+-----------+
|2020-02-01| 2020-02-01|2020-01-01| 2020-02-01|
|2019-03-01| 2019-03-01|2019-01-01| 2019-03-01|
|2021-03-01| 2021-03-01|2021-01-01| 2021-03-01|
+----------+-----------+----------+-----------+


# pyspark.sql.functions.date_trunc(format, timestamp)
# 返回截断为格式指定单位的时间戳。
# 2.3.0 版中的新函数。

>>> df = spark.createDataFrame([('1997-02-28 05:02:11',)], ['t'])
>>> df.select(date_trunc('year', df.t).alias('year')).collect()
[Row(year=datetime.datetime(1997, 1, 1, 0, 0))]
>>> df.select(date_trunc('mon', df.t).alias('month')).collect()
[Row(month=datetime.datetime(1997, 2, 1, 0, 0))]

2.7 月、日加减法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
df.select(F.col("time"), 
F.add_months(F.col("time"),3).alias("add_months"),
F.add_months(F.col("time"),-3).alias("sub_months"),
F.date_add(F.col("time"),4).alias("date_add"),
F.date_sub(F.col("time"),4).alias("date_sub")
).show()
>>> output Data:
>>>
+----------+----------+----------+----------+----------+
| time|add_months|sub_months| date_add| date_sub|
+----------+----------+----------+----------+----------+
|2020-02-01|2020-05-01|2019-11-01|2020-02-05|2020-01-28|
|2019-03-01|2019-06-01|2018-12-01|2019-03-05|2019-02-25|
|2021-03-01|2021-06-01|2020-12-01|2021-03-05|2021-02-25|
+----------+----------+----------+----------+----------+

2.8 年、月、下一天、一年中第几个星期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
df.select(F.col("time"), 
F.year(F.col("time")).alias("year"),
F.month(F.col("time")).alias("month"),
F.next_day(F.col("time"),"Sunday").alias("next_day"),
F.weekofyear(F.col("time")).alias("weekofyear")
).show()
>>> output Data:
>>>
+----------+----+-----+----------+----------+
| time|year|month| next_day|weekofyear|
+----------+----+-----+----------+----------+
|2020-02-01|2020| 2|2020-02-02| 5|
|2019-03-01|2019| 3|2019-03-03| 9|
|2021-03-01|2021| 3|2021-03-07| 9|
+----------+----+-----+----------+----------+

2.9 星期几、月日、年日

  • 查询星期几
  • 一个月中的第几天
  • 一年中的第几天
1
2
3
4
5
6
7
8
9
10
11
12
13
14
df.select(F.col("time"),  
F.dayofweek(F.col("time")).alias("dayofweek"),
F.dayofmonth(F.col("time")).alias("dayofmonth"),
F.dayofyear(F.col("time")).alias("dayofyear"),
).show()
>>> output Data:
>>>
+----------+---------+----------+---------+
| time|dayofweek|dayofmonth|dayofyear|
+----------+---------+----------+---------+
|2020-02-01| 7| 1| 32|
|2019-03-01| 6| 1| 60|
|2021-03-01| 2| 1| 60|
+----------+---------+----------+---------+

(3). 时间

3.1 创建一个测试数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
data=[
["1","02-01-2020 11 01 19 06"],
["2","03-01-2019 12 01 19 406"],
["3","03-01-2021 12 01 19 406"]]
df2=spark.createDataFrame(data,["id","time"])
df2.show(truncate=False)
>>> output Data:
>>>
+---+-----------------------+
|id |time |
+---+-----------------------+
|1 |02-01-2020 11 01 19 06 |
|2 |03-01-2019 12 01 19 406|
|3 |03-01-2021 12 01 19 406|
+---+-----------------------+

3.2 以 spark 默认格式yyyy-MM-dd HH:mm:ss返回当前时间戳

1
2
3
4
5
6
7
8
9
10
df2.select(F.current_timestamp().alias("current_timestamp")).show()
>>> output Data:
>>>
+--------------------+
| current_timestamp|
+--------------------+
|2021-12-28 09:31:...|
|2021-12-28 09:31:...|
|2021-12-28 09:31:...|
+--------------------+

3.3 将字符串时间戳转换为时间戳类型格式 to_timestamp()

1
2
3
4
5
6
7
8
9
10
11
12
df2.select(F.col("time"), 
F.to_timestamp(F.col("time"), "MM-dd-yyyy HH mm ss SSS").alias("to_timestamp")
).show(truncate=False)
>>> output Data:
>>>
+-----------------------+-----------------------+
|time |to_timestamp |
+-----------------------+-----------------------+
|02-01-2020 11 01 19 06 |2020-02-01 11:01:19.06 |
|03-01-2019 12 01 19 406|2019-03-01 12:01:19.406|
|03-01-2021 12 01 19 406|2021-03-01 12:01:19.406|
+-----------------------+-----------------------+

3.4 获取小时、分钟、秒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 数据
data=[
["1","2020-02-01 11:01:19.06"],
["2","2019-03-01 12:01:19.406"],
["3","2021-03-01 12:01:19.406"]]
df3=spark.createDataFrame(data,["id","time"])

# 提取小时、分钟、秒
df3.select(
F.col("time"),
F.hour(F.col("time")).alias("hour"),
F.minute(F.col("time")).alias("minute"),
F.second(F.col("time")).alias("second")
).show(truncate=False)
>>> output Data:
>>>
+-----------------------+----+------+------+
|time |hour|minute|second|
+-----------------------+----+------+------+
|2020-02-01 11:01:19.06 |11 |1 |19 |
|2019-03-01 12:01:19.406|12 |1 |19 |
|2021-03-01 12:01:19.406|12 |1 |19 |
+-----------------------+----+------+------+

7. 处理数据中的空值

8. 复杂类型

  • 结构体: struct
1
2
3
4
5
6
7
from pyspark.sql.functions import struct
df1 = df.select(struct('a', 'b').alias('c'))
# 可以通过"."来访问或列方法getField来实现:
df1.select("c.a")
df1.select(F.col("c").getField("a"))
# 可以通过 ".*"来查询结构体中所有值
df1.select("c.*")
  • 数组
    • split:指定分隔符
1
2
3
4
5
6
7
8
9
10
11
12
13
from pyspark.sql.functions import split
df.select(split(F.col("a"), "\t"))
'''
列名:split(a,)
结果:[WHITE, HANGING, ...]
'''
df.select(split(F.col("a"), "\t").alias("array_col"))
.selectExpr("array_col[0]")
'''
结果
列名:array_col[0]
结果:WHITE
'''

​ 数组长度

1
2
from pyspark.sql.functions import size
df.select(size(split(F.col("a"), "\t")))
array_contains:数组是否包含某个值
1
2
3
from pyspark.sql.functions import array_contains
df.select(array_contains(split(F.col("a"), "\t"), "WHITE"))
# 结果为true

​ explode:一行拆分成多行

1
2
3
4
5
6
7
8
9
10
11
12
13

from pyspark.sql.functions import explode, split

df = df.withColumn("sub_str", explode(split(df["str_col"], "_")))
# 将str_col按-拆分成list,list中的每一个元素成为sub_str,与原行中的其他列一起组成新的行
'''
eg:
"hello world","other column"
split ===> ["hello", "world"], "other column"
explode ===>
"hello", "other column";
"world", "other column"
'''
  • 将嵌套数组 DataFrame 列分解为行

    创建一个带有嵌套数组列的 DataFrame。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
arrayArrayData = [
("James",[["Java","Scala","C++"],["Spark","Java"]]),
("Michael",[["Spark","Java","C++"],["Spark","Java"]]),
("Robert",[["CSharp","VB"],["Spark","Python"]])
]

df = spark.createDataFrame(data=arrayArrayData, schema = ['name','subjects'])
df.printSchema()
df.show(truncate=False)
>>> output Data:
>>>
root
|-- name: string (nullable = true)
|-- subjects: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

+-------+-----------------------------------+
|name |subjects |
+-------+-----------------------------------+
|James |[[Java, Scala, C++], [Spark, Java]]|
|Michael|[[Spark, Java, C++], [Spark, Java]]|
|Robert |[[CSharp, VB], [Spark, Python]] |
+-------+-----------------------------------+

​ 展平数组,请使用 flatten 函数

1
2
3
4
5
6
7
8
9
10
11
from pyspark.sql.functions import flatten
df.select(df.name, flatten(df.subjects)).show(truncate=False)
>>> output Data:
>>>
+-------+-------------------------------+
|name |flatten(subjects) |
+-------+-------------------------------+
|James |[Java, Scala, C++, Spark, Java]|
|Michael|[Spark, Java, C++, Spark, Java]|
|Robert |[CSharp, VB, Spark, Python] |
+-------+-------------------------------+
展平再分解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
df.select(df.name, explode(flatten(df.subjects))).show(truncate=False)
>>> output Data:
>>>
+-------+------+
|name |col |
+-------+------+
|James |Java |
|James |Scala |
|James |C++ |
|James |Spark |
|James |Java |
|Michael|Spark |
|Michael|Java |
|Michael|C++ |
|Michael|Spark |
|Michael|Java |
|Robert |CSharp|
|Robert |VB |
|Robert |Spark |
|Robert |Python|
+-------+------+
  • Map

    • create_map:键值对
    1
    2
    from pyspark.sql.functions import explode, split
    df.select(create_map(F.col("a"), F.col("b")).alias("c_map"))

    ​ 根据key值取value

    1
    2
    3
    from pyspark.sql.functions import explode, split
    df.select(create_map(F.col("a"), F.col("b")).alias("c_map"))\
    .selectExpt("c_map['WHILE METAL LANTERN']")

    ​ 展开map类型,将其转换成列:explode

    1
    2
    df.select(create_map(F.col("a"), F.col("b")).alias("c_map"))\
    .selectExpt("explode('c_map')")

9. 处理Json类型

(1)创建一个Json类型的列:

1
2
3
4
5
6
7
jsonDF = spark.range(1).selectExpr("""
'{
"a": {
"aa": [1,2,3]
}
}' as jsonString
""")

(2) get_json_object:查询JSON对象

pyspark.sql.functions.get_json_object(col, path) : 根据指定的 json 路径从 json 字符串中提取 json 对象,并返回提取的 json 对象的 json 字符串。如果输入的 json 字符串无效,它将返回 null。

1
2
3
4
5
6

>>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')]
>>> df = spark.createDataFrame(data, ("key", "jstring"))
>>> df.select(df.key, get_json_object(df.jstring, '$.f1').alias("c0"), \
... get_json_object(df.jstring, '$.f2').alias("c1") ).collect()
[Row(key='1', c0='value1', c1='value2'), Row(key='2', c0='value12', c1=None)]

(3) 若此查询的JSON对象仅有一层嵌套,则可使用json_tuple

pyspark.sql.functions.json_tuple(col, *fields): 根据给定的字段名称为 json 列创建一个新行。

1
2
3
4
>>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')]
>>> df = spark.createDataFrame(data, ("key", "jstring"))
>>> df.select(df.key, json_tuple(df.jstring, 'f1', 'f2')).collect()
[Row(key='1', c0='value1', c1='value2'), Row(key='2', c0='value12', c1=None)]

(4)to_json:将StructType转换成JSON字符串

1
2
3
4
5
6
7
8
>>> df = ps.DataFrame([['a', 'b'], ['c', 'd']],
... columns=['col 1', 'col 2'])
>>> df.to_json()
'[{"col 1":"a","col 2":"b"},{"col 1":"c","col 2":"d"}]'


>>> df['col 1'].to_json()
'[{"col 1":"a"},{"col 1":"c"}]'

(5) from_json:解析JSON数据,需指定模式

可以使用pyspark.sql.functions.from_json函数将DataFrame中的字典列拆分为多列

10. UDF (自定义函数)

UDF允许使用多种不同的变成语言编写,这些UDF函数被注册为SparkSession或者Context的临时函数。

Spark将在Driver进程上序列化UDF函数,并将它通过网络传递到所有的executor进程。

如果用Scala或Java编写的,可以在JVM中使用它。除了不能使用Spark为内置函数提供的代码生成功能之外,会导致性能的下降。

如果函数是用Python编写的,Spark在worker上启动一个Python进程,将所有程序列化为Python可解释的格式(在此之前程序位于JVM中),在Python进程中对该程序逐行执行函数,最终将对每行的操作结果返回给JVM和Spark。

将程序序列化为Python可解释的格式这个过程代价很高!!!!

  1. 计算昂贵
  2. 程序进入Python后Spark无法管理worker内存。若某个worker因资源受限而失败(JVM和Python在同一台机器争夺内存),可能会导致该worker出现故障。———–建议用java/scala编写UDF。
1
2
3
4
5
from pyspark.sql.functions import udf
def power1(v):
return v**2
powerudf = udf(power1)
df.select(powerudf(F.col('a')))

Join

发表于 2023-03-16 | 分类于 大数据 , Spark

本文仅做学习总结,如有侵权立删

PySpark的Join

https://zhuanlan.zhihu.com/p/344080090

一、Join方式:

pyspark主要分为以下几种join方式:

  • Inner joins (内连接): 两边都有的保持
  • Outer joins (外连接):两边任意一边有的保持
  • Left outer joins (左外连接):只保留左边有的records
  • Right outer joins (右外连接):只保留右边有的records
  • Left semi joins (左半连接):只保留在右边记录里出现的左边的records
  • Left anti joins (左反连接):只保留没出现在右边记录里的左边records(可以用来做过滤)
  • natural join(自然连接):通过隐式匹配两个数据集之间具有相同名称的列来执行连接)
  • cross join(笛卡尔连接):将左侧数据集的每一行与右侧数据集中的每一行匹配,结果行数很多。

##二、使用方式和例子

下面造个数据集来看看这些join的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
person = spark.createDataFrame([
(0, "Bill Chambers", 0, [100]),
(1, "Matei Zaharia", 1, [500, 250, 100]),
(2, "Michael Armbrust", 1, [250, 100])])\
.toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley")])\
.toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
(500, "Vice President"),
(250, "PMC Member"),
(100, "Contributor")])\
.toDF("id", "status")

Inner Joins

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# in Python
joinExpression = person["graduate_program"] == graduateProgram['id']

# in Python
wrongJoinExpression = person["name"] == graduateProgram["school"]

# default so no need to specify
person.join(graduateProgram, joinExpression).show()
>>>

+---+----------------+----------------+---------------+---+-------+----------+---
| id| name|graduate_program| spark_status| id| degree|department|...
+---+----------------+----------------+---------------+---+-------+----------+---
| 0| Bill Chambers| 0| [100]| 0|Masters| School...|...
| 1| Matei Zaharia| 1|[500, 250, 100]| 1| Ph.D.| EECS|...
| 2|Michael Armbrust| 1| [250, 100]| 1| Ph.D.| EECS|...
+---+----------------+----------------+---------------+---+-------+----------+---

Outer Joins

Outerjoins evaluate the keys in both of the DataFrames or tables and includes (and joins together) the rows that evaluate to true or false. If there is no equivalent row in either the left or right DataFrame, Spark will insertnull:

1
2
3
4
5
6
7
8
9
10
11
joinType = "outer"
person.join(graduateProgram, joinExpression, joinType).show()

+----+----------------+----------------+---------------+---+-------+-------------
| id| name|graduate_program| spark_status| id| degree| departmen...
+----+----------------+----------------+---------------+---+-------+-------------
| 1| Matei Zaharia| 1|[500, 250, 100]| 1| Ph.D.| EEC...
| 2|Michael Armbrust| 1| [250, 100]| 1| Ph.D.| EEC...
|null| null| null| null| 2|Masters| EEC...
| 0| Bill Chambers| 0| [100]| 0|Masters| School...
+----+----------------+----------------+---------------+---+-------+-------------

Left Outer Joins

Leftouter joins evaluate the keys in both of the DataFrames or tables and includes all rows from the left DataFrame as well as any rows in the right DataFrame that have a match in the left DataFrame. If there is no equivalent row in the right DataFrame, Spark will insertnull:

1
2
3
4
5
6
7
8
9
10
11
12
13
joinType = "left_outer"

graduateProgram.join(person, joinExpression, joinType).show()

>>>
+---+-------+----------+-----------+----+----------------+----------------+---
| id| degree|department| school| id| name|graduate_program|...
+---+-------+----------+-----------+----+----------------+----------------+---
| 0|Masters| School...|UC Berkeley| 0| Bill Chambers| 0|...
| 2|Masters| EECS|UC Berkeley|null| null| null|...
| 1| Ph.D.| EECS|UC Berkeley| 2|Michael Armbrust| 1|...
| 1| Ph.D.| EECS|UC Berkeley| 1| Matei Zaharia| 1|...
+---+-------+----------+-----------+----+----------------+----------------+---

Right Outer Joins

Rightouter joins evaluate the keys in both of the DataFrames or tables and includes all rows from the right DataFrame as well as any rows in the left DataFrame that have a match in the right DataFrame. If there is no equivalent row in the left DataFrame, Spark will insertnull:

1
2
3
4
5
6
7
8
9
10
11
12
joinType = "right_outer"

person.join(graduateProgram, joinExpression, joinType).show()

+----+----------------+----------------+---------------+---+-------+------------+
| id| name|graduate_program| spark_status| id| degree| department|
+----+----------------+----------------+---------------+---+-------+------------+
| 0| Bill Chambers| 0| [100]| 0|Masters|School of...|
|null| null| null| null| 2|Masters| EECS|
| 2|Michael Armbrust| 1| [250, 100]| 1| Ph.D.| EECS|
| 1| Matei Zaharia| 1|[500, 250, 100]| 1| Ph.D.| EECS|
+----+----------------+----------------+---------------+---+-------+------------+

Left Semi Joins 用作数据筛选(include 方式)

Semijoins are a bit of a departure from the other joins. They do not actually include any values from the right DataFrame. They only compare values to see if the value exists in the second DataFrame. If the value does exist, those rows will be kept in the result, even if there are duplicate keys in the left DataFrame. Think of left semi joins as filters on a DataFrame, as opposed to the function of a conventional join:

1
2
3
4
5
6
7
8
9
10
joinType = "left_semi"

graduateProgram.join(person, joinExpression, joinType).show()

+---+-------+--------------------+-----------+
| id| degree| department| school|
+---+-------+--------------------+-----------+
| 0|Masters|School of Informa...|UC Berkeley|
| 1| Ph.D.| EECS|UC Berkeley|
+---+-------+--------------------+-----------+

Left Anti Joins 用作数据筛选(exclude的方式)

Leftanti joins are the opposite of left semi joins. Like left semi joins, they do not actually include any values from the right DataFrame. They only compare values to see if the value exists in the second DataFrame. However, rather than keeping the values that exist in the second DataFrame, they keep only the values thatdo nothave a corresponding key in the second DataFrame. Think of anti joins as aNOT INSQL-style filter

1
2
3
4
5
6
7
8
joinType = "left_anti"
graduateProgram.join(person, joinExpression, joinType).show()

+---+-------+----------+-----------+
| id| degree|department| school|
+---+-------+----------+-----------+
| 2|Masters| EECS|UC Berkeley|
+---+-------+----------+-----------+

三、常见问题和解决方案

1. 处理重复列名:两张表如果存在相同列名??

方法1:采用不同的连接表达式:

​ 当有两个同名的键时,将连接表达式从布尔表达式更改为字符串或序列,这会在连接过程中自动删除其中一个列。

1
a.join(b, 'id') # 不写成a.join(b , a['id']==b['id'])

方法2:连接后删除列,采用drop

1
a.join(b, a['id']==b['id']).drop(a.id)

方法3:在连接前重命名列,采用withColumnRenamed

1
2
a = a.withColumnRenamed('id1', F.col('id'))
a.join(b, a.id1=b.id)

四、Spark如何执行连接

两个核心模块:点对点通信模式和逐点计算模式

  • 大表和大表的连接

    • shuffle join :每个节点都与所有其他节点进行通信,并根据哪个节点具有某些键来共享数据。由于网络会因通信量而阻塞,所以这种方式很耗时,特殊是如果数据没有合理分区的情况下。
  • 大表与小表连接

    • broadcast join:当表的大小足够小以便能够放入单个节点内存中且还有空闲空间的时候,可优化join。

      把数据量较小的DataFrame复制到集群中的所有工作节点上,只需在开始时执行一次,然后让每个工作节点独立执行作业,而无需等待其他工作节点,也无需与其他工作节点通信。

广播变量

发表于 2023-03-14 | 分类于 大数据 , Spark

本文仅做学习总结,如有侵权立删

[TOC]

如果想在节点之间共享一份变量,spark提供了两种特定的共享变量,来完成节点之间的变量共享。

(1)广播变量(2)累加器

一、广播变量

概念:

广播变量允许程序员缓存一个只读的变量在每台机器上(worker),而不是每个任务(task)保存一个拷贝。例如,利用广播变量,我们能够以一种更有效率的方式将一个大数据量输入集合的副本分配给每个节点。

广播变量用于跨所有节点保存数据副本。 此变量缓存在所有计算机上,而不是在具有任务的计算机上发送。

一个广播变量可以通过调用SparkContext.broadcast(v)方法从一个初始变量v中创建。广播变量是v的一个包装变量,它的值可以通过value方法访问。

用途:比如一个配置文件,可以共享给所有节点。比如一个Node的计算结果需要共享给其他节点。

  • 可以通过广播变量, 通知当前worker上所有的task, 来共享这个数据,避免数据的多次复制,可以大大降低内存的开销
  • sparkContext.broadcast(要共享的数据)

声明:broadcast

调用broadcast,Scala中一切可序列化的对象都可以进行广播。

sc.broadcast(xxx)

引用广播变量数据:value

可在各个计算节点中通过 bc.value来引用广播的数据。

img

更新广播变量:unpersist

由于广播变量是只读的,即广播出去的变量没法再修改,

利用unpersist函数将老的广播变量删除,然后重新广播一遍新的广播变量。

bc.unpersist()

img

销毁广播变量:destroy

bc.destroy()可将广播变量的数据和元数据一同销毁,销毁之后就不能再使用了。

img

二、累加器

概念:

累加器是一种只能利用关联操作做“加”操作的变数,因此他能够快速执行并行操作。而且其能够操作counters和sums。Spark原本支援数值类型的累加器,程序员可以自行增加可被支援的类型。如果建立一个具体的累加器,其可在spark UI上显示。

用途:

对信息进行聚合,累加器的一个常见的用途是在调试时对作业的执行过程中事件进行计数。

创建累加器:accumulator

调用SparkContext.accumulator(v)方法从一个初始变量v中创建。

运行在集群上的任务可以通过add方法或者使用+=操作来给它加值。然而,它们无法读取这个值。和广播变量相反,累加器是一种add only的变项。

img

累加器的陷阱

img

打破累加器陷阱:persist函数

img

存累加器初始值:

img

img

累加器实现一些基本的功能:

img

窗口函数

发表于 2023-03-13 | 分类于 大数据 , Spark

本文仅做学习总结,如有侵权立删

[TOC]

一、窗口函数的概念

能返回整个dataframe,也能进行聚合运算。Spark支持三种窗口函数:排名函数、解析函数和聚合函数。

例子:找到每年当中最冷那一天的温度 / 最冷那一天的日期。

  • groupBy实现

【每年当中最冷的那一天的温度】:

1
2
a = gsod.groupby('year').agg(F.min('temp').alias('temp'))
a.orderBy('temp')

将上面的结果join回原来的dataframe得到【最冷那一天的日期】:

1
gsod.join(a, how = 'left', on = ["year", "temp"]).select('year', 'month', 'day', 'temp')

但是上面做法影响效率,left join!

  • 窗口函数的过程
  1. 根据某个条件对数据进行分组,PartitionBy
  2. 根据需求计算聚合函数
  3. 将计算结果Join回一个大dataframe
1
2
3
4
5
from pyspark.sql.window import Window
each_year = Window.partitionBy("year")
gsod.withColumn('min_temp',F.min("temp").over(each_year))\
.where("temp=min_temp")\
.select("year", "month", "day")

二、窗口函数

对于一个数据集,map 是对每行进行操作,为每行得到一个结果;reduce 则是对多行进行操作,得到一个结果;而 window 函数则是对多行进行操作,得到多个结果(每行一个)。

窗口函数是什么?来源于数据库,窗口函数是用与当前行有关的数据行参与计算。Mysql中:

  • partition by:用于对全量数据表进行切分(与SQL中的groupby功能类似,但功能完全不同),直接体现的是前面窗口函数定义中的“有关”,即切分到同一组的即为有关,否则就是无关;
  • order by:用于指定对partition后各组内的数据进行排序;
  • rows between:用于对切分后的数据进一步限定“有关”行的数量,此种情景下即使partition后分到一组,也可能是跟当前行的计算无关。

img

需求:组内按分数排序。

1
df.select($"uid", $"date", $"score", row_number().over(Window.partitionBy("uid").orderBy($"score".desc)).as("rank"))

img

udf + 窗口函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from pyspark.sql import Window

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()

df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
w = Window.partitionBy('id').orderBy('v').rowsBetween(-1, 0)
df.withColumn('mean_v', mean_udf("v").over(w)).show()

+---+----+------+
| id| v|mean_v|
+---+----+------+
| 1| 1.0| 1.0|
| 1| 2.0| 1.5|
| 2| 3.0| 3.0|
| 2| 5.0| 4.0|
| 2|10.0| 7.5|
+---+----+------+

三、 同时groupby 两个key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 生成首次付费时间(FirstPayDate), 首次付费当天的付费金额(FirstPayPrice), 总付费金额(AllPayPrice). 首次单天付费频次(FirstPayFreq), 总付费频次(AllPayFreq)
w1 = Window.partitionBy("OrderId").orderBy(col("PayDate"))
w2 = Window.partitionBy("OrderId")
df_pay_data = df_order_all.filter((F.col("OrderType") == 0) & ((F.col("IsPay") == 1)))\
.withColumnRenamed("OrderTime", "PayTime") \
.withColumnRenamed("OrderPrice", "PayPrice") \
.withColumn("PayDate", date_trunc('day', to_timestamp(F.col("PayTime")/1000)))\
.withColumn("row",row_number().over(w1)) \
.withColumn("AllPayPrice", sum(col("PayPrice")).over(w2)) \
.withColumn("FirstPayDate", min(col("PayDate")).over(w2)) \
.withColumn("FirstPayPrice", sum(col("PayPrice")).over(w1)) \
.withColumn("FirstPayFreq", count(col("IsPay")).over(w1)) \
.withColumn("AllPayFreq", count(col("IsPay")).over(w2)) \
.where(col("row") == 1).select("AllPayPrice", "FirstPayPrice", "FirstPayDate","OrderId", "FirstPayFreq", "AllPayFreq")

https://sparkbyexamples.com/pyspark/pyspark-select-first-row-of-each-group/

https://stackoverflow.com/questions/45946349/python-spark-cumulative-sum-by-group-using-dataframe (相加)

四. groupby + sort + list

1
2
3
group_df = qds_com.groupby(['company']) \
.agg(F.sort_array(F.collect_list(F.struct("features", "label", "samples_count"))) \
.alias("pair"))

五、求众数

1
df_com_cookie.groupBy(['company','price']).agg(F.count('price').alias('count_price')).orderBy(['company','count_price'], ascending=False).drop_duplicates(subset=['company']).show()
1
2
window = Window.partitionBy(['a']).orderBy(['a'])
df.withColumn('rank',F.rank().over(window)).filter("rank = '1'").drop('rank')

DataFrame

发表于 2023-03-12 | 分类于 大数据 , Spark

本文仅做学习总结,如有侵权立删

DataFrame

一、Spark SQL

Spark SQL用于对结构化数据进行处理,它提供了DataFrame的抽象,作为分布式平台数据查询引擎,可以在此组件上构建大数据仓库。

DataFrame是一个分布式数据集,在概念上类似于传统数据库的表结构,数据被组织成命名的列,DataFrame的数据源可以是结构化的数据文件,也可以是Hive中的表或外部数据库,也还可以是现有的RDD。

DataFrame的一个主要优点是,Spark引擎一开始就构建了一个逻辑执行计划,而且执行生成的代码是基于成本优化程序确定的物理计划。与Java或者Scala相比,Python中的RDD是非常慢的,而DataFrame的引入则使性能在各种语言中都保持稳定。

二、初始化

在过去,你可能会使用SparkConf、SparkContext、SQLContext和HiveContext来分别执行配置、Spark环境、SQL环境和Hive环境的各种Spark查询。

SparkSession现在是读取数据、处理元数据、配置会话和管理集群资源的入口。SparkSession本质上是这些环境的组合,包括StreamingContext。

1
2
3
4
5
6
from pyspark.sql import SparkSession
spark=SparkSession \
.builder \
.appName('test') \
.config('master','yarn') \
.getOrCreate()

Spark 交互式环境下,默认已经创建了名为 spark 的 SparkSession 对象,不需要自行创建。

从RDD创建DataFrame

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 推断schema
from pyspark.sql import Row
lines = sc.textFile("users.txt")
parts = lines.map(lambda l: l.split(","))
data = parts.map(lambda p: Row(name=p[0],age=p[1],city=p[2]))
df=createDataFrame(data)
# 指定schema
data = parts.map(lambda p: Row(name=p[0],age=int(p[1]),city=p[2]))
df=createDataFrame(data)
# StructType指定schema
from pyspark.sql.types import *
schema = StructType([
StructField('name',StringType(),True),
StructField('age',LongType(),True),
StructField('city',StringType(),True)
])
df=createDataFrame(parts, schema)

StructField包括以下方面的内容:
name:字段名
dataType:数据类型
nullable:此字段的值是否为空

从文件系统创建DataFrame

1
2
3
4
df = spark.read.json("customer.json")
df = spark.read.load("customer.json", format="json")
df = spark.read.load("users.parquet")
df = spark.read.text("users.txt")

输出和保存

1
2
3
4
5
6
7
df.rdd # df转化为RDD
df.toJSON() # df转化为RDD字符串
df.toPandas() # df转化为pandas
df.write.save("customer.json", format="json")
df.write.save("users.parquet")
df.write.json("users.json")
df.write.text("users.txt")

数据库读写

1
2
3
df = spark.sql('select name,age,city from users') 
df.createOrReplaceTempView(name) # 创建临时视图
df.write.saveAsTable(name,mode='overwrite',partitionBy=None)

操作hive表
df.write 有两种方法操作hive表

  • saveAsTable()
    如果hive中不存在该表,则spark会自动创建此表匹。
    如果表已存在,则匹配插入数据和原表 schema(数据格式,分区等),只要有区别就会报错
    若是分区表可以调用partitionBy指定分区,使用mode方法调整数据插入方式:

Specifies the behavior when data or table already exists. Options include:

  • overwrite: 覆盖原始数据(包括原表的格式,注释等)
  • append: 追加数据(需要严格匹配)
  • ignore: ignore the operation (i.e. no-op).
  • error or errorifexists: default option, throw an exception at runtime.
  • df.write.partitionBy('dt').mode('append').saveAsTable('tb2')
  • insertInto()

无关schema,只按数据的顺序插入,如果原表不存在则会报错
对于分区表,先开启Hive动态分区,则不需要指定分区字段,如果有一个分区,那么默认为数据中最后一列为分区字段,有两个分区则为最后两列为分区字段,以此类推

1
2
3
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
df.write.insertInto('tb2')
  • 同样也可以先开启Hive动态分区,用SQL语句直接运行
    sql("insert into tb2 select * from tb1")
DataFrame信息 说明
df.show(n) 预览前 n 行数据
df.collect() 列表形式返回
df.dtypes 列名与数据类型
df.head(n) 返回前 n 行数据
df.first() 返回第 1 行数据
df.take(n) 返回前 n 行数据
df.printSchema() 打印模式信息
df.columns 列名
查询语句 说明
df.select(*cols) SELECT in SQL
df.union(other) UNION ALL in SQL
df.when(condition,value) CASE WHEN in SQL
df.alias(*alias,**kwargs) as in SQL
F.cast(dataType) 数据类型转换(函数)
F.lit(col) 常数列(函数)
selectExpr 表查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from pyspark.sql import functions as F
df.select('*')
df.select('name','age') # 字段名查询
df.select(['name','age']) # 字段列表查询
df.select(df['name'],df['age']+1) # 表达式查询
df.select('name',df.mobile.alias('phone')) # 重命名列
df.select('name','age',F.lit('2020').alias('update')) # 常数
df.select('name',
F.when(df.age > 100,100)
.when(df.age < 0,-1)
.otherwise(df.age)
).show()
from pyspark.sql.types import *
df.select('name',df.age.cast('float'))
df.select('name',df.age.cast(FloatType()))
# selectExpr接口支持并行计算
expr=['count({})'.format(i) for i in df.columns]
df.selectExpr(*expr).collect()[0]

#表查询selectExpr,可以使用UDF函数,指定别名等
import datetime
spark.udf.register("getBirthYear",lambda age:datetime.datetime.now().year-age)
dftest = df.selectExpr("name", "getBirthYear(age) as birth_year" , "UPPER(gender) as gender" )
dftest.show()

---------------------------
#窗口函数

df = spark.createDataFrame([("LiLei",78,"class1"),("HanMeiMei",87,"class1"),
("DaChui",65,"class2"),("RuHua",55,"class2")]) \
.toDF("name","score","class")

df.show()
dforder = df.selectExpr("name","score","class",
"row_number() over (partition by class order by score desc) as order")

dforder.show()
+---------+-----+------+
| name|score| class|
+---------+-----+------+
| LiLei| 78|class1|
|HanMeiMei| 87|class1|
| DaChui| 65|class2|
| RuHua| 55|class2|
+---------+-----+------+

+---------+-----+------+-----+
| name|score| class|order|
+---------+-----+------+-----+
| DaChui| 65|class2| 1|
| RuHua| 55|class2| 2|
|HanMeiMei| 87|class1| 1|
| LiLei| 78|class1| 2|
+---------+-----+------+-----+
排序 说明
df.sort(*col,**kwargs) 排序
df.orderBy(*col,**kwargs) 排序(用法同sort)
1
2
3
4
df.sort(df.age.desc()).show()
df.sort('age',ascending=True).show()
df.sort(desc('age'),'name').show()
df.sort(['age','name'],ascending=[0,1]).show()
筛选方法 说明
df.filter(condition) 筛选
column.isin(*cols) in (...)
column.like(pattern) SQL通配符匹配
column.rlike(pattern) 正则表达式匹配
column.startswith(pattern) 匹配开始
column.endswith(pattern) 匹配结尾
column.substr(start,length) 截取字符串
column.between(lower,upper) between ... and ...
column.where
1
2
3
4
5
6
7
8
df.filter("age = 22").show()
df.filter(df.age == 22).show()
df.select(df['age'] == 22).show()
df.select(df.name.isin('Bill','Elon')).show()
df.filter("name like Elon%").show()
df.filter(df.name.rlike("Musk$").show()

df.where("gender='male' and age > 15").show()
统计信息 说明
df.describe() 描述性统计
df.count() 行数
df.approxQuantile(col,prob,relativeError) 百分位数
df.corr(col1,col2,method=None) 相关系数
1
2
3
4
5
6
7
8
# 异常值处理
bounds = {}
for col in df.columns:
quantiles = df.approxQuantile(col,[0.25,0.75],0.05)
# 第三个参数relativeError代表可接受的错误程度,越小精度越高
IQR = quantiles[1] - quantiles[0]
bounds[col] = [quantiles[0]-1.5*IQR, quantiles[1]+1.5*IQR]
# bounds保存了每个特征的上下限
分组和聚合 说明
df.groupBy(*cols) 分组,返回GroupedData
groupedData.count() 计数
groupedData.sum(*cols) 求和
groupedData.avg(*cols) 平均值
groupedData.mean(*cols) 平均值
groupedData.max(*cols) 最大值
groupedData.min(*cols) 最小值
groupedData.agg(*exprs) 应用表达式

聚合函数还包括 countDistinct, kurtosis, skewness, stddev, sumDistinct, variance 等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
df.groupBy('city').count().collect()
df.groupBy(df.city).avg('age').collect()
df.groupBy('city',df.age).count().collect()
df.groupBy('city').agg({'age':'mean'}).collect() # 字典形式给出
df.groupBy('city').agg({'*':'count'}).collect()
df.groupBy('city').agg(F.mean(df.age)).collect()
# groupBy + collect_list
df.groupBy("gender").agg(F.expr("avg(age)"),F.expr("collect_list(name)")).show()
+------+--------+------------------+
|gender|avg(age)|collect_list(name)|
+------+--------+------------------+
| null| 16.0| [RuHua]|
|female| 16.0| [HanMeiMei]|
| male| 16.0| [LiLei, DaChui]|
+------+--------+------------------+
去重 说明
df.distinct() 唯一值(整行去重)
df.dropDuplicates(subset=None) 删除重复项(可以指定字段)
添加、修改、删除列 说明
df.withColumnRenamed(existing,new) 重命名
df.withColumn(colname,new) 修改列
df.drop(*cols) 删除列
1
2
3
df=df.withColumn('age',df.age+1)
df=df.drop('age')
df=df.drop(df.age)
缺失值处理 说明
df.na.fill(value,subset=None) 缺失值填充
df.na.drop(how='any',thresh=None,subset=None) 缺失值删除
df.na.replace(to_teplace,value,subset=None) 替换
1
2
3
4
5
df=df.na.fill(0)
df=df.na.fill({'age':50,'name':'unknow'})
df=df.na.drop()
df = df.dropna() # 跟上面那种方式是一样的
df=df.na.replace(['Alice','Bob'],['A','B'],'name')
分区和缓存 说明
df.repartition(n) 将df拆分为10个分区
df.coalesce(n) 将df合并为n个分区
df.cache() 缓存
123

Lee_yl

30 日志
22 分类
4 标签
© 2023 Lee_yl
由 Hexo 强力驱动
|
主题 — NexT.Pisces v5.1.4