事件驱动之 Spring Cloud Bus


在分布式系统中我们往往需要大量的状态广播,以通知系统中各个节点,触发相关行为,例如用于保证分布式数据一致性或进行相关补偿行为。这时候事件驱动型的编程模型有利于我们处理相关问题,在一个节点发布一个事件,另外N个节点监听该事件作出相关反应,这种类似发布-订阅的模型我们往往会考虑利用消息中间件来实现。同样 Spring Cloud Bus 也需要依赖于MQ,我们知道 Spring Cloud Stream 在框架层面为我们抽象了一系列操作消息中间件(目前官方支持 Rabbit MQ,kafka)的API,而 Spring Cloud Bus 基于这些API实现的事件驱动模型几乎可以使我们无感知地在分布式节点之间实现事件监听机制。

服务内部的事件监听机制


Spring提供了 ApplicationEvent 事件抽象和 ApplicationListener 事件监听器抽象为我们在服务内实现事件监听提供了支持。

package priv.just1984.deep.in.java.demo.spring.boot;

import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;

/**
 * @description:
 * @author: yixiezi1994@gmail.com
 * @date: 2019-09-25 16:32
 */
@Slf4j
public class ApplicationEventDemo {

    public static void main(String[] args) throws InterruptedException {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(ApplicationListenerConfiguration.class);
        context.refresh();
        context.publishEvent(new MyApplicationEvent("hello world"));
        Thread.sleep(3000);
        context.close();
    }

    @Configuration
    public static class ApplicationListenerConfiguration {

        @EventListener(MyApplicationEvent.class)
        public void onMyApplicationEvent(MyApplicationEvent event) {
            log.info(JSONObject.toJSONString(event.getData()));
        }

    }

    @Getter
    @Setter
    private static class MyApplicationEvent extends ApplicationEvent {

        private Object data;

        MyApplicationEvent(Object data) {
            super(data);
            this.data = data;
        }

    }

}

如上文的 demo 所示,ApplicationContext 本身继承了 ApplicationEventPublisher,也就意味着 Spring 上下文本身就是一个事件发布器,调用其 publishEvent 方法即可发布事件,在这里我定义了一个 MyApplicationEvent ,在事件发布后被 @EventListener(MyApplicationEvent.class) 注解的方法所监听,执行相关业务逻辑,这就是一个简单的服务内事件驱动示例。

Spring Cloud Bus 实现服务间事件监听


根据上文的示例我们知道在服务内通过 Spring 事件监听机制来解耦我们的业务是十分简洁的,那么我们能否实现在 A 服务 publish 一个 Event,使 B 服务的 Listener 监听到该事件呢。其实 Spring Coud Bus 正式为我们提供了这样的支持。目前 Spring 官方提供两种依赖 spring-cloud-starter-bus-amqpspring-cloud-starter-bus-kafka ,分别支持两种消息中间件,本文中以 kafka 为例。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>

其实 Spring Cloud Bus 基于 actuator 已经为我们提供了两种事件传播,/actuator/bus-env 修改上下文中的配置信息,/actuator/bus-refresh 用于刷新配置。为了达到远程传播事件的目的,Spring 中定义了 RemoteApplicationEvent 作为远程事件的抽象,该类继承了原有的 ApplicationEvent,以 bus-env 事件为例,EnvironmentChangeRemoteApplicationEvent 继承了 RemoteApplicationEvent ,当我们访问 /actuator/bus-env 地址时,服务发布了该事件,根据 actuator 的机制我们可以猜想这是在某个 Endpoint 中进行的。

@Endpoint(id = "bus-env")
public class EnvironmentBusEndpoint extends AbstractBusEndpoint {

    public EnvironmentBusEndpoint(ApplicationEventPublisher context, String id) {
    	super(context, id);
    }
    
    @WriteOperation
    public void busEnvWithDestination(String name, String value,
    		@Selector String destination) {
    	Map<String, String> params = Collections.singletonMap(name, value);
    	publish(new EnvironmentChangeRemoteApplicationEvent(this, getInstanceId(),
    			destination, params));
    }
    
    @WriteOperation
    public void busEnv(String name, String value) {
    	Map<String, String> params = Collections.singletonMap(name, value);
    	publish(new EnvironmentChangeRemoteApplicationEvent(this, getInstanceId(), null,
    			params));
    }

}

可以看到 EnvironmentBusEndpoint 中发布了该事件,但如此只是在服务内部发布了一个事件,我们并没有看到与传统的 ApplicationEvent 有何区别,那么该事件是如何在微服务节点间传输的呢?这时候就需要 Bus 消息总线的介入。当我们引入 spring-cloud-starter-bus-kafka 依赖后,BusAutoConfiguration 配置类会被自动装载。

@EventListener(classes = RemoteApplicationEvent.class)
public void acceptLocal(RemoteApplicationEvent event) {
	if (this.serviceMatcher.isFromSelf(event)
			&& !(event instanceof AckRemoteApplicationEvent)) {
		this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
	}
}

可以看到其中声明了一个事件监听器,用于监听所有的 RemoteApplicationEvent,也就是说在前文中我们所发布的 EnvironmentChangeRemoteApplicationEvent 也会被此处接收,随后该事件被 Spring Cloud Stream 的消息通道广播到整个系统中(基于消息中间件,此处不详谈),那么另一端的服务如何接收到该事件呢?

@StreamListener(SpringCloudBusClient.INPUT)
public void acceptRemote(RemoteApplicationEvent event) {
	if (event instanceof AckRemoteApplicationEvent) {
		if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
				&& this.applicationEventPublisher != null) {
			this.applicationEventPublisher.publishEvent(event);
		}
		// If it's an ACK we are finished processing at this point
		return;
	}
	if (this.serviceMatcher.isForSelf(event)
			&& this.applicationEventPublisher != null) {
		if (!this.serviceMatcher.isFromSelf(event)) {
			this.applicationEventPublisher.publishEvent(event);
		}
		if (this.bus.getAck().isEnabled()) {
			AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
					this.serviceMatcher.getServiceId(),
					this.bus.getAck().getDestinationService(),
					event.getDestinationService(), event.getId(), event.getClass());
			this.cloudBusOutboundChannel
					.send(MessageBuilder.withPayload(ack).build());
			this.applicationEventPublisher.publishEvent(ack);
		}
	}
	if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {
		// We are set to register sent events so publish it for local consumption,
		// irrespective of the origin
		this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
				event.getOriginService(), event.getDestinationService(),
				event.getId(), event.getClass()));
	}
}

可以看到只要另一端的服务也依赖于 Bus 消息总线,那么便会监听 Bus 用于传播事件的消息通道,经过一系列的判断,核心判断是如果该事件不是由自身节点所发布的,那么就把该事件发布到自身的上下文中。随后该事件被 EnvironmentChangeListener 接收到,执行更新配置信息的相关操作。

public class EnvironmentChangeListener
		implements ApplicationListener<EnvironmentChangeRemoteApplicationEvent> {

	private static Log log = LogFactory.getLog(EnvironmentChangeListener.class);

	@Autowired
	private EnvironmentManager env;

	@Override
	public void onApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) {
		Map<String, String> values = event.getValues();
		log.info("Received remote environment change request. Keys/values to update "
				+ values);
		for (Map.Entry<String, String> entry : values.entrySet()) {
			this.env.setProperty(entry.getKey(), entry.getValue());
		}
	}

}

RemoteApplicationEvent 之所以可以在服务间传播,是因为 Spring Cloud Bus 依赖于 Spring Cloud Stream 进行消息传输,而 Spring Cloud Stream 是对于不同消息中间件的一种封装和上层抽象,也就是说 MQ 在 Bus 总线中承担中间介质的角色,而我们在上层使用时几乎感知不到这一点,个人认为这就是 Spirng 优秀的封装抽象能力带给我们的思考和启发,也是探究源码实现最大的意义所在。

自定义远程事件传播


上文中我们讨论了源码中给我们提供的 EnvironmentChangeRemoteApplicationEvent,那么我自己能否自定义远程事件以支持我们的业务呢?基于 Spring 一贯强大的可拓展性,答案当然是肯定的。例如我想实现 A 服务发布一个添加用户的事件,B 服务监听到该事件后执行添加用户的操作。

自定义添加用户事件(注意:必须提供空参构造器,用于消息反序列化):

package com.just1984.spring.cloud.demo.service.api.bus;

