クイックスタート
このセクションでは、シングルノードのRocketMQクラスタを迅速にデプロイする手順について説明します。メッセージの送受信を行うためのコマンドも、動作確認として含まれています。
- 64bit OS、Linux/Unix/macOSを推奨
- 64bit JDK 1.8+
1. Apache RocketMQを取得する
以下の説明では、RocketMQのインストールプロセスを紹介するために、Linux環境におけるRocketMQ 5.2.0ソースパッケージの適用を例として取り上げます。
RocketMQ 5.2.0のソースパッケージを解凍し、バイナリ実行ファイルをコンパイルしてビルドします。
$ unzip rocketmq-all-5.2.0-source-release.zip
$ cd rocketmq-all-5.2.0-source-release/
$ mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
$ cd distribution/target/rocketmq-5.2.0/rocketmq-5.2.0
2. NameServerを起動する
RocketMQのインストール後、NameServerを起動します。
### start namesrv
$ nohup sh bin/mqnamesrv &
 
### verify namesrv 
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
namesrv.logに'The Name Server boot success..'というメッセージが表示されれば、NameServerが正常に起動したことを意味します。
3. BrokerとProxyを起動する
NameServerの起動後、BrokerとProxyを起動する必要があります。BrokerとProxyが同じプロセスにデプロイされるローカルデプロイモードを推奨します。クラスタデプロイモードもサポートしています。詳細については、デプロイメントの概要を参照してください。
### start broker
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
### verify broker
$ tail -f ~/logs/rocketmqlogs/proxy.log 
The broker[broker-a,192.169.1.2:10911] boot success...
proxy.logに「The broker」[brokerName,ip:port]boot success..」というメッセージが表示されれば、Brokerが正常に起動したことを意味します。
これで、シングルマスターのRocketMQクラスタがデプロイされ、スクリプトを使用して簡単なメッセージを送受信できます。
4. ツールを使用してメッセージを送受信する
ツールでテストする前に、システム環境変数 `NAMESRV_ADDR` のように、ネームサーバーのアドレスをシステムに設定する必要があります。
$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 SendResult [sendStatus=SEND_OK, msgId= ...
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...
5. SDKを使用してメッセージを送受信する
クライアントSDKを使用してメッセージを送受信することもできます。詳細については、rocketmq-clientsを参照してください。
- Javaプロジェクトを作成します。 
- pom.xml にSDKの依存関係を追加します。`rocketmq-client-java-version` を最新リリースに置き換えることを忘れないでください。 - <dependency>
 <groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-client-java</artifactId>
 <version>${rocketmq-client-java-version}</version>
 </dependency>
- mqadmin cliツールでトピックを作成します。 - $ sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
- 作成したJavaプロジェクトで、メッセージを送信するプログラムを作成し、次のコードで実行します。 - import java.io.IOException;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.ClientServiceProvider;
 import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.producer.Producer;
 import org.apache.rocketmq.client.apis.producer.SendReceipt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 public class ProducerExample {
 private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);
 public static void main(String[] args) throws ClientException, IOException {
 String endpoint = "localhost:8081";
 String topic = "TestTopic";
 ClientServiceProvider provider = ClientServiceProvider.loadService();
 ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
 ClientConfiguration configuration = builder.build();
 Producer producer = provider.newProducerBuilder()
 .setTopics(topic)
 .setClientConfiguration(configuration)
 .build();
 Message message = provider.newMessageBuilder()
 .setTopic(topic)
 .setKeys("messageKey")
 .setTag("messageTag")
 .setBody("messageBody".getBytes())
 .build();
 try {
 SendReceipt sendReceipt = producer.send(message);
 logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
 } catch (ClientException e) {
 logger.error("Failed to send message", e);
 }
 // producer.close();
 }
 }
- 作成したJavaプロジェクトで、コンシューマーのデモプログラムを作成して実行します。Apache RocketMQは、SimpleConsumerとPushConsumerをサポートしています。 - import java.io.IOException;
 import java.util.Collections;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.ClientServiceProvider;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
 import org.apache.rocketmq.client.apis.consumer.PushConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 public class PushConsumerExample {
 private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
 private PushConsumerExample() {
 }
 public static void main(String[] args) throws ClientException, IOException, InterruptedException {
 final ClientServiceProvider provider = ClientServiceProvider.loadService();
 String endpoints = "localhost:8081";
 ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
 .setEndpoints(endpoints)
 .build();
 String tag = "*";
 FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
 String consumerGroup = "YourConsumerGroup";
 String topic = "TestTopic";
 PushConsumer pushConsumer = provider.newPushConsumerBuilder()
 .setClientConfiguration(clientConfiguration)
 .setConsumerGroup(consumerGroup)
 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
 .setMessageListener(messageView -> {
 logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
 return ConsumeResult.SUCCESS;
 })
 .build();
 Thread.sleep(Long.MAX_VALUE);
 // pushConsumer.close();
 }
 }
6. サーバーをシャットダウンする
練習を終えたら、次のコマンドでサービスをシャットダウンできます。
$ sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
$ sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK