Samza 的一個更有趣的特點(diǎn)是有狀態(tài)的流處理。任務(wù)可以通過 Samza 提供的 API 來存儲和查詢數(shù)據(jù)。該數(shù)據(jù)存儲在與流任務(wù)相同的機(jī)器上; 與通過網(wǎng)絡(luò)連接到遠(yuǎn)程數(shù)據(jù)庫相比,Samza 的本地狀態(tài)允許您以更好的性能讀寫大量數(shù)據(jù)。Samza 將這種狀態(tài)復(fù)制到多臺機(jī)器上以實(shí)現(xiàn)容錯(下面詳細(xì)描述)。
一些流處理作業(yè)不需要狀態(tài):如果您只需要一次轉(zhuǎn)換一個消息,或者根據(jù)某些條件過濾掉消息,則您的工作可能很簡單。對任務(wù)進(jìn)程方法的每次調(diào)用都會處理一個傳入消息,每個消息都與所有其他消息無關(guān)。
然而,能夠維護(hù)狀態(tài)為復(fù)雜的流處理作業(yè)開辟了許多可能性:加入輸入流,分組消息和聚合消息組。通過與 SQL 的類比,查詢的 select 和 where 子句通常是無狀態(tài)的,但是連接,分組和聚合函數(shù)(如 sum 和 count)需要狀態(tài)。Samza 尚未提供更高級別的類似 SQL 的語言,但它提供了可用于實(shí)現(xiàn)流聚合和連接的較低級別的原語。
首先,我們來看一下可以在消費(fèi)者網(wǎng)站后端看到的狀態(tài)流處理的一些簡單例子。在本頁后面,我們將討論如何使用 Samza 內(nèi)置的鍵值存儲功能實(shí)現(xiàn)這些應(yīng)用程序。
示例:計(jì)算每個用戶每小時的頁面瀏覽量
在這種情況下,您的狀態(tài)通常由多個計(jì)數(shù)器組成,當(dāng)處理消息時會增加計(jì)數(shù)器。聚合通常限于時間窗口(例如1分鐘,1小時,1天),以便您可以隨時間觀察活動的變化。這種窗口處理對于排名和相關(guān)性是常見的,檢測“趨勢主題”,以及實(shí)時報(bào)告和監(jiān)視。
最簡單的實(shí)現(xiàn)將這種狀態(tài)保持在內(nèi)存中(例如任務(wù)實(shí)例中的哈希映射),并在每個時間窗口的末尾將其寫入數(shù)據(jù)庫或輸出流。但是,您需要考慮當(dāng)容器發(fā)生故障并且內(nèi)存中的狀態(tài)丟失時會發(fā)生什么。您可以通過再次處理當(dāng)前窗口中的所有消息來還原它,但如果窗口長時間可能需要很長時間。Samza 可以通過使?fàn)顟B(tài)容錯而不是試圖重新計(jì)算來加速這種恢復(fù)。
示例:通過 user_id 將用戶配置文件表加入到用戶設(shè)置表中,并發(fā)出連接的流
您可能會想:在流處理系統(tǒng)中加入兩個表格是否有意義?如果您的數(shù)據(jù)庫可以提供數(shù)據(jù)庫中的所有更改的日志。數(shù)據(jù)庫和更改日志流之間存在對偶性:您可以將每個數(shù)據(jù)更改發(fā)布到流中,如果從頭到尾消耗整個流,則可以重構(gòu)數(shù)據(jù)庫的全部內(nèi)容。Samza 專為符合這一理念的數(shù)據(jù)處理工作而設(shè)計(jì)。
如果您有多個數(shù)據(jù)庫表的更改日志流,您可以編寫一個流處理作業(yè),將每個表的最新狀態(tài)保存在本地鍵值存儲中,您可以比通過對原始數(shù)據(jù)庫進(jìn)行查詢更快地訪問它?,F(xiàn)在,當(dāng)一個表中的數(shù)據(jù)發(fā)生變化時,可以將其與另一個表中相同鍵的最新數(shù)據(jù)相加,并輸出加入的結(jié)果。
數(shù)據(jù)規(guī)范化的幾個現(xiàn)實(shí)生活的例子基本上是這樣工作的:
這些用例中的每一個都是大量復(fù)雜的數(shù)據(jù)規(guī)范化問題,可以被認(rèn)為是在許多輸入表上構(gòu)建物化視圖。Samza 可以有力地實(shí)施這些數(shù)據(jù)處理流水線。
示例:使用用戶的郵政編碼來增加一個頁面視圖流(可能允許在后期通過郵政編碼進(jìn)行聚合)
將邊信息連接到實(shí)時 Feed 是流處理的經(jīng)典用途。這在廣告,相關(guān)性排名,欺詐檢測等領(lǐng)域尤為常見。諸如頁面瀏覽的活動事件通常僅包括少量屬性,例如觀看者的 ID 和觀看的項(xiàng)目,但不包括觀看者的詳細(xì)屬性和所查看的項(xiàng)目,例如用戶的郵政編碼。如果要通過查看器或查看項(xiàng)目的屬性聚合流,則需要分別與 users 表或 items 表一起加入。
在數(shù)據(jù)倉庫術(shù)語中,您可以將原始事件流視為中心事實(shí)表中的行,這些行需要與維度表相結(jié)合,以便您可以在分析中使用維度的屬性。
示例:將廣告點(diǎn)擊次數(shù)加入到廣告展示流中(將廣告展示時間的信息鏈接到點(diǎn)擊點(diǎn)擊的信息)
流連接對于“幾乎對齊”的流很有用,您希望在多個輸入流中接收相關(guān)事件,并且您希望將它們組合成一個輸出事件。您不能同時依賴到達(dá)流處理器的事件,但您可以設(shè)置允許事件擴(kuò)展的最長時間。
為了執(zhí)行流之間的連接,您的作業(yè)需要緩存要加入的時間窗口的事件。對于短時間窗口,您可以在內(nèi)存中進(jìn)行此操作(如果機(jī)器發(fā)生故障,則可能會丟失事件)。您還可以使用 Samza 的狀態(tài)存儲來緩沖事件,這樣可以緩沖更多的消息,而不是內(nèi)存中的內(nèi)容。
連接和聚合有許多變化,但大多數(shù)是上述模式的變化和組合。
那么系統(tǒng)如何支持這種狀態(tài)處理呢?我們將通過描述我們在其他流處理系統(tǒng)中看到的內(nèi)容,然后描述 Samza 所做的工作。
在學(xué)術(shù)流處理系統(tǒng)中常見的一種簡單方法是定期將任務(wù)的整體內(nèi)存數(shù)據(jù)保存到持久存儲中。如果內(nèi)存中狀態(tài)僅由幾個值組成,則此方法效果很好。但是,您必須在每個檢查點(diǎn)上存儲完整的任務(wù)狀態(tài),這隨著任務(wù)狀態(tài)的增長而變得越來越昂貴。不幸的是,連接和聚合的許多非平凡用例有大量的狀態(tài) - 通常是很多千兆字節(jié)。這使得國家完全不切實(shí)際。
一些學(xué)術(shù)系統(tǒng)除了完整的檢查點(diǎn)之外還會產(chǎn)生差異,如果只有一些狀態(tài)自最后一個檢查點(diǎn)以來發(fā)生變化,則它們會更小。Storm的Trident抽象類似地保持內(nèi)存中的緩存狀態(tài),并定期向遠(yuǎn)程存儲(如 Cassandra)寫入任何更改。但是,如果大多數(shù)狀態(tài)保持不變,則此優(yōu)化將有所幫助。在一些使用情況下,例如流連接,在該狀態(tài)下有很多的流失是正常的,所以這種技術(shù)本質(zhì)上會降低為每個消息發(fā)出遠(yuǎn)程數(shù)據(jù)庫請求(見下文)。
用于狀態(tài)處理的另一種常見模式是將狀態(tài)存儲在外部數(shù)據(jù)庫或鍵值存儲中。傳統(tǒng)的數(shù)據(jù)庫復(fù)制可用于使數(shù)據(jù)庫容錯。架構(gòu)看起來像這樣:
Samza 允許這種處理方式 - 沒有任何東西阻止您查詢作業(yè)中的遠(yuǎn)程數(shù)據(jù)庫或服務(wù)。然而,遠(yuǎn)程數(shù)據(jù)庫可能會對有狀態(tài)流處理有問題:
Samza 允許任務(wù)以與上述方法不同的方式維護(hù)狀態(tài):
想象一下,您需要一個遠(yuǎn)程數(shù)據(jù)庫,對其進(jìn)行分區(qū)以匹配流處理作業(yè)中的任務(wù)數(shù)量,并將每個分區(qū)與其任務(wù)共同定位。結(jié)果如下:
如果機(jī)器故障,則該機(jī)器上運(yùn)行的所有任務(wù)及其數(shù)據(jù)庫分區(qū)都將丟失。為了使它們高度可用,對數(shù)據(jù)庫分區(qū)的所有寫入都將復(fù)制到持久的更新日志(通常為 Kafka)?,F(xiàn)在,當(dāng)機(jī)器發(fā)生故障時,我們可以在另一臺機(jī)器上重新啟動任務(wù),并使用此更改日志來恢復(fù)數(shù)據(jù)庫分區(qū)的內(nèi)容。
請注意,每個任務(wù)只能訪問自己的數(shù)據(jù)庫分區(qū),而不是任何其他任務(wù)的分區(qū)。這很重要:當(dāng)您通過提供更多的計(jì)算資源來擴(kuò)展您的工作時,Samza 需要將任務(wù)從一臺機(jī)器移到另一臺機(jī)器。通過給每個任務(wù)自己的狀態(tài),任務(wù)可以重新定位而不影響作業(yè)的操作。如果需要,您可以重新分配流,以使特定數(shù)據(jù)庫分區(qū)的所有消息都路由到同一個任務(wù)實(shí)例。
日志壓縮在更改日志主題的后臺運(yùn)行,并確保更改日志不會無限期增長。如果您在存儲中多次覆蓋相同的值,則日志壓縮僅保留最近的值,并拋出日志中的任何舊值。如果從商店中刪除一個項(xiàng)目,則日志壓縮也會將其從日志中刪除。通過正確的調(diào)整,更改日志不會比數(shù)據(jù)庫本身大得多。
通過這種體系結(jié)構(gòu),Samza 允許任務(wù)能夠維持大量的容錯狀態(tài),性能幾乎與純內(nèi)存實(shí)現(xiàn)一樣好。只有一些限制:
沒有什么可以阻止您使用外部數(shù)據(jù)庫,但是對于許多用例,Samza 的本地狀態(tài)是啟用狀態(tài)流處理的強(qiáng)大工具。
任何存儲引擎都可以插入 Samza,如下所述。開箱即用,Samza 搭載了一個使用 JNI API 構(gòu)建在 RocksDB 上的鍵值存儲實(shí)現(xiàn)。
RocksDB 有幾個不錯的屬性。它的內(nèi)存分配不在 Java 堆中,這使得它比基于 Java 的存儲引擎更加內(nèi)存高效,并且不太容易進(jìn)行垃圾回收暫停。對于適合內(nèi)存的小型數(shù)據(jù)集來說,速度非??? 大于內(nèi)存的數(shù)據(jù)集較慢但仍然可能。它是日志結(jié)構(gòu),允許非??焖俚膶懭?。它還包括對塊壓縮的支持,這有助于減少 I / O 和內(nèi)存使用。
Samza 在 RocksDB 前面增加了一個內(nèi)存緩存層,避免了經(jīng)常訪問的對象和批次寫入的反序列化成本。如果快速連續(xù)更新多個相同的密鑰,則批處理將這些更新合并為單個寫入。當(dāng)任務(wù)提交時,寫入將刷新到更改日志。
要在作業(yè)中使用鍵值存儲,請將以下內(nèi)容添加到作業(yè)配置中:
# Use the key-value store implementation for a store called "my-store"
stores.my-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
# Use the Kafka topic "my-store-changelog" as the changelog stream for this store.
# This enables automatic recovery of the store after a failure. If you don't
# configure this, no changelog stream will be generated.
stores.my-store.changelog=kafka.my-store-changelog
# Encode keys and values in the store as UTF-8 strings.
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
stores.my-store.key.serde=string
stores.my-store.msg.serde=string
有關(guān) serde 選項(xiàng)的更多信息,請參閱序列化部分。
這是一個簡單的例子,將每個傳入的消息寫入商店:
public class MyStatefulTask implements StreamTask, InitableTask {
private KeyValueStore<String, String> store;
public void init(Config config, TaskContext context) {
this.store = (KeyValueStore<String, String>) context.getStore("my-store");
}
public void process(IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator) {
store.put((String) envelope.getKey(), (String) envelope.getMessage());
}
}
以下是完整的鍵值存儲API:
public interface KeyValueStore<K, V> {
V get(K key);
void put(K key, V value);
void putAll(List<Entry<K,V>> entries);
void delete(K key);
KeyValueIterator<K,V> range(K from, K to);
KeyValueIterator<K,V> all();
}
配置參考中記錄了鍵值存儲的其他配置屬性。
目前,Samza 提供了一種狀態(tài)存儲工具,可以將狀態(tài)存儲從更改日志流恢復(fù)到用戶指定的目錄以進(jìn)行重用和調(diào)試。
samza-example/target/bin/state-storage-tool.sh \
--config-path=file:///path/to/job/config.properties \
--path=directory/to/put/state/stores
Samza 還提供了一個工具來讀取正在運(yùn)行的工作的 RocksDB 的價值。
samza-example/target/bin/read-rocksdb-tool.sh \
--config-path=file:///path/to/job/config.properties \
--db-path=/tmp/nm-local-dir/state/test-state/Partition_0 \
--db-name=test-state \
--string-key=a,b,c
限制:
RocksDbKeyValueReader kvReader = new RocksDbKeyValueReader(dbName, pathOfdb, config)
Object value = kvReader.get(key)
RocksDB 有幾個粗糙的邊緣。建議您閱讀 RocksDB 調(diào)整指南。需要注意的其他一些注意事項(xiàng)是:
在本節(jié)前面,我們討論了有狀態(tài)流處理的一些示例用例。我們來看看如何使用諸如 Samza 的 RocksDB 商店等鍵值存儲引擎實(shí)現(xiàn)這些功能。
示例:計(jì)算每個用戶每小時的頁面瀏覽量
實(shí)施:您需要兩個處理階段。
請注意,此工作有效地停留在小時標(biāo)記以輸出其結(jié)果。這對 Samza 是完全正確的,因?yàn)閽呙桄I值存儲的內(nèi)容是相當(dāng)快的。當(dāng)工作正在做這個小時工作時,輸入流被緩沖。
示例:通過 user_id 將用戶配置文件表加入到用戶設(shè)置表中,并發(fā)出連接的流
實(shí)現(xiàn):作業(yè)訂閱用戶配置文件數(shù)據(jù)庫和用戶設(shè)置數(shù)據(jù)庫的更改流,都由 user_id 分區(qū)。該作業(yè)保留一個由 user_id 鍵入的鍵值存儲區(qū),其中包含最新的配置文件記錄和每個 user_id 的最新設(shè)置記錄。當(dāng)一個新的事件從兩個流進(jìn)來時,作業(yè)將查找其存儲中的當(dāng)前值,更新相應(yīng)的字段(取決于是否是配置文件更新或設(shè)置更新),并將新加入的記錄寫回商店。存儲的更新日志加倍為任務(wù)的輸出流。
示例:使用用戶的郵政編碼來增加一個頁面視圖流(可能允許在后期通過郵政編碼進(jìn)行聚合)
實(shí)施:作業(yè)訂閱用戶配置文件更新流和頁面瀏覽事件流。兩個流都必須用 user_id 進(jìn)行分區(qū)。該作業(yè)維護(hù)一個鍵值存儲區(qū),其中 key 是 user_id,該值是用戶的郵政編碼。每當(dāng)作業(yè)接收到配置文件更新時,它將從配置文件更新中提取用戶的新郵政編碼,并將其寫入商店。每次收到頁面瀏覽事件時,它會從商店中讀取該用戶的郵政編碼,并使用添加的郵政編碼字段發(fā)送頁面查看事件。
如果下一階段需要通過郵政編碼進(jìn)行匯總,則可以將郵政編碼用作作業(yè)輸出流的分區(qū)鍵。這確保了相同郵政編碼的所有事件都被發(fā)送到同一流分區(qū)。
示例:將廣告點(diǎn)擊次數(shù)加入到廣告展示流中(將廣告展示時間的信息鏈接到點(diǎn)擊點(diǎn)擊的信息)
在此示例中,我們假設(shè)廣告的每次展示都有唯一的標(biāo)識符,例如 UUID,并且相同的標(biāo)識符包含在展示和點(diǎn)擊事件中。該標(biāo)識符用作連接密鑰。
實(shí)施:按照展示 ID 或用戶 ID 分配廣告點(diǎn)擊和廣告展示流(假設(shè)具有相同展示 ID 的兩個事件始終具有相同的用戶 ID)。該任務(wù)保留兩個商店,一個包含點(diǎn)擊事件,一個包含展示事件,使用展示 ID 作為兩個商店的關(guān)鍵。當(dāng)作業(yè)收到點(diǎn)擊事件時,它會在展示商店中查找相應(yīng)的展示,反之亦然。如果找到匹配項(xiàng),則發(fā)送連接對,并刪除條目。如果沒有找到匹配項(xiàng),則將事件寫入相應(yīng)的商店。定期地,作業(yè)將掃描兩個商店,并刪除在連接的時間窗口內(nèi)未匹配的任何舊事件。
Samza 的容錯機(jī)制(將本地商店的寫入發(fā)送到復(fù)制的更改日志)與存儲引擎的數(shù)據(jù)結(jié)構(gòu)和查詢 API 完全分離。雖然鍵值存儲引擎對于通用處理是有利的,但您可以通過實(shí)施StorageEngine接口輕松地為其他類型的查詢添加自己的存儲引擎。Samza 的模式特別適用于與流任務(wù)相同的過程中作為庫運(yùn)行的嵌入式存儲引擎。
其他存儲引擎的一些想法可能是有用的:持久堆(用于運(yùn)行前N個查詢),近似算法(如bloom過濾器和超文本記錄)或全文索引(如Lucene)。(補(bǔ)丁歡迎?。?/p>
如關(guān)于檢查點(diǎn)的部分所述,Samza 目前只支持在出現(xiàn)故障的情況下至少提供一次交付保證(有時稱為“保證交貨”)。這意味著如果任務(wù)失敗,則不會丟失任何消息,但可能會重新傳遞某些消息。
對于上面討論的許多有狀態(tài)處理使用情況,這不是一個問題:如果消息對狀態(tài)的影響是冪等的,則對同一消息進(jìn)行多次處理是安全的。例如,如果商店包含每個用戶的郵政編碼,則兩次處理相同的配置文件更新沒有任何效果,因?yàn)橹貜?fù)的更新不會更改郵政編碼。
但是,對于非冪等操作(如計(jì)數(shù)),至少一次交貨保證可能會給出不正確的結(jié)果。如果Samza任務(wù)失敗并重新啟動,則可能會在發(fā)生故障之前不久處理的一些消息進(jìn)行雙重計(jì)數(shù)。我們計(jì)劃在未來的Samza發(fā)行版中解決這個限制。
更多建議: