全国送花网站,下沙网站制作,wordpress redirect.php,网站是别人做的 ftp账号吗1、 Spark Streaming概述
1.1 Spark Streaming是什么
Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多#xff0c;例如#xff1a;Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如#xff1a;map、…1、 Spark Streaming概述
1.1 Spark Streaming是什么
Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多例如Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如map、reduce、join、window等进行运算。而结果也能保存在很多地方如HDFS数据库等。
和Spark基于RDD的概念很相似Spark Streaming使用离散化流(discretized stream)作为抽象表示叫作DStream。DStream 是随时间推移而收到的数据的序列。在内部每个时间区间收到的数据都作为 RDD 存在而DStream是由这些RDD所组成的序列(因此得名“离散化”)。
1.2 Spark Streaming特点
1.易用
2.容错
3.易整合到Spark体系
1.3 SparkStreaming架构 2、 Dstream入门
2.1 WordCount案例实操
1需求使用netcat工具向9999端口不断的发送数据通过SparkStreaming读取端口数据并统计不同单词出现的次数 2添加依赖
dependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming_2.11/artifactIdversion2.1.1/version
/dependency3编写代码
package com.wxnimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConfobject StreamWordCount {def main(args: Array[String]): Unit {//1.初始化Spark配置信息val sparkConf new SparkConf().setMaster(local[*]).setAppName(StreamWordCount)//2.初始化SparkStreamingContextval ssc new StreamingContext(sparkConf, Seconds(5))//3.通过监控端口创建DStream读进来的数据为一行行val lineStreams ssc.socketTextStream(hadoop102, 9999)//将每一行数据做切分形成一个个单词val wordStreams lineStreams.flatMap(_.split( ))//将单词映射成元组word,1val wordAndOneStreams wordStreams.map((_, 1))//将相同的单词次数做统计val wordAndCountStreams wordAndOneStreams.reduceByKey(__)//打印wordAndCountStreams.print()//启动SparkStreamingContextssc.start()ssc.awaitTermination()}
}4启动程序并通过NetCat发送数据 [wxnhadoop102 spark]$ nc -lk 9999 hello wxn 注意如果程序运行时log日志太多可以将spark conf目录下的log4j文件里面的日志级别改成WARN。
2.2 WordCount解析
Discretized Stream是Spark Streaming的基础抽象代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据如下图 对数据的操作也是按照RDD为单位来进行的 计算过程由Spark engine来完成
3、 Dstream创建
Spark Streaming原生支持一些不同的数据源。一些“核心”数据源已经被打包到Spark Streaming 的 Maven 工件中而其他的一些则可以通过 spark-streaming-kafka 等附加工件获取。每个接收器都以 Spark 执行器程序中一个长期运行的任务的形式运行因此会占据分配给应用的 CPU 核心。此外我们还需要有可用的 CPU 核心来处理数据。这意味着如果要运行多个接收器就必须至少有和接收器数目相同的核心数还要加上用来完成计算所需要的核心数。例如如果我们想要在流计算应用中运行 10 个接收器那么至少需要为应用分配 11 个 CPU 核心。所以如果在本地模式运行不要使用local[1]。
3.1文件数据源
3.1.1 用法及说明
文件数据流能够读取所有HDFS API兼容的文件系统文件通过fileStream方法进行读取Spark Streaming 将会监控 dataDirectory 目录并不断处理移动进来的文件记住目前不支持嵌套目录。 streamingContext.textFileStream(dataDirectory) 注意事项 1文件需要有相同的数据格式 2文件进入 dataDirectory的方式需要通过移动或者重命名来实现 3一旦文件移动进目录则不能再修改即便修改了也不会读取新数据
3.1.2 案例实操
1在HDFS上建好目录 [wxnhadoop102 spark]$ hadoop fs -mkdir /fileStream 2在/opt/module/data创建三个文件 [wxnhadoop102 data]$ touch a.tsv [wxnhadoop102 data]$ touch b.tsv [wxnhadoop102 data]$ touch c.tsv
添加如下数据 Hello wxn Hello spark 3编写代码
package com.wxnimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStreamobject FileStream {def main(args: Array[String]): Unit {//1.初始化Spark配置信息
Val sparkConf new SparkConf().setMaster(local[*])
.setAppName(StreamWordCount)//2.初始化SparkStreamingContextval ssc new StreamingContext(sparkConf, Seconds(5))//3.监控文件夹创建DStreamval dirStream ssc.textFileStream(hdfs://hadoop102:9000/fileStream)//4.将每一行数据做切分形成一个个单词val wordStreams dirStream.flatMap(_.split(\t))//5.将单词映射成元组word,1val wordAndOneStreams wordStreams.map((_, 1))//6.将相同的单词次数做统计val wordAndCountStreams] wordAndOneStreams.reduceByKey(_ _)//7.打印wordAndCountStreams.print()//8.启动SparkStreamingContextssc.start()ssc.awaitTermination()}
}4启动程序并向fileStream目录上传文件 [wxnhadoop102 data]$ hadoop fs -put ./a.tsv /fileStream [wxnhadoop102 data]$ hadoop fs -put ./b.tsv /fileStream [wxnhadoop102 data]$ hadoop fs -put ./c.tsv /fileStream 5获取计算结果
-------------------------------------------
Time: 1539073810000 ms
--------------------------------------------------------------------------------------
Time: 1539073815000 ms
-------------------------------------------
(Hello,4)
(spark,2)
(wxn,2)-------------------------------------------
Time: 1539073820000 ms
-------------------------------------------
(Hello,2)
(spark,1)
(wxn,1)-------------------------------------------
Time: 1539073825000 ms
-------------------------------------------3.2 RDD队列了解
3.2.1 用法及说明
测试过程中可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream每一个推送到这个队列中的RDD都会作为一个DStream处理。
3.2.2 案例实操
1需求循环创建几个RDD将RDD放入队列。通过SparkStream创建Dstream计算WordCount 2编写代码
package com.wxnimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutableobject RDDStream {def main(args: Array[String]) {//1.初始化Spark配置信息val conf new SparkConf().setMaster(local[*]).setAppName(RDDStream)//2.初始化SparkStreamingContextval ssc new StreamingContext(conf, Seconds(4))//3.创建RDD队列val rddQueue new mutable.Queue[RDD[Int]]()//4.创建QueueInputDStreamval inputStream ssc.queueStream(rddQueue,oneAtATime false)//5.处理队列中的RDD数据val mappedStream inputStream.map((_,1))val reducedStream mappedStream.reduceByKey(_ _)//6.打印结果reducedStream.print()//7.启动任务ssc.start()//8.循环创建并向RDD队列中放入RDDfor (i - 1 to 5) {rddQueue ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)}ssc.awaitTermination()}
}3结果展示
------------------------------------------
Time: 1539075280000 ms
-------------------------------------------
(4,60)
(0,60)
(6,60)
(8,60)
(2,60)
(1,60)
(3,60)
(7,60)
(9,60)
(5,60)-------------------------------------------
Time: 1539075284000 ms
-------------------------------------------
(4,60)
(0,60)
(6,60)
(8,60)
(2,60)
(1,60)
(3,60)
(7,60)
(9,60)
(5,60)-------------------------------------------
Time: 1539075288000 ms
-------------------------------------------
(4,30)
(0,30)
(6,30)
(8,30)
(2,30)
(1,30)
(3,30)
(7,30)
(9,30)
(5,30)-------------------------------------------
Time: 1539075292000 ms
-------------------------------------------3.3 自定义数据源
3.3.1 用法及说明
需要继承Receiver并实现onStart、onStop方法来自定义数据源采集。
3.3.2 案例实操
1需求自定义数据源实现监控某个端口号获取该端口号内容。 2代码实现
package com.wxnimport java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsetsimport org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiverclass CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {//最初启动的时候调用该方法作用为读数据并将数据发送给Sparkoverride def onStart(): Unit {new Thread(Socket Receiver) {override def run() {receive()}}.start()}//读数据并将数据发送给Sparkdef receive(): Unit {//创建一个Socketvar socket: Socket new Socket(host, port)//定义一个变量用来接收端口传过来的数据var input: String null//创建一个BufferedReader用于读取端口传来的数据val reader new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))//读取数据input reader.readLine()//当receiver没有关闭并且输入数据不为空则循环发送数据给Sparkwhile (!isStopped() input ! null) {store(input)input reader.readLine()}//跳出循环则关闭资源reader.close()socket.close()//重启任务restart(restart)}override def onStop(): Unit {}
}3使用自定义的数据源采集数据
package com.wxnimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStreamobject FileStream {def main(args: Array[String]): Unit {//1.初始化Spark配置信息
Val sparkConf new SparkConf().setMaster(local[*])
.setAppName(StreamWordCount)//2.初始化SparkStreamingContextval ssc new StreamingContext(sparkConf, Seconds(5))//3.创建自定义receiver的Streaming
val lineStream ssc.receiverStream(new CustomerReceiver(hadoop102, 9999))//4.将每一行数据做切分形成一个个单词val wordStreams lineStream.flatMap(_.split(\t))//5.将单词映射成元组word,1val wordAndOneStreams wordStreams.map((_, 1))//6.将相同的单词次数做统计val wordAndCountStreams] wordAndOneStreams.reduceByKey(_ _)//7.打印wordAndCountStreams.print()//8.启动SparkStreamingContextssc.start()ssc.awaitTermination()}
}3.4 Kafka数据源重点
3.4.1 用法及说明
在工程中需要引入 Maven 工件 spark- streaming-kafka_2.10 来使用它。包内提供的 KafkaUtils 对象可以在 StreamingContext 和 JavaStreamingContext 中以你的 Kafka 消息创建出 DStream。由于 KafkaUtils 可以订阅多个主题因此它创建出的 DStream 由成对的主题和消息组成。要创建出一个流数据需要使用 StreamingContext 实例、一个由逗号隔开的 ZooKeeper 主机列表字符串、消费者组的名字(唯一名字)以及一个从主题到针对这个主题的接收器线程数的映射表来调用 createStream() 方法。
3.4.2 案例实操
1需求1通过SparkStreaming从Kafka读取数据并将读取过来的数据做简单计算(WordCount)最终打印到控制台。 1导入依赖
dependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming-kafka-0-10_2.11/artifactIdversion2.1.1/version
/dependency
dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion0.11.0.2/version
/dependency2编写代码
package com.wxnimport kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}object KafkaSparkStreaming {def main(args: Array[String]): Unit {//1.创建SparkConf并初始化SSCval sparkConf: SparkConf new SparkConf().setMaster(local[*]).setAppName(KafkaSparkStreaming)val ssc new StreamingContext(sparkConf, Seconds(5))//2.定义kafka参数val brokers hadoop102:9092,hadoop103:9092,hadoop104:9092val topic sourceval consumerGroup spark//3.将kafka参数映射为mapval kafkaParam: Map[String, String] Map[String, String](ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG - org.apache.kafka.common.serialization.StringDeserializer,ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG - org.apache.kafka.common.serialization.StringDeserializer,ConsumerConfig.GROUP_ID_CONFIG - consumerGroup,ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG - brokers)//4.通过KafkaUtil创建kafkaDSteamval kafkaDSteam: ReceiverInputDStream[(String, String)] KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc,kafkaParam,Set(topic),StorageLevel.MEMORY_ONLY)//5.对kafkaDSteam做计算WordCountkafkaDSteam.foreachRDD {rdd {val word: RDD[String] rdd.flatMap(_._2.split( ))val wordAndOne: RDD[(String, Int)] word.map((_, 1))val wordAndCount: RDD[(String, Int)] wordAndOne.reduceByKey(_ _)wordAndCount.collect().foreach(println)}}//6.启动SparkStreamingssc.start()ssc.awaitTermination()}
}4、 DStream转换
DStream上的原语与RDD的类似分为Transformations转换和Output Operations输出两种此外转换操作中还有一些比较特殊的原语如updateStateByKey()、transform()以及各种Window相关的原语。
4.1 无状态转化操作
无状态转化操作就是把简单的RDD转化操作应用到每个批次上也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中。注意针对键值对的DStream转化操作(比如 reduceByKey())要添加import StreamingContext._才能在Scala中使用。
需要记住的是尽管这些函数看起来像作用在整个流上一样但事实上每个DStream在内部是由许多RDD(批次)组成且无状态转化操作是分别应用到每个RDD上的。例如reduceByKey()会归约每个时间区间中的数据但不会归约不同区间之间的数据。 举个例子在之前的wordcount程序中我们只会统计5秒内接收到的数据的单词个数而不会累加。 无状态转化操作也能在多个DStream间整合数据不过也是在各个时间区间内。例如键 值对DStream拥有和RDD一样的与连接相关的转化操作也就是cogroup()、join()、leftOuterJoin() 等。我们可以在DStream上使用这些操作这样就对每个批次分别执行了对应的RDD操作。 我们还可以像在常规的Spark 中一样使用 DStream的union() 操作将它和另一个DStream 的内容合并起来也可以使用StreamingContext.union()来合并多个流。
4.2 有状态转化操作重点
4.2.1 UpdateStateByKey
UpdateStateByKey原语用于记录历史记录有时我们需要在 DStream 中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况updateStateByKey() 为我们提供了对一个状态变量的访问用于键值对形式的 DStream。给定一个由(键事件)对构成的 DStream并传递一个指定如何根据新的事件 更新每个键对应状态的函数它可以构建出一个新的 DStream其内部数据为(键状态) 对。 updateStateByKey() 的结果会是一个新的 DStream其内部的 RDD 序列是由每个时间区间对应的(键状态)对组成的。 updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能你需要做下面两步
定义状态状态可以是一个任意的数据类型。定义状态更新函数用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。 使用updateStateByKey需要对检查点目录进行配置会使用检查点来保存状态。 更新版的wordcount 1编写代码
package com.wxn.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}object WorldCount {def main(args: Array[String]) {// 定义更新状态方法参数values为当前批次单词频度state为以往批次单词频度val updateFunc (values: Seq[Int], state: Option[Int]) {val currentCount values.foldLeft(0)(_ _)val previousCount state.getOrElse(0)Some(currentCount previousCount)}val conf new SparkConf().setMaster(local[2]).setAppName(NetworkWordCount)val ssc new StreamingContext(conf, Seconds(3))ssc.checkpoint(hdfs://hadoop102:9000/streamCheck)// Create a DStream that will connect to hostname:port, like hadoop102:9999val lines ssc.socketTextStream(hadoop102, 9999)// Split each line into wordsval words lines.flatMap(_.split( ))//import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3// Count each word in each batchval pairs words.map(word (word, 1))// 使用updateStateByKey来更新状态统计从运行开始以来单词总的次数val stateDstream pairs.updateStateByKey[Int](updateFunc)stateDstream.print()//val wordCounts pairs.reduceByKey(_ _)// Print the first ten elements of each RDD generated in this DStream to the console//wordCounts.print()ssc.start() // Start the computationssc.awaitTermination() // Wait for the computation to terminate//ssc.stop()}}2启动程序并向9999端口发送数据 [wxnhadoop102 kafka]$ nc -lk 9999 ni shi shui ni hao ma 3结果展示
-------------------------------------------
Time: 1504685175000 ms
-------------------------------------------
-------------------------------------------
Time: 1504685181000 ms
-------------------------------------------
(shi,1)
(shui,1)
(ni,1)
-------------------------------------------
Time: 1504685187000 ms
-------------------------------------------
(shi,1)
(ma,1)
(hao,1)
(shui,1)
(ni,2)4.2.2 Window Operations
Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。基于窗口的操作会在一个比 StreamingContext 的批次间隔更长的时间范围内通过整合多个批次的结果计算出整个窗口的结果。 注意所有基于窗口的操作都需要两个参数分别为窗口时长以及滑动步长两者都必须是 StreamContext 的批次间隔的整数倍。 窗口时长控制每次计算最近的多少个批次的数据其实就是最近的 windowDuration/batchInterval 个批次。如果有一个以 10 秒为批次间隔的源 DStream要创建一个最近 30 秒的时间窗口(即最近 3 个批次)就应当把 windowDuration 设为 30 秒。而滑动步长的默认值与批次间隔相等用来控制对新的 DStream 进行计算的间隔。如果源 DStream 批次间隔为 10 秒并且我们只希望每两个批次计算一次窗口结果 就应该把滑动步长设置为 20 秒。 假设你想拓展前例从而每隔十秒对持续30秒的数据生成word count。为做到这个我们需要在持续30秒数据的(word,1)对DStream上应用reduceByKey。使用操作reduceByKeyAndWindow. #reduce last 30 seconds of data, every 10 second windowedWordCounts pairs.reduceByKeyAndWindow(lambda x, y: x y, lambda x, y: x -y, 30, 20) 关于Window的操作有如下原语 1window(windowLength, slideInterval): 基于对源DStream窗化的批次进行计算返回一个新的Dstream 2countByWindow(windowLength, slideInterval)返回一个滑动窗口计数流中的元素。 3reduceByWindow(func, windowLength, slideInterval)通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流。 4reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])当在一个(K,V)对的DStream上调用此函数会返回一个新(K,V)对的DStream此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。Note:默认情况下这个操作使用Spark的默认数量并行任务(本地是2)在集群模式中依据配置属性(spark.default.parallelism)来做grouping。你可以通过设置可选参数numTasks来设置不同数量的tasks。 5reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])这个函数是上述函数的更高效版本每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对keys的“加”“减”计数。通过前边介绍可以想到这个函数只适用于”可逆的reduce函数”也就是这些reduce函数有相应的”反reduce”函数(以参数invFunc形式传入)。如前述函数reduce任务的数量通过可选参数来配置。注意为了使用这个操作检查点必须可用。 6countByValueAndWindow(windowLength,slideInterval, [numTasks])对(K,V)对的DStream调用返回(K,Long)对的新DStream其中每个key的值是其在滑动窗口中频率。如上可配置reduce任务数量。 reduceByWindow() 和 reduceByKeyAndWindow() 让我们可以对每个窗口更高效地进行归约操作。它们接收一个归约函数在整个窗口上执行比如 。除此以外它们还有一种特殊形式通过只考虑新进入窗口的数据和离开窗口的数据让 Spark 增量计算归约结果。这种特殊形式需要提供归约函数的一个逆函数比 如 对应的逆函数为 -。对于较大的窗口提供逆函数可以大大提高执行效率 val ipDStream accessLogsDStream.map(logEntry (logEntry.getIpAddress(), 1))
val ipCountDStream ipDStream.reduceByKeyAndWindow({(x, y) x y},{(x, y) x - y},Seconds(30),Seconds(10))// 加上新进入窗口的批次中的元素 // 移除离开窗口的老批次中的元素 // 窗口时长// 滑动步长 countByWindow()和countByValueAndWindow()作为对数据进行计数操作的简写。countByWindow()返回一个表示每个窗口中元素个数的DStream而countByValueAndWindow()返回的DStream则包含窗口中每个值的个数。
val ipDStream accessLogsDStream.map{entry entry.getIpAddress()}
val ipAddressRequestCount ipDStream.countByValueAndWindow(Seconds(30), Seconds(10))
val requestCount accessLogsDStream.countByWindow(Seconds(30), Seconds(10))WordCount第三版3秒一个批次窗口12秒滑步6秒。
package com.wxn.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}object WorldCount {def main(args: Array[String]) {// 定义更新状态方法参数values为当前批次单词频度state为以往批次单词频度val updateFunc (values: Seq[Int], state: Option[Int]) {val currentCount values.foldLeft(0)(_ _)val previousCount state.getOrElse(0)Some(currentCount previousCount)}val conf new SparkConf().setMaster(local[2]).setAppName(NetworkWordCount)val ssc new StreamingContext(conf, Seconds(3))ssc.checkpoint(.)// Create a DStream that will connect to hostname:port, like localhost:9999val lines ssc.socketTextStream(hadoop102, 9999)// Split each line into wordsval words lines.flatMap(_.split( ))//import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3// Count each word in each batchval pairs words.map(word (word, 1))val wordCounts pairs.reduceByKeyAndWindow((a:Int,b:Int) (a b),Seconds(12), Seconds(6))// Print the first ten elements of each RDD generated in this DStream to the consolewordCounts.print()ssc.start() // Start the computationssc.awaitTermination() // Wait for the computation to terminate//ssc.stop()}}4.3 其他重要操作
4.3.1 Transform
Transform原语允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来通过该函数可以方便的扩展Spark API。该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。 比如下面的例子在进行单词统计的时候想要过滤掉spam的信息。
val spamInfoRDD ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam informationval cleanedDStream wordCounts.transform { rdd rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning...
}4.3.2 Join
连接操作leftOuterJoin, rightOuterJoin, fullOuterJoin也可以可以连接Stream-Streamwindows-stream to windows-stream、stream-dataset Stream-Stream Joins val stream1: DStream[String, String] ...
val stream2: DStream[String, String] ...
val joinedStream stream1.join(stream2)val windowedStream1 stream1.window(Seconds(20))
val windowedStream2 stream2.window(Minutes(1))
val joinedStream windowedStream1.join(windowedStream2)
Stream-dataset joins
val dataset: RDD[String, String] ...
val windowedStream stream.window(Seconds(20))...
val joinedStream windowedStream.transform { rdd rdd.join(dataset) }5、 DStream输出
输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值类似如果一个DStream及其派生出的DStream都没有被执行输出操作那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作整个context就都不会启动。 输出操作如下 1print()在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中同样的操作叫print()。 2saveAsTextFiles(prefix, [suffix])以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”. 3saveAsObjectFiles(prefix, [suffix])以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为prefix-TIME_IN_MS[.suffix]“. Python中目前不可用。 4saveAsHadoopFiles(prefix, [suffix])将Stream中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为prefix-TIME_IN_MS[.suffix]”。 Python API Python中目前不可用。 5foreachRDD(func)这是最通用的输出操作即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统如将RDD存入文件或者通过网络将其写入数据库。注意函数func在运行流应用的驱动中被执行同时其中一般函数RDD操作从而强制其对于流RDD的运算。 通用的输出操作foreachRDD()它用来对DStream中的RDD运行任意计算。这和transform() 有些类似都可以让我们访问任意RDD。在foreachRDD()中可以重用我们在Spark中实现的所有行动操作。 比如常见的用例之一是把数据写到诸如MySQL的外部数据库中。 注意 1连接不能写在driver层面 2如果写在foreach则每个RDD都创建得不偿失 3增加foreachPartition在分区创建。
6、累加器和广播变量
累加器(Accumulators)和广播变量(Broadcast variables)不能从Spark Streaming的检查点中恢复。如果你启用检查并也使用了累加器和广播变量那么你必须创建累加器和广播变量的延迟单实例从而在驱动因失效重启后他们可以被重新实例化。如下例述
object WordBlacklist {volatile private var instance: Broadcast[Seq[String]] nulldef getInstance(sc: SparkContext): Broadcast[Seq[String]] {if (instance null) {synchronized {if (instance null) {val wordBlacklist Seq(a, b, c)instance sc.broadcast(wordBlacklist)}}}instance}
}object DroppedWordsCounter {volatile private var instance: LongAccumulator nulldef getInstance(sc: SparkContext): LongAccumulator {if (instance null) {synchronized {if (instance null) {instance sc.longAccumulator(WordsInBlacklistCounter)}}}instance}
}
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) // Get or register the blacklist Broadcastval blacklist WordBlacklist.getInstance(rdd.sparkContext)// Get or register the droppedWordsCounter Accumulatorval droppedWordsCounter DroppedWordsCounter.getInstance(rdd.sparkContext)// Use blacklist to drop words and use droppedWordsCounter to count themval counts rdd.filter { case (word, count) if (blacklist.value.contains(word)) {droppedWordsCounter.add(count)false} else {true}}.collect().mkString([, , , ])val output Counts at time time counts
})7、DataFrame ans SQL Operations
你可以很容易地在流数据上使用DataFrames和SQL。你必须使用SparkContext来创建StreamingContext要用的SQLContext。此外这一过程可以在驱动失效后重启。我们通过创建一个实例化的SQLContext单实例来实现这个工作。如下例所示。我们对前例word count进行修改从而使用DataFrames和SQL来产生word counts。每个RDD被转换为DataFrame以临时表格配置并用SQL进行查询。
val words: DStream[String] ...words.foreachRDD { rdd // Get the singleton instance of SparkSessionval spark SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()import spark.implicits._// Convert RDD[String] to DataFrameval wordsDataFrame rdd.toDF(word)// Create a temporary viewwordsDataFrame.createOrReplaceTempView(words)// Do word count on DataFrame using SQL and print itval wordCountsDataFrame spark.sql(select word, count(*) as total from words group by word)wordCountsDataFrame.show()
}你也可以从不同的线程在定义于流数据的表上运行SQL查询也就是说异步运行StreamingContext。仅确定你设置StreamingContext记住了足够数量的流数据以使得查询操作可以运行。否则StreamingContext不会意识到任何异步的SQL查询操作那么其就会在查询完成之后删除旧的数据。例如如果你要查询最后一批次但是你的查询会运行5分钟那么你需要调用streamingContext.remember(Minutes(5))(in Scala, 或者其他语言的等价操作)。
8、Caching / Persistence
和RDDs类似DStreams同样允许开发者将流数据保存在内存中。也就是说在DStream上使用persist()方法将会自动把DStreams中的每个RDD保存在内存中。当DStream中的数据要被多次计算时这个非常有用如在同样数据上的多次操作。对于像reduceByWindow和reduceByKeyAndWindow以及基于状态的(updateStateByKey)这种操作保存是隐含默认的。因此即使开发者没有调用persist()由基于窗操作产生的DStreams会自动保存在内存中。
9、7x24 不间断运行
9.1、检查点机制
检查点机制是我们在Spark Streaming中用来保障容错性的主要机制。与应用程序逻辑无关的错误即系统错位JVM崩溃等有迅速恢复的能力. 它可以使Spark Streaming阶段性地把应用数据存储到诸如HDFS或Amazon S3这样的可靠存储系统中 以供恢复时使用。具体来说检查点机制主要为以下两个目的服务。 1)控制发生失败时需要重算的状态数。SparkStreaming可以通 过转化图的谱系图来重算状态检查点机制则可以控制需要在转化图中回溯多远。 2)提供驱动器程序容错。如果流计算应用中的驱动器程序崩溃了你可以重启驱动器程序 并让驱动器程序从检查点恢复这样Spark Streaming就可以读取之前运行的程序处理 数据的进度并从那里继续。
了实现这个Spark Streaming需要为容错存储系统checkpoint足够的信息从而使得其可以从失败中恢复过来。有两种类型的数据设置检查点。 Metadata checkpointing将定义流计算的信息存入容错的系统如HDFS。元数据包括 配置 – 用于创建流应用的配置。 DStreams操作 – 定义流应用的DStreams操作集合。 不完整批次 – 批次的工作已进行排队但是并未完成。 Data checkpointing 将产生的RDDs存入可靠的存储空间。对于在多批次间合并数据的状态转换这个很有必要。在这样的转换中RDDs的产生基于之前批次的RDDs这样依赖链长度随着时间递增。为了避免在恢复期这种无限的时间增长和链长度成比例状态转换中间的RDDs周期性写入可靠地存储空间如HDFS从而切短依赖链。 总而言之元数据检查点在由驱动失效中恢复是首要需要的。而数据或者RDD检查点甚至在使用了状态转换的基础函数中也是必要的。 出于这些原因检查点机制对于任何生产环境中的流计算应用都至关重要。你可以通过向 ssc.checkpoint() 方法传递一个路径参数(HDFS、S3 或者本地路径均可)来配置检查点机制,同时你的应用应该能够使用检查点的数据 1. 当程序首次启动其将创建一个新的StreamingContext设置所有的流并调用start()。 2. 当程序在失效后重启其将依据检查点目录的检查点数据重新创建一个StreamingContext。 通过使用StraemingContext.getOrCreate很容易获得这个性能。
ssc.checkpoint(hdfs://...) # 创建和设置一个新的StreamingContext
def functionToCreateContext():sc SparkContext(...) # new contextssc new StreamingContext(...)lines ssc.socketTextStream(...) # create DStreams...ssc.checkpoint(checkpointDirectory) # 设置检查点目录return ssc
# 从检查点数据中获取StreamingContext或者重新创建一个
context StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)# 在需要完成的context上做额外的配置
# 无论其有没有启动
context ...
# 启动context
context.start()
contaxt.awaitTermination()如果检查点目录(checkpointDirectory)存在那么context将会由检查点数据重新创建。如果目录不存在首次运行那么函数functionToCreateContext将会被调用来创建一个新的context并设置DStreams。 注意RDDs的检查点引起存入可靠内存的开销。在RDDs需要检查点的批次里处理的时间会因此而延长。所以检查点的间隔需要很仔细地设置。在小尺寸批次1秒钟。每一批次检查点会显著减少操作吞吐量。反之检查点设置的过于频繁导致“血统”和任务尺寸增长这会有很不好的影响对于需要RDD检查点设置的状态转换默认间隔是批次间隔的乘数一般至少为10秒钟。可以通过dstream.checkpoint(checkpointInterval)。通常检查点设置间隔是5-10个DStream的滑动间隔。
9.2、WAL预写日志
WAL 即 write ahead log预写日志是在 1.2 版本中就添加的特性。作用就是将数据通过日志的方式写到可靠的存储比如 HDFS、s3在 driver 或 worker failure 时可以从在可靠存储上的日志文件恢复数据。WAL 在 driver 端和 executor 端都有应用。
WAL在 driver 端的应用
用于写日志的对象 writeAheadLogOption: WriteAheadLog。在 StreamingContext 中的 JobScheduler 中的 ReceiverTracker 的 ReceivedBlockTracker 构造函数中被创建ReceivedBlockTracker 用于管理已接收到的 blocks 信息。需要注意的是这里只需要启用 checkpoint 就可以创建该 driver 端的 WAL 管理实例而不需要将 spark.streaming.receiver.writeAheadLog.enable 设置为 true。
写什么、何时写、写什么
首选需要明确的是ReceivedBlockTracker 通过 WAL 写入 log 文件的内容是3种事件当然会进行序列化 case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)即新增了一个 block 及该 block 的具体信息包括 streamId、blockId、数据条数等 case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)即为某个 batchTime 分配了哪些 blocks 作为该 batch RDD 的数据源 case class BatchCleanupEvent(times: Seq[Time])即清理了哪些 batchTime 对应的 block 知道了写了什么内容结合源码也不难找出是什么时候写了这些内容。需要再次注意的是写上面这三种事件也不需要将 spark.streaming.receiver.writeAheadLog.enable 设置为 true。
WAL 在 executor 端的应用
Receiver 接收到的数据会源源不断的传递给 ReceiverSupervisor是否启用 WAL 机制即是否将 spark.streaming.receiver.writeAheadLog.enable 设置为 true会影响 ReceiverSupervisor 在存储 block 时的行为 不启用 WAL你设置的StorageLevel是什么就怎么存储。比如MEMORY_ONLY只会在内存中存一份MEMORY_AND_DISK会在内存和磁盘上各存一份等 启用 WAL在StorageLevel指定的存储的基础上写一份到 WAL 中。存储一份在 WAL 上更不容易丢数据但性能损失也比较大
关于是否要启用 WAL要视具体的业务而定
若可以接受一定的数据丢失则不需要启用 WAL因为对性能影响较大 若完全不能接受数据丢失那就需要同时启用 checkpoint 和 WALcheckpoint 保存着执行进度比如已生成但未完成的 jobsWAL 中保存着 blocks 及 blocks 元数据比如保存着未完成的 jobs 对应的 blocks 信息及 block 文件。同时这种情况可能要在数据源和 Streaming Application 中联合来保证 exactly once 语义
预写日志功能的流程是 1一个SparkStreaming应用开始时也就是driver开始时相关的StreamingContext使用SparkContext启动接收器成为长驻运行任务。这些接收器接收并保存流数据到Spark内存中以供处理。 2接收器通知driver。 3接收块中的元数据metadata被发送到driver的StreamingContext。 这个元数据包括 a定位其在executor内存中数据的块referenceid b块数据在日志中的偏移信息如果启用了。 用户传送数据的生命周期如下图所示。 类似Kafka这样的系统可以通过复制数据保持可靠性。
9.3背压机制
默认情况下Spark Streaming通过Receiver以生产者生产数据的速率接收数据计算过程中会出现batch processing time batch interval的情况其中batch processing time 为实际计算一个批次花费时间 batch interval为Streaming应用设置的批处理间隔。这意味着Spark Streaming的数据接收速率高于Spark从队列中移除数据的速率也就是数据处理能力低在设置间隔内不能完全处理当前接收速率接收的数据。如果这种情况持续过长的时间会造成数据在内存中堆积导致Receiver所在Executor内存溢出等问题如果设置StorageLevel包含disk, 则内存存放不下的数据会溢写至disk, 加大延迟。Spark 1.5以前版本用户如果要限制Receiver的数据接收速率可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现此举虽然可以通过限制接收速率来适配当前的处理能力防止内存溢出但也会引入其它问题。比如producer数据生产高于maxRate当前集群处理能力也高于maxRate这就会造成资源利用率下降等问题。为了更好的协调数据接收速率与资源处理能力Spark Streaming 从v1.5开始引入反压机制back-pressure,通过动态控制数据接收速率来适配集群数据处理能力。
Spark Streaming Backpressure: 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制默认值false即不启用。
Streaming架构如下图所示 在原架构的基础上加上一个新的组件RateController,这个组件负责监听“OnBatchCompleted”事件然后从中抽取processingDelay 及schedulingDelay信息. Estimator依据这些信息估算出最大处理速度rate最后由基于Receiver的Input Stream将rate通过ReceiverTracker与ReceiverSupervisorImpl转发给BlockGenerator继承自RateLimiter. 流量控制点
当Receiver开始接收数据时会通过supervisor.pushSingle()方法将接收的数据存入currentBuffer等待BlockGenerator定时将数据取走包装成block. 在将数据存放入currentBuffer之时要获取许可令牌。如果获取到许可就可以将数据存入buffer, 否则将被阻塞进而阻塞Receiver从数据源拉取数据。 其令牌投放采用令牌桶机制进行 原理如下图所示: 令牌桶机制 大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。如果令牌不被消耗或者被消耗的速度小于产生的速度令牌就会不断地增多直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中可以保存的最大令牌数永远不会超过桶的大小。当进行某操作时需要令牌时会从令牌桶中取出相应的令牌数如果获取到则继续操作否则阻塞。用完之后不用放回。
9.4驱动器程序容错
驱动器程序的容错要求我们以特殊的方式创建 StreamingContext。我们需要把检查点目录提供给 StreamingContext。与直接调用 new StreamingContext 不同应该使用 StreamingContext.getOrCreate() 函数。 配置过程如下 1、启动Driver自动重启功能 standalone: 提交任务时添加 --supervise 参数 yarn:设置yarn.resourcemanager.am.max-attempts 或者spark.yarn.maxAppAttempts mesos: 提交任务时添加 --supervise 参数 2、设置checkpoint StreamingContext.setCheckpoint(hdfsDirectory) 3、支持从checkpoint中重启配置 def createContext(checkpointDirectory: String): StreamingContext { val ssc new StreamingContext ssc.checkpoint(checkpointDirectory) ssc } val ssc StreamingContext.getOrCreate(checkpointDirectory, createContext(checkpointDirectory))
9.5工作节点容错
为了应对工作节点失败的问题Spark Streaming使用与Spark的容错机制相同的方法。所 有从外部数据源中收到的数据都在多个工作节点上备份。所有从备份数据转化操作的过程 中创建出来的 RDD 都能容忍一个工作节点的失败因为根据 RDD 谱系图系统可以把丢 失的数据从幸存的输入数据备份中重算出来。对于reduceByKey等Stateful操作重做的lineage较长的强制启动checkpoint减少重做几率
9.6接收器容错
运行接收器的工作节点的容错也是很重要的。如果这样的节点发生错误Spark Streaming 会在集群中别的节点上重启失败的接收器。然而这种情况会不会导致数据的丢失取决于 数据源的行为(数据源是否会重发数据)以及接收器的实现(接收器是否会向数据源确认 收到数据)。举个例子使用 Flume 作为数据源时两种接收器的主要区别在于数据丢失 时的保障。在“接收器从数据池中拉取数据”的模型中Spark 只会在数据已经在集群中 备份时才会从数据池中移除元素。而在“向接收器推数据”的模型中如果接收器在数据 备份之前失败一些数据可能就会丢失。总的来说对于任意一个接收器你必须同时考 虑上游数据源的容错性(是否支持事务)来确保零数据丢失。 一般主要是通过将接收到数据后先写日志WAL到可靠文件系统中后才写入实际的RDD。如果后续处理失败则成功写入WAL的数据通过WAL进行恢复未成功写入WAL的数据通过可回溯的Source进行重放 总的来说接收器提供以下保证。 • 所有从可靠文件系统中读取的数据(比如通过StreamingContext.hadoopFiles读取的) 都是可靠的因为底层的文件系统是有备份的。Spark Streaming会记住哪些数据存放到 了检查点中并在应用崩溃后从检查点处继续执行。 • 对于像Kafka、推式Flume、Twitter这样的不可靠数据源Spark会把输入数据复制到其 他节点上但是如果接收器任务崩溃Spark 还是会丢失数据。在 Spark 1.1 以及更早的版 本中收到的数据只被备份到执行器进程的内存中所以一旦驱动器程序崩溃(此时所 有的执行器进程都会丢失连接)数据也会丢失。在 Spark 1.2 中收到的数据被记录到诸 如 HDFS 这样的可靠的文件系统中这样即使驱动器程序重启也不会导致数据丢失。 综上所述确保所有数据都被处理的最佳方式是使用可靠的数据源(例如 HDFS、拉式 Flume 等)。如果你还要在批处理作业中处理这些数据使用可靠数据源是最佳方式因为 这种方式确保了你的批处理作业和流计算作业能读取到相同的数据因而可以得到相同的结果。 操作过程如下 启用checkpoint ssc.setCheckpoint(checkpointDir) 启用WAL sparkConf.set(“spark.streaming.receiver.writeAheadLog.enable”, “true”) 对Receiver使用可靠性存储StoreageLevel.MEMORY_AND_DISK_SER or StoreageLevel.MEMORY_AND_DISK_SER2
9.7处理保证
由于Spark Streaming工作节点的容错保障Spark Streaming可以为所有的转化操作提供 “精确一次”执行的语义即使一个工作节点在处理部分数据时发生失败最终的转化结 果(即转化操作得到的 RDD)仍然与数据只被处理一次得到的结果一样。 然而当把转化操作得到的结果使用输出操作推入外部系统中时写结果的任务可能因故 障而执行多次一些数据可能也就被写了多次。由于这引入了外部系统因此我们需要专 门针对各系统的代码来处理这样的情况。我们可以使用事务操作来写入外部系统(即原子 化地将一个 RDD 分区一次写入)或者设计幂等的更新操作(即多次运行同一个更新操作 仍生成相同的结果)。比如 Spark Streaming 的 saveAs…File 操作会在一个文件写完时自动 将其原子化地移动到最终位置上以此确保每个输出文件只存在一份。
10性能考量
最常见的问题是Spark Streaming可以使用的最小批次间隔是多少。总的来说500毫秒已经被证实为对许多应用而言是比较好的最小批次大小。寻找最小批次大小的最佳实践是从一个比较大的批次大小(10 秒左右)开始不断使用更小的批次大小。如果 Streaming 用 户界面中显示的处理时间保持不变你就可以进一步减小批次大小。如果处理时间开始增 加你可能已经达到了应用的极限。 相似地对于窗口操作计算结果的间隔(也就是滑动步长)对于性能也有巨大的影响。 当计算代价巨大并成为系统瓶颈时就应该考虑提高滑动步长了。 减少批处理所消耗时间的常见方式还有提高并行度。有以下三种方式可以提高并行度 • 增加接收器数目 有时如果记录太多导致单台机器来不及读入并分发的话接收器会成为系统瓶颈。这时 你就需要通过创建多个输入 DStream(这样会创建多个接收器)来增加接收器数目然 后使用 union 来把数据合并为一个数据源。 • 将收到的数据显式地重新分区如果接收器数目无法再增加你可以通过使用 DStream.repartition 来显式重新分区输 入流(或者合并多个流得到的数据流)来重新分配收到的数据。 • 提高聚合计算的并行度 对于像 reduceByKey() 这样的操作你可以在第二个参数中指定并行度我们在介绍 RDD 时提到过类似的手段。
11、高级解析
11.1DStreamGraph对象解析
在 Spark Streaming 中DStreamGraph 是一个非常重要的组件主要用来 1.通过成员 inputStreams 持有 Spark Streaming 输入源及接收数据的方式 2.通过成员 outputStreams 持有 Streaming app 的 output 操作并记录 DStream 依赖关系 3.生成每个 batch 对应的 jobs 下面通过分析一个简单的例子结合源码分析来说明 DStreamGraph 是如何发挥作用的。例子如下
val sparkConf new SparkConf().setAppName(HdfsWordCount)
val ssc new StreamingContext(sparkConf, Seconds(2))val lines ssc.textFileStream(args(0))
val words lines.flatMap(_.split( ))
val wordCounts words.map(x (x, 1)).reduceByKey(_ _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()创建 DStreamGraph 实例 代码val ssc new StreamingContext(sparkConf, Seconds(2))创建了 StreamingContext 实例StreamingContext 包含了 DStreamGraph 类型的成员graphgraph 在 StreamingContext主构造函数中被创建如下 private[streaming] val graph: DStreamGraph {if (isCheckpointPresent) {cp_.graph.setContext(this)cp_.graph.restoreCheckpointData()cp_.graph} else {require(batchDur_ ! null, Batch duration for StreamingContext cannot be null)val newGraph new DStreamGraph()newGraph.setBatchDuration(batchDur_)newGraph}}可以看到若当前 checkpoint 可用会优先从 checkpoint 恢复 graph否则新建一个。还可以从这里知道的一点是graph 是运行在 driver 上的
DStreamGraph记录输入源及如何接收数据 DStreamGraph有和application 输入数据相关的成员和方法如下 private val inputStreams new ArrayBuffer[InputDStream[_]]()def addInputStream(inputStream: InputDStream[_]) {this.synchronized {inputStream.setGraph(this)inputStreams inputStream}}成员inputStreams为 InputDStream 类型的数组InputDStream是所有 input streams(数据输入流) 的虚基类。该类提供了 start() 和 stop()方法供 streaming 系统来开始和停止接收数据。那些只需要在 driver 端接收数据并转成 RDD 的 input streams 可以直接继承 InputDStream例如 FileInputDStream是 InputDStream 的子类它监控一个 HDFS 目录并将新文件转成RDDs。而那些需要在 workers 上运行receiver 来接收数据的 Input DStream需要继承 ReceiverInputDStream比如 KafkaReceiver。 我们来看看val lines ssc.textFileStream(args(0))调用。 为了更容易理解画出了val lines ssc.textFileStream(args(0))的调用流程 从上面的调用流程图我们可以知道 1.ssc.textFileStream会触发新建一个FileInputDStream。FileInputDStream继承于InputDStream其start()方法定义了数据源及如何接收数据 2.在FileInputDStream构造函数中会调用ssc.graph.addInputStream(this)将自身添加到 DStreamGraph 的 inputStreams: ArrayBuffer[InputDStream[_]] 中这样 DStreamGraph 就知道了这个 Streaming App 的输入源及如何接收数据。可能你会奇怪为什么inputStreams 是数组类型举个例子这里再来一个 val lines1 ssc.textFileStream(args(0))那么又将生成一个 FileInputStream 实例添加到inputStreams所以这里需要集合类型 3.生成FileInputDStream调用其 map 方法将以 FileInputDStream 本身作为 partent 来构造新的 MappedDStream。对于 DStream 的 transform 操作都将生成一个新的 DStream和 RDD transform 生成新的 RDD 类似 与MappedDStream 不同所有继承了 InputDStream 的定义了输入源及接收数据方式的 sreams 都没有 parent因为它们就是最初的 streams。
DStream 的依赖链 每个 DStream 的子类都会继承 def dependencies: List[DStream[_]] List()方法该方法用来返回自己的依赖的父 DStream 列表。比如没有父DStream 的 InputDStream 的 dependencies方法返回List()。 MappedDStream 的实现如下
class MappedDStream[T: ClassTag, U: ClassTag] (parent: DStream[T],mapFunc: T U) extends DStream[U](parent.ssc) {override def dependencies: List[DStream[_]] List(parent)...
}在上例中构造函数参数列表中的 parent 即在 ssc.textFileStream 中new 的定义了输入源及数据接收方式的最初的 FileInputDStream实例这里的 dependencies方法将返回该FileInputDStream实例这就构成了第一条依赖。可用如下图表示这里特地将 input streams 用蓝色表示以强调其与普通由 transform 产生的 DStream 的不同 继续来看val words lines.flatMap(_.split( ))flatMap如下 def flatMap[U: ClassTag](flatMapFunc: T Traversable[U]): DStream[U] ssc.withScope {new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))}每一个 transform 操作都将创建一个新的 DStreamflatMap 操作也不例外它会创建一个FlatMappedDStreamFlatMappedDStream的实现如下
class FlatMappedDStream[T: ClassTag, U: ClassTag](parent: DStream[T],flatMapFunc: T Traversable[U]) extends DStream[U](parent.ssc) {override def dependencies: List[DStream[_]] List(parent)...
}与 MappedDStream 相同FlatMappedDStream#dependencies也返回其依赖的父 DStream及 lines到这里依赖链就变成了下图 之后的几步操作不再这样具体分析到生成wordCounts时依赖图将变成下面这样 在 DStream 中与 transofrm 相对应的是 output 操作包括 print, saveAsTextFiles, saveAsObjectFiles, saveAsHadoopFiles, foreachRDD。output 操作中会创建ForEachDStream实例并调用register方法将自身添加到DStreamGraph.outputStreams成员中该ForEachDStream实例也会持有是调用的哪个 output 操作。本例的代码调用如下只需看箭头所指几行代码 与 DStream transform 操作返回一个新的 DStream 不同output 操作不会返回任何东西只会创建一个ForEachDStream作为依赖链的终结。 至此生成了完成的依赖链也就是 DAG如下图这里将 ForEachDStream 标为黄色以显示其与众不同
11.2ReceiverTracker 与数据导入
Spark Streaming 在数据接收与导入方面需要满足有以下三个特点 1.兼容众多输入源包括HDFS, Flume, Kafka, Twitter and ZeroMQ。还可以自定义数据源 2.要能为每个 batch 的 RDD 提供相应的输入数据 3.为适应 7*24h 不间断运行要有接收数据挂掉的容错机制 有容乃大兼容众多数据源
InputDStream是所有 input streams(数据输入流) 的虚基类。该类提供了 start() 和 stop()方法供 streaming 系统来开始和停止接收数据。那些只需要在 driver 端接收数据并转成 RDD 的 input streams 可以直接继承 InputDStream例如 FileInputDStream是 InputDStream 的子类它监控一个 HDFS 目录并将新文件转成RDDs。而那些需要在 workers 上运行receiver 来接收数据的 Input DStream需要继承 ReceiverInputDStream比如 KafkaReceiver 只需在 driver 端接收数据的 input stream 一般比较简单且在生产环境中使用的比较少本文不作分析只分析继承了 ReceiverInputDStream 的 input stream 是如何导入数据的。 ReceiverInputDStream有一个def getReceiver(): Receiver[T]方法每个继承了ReceiverInputDStream的 input stream 都必须实现这个方法。该方法用来获取将要分发到各个 worker 节点上用来接收数据的 receiver接收器。不同的 ReceiverInputDStream 子类都有它们对应的不同的 receiver如KafkaInputDStream对应KafkaReceiverFlumeInputDStream对应FlumeReceiverTwitterInputDStream对应TwitterReceiver如果你要实现自己的数据源也需要定义相应的 receiver。 继承 ReceiverInputDStream 并定义相应的 receiver就是 Spark Streaming 能兼容众多数据源的原因。
为每个 batch 的 RDD 提供输入数据 在 StreamingContext 中有一个重要的组件叫做 ReceiverTracker它是 Spark Streaming 作业调度器 JobScheduler 的成员负责启动、管理各个 receiver 及管理各个 receiver 接收到的数据。
确定 receiver 要分发到哪些 executors 上执行 创建 ReceiverTracker 实例 我们来看 StreamingContext#start() 方法部分调用实现如下 可以看到StreamingContext#start() 会调用 JobScheduler#start() 方法在 JobScheduler#start() 中会创建一个新的 ReceiverTracker 实例 receiverTracker并调用其 start() 方法。
ReceiverTracker#start() 继续跟进 ReceiverTracker#start()如下图它主要做了两件事 1.初始化一个 endpoint: ReceiverTrackerEndpoint用来接收和处理来自 ReceiverTracker 和 receivers 发送的消息 2.调用 launchReceivers 来自将各个 receivers 分发到 executors 上 ReceiverTracker#launchReceivers() 继续跟进 launchReceivers它也主要干了两件事 1.获取 DStreamGraph.inputStreams 中继承了 ReceiverInputDStream 的 input streams 的 receivers。也就是数据接收器 2.给消息接收处理器 endpoint 发送 StartAllReceivers(receivers)消息。直接返回不等待消息被处理 处理StartAllReceivers消息 endpoint 在接收到消息后会先判断消息类型对不同的消息做不同处理。对于StartAllReceivers消息处理流程如下 计算每个 receiver 要分发的目的 executors。遵循两条原则 o将 receiver 分布的尽量均匀 o如果 receiver 的preferredLocation本身不均匀以preferredLocation为准 遍历每个 receiver根据第1步中得到的目的 executors 调用 startReceiver 方法 到这里已经确定了每个 receiver 要分发到哪些 executors 上 启动 receivers 接上通过 ReceiverTracker#startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]) 来启动 receivers我们来看具体流程
如上流程图所述分发和启动 receiver 的方式不可谓不精彩。其中startReceiverFunc 函数主要实现如下
val supervisor new ReceiverSupervisorImpl( receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption) supervisor.start() supervisor.awaitTermination() supervisor.start() 中会调用 receiver#onStart 后立即返回。receiver#onStart 一般自行新建线程或线程池来接收数据比如在 KafkaReceiver 中就新建了线程池在线程池中接收 topics 的数据。 supervisor.start() 返回后由 supervisor.awaitTermination() 阻塞住线程以让这个 task 一直不退出从而可以源源不断接收数据。
数据流转 上图为 receiver 接收到的数据的流转过程让我们来逐一分析 Step1: Receiver - ReceiverSupervisor
这一步中Receiver 将接收到的数据源源不断地传给 ReceiverSupervisor。Receiver 调用其 store(…) 方法store 方法中继续调用 supervisor.pushSingle 或 supervisor.pushArrayBuffer 等方法来传递数据。Receiver#store 有多重形式 ReceiverSupervisor 也有 pushSingle、pushArrayBuffer、pushIterator、pushBytes 方法与不同的 store 对应。 pushSingle: 对应单条小数据 pushArrayBuffer: 对应数组形式的数据 pushIterator: 对应 iterator 形式数据 pushBytes: 对应 ByteBuffer 形式的块数据 对于细小的数据存储时需要 BlockGenerator 聚集多条数据成一块然后再成块存储反之就不用聚集直接成块存储。当然存储操作并不在 Step1 中执行只为说明之后不同的操作逻辑。
Step2.1: ReceiverSupervisor - BlockManager - disk/memory
在这一步中主要将从 receiver 收到的数据以 block数据块的形式存储 存储 block 的是receivedBlockHandler: ReceivedBlockHandler根据参数spark.streaming.receiver.writeAheadLog.enable配置的不同默认为 falsereceivedBlockHandler对象对应的类也不同如下
private val receivedBlockHandler: ReceivedBlockHandler {if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {// 先写 WAL再存储到 executor 的内存或硬盘new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)} else {// 直接存到 executor 的内存或硬盘new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)}
}启动 WAL 的好处就是在application 挂掉之后可以恢复数据。 // 调用 receivedBlockHandler.storeBlock 方法存储 block并得到一个 blockStoreResult val blockStoreResult receivedBlockHandler.storeBlock(blockId, receivedBlock) // 使用blockStoreResult初始化一个ReceivedBlockInfo实例 val blockInfo ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult) // 发送消息通知 ReceiverTracker 新增并存储了 block trackerEndpoint.askWithRetryBoolean 不管是 WriteAheadLogBasedBlockHandler 还是 BlockManagerBasedBlockHandler 最终都是通过 BlockManager 将 block 数据存储 execuor 内存或磁盘或还有 WAL 方式存入。 这里需要说明的是 streamId每个 InputDStream 都有它自己唯一的 id即 streamIdblockInfo包含 streamId 是为了区分block 是哪个 InputDStream 的数据。之后为 batch 分配 blocks 时需要知道每个 InputDStream 都有哪些未分配的 blocks。
Step2.2: ReceiverSupervisor - ReceiverTracker 将 block 存储之后获得 block 描述信息 blockInfo: ReceivedBlockInfo这里面包含streamId、数据位置、数据条数、数据 size 等信息。 之后封装以 block 作为参数的 AddBlock(blockInfo) 消息并发送给 ReceiverTracker 以通知其有新增 block 数据块。
Step3: ReceiverTracker - ReceivedBlockTracker
ReceiverTracker 收到 ReceiverSupervisor 发来的 AddBlock(blockInfo) 消息后直接调用以下代码将 block 信息传给 ReceivedBlockTracker
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean { receivedBlockTracker.addBlock(receivedBlockInfo) } receivedBlockTracker.addBlock中如果启用了 WAL会将新增的 block 信息以 WAL 方式保存。
无论 WAL 是否启用都会将新增的 block 信息保存到 streamIdToUnallocatedBlockQueues: mutable.HashMap[Int, ReceivedBlockQueue]中该变量 key 为 InputDStream 的唯一 idvalue 为已存储未分配的 block 信息。之后为 batch 分配blocks会访问该结构来获取每个 InputDStream 对应的未消费的 blocks。
11.3动态生成JOB
JobScheduler有两个重要成员一是ReceiverTracker负责分发 receivers 及源源不断地接收数据二是JobGenerator负责定时的生成 jobs 并 checkpoint。
定时逻辑 在 JobScheduler 的主构造函数中会创建 JobGenerator 对象。在 JobGenerator 的主构造函数中会创建一个定时器
private val timer new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime eventLoop.post(GenerateJobs(new Time(longTime))), “JobGenerator”)
该定时器每隔 ssc.graph.batchDuration.milliseconds 会执行一次 eventLoop.post(GenerateJobs(new Time(longTime))) 向 eventLoop 发送 GenerateJobs(new Time(longTime))消息eventLoop收到消息后会进行这个 batch 对应的 jobs 的生成及提交执行eventLoop 是一个消息接收处理器。 需要注意的是timer 在创建之后并不会马上启动将在 StreamingContext#start() 启动 Streaming Application 时间接调用到 timer.start(restartTime.milliseconds)才启动。 为 batch 生成 jobs eventLoop 在接收到 GenerateJobs(new Time(longTime))消息后的主要处理流程有以上图中三步 1.将已接收到的 blocks 分配给 batch 2.生成该 batch 对应的 jobs 3.将 jobs 封装成 JobSet 并提交执行 接下来我们就将逐一展开这三步进行分析
将已接受到的 blocks 分配给 batch 上图是根据源码画出的为 batch 分配 blocks 的流程图这里对 『获得 batchTime 各个 InputDStream 未分配的 blocks』作进一步说明
我们知道了各个 ReceiverInputDStream 对应的 receivers 接收并保存的 blocks 信息会保存在 ReceivedBlockTracker#streamIdToUnallocatedBlockQueues该成员 key 为 streamIdvalue 为该 streamId 对应的 InputDStream 已接收保存但尚未分配的 blocks 信息。 所以获取某 InputDStream 未分配的 blocks 只要以该 InputDStream 的 streamId 来从 streamIdToUnallocatedBlockQueues 来 get 就好。获取之后会清楚该 streamId 对应的value以保证 block 不会被重复分配。 在实际调用中为 batchTime 分配 blocks 时会从streamIdToUnallocatedBlockQueues取出未分配的 blocks 塞进 timeToAllocatedBlocks: mutable.HashMap[Time, AllocatedBlocks] 中以在之后作为该 batchTime 对应的 RDD 的输入数据。 通过以上步骤就可以为 batch 的所有 InputDStream 分配 blocks。也就是为 batch 分配了 blocks。
生成该 batch 对应的 jobs 为指定 batchTime 生成 jobs 的逻辑如上图所示。你可能会疑惑为什么 DStreamGraph#generateJobs(time: Time)为什么返回 Seq[Job]而不是单个 job。这是因为在一个 batch 内可能会有多个 OutputStream 执行了多次 output 操作每次 output 操作都将产生一个 Job最终就会产生多个 Jobs。 我们结合上图对执行流程进一步分析。 在DStreamGraph#generateJobs(time: Time)中对于DStreamGraph成员ArrayBuffer[DStream[_]]的每一项调用DStream#generateJob(time: Time)来生成这个 outputStream 在该 batchTime 的 job。该生成过程主要有三步
Step1: 获取该 outputStream 在该 batchTime 对应的 RDD
每个 DStream 实例都有一个 generatedRDDs: HashMap[Time, RDD[T]] 成员用来保存该 DStream 在每个 batchTime 生成的 RDD当 DStream#getOrCompute(time: Time)调用时 首先会查看generatedRDDs中是否已经有该 time 对应的 RDD若有则直接返回 若无则调用compute(validTime: Time)来生成 RDD这一步根据每个 InputDStream继承 compute 的实现不同而不同。例如对于 FileInputDStream其 compute 实现逻辑如下 1.先通过一个 findNewFiles() 方法找到多个新 file 2.对每个新 file都将其作为参数调用 sc.newAPIHadoopFile(file)生成一个 RDD 实例 3.将 2 中的多个新 file 对应的多个 RDD 实例进行 union返回一个 union 后的 UnionRDD Step2: 根据 Step1中得到的 RDD 生成最终 job 要执行的函数 jobFunc jobFunc定义如下 val jobFunc () { val emptyFunc { (iterator: Iterator[T]) {} } context.sparkContext.runJob(rdd, emptyFunc) } 可以看到每个 outputStream 的 output 操作生成的 Job 其实与 RDD action 一样最终调用 SparkContext#runJob 来提交 RDD DAG 定义的任务
Step3: 根据 Step2中得到的 jobFunc 生成最终要执行的 Job 并返回 Step2中得到了定义 Job 要干嘛的函数-jobFunc这里便以 jobFunc及 batchTime 生成 Job 实例 Some(new Job(time, jobFunc)) 该Job实例将最终封装在 JobHandler 中被执行 至此我们搞明白了 JobScheduler 是如何通过一步步调用来动态生成每个 batchTime 的 jobs。下文我们将分析这些动态生成的 jobs 如何被分发及如何执行。
11.4job 的提交与执行
我们分析了 JobScheduler 是如何动态为每个 batch生成 jobs那么生成的 jobs 是如何被提交的。 在 JobScheduler 生成某个 batch 对应的 Seq[Job] 之后会将 batch 及 Seq[Job] 封装成一个 JobSet 对象JobSet 持有某个 batch 内所有的 jobs并记录各个 job 的运行状态。 之后调用JobScheduler#submitJobSet(jobSet: JobSet)来提交 jobs在该函数中除了一些状态更新主要任务就是执行 jobSet.jobs.foreach(job jobExecutor.execute(new JobHandler(job))) 即对于 jobSet 中的每一个 job执行jobExecutor.execute(new JobHandler(job))要搞懂这行代码干了什么就必须了解 JobHandler 及 jobExecutor。 JobHandler JobHandler 继承了 Runnable为了说明与 job 的关系其精简后的实现如下
private class JobHandler(job: Job) extends Runnable with Logging { import JobScheduler._
def run() { _eventLoop.post(JobStarted(job)) PairRDDFunctions.disableOutputSpecValidation.withValue(true) { job.run() } _eventLoop eventLoop if (_eventLoop ! null) { _eventLoop.post(JobCompleted(job)) } }
} JobHandler#run 方法主要执行了 job.run()该方法最终将调用到 『生成该 batch 对应的 jobs的Step2 定义的 jobFunc』jonFunc 将提交对应 RDD DAG 定义的 job。 JobExecutor 知道了 JobHandler 是用来执行 job 的那么 JobHandler 将在哪里执行 job 呢答案是 jobExecutorjobExecutor为 JobScheduler 成员是一个线程池在JobScheduler 主构造函数中创建如下 private val numConcurrentJobs ssc.conf.getInt(“spark.streaming.concurrentJobs”, 1) private val jobExecutor ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, “streaming-job-executor”) JobHandler 将最终在 线程池jobExecutor 的线程中被调用jobExecutor的线程数可通过spark.streaming.concurrentJobs配置默认为1。若配置多个线程就能让多个 job 同时运行若只有一个线程那么同一时刻只能有一个 job 运行。 以上即 jobs 被执行的逻辑
11.5Block 的生成与存储
ReceiverSupervisorImpl共提供了4个将从 receiver 传递过来的数据转换成 block 并存储的方法分别是 pushSingle: 处理单条数据 pushArrayBuffer: 处理数组形式数据 pushIterator: 处理 iterator 形式处理 pushBytes: 处理 ByteBuffer 形式数据 其中pushArrayBuffer、pushIterator、pushBytes最终调用pushAndReportBlock而pushSingle将调用defaultBlockGenerator.addData(data)我们分别就这两种形式做说明
pushAndReportBlock 我们针对存储 block 简化 pushAndReportBlock 后的代码如下 def pushAndReportBlock( receivedBlock: ReceivedBlock, metadataOption: Option[Any], blockIdOption: Option[StreamBlockId] ) { … val blockId blockIdOption.getOrElse(nextBlockId) receivedBlockHandler.storeBlock(blockId, receivedBlock) … } 首先获取一个新的 blockId之后调用 receivedBlockHandler.storeBlock, receivedBlockHandler 在 ReceiverSupervisorImpl 构造函数中初始化。当启用了 checkpoint 且 spark.streaming.receiver.writeAheadLog.enable 为 true 时receivedBlockHandler 被初始化为 WriteAheadLogBasedBlockHandler 类型否则将初始化为 BlockManagerBasedBlockHandler类型。 WriteAheadLogBasedBlockHandler#storeBlock 将 ArrayBuffer, iterator, bytes 类型的数据序列化后得到的 serializedBlock 1.交由 BlockManager 根据设置的 StorageLevel 存入 executor 的内存或磁盘中 2.通过 WAL 再存储一份 而BlockManagerBasedBlockHandler#storeBlock将 ArrayBuffer, iterator, bytes 类型的数据交由 BlockManager 根据设置的 StorageLevel 存入 executor 的内存或磁盘中并不再通过 WAL 存储一份 pushSingle pushSingle将调用 BlockGenerator#addData(data: Any) 通过积攒的方式来存储数据。接下来对 BlockGenerator 是如何积攒一条一条数据最后写入 block 的逻辑 上图为 BlockGenerator 的各个成员首选对各个成员做介绍 currentBuffer 变长数组当 receiver 接收的一条一条的数据将会添加到该变长数组的尾部 可能会有一个 receiver 的多个线程同时进行添加数据这里是同步操作 添加前会由 rateLimiter 检查一下速率是否加入的速度过快。如果过快的话就需要 block 住等到下一秒再开始添加。最高频率由 spark.streaming.receiver.maxRate 控制默认值为 Long.MaxValue具体含义是单个 Receiver 每秒钟允许添加的条数。 blockIntervalTimer blockIntervalMs
分别是定时器和时间间隔。blockIntervalTimer中有一个线程每隔blockIntervalMs会执行以下操作 1.将 currentBuffer 赋值给 newBlockBuffer 2.将 currentBuffer 指向新的空的 ArrayBuffer 对象 3.将 newBlockBuffer 封装成 newBlock 4.将 newBlock 添加到 blocksForPushing 队列中blockIntervalMs 由 spark.streaming.blockInterval 控制默认是 200ms。 blockPushingThread blocksForPushing blockQueueSize blocksForPushing 是一个定长数组长度由 blockQueueSize 决定默认为10可通过 spark.streaming.blockQueueSize 改变。上面分析到blockIntervalTimer中的线程会定时将 block 塞入该队列。 还有另一条线程不断送该队列中取出 block然后调用 ReceiverSupervisorImpl.pushArrayBuffer(…) 来将 block 存储这条线程就是blockPushingThread。 PS: blocksForPushing为ArrayBlockingQueue类型。ArrayBlockingQueue是一个阻塞队列能够自定义队列大小当插入时如果队列已经没有空闲位置那么新的插入线程将阻塞到该队列一旦该队列有空闲位置那么阻塞的线程将执行插入 以上通过分析各个成员也说明了 BlockGenerator 是如何存储单条数据的。