トランザクションメッセージ
トランザクションメッセージはApache RocketMQの高度なメッセージタイプです。このトピックでは、トランザクションメッセージのアプリケーションシナリオ、動作メカニズム、制限事項、使用方法、および使用上の注意について説明します。
シナリオ
分散トランザクション
分散システムでコアビジネスロジックが実行されると、複数のダウンストリームビジネスが呼び出されてロジックを同時に処理します。したがって、コアビジネスとダウンストリームビジネス間の実行結果の一貫性を確保することは、分散トランザクションで解決する必要がある最大の課題です。
Eコマースのシナリオでは、ユーザーが注文すると、ダウンストリームシステムがトリガーされてそれに応じて変更が加えられます。例えば、物流システムは出荷を開始し、信用システムはユーザーの信用ポイントを更新し、ショッピングカートシステムはユーザーのショッピングカートをクリアする必要があります。処理分岐は次のとおりです。
注文システムは注文ステータスを未払いから支払済みに変更します。
物流システムは出荷予定レコードを追加し、注文物流レコードを作成します。
信用システムはユーザーの信用ポイントを更新します。
ショッピングカートシステムはショッピングカートをクリアし、ユーザーのショッピングカートレコードを更新します。
従来のXAベースのソリューション: パフォーマンスが低下
分岐間で結果の一貫性を確保するために使用される一般的な方法は、拡張アーキテクチャ(XA)プロトコルに基づく分散トランザクションシステムを使用することです。このシステムは4つのコールブランチを1つの大きなトランザクションにカプセル化し、このトランザクションには4つの独立したトランザクションブランチが含まれます。このソリューションでは結果の一貫性を確保できますが、これを達成するために大量のリソースをロックする必要があります。この数は分岐の数に応じて増加するため、システムの同時実行性が低下します。ダウンストリームブランチを追加すると、システムのパフォーマンスが低下します。
通常のメッセージベースのソリューション: 結果の一貫性に乏しい
XA ソリューションをベースにしたよりシンプルなソリューションでは、注文システムの変更をローカル トランザクションとして扱い、ダウンストリーム システムの変更をダウンストリーム タスクとして処理します。トランザクション ブランチは、注文テーブル トランザクションが追加された通常のメッセージとして扱われます。このソリューションでは、メッセージを非同期に処理して処理ライフサイクルを短縮し、システムの並行性を向上させます。
ただし、このソリューションは、次のように、コア トランザクションとトランザクション ブランチの間で不整合な結果が発生しやすいです。
メッセージは送信されたが、注文は実行されません。その結果、トランザクション全体をロールバックする必要があります。
注文は実行されましたが、メッセージは未送信です。この場合、メッセージは消費するために再送信する必要があります。
タイムアウト エラーを確実に検出できないため、注文をロールバックする必要があるのか、注文の変更をコミットする必要があるのかを判断することが困難になります。
Apache RocketMQ の分散トランザクション メッセージベース ソリューション: 完全な整合性
前述のソリューションで整合性が保証できない理由は、通常のメッセージにはスタンドアロン データベース トランザクションのコミット、ロールバック、統合された調整機能がないためです。
Apache RocketMQ のトランザクション メッセージ機能では、通常のメッセージベース ソリューションに基づいて 2 フェーズ コミットがサポートされます。この機能は、2 フェーズ コミットとローカル トランザクションを組み合わせることでコミット結果のグローバルな整合性を実現します。
Apache RocketMQ のトランザクション メッセージ ソリューションは、強力で、スケーラブルで、開発が容易です。トランザクション メッセージの動作メカニズムとプロセスに関する詳細は、動作メカニズムを参照してください。
動作メカニズム
定義
トランザクション メッセージは Apache RocketMQ によって提供される高度なメッセージタイプで、メッセージ作成とローカル トランザクション間の最終的な整合性を確保します。処理ワークフロー
次の図は、トランザクション メッセージの対話プロセスを示しています。
プロデューサーはメッセージを Apache RocketMQ ブローカーに送信します。
Apache RocketMQ ブローカーはメッセージを保存し、配信待ちとしてマークします。このような状態のメッセージは、ハーフ メッセージと呼ばれます。その後、ブローカーは確認メッセージ (ACK) をプロデューサーに送信します。
プロデューサーはローカル トランザクションを実行します。
プロデューサーはローカル トランザクションの実行結果を送信するための 2 つ目の ACK をブローカーに送信します。実行結果はコミットまたはロールバックです。
ブローカーが受信するメッセージのステータスがコミットの場合、ブローカーはハーフ メッセージを配信可能としてマークし、そのメッセージをコンシューマーに配信します。
ブローカーが受信するメッセージのステータスがロールバックの場合、ブローカーはトランザクションをロールバックし、ハーフ メッセージをコンシューマーに配信しません。
ネットワークが切断されたり、プロデューサー アプリケーションが再起動されたりして、ブローカーが 2 つ目の ACK を受信できなかったり、ハーフ メッセージのステータスが不明の場合、ブローカーは一定時間待機し、プロデューサー クラスター内のプロデューサーにリクエストを送信してハーフ メッセージのステータスを照会します。メモ この期間の長さと照会の最大回数については、パラメーター制限を参照してください。
プロデューサーがリクエストを受信すると、プロデューサーはハーフメッセージに対応するローカルトランザクションの実行結果を確認します。
プロデューサーはローカルトランザクションの実行結果に基づいて、Apache RocketMQブローカーに別のACKを送信します。その後、ブローカーは手順4に従ってハーフメッセージを処理します。
トランザクションメッセージのライフサイクル
初期化: メッセージはプロデューサーによって構築され初期化され、ブローカーに送信する準備が整います。
トランザクション保留中: ハーフメッセージがブローカーに送信されます。ただし、永続ストレージ用にディスクにすぐに書き込まれることはありません。代わりに、トランザクションストレージシステムに格納されます。ローカルトランザクションの第2フェーズが成功したことがシステムによって確認されるまで、メッセージはコミットされません。この期間中、メッセージはダウンストリームのコンシューマーには表示されません。
ロールバック: 第2フェーズでトランザクションの実行結果がロールバックされると、ブローカーはハーフメッセージをロールバックしてワークフローを終了します。
準備完了: メッセージはブローカーに送信され、コンシューマーに表示され、使用できます。
処理中: メッセージはコンシューマーによって取得され、コンシューマーのローカルビジネスロジックに基づいて処理されます。
このプロセスでは、ブローカーはコンシューマーが消費を完了して、消費結果を送信するのを待ちます。一定期間内にコンシューマーから応答がない場合、Apache RocketMQはメッセージを再試行します。詳細については、消費の再試行を参照してください。
確認応答: コンシューマーは消費を完了し、消費結果をブローカーに送信します。ブローカーは、現在のメッセージが正常に消費されたかどうかをマークします。
デフォルトでは、Apache RocketMQはすべてのメッセージを保持します。消費結果が送信されると、メッセージデータはすぐに削除されるのではなく、論理的に消費されたとしてマークされます。したがって、コンシューマーは、保持期間の期限切れまたはストレージスペースの不足のために削除される前に、メッセージをバックトラックして再消費できます。
削除: メッセージの保持期間が期限切れになるか、ストレージスペースが不足すると、Apache RocketMQは物理ファイルから保存された最も古いメッセージをローリング方式で削除します。詳細については、メッセージのストレージとクリーンアップを参照してください。
使用制限
メッセージタイプの一貫性
トランザクションメッセージは、MessageTypeがトランザクションであるトピックでのみ使用できます。
トランザクション中心の消費
Apache RocketMQのトランザクションメッセージ機能は、同じトランザクションをローカルコアイトランザクションとダウンストリームブランチ間で処理できることを保証します。ただし、メッセージ消費結果とアップストリーム実行結果の一貫性は保証されません。したがって、ダウンストリームビジネスはメッセージが正しく処理されるようにする必要があります。障害が発生した場合にメッセージが正しく処理されるように、コンシューマーが消費の再試行を適切に行うことをお勧めします。
中間状態の可視性
Apache RocketMQのトランザクションメッセージ機能は最終的な一貫性のみを保証するため、メッセージがコンシューマーに配信される前に、アップストリームトランザクションとダウンストリームブランチ間でステータスの整合性が保証されません。したがって、トランザクションメッセージは非同期実行を受け入れるトランザクションシナリオにのみ適しています。
トランザクションタイムアウトメカニズム
Apache RocketMQは、トランザクションメッセージのタイムアウトメカニズムを実装しています。ブローカーは半メッセージを受け取ると、一定時間後にトランザクションをコミットするかロールバックするかを判断できない場合、デフォルトでメッセージをロールバックします。タイムアウト期間の詳細については、パラメーター制限を参照してください。
例
トピックを作成する
Apache RocketMQ 5.0でトピックを作成する場合、mqadminツールを使用することをお勧めします。ただし、メッセージタイプはプロパティパラメーターとして追加する必要があることに注意してください。例を次に示します
sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=Transaction
メッセージを送信する
トランザクションメッセージを送信することは、次の観点で通常のメッセージを送信することとは異なります
トランザクションメッセージを送信する前に、トランザクションチェッカーを有効にして、ローカルトランザクションの実行と関連付ける必要があります。
プロデューサーを作成するとき、トランザクションチェッカーを設定し、送信されるメッセージのトピックのリストをバインドする必要があります。これにより、クライアントの組み込みトランザクションチェッカーが例外が発生した場合にトピックを復元できるようになります。
トランザクショントピックを作成する
NORMALトピックはトランザクションメッセージの配信をサポートしません。トランザクションメッセージをNORMALトピックに送信すると、エラーが発生します。
./bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster -a +message.type=TRANSACTION
- -cクラスター名
- -tトピック名
- -nネームサーバーのアドレス
- -a追加の属性。トランザクションメッセージの配信をサポートするために、値が
TRANSACTION
のmessage.type
属性を追加します。
次の例では、Javaを例として使用して、トランザクションメッセージを送信する方法を示します
// The demo is used to simulate the order table query service to check whether the order transaction is submitted.
private static boolean checkOrderById(String orderId) {
return true;
}
// The demo is used to simulate the execution result of a local transaction.
private static boolean doLocalTransaction() {
return true;
}
public static void main(String[] args) throws ClientException {
ClientServiceProvider provider = new ClientServiceProvider();
MessageBuilder messageBuilder = new MessageBuilder();
// Build a transaction producer: The transactional message requires the producer to build a transaction checker to check the intermediate status of an exceptional half message.
Producer producer = provider.newProducerBuilder()
.setTransactionChecker(messageView -> {
/**
* The transaction checker checks whether the local transaction is correctly committed or rolled back based on the business ID, for example, an order ID.
* If this order is found in the order table, the order insertion action is committed correctly by the local transaction. If no order is found in the order table, the local transaction has been rolled back.
*/
final String orderId = messageView.getProperties().get("OrderId");
if (Strings.isNullOrEmpty(orderId)) {
// Message error. Rollback is returned.
return TransactionResolution.ROLLBACK;
}
return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
})
.build();
// Create a transaction branch.
final Transaction transaction;
try {
transaction = producer.beginTransaction();
} catch (ClientException e) {
e.printStackTrace();
// If the transaction branch fails to be created, the transaction is terminated.
return;
}
Message message = messageBuilder.setTopic("topic")
// Specify the message index key so that the system can use a keyword to accurately locate the message.
.setKeys("messageKey")
// Specify the message tag so that consumers can use the tag to filter the message.
.setTag("messageTag")
// For transactional messages, a unique ID associated with the local transaction is created to verify the query of the local transaction status.
.addProperty("OrderId", "xxx")
// Message body.
.setBody("messageBody".getBytes())
.build();
// Send a half message.
final SendReceipt sendReceipt;
try {
sendReceipt = producer.send(message, transaction);
} catch (ClientException e) {
// If the half message fails to be sent, the transaction can be terminated and the message is rolled back.
return;
}
/**
* Execute the local transaction and check the execution result.
* 1. If the result is Commit, deliver the message.
* 2. If the result is Rollback, roll back the message.
* 3. If an unknown exception occurs, no action is performed until a response is obtained from a half message status query.
*
*/
boolean localTransactionOk = doLocalTransaction();
if (localTransactionOk) {
try {
transaction.commit();
} catch (ClientException e) {
// You can determine whether to retry the message based on your business requirements. If you do not want to retry the message, you can use the half message status query to submit the transaction status.
e.printStackTrace();
}
} else {
try {
transaction.rollback();
} catch (ClientException e) {
// We recommend that you record the exception information. This enables you to submit the transaction status based on the half message status query in the event of a rollback exception. Otherwise, you have to retry the message.
e.printStackTrace();
}
}
}
使用上の注意
多数の半メッセージがタイムアウトしないようにしてください。
Apache RocketMQでは、トランザクションコミット中に例外が発生した場合にトランザクションを確認して、トランザクションの整合性を確保できます。ただし、プロデューサーはローカルトランザクションが不明な結果を返すことを避けるようにする必要があります。多数のトランザクションチェックがあるとシステムのパフォーマンスが低下し、トランザクション処理が遅くなる可能性があります。
進行中のトランザクションを適切に処理する。
半メッセージステータスのクエリ中に、進行中のトランザクションに対してロールバックまたはコミットを返さないでください。代わりに、トランザクションに不明なステータスを維持します。
一般に、トランザクションが進行中の理由は、トランザクションの実行が遅く、クエリが頻繁に行われるためです。2つの解決策が推奨されます
最初のクエリの間隔をより大きな値に設定します。ただし、これはクエリ結果に依存するメッセージに大きな遅延を引き起こす可能性があります。
プログラムが進行中のトランザクションを正しく認識するようにします。