0%

SparkStreaming学习笔记

Spark是基于内存的计算框架,它主要包括SparkCore,SparkSQL,SparkML,SparkStreaming等方面的内容,本文从与Strom的区别与联系,基础,算子与这几个方面来介绍SparkStreaming

SparkStreaming简介

SparkStreaming是流式处理框架,7天24小时不间断运行,它支持实时数据流处理,其中实时数据来源可以是Kafka, Flume, Twitter, ZeroMQ或者TCP sockets,它可以使用复杂算子来处理流数据,比如map,reduce,join,window ,最后它还可以将处理后的数据存放在文件系统,数据库以及实时的仪表盘

与Strom的区别与联系

  1. Storm是纯实时处理数据,SparkStreaming是准实时/微批处理数据,可以通过控制间隔时间做到实时处理,SparkStreaming的吞吐量大
  2. Storm只能处理简单的汇总型业务,比如累加/累减,而SparkStreaming擅长处理复杂的业务,所以Storm相对于SparkStreaming来讲是轻量级的。SparkStreaming中可以使用Core,SQL或者ML
  3. Storm的事务机制要比SparkStreaming的要更完善
  4. Strom和SparkStreaming都支持动态的资源调度

补充

  • 事务是应用程序中一系列逻辑相关的操作,所有操作必须成功完成,否则在每个操作中所作的所有更改都会被撤消

  • 事务的四大特性ACID:原子性(要么都做,要么都不做),一致性(数据库只包含成功的事务提交),隔离性(事务之间不能互相干扰),持久性(也叫永久性,指事务一旦提交对数据库中的数据是永久的改变)

SparkStreaming基础

SparkStreaming的底层数据结构是DStreamSparkStreaming启动之后,首先会启动一个job,这个job有一个task来接收数据,task每隔一段时间(batchInterval)会将接收来的数据封装到batch中,每个生成的batch又被封装到一个RDD中,这个RDD又被封装到DStream中,DStream有自己的Transformation类算子(懒执行),需要DStreamoutputOperator(action)类算子触发执行。假设batchInterval=5s,如果集群处理一批次的时间是4s,那么0~5s一直接收数据,5~9s一边接收数据一边处理数据,9~10s只是接收数据,每一批次都会造成集群“休息”1s,即集群资源没有充分利用

如果集群处理一批次的时间是9s,那么0~5s一直接收第一批数据,5~10s一边接收第二批数据一边处理第一批数据,10~14s一边接收第三批数据一边处理第一批数据,14~15s一边接收第三批数据一边处理第二批数据,这样数据会越堆越多,如果接收来的数据存入内存会造成oom的问题,如果内存不足放入磁盘,也会加大数据处理的延迟度。所以最理想的状态就是batchInterval=5s,集群处理一批次的时间也是5s

/**
* 1. 启动socket server 服务器:nc –lk 9999
* 2. local的模拟线程必须大于等于2,一个线程用来receiver用来接受数据,另一个线程用来执行job(当监控目录变化情况的时候,就只需要一个线程就可以了)
* 3. Durations时间设置就是batch_interval,通过WebUI来调节batch_interval
* 4. 创建JavaStreamingContext有两种方式(SparkConf,SparkContext)
* 5. 所有的代码逻辑完成后要有一个output operation类算子。
* 6. JavaStreamingContext.start() Streaming框架启动后不能再次添加业务逻辑。
* 7. JavaStreamingContext.stop() 无参的stop方法将SparkContext一同关闭,stop(false),不会关闭SparkContext
* 8. StreamingContext关闭之后,不能重新继续使用start方法,因为那个时候StreamingContext已经被回收了
*/
object StreamingApp {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setAppName("StreamingTest")
.setMaster("local[2]") // 设置为本地运行模式,2个线程,一个监听,另一个处理数据

// 在streamingContext里面会根据conf创建一个sparkContext对象
// val sc = new SparkContext(conf)
// Durations.seconds表示batchinterval,也就是微批接收数据的时间间隔
val ssc = new StreamingContext(conf, Durations.seconds(5))
ssc.sparkContext.setLogLevel("Error")

//streaming只能监听到目录下在程序启动后新增的文件,不会去处理历史上已经存在的文件
val lines: DStream[String] = ssc.textFileStream("file:///Users/huangning/IdeaProjects/Spark/data")
val words: DStream[String] = lines.flatMap(_.split(" "))
val pairwords: DStream[(String, Int)] = words.map((_, 1))
val result: DStream[(String, Int)] = pairwords.reduceByKey(_+_)

// outputOperator算子print表示打印行数
result.print(100)

// foreachRDD存在的问题就是内部的rdd需要额外的action算子触发
result.foreachRDD(pairRDD =>{
println("======Driver端 动态广播黑名单============")

val newRDD: RDD[(String, Int)] = pairRDD.filter(one => {
println("executor in filter ====" + one)
true
})
val result: RDD[Unit] = newRDD.map(one => {
println("executor in map *****" + one)
})
result.count()
})

//一旦输入ssc.start()以后,程序就开始自动进入循环监听状态,屏幕上会显示一堆的信息
ssc.start()
ssc.awaitTermination()
ssc.stop(false)
}
}

