0%

SparkCore学习笔记(上)

Spark是基于内存的计算框架,它主要包括SparkCore,SparkSQL,SparkML,SparkStreaming等内容,本文从与MR的区别,运行模式,RDD,Spark Application以及算子来介绍SparkCore

Spark与MR的区别

  1. Spark基于内存迭代计算, MR是基于磁盘迭代计算
  2. Spark中有DAG(有向无环图)
  3. MR中只有mapreduce算子(相当于Spark中的mapreduceByKey),而Spark中有很多应用不同场景的算子

Spark运行模式

  1. Local:用于本地测试,在IDEA中经常使用Local模式
  2. Standalone:Spark自带的资源调度框架,支持分布式搭建
  3. Yarn:Hadoop生态圈中的资源调度框架,Spark可以基于Yarn运行
  4. Mesos:资源调度框架

RDD

RDD(resilient distributed dataset也叫弹性分布式数据集

创建

// 方式1: 外部存储HDFS创建RDD
val sc = new SparkContext(conf)
val rdd1 = sc.textFile(path)

// 方式2: scala集合创建RDD
val rdd2 = sc.parallelize(List[String](
"love1", "love2", "love3"))

val rdd3 = sc.makeRDD(List[String](
"love1", "love2", "love3"))

// 方式3: 旧RDD转换成新的RDD
val rdd4 = rdd3.map((_, 1))

RDD特性

  1. RDD是由partiton构成
  2. 算子(函数)作用在partition
  3. RDD之间有依赖关系
  4. 分区器作用在(K,V)格式的RDD上
  5. partition对外提供最佳计算位置,利于数据处理的本地化
  6. RDD内不存数据,partition也不存数据,partition中存的是逻辑(算子)

注意事项

  • 用textFile读取HDFS文件的方法的底层原理是什么?
    该方法的底层是通过调用MR来读取HDFS文件,即要进行split,每个split对应一个block,也对应一个partition

  • 什么是(K,V)格式的RDD?
    RDD中的数据是一个个的二元组

  • 哪里体现了RDD的弹性(容错性)
    (1) RDD的分区可多可少
    (2) RDD之间有依赖关系

  • 哪里体现了RDD的分布式
    RDD的partition是分布在多个节点(机器)上的

Spark Application

Spark Application是通过Scala语言来实现的,其中版本的对应关系为:Spark1.6==>Scala2.10,Spark2.0+6==>Scala2.11

/**
* 1. Spark Application(应用程序)就是一段完整的spark代码
* 2. 下述容是WordCount的完整代码流程
*/

// Step1: 配置信息
val conf: SparkConf = new SparkConf()
.setAppName("wordcount") // Spark应用程序的名称
.setMaster("local") // Spark的运行模式
.set("spark.driver.memory", "100") // 其他配置

// Step2: SparkContext是通往Spark集群的唯一通道
// 将之前的配置信息传入
val sc = new SparkContext(conf)

// Step3: 读入数据并加载成RDD
val lines: RDD[String] = sc.textFile("./data/words")

// Step4: RDD的转换操作
val words: RDD[String] = lines.flatMap(line => {
line.split(" ")
})
val pairWords: RDD[(String, Int)] = words.map(word => {
(word, 1)
})
val result: RDD[(String, Int)] = pairWords.reduceByKey((v1, v2) => {
v1 + v2
})
val sortResult: RDD[(String, Int)] = result.sortBy(tuple => {
tuple._2
}, ascending = false)

// Step5: RDD的执行操作
sortResult.foreach(line => {
println(line)
})

// Step6: 关闭SparkContext
sc.stop()

/**
*为了减少代码量,我们可以将Step2至Step5合并,下面是WordCount代码流程的简洁版
*/

// Step1
val conf: SparkConf = new SparkConf()
.setAppName("wordcount")
.setMaster("local")
.set("spark.driver.memory", "100")

// Step2 - Step5
sc.textFile("./data/words")
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_+_)
.sortBy(_._2)
.foreach(println)

// Step6
sc.stop()

算子

Transformations

Transformations(转换)算子的特点是懒执行,需要Actions(执行)算子触发执行

Single RDD

filter

//filter过滤数据,返回true的数据会被留下
val infos: RDD[Int] = sc.makeRDD(List[Int](11,22,32,46,83))
val result: RDD[Int] = infos.filter(one=>{
one>50
})

sample

/**
* 1. 参数(有无放回抽样,抽样的比例,种子)
* 2. 有种子和无种子的区别
* 1) 如果有种子,那么针对同一数据源并且指定相同的参数,那么每次抽样到的数据都是一样的
* 2) 如果没有种子, 那么针对同一数据源,每次抽样都是随机抽样
*/
val lines: RDD[Int] = sc.makeRDD(List[Int](1,2,4,12,3,1))
val result: RDD[Int] = lines.sample(withReplacement = true,1,100)

map&flatMap

val infos: RDD[String] = sc.parallelize(Array[String]("hello scala","hello spark","hello java"))

// map处理数据是一对一的关系, 即进入一条数据处理,出来的还是一条数据
val result: RDD[Array[String]] = infos.map(_.split(" "))

// flatmap处理数据是一对多的关系, 即进入一条数据处理,出来的是多条数据
val result: RDD[String] = infos.flatMap(_.split(" "))

mapPartitions&mapPartitionsWithIndex

// mapPartitions是一个个分区的遍历, 较于map中一条条数据的遍历,性能比较高
val infos: RDD[String] = sc.parallelize(List[String]("a","b","c","d","e","f","g"),4)
val result1: RDD[String] = infos.mapPartitions(iter=>{
println("创建数据库连接... ... ")
val array: ArrayBuffer[String] = ArrayBuffer[String]()

while(iter.hasNext){
val s: String = iter.next()
println("拼接sql... ... "+s)
array.append(s)
}
println("关闭数据库连接... ... ")
array.iterator
})

// mapPartitionsWithIndexs遍历的是每个分区中的数据, 并且给每个分区一个index, 其中preservesPartitioning表示是否保留父RDD的partitioner分区信息
val result2: RDD[String] = infos.mapPartitionsWithIndex((index, iter)=>{
val arr: ArrayBuffer[String] = ArrayBuffer[String]()
iter.foreach(one=>{
arr.append(s"current partition number = 【$index】, value = $one")
})
arr.iterator
}, preservesPartitioning = true)

mapValues & flatMapValues

/**
* mapValues
* 1. 作用在(K,V)格式的RDD上
* 2. 只对Value做操作,Key保持不变
*/
val infos: RDD[(String, String)] = sc.makeRDD(List[(String, String)](("zhangsna", "18"), ("lisi", "20"), ("wangwu", "30")))
val transInfo: RDD[(String, String)] = infos.mapValues(s => {
s + " " + "loveme"
})

/**
* flatMapValues
* 1. 作用在在(K,V)格式的RDD上
* 2. 对一个Key的一个Value返回多个Value
*/
val result: RDD[(String, String)] = transInfo.flatMapValues(s=>{
s.split(" ")
})

sortBy

/**
* 1. sortBy的第一个参数指定按照什么规则去排序,第二个参数true/false指定升序或者降序
* 2. sortByKey按照key排序
* 3. sortBy不用非要作用在(K,V)格式的RDD上, sortByKey一定作用在(K,V)格式的RDD上
*/
val rdd: RDD[Int] = sc.parallelize(Array[Int](400,200,500,100,300))
val infos: RDD[(String, Int)] = sc.parallelize(Array[(String,Int)](("f",1),("a",3),("c",2),("b",1)))

// sortBy针对RDD中的元素取反排序
val result: RDD[Int] = rdd.sortBy(one=>{-one},ascending = true)
result.foreach(println)

// sortBy针对(K,V)格式RDD中的V去排序
val result1: RDD[(String, Int)] = infos.sortBy(tp=>{
tp._2
},ascending = false)
result1.foreach(println)

// sortByKey针对(K,V)格式RDD中的K去排序
val result2: RDD[(String, Int)] = infos.sortByKey(ascending = false)
result2.foreach(println)

groupBy

/**
* 1. groupBy按照指定的规则,将数据分组,不必是(K,V)格式的RDD
* 2. groupByKey将相同的key对应的value合并在一起
*/
val info: RDD[String] = sc.parallelize(List[String](
"love1", "love2", "love3", "love4",
"love5", "love6", "love7", "love8",
"love9", "love10", "love11", "love12"),3)
val rdd: RDD[(String, Double)] = sc.parallelize(List[(String,Double)](("zhangsan",66.5),("lisi",33.2),
("zhangsan",66.7),("lisi",33.4),("zhangsan",66.8),("wangwu",29.8)))

// groupBy可以针对非(K,V)格式的RDD
val result: RDD[(String, Iterable[String])] = info.groupBy(one=>{one.split("")(4)})
result.foreach(println)

// groupBy也可以针对(K,V)格式的RDD
val resultBy: RDD[(Boolean, Iterable[(String, Double)])] = rdd.groupBy(one => {
one._2 > 34
})
resultBy.foreach(println)

// groupByKey针对的是(K,V)格式的RDD
val resultByKey: RDD[(String, Iterable[Double])] = rdd.groupByKey()
resultByKey.foreach(info=>{
val name: String = info._1
val iterable: Iterable[Double] = info._2
val list: List[Double] = info._2.toList
println("name = " + name + ",iterable = " + iterable + ",list = " + list)
})

combineByKey & aggregateByKey

/**
* 1. 作用在(K,V)格式的RDD上
* 2. 执行流程
* 1). 首先给RDD中每个分区中的每个key一个初始值
* 2). 其次在RDD每个分区内部 相同的key聚合一次
* 3). 最后在RDD不同的分区之间将相同的key结果聚合一次
*/
val infos: RDD[(String, Int)] = sc.makeRDD(List[(String, Int)](
("zhangsan", 10), ("zhangsan", 20), ("wangwu", 30),
("lisi", 40), ("zhangsan", 50), ("lisi", 60),
("wangwu", 70), ("wangwu", 80), ("lisi", 90)
), 3)

val result1: RDD[(String, String)] = infos.combineByKey(
v=>{v + "hello"},
(in1:String, in2)=>{in1 + "@" + in2},
(out1:String, out2:String)=>{out1 + "#" + out2})

/**
* 1. 作用在(K,V)格式的RDD上
* 2. 执行流程
* 1). 首先给RDD中每个分区中的每个key一个初始值
* 2). 其次在RDD每个分区内部 相同的key聚合一次
* 3). 最后在RDD不同的分区之间将相同的key结果聚合一次
* 3. 和combineByKey是一样的,唯一区别在于aggregateByKey在第一个初始化的逻辑上使用了柯里化的形式
*/

val result2: RDD[(String, String)] = infos.aggregateByKey("hello")(
(s, v)=>{s+"~"+v},
(s1, s2)=>{s1+"#"+s2})

reduceByKey & distinct

/**
* 1. 作用在(K,V)格式的RDD上
* 2. 首先根据key去分组,然后将每个组内的value聚合
*/
val infos: RDD[(String, Int)] = sc.parallelize(List[(String,Int)](("zhangsan",1),("zhangsan",2),("zhangsan",3), ("lisi",100),("lisi",200)))
val result: RDD[(String, Int)] = infos.reduceByKey((v1, v2)=>{v1+v2})

//distinct有shuffle产生,内部是根据map+reduceByKey+map实现
val infos: RDD[String] = sc.parallelize(List[String]("a","a","b","b","c","c","d"),4)
val result: RDD[String] = infos.distinct()

coalesce & repartition

/**
* 1. coalesce 增加或者减少分区,默认shuffle = false, 如果从少的分区增到多的分区,必须指定shuffle = true, 否则不起作用
* 2. repartition可以将RDD的分区增多或者减少,会产生shuffle(宽依赖算子)
* 3. coalesce(num,true) = repartition(num), 会产生shuffle(宽依赖算子)
*/
val rdd: RDD[String] = sc.parallelize(List[String](
"love1", "love2", "love3", "love4",
"love5", "love6", "love7", "love8",
"love9", "love10", "love11", "love12"),3)

val infos :RDD[String] = rdd.mapPartitionsWithIndex((index,iter)=>{
val list: ListBuffer[String] = ListBuffer[String]()
iter.foreach(one=>{
list.append(s"rdd partition = 【$index】,value = 【$one】")
})
list.iterator
},preservesPartitioning = true)

val resultRDD: RDD[String] = infos.coalesce(4, shuffle = true)
resultRDD.mapPartitionsWithIndex((index,iter)=>{
val arr: ArrayBuffer[String] = ArrayBuffer[String]()
iter.foreach(one=>{
arr.append(s"resultRDD partition = 【$index】,value = 【$one】")
})
arr.iterator
}).foreach(println)

val result2RDD: RDD[String] = infos.repartition(3)
result2RDD.mapPartitionsWithIndex((index,iter)=>{
val arr: ArrayBuffer[String] = ArrayBuffer[String]()
iter.foreach(one=>{
arr.append(s"result2RDD partition = 【$index】,value = 【$one】")
})
arr.iterator
}).foreach(println)

Multiple RDD

zip

/**
* 1. zip将两个RDD合成一个(K,V)格式的RDD
* 2. zip要求分区数相同,每个分区中的元素必须相同
* 3. zipWithIndex将RDD和数据下标压缩成一个K,V格式的RDD
*/
val rdd1: RDD[String] = sc.parallelize(List[String]("a","b","c"),2)
val rdd2: RDD[Int] = sc.parallelize(List[Int](1,2,3),numSlices = 2)
val result1: RDD[(String, Int)] = rdd1.zip(rdd2)
val result2: RDD[(String, Long)] = rdd1.zipWithIndex()

cogroup

/**
* 1. cogroup将将两个RDD中相同的key结合起来,对应的value是两个sequnce构成的tuple
* 2. interator对象可以通过toList转成List
* 3. cogroup的结果分区个数为两个RDD中的分区个数的最大值
* 4. rdd.getNumPartitions可以获取当前rdd的分区个数
*/
val rdd1: RDD[(String, String)] = sc.parallelize(List[(String,String)](("zhangsan","female"),("zhangsan","female1"),("lisi","male"),("wangwu","female"),("maliu","male")),5)
val rdd2: RDD[(String, Int)] = sc.parallelize(List[(String,Int)](("zhangsan",28),("lisi",39),("lisi",190),("wangwu",10),("tianqi",51)),4)

val resultRDD: RDD[(String, (Iterable[String], Iterable[Int]))] = rdd1.cogroup(rdd2)

join

/**
* 1. 产生shuffle
* 2. 针对(K,V)格式的RDD和(K,W)格式的RDD
* 1) join按照相同的key进行join得到(K,(V,W))格式的数据,
* 2) leftOuterJoin是以左边的RDD出现的key为主,得到(K,(V,Option(W)))
* 3) rightOuterJoin是以右边的RDD出现的key为主,得到(K,(V,Option(W)))
* 4) fullOuterJoin是以两边的RDD出现的key为主,得到(K,(Option(V),Option(W)))
* 3. 无论何种方式join,结果分区的数量都是二者最大的那个
*/
val nameRDD: RDD[(String, String)] = sc.parallelize(List[(String,String)](("zhangsan","female"),("lisi","male"),("wangwu","female"),("maliu","male")), numSlices = 3)
val scoreRDD: RDD[(String, Int)] = sc.parallelize(List[(String,Int)](("zhangsan",18),("lisi",19),("wangwu",20), ("tianqi",21)), numSlices = 4)
val joinRDD: RDD[(String, (String, Int))] = nameRDD.join(scoreRDD)
val leftJoinRDD: RDD[(String, (String, Option[Int]))] = nameRDD.leftOuterJoin(scoreRDD)
val rightJoinRDD: RDD[(String, (Option[String], Int))] = nameRDD.rightOuterJoin(scoreRDD)
val fullJoinRDD: RDD[(String, (Option[String], Option[Int]))] = nameRDD.fullOuterJoin(scoreRDD)

