本文へスキップ
バージョン: 5.0

RocketMQ Prometheus Exporter

はじめに

Rocketmq-exporter は、RocketMQ ブローカーとクライアント側のすべての関連メトリクスを監視するシステムであり、mqAdmin を介してブローカー側から取得したメトリック値を 87 個のキャッシュにパッケージします。

警告

以前のバージョンでは、87個のconcurrentHashMapがありましたが、Mapは期限切れのメトリクスを削除しないため、ラベルの変更があると新しいメトリクスが生成され、古い未使用のメトリクスは自動的に削除されません。最終的にメモリオーバーフローを引き起こします。しかし、Cache構造を使用することで期限切れの削除が可能になり、有効期限を設定できます。

Rocketmq-exporter が監視メトリックを取得するプロセスを以下の図に示します。ExporterはMQAdminExtを通してMQクラスタからデータのリクエストを行い、リクエストされたデータはMetricServiceを通してPrometheusが必要とするフォーマットに標準化され、その後/metricsインターフェースを通してPrometheusに公開されます。4586095434

メトリック構造

Metricクラスはorg.apache.rocketmq.expoter.model.metricsパッケージにあり、本質的にはエンティティクラスのセットであり、各エンティティクラスは1種類のメトリックを表し、合計14個のMetricクラスがあります。これらのクラスは87個のキャッシュのキーとして機能し、異なるラベル値によって区別されます。

エンティティクラスには、ブローカー、コンシューマー、プロデューサーの3次元のラベルが含まれています。
  • ブローカー関連のメトリッククラス: BrokerRuntimeMetric、BrokerMetric、DLQTopicOffsetMetric、TopicPutNumMetric
  • コンシューマー関連のクラス: ConsumerRuntimeConsumeFailedMsgsMetric 、ConsumerRuntimeConsumeFailedTPSMetric 、ConsumerRuntimeConsumeOKTPSMetric、ConsumerRuntimeConsumeRTMetric、ConsumerRuntimePullRTMetric、ConsumerRuntimePullTPSMetric、ConsumerCountMetric、ConsumerMetric、ConsumerTopicDiffMetric
  • プロデューサー関連のメトリッククラス: ProducerMetric

Prometheusによるメトリックの取得

RocketMQ-exporter プロジェクトとPrometheusは、サーバー・クライアントの関係に相当します。RocketMQ-exporterプロジェクトはPrometheusクライアントパッケージを導入しており、プロジェクトのMetricFamilySamplesクラスで取得する情報の種類を指定しています。Prometheusはexporterからメトリクスをリクエストし、exporterはそれを対応するタイプにパッケージした後、Prometheusに情報を返します。

rocketmq-exporterプロジェクトが起動すると、様々なメトリクスをrocketmqからmfsオブジェクトに収集します。ブラウザまたはPrometheusが対応するインターフェースにアクセスすると、mfsオブジェクト内のサンプルは、サービスによってPrometheusがサポートするフォーマットされたデータに生成されます。主に以下のステップが含まれます。

ブラウザはip:5557/metricsにアクセスしてRMQMetricsControllerクラスのmetricsメソッドを呼び出します。ここでipはrocketmq-exporterプロジェクトが実行されているホストのIPです。

private void metrics(HttpServletResponse response) throws IOException {
StringWriter writer = new StringWriter();
metricsService.metrics(writer);
response.setHeader("Content-Type", "text/plain; version=0.0.4; charset=utf-8");
response.getOutputStream().print(writer.toString());
}

新しいStringWriterオブジェクトを作成してメトリックインジケーターを収集し、exporter内のメトリクスをMetricsServiceクラスのmetricsメソッドを通じてwriterオブジェクトに収集し、その後収集されたインジケーターをウェブページに出力します。

収集されたメトリクスのフォーマットは次のとおりです。

<metric name>{<label name>=<label value>, ...} <metric value>

例:

rocketmq_group_diff{group="rmq_group_test_20220114",topic="fusion_console_tst",countOfOnlineConsumers="0",msgModel="1",} 23.0

MetricCollectTaskクラスの5つのスケジュールタスク

MetricCollectTaskクラスには、collectTopicOffset、collectConsumerOffset、collectBrokerStatsTopic、collectBrokerStats、collectBrokerRuntimeStatsという5つのスケジュールタスクがあります。これらは、コンシューマーオフセット情報やブローカーの状態情報などを収集するために使用されます。そのcron式は「cron: 15 0/1 * * * ?」で、1分ごとに1回収集することを意味します。そのコア機能は、mqAdminExtオブジェクトを通じてクラスタ内のブローカーから情報を取得し、対応する87個の監視インジケーターに追加することです。例としてcollectTopicOffsetを取り上げます。

  1. まず、TopicListオブジェクトを初期化し、mqAdminExt.fetchAllTopicList()メソッドを使用してクラスタ内のすべてのトピック情報を取得します。

    TopicList topicList = null;
    try {
    topicList = mqAdminExt.fetchAllTopicList();
    } catch (Exception ex) {
    log.error(String.format("collectTopicOffset-exception comes getting topic list from namesrv, address is %s",
    JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
    return;
    }
  2. topicSetにトピックを追加し、各トピックを反復処理して、mqAdminExt.examineTopicStats(topic)関数を使用してトピックの状態を確認します。

    Set < String > topicSet = topicList != null ? topicList.getTopicList() : null;
    for (String topic: topicSet) {
    TopicStatsTable topicStats = null;
    try {
    topicStats = mqAdminExt.examineTopicStats(topic);
    } catch (Exception ex) {
    log.error(String.format("collectTopicOffset-getting topic(%s) stats error. the namesrv address is %s",
    topic,
    JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
    continue;}
  3. トピックの状態セット、ブローカーごとに分割されたトピック情報のオフセットのためのハッシュテーブルbrokerOffsetMap、ブローカー名をキーとして更新タイムスタンプを格納するハッシュテーブルbrokerUpdateTimestampMapを初期化します。

    Set<Map.Entry<MessageQueue, TopicOffset>> topicStatusEntries = topicStats.getOffsetTable().entrySet();
    HashMap<String, Long> brokerOffsetMap = new HashMap<>();
    HashMap<String, Long> brokerUpdateTimestampMap = new HashMap<>();
    for (Map.Entry<MessageQueue, TopicOffset> topicStatusEntry : topicStatusEntries) {
    MessageQueue q = topicStatusEntry.getKey();
    TopicOffset offset = topicStatusEntry.getValue();
    if (brokerOffsetMap.containsKey(q.getBrokerName())) {
    brokerOffsetMap.put(q.getBrokerName(), brokerOffsetMap.get(q.getBrokerName()) + offset.getMaxOffset());
    } else {
    brokerOffsetMap.put(q.getBrokerName(), offset.getMaxOffset());
    }
    if (brokerUpdateTimestampMap.containsKey(q.getBrokerName())) {
    if (offset.getLastUpdateTimestamp() > brokerUpdateTimestampMap.get(q.getBrokerName())) {
    brokerUpdateTimestampMap.put(q.getBrokerName(), offset.getLastUpdateTimestamp());
    }
    } else {
    brokerUpdateTimestampMap.put(q.getBrokerName(),
    offset.getLastUpdateTimestamp());
    }
    }

  4. 最後に、brokerOffsetMap内の各項目を反復処理することで、metricsServiceからmetricCollectorオブジェクトを取得し、RMQMetricsCollectorクラスのaddTopicOffsetMetricメソッドを呼び出して、対応する値をRMQMetricsCollectorクラスの87個のメトリクスのいずれかのキャッシュに追加します。

     Set<Map.Entry<String, Long>> brokerOffsetEntries = brokerOffsetMap.entrySet();
    for (Map.Entry<String, Long> brokerOffsetEntry : brokerOffsetEntries) {
    metricsService.getCollector().addTopicOffsetMetric(clusterName, brokerOffsetEntry.getKey(), topic,
    brokerUpdateTimestampMap.get(brokerOffsetEntry.getKey()), brokerOffsetEntry.getValue());
    }
    }
    log.info("topic offset collection task finished...." + (System.currentTimeMillis() - start));
    }

Rocketmq-exporterによるメトリクスの収集フローチャート

95680412354

クイックスタート

application.ymlの設定

server.portは、Prometheusがrocketmq-exporterをlistenするポートを設定し、デフォルト値は5557です。

  • application.ymlの重要な設定には以下が含まれます。

  • rocketmq.config.webTelemetryPathは、Prometheusがメトリクスを取得するパスを設定し、デフォルト値は/metricsです。デフォルト値を使用できます。

  • rocketmq.config.enableACL RocketMQクラスタでACL検証が有効になっている場合は、trueに設定し、対応するakとskをaccessKeyとsecretKeyに設定する必要があります。

  • rocketmq.config.outOfTimeSecondsは、メトリクスとその値を保存する有効期限を設定するために使用されます。この時間を超えてもキャッシュ内のキーに書き込み変更が行われなければ、削除されます。一般的には60秒に設定できます(Prometheusがメトリクスを取得する時間間隔は、有効期限に応じて適切に設定する必要があります。有効期限はPrometheusがメトリクスを収集する時間間隔以上である必要があります)。

  • task.*.cronは、スケジュールタスクを通じてexporterがブローカーからメトリクスを取得する時間間隔を設定し、デフォルト値は「15 0/1 * * * ?」で、1分ごとの15秒ごとにメトリクスを取得することを意味します。

exporterアプリケーションの起動

公式ウェブサイトの設定に従ってPrometheusを起動します

Prometheusのstatic_config: -targetsをexporterの起動IPとポートに設定します。例:localhost:5557。

Prometheusページへのアクセス

localhostがデフォルトのlocalhost:9090で起動している場合、収集されたメトリック値を表示できます(以下の図を参照)。

90671925984

ヒント

より良い視覚効果を得て、メトリック値の変化の傾向を観察するためには、PrometheusをGrafanaと連携して使用することをお勧めします。

可観測性メトリック

可観測性メトリックは、主にサーバー側メトリックとクライアント側メトリックの2つのカテゴリに分類されます。サーバー側メトリックはサーバーによって直接生成され、クライアント側メトリックはクライアント上で生成され、クライアントへのRPCリクエストを通じてサーバーによって取得されます。クライアント側メトリックはさらに、プロデューサーメトリックとコンシューマーメトリックに分類できます。すべての87個の可観測性メトリックとその主な意味は次のとおりです。

サーバーメトリック

サーバーメトリック

メトリック名定義対応するブローカーメトリック名
rocketmq_broker_tpsブローカーレベルの生産TPS
rocketmq_broker_qpsブローカーレベルの消費QPS
rocketmq_broker_commitlog_diffスレーブノードからのブローカーグループの同期遅れメッセージサイズ
rocketmq_brokeruntime_pmdt_0ms書き込みリクエストのサーバー側処理時間から書き込み完了まで(0ms)putMessageDistributeTime
rocketmq_brokeruntime_pmdt_0to10ms書き込みリクエストのサーバー側処理時間から書き込み完了まで(0~10ms)
rocketmq_brokeruntime_pmdt_10to50ms書き込みリクエストのサーバーサイド処理時間(完了まで)(10~50ms)
rocketmq_brokeruntime_pmdt_50to100ms書き込みリクエストのサーバーサイド処理時間(完了まで)(50~100ms)
rocketmq_brokeruntime_pmdt_100to200ms書き込みリクエストのサーバーサイド処理時間(完了まで)(100~200ms)
rocketmq_brokeruntime_pmdt_200to500ms書き込みリクエストのサーバーサイド処理時間(完了まで)(200~500ms)
rocketmq_brokeruntime_pmdt_500to1s書き込みリクエストのサーバーサイド処理時間(完了まで)(500~1000ms)
rocketmq_brokeruntime_pmdt_1to2s書き込みリクエストのサーバーサイド処理時間(完了まで)(1~2秒)
rocketmq_brokeruntime_pmdt_2to3s書き込みリクエストのサーバーサイド処理時間(完了まで)(2~3秒)
rocketmq_brokeruntime_pmdt_3to4s書き込みリクエストのサーバーサイド処理時間(完了まで)(3~4秒)
rocketmq_brokeruntime_pmdt_4to5s書き込みリクエストのサーバーサイド処理時間(完了まで)(4~5秒)
rocketmq_brokeruntime_pmdt_5to10s書き込みリクエストのサーバーサイド処理時間(完了まで)(5~10秒)
rocketmq_brokeruntime_pmdt_10stomore書き込みリクエストのサーバーサイド処理時間(完了まで)(10秒超)
rocketmq_brokeruntime_dispatch_behind_bytesまだ配信されていないメッセージのバイト数(インデックス作成などの操作)dispatchBehindBytes
rocketmq_brokeruntime_put_message_size_totalブローカーに書き込まれたメッセージのサイズの合計putMessageSizeTotal
rocketmq_brokeruntime_put_message_average_sizeブローカーに書き込まれたメッセージの平均サイズputMessageAverageSize
rocketmq_brokeruntime_remain_transientstore_buffer_numbsTransientStorePool内のキューの容量remainTransientStoreBufferNumbs
rocketmq_brokeruntime_earliest_message_timestampブローカーによって保存されているメッセージの最も古いタイムスタンプearliestMessageTimeStamp
rocketmq_brokeruntime_putmessage_entire_time_maxブローカーが実行を開始して以来、メッセージをブローカーに書き込むのにかかった最長時間putMessageEntireTimeMax
rocketmq_brokeruntime_start_accept_sendrequest_timeブローカーが送信リクエストの受け入れを開始した時刻startAcceptSendRequestTimeStamp
rocketmq_brokeruntime_putmessage_times_totalブローカーにメッセージが書き込まれた総回数putMessageTimesTotal
rocketmq_brokeruntime_getmessage_entire_time_maxブローカーが実行を開始して以来、メッセージのプル処理にかかった最長時間getMessageEntireTimeMax
rocketmq_brokeruntime_pagecache_lock_time_millspageCacheLockTimeMills
rocketmq_brokeruntime_commitlog_disk_ratiocommitLogが配置されているディスクの使用率commitLogDiskRatio
rocketmq_brokeruntime_dispatch_maxbufferブローカーが計算しておらず、0のままの値dispatchMaxBuffer
rocketmq_brokeruntime_pull_threadpoolqueue_capacityプルリクエストを処理するためのスレッドプールキューの容量。pullThreadPoolQueueCapacity
rocketmq_brokeruntime_send_threadpoolqueue_capacityプルリクエストを処理するスレッドプール内のキューの容量sendThreadPoolQueueCapacity
rocketmq_brokeruntime_query_threadpool_queue_capacityクエリリクエストを処理するスレッドプール内のキューの容量queryThreadPoolQueueCapacity
rocketmq_brokeruntime_pull_threadpoolqueue_sizeプルリクエストを処理するスレッドプール内のキューの実際のサイズpullThreadPoolQueueSize
rocketmq_brokeruntime_query_threadpoolqueue_sizeクエリリクエストを処理するスレッドプール内のキューの実際のサイズqueryThreadPoolQueueSize
rocketmq_brokeruntime_send_threadpool_queue_size送信リクエストを処理するスレッドプール内のキューの実際のサイズsendThreadPoolQueueSize
rocketmq_brokeruntime_pull_threadpoolqueue_headwait_timemillsプルリクエストを処理するスレッドプール内のキューの先頭タスクの待ち時間pullThreadPoolQueueHeadWaitTimeMills
rocketmq_brokeruntime_query_threadpoolqueue_headwait_timemillsクエリリクエストを処理するスレッドプール内のキューの先頭タスクの待ち時間queryThreadPoolQueueHeadWaitTimeMills
rocketmq_brokeruntime_send_threadpoolqueue_headwait_timemills送信リクエストを処理するスレッドプール内のキューの先頭タスクの待ち時間sendThreadPoolQueueHeadWaitTimeMills
rocketmq_brokeruntime_msg_gettotal_yesterdaymorning昨日の午前0時までに読み取られたメッセージの総回数msgGetTotalYesterdayMorning
rocketmq_brokeruntime_msg_puttotal_yesterdaymorning昨日の午前0時までに書き込まれたメッセージの総回数msgPutTotalYesterdayMorning
rocketmq_brokeruntime_msg_gettotal_todaymorning今日の午前0時までに読み取られたメッセージの総回数msgGetTotalTodayMorning
rocketmq_brokeruntime_msg_puttotal_todaymorning今日の午前0時までに書き込まれたメッセージの総回数putMessageTimesTotal
rocketmq_brokeruntime_msg_put_total_today_now各ブローカーにこれまで書き込まれたメッセージ数。msgPutTotalTodayNow
rocketmq_brokeruntime_msg_gettotal_today_now各ブローカーからこれまで読み取られたメッセージ数。msgGetTotalTodayNow
rocketmq_brokeruntime_commitlogdir_capacity_freecommitLogが保存されているディレクトリの空き容量。commitLogDirCapacity
rocketmq_brokeruntime_commitlogdir_capacity_totalcommitLogが保存されているディレクトリの総容量。
rocketmq_brokeruntime_commitlog_maxoffsetcommitLogの最大オフセット。commitLogMaxOffset
rocketmq_brokeruntime_commitlog_minoffsetcommitLogの最小オフセット。commitLogMinOffset
rocketmq_brokeruntime_remain_howmanydata_toflushremainHowManyDataToFlush
rocketmq_brokeruntime_getfound_tps600過去600秒間のgetMessage中の受信メッセージの平均TPS。getFoundTps
rocketmq_brokeruntime_getfound_tps60過去60秒間のgetMessage中の受信メッセージの平均TPS。
rocketmq_brokeruntime_getfound_tps10過去10秒間のgetMessage中の受信メッセージの平均TPS。
rocketmq_brokeruntime_gettotal_tps600過去600秒間のgetMessage呼び出しの平均TPS。getTotalTps
rocketmq_brokeruntime_gettotal_tps60過去60秒間のgetMessage呼び出しの平均TPS。
rocketmq_brokeruntime_gettotal_tps10過去10秒間のgetMessage呼び出しの平均TPS。
rocketmq_brokeruntime_gettransfered_tps600getTransferedTps
rocketmq_brokeruntime_gettransfered_tps60
rocketmq_brokeruntime_gettransfered_tps10
rocketmq_brokeruntime_getmiss_tps600過去600秒間のメッセージが取得されなかったgetMessageの平均TPSgetMissTps
rocketmq_brokeruntime_getmiss_tps60過去60秒間のメッセージが取得されなかったgetMessageの平均TPS
rocketmq_brokeruntime_getmiss_tps10過去10秒間のメッセージが取得されなかったgetMessageの平均TPS
rocketmq_brokeruntime_put_tps600過去600秒間のメッセージ書き込み操作の平均TPSputTps
rocketmq_brokeruntime_put_tps60過去60秒間のメッセージ書き込み操作の平均TPS
rocketmq_brokeruntime_put_tps10過去10秒間のメッセージ書き込み操作の平均TPS
プロデューサーメトリクス

プロデューサーメトリクス

メトリック名定義
rocketmq_producer_offset現時点でのトピックの最大オフセット
rocketmq_topic_retry_offset現時点でのリトライトピックの最大オフセット
rocketmq_topic_dlq_offset現時点でのデッドレタートピックの最大オフセット
rocketmq_producer_tpsブローカーグループ上のトピックの生産TPS
rocketmq_producer_message_sizeブローカーグループ上のトピックの生産メッセージサイズのTPS
rocketmq_queue_producer_tpsキューレベルの生産TPS
rocketmq_queue_producer_message_sizeメッセージサイズのキューレベルの生産TPS
コンシューマメトリクス

コンシューマメトリクス

メトリック名定義
rocketmq_group_diffコンシューマグループのメッセージ蓄積メッセージ数
rocketmq_group_retrydiffコンシューマグループのリトライキューの蓄積メッセージ数
rocketmq_group_dlqdiffコンシューマグループのデッドレターキューの蓄積メッセージ数
rocketmq_group_countコンシューマグループ内のコンシューマ数
rocketmq_client_consume_fail_msg_count過去1時間にコンシューマグループ内のコンシューマが消費に失敗した回数
rocketmq_client_consume_fail_msg_tpsコンシューマグループのコンシューマ失敗TPS
rocketmq_client_consume_ok_msg_tpsコンシューマグループのコンシューマ成功TPS
rocketmq_client_consume_rtメッセージがプルされた後、消費されるのにかかった時間
rocketmq_client_consumer_pull_rtクライアントがメッセージをプルするのにかかった時間
rocketmq_client_consumer_pull_tpsクライアントのメッセージプルTPS
rocketmq_consumer_tps各ブローカーグループのサブスクリプショングループの消費TPS
rocketmq_group_consume_tpsサブスクリプショングループの現在の消費TPS(rocketmq_consumer_tpsのためにブローカーごとに集計)
rocketmq_consumer_offsetブローカーグループ内のサブスクリプショングループの現在の消費オフセット
rocketmq_group_consume_total_offsetサブスクリプショングループの現在の消費オフセット(rocketmq_consumer_offsetのためにブローカーごとに集計)
rocketmq_consumer_message_sizeブローカーグループ内のサブスクリプショングループのメッセージサイズ消費のTPS
rocketmq_send_back_numsブローカーグループ内のサブスクリプショングループが消費に失敗し、リトライメッセージに書き込んだ回数
rocketmq_group_get_latency_by_storetimeコンシューマグループの消費遅延、エクスポータがメッセージを取得した時刻と現在時刻の差。