2. 数据默认就被分布到了多个Executor上。Receiver-based Approach 你需要做特定的
处理,才能让 Receiver分不到多个Executor上。 3. Receiver-based Approach 的方式,一旦你的Batch Processing 被delay了,或者被delay
了很多个batch,那估计你的Spark Streaming程序离奔溃也就不远了。 Direct Approach (No Receivers) 则完全不会存在类似问题。就算你delay了很多个batch time,你内存中的数据只有这次处理的。 4. Direct Approach (No Receivers) 直接维护了 Kafka offset,可以保证数据只有被执行
成功了,才会被记录下来,透过 checkpoint机制。如果采用Receiver-based
Approach,消费Kafka和数据处理是被分开的,这样就很不好做容错机制,比如系统当掉了。所以你需要开启WAL,但是开启WAL带来一个问题是,数据量很大,对HDFS是个很大的负担,而且也会对实时程序带来比较大延迟。 我原先以为Direct Approach 因为只有在计算的时候才拉取数据,可能会比Receiver-based Approach 的方式慢,但是经过我自己的实际测试,总体性能 Direct Approach会更快些,因为Receiver-based Approach可能会有较大的内存隐患,GC也会影响整体处理速度。
如何保证数据接受的可靠性
SS 自身可以做到 at least once 语义,具体方式是通过CheckPoint机制。 * CheckPoint 机制 *
CheckPoint 会涉及到一些类,以及他们之间的关系:
DStreamGraph类负责生成任务执行图,而JobGenerator则是任务真实的提交者。任务的
数据源则来源于DirectKafkaInputDStream,checkPoint 一些相关信息则是由类DirectKafkaInputDStreamCheckpointData 负责。
好像涉及的类有点多,其实没关系,我们完全可以不用关心他们。先看看checkpoint都干了些啥,checkpoint 其实就序列化了一个类而已:
org.apache.spark.streaming.Checkpoint
看看类成员都有哪些:
val master = ssc.sc.master val framework = ssc.sc.appName val jars = ssc.sc.jars val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDurationval pendingTimes = ssc.scheduler.getPendingTimes().toArray
val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
val sparkConfPairs = ssc.conf.getAll
? ? ? ? ? ? ? ? 1 2 3 4 5 6 7 8 其他的都比较容易理解,最重要的是 graph,该类全路径名是:
org.apache.spark.streaming.DStreamGraph
里面有两个核心的数据结构是:
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()
? ? 1 2 inputStr