自定义rocketmq-spring-boot-starter

kyaa111 2月前 ⋅ 91 阅读

一般使用rocketmq-starter都会进行不同程度的封装, 在此分享一种封装思路

基于rocketmq-spring-boot-starter


<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

自定义Producer

基础消息体定义


import lombok.Data;

import java.io.Serial;
import java.io.Serializable;
import java.time.LocalDateTime;


@Data
public class BaseMsgBody implements Serializable {

    @Serial
    private static final long serialVersionUID = -7156796935958490859L;

    private LocalDateTime sendTime;

    private String topic;

    public BaseMsgBody() {
        sendTime = LocalDateTime.now();
    }
}

业务消息体

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;


@Data
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class UserViewGoodsTagMsgBody extends BaseMsgBody {

    private Long userId;

    private Long goodsId;
}

Producer封装

import cn.hutool.core.util.StrUtil;
import com.content.system.service.common.exception.mq.MqFailSendException;
import com.content.system.service.common.mq.body.BaseMsgBody;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.core.AbstractMessageSendingTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.MimeTypeUtils;

import java.util.Map;
import java.util.Objects;


@Component
@Slf4j
public class RocketProducer {
    private final MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();

    @Autowired
    private RocketMQTemplate template;

    @Autowired
    private DefaultMQProducer producer;

    public SendResult syncSend(BaseMsgBody body) {
        return syncSend(body, producer.getSendMsgTimeout(), 0, null);
    }

    /**
     * Delay Msg
     *
     * @param body       body
     * @param delayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h num: 18
     * @return SendResult
     */
    public SendResult syncSendDelay(BaseMsgBody body, int delayLevel) {
        return syncSend(body, producer.getSendMsgTimeout(), delayLevel, null);
    }

    public SendResult syncSendOrderly(BaseMsgBody body, String hashKey) {
        return syncSend(body, producer.getSendMsgTimeout(), 0, hashKey);
    }

    /**
     * syncSend
     *
     * @param body       消息内容
     * @param timeout    发送超时时间
     * @param delayLevel 延时等级
     * @param hashKey    顺序消息hash
     * @return SendResult
     */
    public SendResult syncSend(BaseMsgBody body, long timeout, int delayLevel, String hashKey) {
        if (Objects.isNull(body)) {
            log.error("syncSend failed. body is null.");
            throw new IllegalArgumentException("`body` cannot be null");
        }

        if (Objects.isNull(body.getTopic())) {
            log.error("syncSend failed. topic: {} is null, body: {}", body.getTopic(), body);
            throw new IllegalArgumentException("`topic` cannot be null");
        }

        try {
            long now = System.currentTimeMillis();
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(body);
            if (delayLevel > 0) {
                rocketMsg.setDelayTimeLevel(delayLevel);
            }

            SendResult result = Objects.isNull(hashKey) ? producer.send(rocketMsg, messageQueueSelector, timeout)
                    : producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
            long costTime = System.currentTimeMillis() - now;
            if (log.isDebugEnabled()) {
                log.debug("send message cost: {} ms, msgId:{}", costTime, result.getMsgId());
            }
            log.debug("send result: {}", SendResult.encoderSendResultToJson(result));
            if (result.getSendStatus() != SendStatus.SEND_OK) {
                throw MqFailSendException.build(result);
            }
            return result;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            log.error("syncSend failed. destination:{}, message:{} ", body.getTopic(), body);
            throw new MessagingException(ie.getMessage(), ie);
        } catch (Exception e) {
            log.error("syncSend failed. destination:{}, message:{} ", body.getTopic(), body);
            throw new MessagingException(e.getMessage(), e);
        }
    }

    private org.apache.rocketmq.common.message.Message createRocketMqMessage(BaseMsgBody body) {
        Message<BaseMsgBody> message = MessageBuilder.withPayload(body).build();
        Message<?> msg = this.doConvert(message.getPayload(), message.getHeaders());
        return RocketMQUtil.convertToRocketMessage(template.getMessageConverter(), "UTF-8", body.getTopic(), msg);
    }

    protected Message<Object> doConvert(Object payload, Map<String, Object> headers) {
        MessageHeaders messageHeaders = null;
        Object conversionHint = (headers != null ?
                headers.get(AbstractMessageSendingTemplate.CONVERSION_HINT_HEADER) : null);

        if (headers != null) {
            if (headers instanceof MessageHeaders msgHeaders) {
                messageHeaders = msgHeaders;
            } else {
                messageHeaders = new MessageHeaders(headers);
            }
        }

        MessageConverter converter = template.getMessageConverter();
        Message<?> message = (converter instanceof SmartMessageConverter smartMessageConverter ?
                smartMessageConverter.toMessage(payload, messageHeaders, conversionHint) :
                converter.toMessage(payload, messageHeaders));
        if (message == null) {
            String payloadType = payload.getClass().getName();
            Object contentType = (messageHeaders != null ? messageHeaders.get(MessageHeaders.CONTENT_TYPE) : null);
            throw new MessageConversionException(StrUtil.format("""
                    Unable to convert payload with type='{}', contentType='{}', converter=[{}]
                    """, payloadType, contentType, converter));
        }
        @SuppressWarnings("unchecked")
        MessageBuilder<Object> builder = (MessageBuilder<Object>) MessageBuilder.fromMessage(message);
        builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
        return builder.build();
    }

}


import org.apache.rocketmq.client.producer.SendResult;


public class MqFailSendException extends RuntimeException {

    private MqFailSendException(SendResult result) {
        super(SendResult.encoderSendResultToJson(result));
    }


    public static MqFailSendException build(SendResult result) {
        return new MqFailSendException(result);
    }
}

队列枚举

public enum TopicEnum {

    USER_VIEW_GOODS_TAG_TOPIC("USER_VIEW_GOODS_TAG_TOPIC", "user_view_goods_tag_group",
            UserViewGoodsTagMsgBody::new);

    public final String topic;

    public final String group;

    final Supplier<? extends BaseMsgBody> supplier;

    TopicEnum(String topic, String group, Supplier<? extends BaseMsgBody> supplier) {
        this.topic = topic;
        this.group = group;
        this.supplier = supplier;
    }

    @SuppressWarnings("unchecked")
    public <T extends BaseMsgBody> T newInstance() {
        var body = (T) supplier.get();
        body.setTopic(topic);
        return body;
    }
}

发送消息

UserViewGoodsTagMsgBody body = TopicEnum.USER_VIEW_GOODS_TAG_TOPIC.newInstance();
body.setUserId(userId);
body.setGoodsId(goodsId);
rocketProducer.syncSendDelay(body, 1);

自定义Consumer

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMessageListener {

    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";

    /**
     * Topic name & consumerGroup
     * <p>
     * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
     * load balance. It's required and needs to be globally unique.
     * <p>
     * <p>
     * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
     */
    TopicEnum topicGroup();

    /**
     * Control how to selector message.
     *
     * @see SelectorType
     */
    SelectorType selectorType() default SelectorType.TAG;

    /**
     * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
     */
    String selector default "*";

    /**
     * Control consume mode, you can choice receive message concurrently or orderly.
     */
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;

    /**
     * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
     */
    MessageModel messageModel() default MessageModel.CLUSTERING;

    /**
     * Max consumer thread number.
     *
     * @see <a href="https://github.com/apache/rocketmq-spring/issues/429">issues#429</a>
     * @deprecated This property is not work well, because the consumer thread pool executor use
     * {@link LinkedBlockingQueue} with default capacity bound (Integer.MAX_VALUE), use
     * {@link RocketMQMessageListener#consumeThreadNumber} .
     */
    @Deprecated
    int consumeThreadMax() default 64;

    /**
     * consumer thread number.
     */
    int consumeThreadNumber() default 20;

    /**
     * Max re-consume times.
     * <p>
     * In concurrently mode, -1 means 16;
     * In orderly mode, -1 means Integer.MAX_VALUE.
     */
    int maxReconsumeTimes() default -1;

    /**
     * Maximum amount of time in minutes a message may block the consuming thread.
     */
    long consumeTimeout() default 15L;

    /**
     * Timeout for sending reply messages.
     */
    int replyTimeout() default 3000;

    /**
     * The property of "access-key".
     */
    String accessKey() default ACCESS_KEY_PLACEHOLDER;

    /**
     * The property of "secret-key".
     */
    String secretKey() default SECRET_KEY_PLACEHOLDER;

    /**
     * Switch flag instance for message trace.
     */
    boolean enableMsgTrace() default false;

    /**
     * The name value of message trace topic.If you don't config,you can use the default trace topic name.
     */
    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;

    /**
     * The property of "name-server".
     */
    String nameServer() default NAME_SERVER_PLACEHOLDER;

    /**
     * The property of "access-channel".
     */
    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;

    /**
     * The property of "tlsEnable" default false.
     */
    String tlsEnable() default "false";

    /**
     * The namespace of consumer.
     */
    String namespace() default "";

    /**
     * Message consume retry strategy in concurrently mode.
     * <p>
     * -1,no retry,put into DLQ directly
     * 0,broker control retry frequency
     * >0,client control retry frequency
     */
    int delayLevelWhenNextConsume() default 0;

    /**
     * The interval of suspending the pull in orderly mode, in milliseconds.
     * <p>
     * The minimum value is 10 and the maximum is 30000.
     */
    int suspendCurrentQueueTimeMillis() default 1000;

    /**
     * Maximum time to await message consuming when shutdown consumer, in milliseconds.
     * The minimum value is 0
     */
    int awaitTerminationMillisWhenShutdown() default 1000;
}

public interface RocketListener<S, T, C> {

    /**
     * onMessage
     *
     * @param message message
     * @param ctx     context
     * @return S
     */
    S onMessage(T message, C ctx);
}
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

public interface RocketConcurrentlyListener<T>
        extends RocketListener<ConsumeConcurrentlyStatus, T, ConsumeConcurrentlyContext> {
}
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;


public interface RocketOrderlyListener<T> extends RocketListener<ConsumeOrderlyStatus, T, ConsumeOrderlyContext> {
}

import lombok.Data;

@Data
public class ListenerReply<S, R> {
    private S status;

    private R reply;

    public ListenerReply() {
    }


    public ListenerReply(S status, R reply) {
        this.reply = reply;
        this.status = status;
    }

    public static <S, R> ListenerReply<S, R> build(S status, R reply) {
        return new ListenerReply<>(status, reply);
    }
}

public interface RocketReplyListener<T, C, S, R> {
    /**
     * onMessage
     *
     * @param message data received by the listener
     * @param ctx     context
     * @return data replying to producer
     */
    ListenerReply<S, R> onMessage(T message, C ctx);
}


import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;


public interface RocketConcurrentlyReplyListener<T, R>
        extends RocketReplyListener<T, ConsumeConcurrentlyContext, ConsumeConcurrentlyStatus, R> {

}

import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;


public interface RocketOrderlyReplyListener<T, R>
        extends RocketReplyListener<T, ConsumeOrderlyContext, ConsumeOrderlyStatus, R> {

}

主要配置类

import com.content.system.service.common.mq.ListenerReply;
import com.content.system.service.common.mq.listener.RocketConcurrentlyListener;
import com.content.system.service.common.mq.listener.RocketConcurrentlyReplyListener;
import com.content.system.service.common.mq.listener.RocketListener;
import com.content.system.service.common.mq.listener.RocketOrderlyListener;
import com.content.system.service.common.mq.listener.RocketOrderlyReplyListener;
import com.content.system.service.common.mq.listener.RocketReplyListener;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.apache.rocketmq.spring.support.RocketMQListenerContainer;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;

import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Objects;

/**
 * @author rocket-mq
 */