算子

Transfomations

updateStateByKey

/**
* 1. 根据key更新状态,所以需要设置checkpoint来保存状态
* 2. 默认key的状态在内存中有一份,在checkpoint目录中有一份
* 3. 多久会将内存中的数据(每个key所对应的状态)写入到磁盘上一份呢?
* 1)如果batchinterval小于10s,那么10s会将内存上的数据写入到磁盘上一份
* 2)如果大于10s,那么就一batchinterval为准
* 这样做是为了防止频繁的写hdfs
*/
ssc.checkpoint("file:///Users/huangning/IdeaProjects/Spark/data/checkpoint")
// currentValues: 当前批次某个key对应的所有value组成的集合, preValues:以往批次当前key对应的总状态值
val result: DStream[(String, Int)] = pariwords.updateStateByKey((currentValues: Seq[Int], preValue: Option[Int]) => {
var totalValue = 0
if (preValue.isDefined) {
totalValue += preValue.get
}
for (value <- currentValues) {
totalValue += value
}
Option(totalValue)
})

reduceByKeyAndWindow

/**
* 窗口操作的普通机制:
* 参数:计算逻辑 + 窗口长度(window length) + 窗口滑动间隔(window sliding interval)
*/
val result: DStream[(String, Int)] = pairwords.reduceByKeyAndWindow((v1: Int, v2: Int)=>{v1+v2}, Durations.seconds(15), Durations.seconds(10))
result.print()

/**
* 窗口的优化机制: 状态保留,即不需要每次都计算窗口长度内的结果,只需要计算滑动窗口的时候值减少了多少和增加了多少
*/
ssc.checkpoint("file:///Users/huangning/IdeaProjects/Spark/data/checkpoint")
val result2: DStream[(String, Int)] = pairwords.reduceByKeyAndWindow((v1: Int, v2: Int) => {
v1 + v2
},
(v1: Int, v2: Int) => {
v1 - v2
},
Durations.seconds(15),
Durations.seconds(5))
result2.print()

/**
* 通过自己的逻辑去使用窗口函数
*/
val result3: DStream[(String, Int)] = pairwords.window(Durations.seconds(15), Durations.seconds(5))
result3.print()

transform

/**
* 1. transform中拿到的RDD的算子之外,代码是在driver端执行的,可以做到动态的改变广播变量
* 2. transform可以拿到DStream中的RDD,对RDD使用RDD的算子操作, 但是要返回RDD,并且最后返回的RDD又会自动被封装到DStream
*/
val resultDStream: DStream[String] = pairLines.transform((pairRDD: RDD[(String, String)]) => {
println("++++++++++ Driver code +++++++++++++")
val filterRDD: RDD[(String, String)] = pairRDD.filter(tuple => {
val nameList: List[String] = blackList.value
!nameList.contains(tuple._1)
})

val returnRDD: RDD[String] = filterRDD.map(tp => tp._2)
returnRDD
})

Output Operations

saveAsTextFile

/**
* 1. saveAsTextFiles(prefix, [suffix]):将此DStream的内容另存为文本文件, 每批次数据产生的文件名称格式基于:prefix和suffix: "prefix-xxxxxx-suffix".
* 2. saveAsTextFile是调用saveAsHadoopFile实现的
* 3. spark中普通rdd可以直接只用saveAsTextFile(path)的方式,保存到本地,但是此时DStream的只有saveAsTextFiles()方法,没有传入路径的方法,其参数只有prefix, suffix.
* 4. DStream中的saveAsTextFiles方法中又调用了rdd中的saveAsTextFile方法,我们需要将path包含在prefix中
* 5. sparkStreaming 监控本地某个目录数据时,这个目录下已经存在的文件不会被监控到,可以监控到增加的文件, 增加的文件必须是原子性产生(一次性产生),已经存在的文件后面追加信息的文件,或者被删除的文件不能被监控到
*/
val lines: DStream[String] = ssc.textFileStream("./data/streamingCopyFile")
val words: DStream[String] = lines.flatMap(line => {
line.split(" ")
})
val pairWords: DStream[(String, Int)] = words.map((_, 1))
val result: DStream[(String, Int)] = pairWords.reduceByKey((v1: Int, v2: Int) => {
v1 + v2
})

// 保存的多级目录写入前缀之中
result.saveAsTextFiles("./data/streamingSavePath/prefex", "suffix")
// print(num)中的num表示打印行数
result.print(100)

// foreachRDD存在的问题就是内部的rdd需要额外的action算子触发
result.foreachRDD(pairRDD =>{
println("======Driver端 动态广播黑名单============")

val newRDD: RDD[(String, Int)] = pairRDD.filter(one => {
println("executor in filter ====" + one)
true
})
val result: RDD[Unit] = newRDD.map(one => {
println("executor in map *****" + one)
})
result.count()
})

补充

RDD中的许多Actions变成了sparkStreaming中的Transfomations,比如count,reduce,countByValue

Driver HA

/**
* 1. SparkStreaming Driver要一直启动,如果挂掉的话Application就挂掉
* 2. 在提交application的时候(--submit xxxx),添加 --supervise选项,如果Driver挂掉会自动重新启动
* 3. Driver HA 主要用在当停止SparkStreaming,再次启动的时候,SparkStreaming可以接着上次消费的数据继续消费
*/
object SparkStreamingDriverHA {
val checkpointDir = "./data/streamingCheckpoint"
def main(args: Array[String]): Unit = {
/**
* StreamingContext.getOrCreate(ckDir, CreateStreamingContext)
* 这个方法首先会从ckDir目录中获取StreamingContext, 因为StreamingContext是序列化存储在Checkpoint目录中,恢复时尝试反序列化这些内容,若能拿回StreamingContext,就不会执行CreateStreamingContext这个方法,否则会执行这个方法
*/
val ssc: StreamingContext = StreamingContext.getOrCreate(checkpointDir, CreateStreamingContext)
ssc.start()
ssc.awaitTermination()
ssc.stop()
}

def CreateStreamingContext(): StreamingContext = {
println("===========Create new StreamingContext=========")
val conf: SparkConf = new SparkConf().setAppName("DriverHA").setMaster("local")
val ssc = new StreamingContext(conf, Durations.seconds(5))
ssc.sparkContext.setLogLevel("Error")

/**
* 通过checkpoint来实现Driver的HA
* 默认checkpoint存储的内容分成下述四个方面:
* 1. 配置信息
* 2. DStream操作逻辑
* 3. job的执行速度
* 4. offset
*/
ssc.checkpoint(checkpointDir)
val result: DStream[(String, Int)] = ssc.textFileStream("./data/streamingCopyFile")
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
result.print(2)

// 如果通过checkpoint路径来恢复streamingConctext,那么下述更改的逻辑并不能被执行,除非换checkpoint路径或者删除当前checkpoint路径
result.foreachRDD(pairRDD => {
val value: RDD[(String, Int)] = pairRDD.filter(one => {
println(" ************* filter ***********")
true
})
value.foreach(println)
})
ssc
}
}

Kafka

Kafka是高吞吐量(因为有零拷贝)的分布式消息系统

定义

producer

消息生产者,用来生产数据,producer自己决定往哪个partition写消息,写消息存在两种机制,可以是轮询的负载均衡,也可以是基于hash来写入partition(基于 key hash的时候,如果 key 为 null,则执行轮询)。partition内部是FIFO的,partition之间不一定是FIFO的,但是如果我们把topic设为一个partition,这样就是严格的FIFO

consumer

consumer是消息消费者,用来消费数据,每个consumer都有对应的group(比如下图中的groupA中有两个consumer),各个group独立消费互不影响。同一个topic中的数据对于同一组只能被消费一次,每个cousumer消费不同的partition。版本0.8之前由consumer自己维护消费到哪个offset,0.8之后由zookeeper管理

broker

broker是kafka集群的server,本质上就是一台kafka节点。每一个partition都属于一个broker,一个broker可以lead多个partition,比如:一个topic里面有6个parititon,有两个broker,那么每个broker就管理三个partition。broker负责消息的读写请求,存储消息,没有主从关系,依赖zookeeper

paritition

partition可以简单地想象为一个文件,当数据发过来的时候就往这个partition上面append,每个partition内部消息严格遵循FIFO强有序,其中的每个消息都有一个序号叫offset,它是构成topic的基本单元,partitiion有以下特点:

  1. 可以指定消息队列,在创建topic的时候创建,并可以指定副本数
  2. 消息不存储在内存,而是直接写入磁盘文件
  3. 其中的消息默认一周删除,而不是消费完就删除(这也是kafka与其他的消息系统不同之处,kafka不存在消费完的概念,之后过期的概念)

topic

kafka里面的消息是由topic来组织的,可以将其想象为一个队列,一个队列就是一个topic,然后每个topic又分成了多个parititon为了做并行的计算,每个

partition里面的消息是有序的,相当于是有序队列

zookeeper

管理broker,存储broker,partitioned信息

零拷贝

零拷贝指cpu不需要为数据在内存之间的拷贝消耗资源,通常指在网络上发送文件不需要现将文件内容拷贝到用户空间(user space),而是直接在内核空间(kernel space)中传输到网络的方式

  • 非零拷贝图示
  • 零拷贝图示

特点

  1. 是一个生产者消费者模型,有FIFO(在topic里面)
  2. 高性能,比如单节点支持上千个客户端,百MB/s吞吐
  3. 持久性,比如消息直接持久化在普通磁盘上且可以保证顺序写顺序读
  4. 分布式,可以构建数据副本,因为有数据副本同一份数据可以到不同的broker上面去, 如果磁盘坏掉的时候,数据不会丢失,比如构建3个副本, 就是在3个机器磁盘都坏掉的情况下数据才会丢
  5. 不丢数据,producer将数据存入磁盘
  6. 反复消费数据(数据存7天)
  7. kafka与spark streaming的结合中,后者扮演consumer的角色