RocketMQ 与 Spring 生态的融合


RocketMQ 作为阿里开源的一款优秀的消息中间件,已经成为很多互联网公司构建其分布式系统的重要组成部分。同时 RocketMQ 成为了 Apache 开源项目,意味着该项目越来越受到开源社区的关注,也意味着更多的人加入到对 RocketMQ 的使用和完善中,相信将其作为企业消息中间件的技术选型不失为一个明智的选择。对于大部分的 Java 系统而言, Spring 已经成为 Java 开发的既定标准,那么我们必然面临如何在 Spring 生态中集成 RocketMQ 客户端,好在官方已经为我们提供了这样的实现,有时候我们也需要在工作中”造轮子”,如何将我们造的”轮子”优雅地集成到 Spring 生态中,这是我们需要思考的问题,或许我们可以在其中得到一种启发。

RocketMQTemplate 模板客户端


Spring 与第三方框架适配的案例中,我时常可以看到 XXXTemplate 这样的封装,例如 RedisTemplateJdbcTemplate 等,这样的模板类有利于集成核心的API,为开发者提供了很好的便宜性,显然 RocketMQ 在集成 Spring 的过程中也参考了这样的设计。

@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@ConditionalOnClass({MQAdmin.class})
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server", matchIfMissing = true)
@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, RocketMQTransactionConfiguration.class})
@AutoConfigureAfter({MessageConverterConfiguration.class})
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})
public class RocketMQAutoConfiguration {
	// ...
	@Bean(destroyMethod = "destroy")
    @ConditionalOnBean(DefaultMQProducer.class)
    @ConditionalOnMissingBean(name = ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
    public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer,
        RocketMQMessageConverter rocketMQMessageConverter) {
        RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
        rocketMQTemplate.setProducer(mqProducer);
        rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
        return rocketMQTemplate;
    }
	// ...
}

RocketMQTemplate 主要由两部分组成,DefaultMQProducerMessageConverter,作为生产者的核心API,必然需要由消息生产者和消息转换器两者组成,其中 DefaultMQProducer 作为 Apache RocketMQ 所提供的默认生产者API,封装了大部分我们所需要的消息发送方式,包含同步发送,异步发送,单向消息发送,事务消息发送等常用操作。只不过 RocketMQMessage 结果有其设计上的特殊性,Spring 需要对其进行进一步的抽象,例如将 RocketMQ 中的 topictag 抽象为 destination,将 org.apache.rocketmq.common.message.Message 抽象为 org.springframework.messaging.MessageSpring 增加这样一层转换的目的主要是尽量不与某一消息中间件的实现强耦合,保持设计上的独立性。

@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@ConditionalOnClass({ MQAdmin.class, ObjectMapper.class })
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server")
@Import({ JacksonFallbackConfiguration.class, ListenerContainerConfiguration.class })
@AutoConfigureAfter(JacksonAutoConfiguration.class)
public class RocketMQAutoConfiguration {
	// ...
	@Bean
    @ConditionalOnMissingBean(DefaultMQProducer.class)
    @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"})
    public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
        RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
        String nameServer = rocketMQProperties.getNameServer();
        String groupName = producerConfig.getGroup();
        Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
        Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");

        DefaultMQProducer producer;
        String ak = rocketMQProperties.getProducer().getAccessKey();
        String sk = rocketMQProperties.getProducer().getSecretKey();
        if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
            producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
                rocketMQProperties.getProducer().isEnableMsgTrace(),
                rocketMQProperties.getProducer().getCustomizedTraceTopic());
            producer.setVipChannelEnabled(false);
        } else {
            producer = new DefaultMQProducer(groupName, rocketMQProperties.getProducer().isEnableMsgTrace(),
                rocketMQProperties.getProducer().getCustomizedTraceTopic());
        }

        producer.setNamesrvAddr(nameServer);
        producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
        producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
        producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
        producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
        producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
        producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());

        return producer;
    }
	// ...
}

Spring Boot 会自动帮助我们构建 DefaultMQProducer 生产者,但是我们可以需求定义一些初始化参数:

  • group 生产者组
  • sendMessageTimeout 生产者发送消息超时阈值,默认为 3s
  • compressMessageBodyThreshold 消息压缩阈值,默认为 4K
  • retryTimesWhenSendFailed 同步发送模式下失败重试次数,默认为 2
  • retryTimesWhenSendAsyncFailed 异步发送模式下失败重试次数,默认为 2,重试意味着在网络波动情况下可能发生重复消息发送,因此 RocketMQ 要求我们在消费者一方保证逻辑的幂等性
  • retryNextServer 在消息发送失败时是否尝试切换 broker 进行发送,默认为 false
  • maxMessageSize 消息体大小限制,默认为 4M

RocketMQListener 消息监听器


RocketMQ 的消息消费端支持推模式与拉模式,当然推拉模式的本质是相同的,实际上 RocketMQ 在底层只实现了拉模式,而推模式是拉模式的一种封装,主要通过在消费端开启一个线程循环拉取消息。一般来讲 PUSH 模式更为常用,因为在 PUSH 模式下消息会被自动拉取,同时消息负载均衡等机制已经在源码内实现,而 PULL 模式开发者手动指定 MessageQueue 进行消费,实现难度更大。另外从 Spring 官方所推崇的 Event Driven 事件驱动型架构风格来看,PUSH 模式也更加符合这一风格,所以在 Spring 层面上主要将 RocketMQ 中的 Consumer 抽象为 RocketMQListener ,这种监听器的设计显然是对 PUSH 模式的实现。

@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {

    @Override
    public void afterSingletonsInstantiated() {
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);

        if (Objects.nonNull(beans)) {
            beans.forEach(this::registerContainer);
        }
    }

    private void registerContainer(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

        if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
        }

        RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
        validate(annotation);

        String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
            counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;

        genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () -> createRocketMQListenerContainer(bean, annotation));
        DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
            DefaultRocketMQListenerContainer.class);
        if (!container.isRunning()) {
            try {
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }

        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }

    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(Object bean, RocketMQMessageListener annotation) {
        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();

        container.setNameServer(rocketMQProperties.getNameServer());
        container.setTopic(environment.resolvePlaceholders(annotation.topic()));
        container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
        container.setRocketMQMessageListener(annotation);
        container.setRocketMQListener((RocketMQListener) bean);
        container.setObjectMapper(objectMapper);

        return container;
    }

可以看到这里利用 SmartInitializingSingleton 这个 Spring 容器的生命周期回调来实现消息监听器的自动装配,这个回调触发在当所有单例 bean 被初始化完成后,一般来说如果我们需要对 bean 进行个性化的后置处理,可以利用这个回调或是 BeanPostProcessor。在这里通过 ListableBeanFactory#getBeansWithAnnotation 获取到所有被 RocketMQMessageListenerbean,构建 DefaultRocketMQListenerContainer 对象,并将其注册为 Spring Bean。每一个 DefaultRocketMQListenerContainer 都对应一个 RocketMQ 中的 ConsumerRocketMQ 支持不同的消费者配置,RocketMQMessageListener 注解可以帮助我们传递这些配置项:

  • consumerGroup 消费者组
  • topic 订阅主题
  • selectorType 消息过滤方式,默认根据 TAG 过滤
  • selectorExpression 消息过滤表达式,例如 tag1 || tag2 || tag3
  • consumeMode 消费模式,默认为并发模式(允许多个线程同时消费)
  • messageModel 消息通信模式,默认为集群模式(同个消息只发送给消费组中某一消费者)
  • consumeThreadMax 允许的最大消费线程数,默认为 64
