提交 80392366 authored 作者: scc's avatar scc

test5

上级 43f1dc48
...@@ -9,7 +9,6 @@ import org.springframework.stereotype.Service; ...@@ -9,7 +9,6 @@ import org.springframework.stereotype.Service;
@RocketMQMessageListener(consumerGroup = "${xc.rocketmq2.string-consumer-group}", topic = "${xc.rocketmq2.topic-string}") @RocketMQMessageListener(consumerGroup = "${xc.rocketmq2.string-consumer-group}", topic = "${xc.rocketmq2.topic-string}")
public class StringConsumer implements RocketMQListener<String> { public class StringConsumer implements RocketMQListener<String> {
@Override @Override
public void onMessage(String message) { public void onMessage(String message) {
System.out.println("消费字符串消息:"+message); System.out.println("消费字符串消息:"+message);
......
...@@ -9,13 +9,19 @@ import org.apache.rocketmq.common.CountDownLatch2; ...@@ -9,13 +9,19 @@ import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@RestController @RestController
...@@ -85,4 +91,23 @@ public class MqMessageController { ...@@ -85,4 +91,23 @@ public class MqMessageController {
return "单向发送消息成功"; return "单向发送消息成功";
} }
@RequestMapping("/sendDelayMessage")
public String sendDelayMessage(@RequestParam("delay") Integer delay) throws Exception {
rocketMQTemplate.syncSend(xcRocketMq2.getTopicString(),MessageBuilder.withPayload("收到延迟"+delay+"秒消息").build(),3000,delay);
return "延迟消息发送成功";
}
@RequestMapping("/sendTransactionMessage")
public String sendTransactionMessage(@RequestParam("type") String type) throws Exception {
Map<String,String> header = new HashMap<>();
header.put("type", type);
Message message = new GenericMessage("事务消息", header);
rocketMQTemplate.sendMessageInTransaction(xcRocketMq2.getTopicString(), message,null);
return "事务消息发送成功";
}
} }
\ No newline at end of file
package com.example.rocketmqdemo.listener;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@RocketMQTransactionListener
public class MyRocketMQLocalTransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
//执行本地事务
System.out.println("本地事务执行。。。。");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("本地事务执行完成。。。。");
MessageHeaders headers = message.getHeaders();
String type = headers.get("type").toString();
if(type.equalsIgnoreCase("commit")){
return RocketMQLocalTransactionState.COMMIT; //根据本地事务的提交完成情况返回不同的标志
}else if(type.equalsIgnoreCase("unknown"))
return RocketMQLocalTransactionState.UNKNOWN;
else{
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
System.out.println("回查操作");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return RocketMQLocalTransactionState.COMMIT; //根据本地事务的提交完成情况返回不同的标志
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论