コンシューマータイプ
Apache RocketMQは、PushConsumer、SimpleConsumer、PullConsumerというコンシューマータイプをサポートしています。このトピックでは、3つのコンシューマータイプの使用法、動作メカニズム、リトライメカニズム、およびシナリオについて説明します。
背景情報
Apache RocketMQは、PushConsumer、SimpleConsumer、PullConsumerのコンシューマータイプを提供します。3つのコンシューマータイプは、異なる統合および制御方法を持っており、さまざまなビジネスシナリオでのメッセージング要件を満たすために使用できます。次の要素は、ビジネスシナリオに適したコンシューマータイプを選択するのに役立ちます。
並行消費:コンシューマーは、より高いメッセージ処理効率のために、マルチスレッド技術を使用して並行メッセージ消費を実装するにはどうすればよいですか?
同期または非同期メッセージ処理:異なる統合シナリオでは、コンシューマーは受信したメッセージを処理のためにビジネスロジックシステムに非同期的に配布する必要がある場合があります。非同期メッセージ処理を実装するにはどうすればよいですか?
信頼性の高いメッセージ処理:コンシューマーはメッセージを処理するときにどのようにレスポンス結果を返しますか?信頼性の高いメッセージ処理を確保するために、メッセージエラーが発生した場合にメッセージリトライを実装するにはどうすればよいですか?
上記の問題に対する回答については、PushConsumerとSimpleConsumerを参照してください。
機能概要
上の図は、Apache RocketMQにおけるコンシューマーによるメッセージ消費には、メッセージの受信、メッセージの処理、および消費ステータスのコミットという段階が含まれていることを示しています。
3つのタイプのコンシューマーは、異なる実装方法とAPI操作を提供することで、さまざまなメッセージ消費シナリオに適しています。次の表は、3つのタイプのコンシューマーの違いを示しています。
PullConsumerは、ストリーム処理フレームワークでの統合にのみ推奨されます。PushConsumerとSimpleConsumerは、ほとんどのシナリオに対応できます。
ビジネスシナリオに基づいて、PushConsumerとSimpleConsumerを切り替えることができます。異なるコンシューマータイプに切り替えても、Apache RocketMQ内の既存のリソースの使用状況と既存のビジネス処理タスクには影響しません。
同じconsumerGroup内でpullConsumerと他のコンシューマータイプを混在させることは厳禁されています。
項目 | PushConsumer | SimpleConsumer | PullConsumer |
---|---|---|---|
API操作呼び出し | コールバック操作は、メッセージリスナーを使用して消費結果を返すために呼び出されます。コンシューマーは、メッセージリスナーの範囲内でのみ消費ロジックを処理できます。 | ビジネスアプリケーションはメッセージ処理を実装し、対応する操作を呼び出して消費結果を返します。 | ビジネスアプリケーションはメッセージのプルと処理を実装し、対応する操作を呼び出して消費結果を返します。 |
消費並行性管理 | Apache RocketMQ SDKは、メッセージ消費に使用する並行スレッド数を管理するために使用されます。 | メッセージ消費に使用される並行スレッドの数は、個々のビジネスアプリケーションの消費ロジックに基づいています。 | メッセージ消費に使用される並行スレッドの数は、個々のビジネスアプリケーションの消費ロジックに基づいています。 |
負荷分散メカニズム | 5.0バージョンではメッセージベースの負荷分散、以前のバージョンではキューベースの負荷分散。 | メッセージベースの負荷分散。 | キューベースの負荷分散。 |
APIの柔軟性 | API操作はカプセル化されており、柔軟性が低いです。 | アトミック操作は、高い柔軟性を提供します。 | アトミック操作は、高い柔軟性を提供します。 |
シナリオ | このコンシューマータイプは、カスタムプロセスを必要としない開発シナリオに適しています。 | このコンシューマータイプは、カスタムプロセスを必要とする開発シナリオに適しています。 | ストリーム処理フレームワークのシナリオでのみ統合することを推奨します。 |
PushConsumer
PushConsumerは、高度なカプセル化を提供するコンシューマータイプです。メッセージの消費と消費結果の送信は、メッセージリスナーのみを使用して処理されます。メッセージの取得、消費ステータスの送信、および消費リトライは、Apache RocketMQクライアントSDKを使用して完了します。
使用法
PushConsumerは固定された方法で使用されます。コンシューマーの初期化時にメッセージリスナーがPushConsumerコンシューマーに登録され、メッセージ処理ロジックがメッセージリスナーに実装されます。メッセージの取得、リスナー呼び出しのトリガー、およびメッセージリトライは、Apache RocketMQ SDKを使用して処理されます。
サンプルコード
// Message consumption example: Use a PushConsumer consumer to consume messages.
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
// Configure consumer group.
.setConsumerGroup("YourConsumerGroup")
// Specify the access point.
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())
// Specify the pre-bound subscriptions.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// Set the message listener.
.setMessageListener(new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
// Consume the messages and return the consumption result.
return ConsumeResult.SUCCESS;
}
})
.build();
PushConsumerコンシューマーのメッセージリスナーは、次のいずれかの結果を返します。
消費成功:たとえば、Java用のApache RocketMQ SDKを使用してメッセージが消費されると、
ConsumeResult.SUCCESS
が返されます。サーバーは、消費結果に基づいて消費の進捗状況を更新します。消費失敗:たとえば、Java用のApache RocketMQ SDKを使用し、メッセージの消費に失敗した場合、
ConsumeResult.FAILURE
が返されます。Apache RocketMQがメッセージの消費をリトライするかどうかは、消費リトライロジックに依存します。予期しない失敗:たとえば、予期しない例外がスローされた場合、メッセージの消費は失敗します。Apache RocketMQがメッセージの消費をリトライするかどうかは、消費リトライロジックに依存します。
メッセージ処理ロジックで予期しないエラーが発生し、PushConsumerコンシューマーによるメッセージの消費が継続的に妨げられる場合、SDKは消費がタイムアウトしたと見なし、強制的に消費失敗結果をコミットします。その後、メッセージは消費リトライロジックに基づいて処理されます。消費タイムアウトの詳細については、プッシュコンシューマーのリトライポリシーを参照してください。
消費タイムアウトが発生すると、SDKは消費失敗結果をコミットします。ただし、現在の消費スレッドは結果に応答できず、メッセージの処理を続行する可能性があります。::
動作メカニズム
PushConsumerの場合、リアルタイムメッセージ処理は、SDKの典型的なReactorスレッドモデルに基づいています。SDKには、メッセージをプルしてキューに格納する組み込みのロングポーリングスレッドがあります。次に、メッセージはキューから個々のメッセージ消費スレッドに配信されます。メッセージリスナーは、メッセージ消費ロジックに基づいて動作します。次の図は、PushConsumerコンシューマーのメッセージ消費プロセスを示しています。
信頼性のためのリトライ
PushConsumerの場合、クライアントSDKと消費ロジックユニット間の通信は、メッセージリスナーのみを使用して実装されます。クライアントSDKは、メッセージリスナーから返された結果に基づいてメッセージが消費されたかどうかを確認し、メッセージの信頼性を確保するために消費リトライロジックに基づいてリトライを実行します。すべてのメッセージは同期的に消費される必要があります。リスナー操作呼び出しが終了すると、消費結果が返されます。非同期配信は許可されていません。メッセージリトライの詳細については、プッシュコンシューマーのリトライポリシーを参照してください。
メッセージングの信頼性を確保するために、Apache RocketMQはPushConsumerコンシューマーによるメッセージ消費において次の動作を禁止しています。
メッセージの消費が完了する前に消費結果を返す。たとえば、後で消費に失敗するメッセージに対して、事前に消費成功の結果が返されます。この場合、Apache RocketMQは実際の消費結果を確認できず、メッセージの消費をリトライしません。
メッセージリスナーから他のカスタムスレッドにメッセージを配布し、消費結果を事前に返す。メッセージの消費に失敗したが、消費成功の結果が事前に返された場合、Apache RocketMQは実際の消費結果を確認できず、メッセージの消費をリトライしません。
メッセージ順序の保証
Apache RocketMQのFIFOメッセージの場合、コンシューマーグループに順序付きメッセージ消費が構成されている場合、PushConsumerコンシューマーは消費順にメッセージを消費します。PushConsumerコンシューマーがメッセージを消費すると、消費順序は、個々のビジネスアプリケーションがビジネスロジックで消費順序を定義する必要なく、保証されます。
Apache RocketMQにおいて、順序付きメッセージ処理の前提条件は同期コミットです。ビジネスロジックで非同期配信が定義されている場合、Apache RocketMQはメッセージの順序を保証できません。::
シナリオ
PushConsumerはメッセージ処理を同期処理に限定し、各メッセージの処理タイムアウトを制限します。PushConsumerは、以下のシナリオに適しています。
予測可能なメッセージ処理時間:メッセージ処理時間が制限されていない場合、メッセージの信頼性を確保するために、処理に時間がかかるメッセージに対してメッセージ再試行が継続的にトリガーされます。これにより、大量の重複メッセージが発生します。
非同期処理とカスタムプロセスなし:PushConsumerは、消費ロジックのスレッドモデルをReactorスレッドモデルに制限します。クライアントSDKは最大スループットに基づいてメッセージを処理します。このモデルは開発が容易ですが、非同期処理やカスタムプロセスは許可されていません。
SimpleConsumer
SimpleConsumerは、メッセージ処理に対するアトミック操作をサポートするコンシューマータイプです。このようなタイプのコンシューマーは、メッセージを取得し、消費ステータスをコミットし、ビジネスロジックに基づいてメッセージの再試行を実行するための操作を呼び出します。
使用法
SimpleConsumerには複数のAPI操作が含まれます。対応する操作は必要に応じて呼び出され、メッセージを取得してビジネススレッドに配信して処理します。次に、コミット操作を呼び出してメッセージ処理結果をコミットします。サンプルコード
// Consumption example: When a SimpleConsumer consumer consumes normal messages, the consumer obtain messages and commit message consumption results.
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
// Configure consumer group.
.setConsumerGroup("YourConsumerGroup")
// Specify the access point.
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())
// Specify the pre-bound subscriptions.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// Specify the max await time when receive messages from the server.
.setAwaitDuration(Duration.ofSeconds(1))
.build();
try {
// A SimpleConsumer consumer must obtain and process messages.
List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, the consumer must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, the consumer must re-initiate the request to obtain the message.
logger.error("Failed to receive message", e);
}
次の表は、SimpleConsumerに提供されるAPI操作について説明しています。
操作 | 説明 | 変更可能なパラメーター |
---|---|---|
ReceiveMessage | コンシューマーはこの操作を呼び出して、サーバーからメッセージを取得できます。注 サーバーは分散ストレージを使用しているため、要求されたメッセージが実際にはサーバー上に存在する場合でも、サーバーは空の結果を返す可能性があります。ReceiveMessage操作を再度呼び出すか、ReceiveMessage操作の同時実行値を大きくすることができます。 | バッチプルサイズ:一度に取得されるメッセージの数。SimpleConsumerコンシューマーは、バッチ消費のために複数のメッセージを取得できます。 メッセージの不可視時間:メッセージの最大処理時間。このパラメーターは、消費に失敗した場合のメッセージ再試行間隔を制御します。詳細については、SimpleConsumerの再試行ポリシーを参照してください。このパラメーターは、ReceiveMessage 操作を呼び出すときに必須です。 |
AckMessage | メッセージがコンシューマーによって消費された後、コンシューマーはこの操作を呼び出して、消費成功の結果をサーバーに返します。 | なし |
ChangeInvisibleDuration | 消費再試行シナリオでは、コンシューマーはこの操作を呼び出して、メッセージ処理時間を変更し、メッセージ再試行間隔を制御できます。 | メッセージの不可視時間:メッセージの最大処理時間。この操作を呼び出して、ReceiveMessage 操作で指定されたメッセージの不可視時間を変更できます。ほとんどの場合、この操作は、メッセージ処理時間を増やしたいシナリオで使用されます。 |
信頼性のためのリトライ
SimpleConsumerコンシューマーがメッセージを消費する場合、クライアントSDKとApache RocketMQサーバー間の通信は、ReceiveMessage
操作とAckMessage
操作を使用して実装されます。クライアントSDKがメッセージを正常に処理すると、AckMessage
操作が呼び出されます。メッセージの処理に失敗した場合、指定されたメッセージの不可視時間が経過した後、メッセージ再試行メカニズムをトリガーするためにackメッセージは返されません。詳細については、SimpleConsumerの再試行ポリシーを参照してください。
メッセージ順序の保証
Apache RocketMQでは、SimpleConsumerコンシューマーは、保存された順序でFifoメッセージを取得します。順序付きメッセージのセット内のメッセージが完全に処理されない場合、順序付きメッセージのセット内の次のメッセージを取得することはできません。
シナリオ
SimpleConsumerは、メッセージを取得し、消費結果をコミットするためのアトミックAPI操作を提供します。PushConsumerと比較して、SimpleConsumerはより優れた柔軟性を提供します。SimpleConsumerは、以下のシナリオに適しています。
制御不能なメッセージ処理時間:メッセージ処理時間が予測不可能な場合は、メッセージが過度に長い時間処理されるのを防ぐために、SimpleConsumerを使用することをお勧めします。メッセージ消費時に推定メッセージ処理時間を指定できます。既存の処理時間がビジネスシナリオに適していない場合は、対応するAPI操作を呼び出してメッセージ処理時間を変更できます。
非同期処理とバッチ消費:SimpleConsumerには、SDKでの複雑なスレッドカプセル化は含まれません。ビジネスアプリケーションはカスタム設定を使用できます。これにより、SimpleConsumerコンシューマーは、非同期配信、バッチ消費、およびその他のカスタムシナリオを実装できます。
カスタムメッセージ消費率:SimpleConsumerを使用する場合、ビジネスアプリケーションはReceiveMessage操作を呼び出してメッセージを取得します。メッセージ取得の頻度を調整して、メッセージ消費率を制御できます。
PullConsumer
後続
使用上の注意
PushConsumerに適切な消費時間制限を指定する
PushConsumerコンシューマーのメッセージ消費時間を制限して、メッセージが長時間処理されるのを防ぐことをお勧めします。メッセージの長時間処理は、メッセージ処理のタイムアウトが原因で重複メッセージが発生し、次のメッセージが消費を待機し続ける可能性があります。メッセージが頻繁に過度に長時間処理される場合は、SimpleConsumerを使用し、ビジネス要件に基づいて適切なメッセージ不可視時間を指定することをお勧めします。