通常メッセージ
通常メッセージは、Apache RocketMQで特別な機能を持たないメッセージです。FIFOメッセージ、遅延メッセージ、トランザクションメッセージなどの機能付きメッセージとは異なります。このトピックでは、通常メッセージのシナリオ、動作メカニズム、使用方法、および使用上の注意について説明します。
シナリオ
通常メッセージは、一般的にマイクロサービスの非同期化、データ統合、イベント駆動型シナリオで使用されます。これらのシナリオのほとんどは、信頼性の高い伝送チャネル以外、メッセージ処理のタイミングや順序に関する要件が低いか、またはありません。
シナリオ1:マイクロサービスの非同期化
上の図は、オンラインEコマーストランザクションのシナリオを示しています。このシナリオでは、上流の注文システムは注文と支払いを独立した通常メッセージとしてカプセル化し、Apache RocketMQブローカーに送信します。その後、下流のシステムは必要に応じてブローカーからメッセージを購読し、ローカルの消費ロジックに基づいてタスクを処理します。メッセージは互いに独立しており、関連付ける必要はありません。
シナリオ2:データ統合伝送
上の図は、オフラインログ収集の例を示しています。インストゥルメンテーションコンポーネントを使用して、フロントエンドアプリケーションから操作ログを収集し、Apache RocketMQに転送します。各メッセージは、Apache RocketMQによる処理を必要としないログデータの一部です。Apache RocketMQは、ログデータを下流のストレージと分析システムに送信するだけで済みます。バックエンドアプリケーションは、後続の処理タスクを担当します。
動作メカニズム
通常メッセージの定義
通常メッセージは、Apache RocketMQの基本機能を持つメッセージです。通常メッセージは、プロデューサーとコンシューマ間の非同期化と通信をサポートします。
通常メッセージのライフサイクル
初期化:メッセージはプロデューサーによって構築および初期化され、ブローカーに送信される準備が整います。
準備完了:メッセージはブローカーに送信され、コンシューマに表示され、消費可能になります。
処理中:メッセージはコンシューマによって取得され、コンシューマのローカルビジネスロジックに基づいて処理されます。
このプロセスでは、ブローカーはコンシューマが消費を完了して消費結果を送信するのを待ちます。一定時間内にコンシューマから応答がない場合、Apache RocketMQはメッセージを再試行します。詳細については、消費時の再試行を参照してください。
確認済み:コンシューマは消費を完了し、消費結果をブローカーに送信します。ブローカーは、現在のメッセージが正常に消費されたかどうかをマークします。
デフォルトでは、Apache RocketMQはすべてのメッセージを保持します。消費結果が送信されると、メッセージデータはすぐに削除されるのではなく、論理的に消費済みとしてマークされます。そのため、保持期間の期限切れやストレージ容量の不足によって削除される前に、コンシューマはメッセージを遡って再消費できます。
削除済み:メッセージの保持期間が期限切れになった場合、またはストレージ容量が不足している場合、Apache RocketMQはローリング方式で物理ファイルから最も古く保存されているメッセージを削除します。詳細については、メッセージストレージとクリーンアップを参照してください。
使用上の制限
通常メッセージは、MessageTypeがNormalのトピックのみをサポートします。
例
トピックの作成
Apache RocketMQ 5.0でトピックを作成するには、mqadminツールを使用することをお勧めします。ただし、メッセージタイプをプロパティパラメータとして追加する必要があることに注意してください。例を以下に示します。
sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=NORMAL
メッセージの送信
インデックスキーとフィルタタグを設定して、通常メッセージをフィルタリングまたは検索できます。次のサンプルコードは、Javaで通常メッセージを送受信する方法を示しています。
// Send a normal message.
MessageBuilder messageBuilder = new MessageBuilder();
Message message = messageBuilder.setTopic("topic")
// Specify the message index key so that you can accurately search for the message by using a keyword.
.setKeys("messageKey")
// Specify the message tag so that the consumer can filter the message based on the specified tag.
.setTag("messageTag")
// Message body.
.setBody("messageBody".getBytes())
.build();
try {
// Send the message. You need to pay attention to the sending result and capture exceptions such as failures.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
// Consumption example 1: When you consume a normal message as a push consumer, you need only to process the message in the message listener.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
// Return the status based on the consumption result.
return ConsumeResult.SUCCESS;
}
};
// Consumption example 2: When you consume a normal message as a simple consumer, you must obtain and consume the message, and submit the consumption result.
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, you must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, you must re-initiate the request to obtain the message.
e.printStackTrace();
}
使用上の注意
トラブルシューティングを容易にするために、グローバルに一意のインデックスキーを設定する
Apache RocketMQでは、メッセージキーであるカスタムインデックスキーを設定できます。メッセージのクエリとトレースを行う際に、インデックスキーはこれらのメッセージを効率的かつ正確に見つけるのに役立ちます。
そのため、メッセージを送信する際には、注文IDやユーザーIDなど、サービスの一意の情報をインデックスとして使用することをお勧めします。これにより、将来、メッセージを迅速に見つけることができます。