メインコンテンツへスキップ
バージョン: 5.0

クイックスタート

このセクションでは、シングルノードのRocketMQクラスタを迅速にデプロイする手順について説明します。メッセージの送受信を行うためのコマンドも、動作確認として含まれています。

システム要件
  1. 64bit OS、Linux/Unix/macOSを推奨
  2. 64bit JDK 1.8+

1. Apache RocketMQを取得する

RocketMQをダウンロード

Apache RocketMQは、バイナリパッケージとソースパッケージの両方で配布されています。Apache RocketMQ 5.2.0のソースパッケージをダウンロードするには、こちらをクリックしてください。コンパイル済みのバイナリパッケージは、そのまま実行できるため、こちらの方が便利かもしれません。

以下の説明では、RocketMQのインストールプロセスを紹介するために、Linux環境におけるRocketMQ 5.2.0ソースパッケージの適用を例として取り上げます。

RocketMQ 5.2.0のソースパッケージを解凍し、バイナリ実行ファイルをコンパイルしてビルドします。

$ unzip rocketmq-all-5.2.0-source-release.zip
$ cd rocketmq-all-5.2.0-source-release/
$ mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
$ cd distribution/target/rocketmq-5.2.0/rocketmq-5.2.0

2. NameServerを起動する

RocketMQのインストール後、NameServerを起動します。

### start namesrv
$ nohup sh bin/mqnamesrv &

### verify namesrv
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
情報

namesrv.logに'The Name Server boot success..'というメッセージが表示されれば、NameServerが正常に起動したことを意味します。

3. BrokerとProxyを起動する

NameServerの起動後、BrokerとProxyを起動する必要があります。BrokerとProxyが同じプロセスにデプロイされるローカルデプロイモードを推奨します。クラスタデプロイモードもサポートしています。詳細については、デプロイメントの概要を参照してください。

### start broker
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

### verify broker
$ tail -f ~/logs/rocketmqlogs/proxy.log
The broker[broker-a,192.169.1.2:10911] boot success...
情報

proxy.logに「The broker」[brokerName,ip:port]boot success..」というメッセージが表示されれば、Brokerが正常に起動したことを意味します。

注意

これで、シングルマスターのRocketMQクラスタがデプロイされ、スクリプトを使用して簡単なメッセージを送受信できます。

4. ツールを使用してメッセージを送受信する

ツールでテストする前に、システム環境変数 `NAMESRV_ADDR` のように、ネームサーバーのアドレスをシステムに設定する必要があります。

$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

5. SDKを使用してメッセージを送受信する

クライアントSDKを使用してメッセージを送受信することもできます。詳細については、rocketmq-clientsを参照してください。

  1. Javaプロジェクトを作成します。

  2. pom.xml にSDKの依存関係を追加します。`rocketmq-client-java-version` を最新リリースに置き換えることを忘れないでください。

    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>${rocketmq-client-java-version}</version>
    </dependency>
  3. mqadmin cliツールでトピックを作成します。

    $ sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
  4. 作成したJavaプロジェクトで、メッセージを送信するプログラムを作成し、次のコードで実行します。

    import java.io.IOException;
    import org.apache.rocketmq.client.apis.ClientConfiguration;
    import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
    import org.apache.rocketmq.client.apis.ClientException;
    import org.apache.rocketmq.client.apis.ClientServiceProvider;
    import org.apache.rocketmq.client.apis.message.Message;
    import org.apache.rocketmq.client.apis.producer.Producer;
    import org.apache.rocketmq.client.apis.producer.SendReceipt;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ProducerExample {
    private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);

    public static void main(String[] args) throws ClientException, IOException {
    String endpoint = "localhost:8081";
    String topic = "TestTopic";
    ClientServiceProvider provider = ClientServiceProvider.loadService();
    ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
    ClientConfiguration configuration = builder.build();
    Producer producer = provider.newProducerBuilder()
    .setTopics(topic)
    .setClientConfiguration(configuration)
    .build();
    Message message = provider.newMessageBuilder()
    .setTopic(topic)
    .setKeys("messageKey")
    .setTag("messageTag")
    .setBody("messageBody".getBytes())
    .build();
    try {
    SendReceipt sendReceipt = producer.send(message);
    logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
    } catch (ClientException e) {
    logger.error("Failed to send message", e);
    }
    // producer.close();
    }
    }
  5. 作成したJavaプロジェクトで、コンシューマーのデモプログラムを作成して実行します。Apache RocketMQは、SimpleConsumerPushConsumerをサポートしています。

    import java.io.IOException;
    import java.util.Collections;
    import org.apache.rocketmq.client.apis.ClientConfiguration;
    import org.apache.rocketmq.client.apis.ClientException;
    import org.apache.rocketmq.client.apis.ClientServiceProvider;
    import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
    import org.apache.rocketmq.client.apis.consumer.FilterExpression;
    import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
    import org.apache.rocketmq.client.apis.consumer.PushConsumer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class PushConsumerExample {
    private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
    final ClientServiceProvider provider = ClientServiceProvider.loadService();
    String endpoints = "localhost:8081";
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
    .setEndpoints(endpoints)
    .build();
    String tag = "*";
    FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
    String consumerGroup = "YourConsumerGroup";
    String topic = "TestTopic";
    PushConsumer pushConsumer = provider.newPushConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    .setConsumerGroup(consumerGroup)
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    .setMessageListener(messageView -> {
    logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
    return ConsumeResult.SUCCESS;
    })
    .build();
    Thread.sleep(Long.MAX_VALUE);
    // pushConsumer.close();
    }
    }

6. サーバーをシャットダウンする

練習を終えたら、次のコマンドでサービスをシャットダウンできます。

$ sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

$ sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK