0%

SparkSQL学习笔记

Spark是基于内存的计算框架,它主要包括SparkCore,SparkSQL,SparkML,SparkStreaming等方面的内容,本文从SparkSQL的的发展,DataSet,DataFrame,UDF和UDAF,OverFunction来介绍SparkSQL

介绍

SparkSQL是支持使用SQL来查询分布式数据的技术,它的发展过程是Hive -> Shark -> SparkSQL,SparkSQL与Hive的区别在于前者S写的sql底层解析成Sparkjob,而后者写的hql底层解析成MRjob。SparkSQL针对底层job一定程度优化,比如在进行下述查询时

SELECT 
table1.name,
table2.score
FROM
table1 JOIN table2
ON
table1.id = table2.id
WHERE
table1.age > 50 AND table2.score > 90

为了优化job,SparkSQL引入了谓词下推,不使用谓词下推和使用谓词下推的执行过程如下

SparkSQL1.6和2.0+的主要区别:

  1. 创建sql对象
    Spark 1.6是创建SQLContext,而Spark 2.0创建SparkSession
  2. 注册DataFrame临时表
    spark1.6通过df.registerTempTable,而spark2.0通过df.createOrReplaceTempView/df.createOrReplaceGlobalTempView

DataSet

DataSet是一个分布式数据集合,它是在Spark1.6的时候加入的新接口。DataSet和RDD一样具备强类型以及可以使用lambda函数的优点。在Scala API中DataFrame就是DataSet[Row]的别名,在Java中只能使用DataSet来表示DataFrame

与DataFrame的区别与联系

区别

  1. Dataset是强类型的,会在编译的时候进行类型检测;而DataFrame是弱类型的,在执行的时候进行类型检测;
  2. Dataset是通过 Encoder 进行序列化,负责将对象转换为字节,直接在字节层面调用对象的排序、过滤等方法来加快计算速度,而DataFrame采用 java 或 Kyro 序列化,执行操作的时候也需要先进行反序列化然后调用对象方法

联系

  1. 在spark2.x,DataFrame和Dataset的API进行了统一
  2. 在语法角度,DataFrame是Dataset中每一个元素为Row类型的特殊情况
    type DataFrame = Dataset[Row]
  3. DataFrame和Dataset实质上都是包含着schema信息的算子,是懒加载的

创建DataSet

// spark1.6+ 引入DataSet
case class Student(name:String,age:Long)
case class Person(id:Int,name:String,age:Int,score:Double)

// 创建spark对象
val spark: SparkSession = SparkSession
.builder()
.master("local")
.appName("createStruceDataSet")
.getOrCreate()
import spark.implicits._

// 方法1: 根据scala集合创建DataSet
val list: Dataset[Person] = Seq[Person](
Person(1, "zhangsan", 18, 100),
Person(2, "lisi", 19, 200),
Person(3, "wangwu", 20, 300)
).toDS()
list.show(4)

// 将List映射成Person类型的DataSet
val ds1: Dataset[Person] = list.toDS()

// 将List映射成Int类型的DataSet
// 默认列名是value,如果是多列的话默认列名是`_1、_2、_3...`。
val ds2: Dataset[Int] = List[Int](1, 2, 3, 4, 5).toDS()

// 方法2:由json文件和类直接映射成DataSet
val ds3: Dataset[Student] = spark.read.json("./data/json").as[Student]

// 方法3:读取外部文件直接加载DataSet
val ds4: Dataset[String] = spark.read.textFile("./data/people.txt")
val ds5: Dataset[Person] = ds4.map(line => {
val arr: Array[String] = line.split(",")
Person(arr(0).toInt, arr(1).toString, arr(2).toInt, arr(3).toDouble)
})

实现WordCount

val linesDs: Dataset[String] = spark.read.textFile("./data/words").as[String]
val words: Dataset[String] = linesDs.flatMap(line=>{line.split(" ")})

// 统计每个单词的个数并从大到小排序
val groupDs: RelationalGroupedDataset = words.groupBy($"value" as "word")
val aggDs: DataFrame = groupDs.agg(count("*") as "totalCount")
val result: Dataset[Row] = aggDs.sort($"totalCount" desc)
result.show(100, truncate = false)

