メインコンテンツにスキップ
バージョン: 5.0

順序メッセージ

順序メッセージは、Apache RocketMQの高度な機能を備えたメッセージタイプです。このトピックでは、順序メッセージのシナリオ、動作メカニズム、制限、使用例、および使用上の注意について説明します。

シナリオ

異種システムは、状態同期を使用して、順序付けされたイベント処理、トランザクションマッチング、リアルタイムの増分データ同期などのシナリオで強力な整合性を維持します。上記のシナリオでは、イベント変更が発生したときに、アップストリームアプリケーションからダウンストリームアプリケーションへのメッセージの順序付けられた配信が必要です。 Apache RocketMQは、順序付けられたメッセージ伝送を実装するのに役立つ順序メッセージを提供します。

シナリオ1: トランザクションマッチング トランザクションマッチング

たとえば、証券および株式取引のシナリオでは、複数の入札者が同じ入札価格で入札注文を提供する場合、最初にその入札価格を提供した入札者が落札します。したがって、ダウンストリームの注文処理システムは、価格が提示された順序で注文を処理するように設計する必要があります。

シナリオ2: リアルタイムの増分データ同期

ノーマルメッセージノーマルメッセージ FIFOメッセージ順序メッセージ

たとえば、データベースの変更に関連するデータの増分同期を実行するとします。 Apache RocketMQで提供される順序メッセージを使用して、アップストリームのソースデータベースからダウンストリームのクエリシステムにメッセージを送信できます。メッセージは、追加、削除、および変更操作のバイナリログにすることができます。ダウンストリームシステムは、メッセージが送信された順序でメッセージを取得し、データベースのステータスが同じ順序で更新されるようにします。順序メッセージは、アップストリームシステムの操作とダウンストリームシステムのステータスデータ間の一貫性を確保するのに役立ちます。このシナリオでノーマルメッセージを使用すると、状態の不整合が発生する可能性があります。

動作メカニズム

順序メッセージの定義

順序メッセージは、Apache RocketMQの高度なメッセージタイプです。順序メッセージは、メッセージが送信された順序でコンシューマーに配信されます。このメッセージタイプを使用すると、ビジネスシナリオで順序付けられた処理を実装できます。

順序メッセージの定義上の特徴は、メッセージの送信、保存、および配信の順序です。

Apache RocketMQは、メッセージグループを使用して順序メッセージの順序を決定します。順序メッセージにはメッセージグループを設定する必要があります。メッセージグループ内のメッセージは、先入れ先出し(FIFO)の順序で処理されます。メッセージの順序付けは、異なるメッセージグループまたはメッセージグループに属さないメッセージには適用されません。

メッセージグループベースのメッセージ順序付けにより、ビジネスロジックに基づいてきめ細かいメッセージ順序付けを指定できます。これにより、ビジネスシステムで部分的なメッセージ順序付けを実装し、ビジネスシステムの並行性とスループットの度合いを向上させることができます。

メッセージ順序付け

Apache RocketMQでは、2種類のメッセージ順序が適用されます。生産順序と消費順序です。

  • **生産順序**: Apache RocketMQは、プロデューサーとサーバー間で確立されたプロトコルを使用して、メッセージがプロデューサーからサーバーに順番に送信され、メッセージが送信された順序でメッセージが保存および永続化されることを保証します。

    メッセージの生産順序を確保するには、以下の条件が満たされていることを確認してください。

    • 単一プロデューサー: メッセージの生産順序は、個々のプロデューサーに適用されます。メッセージに同じメッセージグループを設定した場合でも、Apache RocketMQは異なるシステムの異なるプロデューサーからのメッセージの順序を判断できません。

    • シリアル伝送: Apache RocketMQのプロデューサーは、複数のスレッドを使用して安全なアクセスをサポートしています。プロデューサーが複数のスレッドを使用してメッセージを同時に送信する場合、Apache RocketMQは異なるスレッドからのメッセージの順序を判断できません。

上記の条件を満たすプロデューサーがApache RocketMQにメッセージを送信する場合、同じメッセージグループに属するメッセージは、メッセージが送信された順序で同じキューに保存されます。次の図は、Apache RocketMQの順次保存ロジックを示しています。

顺序存储逻辑

前の図では、MessageGroup 1とMessageGroup 4からのメッセージは、同じキュー(MessageQueue 1)に保存されています。 Apache RocketMQは、MessageGroup 1からのメッセージG1-M1、G1-M2、およびG1-M3が送信された順序でキューに保存されることを保証します。 MessageGroup 4からのメッセージG4-M1とG4-M2も、送信された順序で保存されます。ただし、MessageGroup 1とMessageGroup 4からのメッセージは、特定の順序で保存されません。

  • **消費順序**:

    Apache RocketMQは、コンシューマーとサーバー間で確立されたプロトコルを使用して、メッセージが保存された順序で消費されることを保証します。

    メッセージの消費順序を確保するには、以下の条件が満たされていることを確認してください。

    • 配信順序: Apache RocketMQは、クライアントSDKとサーバー側の通信プロトコルを使用して、サーバー上のメッセージストレージ順にメッセージが配信されることを保証します。コンシューマーアプリケーションがメッセージを消費する場合、アプリケーションは受信-処理-応答パスに従って、非同期処理によって caused by asynchronous processing.

      注意
      • PushConsumerコンシューマーがメッセージを消費する場合、Apache RocketMQは、メッセージが保存された順序でメッセージがコンシューマーに1つずつ配信されることを保証します。
      • SimpleConsumerコンシューマーがメッセージを消費する場合、コンシューマーは一度に複数のメッセージをプルする可能性があり、ビジネスアプリケーションにはメッセージ消費順序を実装するソリューションが必要です。コンシューマータイプの詳細については、コンシューマータイプを参照してください。
    • 制限付き再試行: Apache RocketMQは、順序メッセージの配信再試行回数を制限します。メッセージが配信再試行の最大回数に達すると、Apache RocketMQは消費のためのメッセージの配信の再試行を停止します。これにより、キュー内の他のメッセージが配信を常に待機することを防ぎます。

    消費順序が重要なシナリオでは、メッセージの順序が狂うのを防ぐために適切な再試行回数を指定することをお勧めします。

生産順序と消費順序の組み合わせ

FIFOの原則に基づいてメッセージを処理する場合は、生産順序と消費順序が必要です。ほとんどのビジネスシナリオでは、プロデューサーは複数のコンシューマーにマッピングされる可能性があり、すべてのコンシューマーがメッセージの順序付けられた消費を必要とするわけではありません。生産順序と消費順序の設定を組み合わせて、さまざまなビジネスシナリオでの要件を満たすことができます。たとえば、順序付けられたメッセージを送信し、順序付けられていない並列消費を使用してスループットを向上させることができます。次の表に、生産順序と消費順序の設定のさまざまな組み合わせを示します。

生産順序消費順序効果
メッセージグループを設定して、メッセージの順序付けられた配信を実装します。メッセージの順序付けられた消費メッセージの順序は、メッセージグループレベルで保証されます。同じメッセージグループ内のメッセージは、同じ順序で送受信されます。
メッセージグループを設定して、メッセージの順序付けられた配信を実装します。並列消費メッセージは、時系列に並行して消費されます。
メッセージグループを設定せずに、メッセージの順序付けられていない配信を実装します。メッセージの順序付けられた消費メッセージの順序は、キューレベルで保証されます。メッセージの消費は、キューの属性に基づいています。 Apache RocketMQは、消費順序がキュー内の保存順序と同じであることを保証しますが、必ずしもメッセージ送信順序と同じであるとは限りません。
メッセージグループを設定せずに、メッセージの順序付けられていない配信を実装します。並列消費メッセージは、時系列に並行して消費されます。

**順序メッセージのライフサイクル** ライフサイクル

  • 初期化済み: メッセージはプロデューサーによって構築および初期化され、ブローカーに送信される準備ができています。

  • 準備完了: メッセージはブローカーに送信され、コンシューマーから見えて消費可能です。

  • 処理中: メッセージはコンシューマーによって取得され、コンシューマーのローカルビジネスロジックに基づいて処理されます。

    このプロセスでは、ブローカーはコンシューマーが消費を完了し、消費結果を送信するのを待ちます。一定時間内にコンシューマーから応答がない場合、Apache RocketMQはメッセージを再試行します。詳細は、消費再試行を参照してください。

  • 承認済み:コンシューマーは消費を完了し、消費結果をブローカーに送信します。ブローカーは、現在のメッセージが正常に消費されたかどうかをマークします。

    デフォルトでは、Apache RocketMQはすべてのメッセージを保持します。消費結果が送信されると、メッセージデータはすぐに削除されるのではなく、消費済みとして論理的にマークされます。そのため、コンシューマーは、保存期間の満了またはストレージ容量の不足により削除される前に、メッセージを再消費するためにバックトラックできます。

  • 削除済み:メッセージの保存期間が満了したか、ストレージ容量が不足している場合、Apache RocketMQは、最も古い保存済みメッセージを物理ファイルからローリング方式で削除します。詳細は、メッセージの保存とクリーンアップを参照してください。

注意
  • メッセージ消費の失敗またはタイムアウトは、サーバーの再試行ロジックをトリガーします。メッセージの消費再試行がトリガーされると、メッセージはそのライフサイクルの最後に達します。元のメッセージは、新しいメッセージIDを持つ新しいメッセージと見なされます。

  • 順序付けられたメッセージの消費再試行がトリガーされた場合、順序付けられたメッセージに続くメッセージは、順序付けられたメッセージが消費された後にのみ処理できます。

使用制限

順序付けられたメッセージは、MessageTypeがFIFOであるトピックのみをサポートします。

トピックの作成

Apache RocketMQ 5.0でトピックを作成するには、mqadminツールを使用することをお勧めします。ただし、メッセージタイプはプロパティパラメータとして追加する必要があることに注意してください。次に例を示します。

sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=FIFO

メッセージの送信

通常のメッセージと比較して、順序付けられたメッセージにはメッセージグループが設定されている必要があります。ワークロードの分離と並行性のスケーリングを可能にするために、ビジネス要件に基づいてメッセージグループをきめ細かく設定することをお勧めします。

FIFOトピックの作成

./bin/mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -n 127.0.0.1:9876 -a +message.type=FIFO
  • -c クラスタ名
  • -t トピック名
  • -n ネームサーバーのアドレス
  • -o 順序付けられたトピックを作成するためのフラグ

次のサンプルコードは、Javaで順序付けられたメッセージを送受信する方法の例を示しています。

        // Send ordered messages. 
MessageBuilder messageBuilder = null;
Message message = messageBuilder.setTopic("topic")
// Specify the message index key. The system uses the key to locate the message.
.setKeys("messageKey")
// Specify the message tag. The consumer can use the tag to filter the message.
.setTag("messageTag")
// Configure a message group for the ordered messages. We recommend that you do not include a large number of messages in the group.
.setMessageGroup("fifoGroup001")
// Configure the message body.
.setBody("messageBody".getBytes())
.build();
try {
// Send the messages. Focus on the result of message sending and exceptions such as failures.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
// Make sure that ordered delivery is applied to the consumer group. Otherwise, the messages are delivered concurrently and in no particular order.
// Consumption example 1: If the consumer type is PushConsumer, the consumer needs to only process the message in the message listener.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
// Return the status based on the consumption result.
return ConsumeResult.SUCCESS;
}
};
// Consumption example 2: If the consumer type is SimpleConsumer, the consumer must actively obtain the message for consumption and submit the consumption result.
// If the consumption of a message in the message group has not finished, the next message in the message group cannot be retrieved if you call the Receive function.
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, the consumer must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, the consumer must re-initiate the request to obtain the message.
e.printStackTrace();
}

使用上の注意

メッセージ処理の順序が狂わないように、シリアル消費を使用してください。

バッチ消費ではなく、シリアルメッセージ消費を使用することをお勧めします。複数のメッセージを同時に消費すると、メッセージ処理の順序が狂う可能性があります。

たとえば、メッセージ1、2、3、4が1-2-3-4の順序で送信され、バッチ消費の順序が1-[2, 3](バッチで処理されたが失敗した)-[2, 3](再試行)-4の場合、メッセージ3の処理に失敗すると、システムはメッセージ2を繰り返し処理する可能性があります。その結果、メッセージ消費の順序が狂います。

メッセージグループに多数のメッセージを含めないでください。

Apache RocketMQは、同じメッセージグループ内のメッセージが同じキューに格納されることを保証します。多数のメッセージを含むメッセージグループは、対応するキューに過負荷をかけます。これはメッセージングのパフォーマンスに影響を与え、スケーラビリティを阻害します。メッセージグループを設定するときは、注文IDとユーザーIDをメッセージのシーケンス条件として使用できます。これにより、同じユーザーのメッセージの順序が保証されます。

ビジネスアプリケーションのメッセージをメッセージグループごとに分割することをお勧めします。たとえば、注文IDとユーザーIDをメッセージグループのキーワードとして使用して、同じユーザーのメッセージの順序付けられた処理を実装できます。異なるユーザーのメッセージの順序を保証する必要はありません。