管理 Samza 狀態(tài)

2018-08-22 17:44 更新

Samza 的一個更有趣的特點(diǎn)是有狀態(tài)的流處理。任務(wù)可以通過 Samza 提供的 API 來存儲和查詢數(shù)據(jù)。該數(shù)據(jù)存儲在與流任務(wù)相同的機(jī)器上; 與通過網(wǎng)絡(luò)連接到遠(yuǎn)程數(shù)據(jù)庫相比,Samza 的本地狀態(tài)允許您以更好的性能讀寫大量數(shù)據(jù)。Samza 將這種狀態(tài)復(fù)制到多臺機(jī)器上以實(shí)現(xiàn)容錯(下面詳細(xì)描述)。

一些流處理作業(yè)不需要狀態(tài):如果您只需要一次轉(zhuǎn)換一個消息,或者根據(jù)某些條件過濾掉消息,則您的工作可能很簡單。對任務(wù)進(jìn)程方法的每次調(diào)用都會處理一個傳入消息,每個消息都與所有其他消息無關(guān)。

然而,能夠維護(hù)狀態(tài)為復(fù)雜的流處理作業(yè)開辟了許多可能性:加入輸入流,分組消息和聚合消息組。通過與 SQL 的類比,查詢的 select 和 where 子句通常是無狀態(tài)的,但是連接,分組和聚合函數(shù)(如 sum 和 count)需要狀態(tài)。Samza 尚未提供更高級別的類似 SQL 的語言,但它提供了可用于實(shí)現(xiàn)流聚合和連接的較低級別的原語。

常用的狀態(tài)處理用例

首先,我們來看一下可以在消費(fèi)者網(wǎng)站后端看到的狀態(tài)流處理的一些簡單例子。在本頁后面,我們將討論如何使用 Samza 內(nèi)置的鍵值存儲功能實(shí)現(xiàn)這些應(yīng)用程序。

窗口聚合

示例:計(jì)算每個用戶每小時的頁面瀏覽量

在這種情況下,您的狀態(tài)通常由多個計(jì)數(shù)器組成,當(dāng)處理消息時會增加計(jì)數(shù)器。聚合通常限于時間窗口(例如1分鐘,1小時,1天),以便您可以隨時間觀察活動的變化。這種窗口處理對于排名和相關(guān)性是常見的,檢測“趨勢主題”,以及實(shí)時報(bào)告和監(jiān)視。

最簡單的實(shí)現(xiàn)將這種狀態(tài)保持在內(nèi)存中(例如任務(wù)實(shí)例中的哈希映射),并在每個時間窗口的末尾將其寫入數(shù)據(jù)庫或輸出流。但是,您需要考慮當(dāng)容器發(fā)生故障并且內(nèi)存中的狀態(tài)丟失時會發(fā)生什么。您可以通過再次處理當(dāng)前窗口中的所有消息來還原它,但如果窗口長時間可能需要很長時間。Samza 可以通過使?fàn)顟B(tài)容錯而不是試圖重新計(jì)算來加速這種恢復(fù)。

桌子加入

示例:通過 user_id 將用戶配置文件表加入到用戶設(shè)置表中,并發(fā)出連接的流

您可能會想:在流處理系統(tǒng)中加入兩個表格是否有意義?如果您的數(shù)據(jù)庫可以提供數(shù)據(jù)庫中的所有更改的日志。數(shù)據(jù)庫和更改日志流之間存在對偶性:您可以將每個數(shù)據(jù)更改發(fā)布到流中,如果從頭到尾消耗整個流,則可以重構(gòu)數(shù)據(jù)庫的全部內(nèi)容。Samza 專為符合這一理念的數(shù)據(jù)處理工作而設(shè)計(jì)。

如果您有多個數(shù)據(jù)庫表的更改日志流,您可以編寫一個流處理作業(yè),將每個表的最新狀態(tài)保存在本地鍵值存儲中,您可以比通過對原始數(shù)據(jù)庫進(jìn)行查詢更快地訪問它?,F(xiàn)在,當(dāng)一個表中的數(shù)據(jù)發(fā)生變化時,可以將其與另一個表中相同鍵的最新數(shù)據(jù)相加,并輸出加入的結(jié)果。

數(shù)據(jù)規(guī)范化的幾個現(xiàn)實(shí)生活的例子基本上是這樣工作的:

  • 像亞馬遜和 EBay 這樣的電子商務(wù)公司需要從商家進(jìn)口商品,將產(chǎn)品規(guī)格化,并向所有相關(guān)商家和定價信息展示產(chǎn)品。
  • Web 搜索需要構(gòu)建一個抓取工具,該抓取工具基本上創(chuàng)建一個網(wǎng)頁內(nèi)容表格,并且連接所有關(guān)聯(lián)屬性,如點(diǎn)擊率或 pagerank。
  • 社交網(wǎng)絡(luò)采用用戶輸入的文字,需要對公司,學(xué)校和技能等實(shí)體進(jìn)行規(guī)范化。

這些用例中的每一個都是大量復(fù)雜的數(shù)據(jù)規(guī)范化問題,可以被認(rèn)為是在許多輸入表上構(gòu)建物化視圖。Samza 可以有力地實(shí)施這些數(shù)據(jù)處理流水線。

流表連接

示例:使用用戶的郵政編碼來增加一個頁面視圖流(可能允許在后期通過郵政編碼進(jìn)行聚合)

將邊信息連接到實(shí)時 Feed 是流處理的經(jīng)典用途。這在廣告,相關(guān)性排名,欺詐檢測等領(lǐng)域尤為常見。諸如頁面瀏覽的活動事件通常僅包括少量屬性,例如觀看者的 ID 和觀看的項(xiàng)目,但不包括觀看者的詳細(xì)屬性和所查看的項(xiàng)目,例如用戶的郵政編碼。如果要通過查看器或查看項(xiàng)目的屬性聚合流,則需要分別與 users 表或 items 表一起加入。

在數(shù)據(jù)倉庫術(shù)語中,您可以將原始事件流視為中心事實(shí)表中的行,這些行需要與維度表相結(jié)合,以便您可以在分析中使用維度的屬性。

流流連接

示例:將廣告點(diǎn)擊次數(shù)加入到廣告展示流中(將廣告展示時間的信息鏈接到點(diǎn)擊點(diǎn)擊的信息)

流連接對于“幾乎對齊”的流很有用,您希望在多個輸入流中接收相關(guān)事件,并且您希望將它們組合成一個輸出事件。您不能同時依賴到達(dá)流處理器的事件,但您可以設(shè)置允許事件擴(kuò)展的最長時間。

為了執(zhí)行流之間的連接,您的作業(yè)需要緩存要加入的時間窗口的事件。對于短時間窗口,您可以在內(nèi)存中進(jìn)行此操作(如果機(jī)器發(fā)生故障,則可能會丟失事件)。您還可以使用 Samza 的狀態(tài)存儲來緩沖事件,這樣可以緩沖更多的消息,而不是內(nèi)存中的內(nèi)容。

更多

連接和聚合有許多變化,但大多數(shù)是上述模式的變化和組合。

管理任務(wù)狀態(tài)的方法

那么系統(tǒng)如何支持這種狀態(tài)處理呢?我們將通過描述我們在其他流處理系統(tǒng)中看到的內(nèi)容,然后描述 Samza 所做的工作。

帶檢查點(diǎn)的內(nèi)存狀態(tài)

在學(xué)術(shù)流處理系統(tǒng)中常見的一種簡單方法是定期將任務(wù)的整體內(nèi)存數(shù)據(jù)保存到持久存儲中。如果內(nèi)存中狀態(tài)僅由幾個值組成,則此方法效果很好。但是,您必須在每個檢查點(diǎn)上存儲完整的任務(wù)狀態(tài),這隨著任務(wù)狀態(tài)的增長而變得越來越昂貴。不幸的是,連接和聚合的許多非平凡用例有大量的狀態(tài) - 通常是很多千兆字節(jié)。這使得國家完全不切實(shí)際。

