11. Spark SQL


本文仅做学习总结,如有侵权立删

Spark SQL简介

1、什么是SQL

结构化查询语言:一种表示数据关系操作的特定领域语言。

Spark2.0发布了一个支持Hive操作的超集,并提供了一个能够同时支持ANSI SQL(标准SQL)和HiveSQL的原生SQL解析器。

  • SparkSession新的起始点
    在老的版本中,SparkSQL提供两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。
    SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的

  • 表存储两类重要的信息:表内数据和表的有关数据,也就是元数据。

  • 托管表: 让Spark管理一组文件和数据的元数据

当您从磁盘上的文件定义表时,您正在定义一个非托管表。

“拥有”其数据,表定义 (元数据) 以及表数据都通过元数据系统进行管理。 托管表在其架构和数据之间提供一致性,如果删除了托管表,则它包含的数据 也将被删除。

  • 非托管表: Spark只管理表的元数据

外部表是在元数据系统中提供表架构的表,但仅引用来自元数据系统控件的外部数据的表。 因此,在创建表后,无法保证表架构与数据一致。 无需通过外部表即可更改外部数据。 如果更改或删除外部表,基础数据不会受到影响或删除。

2. Dataframe和SQL互转

(1)createOrReplaceTempView:对DataFrame创建一个临时表

(2)df.createGlobalTempView:对于DataFrame创建一个全局表

1
2
3
4
5
6
7
8
9
# Dataframe转SQL,对DataFrame创建一个临时表
spark.read.json("data.json")\
.createOrReplaceTempView("some_sql_view")

# SQL转DataFrame
spark.sql("""
SELECT a,count FROM some_sql_view
""").where("a like 'S%'").where("`sum(count)` > 10").count()
# 注意:临时表是Session范围内的,Session退出后,表就失效了。如果想应用范围内有效,可以使用全局表。注意使用全局表时需要全路径访问,如:global_temp.some_sql_view

3. Catalog

catalog: 指的是所有的database目录

Spark SQL中的最高抽象是Catalog。Catalog是存储关于表中存储的数据以及其他有用的东西(如数据库、表、函数和视图)的元数据的抽象。

Catalog位于org.apache.spark.sql.catalog.Catalog包中,包含许多有用的函数,用于列出表、数据库和函数。我们将很快讨论所有这些事情。它对用户来说非常容易理解,因此我们将省略这里的代码示例,但它实际上只是另一个SparkSQL的编程接口。本章只显示正在执行的SQL;因此,如果您正在使用编程接口,请记住您需要将所有内容封装在一个spark.sql()函数中,执行相关代码。

4. 表

(1)创建表:create table

  • 从多种数据源创建表:

USING 和 STORED AS

前面示例中的USING语法规范非常重要。如果不指定格式,Spark将默认为Hive SerDe配置。这对将来的读取和写入有性能影响,因为Hive SerDes比Spark的本地序列化慢得多。Hive用户还可以使用STORED AS语法来指定这应该是一个Hive表。

  • 也可以从查询中创建一个表:
1
CREATE TABLE flights_from_select USING parquet AS SELECT * FROM flights

此外,只有在表当前不存在时,才可以指定创建表:

在本例中,我们创建了一个与hive兼容的表,因为我们没有通过USING显式指定格式。我们还可以这样做:

1
CREATE TABLE IF NOT EXISTS flights_from_select AS SELECT * FROM flights

根据分区的控制数据布局:

1
2
3
CREATE TABLE partitioned_flights USING parquet 
PARTITIONED BY (DEST_COUNTRY_NAME)
AS SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 5

这些表甚至可以通过会话在Spark中使用;Spark中目前不存在临时表。您必须创建一个临时视图.

(2)创建外部表:create EXTERNAL table

  • create EXTERNAL table:创建一个外部表

您可以通过运行以下命令来使用任何已经定义的文件创建外部表:

您还可以从select子句创建一个外部表:

(3)插入数据到表

标准SQL:INSERT INTO

1
2
INSERT INTO flights_from_select 
SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 20

如果只希望写入某个分区,可以选择提供分区规范。注意,写操作也会遵循分区模式(这可能导致上面的查询运行得非常慢);但是,它只会添加额外的文件到目的分区:

1
2
3
4
INSERT INTO partitioned_flights 
PARTITION (DEST_COUNTRY_NAME="UNITED STATES")
SELECT count, ORIGIN_COUNTRY_NAME FROM flights
WHERE DEST_COUNTRY_NAME='UNITED STATES' LIMIT 12

(4) 查看表元数据: DESCRIBE TABLE

我们在前面看到,可以在创建表时添加注释。你可以通过描述表元数据来查看,它会显示相关的注释:

1
DESCRIBE TABLE flights_csv

您还可以使用以下方法查看数据的分区方案(但是,请注意,这只适用于分区表):

1
SHOW PARTITIONS partitioned_flights

(5)刷新表元数据: REFRESH TABLE 、REPAIR TABLE

维护表元数据是确保从最新数据集读取数据的一项重要任务。有两个命令用于刷新表元数据。REFRESH TABLE刷新与该表关联的所有缓存条目(本质上是文件)。如果该表以前缓存过,那么下次扫描时它将被延迟缓存:

另一个相关命令是REPAIR TABLE,它刷新catalog中为给定表维护的分区。这个命令的重点是收集新的分区信息,例如手工写一个新的分区,需要相应地修复表:

1
MSCKREPAIR TABLE partitioned_flights

(6) 删除表:drop table

1
您不能delete表:您只能“drop”它们。可以使用drop关键字删除表。如果删除托管表(例如flights_csv),数据和表定义都将被删除:
1
DROP TABLE flights_csv;

如果您试图删除不存在的表,您将收到一个错误。若要仅删除已经存在的表,请使用DROP TABLE IF EXISTS:

1
DROP TABLE IF EXISTS flights_csv;
  • 删除非托管表
1
如果您正在删除一个非托管表(例如,hive_flights),则不会删除任何数据,但您将不再能够通过表名引用该数据。

(7)缓存表:cache和uncache

就像DataFrames一样,您可以缓存和不缓存表。您只需使用以下语法指定要使用哪个表:

1
2
3
4
# 缓存表
CACHE TABLE flights
# 不缓存表
UNCACHE TABLE flights

5、视图

创建了表之后,还可以定义视图。视图在现有表的之上定义一组转换(基本上就是保存的查询计划),这对于组织或重用查询逻辑非常方便。Spark有几个不同的视图概念。视图可以是全局级别的,可以设置为数据库级别,也可以是会话级别。

(1)创建视图: CREATE VIEW

对于最终用户来说,视图显示为表,只是在查询时对源数据执行转换,而不是将所有源数据重写到新的位置。这可能是一个filter、select,也可能是一个更大的group BY或ROLLUP。例如,在下面的例子中,我们创建了一个目的地为美国的视图,以便只看到这些航班:

  • 创建视图: CREATE VIEW
1
CREATE VIEW just_usa_view AS SELECT * FROM flights WHERE dest_country_name = 'United States'
  • 创建临时视图: CREATE TEMP VIEW

    这些视图只在当前会话期间可用,并且不注册到数据库:

1
CREATE TEMP VIEW just_usa_view_temp AS SELECT * FROM flights WHERE dest_country_name = 'United States'
  • 创建全局临时视图: CREATE GLOBAL TEMP VIEW
    全局临时视图的解析与数据库无关,可以在整个Spark应用程序中查看,但在会话结束时将它们删除:
1
CREATE GLOBAL TEMP VIEW just_usa_global_view_temp AS  SELECT * FROM flights WHERE dest_country_name = 'United States'SHOW TABLES
  • 覆盖视图:CREATE OR REPLACE TEMP VIEW

    还可以使用下面示例中所示的关键字指定,如果一个视图已经存在,则希望覆盖该视图。我们可以覆盖临时视图和常规视图:

1
CREATE OR REPLACE TEMP VIEW just_usa_view_temp AS SELECT * FROM flights WHERE dest_country_name = 'United States'
  • 查询视图

    1
    SELECT * FROM just_usa_view_temp

    视图实际上是一个转换,Spark只在查询时执行它。这意味着它只会在您实际查询表之后应用该过滤器(而不是更早)。

    实际上,视图相当于从现有DataFrame创建一个新的DataFrame。实际上,您可以通过比较Spark DataFrames和Spark SQL生成的查询计划来了解这一点。

  • 删除视图: DROP VIEW IF EXISTS

可以像删除表一样删除视图;只需指定要删除的是视图而不是表。删除视图和删除表的主要区别在于,对于视图,不删除底层数据,只删除视图定义本身:

1
DROP VIEW IF EXISTS just_usa_view;

6、数据库

  • 查看所有数据库:SHOW DATABASES

  • 创建数据库

    创建数据库的模式与您在本章前面看到的相同;但是,这里使用CREATE DATABASE关键字:

    1
    CREATE DATABASE some_db

    您可能想要设置一个数据库来执行某个查询。要做到这一点,使用use关键字后面跟着数据库名称:

    1
    USE some_db
  • 查看当前使用的数数据库

1
SELECT current_database()
  • 删除数据库: DROP DATABASE
1
DROP DATABASE IF EXISTS some_db;

7、高级主题

Spark SQL中有三种核心的复杂类型:struct、list和map。

(1)Struct

struct更类似于map。它们提供了在Spark中创建或查询嵌套数据的方法。要创建一个,你只需要用括号括起一组列(或表达式):

  • 查询Struct
1
2
select country.DEST_COUNTRY_NAME, count FROM nested_date
select country.*, count FROM nested_date # 全部列

(2)List

有几种方法可以创建数组或值列表。

  • 使用collect_list函数,它创建一个值列表。

  • 使用函数collect_set,它创建一个没有重复值的数组。
    这两个都是聚合函数,因此只能在聚合中指定:

  • 手动创建数组

  • 查询列表索引:[0]

  • 将数组转换回行。使用explode函数

    • 建视图

    • 拆行

    现在让我们将复杂类型分解为数组中每个值对应的结果中的一行。DEST_COUNTRY_NAME将对数组中的每个值进行复制,执行与原始collect完全相反的操作,并返回到原始DataFrame:

(3)查看Spark SQL中的函数列表:

SHOW FUNCTIONS: 查看Spark SQL中的函数列表

SHOW SYSTEM FUNCTIONS:指示是否希望查看系统函数(即Spark内置函数)以及用户函数:

SHOW USER FUNCTIONS: 用户函数是由您或其他共享Spark环境的人定义的。

SHOW FUNCTIONS “s*“: 通过传递带有通配符(*)的字符串来过滤所有SHOW命令。所有以“s”开头的函数:;

SHOW FUNCTIONS LIKE “collect*”: 选择包含LIKE关键字

  • 自定义函数

可以通过Hive CREATE TEMPORARY FUNCTION语法注册函数。

(4)子查询

如果你想看到你的航班是否会把你从你当前所在国家带回来,你可以通过检查是否有这样航班:以当前所在国家为起飞点,以带回国家为目的地:

EXISTS只检查子查询中的一些存在性,如果有值,返回true。你可以通过把NOT运算符放在它前面来翻转它。这就相当于找到了一架飞往你无法返回目的地的航班!

SQL配置

您可以在应用程序初始化时或在应用程序执行过程中设置这些参数(就像我们在本书中看到的shuffle分区一样)。

spark.sql.shuffle.partitions 200 配置在为连接或聚合shuffle数据时要使用的分区数。

您只能以这种方式设置Spark SQL配置,但是下面是如何设置shuffle分区:SETspark.sql.shuffle.partitions=20