@SuppressWarnings("WeakerAccess")
public class RocketListenerContainer implements InitializingBean,
        RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
    private final static Logger log = LoggerFactory.getLogger(RocketListenerContainer.class);

    private ApplicationContext applicationContext;

    /**
     * The name of the DefaultRocketMQListenerContainer instance
     */
    private String name;

    /**
     * Suspending pulling time in orderly mode.
     * <p>
     * The minimum value is 10 and the maximum is 30000.
     */
    private long suspendCurrentQueueTimeMillis = 1000;

    /**
     * Message consume retry strategy in concurrently mode.
     * <p>
     * -1,no retry,put into DLQ directly
     * 0,broker control retry frequency
     * >0,client control retry frequency
     */
    private int delayLevelWhenNextConsume = 0;

    private String nameServer;

    private AccessChannel accessChannel = AccessChannel.LOCAL;

    private String consumerGroup;

    private String topic;

    private int consumeThreadMax = 64;

    private int consumeThreadNumber = 20;

    private String charset = "UTF-8";

    private MessageConverter messageConverter;

    private RocketListener rocketListener;

    private RocketReplyListener rocketReplyListener;

    private RocketMessageListener rocketMessageListener;

    private DefaultMQPushConsumer consumer;

    private Type messageType;

    private MethodParameter methodParameter;

    private boolean running;

    // The following properties came from @RocketMQMessageListener.
    private ConsumeMode consumeMode;
    private SelectorType selectorType;
    private String selectorExpression;
    private MessageModel messageModel;
    private long consumeTimeout;
    private int maxReconsumeTimes;
    private int replyTimeout;
    private String tlsEnable;
    private String namespace;
    private long awaitTerminationMillisWhenShutdown;

    public long getSuspendCurrentQueueTimeMillis() {
        return suspendCurrentQueueTimeMillis;
    }

    public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
        this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
    }

    public int getDelayLevelWhenNextConsume() {
        return delayLevelWhenNextConsume;
    }

    public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
        this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
    }

    public String getNameServer() {
        return nameServer;
    }

    public void setNameServer(String nameServer) {
        this.nameServer = nameServer;
    }

    public AccessChannel getAccessChannel() {
        return accessChannel;
    }

    public void setAccessChannel(AccessChannel accessChannel) {
        this.accessChannel = accessChannel;
    }

    public String getConsumerGroup() {
        return consumerGroup;
    }

    public void setConsumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public int getConsumeThreadMax() {
        return consumeThreadMax;
    }

    public int getConsumeThreadNumber() {
        return consumeThreadNumber;
    }

    public String getCharset() {
        return charset;
    }

    public void setCharset(String charset) {
        this.charset = charset;
    }

    public MessageConverter getMessageConverter() {
        return messageConverter;
    }

    public RocketListenerContainer setMessageConverter(MessageConverter messageConverter) {
        this.messageConverter = messageConverter;
        return this;
    }

    public RocketListener getRocketListener() {
        return rocketListener;
    }

    public void setRocketListener(RocketListener rocketListener) {
        this.rocketListener = rocketListener;
    }

    public RocketReplyListener getRocketReplyListener() {
        return rocketReplyListener;
    }

    public void setRocketReplyListener(RocketReplyListener rocketReplyListener) {
        this.rocketReplyListener = rocketReplyListener;
    }

    public RocketMessageListener getRocketMessageListener() {
        return rocketMessageListener;
    }

    public void setRocketMessageListener(RocketMessageListener anno) {
        this.rocketMessageListener = anno;

        this.consumeMode = anno.consumeMode();
        this.consumeThreadMax = anno.consumeThreadNumber();
        this.consumeThreadNumber = anno.consumeThreadNumber();
        this.messageModel = anno.messageModel();
        this.selectorType = anno.selectorType();
        this.selectorExpression = anno.selector;
        this.consumeTimeout = anno.consumeTimeout();
        this.maxReconsumeTimes = anno.maxReconsumeTimes();
        this.replyTimeout = anno.replyTimeout();
        this.tlsEnable = anno.tlsEnable();
        this.namespace = anno.namespace();
        this.delayLevelWhenNextConsume = anno.delayLevelWhenNextConsume();
        this.suspendCurrentQueueTimeMillis = anno.suspendCurrentQueueTimeMillis();
        this.awaitTerminationMillisWhenShutdown = Math.max(0, anno.awaitTerminationMillisWhenShutdown());
    }

    public ConsumeMode getConsumeMode() {
        return consumeMode;
    }

    public SelectorType getSelectorType() {
        return selectorType;
    }

    public void setSelector {
        this.selectorExpression = selectorExpression;
    }

    public String getSelector {
        return selectorExpression;
    }

    public MessageModel getMessageModel() {
        return messageModel;
    }

    public String getTlsEnable() {
        return tlsEnable;
    }

    public void setTlsEnable(String tlsEnable) {
        this.tlsEnable = tlsEnable;
    }

    public String getNamespace() {
        return namespace;
    }

    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }

    public DefaultMQPushConsumer getConsumer() {
        return consumer;
    }

    public void setConsumer(DefaultMQPushConsumer consumer) {
        this.consumer = consumer;
    }

    public long getAwaitTerminationMillisWhenShutdown() {
        return awaitTerminationMillisWhenShutdown;
    }

    public RocketListenerContainer setAwaitTerminationMillisWhenShutdown(long awaitTerminationMillisWhenShutdown) {
        this.awaitTerminationMillisWhenShutdown = awaitTerminationMillisWhenShutdown;
        return this;
    }

    @Override
    public void destroy() {
        this.setRunning(false);
        if (Objects.nonNull(consumer)) {
            consumer.shutdown();
        }
        log.info("container destroyed, {}", this.toString());
    }

    @Override
    public boolean isAutoStartup() {
        return true;
    }

    @Override
    public void stop(Runnable callback) {
        stop();
        callback.run();
    }

    @Override
    public void start() {
        if (this.isRunning()) {
            throw new IllegalStateException("container already running. " + this.toString());
        }

        try {
            consumer.start();
        } catch (MQClientException e) {
            throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
        }
        this.setRunning(true);

        log.info("running container: {}", this.toString());
    }

    @Override
    public void stop() {
        if (this.isRunning()) {
            if (Objects.nonNull(consumer)) {
                consumer.shutdown();
            }
            setRunning(false);
        }
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    private void setRunning(boolean running) {
        this.running = running;
    }

    @Override
    public int getPhase() {
        // Returning Integer.MAX_VALUE only suggests that
        // we will be the first bean to shutdown and last bean to start
        return Integer.MAX_VALUE;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        initRocketMQPushConsumer();

        this.messageType = getMessageType();
        this.methodParameter = getMethodParameter();
        log.debug("RocketMQ messageType: {}", messageType);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Override
    public String toString() {
        return "DefaultRocketListenerContainer{" +
                "consumerGroup='" + consumerGroup + '\'' +
                ", namespace='" + namespace + '\'' +
                ", nameServer='" + nameServer + '\'' +
                ", topic='" + topic + '\'' +
                ", consumeMode=" + consumeMode +
                ", selectorType=" + selectorType +
                ", selectorExpression='" + selectorExpression + '\'' +
                ", messageModel=" + messageModel + '\'' +
                ", tlsEnable=" + tlsEnable +
                '}';
    }

    public void setName(String name) {
        this.name = name;
    }

    public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgList) {
                log.debug("received msg: {}", messageExt);
                log.info("received msg id: {}", messageExt.getMsgId());
                try {
                    long now = System.currentTimeMillis();
                    ConsumeConcurrentlyStatus status = handleConcurrentlyMessage(messageExt, context);
                    if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS != status) {
                        log.warn("return status: {}, consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}",
                                status, messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes());
                        return status;
                    }
                    long costTime = System.currentTimeMillis() - now;
                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    log.error("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(),
                            messageExt.getTopic(), messageExt.getReconsumeTimes(), ie);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                } catch (Exception e) {
                    log.error("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(),
                            messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }

    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {

        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> extList, ConsumeOrderlyContext context) {
            for (MessageExt messageExt : extList) {
                log.debug("received msg: {}", messageExt);
                log.info("received msg id: {}", messageExt.getMsgId());
                try {
                    long now = System.currentTimeMillis();
                    ConsumeOrderlyStatus status = handleOrderlyMessage(messageExt, context);
                    if (ConsumeOrderlyStatus.SUCCESS != status) {
                        log.warn("return status: {}, consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}",
                                status, messageExt.getMsgId(),
                                messageExt.getTopic(), messageExt.getReconsumeTimes());
                        return status;
                    }
                    long costTime = System.currentTimeMillis() - now;
                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    log.error("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(),
                            messageExt.getTopic(), messageExt.getReconsumeTimes(), ie);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                } catch (Exception e) {
                    log.error("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(),
                            messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }

            return ConsumeOrderlyStatus.SUCCESS;
        }
    }

    @SuppressWarnings("unchecked")
    private ConsumeConcurrentlyStatus handleConcurrentlyMessage(MessageExt messageExt, ConsumeConcurrentlyContext context)
            throws MQClientException, RemotingException, InterruptedException {
        ConsumeConcurrentlyStatus status = null;
        if (rocketListener != null) {
            status = (ConsumeConcurrentlyStatus) rocketListener.onMessage(doConvertMessage(messageExt), context);
        } else if (rocketReplyListener != null) {
            ListenerReply<ConsumeConcurrentlyStatus, ?> replyContent =
                    rocketReplyListener.onMessage(doConvertMessage(messageExt), context);
            status = replyContent.getStatus();
            sendReply(replyContent.getReply(), messageExt);
        }
        return status;
    }

    @SuppressWarnings("unchecked")
    private ConsumeOrderlyStatus handleOrderlyMessage(MessageExt messageExt, ConsumeOrderlyContext context)
            throws MQClientException, RemotingException, InterruptedException {
        ConsumeOrderlyStatus status = null;
        if (rocketListener != null) {
            status = (ConsumeOrderlyStatus)
                    rocketListener.onMessage(doConvertMessage(messageExt), context);
        } else if (rocketReplyListener != null) {
            ListenerReply<ConsumeOrderlyStatus, ?> replyContent =
                    rocketReplyListener.onMessage(doConvertMessage(messageExt), context);
            status = replyContent.getStatus();
            sendReply(replyContent.getReply(), messageExt);
        }
        return status;
    }


    private void sendReply(Object reply, MessageExt messageExt) throws MQClientException, RemotingException,
            InterruptedException {
        Message<?> message = MessageBuilder.withPayload(reply).build();

        org.apache.rocketmq.common.message.Message replyMessage =
                MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
        DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
        producer.setSendMsgTimeout(replyTimeout);
        producer.send(replyMessage, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                    log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
                } else {
                    log.debug("Consumer replies message success.");
                }
            }

            @Override
            public void onException(Throwable e) {
                log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
            }
        });
    }

    private byte[] convertToBytes(Message<?> message) {
        Message<?> messageWithSerializedPayload = doConvert(message.getPayload(), message.getHeaders());
        Object payloadObj = messageWithSerializedPayload.getPayload();
        byte[] payloads;
        try {
            if (null == payloadObj) {
                throw new RuntimeException("the message cannot be empty");
            }
            if (payloadObj instanceof String) {
                payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
            } else if (payloadObj instanceof byte[]) {
                payloads = (byte[]) messageWithSerializedPayload.getPayload();
            } else {
                String jsonObj = (String) this.messageConverter.fromMessage(messageWithSerializedPayload, payloadObj.getClass());
                if (null == jsonObj) {
                    throw new RuntimeException(String.format(
                            "empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
                            this.messageConverter.getClass(), payloadObj.getClass(), payloadObj));
                }
                payloads = jsonObj.getBytes(Charset.forName(charset));
            }
        } catch (Exception e) {
            throw new RuntimeException("convert to bytes failed.", e);
        }
        return payloads;
    }

    private Message<?> doConvert(Object payload, MessageHeaders headers) {
        Message<?> message = this.messageConverter instanceof SmartMessageConverter ?
                ((SmartMessageConverter) this.messageConverter).toMessage(payload, headers, null) :
                this.messageConverter.toMessage(payload, headers);
        if (message == null) {
            String payloadType = payload.getClass().getName();
            Object contentType = headers != null ? headers.get(MessageHeaders.CONTENT_TYPE) : null;
            throw new MessageConversionException("Unable to convert payload with type='" + payloadType +
                    "', contentType='" + contentType + "', converter=[" + this.messageConverter + "]");
        }
        MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
        builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
        return builder.build();
    }

    @SuppressWarnings("unchecked")
    private Object doConvertMessage(MessageExt messageExt) {
        if (Objects.equals(messageType, MessageExt.class) || Objects.equals(messageType, org.apache.rocketmq.common.message.Message.class)) {
            return messageExt;
        } else {
            String str = new String(messageExt.getBody(), Charset.forName(charset));
            if (Objects.equals(messageType, String.class)) {
                return str;
            } else {
                // If msgType not string, use objectMapper change it.
                try {
                    if (messageType instanceof Class) {
                        //if the messageType has not Generic Parameter
                        return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) messageType);
                    } else {
                        //if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
                        //we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
                        return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) messageType).getRawType(), methodParameter);
                    }
                } catch (Exception e) {
                    log.info("convert failed. str:{}, msgType:{}", str, messageType);
                    throw new RuntimeException("cannot convert message to " + messageType, e);
                }
            }
        }
    }

    private MethodParameter getMethodParameter() {
        Class<?> targetClass;
        Class<?> secondParamClass;
        if (rocketListener != null) {
            targetClass = AopProxyUtils.ultimateTargetClass(rocketListener);
            if (RocketConcurrentlyListener.class.isAssignableFrom(targetClass)) {
                secondParamClass = ConsumeConcurrentlyContext.class;
            } else if (RocketOrderlyListener.class.isAssignableFrom(targetClass)) {
                secondParamClass = ConsumeOrderlyContext.class;
            } else {
                throw new RuntimeException("other listener is not supported");
            }
        } else {
            targetClass = AopProxyUtils.ultimateTargetClass(rocketReplyListener);
            if (RocketConcurrentlyReplyListener.class.isAssignableFrom(targetClass)) {
                secondParamClass = ConsumeConcurrentlyContext.class;
            } else if (RocketOrderlyReplyListener.class.isAssignableFrom(targetClass)) {
                secondParamClass = ConsumeOrderlyContext.class;
            } else {
                throw new RuntimeException("other reply listener is not supported");
            }
        }

        Type messageType = this.getMessageType();
        Class clazz = null;
        if (messageType instanceof ParameterizedType && messageConverter instanceof SmartMessageConverter) {
            clazz = (Class) ((ParameterizedType) messageType).getRawType();
        } else if (messageType instanceof Class) {
            clazz = (Class) messageType;
        } else {
            throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported");
        }

        try {
            final Method method = targetClass.getMethod("onMessage", clazz, secondParamClass);
            return new MethodParameter(method, 0);
        } catch (NoSuchMethodException e) {
            log.error("parameterType:{} of onMessage method is not supported", messageType, e);
            throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported");
        }
    }

    private Type getMessageType() {
        Class<?> targetClass;
        if (rocketListener != null) {
            targetClass = AopProxyUtils.ultimateTargetClass(rocketListener);
        } else {
            targetClass = AopProxyUtils.ultimateTargetClass(rocketReplyListener);
        }
        Type matchedGenericInterface = null;
        while (Objects.nonNull(targetClass)) {
            Type[] interfaces = targetClass.getGenericInterfaces();
            if (Objects.nonNull(interfaces)) {
                for (Type type : interfaces) {
                    Class<?> rawType = (Class<?>) ((ParameterizedType) type).getRawType();
                    if (type instanceof ParameterizedType &&
                            (RocketListener.class.isAssignableFrom(rawType) ||
                                    RocketReplyListener.class.isAssignableFrom(rawType))) {
                        matchedGenericInterface = type;
                        break;
                    }
                }
            }
            targetClass = targetClass.getSuperclass();
        }
        if (Objects.isNull(matchedGenericInterface)) {
            return Object.class;
        }

        Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
        if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
            return actualTypeArguments[0];
        }
        return Object.class;
    }

    private void initRocketMQPushConsumer() throws MQClientException {
        if (rocketListener == null && rocketReplyListener == null) {
            throw new IllegalArgumentException("Property 'rocketListener' or 'rocketReplyListener' is required");
        }
        Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
        Assert.notNull(nameServer, "Property 'nameServer' is required");
        Assert.notNull(topic, "Property 'topic' is required");

        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
                this.rocketMessageListener.accessKey(), this.rocketMessageListener.secretKey());
        boolean enableMsgTrace = rocketMessageListener.enableMsgTrace();
        if (Objects.nonNull(rpcHook)) {
            consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
                    enableMsgTrace, this.applicationContext.getEnvironment().
                    resolveRequiredPlaceholders(this.rocketMessageListener.customizedTraceTopic()));
            consumer.setVipChannelEnabled(false);
        } else {
            log.debug("Access-key or secret-key not configure in " + this + ".");
            consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
                    this.applicationContext.getEnvironment().
                            resolveRequiredPlaceholders(this.rocketMessageListener.customizedTraceTopic()));
        }
        consumer.setNamespace(namespace);
        consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));

        String customizedNameServer = this.applicationContext.getEnvironment()
                .resolveRequiredPlaceholders(this.rocketMessageListener.nameServer());
        if (customizedNameServer != null) {
            consumer.setNamesrvAddr(customizedNameServer);
        } else {
            consumer.setNamesrvAddr(nameServer);
        }
        if (accessChannel != null) {
            consumer.setAccessChannel(accessChannel);
        }
        //set the consumer core thread number and maximum thread number has the same value
        consumer.setConsumeThreadMax(consumeThreadNumber);
        consumer.setConsumeThreadMin(consumeThreadNumber);
        consumer.setConsumeTimeout(consumeTimeout);
        consumer.setMaxReconsumeTimes(maxReconsumeTimes);
        consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown);
        switch (messageModel) {
            case BROADCASTING:
                consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
                break;
            case CLUSTERING:
                consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
                break;
            default:
                throw new IllegalArgumentException("Property 'messageModel' was wrong.");
        }

        switch (selectorType) {
            case TAG:
                consumer.subscribe(topic, selectorExpression);
                break;
            case SQL92:
                consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
                break;
            default:
                throw new IllegalArgumentException("Property 'selectorType' was wrong.");
        }

        switch (consumeMode) {
            case ORDERLY:
                consumer.setMessageListener(new DefaultMessageListenerOrderly());
                break;
            case CONCURRENTLY:
                consumer.setMessageListener(new DefaultMessageListenerConcurrently());
                break;
            default:
                throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
        }

        //if String is not is equal "true" TLS mode will represent the as default value false
        consumer.setUseTLS(Boolean.parseBoolean(tlsEnable));

        if (rocketListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketListener).prepareStart(consumer);
        } else if (rocketReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketReplyListener).prepareStart(consumer);
        }

    }

}
import cn.hutool.core.util.StrUtil;
import com.content.system.service.common.mq.listener.RocketListener;
import com.content.system.service.common.mq.listener.RocketReplyListener;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.scope.ScopedProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.support.BeanDefinitionValidationException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.env.StandardEnvironment;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;


@Configuration
public class RocketListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
    private final static Logger log = LoggerFactory.getLogger(RocketListenerContainerConfiguration.class);

    private ConfigurableApplicationContext applicationContext;

    private final AtomicLong counter = new AtomicLong(0);

    private final StandardEnvironment environment;

    private final RocketMQProperties rocketProperties;

    private final RocketMQMessageConverter rocketMessageConverter;

    public RocketListenerContainerConfiguration(RocketMQMessageConverter rocketMessageConverter,
                                                StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
        this.rocketMessageConverter = rocketMessageConverter;
        this.environment = environment;
        this.rocketProperties = rocketMQProperties;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    @Override
    public void afterSingletonsInstantiated() {
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMessageListener.class)
                .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        beans.forEach(this::registerContainer);
    }

    @SuppressWarnings("unchecked")
    private void registerContainer(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

        RocketMessageListener annotation = clazz.getAnnotation(RocketMessageListener.class);

        String consumerGroup = this.environment.resolvePlaceholders(annotation.topicGroup().group);
        String topic = this.environment.resolvePlaceholders(annotation.topicGroup().topic);

        boolean listenerEnabled =
                (boolean) rocketProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                        .getOrDefault(topic, true);

        if (!listenerEnabled) {
            log.debug(
                    "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
                    consumerGroup, topic);
            return;
        }
        validate(annotation);

        String containerBeanName = String.format("%s_%s", RocketListenerContainer.class.getName(),
                counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;

        genericApplicationContext.registerBean(containerBeanName, RocketListenerContainer.class,
                () -> createRocketListenerContainer(containerBeanName, bean, annotation));
        RocketListenerContainer container = genericApplicationContext.getBean(containerBeanName,
                RocketListenerContainer.class);
        if (!container.isRunning()) {
            try {
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }

        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }

    private RocketListenerContainer createRocketListenerContainer(String name, Object bean,
                                                                  RocketMessageListener annotation) {
        RocketListenerContainer container = new RocketListenerContainer();

        container.setRocketMessageListener(annotation);

        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        nameServer = StrUtil.isEmpty(nameServer) ? rocketProperties.getNameServer() : nameServer;
        String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
        container.setNameServer(nameServer);
        if (!StrUtil.isEmpty(accessChannel)) {
            container.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }
        container.setTopic(environment.resolvePlaceholders(annotation.topicGroup().topic));
        String tags = environment.resolvePlaceholders(annotation.selector);
        if (!StrUtil.isEmpty(tags)) {
            container.setSelector;
        }
        container.setConsumerGroup(environment.resolvePlaceholders(annotation.topicGroup().group));
        container.setTlsEnable(environment.resolvePlaceholders(annotation.tlsEnable()));
        if (RocketListener.class.isAssignableFrom(bean.getClass())) {
            container.setRocketListener((RocketListener) bean);
        } else if (RocketReplyListener.class.isAssignableFrom(bean.getClass())) {
            container.setRocketReplyListener((RocketReplyListener) bean);
        }
        container.setMessageConverter(rocketMessageConverter.getMessageConverter());
        container.setName(name);

        return container;
    }

    private void validate(RocketMessageListener annotation) {
        if (annotation.consumeMode() == ConsumeMode.ORDERLY &&
                annotation.messageModel() == MessageModel.BROADCASTING) {
            throw new BeanDefinitionValidationException(
                    "Bad annotation definition in @RocketMessageListener, messageModel BROADCASTING does not support ORDERLY message!");
        }
    }
}
@Slf4j
@Component
@RocketMessageListener(topicGroup = TopicEnum.USER_VIEW_GOODS_TAG_TOPIC,
        consumeMode = ConsumeMode.CONCURRENTLY, consumeThreadNumber = 5, enableMsgTrace = true)
public class UserViewGoodsTagConsumer implements RocketConcurrentlyListener<UserViewGoodsTagMsgBody> {

    @Override
    public ConsumeConcurrentlyStatus onMessage(UserViewGoodsTagMsgBody message, ConsumeConcurrentlyContext ctx) {
        log.info("接收到普通消息:{}", message);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

相对于官方的rocketmq-starter, 自定义的Consumer不需要通过抛出异常进行重试, 可以自定义重试延时等级