クイックスタート
このセクションでは、シングルノードのRocketMQクラスタを迅速にデプロイする手順について説明します。メッセージの送受信を行うためのコマンドも、動作確認として含まれています。
- 64bit OS、Linux/Unix/macOSを推奨
- 64bit JDK 1.8+
1. Apache RocketMQを取得する
以下の説明では、RocketMQのインストールプロセスを紹介するために、Linux環境におけるRocketMQ 5.2.0ソースパッケージの適用を例として取り上げます。
RocketMQ 5.2.0のソースパッケージを解凍し、バイナリ実行ファイルをコンパイルしてビルドします。
$ unzip rocketmq-all-5.2.0-source-release.zip
$ cd rocketmq-all-5.2.0-source-release/
$ mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
$ cd distribution/target/rocketmq-5.2.0/rocketmq-5.2.0
2. NameServerを起動する
RocketMQのインストール後、NameServerを起動します。
### start namesrv
$ nohup sh bin/mqnamesrv &
### verify namesrv
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
namesrv.logに'The Name Server boot success..'というメッセージが表示されれば、NameServerが正常に起動したことを意味します。
3. BrokerとProxyを起動する
NameServerの起動後、BrokerとProxyを起動する必要があります。BrokerとProxyが同じプロセスにデプロイされるローカルデプロイモードを推奨します。クラスタデプロイモードもサポートしています。詳細については、デプロイメントの概要を参照してください。
### start broker
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
### verify broker
$ tail -f ~/logs/rocketmqlogs/proxy.log
The broker[broker-a,192.169.1.2:10911] boot success...
proxy.logに「The broker」[brokerName,ip:port]boot success..」というメッセージが表示されれば、Brokerが正常に起動したことを意味します。
これで、シングルマスターのRocketMQクラスタがデプロイされ、スクリプトを使用して簡単なメッセージを送受信できます。
4. ツールを使用してメッセージを送受信する
ツールでテストする前に、システム環境変数 `NAMESRV_ADDR` のように、ネームサーバーのアドレスをシステムに設定する必要があります。
$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
5. SDKを使用してメッセージを送受信する
クライアントSDKを使用してメッセージを送受信することもできます。詳細については、rocketmq-clientsを参照してください。
Javaプロジェクトを作成します。
pom.xml にSDKの依存関係を追加します。`rocketmq-client-java-version` を最新リリースに置き換えることを忘れないでください。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>${rocketmq-client-java-version}</version>
</dependency>mqadmin cliツールでトピックを作成します。
$ sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
作成したJavaプロジェクトで、メッセージを送信するプログラムを作成し、次のコードで実行します。
import java.io.IOException;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerExample {
private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);
public static void main(String[] args) throws ClientException, IOException {
String endpoint = "localhost:8081";
String topic = "TestTopic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
ClientConfiguration configuration = builder.build();
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
Message message = provider.newMessageBuilder()
.setTopic(topic)
.setKeys("messageKey")
.setTag("messageTag")
.setBody("messageBody".getBytes())
.build();
try {
SendReceipt sendReceipt = producer.send(message);
logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (ClientException e) {
logger.error("Failed to send message", e);
}
// producer.close();
}
}作成したJavaプロジェクトで、コンシューマーのデモプログラムを作成して実行します。Apache RocketMQは、SimpleConsumerとPushConsumerをサポートしています。
import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PushConsumerExample {
private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
String endpoints = "localhost:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
String consumerGroup = "YourConsumerGroup";
String topic = "TestTopic";
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setConsumerGroup(consumerGroup)
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setMessageListener(messageView -> {
logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// pushConsumer.close();
}
}
6. サーバーをシャットダウンする
練習を終えたら、次のコマンドでサービスをシャットダウンできます。
$ sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
$ sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK