本文へスキップ
バージョン: 5.0

メッセージフィルタリング

コンシューマがトピックを購読すると、Apache RocketMQはトピック内のすべてのメッセージをコンシューマに配信します。ただし、コンシューマがビジネスに関連するメッセージのみを受信するようにしたい場合は、Apache RocketMQブローカにフィルタを設定できます。このトピックでは、メッセージフィルタリング機能とその動作について説明します。また、メッセージの分類方法と、さまざまなフィルタリング方法の使用方法の例についても説明します。

シナリオ

Apache RocketMQはパブリッシュ・サブスクライブパターンに従います。Apache RocketMQはメッセージ指向ミドルウェアであり、分散されたアップストリームとダウンストリームアプリケーション間の通信を容易にするために広く使用されています。現実世界のシナリオでは、アプリケーションはさまざまな方法でメッセージを消費する場合があります。これらのアプリケーションはすべて同じApache RocketMQトピックを購読でき、フィルタを設定して、これらのアプリケーションが関連するメッセージのみを受信できるようにすることができます。

Apache RocketMQのメッセージフィルタリング機能を使用することで、さまざまなコンシューマに送信されるメッセージを効果的に管理できます。これにより、ミッションクリティカルではない大量のメッセージによってシステムが過負荷になるのを防ぐことができます。

Apache RocketMQのメッセージフィルタリング機能はトピックレベルで有効になり、複数のサービスに分散されている1つのビジネスのメッセージを管理できます。異なるビジネスのメッセージを管理する場合は、異なるトピックを購読できます。

機能概要

定義

メッセージフィルタリング機能は、コンシューマが設定した条件に基づいてメッセージをフィルタリングし、条件を満たすメッセージをコンシューマに送信します。

まず、メッセージ属性とタグがApache RocketMQプロデューサーとコンシューマで定義されます。次に、コンシューマにフィルタ条件が設定され、Apache RocketMQブローカは条件に基づいてメッセージをフィルタリングし、フィルタリングされたメッセージをコンシューマに送信します。

**動作メカニズム** 消息过滤

メッセージフィルタリングには、次の手順が含まれます。

  • プロデューサー: プロデューサは、メッセージを初期化する前に、属性とタグをメッセージに添付します。これらの属性とタグは、コンシューマによって設定されたフィルタ条件と照合するために使用されます。

  • コンシューマ: コンシューマは、購読登録操作を呼び出して、メッセージの初期化と消費中に、購読したトピックとメッセージ、またはフィルタ条件をブローカに通知します。

  • ブローカ: コンシューマからのメッセージ要求を受け取ると、Apache RocketMQブローカは、コンシューマから送信されたフィルタ条件の式に基づいて、メッセージを動的にフィルタリングし、フィルタ条件に一致するメッセージをコンシューマに送信します。

分類

Apache RocketMQは、タグベースのフィルタリングと属性ベースのSQLフィルタリングをサポートしています。次の表では、これら2つの方法を比較しています。

項目タグベースのフィルタリング属性ベースのSQLフィルタリング
フィルタ対象メッセージタグ。メッセージ属性。カスタム属性とシステム属性が含まれます。メッセージタグはシステム属性(TAGS)です。
フィルタリング能力完全一致。SQL構文ベースの一致。
シナリオタグに基づいた単純なフィルタリング。タグと属性間の関係を含む複雑なフィルタリング。

フィルタリング方法の使用方法の詳細については、タグベースのフィルタリング属性ベースのSQLフィルタリングを参照してください。

購読の一貫性

フィルタ式は購読の一部です。Apache RocketMQのパブリッシュ・サブスクライブパターンによると、コンシューマグループ内の別のコンシューマとのコンシューマの購読は、フィルタ式を含め、一貫している必要があります。そうしないと、一部のメッセージを消費できない状況が発生する可能性があります。詳細については、購読を参照してください。

タグベースのフィルタリング

タグベースのフィルタリングは、Apache RocketMQによって提供される基本的なメッセージフィルタリング機能です。この機能は、プロデューサーに設定されたタグに基づいてメッセージをフィルタリングします。コンシューマは、タグを使用して、どのメッセージを消費するかを指定します。

シナリオ

次の図は、電子商取引トランザクションシナリオの例を示しています。注文の配置から製品の受信まで、一連のメッセージが生成されます。たとえば、以下のようなメッセージです。

  • 注文メッセージ

  • 支払いメッセージ

  • 物流メッセージ

これらのメッセージは、Trade_Topicという名前のトピックに送信され、複数のシステムが購読者としています。これには以下が含まれます。

  • 決済システム: 支払いメッセージのみを購読します。

  • 物流システム: 物流メッセージのみを購読します。

  • トランザクション成功率分析システム: 注文メッセージと支払いメッセージを購読します。

  • リアルタイム計算システム: すべてのメッセージを購読します。

Tag过滤

タグの設定

  • プロデューサーは、メッセージを送信する前に、各メッセージに1つのタグのみを添付します。

  • タグは文字列です。文字列の推奨最大長は128文字です。

フィルタリングルール

タグベースのフィルタリングは、文字列に基づいた正確なフィルタリングを実装します。次のフィルタリングルールを設定できます。

  • シングルタグマッチ: フィルタ式を単一のタグに設定して、そのタグが付いているメッセージのみを受信できます。

  • マルチタグマッチ: フィルタ式に複数のタグを設定して、いずれかのタグが付いているメッセージを受信できます。タグは2つの縦線(||)で区切ります。たとえば、Tag1||Tag2||Tag3は、Tag1、Tag2、またはTag3が付いているメッセージがすべてコンシューマに送信されることを示します。

  • すべて一致: アスタリスク(*)を使用してすべてのタグに一致させることができます。これは、トピック内のすべてのメッセージがコンシューマに送信されることを意味します。

  • タグを設定してメッセージを送信する

    Message message = messageBuilder.setTopic("topic")
    // Specify the message index key so that the system can use a keyword to accurately locate the message.
    .setKeys("messageKey")
    // Specify the message tag so that consumers can use the tag to filter the message.
    // This example indicates that the tag of the message is set to "TagA".
    .setTag("TagA")
    // Message body.
    .setBody("messageBody".getBytes())
    .build();
  • タグを指定してメッセージを購読する

    String topic = "Your Topic";
    // Subscribe to messages that carry tag "TagA".
    FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG);
    pushConsumer.subscribe(topic, filterExpression);
  • 複数のタグを指定してメッセージを購読する

    String topic = "Your Topic";
    // Subscribe to messages that carry tag TagA, TagB, or TagC.
    FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG);
    pushConsumer.subscribe(topic, filterExpression);
  • トピック内のすべてのメッセージを購読する

    String topic = "Your Topic";
    // Subscribe to all messages.
    FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
    pushConsumer.subscribe(topic, filterExpression);

属性ベースのSQLフィルタリング

属性ベースのSQLフィルタリングは、Apache RocketMQによって提供される高度なメッセージフィルタリング方法です。プロデューサーがメッセージに設定する属性と属性値(キーと値とも呼ばれます)に基づいてメッセージをフィルタリングします。プロデューサーは、1つのメッセージに複数の属性を設定できます。コンシューマは、SQL式で属性を指定して、特定のメッセージを受信できます。

情報

タグはシステム属性であるため、タグベースのフィルタリングは属性ベースのSQLフィルタリングの一種です。SQL構文では、タグ属性はTAGSで表されます。

シナリオ

次の図は、電子商取引トランザクションシナリオの例を示しています。注文の配置から製品の受信まで、一連のメッセージが生成されます。メッセージは、注文メッセージと物流メッセージに分類されます。物流メッセージには地域属性が設定されており、地域属性の値は杭州と上海です。

  • 注文メッセージ

  • 物流メッセージ

    • 地域属性の値が杭州である物流メッセージ

    • 地域属性の値が上海である物流メッセージ

これらのメッセージは、Trade_Topicという名前のトピックに送信され、次のシステムが購読者としています。

  • 物流システム1: 地域属性の値が杭州である物流メッセージのみを購読します。

  • 物流システム2: すべての物流メッセージを購読します。

  • 注文追跡システム: 注文メッセージのみを購読します。

  • リアルタイム計算システム: すべてのメッセージを購読します。

sql过滤

メッセージ属性の設定

プロデューサーは、メッセージを送信する前に、カスタム属性をメッセージに設定できます。各属性は、カスタムのキーと値のペアです。

1つのメッセージに複数の属性を設定できます。

フィルタリングルール

フィルタ式を作成する際は、SQL92構文に従う必要があります。具体的には

構文説明
IS NULL属性が存在しないことを指定します。a IS NULL: 属性aは存在しません。
IS NOT NULL属性が存在することを指定します。a IS NOT NULL: 属性aは存在します。
> >= < <=数値を比較します。文字列を比較するには使用できません。文字列を比較しようとすると、コンシューマの起動時にエラーが報告されます。**注記** 数値に変換できる文字列も数値とみなされます。 a IS NOT NULL AND a > 100: 属性aが存在し、属性aの値が100より大きいです。 a IS NOT NULL AND a > 'abc': エラーの例。abcは文字列です。そのため、aとabcを比較することはできません。
BETWEEN xxx AND xxx数値を比較します。文字列を比較するには使用できません。文字列を比較しようとすると、コンシューマの起動時にエラーが報告されます。構文は>>= xxx AND <= xxxと同等です。属性の値が2つの数値の間にあるか、2つの数値のいずれかに等しいことを意味します。a IS NOT NULL AND (a BETWEEN 10 AND 100): 属性aが存在し、属性aの値が10以上100以下です。
NOT BETWEEN xxx AND xxx数値を比較します。文字列を比較するには使用できません。文字列を比較しようとすると、コンシューマの起動時にエラーが報告されます。構文は < xxx OR>xxxと同等です。属性の値が左側の数値より小さいか、右側の数値より大きいことを意味します。a IS NOT NULL AND (a NOT BETWEEN 10 AND 100): 属性aが存在し、属性aの値が10未満または100より大きいです。
IN (xxx, xxx)属性の値が集合に含まれていることを示します。集合内の要素は文字列のみです。a IS NOT NULL AND (a IN ('abc', 'def')): 属性aが存在し、属性aの値がabcまたはdefです。
= <>等号演算子と不等号演算子。数値と文字列の比較に使用できます。a IS NOT NULL AND (a = 'abc' OR a<>'def'): 属性aが存在し、属性aの値がabcであるか、属性aの値がdefではないかのいずれかです。
AND OR論理AND演算子と論理OR演算子。これらは単純な論理関数を組み合わせるために使用でき、各論理関数は括弧で囲む必要があります。a IS NOT NULL AND (a > 100) OR (b IS NULL):属性aが存在し、属性aの値が100より大きい、または属性bが存在しない。

SQL属性ベースのフィルタリングは、カスタムメッセージ属性の設定とSQLフィルタ式を定義することで実装されます。フィルタ式は有効な結果を生成しない可能性があります。Apache RocketMQブローカは、次のロジックに基づいてメッセージを処理します。

  • 例外処理:フィルタ式の評価時に例外が報告された場合、ブローカはデフォルトで受信メッセージをフィルタリングし、コンシューマにメッセージを配信しません。たとえば、数値と非数値を比較すると例外が発生します。

  • NULL値の処理:フィルタ式の計算結果がNULLである場合、または値がブール値でない場合、ブローカはデフォルトで受信メッセージをフィルタリングし、コンシューマにメッセージを配信しません。ブール値は真偽値を表し、trueまたはfalseになります。プロデューサーが送信するメッセージのカスタム属性を設定しなかったと仮定しますが、このカスタム属性はSQL式でフィルタ条件として使用されます。この場合、フィルタ式の評価結果はNULLになります。

  • 不一致の数値の処理:カスタムメッセージ属性の値が浮動小数点数だが、フィルタ式で使用される属性値が整数の場合、ブローカはデフォルトで受信メッセージをフィルタリングし、コンシューマにメッセージを配信しません。

  • メッセージにタグと属性を設定し、メッセージを送信する

    Message message = messageBuilder.setTopic("topic")
    // Specify the message index key so that the system can use a keyword to accurately locate the message.
    .setKeys("messageKey")
    // Specify the message tag so that consumers can use the tag to filter the message.
    // This example indicates that the message tag is set to "messageTag".
    .setTag("messageTag")
    // You can also set custom attributes for the messages, such as environment, region, and logical branch.
    // In this example, the custom attribute is region and the attribute value is Hangzhou.
    .addProperty("Region", "Hangzhou")
    // Message body.
    .setBody("messageBody".getBytes())
    .build();
  • カスタム属性に基づいてメッセージを購読およびフィルタリングする

    String topic = "topic";
    // Subscribe only to messages whose value of the region attribute is Hangzhou.
    FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND Region='Hangzhou'", FilterExpressionType.SQL92);
    simpleConsumer.subscribe(topic, filterExpression);
  • 複数のカスタム属性に基づいてメッセージを購読およびフィルタリングする

    String topic = "topic";
    // Subscribe to messages whose value of the region attribute is Hangzhou and value of the price attribute is greater than 30.
    FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND price IS NOT NULL AND Region = 'Hangzhou' AND price > 30", FilterExpressionType.SQL92);
    simpleConsumer.subscribe(topic, filterExpression);
  • トピック内のすべてのメッセージを購読する

    String topic = "topic";
    // Subscribe to all the messages.
    FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92);
    simpleConsumer.subscribe(topic, filterExpression);

使用方法

メッセージに対してトピックとタグを適切に設定します。

トピック、タグ、属性を使用してメッセージを分割できます。メッセージを分割する際には、次の点に注意してください。

  • メッセージの種類:順序付きメッセージや通常のメッセージなど、異なる種類のメッセージは、異なるトピックを使用して分割する必要があります。タグを使用してメッセージの種類を分割しないでください。

  • ビジネスドメイン:異なるビジネスドメインや部門は、異なるトピックを使用する必要があります。たとえば、ロジスティクスメッセージと支払いメッセージでは、トピックを異にする必要があります。ロジスティクスメッセージは、タグを使用して通常のメッセージと緊急メッセージにさらに分割できます。

  • メッセージの量と重要度:量または関連性の重要度が異なるメッセージは、異なるトピックに分割する必要があります。