一些學(xué)術(shù)系統(tǒng)除了完整的檢查點(diǎn)之外還會產(chǎn)生差異,如果只有一些狀態(tài)自最后一個檢查點(diǎn)以來發(fā)生變化,則它們會更小。Storm的Trident抽象類似地保持內(nèi)存中的緩存狀態(tài),并定期向遠(yuǎn)程存儲(如 Cassandra)寫入任何更改。但是,如果大多數(shù)狀態(tài)保持不變,則此優(yōu)化將有所幫助。在一些使用情況下,例如流連接,在該狀態(tài)下有很多的流失是正常的,所以這種技術(shù)本質(zhì)上會降低為每個消息發(fā)出遠(yuǎn)程數(shù)據(jù)庫請求(見下文)。

使用外部商店

用于狀態(tài)處理的另一種常見模式是將狀態(tài)存儲在外部數(shù)據(jù)庫或鍵值存儲中。傳統(tǒng)的數(shù)據(jù)庫復(fù)制可用于使數(shù)據(jù)庫容錯。架構(gòu)看起來像這樣:

stream_job_and_db

Samza 允許這種處理方式 - 沒有任何東西阻止您查詢作業(yè)中的遠(yuǎn)程數(shù)據(jù)庫或服務(wù)。然而,遠(yuǎn)程數(shù)據(jù)庫可能會對有狀態(tài)流處理有問題:

  1. 性能:通過網(wǎng)絡(luò)進(jìn)行數(shù)據(jù)庫查詢速度較慢且昂貴。一個 Kafka 流可以將每個 CPU 內(nèi)核的數(shù)十萬甚至數(shù)百萬條消息傳遞給流處理器,但如果您需要對每個處理的消息進(jìn)行遠(yuǎn)程請求,那么您的吞吐量可能會下降2-3個大小。您可以通過仔細(xì)緩存讀取和批量寫入來稍微緩解這一點(diǎn),但是您將回到以上討論的檢查點(diǎn)問題。
  2. 隔離:如果您的數(shù)據(jù)庫或服務(wù)也向用戶提供請求,則與流處理器使用相同的數(shù)據(jù)庫可能是危險的??蓴U(kuò)展的流處理系統(tǒng)可以以非常高的吞吐量運(yùn)行,并且容易產(chǎn)生大量的負(fù)載(例如,當(dāng)趕上隊(duì)列積壓時)。如果您不是非常小心,可能會對您自己的數(shù)據(jù)庫造成拒絕服務(wù)攻擊,并對用戶的交互式請求造成問題。
  3. 查詢功能:許多可擴(kuò)展數(shù)據(jù)庫暴露了非常有限的查詢界面(例如,僅支持簡單的鍵值查找),因?yàn)橄喈?dāng)于“全表掃描”或豐富的遍歷將太貴了。流過程通常對延遲敏感度較低,因此更豐富的查詢功能將更為可行。
  4. 正確性:當(dāng)流處理器出現(xiàn)故障并需要重新啟動時,數(shù)據(jù)庫狀態(tài)如何與處理任務(wù)保持一致?為此,一些框架(如Storm)將元數(shù)據(jù)附加到數(shù)據(jù)庫條目,但需要仔細(xì)處理,否則流過程會產(chǎn)生不正確的輸出。
  5. 重新處理:有時,在大量歷史數(shù)據(jù)上重新運(yùn)行流程可能會很有用,例如在更新處理任務(wù)的代碼之后。但是,上述問題使得外部查詢的作業(yè)變得不切實(shí)際。

薩姆薩當(dāng)?shù)?/h3>

Samza 允許任務(wù)以與上述方法不同的方式維護(hù)狀態(tài):

  • 狀態(tài)存儲在磁盤上,所以作業(yè)可以保持比適合內(nèi)存更多的狀態(tài)。
  • 它存儲在與處理任務(wù)相同的機(jī)器上,以避免通過網(wǎng)絡(luò)進(jìn)行數(shù)據(jù)庫查詢的性能問題。
  • 每個作業(yè)都有自己的數(shù)據(jù)存儲區(qū),以避免共享數(shù)據(jù)庫的隔離問題(如果使用昂貴的查詢,它只影響當(dāng)前的任務(wù),沒有其他的)。
  • 可以插入不同的存儲引擎,實(shí)現(xiàn)豐富的查詢功能。
  • 狀態(tài)不斷復(fù)制,實(shí)現(xiàn)容錯,無需檢查大量狀態(tài)的問題。

想象一下,您需要一個遠(yuǎn)程數(shù)據(jù)庫,對其進(jìn)行分區(qū)以匹配流處理作業(yè)中的任務(wù)數(shù)量,并將每個分區(qū)與其任務(wù)共同定位。結(jié)果如下:

1502855264827156

如果機(jī)器故障,則該機(jī)器上運(yùn)行的所有任務(wù)及其數(shù)據(jù)庫分區(qū)都將丟失。為了使它們高度可用,對數(shù)據(jù)庫分區(qū)的所有寫入都將復(fù)制到持久的更新日志(通常為 Kafka)?,F(xiàn)在,當(dāng)機(jī)器發(fā)生故障時,我們可以在另一臺機(jī)器上重新啟動任務(wù),并使用此更改日志來恢復(fù)數(shù)據(jù)庫分區(qū)的內(nèi)容。

請注意,每個任務(wù)只能訪問自己的數(shù)據(jù)庫分區(qū),而不是任何其他任務(wù)的分區(qū)。這很重要:當(dāng)您通過提供更多的計(jì)算資源來擴(kuò)展您的工作時,Samza 需要將任務(wù)從一臺機(jī)器移到另一臺機(jī)器。通過給每個任務(wù)自己的狀態(tài),任務(wù)可以重新定位而不影響作業(yè)的操作。如果需要,您可以重新分配流,以使特定數(shù)據(jù)庫分區(qū)的所有消息都路由到同一個任務(wù)實(shí)例。

日志壓縮在更改日志主題的后臺運(yùn)行,并確保更改日志不會無限期增長。如果您在存儲中多次覆蓋相同的值,則日志壓縮僅保留最近的值,并拋出日志中的任何舊值。如果從商店中刪除一個項(xiàng)目,則日志壓縮也會將其從日志中刪除。通過正確的調(diào)整,更改日志不會比數(shù)據(jù)庫本身大得多。

通過這種體系結(jié)構(gòu),Samza 允許任務(wù)能夠維持大量的容錯狀態(tài),性能幾乎與純內(nèi)存實(shí)現(xiàn)一樣好。只有一些限制:

  • 如果您希望在任務(wù)之間(跨分區(qū)邊界)共享某些數(shù)據(jù),則需要進(jìn)行一些額外的努力來重新分配和分發(fā)數(shù)據(jù)。每個任務(wù)都需要自己的數(shù)據(jù)副本,所以這可能會使用更多的空間。
  • 當(dāng)容器重新啟動時,可能需要一些時間來恢復(fù)其所有狀態(tài)分區(qū)中的數(shù)據(jù)。時間取決于數(shù)據(jù)量,存儲引擎,訪問模式等因素。根據(jù)經(jīng)驗(yàn),50 MB /秒是合理的恢復(fù)時間。

沒有什么可以阻止您使用外部數(shù)據(jù)庫,但是對于許多用例,Samza 的本地狀態(tài)是啟用狀態(tài)流處理的強(qiáng)大工具。

鍵值存儲

任何存儲引擎都可以插入 Samza,如下所述。開箱即用,Samza 搭載了一個使用 JNI API 構(gòu)建在 RocksDB 上的鍵值存儲實(shí)現(xiàn)。

RocksDB 有幾個不錯的屬性。它的內(nèi)存分配不在 Java 堆中,這使得它比基于 Java 的存儲引擎更加內(nèi)存高效,并且不太容易進(jìn)行垃圾回收暫停。對于適合內(nèi)存的小型數(shù)據(jù)集來說,速度非??? 大于內(nèi)存的數(shù)據(jù)集較慢但仍然可能。它是日志結(jié)構(gòu),允許非??焖俚膶懭?。它還包括對塊壓縮的支持,這有助于減少 I / O 和內(nèi)存使用。

Samza 在 RocksDB 前面增加了一個內(nèi)存緩存層,避免了經(jīng)常訪問的對象和批次寫入的反序列化成本。如果快速連續(xù)更新多個相同的密鑰,則批處理將這些更新合并為單個寫入。當(dāng)任務(wù)提交時,寫入將刷新到更改日志。

要在作業(yè)中使用鍵值存儲,請將以下內(nèi)容添加到作業(yè)配置中:

# Use the key-value store implementation for a store called "my-store"
stores.my-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory

# Use the Kafka topic "my-store-changelog" as the changelog stream for this store.
# This enables automatic recovery of the store after a failure. If you don't
# configure this, no changelog stream will be generated.
stores.my-store.changelog=kafka.my-store-changelog

# Encode keys and values in the store as UTF-8 strings.
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
stores.my-store.key.serde=string
stores.my-store.msg.serde=string

有關(guān) serde 選項(xiàng)的更多信息,請參閱序列化部分。

這是一個簡單的例子,將每個傳入的消息寫入商店:

public class MyStatefulTask implements StreamTask, InitableTask {
  private KeyValueStore<String, String> store;

  public void init(Config config, TaskContext context) {
    this.store = (KeyValueStore<String, String>) context.getStore("my-store");
  }

  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    store.put((String) envelope.getKey(), (String) envelope.getMessage());
  }
}

以下是完整的鍵值存儲API:

public interface KeyValueStore<K, V> {
  V get(K key);
  void put(K key, V value);
  void putAll(List<Entry<K,V>> entries);
  void delete(K key);
  KeyValueIterator<K,V> range(K from, K to);
  KeyValueIterator<K,V> all();
}

配置參考中記錄了鍵值存儲的其他配置屬性。

調(diào)試鍵值存儲

從更改日志實(shí)現(xiàn)狀態(tài)存儲

目前,Samza 提供了一種狀態(tài)存儲工具,可以將狀態(tài)存儲從更改日志流恢復(fù)到用戶指定的目錄以進(jìn)行重用和調(diào)試。

samza-example/target/bin/state-storage-tool.sh \
  --config-path=file:///path/to/job/config.properties \
  --path=directory/to/put/state/stores

讀取正在運(yùn)行的RocksDB的值

Samza 還提供了一個工具來讀取正在運(yùn)行的工作的 RocksDB 的價值。

samza-example/target/bin/read-rocksdb-tool.sh \
  --config-path=file:///path/to/job/config.properties \
  --db-path=/tmp/nm-local-dir/state/test-state/Partition_0 \
  --db-name=test-state \
  --string-key=a,b,c
  • --config-path(必填):您的工作的配置文件
  • --db-path(必填):您的RocksDB的位置。如果 RocksDB 與工具在同一臺機(jī)器上,這是很方便的。例如,如果您在本地機(jī)器上運(yùn)行 hello-samza,則該位置可能位于/ tmp / hadoop / nm-local-dir / usercache / username / appcache / applicationId / containerId / state / storeName / PartitionNumber
  • --db-name(必需):如果您只有一個狀態(tài)存儲在配置文件中指定,您可以忽略這一個。否則,您需要在此處提供狀態(tài)商店名稱。
  • --string-key:關(guān)鍵列表。這個只有你的鍵是字符串才有效。也有另一種兩種選擇:--integer-key,--long-key。它們分別用于整數(shù)鍵和長鍵。

限制:

  • 這只適用于三種鍵:string,integer和long。這是因?yàn)槲覀冎荒軓拿钚薪邮苓@些鍵(從命令行接受字節(jié),avro,json等)真的很棘手。但是,編程方式也很容易使用這個工具(這兩個鍵值和值都被反序列化)。
