W3Cschool
恭喜您成為首批注冊(cè)用戶
獲得88經(jīng)驗(yàn)值獎(jiǎng)勵(lì)
大家好,我是 V 哥。在實(shí)際的業(yè)務(wù)場(chǎng)景中,Spark任務(wù)出現(xiàn)OOM(Out of Memory) 問(wèn)題通常是由于任務(wù)處理的數(shù)據(jù)量過(guò)大、資源分配不合理或者代碼存在性能瓶頸等原因造成的。針對(duì)不同的業(yè)務(wù)場(chǎng)景和原因,可以從以下幾個(gè)方面進(jìn)行優(yōu)化和解決。
shuffle
或join
操作時(shí),數(shù)據(jù)量暴增。cache()
、persist()
,或?qū)?shù)據(jù)結(jié)構(gòu)進(jìn)行不必要的操作,導(dǎo)致內(nèi)存過(guò)度消耗。
通過(guò)合理的資源分配,確保每個(gè)Executor
有足夠的內(nèi)存處理數(shù)據(jù)。
Executor
負(fù)責(zé)在集群節(jié)點(diǎn)上執(zhí)行任務(wù),默認(rèn)每個(gè)Executor
的內(nèi)存可能不足以處理大數(shù)據(jù)集。可以增加Executor
的內(nèi)存以緩解OOM問(wèn)題。 --executor-memory 8G
可以通過(guò)--executor-memory
選項(xiàng)來(lái)設(shè)置每個(gè)Executor
的內(nèi)存。例如,將內(nèi)存設(shè)置為8GB。如果數(shù)據(jù)量很大,可以根據(jù)情況設(shè)置更大的內(nèi)存。
--conf spark.memory.offHeap.enabled=true
--conf spark.memory.offHeap.size=4G
Executor
分配更多的CPU核心,以加快任務(wù)的處理速度,防止長(zhǎng)時(shí)間占用內(nèi)存。 --executor-cores 4
通過(guò)--executor-cores
設(shè)置每個(gè)Executor
使用的核心數(shù)。例如,可以將核心數(shù)設(shè)置為4,以提升并發(fā)計(jì)算能力。
Spark的內(nèi)存管理策略主要涉及以下幾個(gè)關(guān)鍵參數(shù),它們的優(yōu)化配置可以幫助減少OOM問(wèn)題。
--conf spark.memory.fraction=0.8
--conf spark.memory.storageFraction=0.5
spark.memory.fraction
:該參數(shù)控制了存儲(chǔ)與執(zhí)行內(nèi)存的總占比,默認(rèn)是0.6,可以適當(dāng)調(diào)高。spark.memory.storageFraction
:該參數(shù)決定了在memory.fraction
的基礎(chǔ)上,存儲(chǔ)內(nèi)存的占比。如果需要更多執(zhí)行內(nèi)存,可以適當(dāng)減小該值。unpersist()
來(lái)清理緩存,釋放內(nèi)存。 rdd.unpersist()
StorageLevel.DISK_ONLY
或StorageLevel.MEMORY_AND_DISK
,以減少內(nèi)存占用。 rdd.persist(StorageLevel.MEMORY_AND_DISK)
Spark任務(wù)中的shuffle
、join
、groupBy
等操作通常會(huì)引起大量?jī)?nèi)存消耗,以下優(yōu)化可以減輕這些操作帶來(lái)的OOM風(fēng)險(xiǎn)。
join
、shuffle
等,分區(qū)數(shù)的設(shè)置至關(guān)重要。如果分區(qū)數(shù)過(guò)少,可能會(huì)導(dǎo)致某些分區(qū)數(shù)據(jù)量過(guò)大,進(jìn)而導(dǎo)致內(nèi)存溢出。 rdd.repartition(200)
或者在執(zhí)行某些操作時(shí),顯式指定分區(qū)數(shù):
rdd.reduceByKey(_ + _, numPartitions = 200)
groupByKey
)會(huì)在shuffle時(shí)造成內(nèi)存的壓力,特別是數(shù)據(jù)量較大時(shí),應(yīng)該盡量避免??梢酝ㄟ^(guò)替換為reduceByKey
等具有預(yù)聚合功能的操作來(lái)減少內(nèi)存消耗: rdd.reduceByKey(_ + _)
rdd.map(x => ((x._1 + new Random().nextInt(10)), x._2))
join
操作中,如果一張表很小,可以使用廣播變量,將小表廣播到每個(gè)節(jié)點(diǎn),減少數(shù)據(jù)傳輸和內(nèi)存占用: val broadcastVar = sc.broadcast(smallTable)
largeTable.mapPartitions { partition =>
val small = broadcastVar.value
partition.map(largeRow => ...)
}
Spark的shuffle操作(如groupByKey
、join
)會(huì)導(dǎo)致大量數(shù)據(jù)需要在不同的節(jié)點(diǎn)之間傳輸。如果并行度設(shè)置過(guò)低,容易導(dǎo)致某個(gè)節(jié)點(diǎn)處理的數(shù)據(jù)量過(guò)大,從而引發(fā)OOM。
--conf spark.sql.shuffle.partitions=200
或者在代碼中顯式設(shè)置:
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.sql.shuffle.partitions
的值可能偏小(例如200),根據(jù)數(shù)據(jù)規(guī)模適當(dāng)調(diào)整該值可以減輕單個(gè)節(jié)點(diǎn)的負(fù)載。 --conf spark.sql.adaptive.enabled=true
--conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=64M
AQE 可以根據(jù)任務(wù)的執(zhí)行情況自動(dòng)調(diào)整shuffle的分區(qū)數(shù),從而避免OOM。
Spark任務(wù)中的OOM問(wèn)題常常由于數(shù)據(jù)量過(guò)大、數(shù)據(jù)傾斜、資源分配不合理等問(wèn)題引起,針對(duì)不同的業(yè)務(wù)場(chǎng)景,可以采取以下措施進(jìn)行優(yōu)化:
cache()
和persist()
操作,并及時(shí)釋放緩存數(shù)據(jù)。好了,今天的內(nèi)容就寫到這里,這些優(yōu)化方法結(jié)合使用,可以有效解決Spark任務(wù)中的OOM問(wèn)題。當(dāng)然還有 JVM 調(diào)優(yōu),硬件配置升級(jí)等等,OOM 問(wèn)題是多方面的,只是今天的文章咱們只關(guān)注 Spark 本身的問(wèn)題而已。關(guān)注威哥愛編程,碼碼通暢不掉發(fā)。
Copyright©2021 w3cschool編程獅|閩ICP備15016281號(hào)-3|閩公網(wǎng)安備35020302033924號(hào)
違法和不良信息舉報(bào)電話:173-0602-2364|舉報(bào)郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號(hào)
聯(lián)系方式:
更多建議: