一:普通消息
RocketMQ提供了三种方式来发送普通消息:可靠同步发送、可靠异步发送、单向发送。
1.1 可靠同步发送
同步发送是指发送消息后必须等待RocketMQ服务返回发送的结果,这里会一直同步阻塞,直到拿到RocketMQ服务返回发送的结果才继续往下执行代码。同步发送一般应用于对发送成功可靠性要求很严格的场景。
// 同步发送会有发送结果返回值
// rocketMQTemplate 使用冒号将topic和tag分割
SendResult sendResult = rocketMQTemplate.syncSend("test-topic:test-tag", "test msg");
1.2 可靠异步发送
异步发送是指发送消息后不需要等待RocketMQ服务器返回的发送结果,而是直接执行后面的逻辑。发送方通过设置回调接口来接收RocketMQ服务器异步返回的发送结果,并根据具体的发送结果进行相应处理。
rocketMQTemplate.asyncSend("test-topic:test-tag", "test message", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 成功回调
System.out.println("回调结果: " + sendResult);
}
@Override
public void onException(Throwable throwable) {
// 异常回调
System.out.println(throwable);
}
});
System.out.println("----------");
先打印 ------ 后打印回调函数。
1.3 单向发送
单向发送只负责发送消息,不等待RocketMQ服务器返回的发送结果,也不提供回调函数来接收RocketMQ服务器的响应结果,只负责发送至于发送成功还是发送失败我不关心。单向发送通常用于对可靠性要求不高的场景。
rocketMQTemplate.sendOneWay("test-topic:test-tag", "test msg");
注意:普通消息(以上三种)不保证发送消息的顺序和消费者消费的顺序一致性。
二:顺序消息
一个topic默认有4个消息队列(Message Queue),生产者在发送同一个topic的多个消息会被存储到不同的消息队列中,默认是按照轮询的模式来依次存储到每个队列中的,而消费消息首先从哪个队列中获取消息是不确定的,这导致发送的顺序和消费的消息很可能会不一致。
如果想要保证发送消息的顺序和消费消息的顺序要一致,只需要保证将消息都发送到同一个消息队列上即可,而不是轮询的放到每个队列上。顺序消息只需要在原来的发送方法后面增加Orderly后缀(syncSendOrderly、asyncSendOrderly、sendOneWayOrderly),并在最后传入一个值Hash Key,RocketMQ会根据这个值计算要发送到哪一个队列上。
String orderId = order.getId().toString();
for (int i = 0; i < 10; i++) {
// Hash Key 一般是一组顺序消息的对应的一个唯一值,如 张三创建订单、张三订单付款、张三订单完成,这里hash key就用张三对应的订单id或者订单号
rocketMQTemplate.sendOneWayOrderly("test-topic", order, orderId);
}
10个消息都发送到queueId=3的队列上了。
消费消息时需要指定消费者顺序(单线程)获取消息。
@Component
@RocketMQMessageListener(consumerGroup = "testConsumerGroup", topic = "test-topic", consumeMode = ConsumeMode.ORDERLY)
public class TestTopicConsumer1Listener implements RocketMQListener
@Override
public void onMessage(String message) {
System.out.println("TestTopicConsumerListener 消费消息:" + message);
}
}
顺序消息:保证同一组内的消息按顺序消费。
三:延迟消息
延迟消息是指当生产者发送消息到Broker中不会被消费者立即消费,而是延迟指定的时间消费,注意RocketMQ不支持自定义延迟任意时间,只提供了18个固定时间级别供选择,级别从1开始到18,1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m , 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h。注意由于网络延迟,所以延迟消息消费也会大于指定的延迟时间。延迟消息在发送时只需要指定delayLevel参数即可。
public SendResult syncSend(String destination, Message> message, long timeout, int delayLevel);
GenericMessage message = new GenericMessage(order.toString());
rocketMQTemplate.syncSend("test-topic", message, 3000, 3);
四:批量消息
使用Template发送批量消息时,发送的消息必须是org.springframework.messaging.Message的子类。批量发消息对消息的内容长度有限制,最大为4M,如果超过4M只能分为多批次发送。
// 方式一
List
list.add(new GenericMessage("1"));
list.add(new GenericMessage("2"));
rocketMQTemplate.syncSend("test-topic", list);
// 方式二:使用生产者批量发送,Message是org.apache.rocketmq.common.message.Message
rocketMQTemplate.getProducer.send(Collection
五:消费消息模式
消息模式有两种,默认是负载均衡模式:
负载均衡(MessageModel.CLUSTERING):多个消费者共同瓜分所有消息,一个消息只能被一个消费者消费掉。广播模式(MessageModel.BROADCASTING):每个消费者都会消费同样的消息。一个消息会被所有消费者消费掉。
消费模式使用messageModel参数来指定,注意:如果要测试多个消费者,不能写多个监听器,否则SpringBoot启动失败,只能写一个监听器,然后分别使用不同的端口分别启动Spring Boot。
@Component
@RocketMQMessageListener(consumerGroup = "testConsumerGroup", topic = "test-topic", messageModel = MessageModel.BROADCASTING)
public class TestTopicConsumer1Listener implements RocketMQListener
@Override
public void onMessage(String message) {
System.out.println("TestTopicConsumerListener 消费消息:" + message);
}
}
六:过滤消息
消费者过滤消息可以通过两种方式:
Tag,生产者发送消息时可以指定Tag,消费者消费消息可以指定Tag过滤,也可以使用通配符*表示所有Tag,也可以使用||或来表示多个Tag,使用Template发送带Tag的消息是使用冒号分隔跟在主题后面。SQL 基本语法
6.1 Tag过滤
selectorType默认是Tag,selectorExpression默认是*,可以通过或符号||来选择消费多个Tag。
@Component
@RocketMQMessageListener(
consumerGroup = "testConsumerGroup",
topic = "test-topic",
selectorType = SelectorType.TAG,
selectorExpression = "test-tag || dev-tag")
public class TestTopicConsumerListener implements RocketMQListener
@Override
public void onMessage(MessageExt messageExt) {
System.out.println(messageExt);
System.out.println("body:" + new String(messageExt.getBody()));
}
}
6.2 SQL92过滤
在发送消息的时候RocketMQ运行携带一个Map,可以通过Map中的每个值作为条件来过滤消息。SQL92过滤可以使用SQL中常用的Where条件来过滤要消费的消息,支持如下常用的SQL条件:
AND, OR
>, >=, <, <=, =
BETWEEN A AND B, equals to >=A AND <=B