[Abp 源码分析]十六、后台作业与后台工作者 (3)

针对于第二个问题,Abp 通过 WaitToStop() 方法会阻塞调用这个 Timer 的线程,并且在 _performingTasks 标识位是 false 的时候释放。

public override void WaitToStop() { // 锁定 CLR 的 Timer 对象 lock (_taskTimer) { // 循环检测 while (_performingTasks) { Monitor.Wait(_taskTimer); } } base.WaitToStop(); }

至于其他的 Start() 方法就是使用 CLR 的 Timer 更改其执行周期,而 Stop() 就是直接将 Timer 的周期设置为无限大,使计时器失效。

2.1.4 总结

Abp 后台工作者的核心就是通过 AbpTimer 来实现周期性任务的执行,用户只需要继承自 PeriodicBackgroundWorkerBase,然后将其添加到 IBackgroundWorkerManager 的集合当中。这样 Abp 在启动之后就会遍历这个工作者集合,然后周期执行这些后台工作者绑定的方法。

当然如果你继承了 PeriodicBackgroundWorkerBase 之后,可以通过设置构造函数的 AbpTimer 来指定自己的执行周期。

2.2 后台作业队列

后台工作队列的管理是通过 IBackgroundJobManager 来处理的,而该接口又继承自 IBackgroundWorker,所以一整个后台作业队列就是一个后台工作者,只不过这个工作者有点特殊。

2.2.1 后台作业管理器

IBackgroundJobManager 接口的定义其实就两个方法,一个 EnqueueAsync<TJob, TArgs>() 用于将一个后台作业加入到执行队列当中。而 DeleteAsync() 方法呢,顾名思义就是从队列当中移除指定的后台作业。

首先看一下其默认实现 BackgroundJobManager,该实现同样是继承自 PeriodicBackgroundWorkerBase 并且其默认周期为 5000 ms。

public class BackgroundJobManager : PeriodicBackgroundWorkerBase, IBackgroundJobManager, ISingletonDependency { // 事件总线 public IEventBus EventBus { get; set; } // 轮训后台作业的间隔,默认值为 5000 毫秒. public static int JobPollPeriod { get; set; } // IOC 解析器 private readonly IIocResolver _iocResolver; // 后台作业队列存储 private readonly IBackgroundJobStore _store; static BackgroundJobManager() { JobPollPeriod = 5000; } public BackgroundJobManager( IIocResolver iocResolver, IBackgroundJobStore store, AbpTimer timer) : base(timer) { _store = store; _iocResolver = iocResolver; EventBus = NullEventBus.Instance; Timer.Period = JobPollPeriod; } }

基础结构基本上就这个样子,接下来看一下他的两个接口方法是如何实现的。

EnqueueAsync<TJob, TArgs> 方法通过传入指定的后台作业对象和相应的参数,同时还有任务的优先级。将其通过 IBackgroundJobStore 进行持久化,并返回一个任务的唯一 JobId 以便进行删除操作。

public async Task<string> EnqueueAsync<TJob, TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null) where TJob : IBackgroundJob<TArgs> { // 通过 JobInfo 包装任务的基本信息 var jobInfo = new BackgroundJobInfo { JobType = typeof(TJob).AssemblyQualifiedName, JobArgs = args.ToJsonString(), Priority = priority }; // 如果需要延时执行的话,则用当前时间加上延时的时间作为任务下次运行的时间 if (delay.HasValue) { jobInfo.NextTryTime = Clock.Now.Add(delay.Value); } // 通过 Store 进行持久话存储 await _store.InsertAsync(jobInfo); // 返回后台任务的唯一标识 return jobInfo.Id.ToString(); }

至于删除操作,在 Manager 内部其实也是通过 IBackgroundJobStore 进行实际的删除操作的。

public async Task<bool> DeleteAsync(string jobId) { // 判断 jobId 的值是否有效 if (long.TryParse(jobId, out long finalJobId) == false) { throw new ArgumentException($"The jobId '{jobId}' should be a number.", nameof(jobId)); } // 使用 jobId 从 Store 处筛选到 JobInfo 对象的信息 BackgroundJobInfo jobInfo = await _store.GetAsync(finalJobId); if (jobInfo == null) { return false; } // 如果存在有 JobInfo 则使用 Store 进行删除操作 await _store.DeleteAsync(jobInfo); return true; }

后台作业管理器实质上是一个周期性执行的后台工作者,那么我们的后台作业是每 5000 ms 执行一次,那么他的 DoWork() 方法又在执行什么操作呢?

protected override void DoWork() { // 从 Store 当中获得等待执行的后台作业集合 var waitingJobs = AsyncHelper.RunSync(() => _store.GetWaitingJobsAsync(1000)); // 遍历这些等待执行的后台任务,然后通过 TryProcessJob 进行执行 foreach (var job in waitingJobs) { TryProcessJob(job); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wssddj.html