自定义rocketmq-spring-boot-starter
2022-09-26 23:18:45 841
一般使用rocketmq-starter都会进行不同程度的封装, 在此分享一种封装思路
基于rocketmq-spring-boot-starter
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
自定义Producer
基础消息体定义
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.time.LocalDateTime;
@Data
public class BaseMsgBody implements Serializable {
@Serial
private static final long serialVersionUID = -7156796935958490859L;
private LocalDateTime sendTime;
private String topic;
public BaseMsgBody() {
sendTime = LocalDateTime.now();
}
}
业务消息体
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@Data
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class UserViewGoodsTagMsgBody extends BaseMsgBody {
private Long userId;
private Long goodsId;
}
Producer封装
import cn.hutool.core.util.StrUtil;
import com.content.system.service.common.exception.mq.MqFailSendException;
import com.content.system.service.common.mq.body.BaseMsgBody;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.core.AbstractMessageSendingTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.MimeTypeUtils;
import java.util.Map;
import java.util.Objects;
@Component
@Slf4j
public class RocketProducer {
private final MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
@Autowired
private RocketMQTemplate template;
@Autowired
private DefaultMQProducer producer;
public SendResult syncSend(BaseMsgBody body) {
return syncSend(body, producer.getSendMsgTimeout(), 0, null);
}
/**
* Delay Msg
*
* @param body body
* @param delayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h num: 18
* @return SendResult
*/
public SendResult syncSendDelay(BaseMsgBody body, int delayLevel) {
return syncSend(body, producer.getSendMsgTimeout(), delayLevel, null);
}
public SendResult syncSendOrderly(BaseMsgBody body, String hashKey) {
return syncSend(body, producer.getSendMsgTimeout(), 0, hashKey);
}
/**
* syncSend
*
* @param body 消息内容
* @param timeout 发送超时时间
* @param delayLevel 延时等级
* @param hashKey 顺序消息hash
* @return SendResult
*/
public SendResult syncSend(BaseMsgBody body, long timeout, int delayLevel, String hashKey) {
if (Objects.isNull(body)) {
log.error("syncSend failed. body is null.");
throw new IllegalArgumentException("`body` cannot be null");
}
if (Objects.isNull(body.getTopic())) {
log.error("syncSend failed. topic: {} is null, body: {}", body.getTopic(), body);
throw new IllegalArgumentException("`topic` cannot be null");
}
try {
long now = System.currentTimeMillis();
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(body);
if (delayLevel > 0) {
rocketMsg.setDelayTimeLevel(delayLevel);
}
SendResult result = Objects.isNull(hashKey) ? producer.send(rocketMsg, messageQueueSelector, timeout)
: producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
long costTime = System.currentTimeMillis() - now;
if (log.isDebugEnabled()) {
log.debug("send message cost: {} ms, msgId:{}", costTime, result.getMsgId());
}
log.debug("send result: {}", SendResult.encoderSendResultToJson(result));
if (result.getSendStatus() != SendStatus.SEND_OK) {
throw MqFailSendException.build(result);
}
return result;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.error("syncSend failed. destination:{}, message:{} ", body.getTopic(), body);
throw new MessagingException(ie.getMessage(), ie);
} catch (Exception e) {
log.error("syncSend failed. destination:{}, message:{} ", body.getTopic(), body);
throw new MessagingException(e.getMessage(), e);
}
}
private org.apache.rocketmq.common.message.Message createRocketMqMessage(BaseMsgBody body) {
Message<BaseMsgBody> message = MessageBuilder.withPayload(body).build();
Message<?> msg = this.doConvert(message.getPayload(), message.getHeaders());
return RocketMQUtil.convertToRocketMessage(template.getMessageConverter(), "UTF-8", body.getTopic(), msg);
}
protected Message<Object> doConvert(Object payload, Map<String, Object> headers) {
MessageHeaders messageHeaders = null;
Object conversionHint = (headers != null ?
headers.get(AbstractMessageSendingTemplate.CONVERSION_HINT_HEADER) : null);
if (headers != null) {
if (headers instanceof MessageHeaders msgHeaders) {
messageHeaders = msgHeaders;
} else {
messageHeaders = new MessageHeaders(headers);
}
}
MessageConverter converter = template.getMessageConverter();
Message<?> message = (converter instanceof SmartMessageConverter smartMessageConverter ?
smartMessageConverter.toMessage(payload, messageHeaders, conversionHint) :
converter.toMessage(payload, messageHeaders));
if (message == null) {
String payloadType = payload.getClass().getName();
Object contentType = (messageHeaders != null ? messageHeaders.get(MessageHeaders.CONTENT_TYPE) : null);
throw new MessageConversionException(StrUtil.format("""
Unable to convert payload with type='{}', contentType='{}', converter=[{}]
""", payloadType, contentType, converter));
}
@SuppressWarnings("unchecked")
MessageBuilder<Object> builder = (MessageBuilder<Object>) MessageBuilder.fromMessage(message);
builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
return builder.build();
}
}
import org.apache.rocketmq.client.producer.SendResult;
public class MqFailSendException extends RuntimeException {
private MqFailSendException(SendResult result) {
super(SendResult.encoderSendResultToJson(result));
}
public static MqFailSendException build(SendResult result) {
return new MqFailSendException(result);
}
}
队列枚举
public enum TopicEnum {
USER_VIEW_GOODS_TAG_TOPIC("USER_VIEW_GOODS_TAG_TOPIC", "user_view_goods_tag_group",
UserViewGoodsTagMsgBody::new);
public final String topic;
public final String group;
final Supplier<? extends BaseMsgBody> supplier;
TopicEnum(String topic, String group, Supplier<? extends BaseMsgBody> supplier) {
this.topic = topic;
this.group = group;
this.supplier = supplier;
}
@SuppressWarnings("unchecked")
public <T extends BaseMsgBody> T newInstance() {
var body = (T) supplier.get();
body.setTopic(topic);
return body;
}
}
发送消息
UserViewGoodsTagMsgBody body = TopicEnum.USER_VIEW_GOODS_TAG_TOPIC.newInstance();
body.setUserId(userId);
body.setGoodsId(goodsId);
rocketProducer.syncSendDelay(body, 1);
自定义Consumer
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMessageListener {
String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
/**
* Topic name & consumerGroup
* <p>
* Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
* load balance. It's required and needs to be globally unique.
* <p>
* <p>
* See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
*/
TopicEnum topicGroup();
/**
* Control how to selector message.
*
* @see SelectorType
*/
SelectorType selectorType() default SelectorType.TAG;
/**
* Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
*/
String selector default "*";
/**
* Control consume mode, you can choice receive message concurrently or orderly.
*/
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
/**
* Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
*/
MessageModel messageModel() default MessageModel.CLUSTERING;
/**
* Max consumer thread number.
*
* @see <a href="https://github.com/apache/rocketmq-spring/issues/429">issues#429</a>
* @deprecated This property is not work well, because the consumer thread pool executor use
* {@link LinkedBlockingQueue} with default capacity bound (Integer.MAX_VALUE), use
* {@link RocketMQMessageListener#consumeThreadNumber} .
*/
@Deprecated
int consumeThreadMax() default 64;
/**
* consumer thread number.
*/
int consumeThreadNumber() default 20;
/**
* Max re-consume times.
* <p>
* In concurrently mode, -1 means 16;
* In orderly mode, -1 means Integer.MAX_VALUE.
*/
int maxReconsumeTimes() default -1;
/**
* Maximum amount of time in minutes a message may block the consuming thread.
*/
long consumeTimeout() default 15L;
/**
* Timeout for sending reply messages.
*/
int replyTimeout() default 3000;
/**
* The property of "access-key".
*/
String accessKey() default ACCESS_KEY_PLACEHOLDER;
/**
* The property of "secret-key".
*/
String secretKey() default SECRET_KEY_PLACEHOLDER;
/**
* Switch flag instance for message trace.
*/
boolean enableMsgTrace() default false;
/**
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
/**
* The property of "name-server".
*/
String nameServer() default NAME_SERVER_PLACEHOLDER;
/**
* The property of "access-channel".
*/
String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
/**
* The property of "tlsEnable" default false.
*/
String tlsEnable() default "false";
/**
* The namespace of consumer.
*/
String namespace() default "";
/**
* Message consume retry strategy in concurrently mode.
* <p>
* -1,no retry,put into DLQ directly
* 0,broker control retry frequency
* >0,client control retry frequency
*/
int delayLevelWhenNextConsume() default 0;
/**
* The interval of suspending the pull in orderly mode, in milliseconds.
* <p>
* The minimum value is 10 and the maximum is 30000.
*/
int suspendCurrentQueueTimeMillis() default 1000;
/**
* Maximum time to await message consuming when shutdown consumer, in milliseconds.
* The minimum value is 0
*/
int awaitTerminationMillisWhenShutdown() default 1000;
}
public interface RocketListener<S, T, C> {
/**
* onMessage
*
* @param message message
* @param ctx context
* @return S
*/
S onMessage(T message, C ctx);
}
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
public interface RocketConcurrentlyListener<T>
extends RocketListener<ConsumeConcurrentlyStatus, T, ConsumeConcurrentlyContext> {
}
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
public interface RocketOrderlyListener<T> extends RocketListener<ConsumeOrderlyStatus, T, ConsumeOrderlyContext> {
}
import lombok.Data;
@Data
public class ListenerReply<S, R> {
private S status;
private R reply;
public ListenerReply() {
}
public ListenerReply(S status, R reply) {
this.reply = reply;
this.status = status;
}
public static <S, R> ListenerReply<S, R> build(S status, R reply) {
return new ListenerReply<>(status, reply);
}
}
public interface RocketReplyListener<T, C, S, R> {
/**
* onMessage
*
* @param message data received by the listener
* @param ctx context
* @return data replying to producer
*/
ListenerReply<S, R> onMessage(T message, C ctx);
}
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
public interface RocketConcurrentlyReplyListener<T, R>
extends RocketReplyListener<T, ConsumeConcurrentlyContext, ConsumeConcurrentlyStatus, R> {
}
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
public interface RocketOrderlyReplyListener<T, R>
extends RocketReplyListener<T, ConsumeOrderlyContext, ConsumeOrderlyStatus, R> {
}
主要配置类
import com.content.system.service.common.mq.ListenerReply;
import com.content.system.service.common.mq.listener.RocketConcurrentlyListener;
import com.content.system.service.common.mq.listener.RocketConcurrentlyReplyListener;
import com.content.system.service.common.mq.listener.RocketListener;
import com.content.system.service.common.mq.listener.RocketOrderlyListener;
import com.content.system.service.common.mq.listener.RocketOrderlyReplyListener;
import com.content.system.service.common.mq.listener.RocketReplyListener;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.apache.rocketmq.spring.support.RocketMQListenerContainer;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Objects;
/**
* @author rocket-mq
*/
@SuppressWarnings("WeakerAccess")
public class RocketListenerContainer implements InitializingBean,
RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
private final static Logger log = LoggerFactory.getLogger(RocketListenerContainer.class);
private ApplicationContext applicationContext;
/**
* The name of the DefaultRocketMQListenerContainer instance
*/
private String name;
/**
* Suspending pulling time in orderly mode.
* <p>
* The minimum value is 10 and the maximum is 30000.
*/
private long suspendCurrentQueueTimeMillis = 1000;
/**
* Message consume retry strategy in concurrently mode.
* <p>
* -1,no retry,put into DLQ directly
* 0,broker control retry frequency
* >0,client control retry frequency
*/
private int delayLevelWhenNextConsume = 0;
private String nameServer;
private AccessChannel accessChannel = AccessChannel.LOCAL;
private String consumerGroup;
private String topic;
private int consumeThreadMax = 64;
private int consumeThreadNumber = 20;
private String charset = "UTF-8";
private MessageConverter messageConverter;
private RocketListener rocketListener;
private RocketReplyListener rocketReplyListener;
private RocketMessageListener rocketMessageListener;
private DefaultMQPushConsumer consumer;
private Type messageType;
private MethodParameter methodParameter;
private boolean running;
// The following properties came from @RocketMQMessageListener.
private ConsumeMode consumeMode;
private SelectorType selectorType;
private String selectorExpression;
private MessageModel messageModel;
private long consumeTimeout;
private int maxReconsumeTimes;
private int replyTimeout;
private String tlsEnable;
private String namespace;
private long awaitTerminationMillisWhenShutdown;
public long getSuspendCurrentQueueTimeMillis() {
return suspendCurrentQueueTimeMillis;
}
public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
}
public int getDelayLevelWhenNextConsume() {
return delayLevelWhenNextConsume;
}
public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
}
public String getNameServer() {
return nameServer;
}
public void setNameServer(String nameServer) {
this.nameServer = nameServer;
}
public AccessChannel getAccessChannel() {
return accessChannel;
}
public void setAccessChannel(AccessChannel accessChannel) {
this.accessChannel = accessChannel;
}
public String getConsumerGroup() {
return consumerGroup;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public int getConsumeThreadMax() {
return consumeThreadMax;
}
public int getConsumeThreadNumber() {
return consumeThreadNumber;
}
public String getCharset() {
return charset;
}
public void setCharset(String charset) {
this.charset = charset;
}
public MessageConverter getMessageConverter() {
return messageConverter;
}
public RocketListenerContainer setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
return this;
}
public RocketListener getRocketListener() {
return rocketListener;
}
public void setRocketListener(RocketListener rocketListener) {
this.rocketListener = rocketListener;
}
public RocketReplyListener getRocketReplyListener() {
return rocketReplyListener;
}
public void setRocketReplyListener(RocketReplyListener rocketReplyListener) {
this.rocketReplyListener = rocketReplyListener;
}
public RocketMessageListener getRocketMessageListener() {
return rocketMessageListener;
}
public void setRocketMessageListener(RocketMessageListener anno) {
this.rocketMessageListener = anno;
this.consumeMode = anno.consumeMode();
this.consumeThreadMax = anno.consumeThreadNumber();
this.consumeThreadNumber = anno.consumeThreadNumber();
this.messageModel = anno.messageModel();
this.selectorType = anno.selectorType();
this.selectorExpression = anno.selector;
this.consumeTimeout = anno.consumeTimeout();
this.maxReconsumeTimes = anno.maxReconsumeTimes();
this.replyTimeout = anno.replyTimeout();
this.tlsEnable = anno.tlsEnable();
this.namespace = anno.namespace();
this.delayLevelWhenNextConsume = anno.delayLevelWhenNextConsume();
this.suspendCurrentQueueTimeMillis = anno.suspendCurrentQueueTimeMillis();
this.awaitTerminationMillisWhenShutdown = Math.max(0, anno.awaitTerminationMillisWhenShutdown());
}
public ConsumeMode getConsumeMode() {
return consumeMode;
}
public SelectorType getSelectorType() {
return selectorType;
}
public void setSelector {
this.selectorExpression = selectorExpression;
}
public String getSelector {
return selectorExpression;
}
public MessageModel getMessageModel() {
return messageModel;
}
public String getTlsEnable() {
return tlsEnable;
}
public void setTlsEnable(String tlsEnable) {
this.tlsEnable = tlsEnable;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
public DefaultMQPushConsumer getConsumer() {
return consumer;
}
public void setConsumer(DefaultMQPushConsumer consumer) {
this.consumer = consumer;
}
public long getAwaitTerminationMillisWhenShutdown() {
return awaitTerminationMillisWhenShutdown;
}
public RocketListenerContainer setAwaitTerminationMillisWhenShutdown(long awaitTerminationMillisWhenShutdown) {
this.awaitTerminationMillisWhenShutdown = awaitTerminationMillisWhenShutdown;
return this;
}
@Override
public void destroy() {
this.setRunning(false);
if (Objects.nonNull(consumer)) {
consumer.shutdown();
}
log.info("container destroyed, {}", this.toString());
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public void stop(Runnable callback) {
stop();
callback.run();
}
@Override
public void start() {
if (this.isRunning()) {
throw new IllegalStateException("container already running. " + this.toString());
}
try {
consumer.start();
} catch (MQClientException e) {
throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
}
this.setRunning(true);
log.info("running container: {}", this.toString());
}
@Override
public void stop() {
if (this.isRunning()) {
if (Objects.nonNull(consumer)) {
consumer.shutdown();
}
setRunning(false);
}
}
@Override
public boolean isRunning() {
return running;
}
private void setRunning(boolean running) {
this.running = running;
}
@Override
public int getPhase() {
// Returning Integer.MAX_VALUE only suggests that
// we will be the first bean to shutdown and last bean to start
return Integer.MAX_VALUE;
}
@Override
public void afterPropertiesSet() throws Exception {
initRocketMQPushConsumer();
this.messageType = getMessageType();
this.methodParameter = getMethodParameter();
log.debug("RocketMQ messageType: {}", messageType);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public String toString() {
return "DefaultRocketListenerContainer{" +
"consumerGroup='" + consumerGroup + '\'' +
", namespace='" + namespace + '\'' +
", nameServer='" + nameServer + '\'' +
", topic='" + topic + '\'' +
", consumeMode=" + consumeMode +
", selectorType=" + selectorType +
", selectorExpression='" + selectorExpression + '\'' +
", messageModel=" + messageModel + '\'' +
", tlsEnable=" + tlsEnable +
'}';
}
public void setName(String name) {
this.name = name;
}
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgList) {
log.debug("received msg: {}", messageExt);
log.info("received msg id: {}", messageExt.getMsgId());
try {
long now = System.currentTimeMillis();
ConsumeConcurrentlyStatus status = handleConcurrentlyMessage(messageExt, context);
if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS != status) {
log.warn("return status: {}, consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}",
status, messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes());
return status;
}
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.error("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(),
messageExt.getTopic(), messageExt.getReconsumeTimes(), ie);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} catch (Exception e) {
log.error("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(),
messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> extList, ConsumeOrderlyContext context) {
for (MessageExt messageExt : extList) {
log.debug("received msg: {}", messageExt);
log.info("received msg id: {}", messageExt.getMsgId());
try {
long now = System.currentTimeMillis();
ConsumeOrderlyStatus status = handleOrderlyMessage(messageExt, context);
if (ConsumeOrderlyStatus.SUCCESS != status) {
log.warn("return status: {}, consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}",
status, messageExt.getMsgId(),
messageExt.getTopic(), messageExt.getReconsumeTimes());
return status;
}
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.error("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(),
messageExt.getTopic(), messageExt.getReconsumeTimes(), ie);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
} catch (Exception e) {
log.error("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(),
messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
}
@SuppressWarnings("unchecked")
private ConsumeConcurrentlyStatus handleConcurrentlyMessage(MessageExt messageExt, ConsumeConcurrentlyContext context)
throws MQClientException, RemotingException, InterruptedException {
ConsumeConcurrentlyStatus status = null;
if (rocketListener != null) {
status = (ConsumeConcurrentlyStatus) rocketListener.onMessage(doConvertMessage(messageExt), context);
} else if (rocketReplyListener != null) {
ListenerReply<ConsumeConcurrentlyStatus, ?> replyContent =
rocketReplyListener.onMessage(doConvertMessage(messageExt), context);
status = replyContent.getStatus();
sendReply(replyContent.getReply(), messageExt);
}
return status;
}
@SuppressWarnings("unchecked")
private ConsumeOrderlyStatus handleOrderlyMessage(MessageExt messageExt, ConsumeOrderlyContext context)
throws MQClientException, RemotingException, InterruptedException {
ConsumeOrderlyStatus status = null;
if (rocketListener != null) {
status = (ConsumeOrderlyStatus)
rocketListener.onMessage(doConvertMessage(messageExt), context);
} else if (rocketReplyListener != null) {
ListenerReply<ConsumeOrderlyStatus, ?> replyContent =
rocketReplyListener.onMessage(doConvertMessage(messageExt), context);
status = replyContent.getStatus();
sendReply(replyContent.getReply(), messageExt);
}
return status;
}
private void sendReply(Object reply, MessageExt messageExt) throws MQClientException, RemotingException,
InterruptedException {
Message<?> message = MessageBuilder.withPayload(reply).build();
org.apache.rocketmq.common.message.Message replyMessage =
MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
producer.setSendMsgTimeout(replyTimeout);
producer.send(replyMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
} else {
log.debug("Consumer replies message success.");
}
}
@Override
public void onException(Throwable e) {
log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
}
});
}
private byte[] convertToBytes(Message<?> message) {
Message<?> messageWithSerializedPayload = doConvert(message.getPayload(), message.getHeaders());
Object payloadObj = messageWithSerializedPayload.getPayload();
byte[] payloads;
try {
if (null == payloadObj) {
throw new RuntimeException("the message cannot be empty");
}
if (payloadObj instanceof String) {
payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
} else if (payloadObj instanceof byte[]) {
payloads = (byte[]) messageWithSerializedPayload.getPayload();
} else {
String jsonObj = (String) this.messageConverter.fromMessage(messageWithSerializedPayload, payloadObj.getClass());
if (null == jsonObj) {
throw new RuntimeException(String.format(
"empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
this.messageConverter.getClass(), payloadObj.getClass(), payloadObj));
}
payloads = jsonObj.getBytes(Charset.forName(charset));
}
} catch (Exception e) {
throw new RuntimeException("convert to bytes failed.", e);
}
return payloads;
}
private Message<?> doConvert(Object payload, MessageHeaders headers) {
Message<?> message = this.messageConverter instanceof SmartMessageConverter ?
((SmartMessageConverter) this.messageConverter).toMessage(payload, headers, null) :
this.messageConverter.toMessage(payload, headers);
if (message == null) {
String payloadType = payload.getClass().getName();
Object contentType = headers != null ? headers.get(MessageHeaders.CONTENT_TYPE) : null;
throw new MessageConversionException("Unable to convert payload with type='" + payloadType +
"', contentType='" + contentType + "', converter=[" + this.messageConverter + "]");
}
MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
return builder.build();
}
@SuppressWarnings("unchecked")
private Object doConvertMessage(MessageExt messageExt) {
if (Objects.equals(messageType, MessageExt.class) || Objects.equals(messageType, org.apache.rocketmq.common.message.Message.class)) {
return messageExt;
} else {
String str = new String(messageExt.getBody(), Charset.forName(charset));
if (Objects.equals(messageType, String.class)) {
return str;
} else {
// If msgType not string, use objectMapper change it.
try {
if (messageType instanceof Class) {
//if the messageType has not Generic Parameter
return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) messageType);
} else {
//if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
//we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) messageType).getRawType(), methodParameter);
}
} catch (Exception e) {
log.info("convert failed. str:{}, msgType:{}", str, messageType);
throw new RuntimeException("cannot convert message to " + messageType, e);
}
}
}
}
private MethodParameter getMethodParameter() {
Class<?> targetClass;
Class<?> secondParamClass;
if (rocketListener != null) {
targetClass = AopProxyUtils.ultimateTargetClass(rocketListener);
if (RocketConcurrentlyListener.class.isAssignableFrom(targetClass)) {
secondParamClass = ConsumeConcurrentlyContext.class;
} else if (RocketOrderlyListener.class.isAssignableFrom(targetClass)) {
secondParamClass = ConsumeOrderlyContext.class;
} else {
throw new RuntimeException("other listener is not supported");
}
} else {
targetClass = AopProxyUtils.ultimateTargetClass(rocketReplyListener);
if (RocketConcurrentlyReplyListener.class.isAssignableFrom(targetClass)) {
secondParamClass = ConsumeConcurrentlyContext.class;
} else if (RocketOrderlyReplyListener.class.isAssignableFrom(targetClass)) {
secondParamClass = ConsumeOrderlyContext.class;
} else {
throw new RuntimeException("other reply listener is not supported");
}
}
Type messageType = this.getMessageType();
Class clazz = null;
if (messageType instanceof ParameterizedType && messageConverter instanceof SmartMessageConverter) {
clazz = (Class) ((ParameterizedType) messageType).getRawType();
} else if (messageType instanceof Class) {
clazz = (Class) messageType;
} else {
throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported");
}
try {
final Method method = targetClass.getMethod("onMessage", clazz, secondParamClass);
return new MethodParameter(method, 0);
} catch (NoSuchMethodException e) {
log.error("parameterType:{} of onMessage method is not supported", messageType, e);
throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported");
}
}
private Type getMessageType() {
Class<?> targetClass;
if (rocketListener != null) {
targetClass = AopProxyUtils.ultimateTargetClass(rocketListener);
} else {
targetClass = AopProxyUtils.ultimateTargetClass(rocketReplyListener);
}
Type matchedGenericInterface = null;
while (Objects.nonNull(targetClass)) {
Type[] interfaces = targetClass.getGenericInterfaces();
if (Objects.nonNull(interfaces)) {
for (Type type : interfaces) {
Class<?> rawType = (Class<?>) ((ParameterizedType) type).getRawType();
if (type instanceof ParameterizedType &&
(RocketListener.class.isAssignableFrom(rawType) ||
RocketReplyListener.class.isAssignableFrom(rawType))) {
matchedGenericInterface = type;
break;
}
}
}
targetClass = targetClass.getSuperclass();
}
if (Objects.isNull(matchedGenericInterface)) {
return Object.class;
}
Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
return actualTypeArguments[0];
}
return Object.class;
}
private void initRocketMQPushConsumer() throws MQClientException {
if (rocketListener == null && rocketReplyListener == null) {
throw new IllegalArgumentException("Property 'rocketListener' or 'rocketReplyListener' is required");
}
Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
Assert.notNull(nameServer, "Property 'nameServer' is required");
Assert.notNull(topic, "Property 'topic' is required");
RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
this.rocketMessageListener.accessKey(), this.rocketMessageListener.secretKey());
boolean enableMsgTrace = rocketMessageListener.enableMsgTrace();
if (Objects.nonNull(rpcHook)) {
consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
enableMsgTrace, this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMessageListener.customizedTraceTopic()));
consumer.setVipChannelEnabled(false);
} else {
log.debug("Access-key or secret-key not configure in " + this + ".");
consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMessageListener.customizedTraceTopic()));
}
consumer.setNamespace(namespace);
consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
String customizedNameServer = this.applicationContext.getEnvironment()
.resolveRequiredPlaceholders(this.rocketMessageListener.nameServer());
if (customizedNameServer != null) {
consumer.setNamesrvAddr(customizedNameServer);
} else {
consumer.setNamesrvAddr(nameServer);
}
if (accessChannel != null) {
consumer.setAccessChannel(accessChannel);
}
//set the consumer core thread number and maximum thread number has the same value
consumer.setConsumeThreadMax(consumeThreadNumber);
consumer.setConsumeThreadMin(consumeThreadNumber);
consumer.setConsumeTimeout(consumeTimeout);
consumer.setMaxReconsumeTimes(maxReconsumeTimes);
consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown);
switch (messageModel) {
case BROADCASTING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
break;
case CLUSTERING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
break;
default:
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
}
switch (selectorType) {
case TAG:
consumer.subscribe(topic, selectorExpression);
break;
case SQL92:
consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
break;
default:
throw new IllegalArgumentException("Property 'selectorType' was wrong.");
}
switch (consumeMode) {
case ORDERLY:
consumer.setMessageListener(new DefaultMessageListenerOrderly());
break;
case CONCURRENTLY:
consumer.setMessageListener(new DefaultMessageListenerConcurrently());
break;
default:
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
}
//if String is not is equal "true" TLS mode will represent the as default value false
consumer.setUseTLS(Boolean.parseBoolean(tlsEnable));
if (rocketListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketListener).prepareStart(consumer);
} else if (rocketReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketReplyListener).prepareStart(consumer);
}
}
}
import cn.hutool.core.util.StrUtil;
import com.content.system.service.common.mq.listener.RocketListener;
import com.content.system.service.common.mq.listener.RocketReplyListener;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.scope.ScopedProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.support.BeanDefinitionValidationException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.env.StandardEnvironment;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@Configuration
public class RocketListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
private final static Logger log = LoggerFactory.getLogger(RocketListenerContainerConfiguration.class);
private ConfigurableApplicationContext applicationContext;
private final AtomicLong counter = new AtomicLong(0);
private final StandardEnvironment environment;
private final RocketMQProperties rocketProperties;
private final RocketMQMessageConverter rocketMessageConverter;
public RocketListenerContainerConfiguration(RocketMQMessageConverter rocketMessageConverter,
StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
this.rocketMessageConverter = rocketMessageConverter;
this.environment = environment;
this.rocketProperties = rocketMQProperties;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
@Override
public void afterSingletonsInstantiated() {
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMessageListener.class)
.entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
beans.forEach(this::registerContainer);
}
@SuppressWarnings("unchecked")
private void registerContainer(String beanName, Object bean) {
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
RocketMessageListener annotation = clazz.getAnnotation(RocketMessageListener.class);
String consumerGroup = this.environment.resolvePlaceholders(annotation.topicGroup().group);
String topic = this.environment.resolvePlaceholders(annotation.topicGroup().topic);
boolean listenerEnabled =
(boolean) rocketProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
.getOrDefault(topic, true);
if (!listenerEnabled) {
log.debug(
"Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
consumerGroup, topic);
return;
}
validate(annotation);
String containerBeanName = String.format("%s_%s", RocketListenerContainer.class.getName(),
counter.incrementAndGet());
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
genericApplicationContext.registerBean(containerBeanName, RocketListenerContainer.class,
() -> createRocketListenerContainer(containerBeanName, bean, annotation));
RocketListenerContainer container = genericApplicationContext.getBean(containerBeanName,
RocketListenerContainer.class);
if (!container.isRunning()) {
try {
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}
private RocketListenerContainer createRocketListenerContainer(String name, Object bean,
RocketMessageListener annotation) {
RocketListenerContainer container = new RocketListenerContainer();
container.setRocketMessageListener(annotation);
String nameServer = environment.resolvePlaceholders(annotation.nameServer());
nameServer = StrUtil.isEmpty(nameServer) ? rocketProperties.getNameServer() : nameServer;
String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
container.setNameServer(nameServer);
if (!StrUtil.isEmpty(accessChannel)) {
container.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
container.setTopic(environment.resolvePlaceholders(annotation.topicGroup().topic));
String tags = environment.resolvePlaceholders(annotation.selector);
if (!StrUtil.isEmpty(tags)) {
container.setSelector;
}
container.setConsumerGroup(environment.resolvePlaceholders(annotation.topicGroup().group));
container.setTlsEnable(environment.resolvePlaceholders(annotation.tlsEnable()));
if (RocketListener.class.isAssignableFrom(bean.getClass())) {
container.setRocketListener((RocketListener) bean);
} else if (RocketReplyListener.class.isAssignableFrom(bean.getClass())) {
container.setRocketReplyListener((RocketReplyListener) bean);
}
container.setMessageConverter(rocketMessageConverter.getMessageConverter());
container.setName(name);
return container;
}
private void validate(RocketMessageListener annotation) {
if (annotation.consumeMode() == ConsumeMode.ORDERLY &&
annotation.messageModel() == MessageModel.BROADCASTING) {
throw new BeanDefinitionValidationException(
"Bad annotation definition in @RocketMessageListener, messageModel BROADCASTING does not support ORDERLY message!");
}
}
}
@Slf4j
@Component
@RocketMessageListener(topicGroup = TopicEnum.USER_VIEW_GOODS_TAG_TOPIC,
consumeMode = ConsumeMode.CONCURRENTLY, consumeThreadNumber = 5, enableMsgTrace = true)
public class UserViewGoodsTagConsumer implements RocketConcurrentlyListener<UserViewGoodsTagMsgBody> {
@Override
public ConsumeConcurrentlyStatus onMessage(UserViewGoodsTagMsgBody message, ConsumeConcurrentlyContext ctx) {
log.info("接收到普通消息:{}", message);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
相对于官方的rocketmq-starter, 自定义的Consumer不需要通过抛出异常进行重试, 可以自定义重试延时等级