[Abp 源码分析]十六、后台作业与后台工作者 (4)

可以看到每 5 秒钟我们的后台作业管理器就会从 IBackgroundJobStore 当中拿到最大 1000 条的后台作业信息,然后遍历这些信息。通过 TryProcessJob(job) 方法来执行后台作业。

而 TryProcessJob() 方法,本质上就是通过反射构建出一个 IBackgroundJob 对象,然后取得序列化的参数值,通过反射得到的 MethodInfo 对象来执行我们的后台任务。执行完成之后,就会从 Store 当中移除掉执行完成的任务。

针对于在执行过程当中所出现的异常,会通过 IEventBus 触发一个 AbpHandledExceptionData 事件记录后台作业执行失败时的异常信息。并且一旦在执行过程当中出现了任何异常的情况,都会将该任务的 IsAbandoned 字段置为 true,当该字段为 true 时,该任务将不再回被执行。

PS:就是在 GetWaitingJobsAsync() 方法时,会过滤掉 IsAbandoned 值为 true 的任务。

private void TryProcessJob(BackgroundJobInfo jobInfo) { try { // 任务执行次数自增 1 jobInfo.TryCount++; // 最后一次执行时间设置为当前时间 jobInfo.LastTryTime = Clock.Now; // 通过反射取得后台作业的类型 var jobType = Type.GetType(jobInfo.JobType); // 通过 Ioc 解析器得到一个临时的后台作业对象,执行完之后既被释放 using (var job = _iocResolver.ResolveAsDisposable(jobType)) { try { // 通过反射得到后台作业的 Execute 方法 var jobExecuteMethod = job.Object.GetType().GetTypeInfo().GetMethod("Execute"); var argsType = jobExecuteMethod.GetParameters()[0].ParameterType; var argsObj = JsonConvert.DeserializeObject(jobInfo.JobArgs, argsType); // 结合持久话存储的参数信息,调用 Execute 方法进行后台作业 jobExecuteMethod.Invoke(job.Object, new[] { argsObj }); // 执行完成之后从 Store 删除该任务的信息 AsyncHelper.RunSync(() => _store.DeleteAsync(jobInfo)); } catch (Exception ex) { Logger.Warn(ex.Message, ex); // 计算下一次执行的时间,一旦超过 2 天该任务都执行失败,则返回 null var nextTryTime = jobInfo.CalculateNextTryTime(); if (nextTryTime.HasValue) { jobInfo.NextTryTime = nextTryTime.Value; } else { // 如果为 null 则说明该任务在 2 天的时间内都没有执行成功,则放弃继续执行 jobInfo.IsAbandoned = true; } // 更新 Store 存储的任务信息 TryUpdate(jobInfo); // 触发异常事件 EventBus.Trigger( this, new AbpHandledExceptionData( new BackgroundJobException( "A background job execution is failed. See inner exception for details. See BackgroundJob property to get information on the background job.", ex ) { BackgroundJob = jobInfo, JobObject = job.Object } ) ); } } } catch (Exception ex) { Logger.Warn(ex.ToString(), ex); // 表示任务不再执行 jobInfo.IsAbandoned = true; // 更新 Store TryUpdate(jobInfo); } } 2.2.2 后台作业

后台作业的默认接口定义为 IBackgroundJob<in TArgs> ,他只有一个 Execute(TArgs args) 方法,用于接收指定类型的作业参数,并执行。

一般来说我们不建议直接通过继承 IBackgroundJob<in TArgs> 来实现后台作业,而是继承自 BackgroundJob<TArgs> 抽象类。该抽象类内部也没有什么特别的实现,主要是注入了一些基础设施,比如说 UOW 与 本地化资源管理器,方便我们开发使用。

后台作业本身是具体执行的对象,而 BackgroundJobInfo 则是存储了后台作业的 Type 类型和参数,方便在需要执行的时候通过反射的方式执行后台作业。

2.2.2 后台作业队列存储

从 IBackgroundJobStore 我们就可以猜到以 Abp 框架的套路,他肯定会有两种实现,第一种就是基于内存的 InMemoryBackgroundJobStore。而第二种呢,就是由 Abp.Zero 模块所提供的基于数据库的 BackgroundJobStore。

IBackgroundJobStore 接口所定义的方法基本上就是增删改查,没有什么复杂的。

public interface IBackgroundJobStore { // 通过 JobId 获取后台任务信息 Task<BackgroundJobInfo> GetAsync(long jobId); // 插入一个新的后台任务信息 Task InsertAsync(BackgroundJobInfo jobInfo); /// <summary> /// Gets waiting jobs. It should get jobs based on these: /// Conditions: !IsAbandoned And NextTryTime &lt;= Clock.Now. /// Order by: Priority DESC, TryCount ASC, NextTryTime ASC. /// Maximum result: <paramref/>. /// </summary> /// <param>Maximum result count.</param> Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(int maxResultCount); /// <summary> /// Deletes a job. /// </summary> /// <param>Job information.</param> Task DeleteAsync(BackgroundJobInfo jobInfo); /// <summary> /// Updates a job. /// </summary> /// <param>Job information.</param> Task UpdateAsync(BackgroundJobInfo jobInfo); }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wssddj.html