基本的なベストプラクティス
プロデューサー
メッセージ送信時の注意事項
タグの使用
アプリケーションはトピックとして識別でき、メッセージのサブタイプはタグとして識別できます。タグはアプリケーションによって自由に設定できます。プロデューサーがメッセージを送信する際にタグを設定した場合にのみ、コンシューマーはメッセージをサブスクライブする際にブローカーを介してタグを使用してメッセージをフィルタリングできます。5.x SDKは`messageBuilder.setTag("messageTag")`を呼び出すことができ、過去のバージョンは`message.setTags("messageTag")`を呼び出すことができます。
キーの使用
サービスレベルでは、各メッセージを一意のサービス識別子にマッピングし、keysフィールドに設定して、将来のメッセージ損失の問題を特定することをお勧めします。サーバーは各メッセージのインデックス(ハッシュインデックス)を作成し、アプリケーションはトピックとキー、およびメッセージを消費したユーザーによってメッセージの内容をクエリできます。ハッシュインデックスであるため、潜在的なハッシュ衝突を回避するために、キーができるだけ一意であることを確認してください。一般的な設定ポリシーでは、注文ID、ユーザーID、リクエストIDなどの個別の固有識別子を使用します。
ログの出力
メッセージが正常に送信されたか失敗したかにかかわらず、サービスのトラブルシューティングのためにメッセージログを出力する必要があります。例外がスローされない限り、送信はメッセージが正常に送信されたことを示します。
メッセージ送信失敗時の処理方法
Producerのsendメソッド自体は内部再試行をサポートしています。5.xの再試行ロジックのリファレンス送信再試行ポリシー:
上記の戦略も、メッセージ送信の成功をある程度保証します。ビジネスでメッセージが損失することなく送信される必要がある場合は、送信同期メソッドが呼び出されて送信に失敗した場合など、発生する可能性のある例外をカバーする必要があります。メッセージをDBに保存し、バックグラウンドスレッドによって定期的に再試行して、メッセージがBrokerに到達するようにします。
上記のDB再試行方法がMQクライアントに統合されておらず、アプリケーションが自分で完了する必要がある理由は、主に次の考慮事項に基づいています。まず、MQクライアントはステートレスモードとして設計されており、任意の水平方向の拡張に便利であり、マシンリソースの消費はCPU、メモリ、ネットワークのみです。次に、MQクライアントが内部的にKVストレージモジュールと統合されている場合、データは同期ディスクフォールの場合にのみ信頼性が高く、同期ディスクフォール自体には大きなパフォーマンスオーバーヘッドがあるため、通常は非同期ディスクフォールを使用します。また、アプリケーションのクローズプロセスはMQ運用保守担当者によって制御されていないため、kill -9などの暴力的なクローズが発生することがよくあります。その結果、データがタイムリーにディスクにドロップされず、損失が発生します。 3つ目は、Producerが存在するマシンは信頼性が低く、一般的には仮想マシンであるため、重要なデータを保存するのには適していません。要約すると、再試行プロセスはアプリケーションによって制御することをお勧めします。
コンシューマー
消費プロセスは冪等です
RocketMQはメッセージの重複(Exactly Once)を回避できないため、ビジネスが消費の重複に非常に敏感な場合は、ビジネスレベルで重複排除することが重要です。これはリレーショナルデータベースの助けを借りて行うことができます。最初に、メッセージの一意のキー(msgIdまたはメッセージコンテンツ内の一意の識別フィールド(注文IDなど))を決定する必要があります。消費前に、リレーショナルデータベースに一意のキーが存在するかどうかを確認します。存在しない場合は、挿入して消費します。それ以外の場合はスキップします。(実際の手順では、原子性の問題を考慮する必要があります。主キーの競合があるかどうかを判断し、挿入に失敗した場合は、直接スキップします)
MsgIdはグローバルに一意の識別子である必要がありますが、実際には、同じメッセージに2つの異なるmsgIdがある場合があります(コンシューマーのアクティブリトランスミッション、クライアントの再投資メカニズムによる重複など)。これにより、ビジネスフィールドの重複消費が必要になります。
消費の遅いプロセス
消費の並列性を高める
メッセージ消費の大部分はIOを集中的に使用する、つまりデータベースを操作したりRPCを呼び出したりする可能性があり、このタイプの消費の消費率はバックエンドデータベースまたは外部システムのスループットに依存します。消費の並列性を高めることで、総消費スループットを向上させることができますが、並列性が一定の程度まで増加すると、スループットは低下します。したがって、アプリケーションは妥当な並列度を設定する必要があります。消費の並列性を変更するには、いくつかの方法があります
- 同じConsumerGroupで、Consumerインスタンスの数を増やして並列性を向上させます(サブスクリプションキューを超えるConsumerインスタンスは無効であることに注意してください)。マシンを追加するか、既存のマシンで複数のプロセスを開始できます。
- 個々のConsumerの消費並列スレッドを改善します。5.x PushConsumer SDKは`PushConsumerBuilder.setConsumptionThreadCount()`でスレッド数を設定できます。SimpleConsumerはビジネ スレッドから自由に並列性を高めることができ、基盤となるスレッドは安全です。履歴SDK PushConsumerは、パラメーターconsumeThreadMinおよびconsumeThreadMaxを変更することで実装できます。
バルクでの消費
一部のビジネ スプロセスがバルク消費をサポートしている場合、消費スループットを大幅に向上させることができます。たとえば、注文控除の適用では、一度に1つの注文を処理するのに1秒かかり、一度に10の注文を処理するのに2秒しかかからない場合があります。そのため、消費スループットを大幅に向上させることができます。 5.x SDKからSimpleConsumerを使用し、インターフェース呼び出しごとにバッチサイズを設定して、一度に複数のメッセージをプルすることをお勧めします。
リセットサイトで重要でないメッセージをスキップする
メッセージが山積みになっている場合、消費率が配信率に追いつかず、ビジネスが十分なデータを要求していない場合は、重要でないメッセージを破棄することを選択できます。リセットサイト機能を使用して、消費サイトを指定された時間または場所に直接調整することをお勧めします。
メッセージごとの消費プロセスを最適化する
たとえば、メッセージの消費プロセスは次のとおりです。
- クエリ[データ1]メッセージに従ってDBから
- クエリ[データ2]メッセージに従ってDBから
- 複雑なビジネ ス計算
- 挿入[データ3]DBに
- 挿入[データ4]DBに
このメッセージの消費中には、DBとの対話が4回あります。各対話を5ミリ秒として計算すると、合計時間は20ミリ秒になります。サービスの計算に5ミリ秒かかると仮定すると、合計時間は25ミリ秒になります。したがって、4つのDB対話を2つに最適化できる場合、合計時間を15ミリ秒に最適化できます。これは、全体的なパフォーマンスが40%向上することを意味します。したがって、アプリケーションが遅延に敏感な場合は、DBをSSDディスクにデプロイできます。 SCSIディスクと比較して、前者のRTははるかに小さくなります。
消費プリントログ
メッセージの数が少ない場合は、消費エントリメソッドにメッセージを出力することをお勧めします。これには時間がかかります。
new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
LOGGER.info("Consume message={}", messageView);
//Do your consume process
return ConsumeResult.SUCCESS;
}
}
各メッセージの消費時間を印刷できれば、消費の遅いなどのオンラインの問題のトラブルシューティングがより便利になります。
ブローカー
ブローカーの役割
ブローカーの役割は、ASYNC_MASTER、SYNC_MASTER、およびSLAVEに分類されます。メッセージの信頼性に関する厳格な要件がある場合は、SYNC_MASTERとSLAVEをデプロイします。メッセージの信頼性が不要な場合は、ASYNC_MASTERとSLAVEをデプロイします。テストのみが便利な場合は、ASYNC_MASTERのみまたはSYNC_MASTERのみのデプロイを選択できます。
FlushDiskType
ASYNC_FLUSHと比較して、SYNC_FLUSHはパフォーマンスが低下しますが、より信頼性が高くなります。したがって、実際のサービスシナリオに基づいてトレードオフを行う必要があります。
ブローカーの設定
パラメータ | デフォルト | 説明 |
---|---|---|
listenPort | 10911 | クライアント接続を受け入れるリスニングポート |
namesrvAddr | null | ネームサーバーアドレス |
ブローカーIP1 | ネットワークInetAddress | ブローカーが現在リスニングしているIPアドレス |
ブローカーIP2 | brokerIP1と同じ | マスター/スレーブブローカーが存在する場合、brokerIP2プロパティがブローカーマスターノードで設定されていると、ブローカースレーブノードは同期のためにマスターノードで設定されたbrokerIP2に接続します。 |
ブローカー名 | null | ブローカー名 |
ブローカークラスタ名 | DefaultCluster | このブローカーが属するクラスタ名 |
ブローカーID | 0 | ブローカーID 0はマスターを示し、その他の正の整数はスレーブを示します |
コミットログ保存パス | $HOME/store/commitlog/ | コミットログを保存するパス |
コンシューマキュー保存パス | $HOME/store/consumequeue/ | コンシューマキューが保存されるパス |
コミットログのマップファイルサイズ | 1024 * 1024 *1024(1G) | コミットログのマッピングファイルサイズ |
削除日時 | 04 | ファイル保存期間を超過したコミットログを1日の何時に削除するか |
ファイル保存期間 | 72 | ファイル保存期間(時間) |
ブローカーロール | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |
ディスクフラッシュタイプ | ASYNC_FLUSH | SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSHモードのブローカーは、承認されたプロデューサーを受信する前にメッセージをフラッシュすることを保証します。 ASYNC_FLUSHブローカーは、パフォーマンス向上のため、グループメッセージをフラッシュするためにフラッシュモードを使用します。 |