// 通过sparkSQL语句统计每个单词的个数并从大到小排序
val frame: DataFrame = words.withColumnRenamed("value","word")
frame.createOrReplaceTempView("myWords")
spark.sql("select word,count(word) as totalCount from myWords group by word order by totalCount desc").show()

DataFrame

如果说SparkCore底层操作的是RDD,那么SparkSQL底层操作的就是DataFrame,DataFrame是一张既有数据也有列Schema信息的二维表。想使用SQL语句查询分布式数据的方法一般流程是要创建DataFrame,然后注册视图,最后通过spark.sql(...)查询。在Spark1.6里Scala和Java都叫DataFrame,但是在Spark2.0+ 中Java的。下面小编从DataFrame创建,视图创建,DataFrame的使用以及DataFrame保存这4个方面对DataFrame做进一步介绍

DataFrame创建

RDD

反射

// 创建RDD
val rdd: RDD[String] = spark.sparkContext.textFile("./data/people.txt")

// 创建样例类
case class Person(id:Int,name:String,age:Int,score:Double)

// 将RDD中的元素都变成样例类Person的对象
val personRDD: RDD[Person] = peopleRDD.map(one => {
val arr: Array[String] = one.split(",")
Person(arr(0).toInt, arr(1).toString, arr(2).toInt, arr(3).toDouble)
})

// 为了使用rdd的toDF方法,必须要导入spark对象中的隐式转换对象
import spark.implicits._

// 将RDD通过.toDF()转化成DataFrame
val frame: DataFrame = personRDD.toDF()

动态创建Schema

// 创建RDD
val rdd: RDD[String] = spark.sparkContext.textFile("./data/people.txt")

// 将peopleRDD转换成RDD[Row]
val rowRDD: RDD[Row] = rdd.map(line => {
val arr: Array[String] = line.split(",")
Row(arr(2).toInt, arr(1), arr(0).toInt, arr(3).toLong)
})

// row中的数据类型一定要与schema中的类型保持一致
// 可以动态地里面塞入列
val structType: StructType = StructType(List[StructField](
StructField("id", IntegerType, nullable = true),
StructField("name", StringType, nullable = true),
StructField("age", IntegerType, nullable = true),
StructField("score", LongType, nullable = true)
))

// createDataFrame将二者自动映射
// 注:动态创建的ROW中数据的顺序要与Schema中的数据保持一致
val frame: DataFrame = spark.createDataFrame(rowRDD, structType)

Scala容器

/**
* List转成DataFrame
*/
val nameInfo: List[String] = List[String]("zhangsan", "lisi", "wangwu", "zhaoliu", "zhangsan", "lisi", "zhangsan")
import spark.implicits._
val nameDF: DataFrame = nameInfo.toDF("name_list")
nameDF.show(10)

/**
* Map转成DataFrame
* 将map中的key变成列名,value变成列值
*/
val featMap: Map[String, String] = Map[String, String]("k1"->"a1", "k2"->"a2", "k3"->"a3")

// 将map中的Value取出来,生成RDD[Row]
// val mapValue: RDD[Row] = spark.sparkContext.parallelize(Seq(Row(featMap.toSeq.map((_: (String, String))._2): _*)))
val mapValue: RDD[Row] = spark.sparkContext.parallelize(Seq(Row(featMap.values.toList:_*)))
mapValue.foreach(println)

// 将map中的key取出来,生成Schema
// val fields: List[StructField] = featMap.keys.toList.map((key: String) => StructField(key, StringType, nullable = true))
val fields: Seq[StructField] = featMap.toSeq.map(r => StructField(r._1, StringType, nullable = true))
// val mapSchema: StructType = StructType(fields)
val mapSchema: StructType = StructType(fields)

val frame: DataFrame = spark.createDataFrame(mapValue, mapSchema)
frame.show(100)

文件

Json文件

读取Json文件有以下两种方法

// 方式1
val df: DataFrame = spark.read.json("./data/json")

// 方式2
val df: DataFrame = spark.read.format("json").load("./data/json")

如果Json文件的内容中,包含嵌套Json数据,比如

那么我们可以使用”第一层key.第二层的key”来访问对应的value

df.sql("select infos.age, infos.gender from students")

如果Json文件的内容中,包含JsonArray数据,比如

那么我们可以通过explode算子将数组中的每个Json数据分别与前面key组合,构成的新列名默认为col