setOperation

/**
* 1. union取两个RDD的并集,两个RDD的类型要一致,不必是(K,V)格式的RDD, 分区数为二者之和
* 2. subtract取两个RDD的差集, 两个RDD的类型要一致,不必是(K,V)格式的RDD,分区数为前面RDD的分区数
* 3. intersection取两个RDD的交集,两个RDD的类型要一致,不必是(K,V)格式的RDD,分区数为两个RDD中分区数量大的
*/
val rdd1: RDD[String] = sc.parallelize(List[String]("zhangsan","lisi","wangwu","maliu"),5)
val rdd2: RDD[String] = sc.parallelize(List[String]("a","b","c","d"),4)
val unionRDD: RDD[String] = rdd1.union(rdd2)
val subtractRDD: RDD[String] = rdd1.subtract(rdd2)
val intersectionRDD: RDD[String] = rdd1.intersection(rdd2)

Actions

Actions(执行)算子触发Transformations算子执行,spark application中有一个Action算子就有一个job

collect

/**
* 1. collect: 会将结果回收到Driver端,如果结果比较大,就不要回收,这样的话会造成Driver端的OOM
* 2. collectAsMap: 会将(K,V)格式的RDD回收到Driver端作为Map使用
*/
val infos: RDD[(String, Double)] = sc.parallelize(List[(String,Double)](("zhangsan",32),("lisi",24),("wangwu",51)))

val resultArr: Array[(String, Double)] = infos.collect()
val resultMap: collection.Map[String, Double] = infos.collectAsMap()

count

/**
* 1. count统计RDD共有多少行数据
* 2. countByKey统计相同的key出现的个数
* 3. countByValue统计RDD中相同key下相同的value出现的次数,不必须是(K,V)格式的RDD
*/
val infos: RDD[(String, Integer)] = sc.makeRDD(List[(String,Integer)](("a",1),("a",11),("a",111),("b",6),("b",66),("c",3),("c",3)))
val result1: Long = infos.count()
val result2: collection.Map[String, Long] = infos.countByKey()
val result3: collection.Map[(String, Integer), Long] = infos.countByValue()

first

// 取出第一个元素
val infos: RDD[(String, Double)] = sc.parallelize(List[(String,Double)](("zhangsan",32),("lisi",24),("wangwu",51)))
val str: (String, Double) = infos.first

take

/**
* 1. take获取正数指定个元素
* 2. 输入指定个数,返回数组
*/
val infos: RDD[Int] = sc.makeRDD(Array[Int](1,2,3,4,5))
val result: Array[Int] = infos.take(3)

takeSample

/**
* 1. 随机抽样将数据结果拿回Driver端使用,返回Array。
* 2. 参数说明
* 1) withReplacement:有无放回抽样
* 2) num:抽样的条数
* 3) seed:种子
*/
val infos: RDD[Int] = sc.makeRDD(Array[Int](1,2,3,4,5))
val result: Array[Int] = infos.takeSample(withReplacement = false, 300, 100)

reduce

/**
* 1. 使用指定函数聚合数据中的元素,通过使用可交换且可结合的函数保证并行计算可以得到正确的结果
* 2. 接收两个参数返回一个参数
*/
val rdd: RDD[Int] = sc.makeRDD(Array[Int](1,2,3,4,5))
val result: Int = rdd.reduce((v1, v2) => {
v1 + v2
})
println(result)

foreach&foreachPartition

