前言
Spark Streaming 诞生于2013年,成为Spark平台上流式处理的解决方案,同时也给大家提供除Storm 以外的另一个选择。这篇内容主要介绍Spark Streaming 数据接收流程模块中与Kafka集成相关的功能。
Spark Streaming 与 Kafka 集成接受数据的方式有两种:
1. Receiver-based Approach
2. Direct Approach (No Receivers)
我们会对这两种方案做详细的解析,同时对比两种方案优劣。选型后,我们针对Direct Approach (No Receivers)模式详细介绍其如何实现Exactly Once Semantics,也就是保证接收到的数据只被处理一次,不丢,不重。
Receiver-based Approach
要描述清楚 Receiver-based Approach ,我们需要了解其接收流程,分析其内存使用,以及相关参数配置对内存的影响。
* 数据接收流程 *
启动Spark Streaming(后续缩写为SS)后,SS 会选择一台Executor 启动ReceiverSupervisor,并且标记为Active状态。接着按如下步骤处理:
1. ReceiverSupervisor会启动对应的Receiver(这里是KafkaReceiver)
2. KafkaReceiver 会根据配置启动新的线程接受数据,在该线程中调
用 ReceiverSupervisor.store 方法填充数据,注意,这里是一条一条填充的。 3. ReceiverSupervisor 会调用 BlockGenerator.addData 进行数据填充。 到目前为止,整个过程不会有太多内存消耗,正常的一个线性调用。所有复杂的数据结构都隐含在 BlockGenerator中。
* BlockGenerator 存储结构 *
BlockGenerator 会复杂些,重要的数据存储结构有四个:
1. 维护了一个缓存 currentBuffer ,就是一个无限长度的ArrayBuffer。
currentBuffer 并不会被复用,而是每次都会新建,然后把老的对象直接封装成Block,BlockGenerator会负责保证currentBuffer 只有一个。currentBuffer 填充的速度是可以被限制的,以秒为单位,配置参数为
spark.streaming.receiver.maxRate。这个是Spark内存控制的第一道防线,填充currentBuffer 是阻塞的,消费Kafka的线程直接做填充。
2. 维护了一个 blocksForPushing 队列, size 默认为10个(1.5.1版本),可通
过 spark.streaming.blockQueueSize进行配置。该队列主要用来实现生产-消费模式。每个元素其实是一个currentBuffer形成的block。 3. blockIntervalTimer 是一个定时器。其实是一个生产者,负责将currentBuffer 的
数据放到blocksForPushing 中。通过参数 spark.streaming.blockInterval 设置,默认为200ms。放的方式很简单,直接把currentBuffer做为Block的数据源。这就是为什么currentBuffer不会被复用。 4. blockPushingThread 也是一个定时器,负责将Block从blocksForPushing取出
来,然后交给BlockManagerBasedBlockHandler.storeBlock 方法。10毫秒会取一次,不可配置。到这一步,才真的将数据放到了Spark的BlockManager中。 下面我们会详细分析每一个存储对象对内存的使用情况: * currentBuffer *
首先自然要说下currentBuffer,如果200ms期间你从Kafka接受的数据足够大,则足以把内存承包了。而且currentBuffer使用的并不是spark的storage内存,而是有限的用于运算存储的内存。 默认应该是 heap*0.4。除了把内存搞爆掉了,还有一个是GC。导致receiver所在的Executor 极容易挂掉,处理速度也巨慢。 如果你在SparkUI发现Receiver挂掉了,考虑有没有可能是这个问题。 * blocksForPushing *
blocksForPushing 这个是作为currentBuffer 和BlockManager之间的中转站。默认存储
的数据最大可以达到 10*currentBuffer 大小。一般不打可能,除非你
的 spark.streaming.blockInterval 设置的比10ms 还小,官方推荐最小也要设置成 50ms,你就不要搞对抗了。所以这块不用太担心。 * blockPushingThread *
blockPushingThread 负责从 blocksForPushing 获取数据,并且写入 BlockManager 。blockPushingThread只写他自己所在的Executor的 blockManager,也就是每个batch周期
的数据都会被 一个Executor给扛住了。 这是导致内存被撑爆的最大风险。 建议每个batch周期接受到的数据最好不要超过接受Executor的内存(Storage)的一半。否则在数据量很大的情况下,会导致Receiver所在的Executor直接挂掉。 对应的解决方案是使用多个Receiver来消费同一个topic,使用类似下面的代码
val kafkaDStreams = (1 to kafkaDStreamsNum).map { _ => KafkaUtils.createStream( ssc, zookeeper,
groupId,
Map(\ -> 1),
if (memoryOnly) StorageLevel.MEMORY_ONLY else StorageLevel.MEMORY_AND_DISK_SER_2)}
val unionDStream = ssc.union(kafkaDStreams) unionDStream
? ? ? ? ? ? ? ? 1 2 3 4 5 6 7 8 * 动态控制消费速率以及相关论文 * 前面我们提到,SS的消费速度可以设置上限,其实SS也可以根据之前的周期处理情况来自动调整下一个周期处理的数据量。你可以通过
将 spark.streaming.backpressure.enabled 设置为true 打开该功能。算法的论文可参考: Socc 2014: Adaptive Stream Processing using Dynamic Batch Sizing ,还是有用的,我现在也都开启着。
另外值得提及的是,Spark里除了这个 Dynamic,还有一个就是Dynamic Allocation,也就是Executor数量会根据资源使用情况,自动伸缩。我其实蛮喜欢Spark这个特色的。具体的可以查找下相关设计文档。
Direct Approach (No Receivers)
个人认为,DirectApproach 更符合Spark的思维。我们知道,RDD的概念是一个不变的,分区的数据集合。我们将kafka数据源包裹成了一个KafkaRDD,RDD里的partition 对应的数据源为kafka的partition。唯一的区别是数据在Kafka里而不是事先被放到Spark内存里。其实包括FileInputStream里也是把每个文件映射成一个RDD,比较好奇,为什么一开始会有Receiver-based Approach,额外添加了Receiver这么一个概念。 * DirectKafkaInputDStream *
Spark Streaming通过Direct Approach接收数据的入口自然是
KafkaUtils.createDirectStream 了。在调用该方法时,会先创建
val kc = new KafkaCluster(kafkaParams)