Skip to Content
接入指南收发普通消息

收发普通消息

发送普通消息

普通消息支持同步发送与异步发送两种发送方式,用户可根据业务场景选择。

同步发送

同步发送是指客户端发送一条消息到服务端,等待服务端响应之后再发送下一条消息。

import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.remoting.RPCHook; public class Producer { // 实例接入使用公私钥,可在实例令牌管理页面获取 private static final String ACCESS_KEY = "xxx"; private static final String SECRET_KEY = "xxx"; static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY)); } public static void main(String[] args) throws MQClientException, InterruptedException { // "ProducerGroupName"为生产组,用户可使用控制台创建的生产Group或者自定义 DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook()); // 实例接入地址,可在实例列表页获取 producer.setNamesrvAddr("1.1.1.1:9876"); producer.start(); for (int i = 0; i < 128; i++) try { { Message msg = new Message("Topic_Name", "Message_Tag", "Message_Key", "Message Content Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }

异步发送

异步发送是指客户端发送一条消息之后,不等待服务端返回就发送下一条消息。

import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.remoting.RPCHook; public class Producer { // 实例接入使用公私钥,可在实例令牌管理页面获取 private static final String ACCESS_KEY = "xxx"; private static final String SECRET_KEY = "xxx"; static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY)); } public static void main(String[] args) throws MQClientException, InterruptedException { // "ProducerGroupName"为生产组,用户可使用控制台创建的生产Group或者自定义 DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook()); // 实例接入地址,可在实例列表页获取 producer.setNamesrvAddr("1.1.1.1:9876"); producer.start(); for (int i = 0; i < 128; i++) try { { Message msg = new Message("Topic_Name", "Message_Tag", "Message_Key", "Message Content Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult result) { // 发送成功 System.out.println("send message success"); } @Override public void onException(Throwable throwable) { // 发送失败,可重新发送或者存储该条消息进行补偿 System.out.println("send message failed."); throwable.printStackTrace(); } }); } } catch (Exception e) { e.printStackTrace(); } try { // 延迟2秒停止producer,测试需要,实际可去掉该逻辑 Thread.sleep(2000); producer.shutdown(); } catch (Exception e) { e.printStackTrace(); } } }

订阅普通消息

import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.remoting.RPCHook; public class PushConsumer { // 实例接入使用公私钥,可在实例令牌管理页面获取 private static final String ACCESS_KEY = "xxx"; private static final String SECRET_KEY = "xxx"; static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY)); } public static void main(String[] args) throws InterruptedException, MQClientException { // "GROUP_NAME"为消费组,可在实例Group管理页面获取 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_NAME", getAclRPCHook(), new AllocateMessageQueueAveragely()); // 实例接入地址,可在实例列表页获取 consumer.setNamesrvAddr("1.1.1.1:8100"); consumer.subscribe("Topic_Name", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }