メインコンテンツへスキップ
バージョン: 5.0

遅延メッセージ

遅延メッセージは、Apache RocketMQ の高度な機能を持つメッセージです。このトピックでは、遅延メッセージと遅延メッセージのシナリオ、動作メカニズム、制限、使用例、および使用上の注意について説明します。

注記

スケジュールされたメッセージと遅延メッセージは基本的に同じです。どちらも、メッセージで設定されたタイミング時間に従って、固定時間でコンシューマーにメッセージを配信します。したがって、以下のセクションでは遅延メッセージを使用します。

シナリオ

分散型時間ベースのスケジューリングやタスクタイムアウト処理などのシナリオでは、正確で信頼性の高い時間ベースのイベントトリガーが必要です。Apache RocketMQ は、遅延メッセージを提供することで、時間ベースのスケジュールタスクの開発を簡素化し、高性能でスケーラブルかつ信頼性の高い時間ベースのトリガーを実装するのに役立ちます。

シナリオ1:分散型時間ベースのスケジューリング 定时消息

分散型時間ベースのスケジューリングシナリオには、たとえば、毎日 5 時にファイルクリーンアップを実行するタスクや、2 分ごとにプッシュメッセージをトリガーするタスクなど、さまざまな時間の粒度レベルを必要とするタスクが含まれます。従来のデータセットベースの時間ベースのスケジューリングソリューションは、分散シナリオでは複雑で非効率的です。対照的に、Apache RocketMQ の遅延メッセージを使用すると、複数の種類の時間トリガーをカプセル化できます。

シナリオ2:タスクタイムアウト処理 超时任务处理

タスクタイムアウト処理を含む一般的なシナリオは、電子商取引の支払いであり、未払いの注文は、すぐにキャンセルされるのではなく、特定の期間未払いである場合にキャンセルされます。この場合、Apache RocketMQ の遅延メッセージを使用して、タイムアウトタスクをチェックしてトリガーできます。

遅延メッセージに基づくタスクタイムアウト処理には、次の利点があります。

  • さまざまな時間の粒度レベルと簡略化された開発: Apache RocketMQ のスケジュールされたメッセージングには、固定時間増分の制限はありません。任意の時間の粒度レベルで、重複排除なしでタスクをトリガーできます。

  • 高いパフォーマンスとスケーラビリティ: Apache RocketMQ の遅延メッセージは、高い同時実行性とスケーラビリティを提供します。これにより、実装が複雑で、スキャンするための頻繁な API 呼び出しが原因でパフォーマンスのボトルネックが発生する可能性がある、従来型のデータベーススキャン方式よりも優れたパフォーマンスを発揮します。

動作メカニズム

遅延メッセージの定義

遅延メッセージは、Apache RocketMQ の高度な機能を持つメッセージです。遅延メッセージを使用すると、コンシューマーは、指定された期間の後、または指定された時間にのみサーバーに送信されたメッセージを消費できます。遅延メッセージを使用すると、分散シナリオで遅延スケジューリングとトリガーを実装できます。

時間設定規則

  • Apache RocketMQ の遅延メッセージのスケジュール時間または遅延時間は、期間ではなくタイムスタンプとして表されます。

  • スケジュール時間は、ミリ秒レベルの Unix タイムスタンプの形式です。メッセージ配信のスケジュール時間をミリ秒レベルの Unix タイムスタンプに変換する必要があります。Unix タイムスタンプ変換ツールを使用すると、時間をミリ秒レベルの Unix タイムスタンプに変換できます。

  • スケジュール時間は、許容時間範囲内である必要があります。スケジュール時間が範囲を超える場合、スケジュール時間は有効にならず、メッセージはサーバー側からすぐに配信されます。

  • デフォルトでは、遅延メッセージの最大時間範囲は 24 時間です。デフォルト値は変更できません。詳細については、パラメータ制限を参照してください。

  • スケジュール時間は、現在の時間より後である必要があります。スケジュール時間が現在の時間よりも前の時間に設定されている場合、スケジュール時間は有効にならず、メッセージはサーバー側からすぐに配信されます。

次のセクションでは、2 つの時間設定の例を示します。

  • 遅延メッセージ: 現在の時刻が 2022 年 6 月 9 日 17:30:00 で、2022 年 6 月 9 日 19:20:00 にメッセージを配信する場合、スケジュールされた時間のミリ秒レベルの Unix タイムスタンプは 1654773600000 です。

  • 遅延メッセージ: 現在の時刻が 2022 年 6 月 9 日 17:30:00 で、1 時間後にメッセージを配信する場合、メッセージの配信時間は 2022 年 6 月 9 日 18:30:00 であり、ミリ秒レベルの Unix タイムスタンプは 1654770600000 です。

スケジュールされたメッセージのライフサイクル

定时消息生命周期

  • 初期化済み: メッセージはプロデューサーによって構築および初期化され、サーバーに送信される準備ができています。

  • タイミング: メッセージはサーバー側に送信され、指定された配信時間まで時間ベースのストレージシステムに格納されます。メッセージのインデックスはすぐには作成されません。

  • 準備完了: 指定された時間に、メッセージは通常のストレージエンジンに書き込まれます。ここで、メッセージはコンシューマーに対して表示され、コンシューマーによる消費を待機します。

  • インフライト: メッセージはコンシューマーによって取得され、コンシューマーのローカルビジネスロジックに基づいて処理されます。

    このプロセスでは、ブローカーはコンシューマーが消費を完了し、消費結果を送信するのを待ちます。コンシューマーから一定期間応答がない場合、Apache RocketMQ はメッセージを再試行します。詳細については、消費リトライを参照してください。

  • Acked: コンシューマーは消費を完了し、消費結果をブローカーに送信します。ブローカーは、現在のメッセージが正常に消費されたかどうかをマークします。

    デフォルトでは、Apache RocketMQ はすべてのメッセージを保持します。消費結果が送信されると、メッセージデータはすぐに削除されるのではなく、論理的に消費済みとしてマークされます。したがって、コンシューマーは、保持期間の満了またはストレージスペースの不足により削除される前に、メッセージを再消費するためにバックトラックできます。

  • 削除済み: メッセージの保持期間が満了するか、ストレージスペースが不足すると、Apache RocketMQ は、最初に保存されたメッセージをローリング方式で物理ファイルから削除します。詳細については、メッセージの保存とクリーンアップを参照してください。

使用制限

メッセージタイプの一貫性

遅延メッセージは、MessageType が Delay のトピックにのみ送信できます。

時間の粒度

Apache RocketMQ の遅延メッセージの時間粒度は、ミリ秒単位です。デフォルトの粒度値は 1000 ミリ秒です。

Apache RocketMQ の遅延メッセージの状態は永続的に保存できます。メッセージングシステムで障害が発生して再起動した場合でも、メッセージは指定された配信時間に基づいて配信されます。ただし、ストレージシステムで例外が発生した場合や再起動された場合、遅延メッセージの配信に遅延が発生する可能性があります。

トピックの作成

Apache RocketMQ 5.0 でトピックを作成するには、mqadmin ツールを使用することをお勧めします。ただし、メッセージタイプをプロパティパラメータとして追加する必要があることに注意してください。次に例を示します。

sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=Delay

メッセージの送信

通常のメッセージとは異なり、遅延メッセージには、配信タイムスタンプを指定する必要があります。

DELAY トピックの作成

/bin/mqadmin updateTopic -c DefaultCluster -t DelayTopic -n 127.0.0.1:9876 -a +message.type=DELAY
  • -c クラスター名
  • -t トピック名
  • -n ネームサーバーのアドレス
  • -a 追加属性。ここでは、DELAY メッセージの配信をサポートするために、値が DELAYmessage.type 属性を追加します。

次のコードは、遅延メッセージの配信と消費の Java の例を示しています。

        // Send delay messages.
MessageBuilder messageBuilder = null;
// Specify a millisecond-level Unix timestamp. In this example, the specified timestamp indicates that the message will be delivered in 10 minutes from the current time.
Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
Message message = messageBuilder.setTopic("topic")
// Specify the message index key. The system uses the key to locate the message.
.setKeys("messageKey")
// Specify the message tag. The consumer can use the tag to filter messages.
.setTag("messageTag")
.setDeliveryTimestamp(deliverTimeStamp)
// Configure the message body.
.setBody("messageBody".getBytes())
.build();
try {
// Send the messages. Focus on the result of message sending and exceptions such as failures.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
// Consumption example 1: If a scheduled message is consumed by a push consumer, the consumer needs to process the message only in the message listener.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView.getDeliveryTimestamp());
// Return the status based on the consumption result.
return ConsumeResult.SUCCESS;
}
};
// Consumption example 2: If a scheduled message is consumed by a simple consumer, the consumer must obtain the message for consumption and submit the consumption result.
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, the consumer must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, you must re-initiate the request to obtain the message.
e.printStackTrace();
}
}

使用上の注意

多数のメッセージに対して同じ配信時間をスケジュールしないことをお勧めします。

遅延メッセージは、指定された配信時間にコンシューマーに配信される前に、時間ベースのストレージシステムに格納されます。多数の遅延メッセージに同じ配信時間を指定すると、システムは配信時間にメッセージを同時に処理する必要があります。これにより、システムに大きな負荷がかかり、メッセージの配信が遅れることになります。