import com.just1984.spring.cloud.demo.service.api.vo.User;
import lombok.Data;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;

/**
 * @description:
 * @author: yixiezi1994@gmail.com
 * @date: 2019-09-24 20:53
 */
@Data
public class AddUserRemoteApplicationEvent extends RemoteApplicationEvent {

    private User user;

    public AddUserRemoteApplicationEvent() {
        this.user = null;
    }

    public AddUserRemoteApplicationEvent(Object source, String originService, User user) {
        super(source, originService);
        this.user = user;
    }

    public AddUserRemoteApplicationEvent(Object source, String originService, String destinationService, User user) {
        super(source, originService, destinationService);
        this.user = user;
    }

}

注意:在事件的接收方必须添加 @RemoteApplicationEventScan 注解扫描自定义事件。

package com.just1984.spring.cloud.demo.service.provider;

import com.just1984.spring.cloud.demo.service.api.mq.SpringCloudDemoProcessor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.bus.jackson.RemoteApplicationEventScan;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.hystrix.EnableHystrix;
import org.springframework.cloud.stream.annotation.EnableBinding;

/**
 * @description: 服务提供方
 * @author: yixiezi1994@gmail.com
 * @date: 2019-08-26 19:56
 */
@SpringBootApplication
@EnableDiscoveryClient
@EnableHystrix
@EnableBinding(SpringCloudDemoProcessor.class)
@RemoteApplicationEventScan(basePackages = {"com.just1984.spring.cloud.demo.service.api.bus"})
public class SpringCloudDemoServiceProviderApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudDemoServiceProviderApplication.class, args);
    }

}

事件发布,在 RemoteApplicationEvent 的构造器中有两个参数 originServicedestinationService,分别代表事件源服务和事件目标服务,通过相应的匹配规则我们可以将事件传播到特定的 1 个或 N 个服务,在这里我从 Consumer 服务发布事件到 Provider 服务的所有实例,destinationService 的值为 spring-cloud-demo-service-provider:**,其中 spring-cloud-demo-service-provider 是服务 ID,** 代表任意端口号。

package com.just1984.spring.cloud.demo.service.consumer.service;

import com.just1984.spring.cloud.demo.service.api.bus.AddUserRemoteApplicationEvent;
import com.just1984.spring.cloud.demo.service.api.sdk.ProviderApi;
import com.just1984.spring.cloud.demo.service.api.vo.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.bus.BusProperties;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * @description:
 * @author: yixiezi1994@gmail.com
 * @date: 2019-09-24 20:55
 */
@Service
@Qualifier("remoteEventClientService")
public class RemoteEventClientServiceImpl implements ClientService {

    @Value("${spring-cloud-demo-service-provider.application.name}")
    private String destinationService;

    @Autowired
    private ProviderApi providerApi;

    @Autowired
    private ApplicationEventPublisher publisher;

    @Autowired
    private BusProperties busProperties;

    @Override
    public void addUser(User user) {
        publisher.publishEvent(new AddUserRemoteApplicationEvent(this, busProperties.getId(), destinationService + ":**", user));
    }

    @Override
    public List<User> getUserList() {
        return providerApi.getUserList();
    }

    @Override
    public void clear() {
        providerApi.clear();
    }

}

消费端事件监听,执行业务操作:

package com.just1984.spring.cloud.demo.service.provider.bus;

import com.alibaba.fastjson.JSONObject;
import com.just1984.spring.cloud.demo.service.api.bus.AddUserRemoteApplicationEvent;
import com.just1984.spring.cloud.demo.service.provider.service.ProviderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent;
import org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;

/**
 * @description:
 * @author: yixiezi1994@gmail.com
 * @date: 2019-09-24 19:49
 */
@Slf4j
@Configuration
public class RemoteApplicationEventListenerConfiguration {

    @Autowired
    private ProviderService providerService;

    @EventListener(AddUserRemoteApplicationEvent.class)
    public void onAddUserRemoteApplicationEvent(AddUserRemoteApplicationEvent event) {
        log.info("监听到AddUserRemoteApplicationEvent事件:【{}】", JSONObject.toJSONString(event));
        providerService.addUser(event.getUser());
    }

}

文章作者: Ethan Zhang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Ethan Zhang !
评论