最新下载
热门教程
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
Springboot整合RocketMQ收发消息代码示例
时间:2022-06-29 01:59:33 编辑:袖梨 来源:一聚教程网
本篇文章小编给大家分享一下Springboot整合RocketMQ收发消息代码示例,文章代码介绍的很详细,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看。
创建springboot项目
pom.xml添加rocketmq-spring-boot-starter依赖。
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>
yml 配置
application.yml
rocketmq:
  name-server: 192.168.64.141:9876
application-demo1.yml
使用 demo1 profile 指定生产者组组名
rocketmq:
  producer:
    group: producer-demo1
application-demo2.yml
使用 demo2 profile 指定生产者组组名
rocketmq:
  producer:
    group: producer-demo2
测试
demo 1
发送普通消息
发送 Spring 的通用 Message 对象
发送异步消息
发送顺序消息
生产者
package cn.tedu.demo2.m1; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @Component public class Producer { @Autowired private RocketMQTemplate t ; public void send(){ //发送同步消息 t.convertAndSend("Topic1:TagA", "Hello world! "); //发送spring的Message Messagemessage = MessageBuilder.withPayload("Hello Spring message! ").build(); t.send("Topic1:TagA",message); //发送异步消息 t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("发送成功"); } @Override public void onException(Throwable throwable) { System.out.println("发送失败"); } }); //发送顺序消息 t.syncSendOrderly("Topic1", "98456237,创建", "98456237"); t.syncSendOrderly("Topic1", "98456237,支付", "98456237"); t.syncSendOrderly("Topic1", "98456237,完成", "98456237"); } }
消费者
package cn.tedu.demo2.m1;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * Demo consumer subscribed to {@code Topic1} in consumer group {@code consumer-demo1};
 * prints every received payload.
 *
 * <p>NOTE(review): the producer in this demo also sends ordered messages. No
 * {@code consumeMode} is set here, so the annotation default applies — confirm whether
 * {@code ConsumeMode.ORDERLY} is wanted when ordering matters.
 */
@Component
@RocketMQMessageListener(topic = "Topic1", consumerGroup = "consumer-demo1")
public class Consumer implements RocketMQListener<String> {
    /**
     * Prints the received message body.
     *
     * @param s the message payload as a string
     */
    @Override
    public void onMessage(String s) {
        System.out.println("收到" + s);
    }
}
主类
package cn.tedu.demo2.m1;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the first demo (package {@code cn.tedu.demo2.m1}).
 */
@SpringBootApplication
public class Main {
    /**
     * Starts the Spring application with this class as the primary source.
     *
     * @param cliArgs command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] cliArgs) {
        SpringApplication.run(Main.class, cliArgs);
    }
}
测试类
需要放在 test 文件夹
激活 demo1 profile @ActiveProfiles("demo1")
package cn.tedu.demo2.m1;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

/**
 * Integration test for the first demo; runs with the {@code demo1} profile active.
 */
@SpringBootTest
@ActiveProfiles("demo1")
public class Test1 {
    @Autowired
    private Producer producer;

    /**
     * Triggers {@link Producer#send()} and then keeps the test alive for five seconds.
     */
    @Test
    public void test1() {
        producer.send();
        // Presumably gives the asynchronous callback and the consumer time to print
        // their output before the test context shuts down.
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
demo 2
发送事务消息
生产者
package cn.tedu.demo2.m2; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @Component public class Producer { @Autowired private RocketMQTemplate t; public void send(){ Messagemessage = MessageBuilder.withPayload("Hello world").build(); //一旦发送消息,则执行监听器 t.sendMessageInTransaction("Topic2",message,null); } @RocketMQTransactionListener class Lis implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { System.out.println("执行本地事务"); return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { System.out.println("执行事务回查"); return RocketMQLocalTransactionState.COMMIT; } } }
消费者
package cn.tedu.demo2.m2;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * Demo consumer subscribed to {@code Topic2} in consumer group {@code consumer-demo2};
 * prints every received payload.
 */
@Component
@RocketMQMessageListener(topic = "Topic2", consumerGroup = "consumer-demo2")
public class Consumer implements RocketMQListener<String> {
    /**
     * Prints the received message body.
     *
     * @param s the message payload as a string
     */
    @Override
    public void onMessage(String s) {
        System.out.println("收到" + s);
    }
}
主类
package cn.tedu.demo2.m2;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the second demo (package {@code cn.tedu.demo2.m2}).
 */
@SpringBootApplication
public class Main {
    /**
     * Starts the Spring application with this class as the primary source.
     *
     * @param cliArgs command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] cliArgs) {
        SpringApplication.run(Main.class, cliArgs);
    }
}
测试类
package cn.tedu.demo2.m2; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; @SpringBootTest @ActiveProfiles("demo2") public class Test2 { @Autowired private Producer producer; @Test public void test1(){ producer.send(); //为了能够收到消费者消费的数据,这里通过休眠模拟等待时间 try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } } }
相关文章
- 《无限暖暖》天星之羽获得位置介绍 12-20
- 《流放之路2》重铸台解锁方法介绍 12-20
- 《无限暖暖》瞄准那个亮亮的成就怎么做 12-20
- 《无限暖暖》魔气怪终结者完成方法 12-20
- 《无限暖暖》曙光毛团获得位置介绍 12-20
- 《无限暖暖》日光果获得位置介绍 12-20