RocketMQ Prometheus Exporter
はじめに
Rocketmq-exporter は、RocketMQ ブローカーとクライアント側のすべての関連メトリクスを監視するシステムであり、mqAdmin を介してブローカー側から取得したメトリック値を 87 個のキャッシュにパッケージします。
以前のバージョンでは、87個のconcurrentHashMapがありましたが、Mapは期限切れのメトリクスを削除しないため、ラベルの変更があると新しいメトリクスが生成され、古い未使用のメトリクスは自動的に削除されません。最終的にメモリオーバーフローを引き起こします。しかし、Cache構造を使用することで期限切れの削除が可能になり、有効期限を設定できます。
Rocketmq-exporter が監視メトリックを取得するプロセスを以下の図に示します。ExporterはMQAdminExtを通してMQクラスタからデータのリクエストを行い、リクエストされたデータはMetricServiceを通してPrometheusが必要とするフォーマットに標準化され、その後/metricsインターフェースを通してPrometheusに公開されます。
メトリック構造
Metricクラスはorg.apache.rocketmq.expoter.model.metricsパッケージにあり、本質的にはエンティティクラスのセットであり、各エンティティクラスは1種類のメトリックを表し、合計14個のMetricクラスがあります。これらのクラスは87個のキャッシュのキーとして機能し、異なるラベル値によって区別されます。
- ブローカー関連のメトリッククラス: BrokerRuntimeMetric、BrokerMetric、DLQTopicOffsetMetric、TopicPutNumMetric
- コンシューマー関連のクラス: ConsumerRuntimeConsumeFailedMsgsMetric 、ConsumerRuntimeConsumeFailedTPSMetric 、ConsumerRuntimeConsumeOKTPSMetric、ConsumerRuntimeConsumeRTMetric、ConsumerRuntimePullRTMetric、ConsumerRuntimePullTPSMetric、ConsumerCountMetric、ConsumerMetric、ConsumerTopicDiffMetric
- プロデューサー関連のメトリッククラス: ProducerMetric
Prometheusによるメトリックの取得
RocketMQ-exporter プロジェクトとPrometheusは、サーバー・クライアントの関係に相当します。RocketMQ-exporterプロジェクトはPrometheusクライアントパッケージを導入しており、プロジェクトのMetricFamilySamplesクラスで取得する情報の種類を指定しています。Prometheusはexporterからメトリクスをリクエストし、exporterはそれを対応するタイプにパッケージした後、Prometheusに情報を返します。
rocketmq-exporterプロジェクトが起動すると、様々なメトリクスをrocketmqからmfsオブジェクトに収集します。ブラウザまたはPrometheusが対応するインターフェースにアクセスすると、mfsオブジェクト内のサンプルは、サービスによってPrometheusがサポートするフォーマットされたデータに生成されます。主に以下のステップが含まれます。
ブラウザはip:5557/metricsにアクセスしてRMQMetricsControllerクラスのmetricsメソッドを呼び出します。ここでipはrocketmq-exporterプロジェクトが実行されているホストのIPです。
private void metrics(HttpServletResponse response) throws IOException {
    StringWriter writer = new StringWriter();
    metricsService.metrics(writer);
    response.setHeader("Content-Type", "text/plain; version=0.0.4; charset=utf-8");
    response.getOutputStream().print(writer.toString());
}
新しいStringWriterオブジェクトを作成してメトリックインジケーターを収集し、exporter内のメトリクスをMetricsServiceクラスのmetricsメソッドを通じてwriterオブジェクトに収集し、その後収集されたインジケーターをウェブページに出力します。
収集されたメトリクスのフォーマットは次のとおりです。
<metric name>{<label name>=<label value>, ...} <metric value>
例:
rocketmq_group_diff{group="rmq_group_test_20220114",topic="fusion_console_tst",countOfOnlineConsumers="0",msgModel="1",} 23.0
MetricCollectTaskクラスの5つのスケジュールタスク
MetricCollectTaskクラスには、collectTopicOffset、collectConsumerOffset、collectBrokerStatsTopic、collectBrokerStats、collectBrokerRuntimeStatsという5つのスケジュールタスクがあります。これらは、コンシューマーオフセット情報やブローカーの状態情報などを収集するために使用されます。そのcron式は「cron: 15 0/1 * * * ?」で、1分ごとに1回収集することを意味します。そのコア機能は、mqAdminExtオブジェクトを通じてクラスタ内のブローカーから情報を取得し、対応する87個の監視インジケーターに追加することです。例としてcollectTopicOffsetを取り上げます。
- まず、TopicListオブジェクトを初期化し、mqAdminExt.fetchAllTopicList()メソッドを使用してクラスタ内のすべてのトピック情報を取得します。 - TopicList topicList = null;
 try {
 topicList = mqAdminExt.fetchAllTopicList();
 } catch (Exception ex) {
 log.error(String.format("collectTopicOffset-exception comes getting topic list from namesrv, address is %s",
 JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
 return;
 }
- topicSetにトピックを追加し、各トピックを反復処理して、mqAdminExt.examineTopicStats(topic)関数を使用してトピックの状態を確認します。 - Set < String > topicSet = topicList != null ? topicList.getTopicList() : null;
 for (String topic: topicSet) {
 TopicStatsTable topicStats = null;
 try {
 topicStats = mqAdminExt.examineTopicStats(topic);
 } catch (Exception ex) {
 log.error(String.format("collectTopicOffset-getting topic(%s) stats error. the namesrv address is %s",
 topic,
 JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
 continue;}
- トピックの状態セット、ブローカーごとに分割されたトピック情報のオフセットのためのハッシュテーブルbrokerOffsetMap、ブローカー名をキーとして更新タイムスタンプを格納するハッシュテーブルbrokerUpdateTimestampMapを初期化します。 - Set<Map.Entry<MessageQueue, TopicOffset>> topicStatusEntries = topicStats.getOffsetTable().entrySet();
 HashMap<String, Long> brokerOffsetMap = new HashMap<>();
 HashMap<String, Long> brokerUpdateTimestampMap = new HashMap<>();
 for (Map.Entry<MessageQueue, TopicOffset> topicStatusEntry : topicStatusEntries) {
 MessageQueue q = topicStatusEntry.getKey();
 TopicOffset offset = topicStatusEntry.getValue();
 if (brokerOffsetMap.containsKey(q.getBrokerName())) {
 brokerOffsetMap.put(q.getBrokerName(), brokerOffsetMap.get(q.getBrokerName()) + offset.getMaxOffset());
 } else {
 brokerOffsetMap.put(q.getBrokerName(), offset.getMaxOffset());
 }
 if (brokerUpdateTimestampMap.containsKey(q.getBrokerName())) {
 if (offset.getLastUpdateTimestamp() > brokerUpdateTimestampMap.get(q.getBrokerName())) {
 brokerUpdateTimestampMap.put(q.getBrokerName(), offset.getLastUpdateTimestamp());
 }
 } else {
 brokerUpdateTimestampMap.put(q.getBrokerName(),
 offset.getLastUpdateTimestamp());
 }
 }
- 最後に、brokerOffsetMap内の各項目を反復処理することで、metricsServiceからmetricCollectorオブジェクトを取得し、RMQMetricsCollectorクラスのaddTopicOffsetMetricメソッドを呼び出して、対応する値をRMQMetricsCollectorクラスの87個のメトリクスのいずれかのキャッシュに追加します。 - Set<Map.Entry<String, Long>> brokerOffsetEntries = brokerOffsetMap.entrySet();
 for (Map.Entry<String, Long> brokerOffsetEntry : brokerOffsetEntries) {
 metricsService.getCollector().addTopicOffsetMetric(clusterName, brokerOffsetEntry.getKey(), topic,
 brokerUpdateTimestampMap.get(brokerOffsetEntry.getKey()), brokerOffsetEntry.getValue());
 }
 }
 log.info("topic offset collection task finished...." + (System.currentTimeMillis() - start));
 }
Rocketmq-exporterによるメトリクスの収集フローチャート

クイックスタート
application.ymlの設定
server.portは、Prometheusがrocketmq-exporterをlistenするポートを設定し、デフォルト値は5557です。
- application.ymlの重要な設定には以下が含まれます。
- rocketmq.config.webTelemetryPathは、Prometheusがメトリクスを取得するパスを設定し、デフォルト値は/metricsです。デフォルト値を使用できます。 
- rocketmq.config.enableACL RocketMQクラスタでACL検証が有効になっている場合は、trueに設定し、対応するakとskをaccessKeyとsecretKeyに設定する必要があります。 
- rocketmq.config.outOfTimeSecondsは、メトリクスとその値を保存する有効期限を設定するために使用されます。この時間を超えてもキャッシュ内のキーに書き込み変更が行われなければ、削除されます。一般的には60秒に設定できます(Prometheusがメトリクスを取得する時間間隔は、有効期限に応じて適切に設定する必要があります。有効期限はPrometheusがメトリクスを収集する時間間隔以上である必要があります)。 
- task.*.cronは、スケジュールタスクを通じてexporterがブローカーからメトリクスを取得する時間間隔を設定し、デフォルト値は「15 0/1 * * * ?」で、1分ごとの15秒ごとにメトリクスを取得することを意味します。 
exporterアプリケーションの起動
公式ウェブサイトの設定に従ってPrometheusを起動します
Prometheusのstatic_config: -targetsをexporterの起動IPとポートに設定します。例:localhost:5557。
Prometheusページへのアクセス
localhostがデフォルトのlocalhost:9090で起動している場合、収集されたメトリック値を表示できます(以下の図を参照)。

より良い視覚効果を得て、メトリック値の変化の傾向を観察するためには、PrometheusをGrafanaと連携して使用することをお勧めします。
可観測性メトリック
可観測性メトリックは、主にサーバー側メトリックとクライアント側メトリックの2つのカテゴリに分類されます。サーバー側メトリックはサーバーによって直接生成され、クライアント側メトリックはクライアント上で生成され、クライアントへのRPCリクエストを通じてサーバーによって取得されます。クライアント側メトリックはさらに、プロデューサーメトリックとコンシューマーメトリックに分類できます。すべての87個の可観測性メトリックとその主な意味は次のとおりです。
サーバーメトリック
サーバーメトリック
| メトリック名 | 定義 | 対応するブローカーメトリック名 | 
|---|---|---|
| rocketmq_broker_tps | ブローカーレベルの生産TPS | |
| rocketmq_broker_qps | ブローカーレベルの消費QPS | |
| rocketmq_broker_commitlog_diff | スレーブノードからのブローカーグループの同期遅れメッセージサイズ | |
| rocketmq_brokeruntime_pmdt_0ms | 書き込みリクエストのサーバー側処理時間から書き込み完了まで(0ms) | putMessageDistributeTime | 
| rocketmq_brokeruntime_pmdt_0to10ms | 書き込みリクエストのサーバー側処理時間から書き込み完了まで(0~10ms) | |
| rocketmq_brokeruntime_pmdt_10to50ms | 書き込みリクエストのサーバーサイド処理時間(完了まで)(10~50ms) | |
| rocketmq_brokeruntime_pmdt_50to100ms | 書き込みリクエストのサーバーサイド処理時間(完了まで)(50~100ms) | |
| rocketmq_brokeruntime_pmdt_100to200ms | 書き込みリクエストのサーバーサイド処理時間(完了まで)(100~200ms) | |
| rocketmq_brokeruntime_pmdt_200to500ms | 書き込みリクエストのサーバーサイド処理時間(完了まで)(200~500ms) | |
| rocketmq_brokeruntime_pmdt_500to1s | 書き込みリクエストのサーバーサイド処理時間(完了まで)(500~1000ms) | |
| rocketmq_brokeruntime_pmdt_1to2s | 書き込みリクエストのサーバーサイド処理時間(完了まで)(1~2秒) | |
| rocketmq_brokeruntime_pmdt_2to3s | 書き込みリクエストのサーバーサイド処理時間(完了まで)(2~3秒) | |
| rocketmq_brokeruntime_pmdt_3to4s | 書き込みリクエストのサーバーサイド処理時間(完了まで)(3~4秒) | |
| rocketmq_brokeruntime_pmdt_4to5s | 書き込みリクエストのサーバーサイド処理時間(完了まで)(4~5秒) | |
| rocketmq_brokeruntime_pmdt_5to10s | 書き込みリクエストのサーバーサイド処理時間(完了まで)(5~10秒) | |
| rocketmq_brokeruntime_pmdt_10stomore | 書き込みリクエストのサーバーサイド処理時間(完了まで)(10秒超) | |
| rocketmq_brokeruntime_dispatch_behind_bytes | まだ配信されていないメッセージのバイト数(インデックス作成などの操作) | dispatchBehindBytes | 
| rocketmq_brokeruntime_put_message_size_total | ブローカーに書き込まれたメッセージのサイズの合計 | putMessageSizeTotal | 
| rocketmq_brokeruntime_put_message_average_size | ブローカーに書き込まれたメッセージの平均サイズ | putMessageAverageSize | 
| rocketmq_brokeruntime_remain_transientstore_buffer_numbs | TransientStorePool内のキューの容量 | remainTransientStoreBufferNumbs | 
| rocketmq_brokeruntime_earliest_message_timestamp | ブローカーによって保存されているメッセージの最も古いタイムスタンプ | earliestMessageTimeStamp | 
| rocketmq_brokeruntime_putmessage_entire_time_max | ブローカーが実行を開始して以来、メッセージをブローカーに書き込むのにかかった最長時間 | putMessageEntireTimeMax | 
| rocketmq_brokeruntime_start_accept_sendrequest_time | ブローカーが送信リクエストの受け入れを開始した時刻 | startAcceptSendRequestTimeStamp | 
| rocketmq_brokeruntime_putmessage_times_total | ブローカーにメッセージが書き込まれた総回数 | putMessageTimesTotal | 
| rocketmq_brokeruntime_getmessage_entire_time_max | ブローカーが実行を開始して以来、メッセージのプル処理にかかった最長時間 | getMessageEntireTimeMax | 
| rocketmq_brokeruntime_pagecache_lock_time_mills | pageCacheLockTimeMills | |
| rocketmq_brokeruntime_commitlog_disk_ratio | commitLogが配置されているディスクの使用率 | commitLogDiskRatio | 
| rocketmq_brokeruntime_dispatch_maxbuffer | ブローカーが計算しておらず、0のままの値 | dispatchMaxBuffer | 
| rocketmq_brokeruntime_pull_threadpoolqueue_capacity | プルリクエストを処理するためのスレッドプールキューの容量。 | pullThreadPoolQueueCapacity | 
| rocketmq_brokeruntime_send_threadpoolqueue_capacity | プルリクエストを処理するスレッドプール内のキューの容量 | sendThreadPoolQueueCapacity | 
| rocketmq_brokeruntime_query_threadpool_queue_capacity | クエリリクエストを処理するスレッドプール内のキューの容量 | queryThreadPoolQueueCapacity | 
| rocketmq_brokeruntime_pull_threadpoolqueue_size | プルリクエストを処理するスレッドプール内のキューの実際のサイズ | pullThreadPoolQueueSize | 
| rocketmq_brokeruntime_query_threadpoolqueue_size | クエリリクエストを処理するスレッドプール内のキューの実際のサイズ | queryThreadPoolQueueSize | 
| rocketmq_brokeruntime_send_threadpool_queue_size | 送信リクエストを処理するスレッドプール内のキューの実際のサイズ | sendThreadPoolQueueSize | 
| rocketmq_brokeruntime_pull_threadpoolqueue_headwait_timemills | プルリクエストを処理するスレッドプール内のキューの先頭タスクの待ち時間 | pullThreadPoolQueueHeadWaitTimeMills | 
| rocketmq_brokeruntime_query_threadpoolqueue_headwait_timemills | クエリリクエストを処理するスレッドプール内のキューの先頭タスクの待ち時間 | queryThreadPoolQueueHeadWaitTimeMills | 
| rocketmq_brokeruntime_send_threadpoolqueue_headwait_timemills | 送信リクエストを処理するスレッドプール内のキューの先頭タスクの待ち時間 | sendThreadPoolQueueHeadWaitTimeMills | 
| rocketmq_brokeruntime_msg_gettotal_yesterdaymorning | 昨日の午前0時までに読み取られたメッセージの総回数 | msgGetTotalYesterdayMorning | 
| rocketmq_brokeruntime_msg_puttotal_yesterdaymorning | 昨日の午前0時までに書き込まれたメッセージの総回数 | msgPutTotalYesterdayMorning | 
| rocketmq_brokeruntime_msg_gettotal_todaymorning | 今日の午前0時までに読み取られたメッセージの総回数 | msgGetTotalTodayMorning | 
| rocketmq_brokeruntime_msg_puttotal_todaymorning | 今日の午前0時までに書き込まれたメッセージの総回数 | putMessageTimesTotal | 
| rocketmq_brokeruntime_msg_put_total_today_now | 各ブローカーにこれまで書き込まれたメッセージ数。 | msgPutTotalTodayNow | 
| rocketmq_brokeruntime_msg_gettotal_today_now | 各ブローカーからこれまで読み取られたメッセージ数。 | msgGetTotalTodayNow | 
| rocketmq_brokeruntime_commitlogdir_capacity_free | commitLogが保存されているディレクトリの空き容量。 | commitLogDirCapacity | 
| rocketmq_brokeruntime_commitlogdir_capacity_total | commitLogが保存されているディレクトリの総容量。 | |
| rocketmq_brokeruntime_commitlog_maxoffset | commitLogの最大オフセット。 | commitLogMaxOffset | 
| rocketmq_brokeruntime_commitlog_minoffset | commitLogの最小オフセット。 | commitLogMinOffset | 
| rocketmq_brokeruntime_remain_howmanydata_toflush | remainHowManyDataToFlush | |
| rocketmq_brokeruntime_getfound_tps600 | 過去600秒間のgetMessage中の受信メッセージの平均TPS。 | getFoundTps | 
| rocketmq_brokeruntime_getfound_tps60 | 過去60秒間のgetMessage中の受信メッセージの平均TPS。 | |
| rocketmq_brokeruntime_getfound_tps10 | 過去10秒間のgetMessage中の受信メッセージの平均TPS。 | |
| rocketmq_brokeruntime_gettotal_tps600 | 過去600秒間のgetMessage呼び出しの平均TPS。 | getTotalTps | 
| rocketmq_brokeruntime_gettotal_tps60 | 過去60秒間のgetMessage呼び出しの平均TPS。 | |
| rocketmq_brokeruntime_gettotal_tps10 | 過去10秒間のgetMessage呼び出しの平均TPS。 | |
| rocketmq_brokeruntime_gettransfered_tps600 | getTransferedTps | |
| rocketmq_brokeruntime_gettransfered_tps60 | ||
| rocketmq_brokeruntime_gettransfered_tps10 | ||
| rocketmq_brokeruntime_getmiss_tps600 | 過去600秒間のメッセージが取得されなかったgetMessageの平均TPS | getMissTps | 
| rocketmq_brokeruntime_getmiss_tps60 | 過去60秒間のメッセージが取得されなかったgetMessageの平均TPS | |
| rocketmq_brokeruntime_getmiss_tps10 | 過去10秒間のメッセージが取得されなかったgetMessageの平均TPS | |
| rocketmq_brokeruntime_put_tps600 | 過去600秒間のメッセージ書き込み操作の平均TPS | putTps | 
| rocketmq_brokeruntime_put_tps60 | 過去60秒間のメッセージ書き込み操作の平均TPS | |
| rocketmq_brokeruntime_put_tps10 | 過去10秒間のメッセージ書き込み操作の平均TPS | 
プロデューサーメトリクス
プロデューサーメトリクス
| メトリック名 | 定義 | 
|---|---|
| rocketmq_producer_offset | 現時点でのトピックの最大オフセット | 
| rocketmq_topic_retry_offset | 現時点でのリトライトピックの最大オフセット | 
| rocketmq_topic_dlq_offset | 現時点でのデッドレタートピックの最大オフセット | 
| rocketmq_producer_tps | ブローカーグループ上のトピックの生産TPS | 
| rocketmq_producer_message_size | ブローカーグループ上のトピックの生産メッセージサイズのTPS | 
| rocketmq_queue_producer_tps | キューレベルの生産TPS | 
| rocketmq_queue_producer_message_size | メッセージサイズのキューレベルの生産TPS | 
コンシューマメトリクス
コンシューマメトリクス
| メトリック名 | 定義 | 
|---|---|
| rocketmq_group_diff | コンシューマグループのメッセージ蓄積メッセージ数 | 
| rocketmq_group_retrydiff | コンシューマグループのリトライキューの蓄積メッセージ数 | 
| rocketmq_group_dlqdiff | コンシューマグループのデッドレターキューの蓄積メッセージ数 | 
| rocketmq_group_count | コンシューマグループ内のコンシューマ数 | 
| rocketmq_client_consume_fail_msg_count | 過去1時間にコンシューマグループ内のコンシューマが消費に失敗した回数 | 
| rocketmq_client_consume_fail_msg_tps | コンシューマグループのコンシューマ失敗TPS | 
| rocketmq_client_consume_ok_msg_tps | コンシューマグループのコンシューマ成功TPS | 
| rocketmq_client_consume_rt | メッセージがプルされた後、消費されるのにかかった時間 | 
| rocketmq_client_consumer_pull_rt | クライアントがメッセージをプルするのにかかった時間 | 
| rocketmq_client_consumer_pull_tps | クライアントのメッセージプルTPS | 
| rocketmq_consumer_tps | 各ブローカーグループのサブスクリプショングループの消費TPS | 
| rocketmq_group_consume_tps | サブスクリプショングループの現在の消費TPS(rocketmq_consumer_tpsのためにブローカーごとに集計) | 
| rocketmq_consumer_offset | ブローカーグループ内のサブスクリプショングループの現在の消費オフセット | 
| rocketmq_group_consume_total_offset | サブスクリプショングループの現在の消費オフセット(rocketmq_consumer_offsetのためにブローカーごとに集計) | 
| rocketmq_consumer_message_size | ブローカーグループ内のサブスクリプショングループのメッセージサイズ消費のTPS | 
| rocketmq_send_back_nums | ブローカーグループ内のサブスクリプショングループが消費に失敗し、リトライメッセージに書き込んだ回数 | 
| rocketmq_group_get_latency_by_storetime | コンシューマグループの消費遅延、エクスポータがメッセージを取得した時刻と現在時刻の差。 |