Spark Streaming 原理

Spark 百味書 2017-05-07

Spark Streaming工作原理

每個Spark Streaming程序提交後,會在Driver把程序計算邏輯描述為一個 RDD DAG 的“模板”,在後面 Job 動態生成的時候,針對每個 batch,Spark Streaming 都將根據這個“模板”生成一個 RDD DAG 的實例。

把生成的DAG實例分配給任務調度器,生成相對應的Job sets。Job sets裡包含了部分的計算邏輯和數據的元信息,分發到相應的Executor。

真正數據的獲取的填充是發生在Executor中的,而Executor存儲著處理過或者未處理的數據。

Executor資源控制

通過設定每個Executor相同的資源(CPU,內存,磁盤,網絡帶寬)作為一個單位。Streaming程序通過添加或減少Executor來提高或者降低Streaming程序的性能。這樣一個Spark Streaming程序就很容易的通過標準Executor來量化性能。

Streaming程序在提交的時候可以通過如下命令定義Executor:

./bin/spark-submit \

--class org.apache.spark.examples.SparkPi \

--master yarn \

--deploy-mode cluster \ # can be client for client mode

--executor-memory 20G \

--num-executors 50 \

/path/to/examples.jar \

1000

Executor數據獲取控制

Direct Approach (NoReceivers) 的接收方式也是可以限制接受數據的量的。你可以通過設置 "spark.streaming.kafka.maxRatePerPartition" 來完成對應的配置。需要注意的是,這裡是對每個Partition進行限速。所以你需要事先知道Kafka有多少個分區,才好評估系統的實際吞吐量,從而設置該值。

相應的,"spark.streaming.backpressure.enabled" 參數在Direct Approach 中也是繼續有效的。 根據JobScheduler反饋作業的執行信息來動態調整Receiver數據接收率。通過屬性“spark.streaming.backpressure.enabled”來控制是否啟用backpressure機制,默認值false,即不啟用。

Executor優雅退出

因此實現的Streaming程序的彈性擴展需要保證Executor上元數據完全處理完畢,處理過的數據的kafka偏移能夠全部提交成功,保證程序重啟時不會重複消費消息。也就是說Streaming程序需要優雅退出。

所以,我們應該使用一種避免數據丟失的方式,官方建議調用 StreamingContext#stop(stopSparkContext: Boolean, stopGracefully: Boolean),將 stopGracefully 設置為 true,這樣可以保證在 driver 結束前處理完所有已經接受的數據。

一個 streaming application 往往是長時間運行的,所以存在兩個問題:

  1. 應該在什麼時候去調用 StreamingContext#stop

  2. 當 streaming application 已經在運行了該怎麼去調用 StreamingContext#stop

ssc.start()

var break = true;

while (break){

if (System.currentTimeMillis() > 1487571750000L){

ssc.stop()

break = false;

}

}

Spark Streaming資源動態分配

Spark 屬於粗粒度資源分配,也就是在默認情況下是先分配好資源然後再進行計算,粗粒度有個好處,因為資源是提前給你分配好,當有計算任務的時候直接使用就可以了。

粗粒度不好的方面就是從Spark Streaming角度講有高峰值、低峰值,在高與低峰值時候需要的資源是不一樣的,如果資源分配按照高峰值考慮的話,在低峰值就是對資源的浪費。

動態資源分配源碼 :

Spark Streaming 原理

在SparkConf中進行Set其配置:

Spark Streaming 原理

Spark Streaming 原理

Spark Streaming 原理

以定時器的頻率來不斷的掃描Executor,正在運行的Scheduler是要運行在不同的Executor中,需要動態的增加Executor或者減少Executor ,例如判斷一個60秒為時間間隔的Executor一個任務都沒有運行,就會把Executor刪除掉。怎麼會減少Executor,是因為當前應用程序中運行的Executor在Driver中會有數據結構對其保持引用,每次任務調度的時候都會循環遍歷Executor的列表,然後查詢列表的可用資源,根據這個類中的時鐘會不斷循環查看是否滿足添加或者刪除Executor的條件,如果滿足添加或者刪除的條件就觸發Executor進行添加與刪除。

Spark Streaming 原理

從Spark Streaming的角度考慮,Spark Streaming要處理的動態資源調整就是Executor的資源動態調整,其最大的挑戰是什麼?

Spark Streaming是按照BachDuration的方式運行的,可能這個BachDuration需要很多資源,下一個又不用那麼多資源,當前BachDuration的資源還沒有等調整完成其運行已經過期了。

if (numExecutor != 0 && streamingDynamicAllocationEnabled) {

throw new IllegalArgumentException(

"Dynamic Allocation for streaming cannot be enabled while spark.executor.instances is set.")

}

if (Utils.isDynamicAllocationEnabled(conf) && streamingDynamicAllocationEnabled) {

throw new IllegalArgumentException(

"""

|Dynamic Allocation cannot be enabled for both streaming and core at the same time.

|Please disable core Dynamic Allocation by setting spark.dynamicAllocation.enabled to

|false to use Dynamic Allocation in streaming.

""".stripMargin)

}

要注意當確定了Streaming程序的Executor數量和Spark Core也使用了動態分配的時候是無法使用動態分配的

相關推薦

推薦中...