Spark Streaming 原理
Spark Streaming工作原理
每個Spark Streaming程序提交後,會在Driver把程序計算邏輯描述為一個 RDD DAG 的“模板”,在後面 Job 動態生成的時候,針對每個 batch,Spark Streaming 都將根據這個“模板”生成一個 RDD DAG 的實例。
把生成的DAG實例分配給任務調度器,生成相對應的Job sets。Job sets裡包含了部分的計算邏輯和數據的元信息,分發到相應的Executor。
真正數據的獲取的填充是發生在Executor中的,而Executor存儲著處理過或者未處理的數據。
Executor資源控制
通過設定每個Executor相同的資源(CPU,內存,磁盤,網絡帶寬)作為一個單位。Streaming程序通過添加或減少Executor來提高或者降低Streaming程序的性能。這樣一個Spark Streaming程序就很容易的通過標準Executor來量化性能。
Streaming程序在提交的時候可以通過如下命令定義Executor:
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \ # can be client for client mode
--executor-memory 20G \
--num-executors 50 \
/path/to/examples.jar \
1000
Executor數據獲取控制
Direct Approach (NoReceivers) 的接收方式也是可以限制接受數據的量的。你可以通過設置 "spark.streaming.kafka.maxRatePerPartition" 來完成對應的配置。需要注意的是,這裡是對每個Partition進行限速。所以你需要事先知道Kafka有多少個分區,才好評估系統的實際吞吐量,從而設置該值。
相應的,"spark.streaming.backpressure.enabled" 參數在Direct Approach 中也是繼續有效的。 根據JobScheduler反饋作業的執行信息來動態調整Receiver數據接收率。通過屬性“spark.streaming.backpressure.enabled”來控制是否啟用backpressure機制,默認值false,即不啟用。
Executor優雅退出
因此實現的Streaming程序的彈性擴展需要保證Executor上元數據完全處理完畢,處理過的數據的kafka偏移能夠全部提交成功,保證程序重啟時不會重複消費消息。也就是說Streaming程序需要優雅退出。
所以,我們應該使用一種避免數據丟失的方式,官方建議調用 StreamingContext#stop(stopSparkContext: Boolean, stopGracefully: Boolean)
,將 stopGracefully 設置為 true,這樣可以保證在 driver 結束前處理完所有已經接受的數據。
一個 streaming application 往往是長時間運行的,所以存在兩個問題:
應該在什麼時候去調用
StreamingContext#stop
當 streaming application 已經在運行了該怎麼去調用
StreamingContext#stop
ssc.start()
var break = true;
while (break){
if (System.currentTimeMillis() > 1487571750000L){
ssc.stop()
break = false;
}
}
Spark Streaming資源動態分配
Spark 屬於粗粒度資源分配,也就是在默認情況下是先分配好資源然後再進行計算,粗粒度有個好處,因為資源是提前給你分配好,當有計算任務的時候直接使用就可以了。
粗粒度不好的方面就是從Spark Streaming角度講有高峰值、低峰值,在高與低峰值時候需要的資源是不一樣的,如果資源分配按照高峰值考慮的話,在低峰值就是對資源的浪費。
動態資源分配源碼 :
在SparkConf中進行Set其配置:
以定時器的頻率來不斷的掃描Executor,正在運行的Scheduler是要運行在不同的Executor中,需要動態的增加Executor或者減少Executor ,例如判斷一個60秒為時間間隔的Executor一個任務都沒有運行,就會把Executor刪除掉。怎麼會減少Executor,是因為當前應用程序中運行的Executor在Driver中會有數據結構對其保持引用,每次任務調度的時候都會循環遍歷Executor的列表,然後查詢列表的可用資源,根據這個類中的時鐘會不斷循環查看是否滿足添加或者刪除Executor的條件,如果滿足添加或者刪除的條件就觸發Executor進行添加與刪除。
從Spark Streaming的角度考慮,Spark Streaming要處理的動態資源調整就是Executor的資源動態調整,其最大的挑戰是什麼?
Spark Streaming是按照BachDuration的方式運行的,可能這個BachDuration需要很多資源,下一個又不用那麼多資源,當前BachDuration的資源還沒有等調整完成其運行已經過期了。
if (numExecutor != 0 && streamingDynamicAllocationEnabled) {
throw new IllegalArgumentException(
"Dynamic Allocation for streaming cannot be enabled while spark.executor.instances is set.")
}
if (Utils.isDynamicAllocationEnabled(conf) && streamingDynamicAllocationEnabled) {
throw new IllegalArgumentException(
"""
|Dynamic Allocation cannot be enabled for both streaming and core at the same time.
|Please disable core Dynamic Allocation by setting spark.dynamicAllocation.enabled to
|false to use Dynamic Allocation in streaming.
""".stripMargin)
}
要注意當確定了Streaming程序的Executor數量和Spark Core也使用了動態分配的時候是無法使用動態分配的