public class DefaultRocketMQListenerContainer implements InitializingBean,
    RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
	// ...
	public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

        @SuppressWarnings("unchecked")
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgs) {
                log.debug("received msg: {}", messageExt);
                try {
                    long now = System.currentTimeMillis();
                    rocketMQListener.onMessage(doConvertMessage(messageExt));
                    long costTime = System.currentTimeMillis() - now;
                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (Exception e) {
                    log.warn("consume message failed. messageExt:{}", messageExt, e);
                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }

    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {

        @SuppressWarnings("unchecked")
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (MessageExt messageExt : msgs) {
                log.debug("received msg: {}", messageExt);
                try {
                    long now = System.currentTimeMillis();
                    rocketMQListener.onMessage(doConvertMessage(messageExt));
                    long costTime = System.currentTimeMillis() - now;
                    log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (Exception e) {
                    log.warn("consume message failed. messageExt:{}", messageExt, e);
                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }

            return ConsumeOrderlyStatus.SUCCESS;
        }
    }
	// ...
}

DefaultRocketMQListenerContainer 的内部可以看到我们所定义的 RocketMQListener 最终会根据消费类型被转换为 RocketMQ 中的 MessageListenerConcurrently(并发消费)或是 MessageListenerOrderly(顺序消费),RocketMQ 的顺序消息支持全局顺序或分区顺序,一般来讲出于性能的考虑不建议使用全局顺序,如果业务上不对消费顺序作要求,那么使用并发消费的模式可以最大程序上保证消息的吞吐量。当消费者在 PUSH 模式下接受到消息时,消费被委派给 RocketMQListener 处理,这时候我们的业务逻辑被触发。

RocketMQ 事务消息


对于 RocketMQ 中每一个 TransactionMQProducer 事务消息生产者而言,都需要一个 TransactionListener 事务监听器与之对应,在我们调用生产者 API 发送事务消息时,源码中会校验该生产者是否持有监听器,不存在则直接抛出异常。RocketMQ 的事务消息为了保证消息的可靠性发送,参考了分布式事务中的 2PC 解决方案。生产者首先发送消息,相当于第一阶段的提交,RocketMQ 把这一类消息成为 HalfMessage 半消息,HalfMessage 对于消费者是不可见的,RocketMQ 会把半消息暂存到内部的某一 topic 中,根据生产者本地事务的执行结果决定提交或回滚半消息。

RocketMQ 事务消息

在生产者本地事务执行完成后,生产者需要向 RocketMQ Server 反馈一个执行结果,Commit 或是 Rollback,由于之前已经成功发送了 Half Message,这时的发送也大概率是成功了,但依然不排除因为网络波动等因素导致执行结果提交失败。因此为了保证事务消息发送的可靠性,RocketMQ 增加了一种状态回查机制,在服务端定时扫描状态未知的 Half Message,向 Producer 发起回查,作为一种补偿机制。

public interface RocketMQLocalTransactionListener {
    RocketMQLocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    RocketMQLocalTransactionState checkLocalTransaction(final Message msg);
}

Spring Boot 开发者通过实现 RocketMQLocalTransactionListener 接口编写本地事务及回查逻辑。

public class RocketMQTransactionAnnotationProcessor
    implements BeanPostProcessor, Ordered, ApplicationContextAware {
	// ...
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (!this.nonProcessedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            RocketMQTransactionListener listener = AnnotationUtils.findAnnotation(targetClass, RocketMQTransactionListener.class);
            this.nonProcessedClasses.add(bean.getClass());
            if (listener == null) { // for quick search
                log.trace("No @RocketMQTransactionListener annotations found on bean type: {}", bean.getClass());
            } else {
                try {
                    processTransactionListenerAnnotation(listener, bean);
                } catch (MQClientException e) {
                    log.error("Failed to process annotation " + listener, e);
                    throw new BeanCreationException("Failed to process annotation " + listener, e);
                }
            }
        }

        return bean;
    }

    private void processTransactionListenerAnnotation(RocketMQTransactionListener listener, Object bean)
        throws MQClientException {
        if (transactionHandlerRegistry == null) {
            throw new MQClientException("Bad usage of @RocketMQTransactionListener, " +
                "the class must work with RocketMQTemplate", null);
        }
        if (!RocketMQLocalTransactionListener.class.isAssignableFrom(bean.getClass())) {
            throw new MQClientException("Bad usage of @RocketMQTransactionListener, " +
                "the class must implement interface RocketMQLocalTransactionListener",
                null);
        }
        TransactionHandler transactionHandler = new TransactionHandler();
        transactionHandler.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory());
        transactionHandler.setName(listener.txProducerGroup());
        transactionHandler.setBeanName(bean.getClass().getName());
        transactionHandler.setListener((RocketMQLocalTransactionListener) bean);
        transactionHandler.setCheckExecutor(listener.corePoolSize(), listener.maximumPoolSize(),
                listener.keepAliveTime(), listener.blockingQueueSize());

        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
            listener.accessKey(), listener.secretKey());

        if (Objects.nonNull(rpcHook)) {
            transactionHandler.setRpcHook(rpcHook);
        } else {
            log.debug("Access-key or secret-key not configure in " + listener + ".");
        }

        transactionHandlerRegistry.registerTransactionHandler(transactionHandler);
    }
	// ...
}

RocketMQLocalTransactionListener 是如何别转换为 RocketMQ 中的 TransactionListener 呢?这里就利用 Spring 容器中关键性的 Bean 生命周期回调 BeanPostProcessor,上下文中每一个被容器初始化的 bean,都会回调 postProcessAfterInitialization 方法,此方法在 bean 的后置处理中为开发者提供了拓展机会,如果我们需要扩展增强 Spring 的相关机制,BeanPostProcessor 是我们重点要考虑的一种实现方式。


文章作者: Ethan Zhang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Ethan Zhang !
评论
 上一篇
数据批量处理优化之 Spring Batch 数据批量处理优化之 Spring Batch
在企业级生产场景中数据的批量处理往往是一个具有广泛性的需求,例如用户数据的导入导出,内部数据的迁移,业务数据的统计计算等,这样的数据往往都是较大体量,需要从一种形式转换为另一种形式,或是从一个数据源迁移到另一个目的地。显然这一类问题是具有
2020-10-28 Ethan Zhang
下一篇 
Spring Cloud Circuit Breaker 服务容错 Spring Cloud Circuit Breaker 服务容错
参考 Spring Cloud 官方文档的描述,Spring Cloud Circuit Breaker 提供了不同断路器实现的抽象,它为开发者选用不同的断路器实现提供了标准化的 API,在实际的项目应用场景中,我们可以基于 Spring
2020-09-26 Ethan Zhang