提交 3b032798 authored 作者: scc's avatar scc

add new interface

上级 4af475b1
package com.example.rocketmqdemo.consumer;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(consumerGroup = "${xc.rocketmq2.string-consumer-group}", topic = "${xc.rocketmq2.topic-string}")
public class StringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("消费字符串消息:"+message);
}
}
\ No newline at end of file
package com.example.rocketmqdemo.controller; package com.example.rocketmqdemo.controller;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -8,16 +22,76 @@ import org.springframework.web.bind.annotation.RequestMapping; ...@@ -8,16 +22,76 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@RestController @RestController
public class MqMessageController { public class MqMessageController {
@Autowired @Autowired
private RocketMQTemplate rocketMQTemplate; private RocketMQTemplate rocketMQTemplate;
@Autowired
private XcRocketMq2 xcRocketMq2;
@RequestMapping("/testMessage") @RequestMapping("/testMessage")
public String get(@RequestParam("id") int id) { public String testMessage(@RequestParam("id") int id) {
rocketMQTemplate.convertAndSend("scc","Hello" + id); rocketMQTemplate.convertAndSend("scc","Hello" + id);
return "success"; return "success";
} }
@RequestMapping("/sendSyncMessage")
public List<SendResult> sendSyncMessage(@RequestParam("id") int id, @RequestParam("count") int count) throws UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException, MQClientException {
List<SendResult> results = new ArrayList<>();
for (int i = 0; i < count; i++) {
SendResult result = rocketMQTemplate.syncSend(xcRocketMq2.getTopicString(), "Hello RocketMQ 同步发送消息"+i);
results.add(result);
}
return results;
}
@RequestMapping("/sendASyncMessage")
public List<String> sendASyncMessage(@RequestParam("id") int id, @RequestParam("count") int count) throws UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException, MQClientException {
List<String> results = new ArrayList<>();
// 根据消息数量实例化倒计时计算器
final CountDownLatch2 countDownLatch = new CountDownLatch2(count);
boolean flag;
for (int i = 0; i < count; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
// SendCallback接收异步返回结果的回调
rocketMQTemplate.asyncSend(xcRocketMq2.getTopicString(),"Hello RocketMQ 异步发送消息" + i, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
results.add(sendResult.toString());
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
results.add(e.getMessage());
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// 等待5s
countDownLatch.await(5, TimeUnit.SECONDS);
return results;
}
@RequestMapping("/sendOneWay")
public String sendOneWay() throws Exception {
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
// 发送单向消息,没有任何返回结果
rocketMQTemplate.sendOneWay(xcRocketMq2.getTopicString(), "发送单向消息"+i);
}
return "单向发送消息成功";
}
} }
\ No newline at end of file
package com.example.rocketmqdemo.controller;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Data
@Component
public class XcRocketMq2 {
@Value("xc.rocketmq2.topic-string")
public String topicString;
@Value("xc.rocketmq2.topic-object")
public String topicObject;
@Value("xc.rocketmq2.string-consumer-group")
public String stringConsumerGroup;
@Value("xc.rocketmq2.object-consumer-group")
public String objectConsumerGroup;
}
rocketmq: rocketmq:
name-server: ${NAME_SERVER:rocket-nameserver-0.rocket-nameserver.pipeline-zjh:9876} name-server: ${NAME_SERVER:rocket-nameserver-0.rocket-nameserver.pipeline-zjh:9876}
producer: producer:
group: scc-group group: string_producer_group
xc:
topic: ${TOPIC_NAME:scc} rocketmq2:
\ No newline at end of file topic-string: topic_string
topic-object: topic_object
string-consumer-group: string_consumer_group
object-consumer-group: object_consumer_group
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论