コンシューマリトライ
メッセージの消費に失敗した場合、Apache RocketMQはコンシューマリトライポリシーに基づいてメッセージを再配信します。これにより、いくつかの障害を取り除くことができます。このトピックでは、コンシューマリトライ機能の動作メカニズム、バージョン互換性、および使用上の注意について説明します。
シナリオ
Apache RocketMQのコンシューマリトライ機能は、ビジネス処理ロジックの失敗によって影響を受ける可能性のある消費の完全性を保証します。この機能は、ビジネスロジックの失敗に対する保護手段です。ビジネスプロセスを制御するために使用することはできません。
この機能は、以下のシナリオでの使用に適しています。
メッセージの内容が原因でビジネスが失敗する場合。たとえば、トランザクションステータスが返されず、特定の期間内にビジネスが復旧することが期待される場合。
消費失敗の原因がビジネスの継続性に影響を与えない場合。失敗が発生する可能性が低く、後続のメッセージは予想どおりに配信および消費される可能性が非常に高い場合。このような場合、リトライメカニズムを使用してメッセージを再配信し、プロセスをブロックすることを回避できます。
次のシナリオでは、この機能を使用しないでください。
消費失敗が、処理ロジックにおけるメッセージフローを切り替えるための条件として使用される場合。処理ロジックが、多くのメッセージが消費に失敗することを想定している場合。
消費失敗がメッセージ処理のレートを制限するために使用される場合。レート制限は、メッセージをリトライリンクに入れるのではなく、過剰なメッセージを後で処理するために一時的にキューにスタックするために使用する必要があります。
目的
非同期デカップリングにおけるメッセージミドルウェアの一般的な問題は、ダウンストリームサービスがメッセージの処理に失敗した場合に、呼び出しリンク全体の完全性をどのように確保するかということです。金融グレードの信頼性の高いメッセージミドルウェアサービスとして、Apache RocketMQは、すべてのメッセージがビジネスの期待どおりに処理されることを保証するために、適切に設計されたメッセージ確認とリトライメカニズムを使用しています。
Apache RocketMQのメッセージ確認とリトライメカニズムを理解することは、以下の問題の解決に役立ちます。
すべてのメッセージが処理されることを保証する方法:コンシューマーロジックに基づいてすべてのメッセージが処理され、ビジネスステータスが一致していることを保証できます。
例外が発生した場合に処理中のメッセージのステータスが正しいことを保証する方法:停電などの例外が発生した場合に、正しいメッセージステータスを保証できます。
ポリシー概要
コンシューマリトライ機能が有効になっている場合、メッセージの消費に失敗すると、Apache RocketMQブローカーはメッセージを再送信します。指定された回数のリトライ後もメッセージの消費に失敗した場合、ブローカーはメッセージをデッドレターキューに送信します。
トリガー条件
メッセージの消費に失敗した場合。この場合、コンシューマーは失敗ステータスを返すか、システムが例外をスローします。
タイムアウトエラーが発生した場合、またはメッセージがプッシュコンシューマーキューに過度に長い時間残っている場合。
振る舞い
リトライプロセスステートマシン:リトライプロセスにおけるメッセージの状態と変更ロジックを制御します。
リトライ間隔:消費失敗またはタイムアウトが発生してから、メッセージがリトライされるまでの時間。
最大リトライ回数:メッセージを消費のためにリトライできる最大回数。
ポリシーの違い
メッセージリトライポリシーでは、コンシューマータイプに基づいて異なるリトライメカニズムと構成方法を使用します。次の表では、ポリシー間の違いについて説明します。
コンシューマータイプ | リトライプロセスステートマシン | リトライ間隔 | 最大リトライ回数 |
---|---|---|---|
PushConsumer | 準備完了 インフライト WaitingRetry コミット * DLQ | コンシューマーグループの作成時にメタデータで指定されます。順序なしメッセージ:増分順序付きメッセージ:固定 | コンシューマーグループの作成時にメタデータで指定されます。 |
SimpleConsumer | 準備完了 インフライト コミット DLQ | APIのInvisibleDurationパラメーターで指定されます。 | コンシューマーグループの作成時にメタデータで指定されます。 |
リトライポリシーの詳細については、「プッシュコンシューマーのリトライポリシー」および「シンプルコンシューマーのリトライポリシー」を参照してください。
PushConsumerのリトライポリシー
リトライプロセスステートマシン
プッシュコンシューマーがメッセージを消費するとき、メッセージは次のいずれかの状態になります。
- 準備完了 メッセージはApache RocketMQブローカーでの消費を待機しています。
- インフライト メッセージが取得され、コンシューマーによって消費されています。ただし、消費結果はまだ返されていません。
- WaitingRetry この状態はプッシュコンシューマー専用です。メッセージの消費に失敗した場合、またはブローカーがコンシューマーからの消費ステータスの返信を待機しているときにタイムアウトエラーが発生した場合、コンシューマリトライロジックがトリガーされます。最大リトライ回数に達していない場合、リトライ間隔が経過するとメッセージは準備完了状態に戻ります。準備完了状態のメッセージは再度消費できます。リトライ間の間隔を長くすることで、頻繁なリトライを防ぐことができます。
- コミット メッセージが消費されました。コンシューマーが成功応答を返した後、ステートマシンを終了できます。
- DLQ 消費ロジックの予防措置。最大リトライ回数に達してもメッセージの消費に失敗した場合、メッセージはリトライされなくなり、デッドレターキューに送信されます。デッドレターキュー内のメッセージを消費して、ビジネスを復元できます。
メッセージがリトライされると、その状態は準備完了からインフライト、そしてWaitingRetryに変化します。2回の消費間の間隔は、消費にかかった実際の時間とリトライ間隔の合計です。最大消費間隔はブローカーのシステムパラメーターで指定され、超えることはできません。
最大リトライ回数
プッシュコンシューマーの最大リトライ回数は、コンシューマーグループの作成時にメタデータで指定されます。詳細については、「コンシューマーグループ」を参照してください。
たとえば、最大リトライ回数が3の場合、メッセージは4回配信できます。1回の元の試行と3回の再試行です。
リトライ間隔
順序なしメッセージ(順序付きメッセージではないメッセージ):増分。次の表に詳細を示します。
リトライ回数 間隔 リトライ回数 間隔 1 10秒 9 7分 2 30秒 10 8分 3 1分 11 9分 4 2分 12 10分 5 3分 13 20分 6 4分 14 30分 7 5分 15 1時間 8 6分 16 2時間
リトライ回数が16を超えた場合、それ以降の各リトライの間隔は2時間です。
- 順序付きメッセージ:固定。詳細については、「パラメーター制限」を参照してください。
例
プッシュコンシューマーの場合、メッセージリトライは消費失敗のステータスコードによってのみトリガーされます。予期しない例外もSDKによってキャプチャされます。
SimpleConsumer simpleConsumer = null;
// Consumption example: Consume normal messages as a push consumer and trigger a message retry by using a consumption failure.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
// Retry the message until the maximum number of retries is reached.
return ConsumeResult.FAILURE;
}
};
SimpleConsumerのリトライポリシー
リトライプロセスステートマシン
シンプルコンシューマーがメッセージを消費するとき、メッセージは次のいずれかの状態になります。
準備完了 メッセージはApache RocketMQブローカーでの消費を待機しています。
インフライト メッセージが取得され、コンシューマーによって消費されています。ただし、消費結果はまだ返されていません。
コミット メッセージが消費されました。コンシューマーが成功応答を返した後、ステートマシンを終了できます。
DLQ 消費ロジックの予防措置。最大リトライ回数に達してもメッセージの消費に失敗した場合、メッセージはリトライされなくなり、デッドレターキューに送信されます。デッドレターキュー内のメッセージを消費して、ビジネスを復元できます。
リトライ間隔は固定で事前に割り当てられます。コンシューマーがAPIを呼び出すときに、InvisibleDurationパラメーターで設定されます。このパラメーターは、メッセージの最大処理時間を指定します。メッセージがリトライされる場合、パラメーターの値は再利用されます。後続のリトライのために間隔を設定する必要はありません。
InvisibleDurationの値は事前に割り当てられているため、ビジネス要件を満たさない場合があります。APIを呼び出すために使用するコードで値を変更できます。
たとえば、InvisibleDurationの値を20ミリ秒に設定し、メッセージがその時間内に処理できない場合は、リトライメカニズムのトリガーを避けるために値を大きく変更できます。
InvisibleDurationの値を変更するには、次の条件を満たす必要があります。
現在のメッセージでタイムアウトエラーが発生していない。
現在のメッセージの消費ステータスが返されていない。
次の図に示すように、変更はすぐに有効になります。つまり、InvisibleDurationの値はAPIが呼び出された時点から再計算されます。
最大リトライ回数
シンプルコンシューマーの最大リトライ回数は、コンシューマーグループの作成時にメタデータで指定されます。詳細については、「コンシューマーグループ」を参照してください。
メッセージリトライ間隔
メッセージリトライ間隔 = InvisibleDurationの値 − メッセージ処理の実際の時間
したがって、コンシューマリトライ間隔はInvisibleDurationの値によって制御されます。たとえば、InvisibleDurationの値が30ミリ秒で、処理開始から10ミリ秒後に消費失敗が返された場合、次のリトライまでの時間は20ミリ秒です。つまり、リトライ間隔は20ミリ秒です。30ミリ秒以内に消費結果が返されない場合は、タイムアウトエラーが発生してリトライがトリガーされます。その後、リトライ間隔は0ミリ秒です。
例
シンプルコンシューマーは、メッセージがリトライされるのを待つだけで済みます。
// Consumption example: Consume normal messages as a simple consumer. If you want a message to be retried, do not process the message. Wait for it to time out, and the broker retries it automatically.
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// If you want a message to be retried after it fails to be consumed, ignore the failure and wait for the message to be visible. Then try to obtain it again from the broker.
});
} catch (ClientException e) {
// If the message fails to be pulled due to throttling or other reasons, you must re-initiate the request to obtain the message.
e.printStackTrace();
}
使用上の注意
消費スロットリングに対処するために消費リトライを使用しないでください
シナリオで述べたように、メッセージリトライは、ビジネス処理とメッセージ消費の失敗がまれな場合に適しています。消費スロットリングなど、失敗が継続するシナリオにはメッセージリトライは適していません。
誤った例:現在の消費レートが上限を超えている場合に、リトライをトリガーするために消費失敗を返します。
正しい例:現在の消費レートが制限を超えている場合は、後でメッセージを取得して消費します。
無限リトライを避けるために適切なリトライ回数を設定してください
Apache RocketMQはカスタムの消費リトライ回数をサポートしていますが、システムへの負荷を軽減するために、リトライ回数を少なくし、リトライ間隔を長く設定することをお勧めします。多数のリトライや無限リトライは避けてください。