爷们儿,rocketMq中文文档,might

为什么是RocketMQ

动机

在前期阶段,咱们在ActiveMQ 5.x(早于5.3)的基础上构建咱们的分布式音讯中间件。咱们的跨国事务运用它来完成异步通讯、检索、交际网络活动流、数据管道,甚至在买卖过程中也在运用。跟着咱们的买卖事务量增加,来自音讯集群的压力日积月累,亟需处理。

为什么是RocketMQ

依据咱们的研讨,跟着运用中的行列越来越长、虚拟主题越来越多,ActiveMQ的IO模型会抵达一个瓶颈。咱们极力地企图经过节省、断路器或许降级这些手法来处理这个问题,可是都没有很好的作用。因而,咱们开端注意到其时十分盛行的音讯处理方案,Kafka。不走运的是,Kafka并不能满意咱们的需求,尤其是在低推迟和高可用这两点上,点击链接进一步了解细节。

在这样的情况下,咱们决议写一个全新的音讯引擎来处理这一类用处更广泛的运用事例,包含规模从传统的发布/订阅情形到高流量实时零过失买卖体系。咱们信任这个处理方案能带来优点,所以咱们十分乐意把这个项目向社区开源。今时今天,已经有超越100家企业在他们的事务里选用开源版别的RocketMQ。咱们也在阿里云渠道发布了一个依据RocketMQ的商业版PaaS产品。

下面这张表格对比了RocketMQ、ActiveMQ和Kafaka(Apache里最受欢迎的依据java的音讯处理方案)

RocketMQ vs. ActiveMQ vs. Kafka

(译者注:markdown要完好完成这个表格比爷们儿,rocketMq中文文档,might较杂乱,暂时放置,拖延处理)

快速开端

快速开端指引由具体的指令组成,告诉你如安在本机装备RocketMQ音讯投递体系并且收发音讯。

前期必要的预备

以下软件有必要装置:

  1. 64位操作体系,linux/Unit/Mac(引荐);
  2. 64位JDK,1.8+;
  3. Maven 3.2.x
  4. Git

克隆代码库房和构建程序

> git clone -b develop https://github.com/apache/rocketmq.git

> cd rocketmq

> mvn -Prelease-all -DskipTests clean install -U

> cd distribution/target/apache-rocketmq

(译者注:指令调集无需翻译。下願い同。)

发动Name Server

> nohup sh bin/mqnamesrv &

> tail -f ~/logs/rocketmqlogs/namesrv.log

The Name Server boot success...

发动Broker

> nohup sh bin/mqbroker -n localhost:9876 &

> tail -f ~/logs/rocketmqlogs/broker.log

The broker[%s, 172.30.30.233:10911] boot success...

发送和接纳音讯

在发送或接纳音讯之前,咱们需求告诉客户端name servers的方位。RocketMQ供给多种完成办法。为简略起见,咱们现在展现环境变量NAMESRV_ADDR的用法

> export NAMESRV_ADDR=localhost:9876

> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

SendRes爷们儿,rocketMq中文文档,mightult [sendStatus=SEND_OK, msgId= ...

> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

ConsumeMessageThread_%d Receive New Messages: [MessageExt...

封闭一切服务器

> sh bin/mqshutdown broker

The mqbroker(36695) is running...

Send shutdown request to mqbroker(36695) OK

> sh bin/mqshutdown namesrv

The mqnamesrv(36664) is running...

Send shutdown request to mqnamesrv(36664) OK

简略示例

简略的音讯示例

运用RocketMQ发送音讯的3种办法:牢靠同步发送、牢靠异步发送和单向发送

This page exemplifies the夜空中最亮的星原唱se three messa份额ge-sending ways. Checkout the notes along with the example to figure out which way to use for your specific use case.

本页文档展现这3种音讯发送办法。检出这些带有注释的示例代码,能够让你知道每一个用例对应哪一种发送办法。

牢靠同步发送

运用:牢靠同步发送在很多场景中被运用,例如重要的告诉音讯、短信告诉、短信营销体系,等等。

public class SyncProducer {

public static void main(String[] args) throws Exception { //Instantiate with a producer group name.

DefaultMQProducer producer = new

DefaultMQProducer("please_rename_unique_group_name"); //Launch the instance.

p下雪的诗句roducer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body.

Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,

("Hello RocketMQ " +

i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */

); //Call send message to deliver message to one of brokers.

SendResult sendResult = producer.send(msg);

System.out.printf("%s%n", sendResult);

} //Shut down once the producer instance is not longer in use.

producer.shutdown();

}

}

牢靠异步发送

运用:异步发送通常被用于对呼应时刻灵敏的事务场景

public class AsyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name.

DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup"); //Launch the instance.

producer.start();

producer.setRetryTimesWhenSendAsyncFailed(0); for (int i = 0; i < 100; i++) { final int index = i; //Create a message instance, specifying topic, tag and message body.

Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

producer.send(msg, new SendCallback() { @Override

public void onSuccess(SendResult sendResult) {

System.out.printf("%-10d OK %s %n", index,

sendResult.getMsgId());

} @Override

public void onException(Throwable e) {

Sys爷们儿,rocketMq中文文档,mighttem.out.printf("%-10d Exception %s %n", index, e);

e.printStackTrace();

}

});

} //Shut down once the producer instance is not longer in use.

producer.shutdown();

}

}

单向发送

运用:单向发送用于要求必定牢靠性的场景,例如日志搜集。

public class OnewayProducer { public static void main(String[] args) throws Exception{ //Instantiate with a producer group name.

DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup"); //Launch the instance.

producer.star21克拉t(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body.

Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,

("Hello RocketMQ " +

i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */

); //Call send message to deliver message to one of brokers.

producer.sendOneway(msg);

} //Shut down once the producer instance is not longer in use.

producer.shutdown();

}

}

次序音讯示例

次序音讯

RocketMQ供给运用先进先出算法的次序音讯完成。

以下示例展现了怎么发送/接纳大局次序音讯和分区次序音讯。

发送音讯示例代码

public class OrderedProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name.

MQProducer producer = new DefaultMQProducer("example_group_name"); //Launch the instance.

producer.start();

String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 100; i++) { int orderId = i % 10; //Create a message instance, specifying topic, tag and message body.

Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,

("Hello 爷们儿,rocketMq中文文档,mightRocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override

public MessageQueue select(List mqs, Message msg, Object arg) {

Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index);

}

}, orderId);

System.out.printf("%s%n", sendResult);

} //server shutdown

producer.shutdown();

}

}

订阅音讯示例代码

public class OrderedConsumer { public static void main(String[] args) throws Exception {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "TagA || TagC || TagD");

consumer.registerMessageListener(她new MessageLi爷们儿,rocketMq中文文档,mightstenerOrderly() {

AtomicLong c社旗天气预报onsumeTimes = new AtomicLong(0); @Override

public ConsumeOrderlyStatus consumeMessage(List msgs,

ConsumeOrderlyContext context) {

context.setAutoCommit(false);

System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); this.consumeTimes.incrementAndGet(); if ((this.consumeTimes.get() % 2) == 0) { return ConsumeOrderlyStatus.SUCCESS;

} else if ((this.con黑夜影视sumeTimes.get() % 3) == 0) { return ConsumeOrderlyStatus.ROLLBACK;

} else if ((this.consumeTimes.get() % 4) == 0) { return ConsumeOrderlyStatus.COMMIT;

} else if ((this.consumeTimes.get() % 5) == 0) {

context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;

} return ConsumeOrderlyStatus.SUCCESS;

}

});

consumer.start();

System.out.printf("Consumer Started.%n");

}

}

播送示例

什么是播送

播送便是向一个主题的一切订阅者发送同一条音讯。假如你想让一个主题的一切订阅者收到音讯,播送是一个很好的挑选。

生产者示例

public class BroadcastProducer {

public static void main(String[] args) throws Exception {

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

producer.start(); for (int i = 0; i < 100; i++){

Message msg = new Message白狐("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

SendResult sendResult = producer.send(msg);

System.out.printf("%s%n", sendResult);

}

producer.shutdown();

}

}

顾客示例

public class BroadcastConsumer { public static void main(String[] args) throws Exception {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //set to broadcast mode

consumer.setMessageModel(MessageModel.BROADCASTING);

consumer.subscribe("TopicTest", "TagA || TagC || TagD");

consumer.registerMessageListener(new MessageListenerConcurrently() { @Override

public ConsumeConcurrentlyStatus consumeMessage(List msgs,

ConsumeConcurrentlyContext context) {

System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

consumer.start();

System.out.printf("Broadcast Consumer Started.%n");

}

}

延时音讯示例

什么是延时音讯?

延时音讯供给了一种不同于一般音讯的完成方法——它们只会在设定的时限到了之后才被投递出去。

运用

1. 发动顾客,等候行将接纳的订阅音讯

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.Mes奇热网sageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class ScheduledMessageConsumer { public static void main(String[] args) throws Exception { // Instantiate message consumer

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer"); // Subscribe topics

consumer.subscribe("TestTopic", "*"); // Register message listener

consumer.registerMessageListener(new MessageListenerConcurrently() { @Override

public ConsumeConcurrentlyStatus consumeMessage(List messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // Print approximate delay time period

System.out.println("Receive message[msgId=" + message.getMsgId() + "] "

+ (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");

} return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

}); // Launch consumer

consumer.start();

}

}

2. 发送延时音讯

import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rock微软市值etmq.common.message.Message; public class ScheduledMessageProducer { public static void main(String[] args) throws Exception { // Instantiate a producer to send scheduled messages

DefaultMQProducer producer = new Def全本小说aultMQProducer("ExampleProducerGroup"); // Launch producer

producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) {

Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); // This message will be delivered to consumer 10 seconds later.

message.setDelayTimeLevel(3); // Send the message

producer.send(message);

} // Shutdown producer after use.

producer.shutdown();

}

}

3. 验证

你应该会在音讯被存储之后10秒钟看到它们被消费。

批量音讯示例

为什么要选用批量音讯?

批量发送音讯能够提高投递小内存音讯时的功能。

运用约束

同一批音讯有必要满意以下条件:相同的主题、相同的waitStoreMsgOK变量设置,并且都不支撑延时发送

别的,一个批量音讯的巨细最好不要大于1MiB。

怎么运用批量音讯

假如你一次发送的音讯总巨细不超越1MB,运用批量音讯就很简略

String topic = "BatchTest";List messages = new ArrayList<>();

messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));

messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));

messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));try {

producer.send(messages);

}假如爱 catch (Exception e) {

e.printStackTrace(); //handle the error}

切分后用List保存

只要在你发送大内存批量音讯并且不确定是否到达巨细约束(1MiB)的时分,才会变得杂乱。

这时分,你应该把它们切分,然后用List保存

public class ListSplitter implements Iterator> { private final int SIZE_LIMIT = 1000 * 1000; private final List messages; private int currIndex; public ListSplitter(List messages) { this.messages = messages;

} @Override public boolean hasNext() { return currIndex < messages.size();

} @Override public List next() { int nextIndex = currIndex; int totaGujeelSize = 0; for (; nextIndex < messages.size(); nextIndex++) {

Message message = messages.get(nextIndex); int tmpSize = message.getTopic().length() + message.getBody().length;

Map properties = message.getProperties(); for (Map.Entry entry : properties.entrySet()) {

tmpSize += entry.getKey().length() + entry.getValue().length();

}

tmpSize = tmpSize + 20; //for log overhead

if (tmpSize > SIZE_LIMIT) { //it is unexpected that single message exceeds the SIZE_LIMIT

//here just let it go, otherwise it will block the splitting process

if (nextIndex - currIndex == 0) { //if the next sublist has no element, add this one and then break, otherwise just break

nextIndex++;

} break;

} if (tmpSize + totalSize > SIZE_LIMIT) { break;

} else {

totalSize += tmpSize;

}

}

List subList = messages.subList(currIndex, nextIndex);

currIndex = nextIndex; return subList;

}

}//then you could split the large list into small ones:ListSplitter splitter = new ListSplitter(messages);while (splitter.hasNext()) { try {

List listItem = splitter.next();

producer.send(listItem);

} catch (Exception e) {

e.printStackTrace(); //handle the error

}

}

音讯过滤器示例

在大多数情况下,标签是一种协助你挑选所需音讯的简略有用的规划。例如:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");

consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

该顾客会接纳含有TAGA、TAGB或TAGC标签的音讯。可是,由于有一条音讯只能打一个标签的约束,在杂乱的场景下可能会失效。在这种情况下,你能够用SQL表达式来过滤音讯。

规矩

依据你在发送音讯时设定的特点,SQL特功能够做相应的运算。遵从RocketMQ预设的语法,你能够完成一些风趣的逻辑。这里有一个比如:

将文字转为图片

语法

RocketMQ只预设了一些根本的语法来支撑这个特性。你也能够很容易地扩展它。

  1. Numeric comparison, like >, >=, <, <=, BETWEEN, =;
  2. Character comparison, like =, <>, IN;
  3. IS NULL or IS NOT NULL;
  4. Logical AND, OR, NOT;

以下是根本类型:

  1. Numeric, like 123, 3.1415;
  2. Character, like ‘abc’, must be made with single quotes;
  3. NULL, special constant;
  4. Boolean, TRUE or FALSE;

运用约束

只要push类型的顾客能够经过SQL92标准的句子来挑选音讯。接口如下:

public void subscribe(final String topic, final MessageSelector messageSelector)

生产者示例

在发送音讯时,你能够运用putUserProperty()办法为音讯设置特点。

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

producer.start();

Message msg = new Message("TopicTest",

tag,

("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)

);// Set some properties.msg.putUserProperty("a", String.valueOf(i));

SendResult sendResult = producer.send(msg);

producer.shutdown();

顾客示例

在消费音讯时,运用MessageSelector.bySql()办法和遵从SQL92标准的句子来挑选音讯

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");// only subsribe messages have property a, also a >=0 and a <= 3consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");

consumer.registerMessageListener(new MessageListenerConcurrently() { @Override

public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

consumer.start();

日志插件办理示例

RocketMQ日志插件办理供给了log4j插件、log4j2插件和爷们儿,rocketMq中文文档,mightlogback插件,都能够在事务中运用,以下是怎么设置的示例,

log4j

log4j装备文件如下.

log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender

log4j.appender.mq.Tag=yourTag

log4j.appender.mq.Topic=yourLogTopic

log4j.appender.mq.ProducerGroup=yourLogGroup

log4j.appender.mq.NameServerAddress=yourRocketmqNameserverAddress

log4j.appender.mq.layout=org.apache.log4j.PatternLayout

log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p

运用xml文件装备log4j时,如下装备并增加一个异步插件:

log4j2

log4j2装备如下。假如你期望躲藏,就装备一个异步插件作为引证。【译者注:有问题】

topic="yourLogTopic" tag="yourTag">

logback

运用logback,异步插件也是必需的。

yourTag

yourLogTopic

yourLogGroup

yourRocketmqNameserverAddress

%date %p %t - %m%n

1024

80

2000

true

OpenMessaging兼容示例

OpenMessaging力求树立职业准则和音讯分发、流核算范畴标准,为金融、电子商务、物联网和大数据范畴供给通用结构。规划准则包含,面向云、简略易用、灵敏度高、独立于编程言语,以及能在分布式异构环境中运用。这些标准的一致性将会使得开发一款跨一切干流渠道和操作体系的异构音讯分发运用成为可能。

RocketMQ供给了关于OpenMessaging 0.1.0-alpha版别的部分完成,以下比如展现了怎么依照OpenMessaging标准接入RocketMQ.

OMSProducer

下面这个比如展现了怎么向RocketMQ broker用同步、异步、单向发送的办法发送音讯。

public class OMSProducer { public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory

.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); final Producer producer = messagingAccessPoint.createProducer();

messagingAccessPoint.startup();

System.out.printf("MessagingAccessPoint startup OK%n");

producer.startup();

System.out.printf("Producer startup OK%n");

{

Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));

SendResult sendResult = producer.send(message);

System.out.printf("Send sync me博鳌ssage OK, msgId: %s%n", sendResult.messageId());

}

{ final Promise result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));

result.addListener(new PromiseListener() { @Override

public void operationCompleted(Promise promise) {

System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());

} @Override

public void operationFailed(Promise pr北京印刷学院omise) {

System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());

}

});

}

{

producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELpeachLO_BODY".getBytes(Charset.forName("UTF-8"))));

System.out.printf("Send oneway message OK%n");

}

producer.shutdown();

messagingAccessPoint.shutdown();

}

}

OMSPullConsumer

运用OMS PullConsumer从特定行列中拉取音讯。

public class OMSPullConsumer {

public static void main(String[] args) {

final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory

.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",

OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

messagingAccessPoint.startup();

System.out.printf("MessagingAccessPoint startup OK%n");

consumer.startup();

System.out.printf("Consumer startup OK%n");

Message message = consumer.poll(); if (message != null) {

String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);

System.out.printf("Received one message: %s%n", msgId);

consumer.ack(msgId);

}

consumer.shutdown();

messagingAccessPoint.shutdown(爷们儿,rocketMq中文文档,might);

}腿打开

}

OMSPushConsumer

把OMS PushConsumer和某一个特定的行列绑定在一起,经过MessageListener消费音讯。

public class OMSPushConsumer { public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory

.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); final PushConsumer consumer = messagingAccessPoint.

createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

messagingAccessPoint.startup();

System.out.printf("MessagingAccessPoint startup OK%n");

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override

public void run() {

consumer.shutdown();

messagingAccessPoint.shutdown();

}

}));

consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() { @Override

public void onMessage(final Message message, final Re修人世恶道ceivedMessageContext context) {

System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));

context.ack();

}

});

}

}