中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

Spark Streaming 調(diào)優(yōu)實踐

2019-08-21    來源:raincent

容器云強勢上線!快速搭建集群,上萬Linux鏡像隨意使用

在使用 Spark 和 SparkStreaming 時,當我們將應(yīng)用部署在集群上時,可能會碰到運行慢、占用過多資源、不穩(wěn)定等問題,這時需要做一些優(yōu)化才能達到最好的性能。有時候一個簡單的優(yōu)化可以起到化腐朽為神奇的作用,使得程序能夠更加有效率,也更加節(jié)省資源。本文我們就來介紹一些能夠提高應(yīng)用性能的參數(shù)和配置。

另外需要指出的是,優(yōu)化本身是一個具體性很強的事情,不同的應(yīng)用及落地場景會有不同的優(yōu)化方式,并沒有一個統(tǒng)一的優(yōu)化標準。本文我們簡單聊聊一些在項目中踩過的“坑”,列舉以下常見的優(yōu)化方式。

數(shù)據(jù)序列化

在分布式應(yīng)用中,序列化(serialization)對性能的影響是顯著的。如果使用一種對象序列化慢、占用字節(jié)多的序列化格式,就會嚴重降低計算效率。通常在 Spark 中,主要有如下 3 個方面涉及序列化:

①在算子函數(shù)中使用到外部變量時,該變量會被序列化后進行網(wǎng)絡(luò)傳輸。

②將自定義的類型作為 RDD 的泛型類型時,所有自定義類型對象都會進行序列化。因此這種情況下,也要求自定義的類必須實現(xiàn) Serializable 接口。

③使用可序列化的持久化策略時(比如 MEMORY_ONLY_SER),Spark 會將 RDD 中的每個 partition 都序列化成一個大的字節(jié)數(shù)組。

而 Spark 綜合考量易用性和性能,提供了下面兩種序列化庫。

①Java 序列化:默認情況下,Spark 使用 Java 的對象輸出流框架(ObjectOutputStreamframework)來進行對象的序列化,并且可用在任意實現(xiàn) Java.io.Serializable 接口的自定義類上。我們可以通過擴展 Java.io.Externalizable 來更加精細地控制序列化行為。Java 序列化方式非常靈活,但是通常序列化速度非常慢而且對于很多類會產(chǎn)生非常巨大的序列化結(jié)果。

②Kryo 序列化:Spark 在 2.0.0 以上的版本可以使用 Kryo 庫來非?焖俚剡M行對象序列化,Kryo 要比 Java 序列化更快、更緊湊(10 倍),但是其不支持所有的 Serializable 類型,并且在使用自定義類之前必須先注冊。

我們可以在初始化 SparkConf 時,調(diào)用 conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") 來使用 Kryo。一旦進行了這個配置,Kryo 序列化不僅僅會用在 Shuffling 操作時 worker 節(jié)點間的數(shù)據(jù)傳遞,也會用在 RDDs 序列化到硬盤的過程。

Spark 官方解釋沒有將 Kryo 作為默認序列化方式的唯一原因是,Kryo 必須用戶自己注冊(注意如果我們不注冊自定義類,Kryo 也是可以正常運行的,但是它必須存儲每個對象的完整類名,這是非常浪費的),但是其推薦在網(wǎng)絡(luò)頻繁傳輸?shù)膽?yīng)用中使用 Kryo。

另外值得注意的是,在 Spark2.0.0 之后,Spark 已經(jīng)默認將 Kryo 序列化作為簡單類型(基本類型、基本類型的數(shù)組及 string 類型)RDD 進行 Shuffling 操作時傳輸數(shù)據(jù)的對象序列化方式。

Spark 已經(jīng)自動包含注冊了絕大部分 Scala 的核心類,如果需要向 Kryo 注冊自己的類別,可以使用 registerKryoClasses 方法。使用 Kryo 的代碼框架如下:

// Spark 配置項
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 配置序列化方式
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) // 注冊需要序列化的類
val sc = new SparkContext(conf)

如果我們的對象非常大,可能需要增加 Spark.kryoserializer.buffer 的配置。

同樣在 SparkStreaming 中,通過優(yōu)化序列化格式可以縮減數(shù)據(jù)序列化的開銷,而在 Streaming 中還會涉及以下兩類數(shù)據(jù)的序列化。

輸入數(shù)據(jù):SparkStreaming 中不同于 RDD 默認是以非序列化的形式存于內(nèi)存當中,Streaming 中由接收器(Receiver)接收而來的數(shù)據(jù),默認是以序列化重復(fù)形式(StorageLevel.MEMORY_AND_DISK_SER_2)存放于 Executor 的內(nèi)存當中。而采用這種方式的目的,一方面是由于將輸入數(shù)據(jù)序列化為字節(jié)流可以減少垃圾回收(GC)的開銷,另一方面對數(shù)據(jù)的重復(fù)可以對 Executor 節(jié)點的失敗有更好的容錯性。同時需要注意的是,輸入數(shù)據(jù)流一開始是保存在內(nèi)存當中,當內(nèi)存不足以存放流式計算依賴的輸入數(shù)據(jù)時,會自動存放于硬盤當中。而在 Streaming 中這部分序列化是一個很大的開銷,接收器必須先反序列化(deserialize)接收到的數(shù)據(jù),然后再序列化(serialize)為 Spark 本身的序列化格式。

由 Streaming 操作產(chǎn)生 RDD 的持久化:由流式計算產(chǎn)生的 RDDs 有可能持久化在內(nèi)存當中,例如由于基于窗口操作的數(shù)據(jù)會被反復(fù)使用,所以會持久化在內(nèi)存當中。值得注意的是,不同于 Spark 核心默認使用非序列化的持久化方式(StorageLevel.MEMORY_ONLY),流式計算為了減少垃圾回收(GC)的開銷,默認使用了序列化的持久化方式(StorageLevel.MEMORY_ONLY_SER)。

不管在 Spark 還是在 SparkStreaming 中,使用 Kryo 序列化方式,都可以減少 CPU 和內(nèi)存的開銷。而對于流式計算,如果數(shù)據(jù)量不是很大,并且不會造成過大的垃圾回收(GC)開銷,我們可以考慮利用非序列化對象進行持久化。

例如,我們使用很小的批處理時間間隔,并且沒有基于窗口的操作,可以通過顯示設(shè)置相應(yīng)的存儲級別來關(guān)閉持久化數(shù)據(jù)時的序列化,這樣可以減少序列化引起的 CPU 開銷,但是潛在的增加了 GC 的開銷。

廣播大變量

我們知道,不論 Spark 還是 SparkStreaming 的應(yīng)用,在集群節(jié)點間進行數(shù)據(jù)傳輸時,都會有序列化和反序列化的開銷,而如果我們的應(yīng)用有非常大的對象時,這部分開銷是巨大的。比如應(yīng)用中的任何子任務(wù)需要使用 Driver 節(jié)點的一個大型配置查詢表,這時就可以考慮將該表通過共享變量的方式,廣播到每一個子節(jié)點,從而大大減少在傳輸和序列化上的開銷。

另外,Spark 在 Master 節(jié)點會打印每個任務(wù)的序列化對象大小,我們可以通過觀察任務(wù)的大小,考慮是否需要廣播某些大變量。通常一個任務(wù)的大小超過 20KB,是值得去優(yōu)化的。

當我們將大型的配置查詢表廣播出去時,每個節(jié)點可以讀取配置項進行任務(wù)計算,那么假設(shè)配置發(fā)生了動態(tài)改變時,如何通知各個子節(jié)點配置表更改了呢?(尤其是對于流式計算的任務(wù),重啟服務(wù)代價還是蠻大的。)

我們知道廣播變量是只讀的,也就是說廣播出去的變量沒法再修改,那么應(yīng)該怎么解決這個問題呢?我們可以利用 Spark 中的 unpersist() 函數(shù),Spark 通常會按照 LRU(leastRecentlyUsed)即最近最久未使用原則對老數(shù)據(jù)進行刪除,我們并不需要操作具體的數(shù)據(jù),但如果是手動刪除,可以使用 unpersist() 函數(shù)。

所以這里更新廣播變量的方式是,利用 unpersist() 函數(shù)先將已經(jīng)發(fā)布的廣播變量刪除,然后修改數(shù)據(jù)后重新進行廣播,我們通過一個廣播包裝類來實現(xiàn)這個功能,代碼如下:

import java.io.{ ObjectInputStream, ObjectOutputStream }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import scala.reflect.ClassTag
// 通過包裝器在 DStream 的 foreachRDD 中更新廣播變量
// 避免產(chǎn)生序列化問題
case class BroadcastWrapper[T: ClassTag](
@transient private val ssc: StreamingContext,
@transient private val _v: T) {
@transient private var v = ssc.sparkContext.broadcast(_v)
def update(newValue: T, blocking: Boolean = false): Unit = {
// 刪除 RDD 是否需要鎖定
v.unpersist(blocking)
v = ssc.sparkContext.broadcast(newValue)
}
def value: T = v.value
private def writeObject(out: ObjectOutputStream): Unit = {
out.writeObject(v)
}
private def readObject(in: ObjectInputStream): Unit = {
v = in.readObject().asInstanceOf[Broadcast[T]]
}
}

利用 wrapper 更新廣播變量,可以動態(tài)地更新大型的配置項變量,而不用重新啟動計算服務(wù),大致的處理邏輯如下:

// 定義
val yourBroadcast = BroadcastWrapper yourType

yourStream.transform(rdd => {
// 定期更新廣播變量
if (System.currentTimeMillis - someTime > Conf.updateFreq) {
yourBroadcast.update(newValue, true)
}
// do something else
})

數(shù)據(jù)處理和接收時的并行度

作為分布式系統(tǒng),增加接收和處理數(shù)據(jù)的并行度是提高整個系統(tǒng)性能的關(guān)鍵,也能夠充分發(fā)揮集群機器資源。

關(guān)于 partition 和 parallelism。partition 指的就是數(shù)據(jù)分片的數(shù)量,每一次 Task 只能處理一個 partition 的數(shù)據(jù),這個值太小了會導致每片數(shù)據(jù)量太大,導致內(nèi)存壓力,或者諸多 Executor 的計算能力無法充分利用;但是如果 partition 太大了則會導致分片太多,執(zhí)行效率降低。

在執(zhí)行 Action 類型操作的時候(比如各種 reduce 操作),partition 的數(shù)量會選擇 parentRDD 中最大的那一個。而 parallelism 則指的是在 RDD 進行 reduce 類操作的時候,默認返回數(shù)據(jù)的 paritition 數(shù)量(而在進行 map 類操作的時候,partition 數(shù)量通常取自 parentRDD 中較大的一個,而且也不會涉及 Shuffle,因此這個 parallelism 的參數(shù)沒有影響)。

由上述可得,partition 和 parallelism 這兩個概念密切相關(guān),都是涉及數(shù)據(jù)分片,作用方式其實是統(tǒng)一的。通過 Spark.default.parallelism 可以設(shè)置默認的分片數(shù)量,而很多 RDD 的操作都可以指定一個 partition 參數(shù)來顯式控制具體的分片數(shù)量,如 reduceByKey 和 reduceByKeyAndWindow。

SparkStreaming 接收 Kafka 數(shù)據(jù)的方式,這個過程有一個數(shù)據(jù)反序列化并存儲到 Spark 的開銷,如果數(shù)據(jù)接收成為了整個系統(tǒng)的瓶頸,那么可以考慮增加數(shù)據(jù)接收的并行度。每個輸入 DStream 會創(chuàng)建一個單一的接收器(receiver 在 worker 節(jié)點運行)用來接收一個單一的數(shù)據(jù)流。而對于接收多重數(shù)據(jù)的情況,可以創(chuàng)建多個輸入 DStream 用來接收源數(shù)據(jù)流的不同分支(partitions)。

如果我們利用 Receiver 的形式接收 Kafka,一個單一的 Kafka 輸入 DStream 接收了兩個不同 topic 的數(shù)據(jù)流,我們?yōu)榱颂岣卟⑿卸瓤梢詣?chuàng)建兩個輸入流,分別接收其中一個 topic 上的數(shù)據(jù)。這樣就可以創(chuàng)建兩個接收器來并行地接收數(shù)據(jù),從而提高整體的吞吐量。而之后對于多個 DStreams,可以通過 union 操作并為一個 DStream,之后便可以在這個統(tǒng)一的輸入 DStream 上進行操作,代碼示例如下:

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...)}
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

如果采用 Direct 連接方式,前面講過 Spark 中的 partition 和 Kafka 中的 partition 是一一對應(yīng)的,但一般默認設(shè)置為 Kafka 中 partition 的數(shù)量,這樣來達到足夠并行度以接收 Kafka 數(shù)據(jù)。

設(shè)置合理的批處理間隔

對于一個 SparkStreaming 應(yīng)用,只有系統(tǒng)處理數(shù)據(jù)的速度能夠趕上數(shù)據(jù)接收的速度,整個系統(tǒng)才能保持穩(wěn)定,否則就會造成數(shù)據(jù)積壓。換句話說,即每個 batch 的數(shù)據(jù)一旦生成就需要被盡快處理完畢。這一點我們可以通過 Spark 監(jiān)控界面進行查看(在 2.3.4 節(jié)我們介紹過),比較批處理時間必須小于批處理間隔。

通過設(shè)置合理的批處理大小(batchsize),使得每批數(shù)據(jù)能夠在接收后被盡快地處理完成(即數(shù)據(jù)處理的速度趕上數(shù)據(jù)生成的速度)。

如何選取合適的批處理時間呢?一個好的方法是:先保守地設(shè)置一個較大的批處理間隔(如 5~10s),以及一個很低的數(shù)據(jù)速率,來觀測系統(tǒng)是否能夠趕上數(shù)據(jù)傳輸速率。我們可以通過查看每個處理好的 batch 的端到端延遲來觀察,也可以看全局延遲來觀察(可以在 Sparklog4j 的日志里或者使用 StreamingListener 接口,也可以直接在 UI 界面查看)。

如果延遲保持在一個相對穩(wěn)定的狀態(tài),則整個系統(tǒng)是穩(wěn)定的,否則延遲不斷上升,那說明整個系統(tǒng)是不穩(wěn)定的。在實際場景中,也可以直接觀察系統(tǒng)正在運行的 Spark 監(jiān)控界面來判斷系統(tǒng)的穩(wěn)定性。

內(nèi)存優(yōu)化

內(nèi)存優(yōu)化是在所有應(yīng)用落地中必須經(jīng)歷的話題,雖然 Spark 在內(nèi)存方面已經(jīng)為開發(fā)者做了很多優(yōu)化和默認設(shè)置,但是我們還是需要針對具體的情況進行調(diào)試。

在優(yōu)化內(nèi)存的過程中需要從 3 個方面考慮這個問題:對象本身需要的內(nèi)存;訪問這些對象的內(nèi)存開銷;垃圾回收(GCgarbagecollection)導致的開銷。

通常來說,對于 Java 對象而言,有很快的訪問速度,但是很容易消耗原始數(shù)據(jù) 2~5 倍以上的內(nèi)存空間,可以歸結(jié)為以下幾點原因:

① 每個獨立的 Java 對象,都會有一個“對象頭”,大約 16 個字節(jié)用來保存一些基本信息,如指向類的指針,對于一個只包含很少數(shù)據(jù)量在內(nèi)的對象(如一個 Int 類型數(shù)據(jù)),這個開銷是相對巨大的。

② Java 的 String 對象會在原始數(shù)據(jù)的基礎(chǔ)上額外開銷 40 個字節(jié),因為除了字符數(shù)組(Charsarray)本身之外,還需要保存如字符串長度等額外信息,而且由于 String 內(nèi)部存儲字符時是按照 UTF-16 格式編碼的,所以一個 10 字符的字符串開銷很容易超過 60 個字符。

③ 對于集合類(collectionclasses),如 HashMap、LinkedList,通常使用鏈表的形式將數(shù)據(jù)結(jié)構(gòu)鏈在一起,那么對于每一個節(jié)點(entry,如 Map.Entry)都會有一個包裝器(wrapper),而這個包裝器對象不僅包含對象頭,還會保存指向下一個節(jié)點的指針(每個 8 字節(jié))。

④熟悉 Java 的開發(fā)者應(yīng)該知道,Java 數(shù)據(jù)類型分為基本類型和包裝類型,對于 int、long 等基本類型是直接在棧中分配空間,如果我們想將這些類型用在集合類中(如 Map),需要使用對基本數(shù)據(jù)類型打包(當然這是 Java 的一個自動過程),而打包后的基本數(shù)據(jù)類型就會產(chǎn)生額外的開銷。

針對以上內(nèi)存優(yōu)化的基本問題,接下來首先介紹 Spark 中如何管理內(nèi)存,之后介紹一些能夠在具體應(yīng)用中更加有效地使用內(nèi)存的具體策略,例如,如何確定合適的內(nèi)存級別,如何改變數(shù)據(jù)結(jié)構(gòu)或?qū)?shù)據(jù)存儲為序列化格式來節(jié)省內(nèi)存等,也會從 Spark 的緩存及 Java 的垃圾回收方面進行分析,另外,也會對 SparkStreaming 進行分析。

1. 內(nèi)存管理

Spark 對于內(nèi)存的使用主要有兩類用途:執(zhí)行(execution)和存儲(storage)。執(zhí)行類內(nèi)存主要被用于 Shuffle 類操作、join 操作及排序(sort)和聚合(aggregation)類操作,而存儲類內(nèi)存主要用于緩存數(shù)據(jù)(caching)和集群間內(nèi)部數(shù)據(jù)的傳送。

在 Spark 內(nèi)部執(zhí)行和存儲分享同一片內(nèi)存空間(M),當沒有執(zhí)行類內(nèi)存被使用時,存儲類內(nèi)存可以使用全部的內(nèi)存空間,反之亦然。執(zhí)行類內(nèi)存可以剝奪存儲類內(nèi)存的空間,但是有一個前提是,存儲類內(nèi)存所占空間不得低于某一個閾值 R,也就是說 R 指定了 M 中的一塊子空間塊是永遠不會被剝奪的。而另一方面由于實現(xiàn)上的復(fù)雜性,存儲類內(nèi)存是不可以剝奪執(zhí)行類內(nèi)存的。

Spark 的這種設(shè)計方式確保了系統(tǒng)一些很好的特性:首先,如果應(yīng)用不需要緩存數(shù)據(jù),那么所有的空間都可以用作執(zhí)行類內(nèi)存,可以一定程度上避免不必要的內(nèi)存不夠用時溢出到硬盤的情況;其次,如果應(yīng)用需要使用緩存數(shù)據(jù),會有最小的內(nèi)存空間 R 能夠保證這部分數(shù)據(jù)塊免于被剝奪;最后,這種方式對于使用者而言是完全黑盒的,使用者不需要了解內(nèi)部如何根據(jù)不同的任務(wù)負載來進行內(nèi)存劃分。

Spark 提供了兩個相關(guān)的配置,但是大多數(shù)情況下直接使用默認值就能滿足大部分負載情況:

SparkMemory.Fraction 表示 M 的大小占整個 JVM(JavaVirtueMachine)堆空間的比例(默認是 0.6),剩余的空間(40%)被用來保存用戶的數(shù)據(jù)結(jié)構(gòu)及 Spark 內(nèi)部的元數(shù)據(jù)(metadata),另一方面預(yù)防某些異常數(shù)據(jù)記錄造成的 OOM(OutofMemory)錯誤。

Spark.Memory.StorageFraction 表示 R 的大小占整個 M 的比例(默認是 0.5),R 是存儲類內(nèi)存在 M 中占用的空間,其中緩存的數(shù)據(jù)塊不會被執(zhí)行類內(nèi)存剝奪。

2. 優(yōu)化策略

當我們需要初步判斷內(nèi)存的占用情況時,可以創(chuàng)建一個 RDD,然后將其緩存(cache)起來,然后觀察網(wǎng)頁監(jiān)控頁面的存儲頁部分,就可以看出 RDD 占用了多少內(nèi)存。而對于特殊的對象,我們可以調(diào)用 SizeEstimator 的 estimate() 方法來評估內(nèi)存消耗,這對于實驗不同數(shù)據(jù)層的內(nèi)存消耗,以及判斷廣播變量在每個 Executor 堆上所占用的內(nèi)存是非常有效的。

當我們了解了內(nèi)存的消耗情況后,發(fā)現(xiàn)占用內(nèi)存過大,可以著手做一些優(yōu)化,一方面可以在數(shù)據(jù)結(jié)構(gòu)方面進行優(yōu)化。首先需要注意的是,我們要避免本章開頭提到的 Java 本身數(shù)據(jù)結(jié)構(gòu)的頭部開銷,比如基于指針的數(shù)據(jù)結(jié)構(gòu)或者包裝器類型,有以下方式可以進行優(yōu)化:

在設(shè)計數(shù)據(jù)結(jié)構(gòu)時,優(yōu)先使用基本數(shù)據(jù)類型及對象數(shù)組等,避免使用 Java 或者 Scala 標準庫當中的集合類(如 HashMap),在 fastutil 庫中,為基本數(shù)據(jù)類型提供了方便的集合類接口,這些接口也兼容 Java 標準庫。

盡可能避免在數(shù)據(jù)結(jié)構(gòu)中嵌套大量的小對象和指針。

考慮使用數(shù)值類 ID 或者枚舉對象來代替字符串類型作為主鍵(Key)。

如果我們的運行時內(nèi)存小于 32GB,可以加上 JVM 配置 -XX:+UseCompressedOops 將指針的占用空間由 8 個字節(jié)壓縮到 4 個字節(jié),我們也可以在 Spark-env.sh 中進行配置。

假設(shè)我們通過以上策略還是發(fā)現(xiàn)對象占用了過大的內(nèi)存,可以用一個非常簡單的方式來降低內(nèi)存使用,就是將對象以序列化的形式(serializedform)存儲,在 RDD 的持久化接口中使用序列化的存儲級別,如 MEMORY_ONLY_SER,Spark 便會將每個 RDD 分區(qū)存儲為一個很大的字節(jié)數(shù)組。而這種方式會使得訪問數(shù)據(jù)的速度有所下降,因為每個對象訪問時都需要有一個反序列化的過程。在 7.1 節(jié)中我們已經(jīng)介紹過,優(yōu)先使用 Kryo 序列化方式,其占用大小遠低于 Java 本身的序列化方式。

3. 垃圾回收(GC)優(yōu)化

如果我們在應(yīng)用中進行了頻繁的 RDD 變動,那么 JVM 的垃圾回收會成為一個問題(也就是說,假設(shè)在程序中只創(chuàng)建了一個 RDD,后續(xù)所有操作都圍繞這個 RDD,那么垃圾回收就不存在問題)。當 Java 需要通過刪除舊對象來為新對象開辟空間時,它便會掃描我們曾創(chuàng)建的所有對象并找到不再使用的對象。

所以垃圾回收的開銷是和 Java 對象的個數(shù)成比例的,我們要盡可能地使用包含較少對象的數(shù)據(jù)結(jié)構(gòu)(如使用 Int 數(shù)組代替 LinkedList)來降低這部分開銷。另外前面提到的用序列化形式存儲也是一個很好的方法,序列化后每個對象在每個 RDD 分區(qū)下僅有一個對象(一個字節(jié)數(shù)組)。注意當 GC 開銷成為瓶頸時,首先要嘗試的便是序列化緩存(serializedcaching)。

在做 GC 優(yōu)化時,我們首先需要了解 GC 發(fā)生的頻率以及其所消耗的時間。這可以通過在 Java 選項中加入 -verbose:gc-XX:+PrintGCDetails-XX:+PrintGCTimeStamps 來實現(xiàn);之后當 Spark 任務(wù)運行后,便可以在 Worker 日志中看到 GC 發(fā)生時打印的信息。注意這些日志是打印在集群中的 Worker 節(jié)點上的(在工作目錄的 stdout 文件中),而非 Driver 程序。

為了進一步優(yōu)化 GC,首先簡單介紹下 Java 虛擬機內(nèi)部是如何進行內(nèi)存管理的。

①Java 對象是存儲在堆空間內(nèi)的,堆空間被分為兩部分,即年輕區(qū)域(Youngregion)和老年區(qū)域(Oldregion),其中年輕代(Younggeneration)會用來存儲短生命周期的對象,而老年代(Oldgeneration)會用來存儲較長生命周期的對象。

②年輕代的區(qū)域又被分為 3 個部分 [Eden,Survivor1,Survivor2]。

③一個簡單的 GC 流程大致是:當 Eden 區(qū)域滿了,一次小型 GC 過程會將 Eden 和 Survivor1 中還存活的對象復(fù)制到 Survivor2 區(qū)域上,Survivor 區(qū)域是可交換的(即來回復(fù)制),當一個對象存活周期已足夠長或者 Survivor2 區(qū)域已經(jīng)滿時,那么它們會被移動到老年代上,而當老年代的區(qū)域也滿了時,就會觸發(fā)一次完整的 GC 過程。

Java 的這種 GC 機制主要是基于程序中創(chuàng)建的大多數(shù)對象,都會在創(chuàng)建后被很快銷毀,只有極少數(shù)對象會存活下來,所以其分為年輕代和老年代兩部分,而這兩部分 GC 的方式也是不同的,其時間復(fù)雜度也是不同的,年輕代會更加快一些,感興趣的讀者可以進一步查閱相關(guān)資料。

基于以上原因,Spark 在 GC 方面優(yōu)化的主要目標是:只有長生命周期的 RDD 會被存儲在老年代上,而年輕代上有足夠的空間來存儲短生命周期的對象,從而盡可能避免任務(wù)執(zhí)行時創(chuàng)建的臨時對象觸發(fā)完整 GC 流程。我們可以通過以下步驟來一步步優(yōu)化:

①通過 GC 統(tǒng)計信息觀察是否存在過于頻繁的 GC 操作,如果在任務(wù)完成前,完整的 GC 操作被調(diào)用了多次,那么說明可執(zhí)行任務(wù)并沒有獲得足夠的內(nèi)存空間。

②如果觸發(fā)了過多的小型 GC,而完整的 GC 操作并沒有調(diào)用很多次,那么給 Eden 區(qū)域多分配一些內(nèi)存空間會有所幫助。我們可以根據(jù)每個任務(wù)所需內(nèi)存大小來預(yù)估 Eden 的大小,如果 Eden 設(shè)置大小為 E,可以利用配置項 -Xmn=4/3*E 來對年輕代的區(qū)域大小進行設(shè)置(其中 4/3 的比例是考慮到 survivor 區(qū)域所需空間)。

③如果我們觀察 GC 打印的統(tǒng)計信息,發(fā)現(xiàn)老年代接近存滿,那么就需要改變 spark.memory.fraction 來減少存儲類內(nèi)存(用于 caching)的占用,因為與其降低任務(wù)的執(zhí)行速度,不如減少對象的緩存大小。另一個可選方案是減少年輕代的大小,即通過 -Xmn 來進行配置,也可以通過 JVM 的 NewRatio 參數(shù)進行調(diào)整,大多數(shù) JVM 的該參數(shù)的默認值是 2,意思是老年代占整個堆內(nèi)存的 2/3,這個比例需要大于 Spark.Memory.Fraction。

④通過加入 -XX:+UserG1GC 來使用 G1GC 垃圾回收器,這可以一定程度提高 GC 的性能。另外注意對于 executor 堆內(nèi)存非常大的情況,一定通過 -XX:G1HeapRegionSize 來增加 G1 區(qū)域的大小。

針對以上步驟我們舉一個例子,如果我們的任務(wù)是從 HDFS 當中讀取數(shù)據(jù),任務(wù)需要的內(nèi)存空間可以通過從 HDFS 當中讀取的數(shù)據(jù)塊大小來進行預(yù)估,一般解壓后的數(shù)據(jù)塊大小會是原數(shù)據(jù)塊的 2~3 倍,所以如果我們希望 3、4 個任務(wù)同時運行在工作空間中,假設(shè)每個 HDFS 塊大小是 128MB,那么需要將 Eden 大小設(shè)置為 4×3×128MB。改動之后,我們可以監(jiān)控 GC 的頻率和時間消耗,看看有沒有達到優(yōu)化的效果。

對于優(yōu)化 GC,主要還是從降低全局 GC 的頻率出發(fā),executor 中對于 GC 優(yōu)化的配置可以通過 spark.executor.extraJavaOptions 來配置。

4.SparkStreaming 內(nèi)存優(yōu)化

前面介紹了 Spark 中的優(yōu)化策略和關(guān)于 GC 方面的調(diào)優(yōu),對于 SparkStreaming 的應(yīng)用程序,這些策略也都是適用的,除此之外還會有一些其他方面的優(yōu)化點。

對于 SparkStreaming 應(yīng)用所需要的集群內(nèi)存,很大程度上取決于要使用哪種類型的 transformation 操作。比如,假設(shè)我們想使用 10 分鐘數(shù)據(jù)的窗口操作,那么我們的集群必須有足夠的空間能夠保存 10 分鐘的全部數(shù)據(jù);亦或,我們在大量的鍵值上使用了 updateStateByKey 操作,那么所需要的內(nèi)存空間會較大。而如果我們僅僅使用簡單的 Map、Filter、Store 操作,那么所需空間會較小。

默認情況下,接收器接收來的數(shù)據(jù)會以 StorageLevel.MEMORY_AND_DISK_SER_2 的格式存儲,那么如果內(nèi)存不足時,數(shù)據(jù)就會序列化到硬盤上,這樣會損失 SparkStreaming 應(yīng)用的性能。所以通常建議為 SparkStreaming 應(yīng)用分配充足的內(nèi)存,可以在小規(guī)模數(shù)據(jù)集上進行測試和判斷。

另一方面與 Spark 程序有顯著區(qū)別的是,SparkStreaming 程序?qū)崟r性要求會較高,所以我們需要盡可能降低 JVM 垃圾回收所導致的延遲。

基于此,我們可以通過以下幾個參數(shù)對內(nèi)存使用和 GC 開銷進行優(yōu)化調(diào)整。

DStream 的持久化級別:在前文中講過,輸入數(shù)據(jù)默認是持久化為字節(jié)流的,因為相較于反序列化的開銷,其更會降低內(nèi)存的使用并且減少 GC 的開銷。所以優(yōu)先使用 Kryo 序列化方式,可以大大降低序列化后的尺寸和內(nèi)存開銷。另外,如果需要更進一步減少內(nèi)存開銷,可以通過配置 spark.rdd.compress 進行更進一步的壓縮(當然對于目前的集群機器,大多數(shù)內(nèi)存都足夠了)。

及時清理老數(shù)據(jù):默認情況下所有的輸入數(shù)據(jù)和由 DStream 的 Transormation 操作產(chǎn)生的持久 RDD 會被自動清理,即 SparkStreaming 會決定何時對數(shù)據(jù)進行清理。例如,假設(shè)我們使用 10 分鐘的窗口操作,SparkStreaming 會保存之前 10 分鐘的所有數(shù)據(jù),并及時清理過時的老數(shù)據(jù)。數(shù)據(jù)保存的時間可以通過 stremingContext.remember 進行設(shè)置。

CMS 垃圾回收器:不同于之前我們在 Spark 中的建議,由于需要減少 GC 間的停頓,所以這里建議使用并發(fā)標記清除類的 GC 方式。即使并發(fā) GC 會降低全局系統(tǒng)的生產(chǎn)吞吐量,但是使用這種 GC 可以使得每個 Batch 的處理時間更加一致(不會因為某個 Batch 處理時發(fā)生了 GC,而導致處理時間劇增)。我們需要確保在 Driver 節(jié)點(在 spark-submit 中使用—driver-java-options)和 Executor 節(jié)點(在 Spark 配置中使用 spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC)都設(shè)置了 CMSGC 方式。

其他減少 GC 開銷的方式有:可以通過 OFF_HEAP 存儲級別的 RDD 持久化方式,以及可以在 Executor 上使用更小的堆內(nèi)存,從而降低每個 JVM 堆垃圾回收的壓力。

實例——項目實戰(zhàn)中的調(diào)優(yōu)示例

在某輿情監(jiān)控系統(tǒng)中,對于每天爬取的千萬級游戲玩家評論信息都要實時地進行詞頻統(tǒng)計,對于爬取到的游戲玩家評論數(shù)據(jù),我們會生產(chǎn)輸入到 Kafka 中,而另一端的消費者,我們采用了 SparkStreaming 來進行流式處理,首先利用 Direct 方式從 Kafka 拉取 batch,之后經(jīng)過分詞、統(tǒng)計等相關(guān)處理,回寫到數(shù)據(jù)庫(DataBase,DB)上,由此高效實時的完成每天大量數(shù)據(jù)的詞頻統(tǒng)計任務(wù)。

對于數(shù)據(jù)量較小的情況,一般是不會暴露問題的,但是數(shù)據(jù)量增大后,就會暴露各種問題,這就需要進行一些調(diào)優(yōu)和參數(shù)配置?梢酝ㄟ^以下幾方面進行調(diào)優(yōu)嘗試。

1. 合理的批處理時間(batchDuration)

關(guān)于 SparkStreaming 的批處理時間設(shè)置是非常重要的,SparkStreaming 在不斷接收數(shù)據(jù)的同時,需要處理數(shù)據(jù)的時間,所以如果設(shè)置過段的批處理時間,會造成數(shù)據(jù)堆積,即未完成的 batch 數(shù)據(jù)越來越多,從而發(fā)生阻塞。

另外值得注意的是,batchDuration 本身也不能設(shè)置為小于 500ms,這會導致 SparkStreaming 進行頻繁地提交作業(yè),造成額外的開銷,減少整個系統(tǒng)的吞吐量;相反如果將 batchDuration 時間設(shè)置得過長,又會影響整個系統(tǒng)的吞吐量。

如何設(shè)置一個合理的批處理時間,需要根據(jù)應(yīng)用本身、集群資源情況,以及關(guān)注和監(jiān)控 SparkStreaming 系統(tǒng)的運行情況來調(diào)整,重點關(guān)注監(jiān)控界面中的 TotalDelay,如圖 1 所示。

 

 

圖 1 SparkUI 中全局延遲

2. 合理的 Kafka 拉取量(maxRatePerPartition 參數(shù)設(shè)置)

對于數(shù)據(jù)源是 Kafka 的 SparkStreaming 應(yīng)用,在 Kafka 數(shù)據(jù)頻率過高的情況下,調(diào)整這個參數(shù)是非常必要的。我們可以改變 spark.streaming.kafka.maxRatePerPartition 參數(shù)的值來進行上限調(diào)整,默認是無上限的,即 Kafka 有多少數(shù)據(jù),SparkStreaming 就會一次性全拉出,但是上節(jié)提到的批處理時間是一定的,不可能動態(tài)變化,如果持續(xù)數(shù)據(jù)頻率過高,同樣會造成數(shù)據(jù)堆積、阻塞的現(xiàn)象。

所以需要結(jié)合 batchDuration 設(shè)置的值,調(diào)整 spark.streaming.kafka.maxRatePerPatition 參數(shù),注意該參數(shù)配置的是 Kafka 每個 partition 拉取的上限,數(shù)據(jù)總量還需乘以所有的 partition 數(shù)量,調(diào)整兩個參數(shù) maxRatePerPartition 和 batchDuration 使得數(shù)據(jù)的拉取和處理能夠平衡,盡可能地增加整個系統(tǒng)的吞吐量,可以觀察監(jiān)控界面中的 InputRate 和 ProcessingTime,如圖 2 所示。

 

 

圖 2 SparkUI 中輸入速率和平均處理時間

3. 緩存反復(fù)使用的 Dstream(RDD)

Spark 中的 RDD 和 SparkStreaming 中的 Dstream 如果被反復(fù)使用,最好利用 cache() 函數(shù)將該數(shù)據(jù)流緩存起來,防止過度地調(diào)度資源造成的網(wǎng)絡(luò)開銷。可以參考并觀察 SchedulingDelay 參數(shù),如圖 3 所示。

 

 

圖 3 SparkUI 中調(diào)度延遲

4. 其他一些優(yōu)化策略

除了以上針對 SparkStreaming 和 Kafka 這個特殊場景方面的優(yōu)化外,對于前面提到的一些常規(guī)優(yōu)化,也可以通過下面幾點來完成。

設(shè)置合理的 GC 方式:使用–conf"spark.executor.extraJavaOptions=-XX:+UseConc MarkSweepGC" 來配置垃圾回收機制。

設(shè)置合理的 parallelism:在 SparkStreaming+kafka 的使用中,我們采用了 Direct 連接方式,前面講過 Spark 中的 partition 和 Kafka 中的 Partition 是一一對應(yīng)的,一般默認設(shè)置為 Kafka 中 Partition 的數(shù)量。

設(shè)置合理的 CPU 資源數(shù):CPU 的 core 數(shù)量,每個 Executor 可以占用一個或多個 core,觀察 CPU 使用率(Linux 命令 top)來了解計算資源的使用情況。例如,很常見的一種浪費是一個 Executor 占用了多個 core,但是總的 CPU 使用率卻不高(因為一個 Executor 并不會一直充分利用多核的能力),這個時候可以考慮讓單個 Executor 占用更少的 core,同時 Worker 下面增加更多的 Executor;或者從另一個角度,增加單個節(jié)點的 worker 數(shù)量,當然這需要修改 Spark 集群的配置,從而增加 CPU 利用率。值得注意是,這里的優(yōu)化有一個平衡,Executor 的數(shù)量需要考慮其他計算資源的配置,Executor 的數(shù)量和每個 Executor 分到的內(nèi)存大小成反比,如果每個 Executor 的內(nèi)存過小,容易產(chǎn)生內(nèi)存溢出(outofmemory)的問題。

高性能的算子:所謂高性能算子也要看具體的場景,通常建議使用 reduceByKey/aggregateByKey 來代替 groupByKey。而存在數(shù)據(jù)庫連接、資源加載創(chuàng)建等需求時,我們可以使用帶 partition 的操作,這樣在每一個分區(qū)進行一次操作即可,因為分區(qū)是物理同機器的,并不存在這些資源序列化的問題,從而大大減少了這部分操作的開銷。例如,可以用 mapPartitions、foreachPartitions 操作來代替 map、foreach 操作。另外在進行 coalesce 操作時,因為會進行重組分區(qū)操作,所以最好進行必要的數(shù)據(jù)過濾 filter 操作。

Kryo 優(yōu)化序列化性能:我們只要設(shè)置序列化類,再注冊要序列化的自定義類型即可(比如算子函數(shù)中使用到的外部變量類型、作為 RDD 泛型類型的自定義類型等)。

5. 結(jié)果

通過以上種種調(diào)整和優(yōu)化,最終我們想要達到的目的便是,整個流式處理系統(tǒng)保持穩(wěn)定,即 SparkStreaming 消費 Kafka 數(shù)據(jù)的速率趕上爬蟲向 Kafka 生產(chǎn)數(shù)據(jù)的速率,使得 Kafka 中的數(shù)據(jù)盡可能快地被處理掉,減少積壓,才能保證實時性,如圖 4 所示。

 

 

圖 4 SparkStreaming 和 Kafka 穩(wěn)定運行監(jiān)控圖

當然不同的應(yīng)用場景會有不同的圖形,這是本文詞頻統(tǒng)計優(yōu)化穩(wěn)定后的監(jiān)控圖,我們可以看到在 ProcessingTime 柱形圖中有一條 Stable 的虛線,而大多數(shù) Batch 都能夠在這一虛線下處理完畢,說明整體 SparkStreaming 是運行穩(wěn)定的。

對于項目中具體的性能調(diào)優(yōu),有以下幾個點需要注意:

一個 DStream 流只關(guān)聯(lián)單一接收器,如果需要并行多個接收器來讀取數(shù)據(jù),那么需要創(chuàng)建多個 DStream 流。一個接收器至少需要運行在一個 Executor 上,甚至更多,我們需要保證在接收器槽占用了部分核后,還能有足夠的核來處理接收到的數(shù)據(jù)。例如在設(shè)置 spark.cores.max 時需要將接收器的占用考慮進來,同時注意在分配 Executor 給接收器時,采用的是輪循的方式(roundrobinfashion)。

當接收器從數(shù)據(jù)源接收到數(shù)據(jù)時,會創(chuàng)建數(shù)據(jù)塊,在每個微秒級的數(shù)據(jù)塊間隔(blockIntervalmilliseconds)中都會有一個新的數(shù)據(jù)塊生成。在每個批處理間隔內(nèi)(batchInterval)數(shù)據(jù)塊的數(shù)量 N=batchInterval/blockInterval。這些數(shù)據(jù)塊會由當前執(zhí)行器(Executor)的數(shù)據(jù)塊管理器(BlockManager)分發(fā)到其他執(zhí)行器的數(shù)據(jù)塊管理器。之后在 Driver 節(jié)點上運行的輸入網(wǎng)絡(luò)追蹤器(NetworkInputTracker)會通知數(shù)據(jù)塊所在位置,以期進一步處理。

RDD 是基于 Driver 節(jié)點上每個批處理間隔產(chǎn)生的數(shù)據(jù)塊(blocks)而創(chuàng)建的,這些數(shù)據(jù)塊是 RDD 的分支(partitions),每個分支是 Spark 中的一個任務(wù)(task)。如果 blockInterval==batchInterval,那么意味著創(chuàng)建了單一分支,并且可能直接在本地處理。

數(shù)據(jù)塊上的映射(map)任務(wù)在執(zhí)行器(一個接收塊,另一個復(fù)制塊)中處理,該執(zhí)行器不考慮塊間隔,除非出現(xiàn)非本地調(diào)度。擁有更大的塊間隔(blockInterval)意味著更大的數(shù)據(jù)塊,如果將 spark.locality.wait 設(shè)置一個更大的值,那么更有可能在本地節(jié)點處理數(shù)據(jù)塊。我們需要在兩個參數(shù)間(blockInterval 和 spark.locality.wait)做一個折中,確保越大的數(shù)據(jù)塊更可能在本地被處理。

除了依賴于 batchInterval 和 blockInterval,我們可以直接通過 inputDstream.repartition(n) 來確定分支的數(shù)量。這個操作會重新打亂(reshuffles)RDD 中的數(shù)據(jù),隨機的分配給 n 個分支。當然打亂(shuffle)過程會造成一定的開銷,但是會有更高的并行度。RDD 的處理是由驅(qū)動程序的 jobscheduler 作為作業(yè)安排的。在給定的時間點上,只有一個作業(yè)是活動的。因此,如果一個作業(yè)正在執(zhí)行,那么其他作業(yè)將排隊。

如果我們有兩個 Dstreams,那么將形成兩個 RDDs,并將創(chuàng)建兩個作業(yè),每個作業(yè)(job)都被安排為一個接著一個地執(zhí)行。為了避免這種情況,可以聯(lián)合兩個 Dstreams(union)。這將確保為 Dstreams 的兩個 RDD 形成單一的 unionRDD。而這個 unionRDD 會被視為一個作業(yè),但是 RDDs 的分區(qū)不會受到影響。

如果批處理時間大于 batchinterval,那么很明顯,接收方的內(nèi)存將逐漸被填滿,并最終拋出異常(很可能是 BlockNotFoundException)。目前沒有辦法暫停接收,那么可以利用 SparkConf 配置項中的 spark.streaming.receiver.maxRate 來控制接收器的速率。

小結(jié)

①SparkStreaming 中需要大量的序列化和反序列化操作,在 2.0.0 以上的 Spark 版本中,我們應(yīng)當優(yōu)先考慮使用 Kryo 序列化方式。

②對于非常大的變量,如配置信息,可以提前利用廣播變量的方式傳送給每一個節(jié)點。

③在流式處理系統(tǒng)中,我們需要兼顧數(shù)據(jù)的接收和數(shù)據(jù)處理,即消費數(shù)據(jù)的速率要趕上生產(chǎn)數(shù)據(jù)的速率。當發(fā)現(xiàn)生產(chǎn)數(shù)據(jù)速率過慢時,可以考慮增加并行度,使用更多的接收器(Receiver);如果處理速度過慢,可以考慮加機器、優(yōu)化程序邏輯及 GC 優(yōu)化等方式。

④Spark 內(nèi)存分為執(zhí)行類內(nèi)存和存儲類內(nèi)存,執(zhí)行類內(nèi)存可以剝奪存儲類內(nèi)存空間,但是存儲類內(nèi)存空間有一個最低閾值會保證保留。

⑤內(nèi)存優(yōu)化最簡單的方式是使用序列化格式進行對象存儲,另外一方面考慮到 Java/Scala 對象本身會有所開銷,應(yīng)盡可能減少對象的數(shù)量。

⑥對于 Spark 而言,垃圾回收采用 G1GC,而 SparkStreaming 采用 CMS。

⑦調(diào)優(yōu)過程是一個觀察,調(diào)整,再觀察,再調(diào)整的過程,針對具體問題需要進行不同策略上的調(diào)整,希望大家多多實踐。

作者介紹:

肖力濤,浙江大學計算機碩士,前騰訊優(yōu)圖實驗室及 WeTest 研究員,現(xiàn)拼多多資深算法工程師,長期進行大數(shù)據(jù)、自然語言處理、深度學習、推薦相關(guān)算法的研究實踐,有豐富的經(jīng)驗。善于總結(jié)和歸納知識體系,整理的個人博客,收到了廣泛的閱讀和好評。擅長數(shù)據(jù)分析處理、算法實踐落地、挖掘用戶行為數(shù)據(jù)、大規(guī)模數(shù)據(jù)處理。

標簽: Spark  數(shù)據(jù)處理

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請與原作者聯(lián)系。

上一篇:貝殼:流式數(shù)據(jù)的平臺化實踐與挑戰(zhàn)

下一篇:Hadoop衰落,數(shù)據(jù)湖項目開始失敗,我們該如何應(yīng)對?