java - transaction - spring kafka template




jafと一緒にkafkaを使うときの良い習慣 (3)

あなたの質問を見ることによって、私はあなたがあなたのOLTPシステムのCDC(Change Data Capture)を達成しようとしている、すなわちトランザクションデータベースに行くすべての変更を記録しようとしていると思います。 これに対処する方法は2つあります。

  1. アプリケーションコードは、Kafkaと同様にトランザクションDBへの二重書き込みを行います。 それは矛盾しており、パフォーマンスを妨げます。 矛盾します。2つの独立したシステムへの二重書き込みを行うと、どちらかの書き込みが失敗したときにデータが壊れ、トランザクションフローでデータをKafkaにプッシュするとレイテンシが増えるため、妥協したくありません。
  2. DBコミット(データベース/アプリケーションレベルのトリガーまたはトランザクションログ)から変更を抽出してKafkaに送信します。 それは非常に一貫性があり、あなたの取引に全く影響を与えません。 DBコミットログはコミットが成功した後のDBトランザクションの反映であるため、一貫性があります。 databusdebeziumdebeziumなどのようなこのアプローチを活用する利用可能な解決策がたくさんあります。

CDCがあなたのユースケースであるならば、すでに利用可能な解決策のどれかを使ってみてください。

私は現在JPAとKafkaが使用されているプロジェクトにいます。 これらの操作を組み合わせるための一連のグッドプラクティスを見つけようとしています。

既存のコードでは、プロデューサーはjpaと同じトランザクションで使用されていますが、私が読んだものから、トランザクションを共有していないようです。

@PostMapping
@Transactional
public XDto createX(@RequestBody XRequest request) {
    Xdto dto = xService.create(request);
    kafkaProducer.putToQueue(dto, Type.CREATE);
    return dto;
}

kafkaプロデューサーは次のように定義されています。

public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Type> template;

    public void putToQueue(Dto dto, Type eventType) {
        template.send("event", new Event(dto, eventType));
    }
}

これはjpaとkafkaを組み合わせるための有効な使用例ですか?トランザクション境界は正しく定義されていますか?


トランザクションが失敗した場合、これは意図したとおりには機能しません。 Kafkaインタラクションはトランザクションの一部ではありません。

あなたはTransactionalEventListener見たいと思うかもしれませんあなたはAFTER_COMMITイベントでkafkaにメッセージを書きたいと思うかもしれません。 それでもカフカパブリッシュは失敗するかもしれません。

もう一つの選択肢はあなたがしているようにjpaを使ってdbに書くことです。 データベースから更新されたデータをdebezium読み取ってdebeziumプッシュしましょう。 イベントは別の形式になりますが、はるかに豊かになります。


他の人が言ったように、あなたはデータベースに適用された変更を安全にApache Kafkaに伝播するために変更データキャプチャを使用することができます。 後者はいかなる2フェーズコミットプロトコルもサポートしていないため、単一のトランザクションでデータベースとKafkaを更新することはできません。

テーブル自体をCDC化するか、Kafkaに送信される構造についてさらに制御したい場合は、 "outbox"パターンを適用します。 その場合、アプリケーションは実際のビジネステーブルとKafkaに送信するメッセージを含む「送信トレイ」テーブルに書き込みます。 このブログ記事でこのアプローチの詳細な説明を見つけることができます。

免責事項:私はこの記事の執筆者であり、他の回答で言及されているCDCソリューションの1つであるDebeziumのリーダーです。







apache-kafka