RocksDbKeyValueReader kvReader = new RocksDbKeyValueReader(dbName, pathOfdb, config)
Object value = kvReader.get(key)
  • 因?yàn)?Samza 作業(yè)有一些高速緩存和緩沖區(qū),所以您可能無法看到預(yù)期的值(甚至無法看到任何值,如果所有數(shù)據(jù)都被緩存)。一些相關(guān)配置的是 stores.store-name.container.write.buffer.size.bytes,stores.store-name.write.batch.size,stores.store-name.object.cache.size。您可能希望將其設(shè)置為非常小的測試。
  • 由于 RocksDB memtable 在每次寫入時都不會立即刷新到磁盤,所以在寫入磁盤上的SST文件之前,您可能無法看到預(yù)期的值。有關(guān) RocksDb 的更多詳細(xì)信息,可以在這里參考文檔。

已知的問題

RocksDB 有幾個粗糙的邊緣。建議您閱讀 RocksDB 調(diào)整指南。需要注意的其他一些注意事項(xiàng)是:

  1. RocksDB 經(jīng)過高度優(yōu)化,能夠運(yùn)行 SSD 硬盤。非 SSD 的性能顯著下降。
  2. Samza 的 KeyValueStorageEngine.putAll()方法目前不使用 RocksDB 的批處理 API,因?yàn)樗贘ava中不起作用。
  3. 調(diào)用 iterator.seekToFirst()非常慢,如果存在很多刪除。

使用鍵值存儲實(shí)現(xiàn)常見用例

在本節(jié)前面,我們討論了有狀態(tài)流處理的一些示例用例。我們來看看如何使用諸如 Samza 的 RocksDB 商店等鍵值存儲引擎實(shí)現(xiàn)這些功能。

窗口聚合

示例:計(jì)算每個用戶每小時的頁面瀏覽量

實(shí)施:您需要兩個處理階段。

  1. 第一個按用戶 ID 重新劃分輸入數(shù)據(jù),以便將特定用戶的所有事件路由到相同的流任務(wù)。如果輸入流已經(jīng)被用戶 ID 劃分,您可以跳過這個。
  2. 第二階段使用將用戶 ID 映射到運(yùn)行計(jì)數(shù)的鍵值存儲進(jìn)行計(jì)數(shù)。對于每個新事件,作業(yè)將從存儲中讀取適當(dāng)用戶的當(dāng)前計(jì)數(shù),將其遞增,并將其寫回。當(dāng)窗口完成(例如,在一小時結(jié)束時),作業(yè)將遍歷存儲的內(nèi)容,并將聚合發(fā)送到輸出流。

請注意,此工作有效地停留在小時標(biāo)記以輸出其結(jié)果。這對 Samza 是完全正確的,因?yàn)閽呙桄I值存儲的內(nèi)容是相當(dāng)快的。當(dāng)工作正在做這個小時工作時,輸入流被緩沖。

桌子加入

示例:通過 user_id 將用戶配置文件表加入到用戶設(shè)置表中,并發(fā)出連接的流

實(shí)現(xiàn):作業(yè)訂閱用戶配置文件數(shù)據(jù)庫和用戶設(shè)置數(shù)據(jù)庫的更改流,都由 user_id 分區(qū)。該作業(yè)保留一個由 user_id 鍵入的鍵值存儲區(qū),其中包含最新的配置文件記錄和每個 user_id 的最新設(shè)置記錄。當(dāng)一個新的事件從兩個流進(jìn)來時,作業(yè)將查找其存儲中的當(dāng)前值,更新相應(yīng)的字段(取決于是否是配置文件更新或設(shè)置更新),并將新加入的記錄寫回商店。存儲的更新日志加倍為任務(wù)的輸出流。

表格流連接

示例:使用用戶的郵政編碼來增加一個頁面視圖流(可能允許在后期通過郵政編碼進(jìn)行聚合)

實(shí)施:作業(yè)訂閱用戶配置文件更新流和頁面瀏覽事件流。兩個流都必須用 user_id 進(jìn)行分區(qū)。該作業(yè)維護(hù)一個鍵值存儲區(qū),其中 key 是 user_id,該值是用戶的郵政編碼。每當(dāng)作業(yè)接收到配置文件更新時,它將從配置文件更新中提取用戶的新郵政編碼,并將其寫入商店。每次收到頁面瀏覽事件時,它會從商店中讀取該用戶的郵政編碼,并使用添加的郵政編碼字段發(fā)送頁面查看事件。

如果下一階段需要通過郵政編碼進(jìn)行匯總,則可以將郵政編碼用作作業(yè)輸出流的分區(qū)鍵。這確保了相同郵政編碼的所有事件都被發(fā)送到同一流分區(qū)。

流流連接

示例:將廣告點(diǎn)擊次數(shù)加入到廣告展示流中(將廣告展示時間的信息鏈接到點(diǎn)擊點(diǎn)擊的信息)

在此示例中,我們假設(shè)廣告的每次展示都有唯一的標(biāo)識符,例如 UUID,并且相同的標(biāo)識符包含在展示和點(diǎn)擊事件中。該標(biāo)識符用作連接密鑰。

實(shí)施:按照展示 ID 或用戶 ID 分配廣告點(diǎn)擊和廣告展示流(假設(shè)具有相同展示 ID 的兩個事件始終具有相同的用戶 ID)。該任務(wù)保留兩個商店,一個包含點(diǎn)擊事件,一個包含展示事件,使用展示 ID 作為兩個商店的關(guān)鍵。當(dāng)作業(yè)收到點(diǎn)擊事件時,它會在展示商店中查找相應(yīng)的展示,反之亦然。如果找到匹配項(xiàng),則發(fā)送連接對,并刪除條目。如果沒有找到匹配項(xiàng),則將事件寫入相應(yīng)的商店。定期地,作業(yè)將掃描兩個商店,并刪除在連接的時間窗口內(nèi)未匹配的任何舊事件。

其他存儲引擎

Samza 的容錯機(jī)制(將本地商店的寫入發(fā)送到復(fù)制的更改日志)與存儲引擎的數(shù)據(jù)結(jié)構(gòu)和查詢 API 完全分離。雖然鍵值存儲引擎對于通用處理是有利的,但您可以通過實(shí)施StorageEngine接口輕松地為其他類型的查詢添加自己的存儲引擎。Samza 的模式特別適用于與流任務(wù)相同的過程中作為庫運(yùn)行的嵌入式存儲引擎。

其他存儲引擎的一些想法可能是有用的:持久堆(用于運(yùn)行前N個查詢),近似算法(如bloom過濾器和超文本記錄)或全文索引(如Lucene)。(補(bǔ)丁歡迎?。?/p>

具有狀態(tài)的容錯語義

如關(guān)于檢查點(diǎn)的部分所述,Samza 目前只支持在出現(xiàn)故障的情況下至少提供一次交付保證(有時稱為“保證交貨”)。這意味著如果任務(wù)失敗,則不會丟失任何消息,但可能會重新傳遞某些消息。

對于上面討論的許多有狀態(tài)處理使用情況,這不是一個問題:如果消息對狀態(tài)的影響是冪等的,則對同一消息進(jìn)行多次處理是安全的。例如,如果商店包含每個用戶的郵政編碼,則兩次處理相同的配置文件更新沒有任何效果,因?yàn)橹貜?fù)的更新不會更改郵政編碼。

但是,對于非冪等操作(如計(jì)數(shù)),至少一次交貨保證可能會給出不正確的結(jié)果。如果Samza任務(wù)失敗并重新啟動,則可能會在發(fā)生故障之前不久處理的一些消息進(jìn)行雙重計(jì)數(shù)。我們計(jì)劃在未來的Samza發(fā)行版中解決這個限制。

窗口  ?

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號