最新下载
热门教程
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
Springboot整合RocketMQ收发消息代码示例
时间:2022-06-29 01:59:33 编辑:袖梨 来源:一聚教程网
本篇文章小编给大家分享一下Springboot整合RocketMQ收发消息代码示例,文章代码介绍的很详细,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看。
创建springboot项目
pom.xml添加rocketmq-spring-boot-starter依赖。
org.apache.rocketmq rocketmq-spring-boot-starter 2.1.0
yml 配置
application.yml
rocketmq: name-server: 192.168.64.141:9876
application-demo1.yml
使用 demo1 profile 指定生产者组组名
rocketmq:
producer:
group: producer-demo1
application-demo2.yml
使用 demo2 profile 指定生产者组组名
rocketmq:
producer:
group: producer-demo2
测试
demo 1
发送普通消息
发送 Spring 的通用 Message 对象
发送异步消息
发送顺序消息
生产者
package cn.tedu.demo2.m1;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class Producer {
@Autowired
private RocketMQTemplate t ;
public void send(){
//发送同步消息
t.convertAndSend("Topic1:TagA", "Hello world! ");
//发送spring的Message
Message message = MessageBuilder.withPayload("Hello Spring message! ").build();
t.send("Topic1:TagA",message);
//发送异步消息
t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败");
}
});
//发送顺序消息
t.syncSendOrderly("Topic1", "98456237,创建", "98456237");
t.syncSendOrderly("Topic1", "98456237,支付", "98456237");
t.syncSendOrderly("Topic1", "98456237,完成", "98456237");
}
}
消费者
package cn.tedu.demo2.m1; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Component @RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1") public class Consumer implements RocketMQListener{ @Override public void onMessage(String s) { System.out.println("收到"+s); } }
主类
package cn.tedu.demo2.m1;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
}
测试类
需要放在 test 文件夹
激活 demo1 profile @ActiveProfiles("demo1")
package cn.tedu.demo2.m1;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@ActiveProfiles("demo1")
public class Test1 {
@Autowired
private Producer producer;
@Test
public void test1(){
producer.send();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
demo 2
发送事务消息
生产者
package cn.tedu.demo2.m2;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class Producer {
@Autowired
private RocketMQTemplate t;
public void send(){
Message message = MessageBuilder.withPayload("Hello world").build();
//一旦发送消息,则执行监听器
t.sendMessageInTransaction("Topic2",message,null);
}
@RocketMQTransactionListener
class Lis implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("执行本地事务");
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
System.out.println("执行事务回查");
return RocketMQLocalTransactionState.COMMIT;
}
}
}
消费者
package cn.tedu.demo2.m2; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Component @RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2") public class Consumer implements RocketMQListener{ @Override public void onMessage(String s) { System.out.println("收到"+s); } }
主类
package cn.tedu.demo2.m2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
}
测试类
package cn.tedu.demo2.m2;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@ActiveProfiles("demo2")
public class Test2 {
@Autowired
private Producer producer;
@Test
public void test1(){
producer.send();
//为了能够收到消费者消费的数据,这里通过休眠模拟等待时间
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
相关文章
- dnf手游刃影技能怎么加点-刃影技能加点攻略 10-29
- 全明星街球派对拉梅洛鲍尔有哪些玩法技巧 10-29
- 辉烬克拉拉怎么配装 10-29
- 饥荒远古织影者如何打 10-29
- 蔚蓝档案海香技能怎么样 10-29
- 碧蓝航线莱姆号强度如何 10-29