import org.apache.spark.sql.functions._  // 导入explode函数
import spark.implicits._ // 导入隐式转换 “$” 和 as

val transDF: DataFrame = df.select($"name", $"age", explode($"scores")).toDF("name", "age","allScores")

Parquet文件

// 和读取json文件的方式一样
val df1: DataFrame = spark.read.parquet("./data/parquet")
val df2: DataFrame = spark.read.format("parquet").load("./data/parquet")

Csv文件

// 在读取csv文件的时候增加option选项来读取表头
val df1 : DataFrame = spark.read.format("csv").option("header", "true").load("./data/test.csv")
val df2 : DataFrame = spark.read.option("header", "true").csv("./data/test.csv")

其中要注意

(1) 如果csv没有第一行,就只能指定schema了,即增加.schema(xxx),并且列的顺序会和schema中列的顺序一致,其中xxx就是自己定义的schema
(2) 如果不指定schema,都会解析成StringType
(3) 如果schema列数多于原始数据列数,那么多出来的会显示null;如果少于原始数据列数,那么只会取原始数据中前面的列,并且原始数据中多出的列会被忽略

创建视图的两种方式以及区别

// 导入spark对象下的所有方法,比如之后的spark.sql就可以直接写成sql
import spark._

// 创建当前会话下的视图
df.createTempView("students")
df.createOrRepalceTempView("studentsTest")

// 创建全局视图
df.createGlobalTempView("globalStudents")
df.createOrRepalceGlobalTempView("globalStudentsTest")

val df1: DataFrame = sql("select * from students where age > 18 and name = 'zhangsan5'")

// 访问全局的表的时候需要加上global_temp.表名,这样就可以跨seeesion访问
// 用select语句中的where,可以用模糊匹配,比如列名 like 'wang%'
// 一个sc可以有多个会话,通过会话.newSession()来创建
spark.newSession().sql("select * from Students where name like 'wang%'").show()

// 创建一个某个会话下的新视图会报错!!!
spark.newSession().sql("select * from students").show()

数据库

MySQL

// 方法1
val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "11111111")

//默认用mysql-connector-java里面的包
val person: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/spark", "person", properties)

// 方法2
// 相比上一种方法可以显式指出来用了哪种包
// map中的键是指定写法,不能修改
val map: Map[String, String] = Map[String, String](
"url" -> "jdbc:mysql://localhost:3306/spark",
"driver"->"com.mysql.jdbc.Driver",
"user"->"root",
"password"->"11111111",
"dbtable"->"score" //表名字
)
val score: DataFrame = spark.read.format("jdbc").options(map).load()

// 方法3
// 相比于前一种方法,不用单独定义map
val reader: DataFrameReader = spark.read.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/spark")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "11111111")
.option("dbtable", "score")
val score2: DataFrame = reader.load()

// 方法4
val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "11111111")

spark.read.jdbc("jdbc:mysql://localhost:3306/spark", table = "(select person.id,person.name,person.age,score.score from person ,score where person.id = score.id) T", properties)

// 将以上两张表注册临时表
person.createOrReplaceTempView("person")
score.createOrReplaceTempView("score")

// 通过查询语句获得数据框
val result: DataFrame = spark.sql("select person.id,person.name,person.age,score.score from person ,score where person.id = score.id")

// 上述4种方法都支持将table转化成复杂语句,但是必须“(复杂语句)T” 给别名的方式来使用

Hive

// Spark1.6 中用HiveContext对象操作Hive数据
// Spark2.0+ 中用SparkSession对象操作Hive数据,SparkSession将SQLContext和HiveContext封装起来,但是读取Hive中的数据要开启Hive支持,即enableHiveSupport()
val spark: SparkSession = SparkSession
.builder()
.appName("CreateDataFrameFromHive")
.master("local") // 读取本地Hive仓库中的数据
.enableHiveSupport()
.getOrCreate()

spark.sql("show databases")
spark.sql("use spark")
spark.sql("show tables")
spark.sql("drop table if exists student_infos")
spark.sql("create table if not exists student_infos(name string, age int) row format delimited fields terminated by '\t'")
spark.sql("load data local inpath './data/student_infos' into table student_infos")
spark.sql("drop table if exists student_scores")
spark.sql("create table if not exists student_scores ( name string, score int) row format delimited fields terminated by '\t'")
spark.sql("load data local inpath './data/student_scores' into table student_scores")
val result: DataFrame = spark.sql("select si.name, si.age, ss.score from student_infos si, student_scores ss where si.name = ss.name")
spark.sql("drop table if exists good_student_infos")

// 将数据框保存带hive表中
result.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")

// 创建一个具有多种数据格式的hive表
spark.sql("drop table if exists t")
spark.sql("create table if not exists t(id struct<id1:int,id2:int,id3:int>,name array<string>,xx map<int,string>) " +
" row format delimited " +
"fields terminated by ' ' " +
"collection items terminated by ',' " +
"map keys terminated by ':' " +
"lines terminated by '\n'")

spark.sql("load data local inpath './data/student_t' overwrite into table t")
val result2: DataFrame = spark.sql("select * from t")
result2.show(truncate = false)

补充

  • Hive是建立在HDFS上的数据仓库架构,并且对存储在HDFS中的数据进行分析和管理

  • Hive建表语法中分隔符

-- 创建一张hive表
create table t(id struct<id1:int,id2:int,id3:int>,name array<string>,xx map<int,string>)
row format delimited -- 分隔符设置开始语句,必须放最前
fields terminated by '\t' -- 设置字段与字段之间的分隔符
collection items terminated by ',' -- 设置复杂类型(array,struct)字段中各个item之间的分隔符
map keys terminated by ':' -- 设置复杂类型(Map)字段中key和value之间的分隔符
lines terminated by '\n'; -- 设置行与行之间的分隔符,必须放最后
  • Hive的load语法
-- load基本语法
load data [local] inpath 'filepath' [overwrite]
into table tablename [partition (partcol1=val1,partcol2=val2...)]

-- 方式1. 将本地的数据文件导入上步创建的t表,并且覆盖原来的数据
load data local inpath './data/student_t.txt' overwrite into table t

-- 方式2. 将HDFS中的数据文件导入上步创建的t表,并且覆盖原来的数据
load data inpath '/i/student_t.txt' overwrite into table t3

视图创建

// 导入spark对象下的所有方法,比如之后的spark.sql就可以直接写成sql
import spark._

// 创建当前会话下的视图
df.createTempView("students")
df.createOrRepalceTempView("studentsTest")

// 创建全局视图
df.createGlobalTempView("globalStudents")
df.createOrRepalceGlobalTempView("globalStudentsTest")

val df1: DataFrame = sql("select * from students where age > 18 and name = 'zhangsan5'")

// 访问全局的表的时候需要加上global_temp.表名,这样就可以跨seeesion访问
// 用select语句中的where,可以用模糊匹配,比如列名 like 'wang%'
// 一个sc可以有多个会话,通过会话.newSession()来创建
spark.newSession().sql("select * from Students where name like 'wang%'").show()

// 创建一个某个会话下的新视图会报错!!!
spark.newSession().sql("select * from students").show()

DataFrame使用

val df: DataFrame = spark.read.json("./data/json") 

// 打印schema信息
df.printSchema()

// 获取第1行数据
val row1: Row = df.first()

// 获取前4数据
val rows3: Array[Row] = df.head(4)

// 获取前4数据
val rows2: Array[Row] = df.take(4)

// 展示前rownum行的数据(默认展示前20行),truncate=false 表示将数据展开
df.show(rownum, truncate=false)

// 查询方法
val df1: DataFrame = df.select("name")
val df2: DataFrame = df.select("name", "age")
val df3: DataFrame = df.select($"name".as("studentNanme"), $"age")
val df4: DataFrame = df.select($"name", ($"age" + 1).as("addAge"))
val df5: DataFrame = df.select(df.col("name") , (df.col("age") + 1).as("addAge"))

// 过滤方法
val df1: Dataset[Row] = df.filter("age > 18")
val df2: Dataset[Row] = df.filter($"age">18)
val df3: Dataset[Row] = df.filter(df.col("age")>18)
val df4: Dataset[Row] = df.filter(df.col("name").equalTo("zhangsan"))
val df5: Dataset[Row] = df.filter("name = 'zhangsan4' or name = 'zhangsan5'")

// 排序方法
val df1: Dataset[Row] = df.sort($"age".asc, $"name".desc)
val df2: Dataset[Row] = df.sort(df.col("age").asc, df.col("name").desc)
val df3: DataFrame = df.select(df.col("name").as("studentName"), df.col("age").alias("studentAge"))

// 分组方法
val df1: DataFrame = df.groupBy("age").count()

// map方法
// 数据框中的每一行都是Row类型,它可以像数组一样可以通过下标获取值
// Row类型相比数组的优点是保存列信息,也就是可以根据row中的字段名称获取值
val df1: DataFrame = df.map(row => "Name: " + row(0)).show()
val df2: DataFrame = df.map(row => "name: " + row.getAs[String]("name"))

// join方法: 每一种连接方法都会将两个连接键保留
// 1. 内连接
val inner_df1 = df1.join(df2, df1("name") === df2("name"))
val inner_df2 = df1.join(df2, df1("name") === df2("name"), joinType = "inner")

// 2. 左连接
val left_df1 = df1.join(df2, df1("name") === df2("name"), joinType = "left")
val left_df2 = df1.join(df2, df1("name") === df2("name"), joinType = "left_outer")

// 3. 右连接
val right_df1 = df1.join(df2, df1("name") === df2("name"), joinType = "right")
val right_df2 = df1.join(df2, df1("name") === df2("name"), joinType = "right_outer")

// 4. 全连接
val outer_df1 = df1.join(df2, df1("name") === df2("name"), joinType = "full")
val outer_df2 = df1.join(df2, df1("name") === df2("name"), joinType = "outer")
val outer_df3 = df1.join(df2, df1("name") === df2("name"), joinType = "full_outer")

// 数据框列名的重命名
import spark.implicits._
val temp: DataFrame = df.withColumnRenamed("name", "people_name")
val temp: DataFrame = idMappingDF.select($"name".alias("people_name"))
val temp: DataFrame = idMappingDF.select($"name".as("people_name"))

// DataFrame转成Row类型的RDD
val rdd: RDD[Row] = df.rdd
rdd.foreach(row => {
//打印row
println(row)
// row中获取值的两种方式:1.getAs("字段名") 2.getAs(下标)
val nameField: String = row.getAs[String]("name")
val ageField: Long = row.getAs[Long]("age")
val nameIndex: String = row.getAs[String](1)
val ageIndex: Long = row.getAs[Long](0)
println(s"name = $nameIndex, age = $ageIndex")
})

补充

  • filter/sort 得到的结果是DataSet
  • df.col(“xxx”) 和 $xxx的作用相同
  • Dataset和DataFrame都有show方法
  • select中如果其中一列用到了隐式转换$,其他列也必须用隐式转换

DataFrame保存

文件保存

parquet文件

df.write.mode(SaveMode.Append).format("parquet").save("./data/parquet")
df.write.mode(SaveMode.Append).parquet("./data/parquet")

csv文件

df.write.mode(SaveMode.Append).format("csv").save("./data/test.csv")
df.write.mode(SaveMode.Append).csv("./data/test.csv")

数据库保存

MySQL

// 注:数据框的名称result在jdbc()中要写成字符串的形式!!!
result.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties)

Hive

// 保存到本地的hive表中
result2.write.mode(SaveMode.Overwrite).saveAsTable("t_result")

UDF和UDAF

UDF

/**
* 1. 目前存在DataFrame以及SQL的UDF
* 2. UDF使用步骤为注册+使用
*/
object MyUDF {
def main(args: Array[String]): Unit = {

val spark: SparkSession = SparkSession
.builder()
.master("local")
.appName("UDF")
.getOrCreate()
import spark.implicits._
val df: DataFrame = spark.read.json("/Users/huangning/HNCode/neo_spark/data/student_scores.json")

/** Spark DataFrame UDF*/
val avgScoreUDF: UserDefinedFunction = functions.udf[Double,Int,Int,Int](avgScorePerStudent)
df.withColumn("avgScore", avgScoreUDF($"language",$"math",$"english")).show(false)

/** Spark SQL UDF */
df.createOrReplaceTempView("tmp_student_scores")
spark.udf.register("avgScore", functions.udf[Double,Int,Int,Int](avgScorePerStudent))
spark.sql("select *, avgScore(language,math,english) as avgScore from tmp_student_scores").show(false)

//还可以使用匿名函数代替functions.udf
val df2: DataFrame = List[String]("zhangsan", "lisi", "wangwu", "zhaoliu", "tianqi").toDF("name")
df2.createOrReplaceTempView("students")
spark.udf.register("strLen", (n:String)=>{
n.length
})
spark.sql("select name, strLen(name) as length from students sort by length desc").show(false)
}

// 计算每个同学的平均成绩
def avgScorePerStudent(language:Int, math:Int, english:Int):Double={
((language+math+english)/3.0).formatted("%.2f").toDouble
}
}

UDAF

/**
* 1. 目前存在DataFrame以及SQL的UDF
* 2. UDF使用步骤为注册+使用
*/
object MyUDAF {
def main(args: Array[String]): Unit = {

val spark: SparkSession = SparkSession
.builder()
.master("local")
.appName("UDF")
.getOrCreate()
import spark.implicits._
val df: DataFrame = spark.read.json("/Users/huangning/HNCode/neo_spark/data/student_scores.json")

/** Spark DataFrame UDAF */
val avgScore: MyUDAF = new MyUDAF()
df.groupBy($"departmentId",$"classId")
.agg(avgScore($"language").as("avgScorePerClass"))
.show(false)

/** SQL DataFrame UDAF */
df.createOrReplaceTempView("student_scores")
//UDAF注册
spark.udf.register("avgScore",new MyUDAF())
//UDAF使用
spark.sql("select departmentId,classId,avgScore(language) as avgScore from student_scores group by departmentId,classId")
.show(false)
}
}

//按照departmentId以及classId分组后求语文(language)的平均成绩
class MyUDAF extends UserDefinedAggregateFunction{

// 输入数据类型
override def inputSchema: StructType = {
new StructType().add("subject",IntegerType)
}

// buffer数据类型:
override def bufferSchema: StructType = {
//分别统计每个分组下的总成绩以及总人数
new StructType()
.add("total_grade",IntegerType)
.add("count_people",IntegerType)
}

//返回值类型
override def dataType: DataType = DoubleType

//如果相同输入有相同的返回结果
override def deterministic: Boolean = true

//buffer数据初始化
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0)=0
buffer(1)=0
}

//更新buffer值
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0)=buffer.getAs[Int](0)+input.getAs[Int](0)
buffer(1)=buffer.getAs[Int](1)+1
}

//合并buffer值
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
buffer1(1) = buffer1.getAs[Int](1) + buffer2.getAs[Int](1)
}

//计算最终结果
override def evaluate(buffer: Row): Any = {
buffer.getAs[Int](0).toDouble/buffer.getAs[Int](1)
}
}

补充:

UDAF继承UserDefinedAggregateFunction需要实现的重要的的三个函数initialize,update以及merge,实现方式如下:

OverFunction

基于HIVE的开窗函数

object OverFunctionOnHive {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("OverFunctionOnHive")
.master("local")
.enableHiveSupport()
.getOrCreate()

spark.sparkContext.setLogLevel("Error")

spark.sql("use spark")
spark.sql("drop table if exists sales")
spark.sql("create table if not exists sales (date string, category string, price Int)" + "row format delimited fields terminated by '\t'")
spark.sql("load data local inpath './data/sales' into table sales" )

/**
* rank 在每个组内从1开始
* 5 A 200 -- 1
* 3 A 100 -- 2
* 4 A 80 -- 3
* 7 A 60 -- 4
*
* 1 B 100 -- 1
* 8 B 90 -- 2
* 6 B 80 -- 3
* 1 B 70 -- 4
*/
val result: DataFrame = spark.sql(
"select "
+ "date, category, price "
+ "from ("
+ "select "
+ "date, category, price, row_number() over (partition by category order by price desc) rank "
+ "from sales) t "
+ "where rank <= 3"
)

//将结果保存到hive
result.write.mode(SaveMode.Append).saveAsTable("saleResult")

//读取保存到hive中的sales
val salesDF: DataFrame = spark.sql("select * from saleResult")
salesDF.show(100)
}
}

基于MySQL的开窗函数

object OverFunctionOnMySQL {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("OverFunctionOnMySQL")
.master("local")
.getOrCreate()

val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "11111111")

val sql: String = "select * from (select date, category, price, row_number() over (partition by category order by price desc) 'rank' from sales) t where t.rank <= 3"

val person: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/spark", s"($sql)T", properties)
person.show(100, truncate = false)
}
}