深入理解Spring的容器内事件发布监听机制(2)

想象我们正在做一个关于Spark的任务调度系统,我们需要把任务提交到集群中并监控任务的执行状态,当任务执行完毕(成功或者失败),除了必须对数据库进行更新外,还可以执行一些额外的工作:比如将任务执行结果以邮件的形式发送给用户。这些额外的工作后期还有较大的变动可能:比如还需要以短信的形式通知用户,对于特定的失败任务需要通知相关运维人员进行排查等等,所以不宜直接写死在主流程代码中。最好的方式自然是以事件监听的方式动态的增删对于任务执行结果的处理逻辑。为此我们可以基于JDK提供的事件框架,打造一个能够对任务执行结果进行监听的弹性系统。

任务结束事件的事件源
因为要对任务执行结束这一事件进行监听,所以必须对任务这一概念进行定义,如下

/**
 * @author: takumiCX
 * @create: 2018-11-02
 **/
@Data
public class Task {

private String name;

private TaskFinishStatus status;

}

任务包含任务名和任务状态,其中任务状态是个枚举常量,只有成功和失败两种取值。

/**
 * @author: takumiCX
 * @create: 2018-11-02
 **/
public enum  TaskFinishStatus {
    SUCCEDD,
    FAIL;
}

任务结束事件TaskFinishEvent
自定义事件类型TaskFinishEvent继承自JDK中的EventObject,构造时会传入Task作为事件源。

/**
 * @author: takumiCX
 * @create: 2018-11-02
 **/
public class TaskFinishEvent extends EventObject {
    /**
    * Constructs a prototypical Event.
    *
    * @param source The object on which the Event initially occurred.
    * @throws IllegalArgumentException if source is null.
    */
    public TaskFinishEvent(Object source) {
        super(source);
    }
}

该事件的监听器抽象
继承标记接口EventListner表示该接口的实现类是一个监听器,同时在内部定义了事件发生时的响应方法onTaskFinish(event),接收一个TaskFinishEvent作为参数。

/**
 * @author: takumiCX
 * @create: 2018-11-02
 **/
public interface TaskFinishEventListner extends EventListener {
   
    void onTaskFinish(TaskFinishEvent event);
}

邮件服务监听器
该邮件服务监听器将在监听到任务结束事件时将任务的执行结果发送给用户。

/** * @author: takumiCX * @create: 2018-11-03 **/ @Data public class MailTaskFinishListener implements TaskFinishEventListner { private String emial; @Override public void onTaskFinish(TaskFinishEvent event) { System.out.println("Send Emial to "+emial+" Task:"+event.getSource()); } }

自定义事件发布器

/**
 * @author: takumiCX
 * @create: 2018-11-03
 **/
public class TaskFinishEventPublisher {
   
    private List<TaskFinishEventListner> listners=new ArrayList<>();
   
    //注册监听器
    public synchronized void register(TaskFinishEventListner listner){
        if(!listners.contains(listner)){
            listners.add(listner);
        }
    }
   
    //移除监听器
    public synchronized boolean remove(TaskFinishEventListner listner){
        return listners.remove(listner);
    }
   
   
    //发布任务结束事件
    public void publishEvent(TaskFinishEvent event){
       
        for(TaskFinishEventListner listner:listners){
            listner.onTaskFinish(event);
        }
    }
}

测试代码如下

/**
 * @author: takumiCX
 * @create: 2018-11-03
 **/
public class TestTaskFinishListener {


    public static void main(String[] args) {

//事件源
        Task source = new Task("用户统计", TaskFinishStatus.SUCCEDD);

//任务结束事件
        TaskFinishEvent event = new TaskFinishEvent(source);

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/8aa7eccef7a6ee573b4af64ab1388374.html