/**
* 1. foreach遍历RDD中的每个元素
* 2. mapPartitions相比于foreach的好处是几个分区就创建几个对象,减少创建链接的次数,是一个高性能算子
* 3. mapPartitions相比于mapPartitions的应用场景是,插入数据库之后不再有其他操作就用foreachPartitions, 如果还有其他操作就用mapPartitions
*/
val infos: RDD[String] = sc.parallelize(List[String]("a","b","c","d","e","f","g"),4)
infos.foreach(println)
infos.foreachPartition(iter => {
println("创建数据库")
while (iter.hasNext) {
val elem: String = iter.next()
println("插入数据库..." + elem)
}
println("关闭数据库")
})

Persistence

Cache&Persist

/**
* 1. cache算子默认将数据存在内存中
* 2. persist可以手动指定持久化级别, 常用的持久化级别(存储类型)
* 1) MEMORY_ONLY
* 2) MEMORY_ONLY_SER
* 3) MEMORY_AND_DISK(先存到内存,内存满了在溢写磁盘)
* 4) MEMORY_AND_DISK_SER
* 3. cache和persist的联系
* 1) 持久化单位都是partition
* 2) 都是懒执行算子, 需要action算子触发执行
* 3) 对RDD进行cache或者persist之后可以赋值给一个变量, 下次使用这个变量就是使用持久化的数据,如果采用赋给变量之的方法,持久化算子
* 后面不能紧跟action算子
* 4) 可以直接对RDD进行cache或者persist而不赋值给一个变量
* 5) 当前application执行完成之后会自动清除
* 6) cache() = persist(StorageLevel.MEMORY_ONLY)
* 4. 存储级别的后缀"_2" 表示有副本
* 5. 尽量少使用DISK_ONLY的持久化级别
*/
val infos: RDD[Int] = sc.makeRDD(Array[Int](1,2,3,4,5))

infos.cache()
val infos1: infos.type = infos.persist(StorageLevel.MEMORY_ONLY)

// action触发了cache或者persist算子
val startTime1: Long = System.currentTimeMillis()
val count1: Long = infos.count()
val endTime1: Long = System.currentTimeMillis()
println("count1 = " + count1 + ",time = " + (endTime1-startTime1) + "ms")

// cache持久化后操作速度加快
val startTime2: Long = System.currentTimeMillis()
val count2: Long = infos.count()
val endTime2: Long = System.currentTimeMillis()
println("count2 = " + count2 + ",time = " + (endTime2-startTime2)+"ms")

// persist持久化后操作速度加快
val startTime3: Long = System.currentTimeMillis()
val count3: Long = infos1.count()
val endTime3: Long = System.currentTimeMillis()
println("count3 = " + count3 + ",time = " + (endTime3-startTime3)+"ms")

Checkpoint

/**
* 1. 使用场景
* 当Lineage(RDD之间的依赖关系构成的链) 中的计算逻辑非常复杂,可以尝试使用**checkpoint**将数据直接持久化到指定目录
* 2. 执行流程
* 1) 当Spark job完成之后,Spark会从后向前回溯,找到带有checkpoint的RDD做标记
* 2) 回溯完成后,Spark重新启动一个job,计算带有标记的RDD数据并放入指定的checkpoint目录中
* 3) 计算完成并放入目录后,会切断RDD的依赖关系,当Spark Application执行完成之后,指定目录中的**数据不会被清除
* 3. 执行流程优化
* 对RDD进行checkpoint之前先对RDD进行cache,这样执行流程的第3步就不用重新从头计算当前带有标记的RDD数据
* 4. checkpoint一定要指定存储目录
* 5. 与Cache,Persist的联系
* Cache,Persist以及Checkpoint都是懒执行,最小持久化单位是partition
* 6. 与Cache,Persist的区别
* 当Spark App执行完成之后会自动清除Cache,Persist的数据,但是checkpoint持久化到目录中的数据不会被清除
*/
val conf: SparkConf =new SparkConf().setAppName("Checkpoint").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("Error")

sc.setCheckpointDir("./data/checkpoint")
val infos: RDD[Int] = sc.makeRDD(Array[Int](1, 2, 3, 4, 5)).cache()
infos.checkpoint()
infos.count()
println(s"checkpoint的路径为${infos.getCheckpointFile}")