首页 > 基础资料 博客日记
Spring组件实现事件发布订阅-全网最详细介绍
2024-09-20 06:00:06基础资料围观98次
一、发布订阅模式
发布订阅模式(Publish-Subscribe Pattern),又称为观察者模式(Observer Pattern),是一种消息传递模式,允许对象(称为“订阅者”或“观察者”)订阅消息或事件,并在发布者发出这些消息或事件时得到通知。
基本概念:
- 发布者(Publisher):发送消息或事件的对象,它不知道订阅者的存在。
- 订阅者(Subscriber):接收消息或事件的对象,它对感兴趣的消息或事件进行订阅。
- 消息或事件:发布者发送的数据或信号。
- 消息代理(Message Broker):通常是一个中介组件,用于管理订阅者列表和消息的分发。
工作流程:
- 订阅:订阅者向消息代理注册,表明它们对特定主题或事件感兴趣。
- 发布:当发布者有新的消息或事件发生时,它将消息发送给消息代理。
- 分发:消息代理将消息传递给所有订阅了该主题或事件的订阅者。
优点:
- 解耦:发布者和订阅者之间是解耦的,它们不需要知道对方的存在,只需要通过消息代理进行通信。
- 灵活性:可以轻松地添加或移除订阅者,而不影响发布者或其他订阅者。
- 可扩展性:系统可以处理大量的订阅者和发布者,因为消息代理可以扩展以适应需求。
- 异步通信:消息的发送和接收可以是异步的,这有助于提高系统的性能和响应能力。
- 事件驱动:发布订阅模式支持事件驱动的架构,使得系统能够响应状态变化。
- 容错性:如果一个订阅者失败,它不会影响发布者或其他订阅者接收消息。
- 多播:消息可以同时发送给多个订阅者,而不需要发布者进行多次发送。
二、Spring提供的发布订阅模式相关类介绍
使用org.springframework.context包下的类实现订阅发布机制:
ApplicationEvent:一个抽象类,继承自java.util.EventObject,它提供了关于事件的基本信息,比如事件的时间戳。一般在实际项目里实现这个抽象类来传递具体的事件数据。ApplicationEventPublisher:一个接口,它定义了发布事件的方法。一般在项目里调用它的publishEvent方法来发布事件。
ApplicationEventPublisherAware:一个由 Spring 提供的标记接口,它继承自 Aware
接口。实现这个接口的 bean 会自动获得 ApplicationEventPublisher
的实例,Spring 容器会在创建 bean 时自动注入 ApplicationEventPublisher
。在下面的代码示例里会实现这个接口作为一个发布事件的类。
ApplicationListener:用于实现事件监听器,以便在 Spring 应用程序上下文中监听和响应 ApplicationEvent
事件。ApplicationListener
接口定义了事件监听器的基本行为,允许你指定对哪些类型的事件感兴趣,并且定义了当这些事件发生时应该调用的方法。
三、具体实现
为了体现发布订阅模式的解耦、灵活、可扩展性,本文模拟了两种业务场景之后发布的事件:付款(pay)和售卖(sell),完成之后均需要进行两种操作:记录(record)和通知(notify)。
基础实现,每个操作定义一个监听器:
1.实现ApplicationEvent接口,这里定义了两个Event类,分别对应上述两种操作:
import org.springframework.context.ApplicationEvent;
public class PayEvent extends ApplicationEvent {
public PayEvent(String payInfo){
super(payInfo);
}
}
import org.springframework.context.ApplicationEvent;
public class SellEvent extends ApplicationEvent {
public SellEvent(String sellDetail){
super(sellDetail);
}
}
2.实现ApplicationEventPublisherAware接口,定义了一个发布事件的通用方法:
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component;
@Component
public class EventPublisher implements ApplicationEventPublisherAware {
private ApplicationEventPublisher applicationEventPublisher;
//发布事件的通用方法
public void publishEvent(ApplicationEvent event){
applicationEventPublisher.publishEvent(event);
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher){
this.applicationEventPublisher = applicationEventPublisher;
}
}
3. 实现ApplicationListener接口,这里定义了四个监听类,分别用来监听两种业务场景(pay和sell)的两种操作(record和notify)。每个监听类定义不同类型的ApplicationEvent 来区分监听的触发条件:
import com.virtual.universe.service.eventService.event.PayEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
@Component
public class PaymentNotifyListener implements ApplicationListener<PayEvent> {
@Override
public void onApplicationEvent(PayEvent payEvent){
System.out.println("=========PaymentListener notify pay info"+ payEvent.getSource());
}
}
import com.virtual.universe.service.eventService.event.PayEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
@Component
public class PaymentRecordListener implements ApplicationListener<PayEvent> {
@Override
public void onApplicationEvent(PayEvent payEvent){
System.out.println("=========PaymentListener save pay record, "+ payEvent.getSource());
}
}
import com.virtual.universe.service.eventService.event.SellEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
@Component
public class SellNotifyListener implements ApplicationListener<SellEvent> {
@Override
public void onApplicationEvent(SellEvent sellEvent){
System.out.println("=========SellListener2"+ sellEvent.getSource());
}
}
import com.virtual.universe.service.eventService.event.SellEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
@Component
public class SellRecordListener implements ApplicationListener<SellEvent> {
@Override
public void onApplicationEvent(SellEvent sellEvent){
System.out.println("=========SellListener1"+ sellEvent.getSource());
}
}
4.在具体业务代码里发布事件:
@Service
public class PublishEventService {
@Resource
private EventPublisher eventPublisher;
public void publishPayEvent(String payDetail){
PayEvent payEvent = new PayEvent(payDetail);
eventPublisher.publishEvent(payEvent);
}
public void publishSellEvent(String sellDetail){
SellEvent sellEvent = new SellEvent(sellDetail);
eventPublisher.publishEvent(sellEvent);
}
}
运行代码,分别触发PublishEventService类里的publishPayEvent和publishSellEvent方法,会打印:
以publishPayEvent方法为例,调用一次publishEvent,所有监听PayEvent的监听器都触发了onApplicationEvent方法。而监听SellEvent的监听器则未被触发。
在实际项目中,可以在新增订阅操作的时候,新创建一个Listener实现类,在里面实现新的业务逻辑,以达到业务扩展并保证发布和订阅逻辑的解耦。
但是这种写法需要实现多个Listener类,写法比较繁琐,那么有没有更简单更加灵活可扩展的写法呢?让我们来看第二种写法。
优化一:每种业务场景定义一个监听器触发多个操作
在上面写法的基础上,我们重写pay的事件监听代码:
1、创建一个PaymentHandler接口:
import com.virtual.universe.service.eventService.event.PayEvent;
public interface PaymentHandler {
void handlePayment(PayEvent payEvent);
}
再给notify和record两个操作分别实现一个handler类,都实现了上面这个接口:
import com.virtual.universe.service.eventService.event.PayEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class PaymentNotifyHandler implements PaymentHandler{
@Override
public void handlePayment(PayEvent payEvent) {
log.info("=============payment handle notify pay info: {}", payEvent.getSource());
}
}
import com.virtual.universe.service.eventService.event.PayEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class PaymentRecordHandler implements PaymentHandler{
@Override
public void handlePayment(PayEvent payEvent){
log.info("=============payment handle save the record: {}", payEvent.getSource());
}
}
2.重写Listener类。由于上面两个Handler类都加了@Component注解,所以在下方代码里,会将上面两个类的bean注入到handlers里,从而在事件发布后,会依次调用上面两个类里的handlePayment方法。
并且在下方代码里我们定义了一个单线程的线程池,用来异步处理事件发布后的操作。
@Component
public class PaymentListener implements ApplicationListener<PayEvent> {
private static final ExecutorService PAYMENT_THREAD_POOL = Executors.newSingleThreadExecutor();
@Resource
List<PaymentHandler> handlers;
@Override
public void onApplicationEvent(PayEvent payEvent){
if (!CollectionUtils.isEmpty(handlers)) {
PAYMENT_THREAD_POOL.submit(()->{
handlers.forEach(handler -> handler.handlePayment(payEvent));
});
}
}
}
这种写法,每种业务场景只需定义一个监听器,新增的操作只需新定义一个Handler类并重写handle方法即可。
优化二:所有业务场景定义一个监听器
我们还可以自定义一个通用监听器,监听所有的业务场景。
1.定义一个CommentEvent用来传递所有业务场景的事件,里面有一个eventType字段用来定义不同的业务触发场景。
import org.springframework.context.ApplicationEvent;
public class CommentEvent extends ApplicationEvent {
private Integer eventType;
public CommentEvent(Integer eventType, String eventDetail){
super(eventDetail);
this.eventType = eventType;
}
public Integer getEventType(){
return this.eventType;
}
}
再定义一个类存放所有的type:
public interface ActionTypes {
Integer PAYMENT_TYPE = 1;
Integer SELL_TYPE = 2;
}
2、新建一个通用的Handler接口:
import com.virtual.universe.service.eventService.event.CommentEvent;
public interface EventHandler {
Integer getEventType();
void handleEvent(CommentEvent payEvent);
}
然后需要为所有的操作实现一个handler类,以pay业务的record操作为例:
实现了两个方法,getEventType:返回前面定义的常量PAYMENT_TYPE,这个方法在后面监听器的handler加载逻辑里会用到。handleEvent:实现具体的操作代码。
import com.virtual.universe.service.eventService.event.CommentEvent;
import com.virtual.universe.service.eventService.handlers.ActionTypes;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class PayRecordHandler implements EventHandler {
@Override
public void handleEvent(CommentEvent event) {
log.info("=============COMMON pay handle save the record: {}", event.getSource());
}
@Override
public Integer getEventType() {
return ActionTypes.PAYMENT_TYPE;
}
}
3、创建一个通用的监听器类,需要在init方法里便利所有的handler的实现类,以eventType作为key生成一个handlerType:
import com.virtual.universe.service.eventService.event.CommentEvent;
import com.virtual.universe.service.eventService.handlers.common.EventHandler;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Component
public class CommonListener implements ApplicationListener<CommentEvent> {
private static final ExecutorService PAYMENT_THREAD_POOL = Executors.newSingleThreadExecutor();
@Resource
List<EventHandler> handlers;
private Map<Integer, List<EventHandler>> handlerMap;
@PostConstruct
public void init(){
handlerMap = new ConcurrentHashMap<>(handlers.size());
handlers.stream().forEach(handler -> {
List<EventHandler> handlerList = handlerMap.get(handler.getEventType());
if (Objects.isNull(handlerList)) {
handlerMap.putIfAbsent(handler.getEventType(), new LinkedList<>());
handlerMap.get(handler.getEventType()).add(handler);
} else {
handlerList.add(handler);
}
});
}
@Override
public void onApplicationEvent(CommentEvent event){
List<EventHandler> handlerList = handlerMap.get(event.getEventType());
if (!CollectionUtils.isEmpty(handlerList)) {
PAYMENT_THREAD_POOL.submit(()->{
handlerList.forEach(handler -> handler.handleEvent(event));
});
}
}
}
4.最后,调用发布方法的代码改成使用CommentEvent:
import com.virtual.universe.service.eventService.event.CommentEvent;
import com.virtual.universe.service.eventService.event.PayEvent;
import com.virtual.universe.service.eventService.event.SellEvent;
import com.virtual.universe.service.eventService.handlers.ActionTypes;
import com.virtual.universe.service.eventService.publisher.EventPublisher;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class PublishEventService {
@Resource
private EventPublisher eventPublisher;
public void publishPayEvent(String payDetail){
// PayEvent payEvent = new PayEvent(payDetail);
CommentEvent payEvent = new CommentEvent(ActionTypes.PAYMENT_TYPE, payDetail);
eventPublisher.publishEvent(payEvent);
}
public void publishSellEvent(String sellDetail){
// SellEvent sellEvent = new SellEvent(sellDetail);
CommentEvent sellEvent = new CommentEvent(ActionTypes.SELL_TYPE, sellDetail);
eventPublisher.publishEvent(sellEvent);
}
}
这种写法,整个项目只需要定义一个监听器,每次需要新增一个事件类型或事件触发的操作,只需要新增一种eventType或针对已有的eventType新增一个handler实现类。
有关Spring提供的事件发布订阅的组件使用就介绍到这里了,如果您能看到这里,感谢您的时间和耐心,如果有任何问题或疑问,请在留言区告知哦~
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
标签: