首页 > 极客资料 博客日记
elsa工作流-调度(安排后台作业)
2025-01-12 21:00:03极客资料围观4次
前言
elsa内部很多地方都会用到后台作业,也就是在后台线程中执行一堆任务,这与我们通常理解的后台作业没有区别。
elsa将后台作业也称为调度Schedul,相关功能由Elsa.Scheduling模块提供。
典型的应用是触发器调度和书签调度时会用到后台作业,但后台作业也可以用在其它地方,比如自定义的Actiity,若其内部也需要执行后台任务,可以直接用elsa提供的后台作业接口。
后台作业可以安排在一个Task中,也可以委派给Hangfire、Quartz.net这种三方后台作业框架。
架构图
下面深入浅出说明各个组件。
要在后台执行的具体任务ITask
它表示一个需要在后台执行的具体逻辑,默认情况下有如下3个实现:
- DelegateTask 代表一个普通的委托
- ResumeWorkflowTask 代表让某个工作流实例恢复继续执行
- RunWorkflowTask 执行一个指定类型的工作流,它会产生一个新的工作流实例
已调度的任务IScheduledTask
表示一个已经开始执行调度的任务,在实例化它的某个实现类时,要指定需要执行的ITask,它内部开启线程,根据具体实现类的规则执行,
比如:
ScheduledRecurringTask是IScheduledTask实例,它的构造函数需要指定ITask,间隔时间等,构造函数中会开启线程,按照固定周期时间自动执行ITask的逻辑。它还有两个默认实现,ScheduledCronTask表示基于cron表达式的方式执行,ScheduledSpecificInstantTask开启线程,等到指定时间点时自动执行。
当然它还提供取消此任务的方法。
ISchedule(IScheduledTask的抽象工厂)
它是用来创建IScheduledTask的,针对IScheduledTask不同实现,都配有一个对应的工厂类,这些工厂类都实现ISchedule,内部实现就返回对应类型的IScheduledTask。
- CronSchedule:创建基于cron表达式的任务ScheduledCronTask的工厂类
- RecurringSchedule:创建周期性任务ScheduledRecurringTask的工厂类
- SpecificInstantSchedule:创建指定时间点执行 任务ScheduledSpecificInstantTask的工厂类
IScheduler调度器(IScheduledTask任务容器)
创建出来的任务应该被管理,以便根据需要注销任务,可以把它看成任务IScheduledTask容器,默认实现是LocalScheduler,它创建任务后将任务存储在内存字典中,任务名称必须是全局唯一的,作为key,IScheduledTask实例作为值。
值得注意的是,它并不直接用依赖注入注入IScheduleI,因为ISchedule是抽象工厂,所以它是创建任务时,由调用方传入具体的ITask实现和对应的工厂具体类,比如要创建一个周期的任务,应该在调用方new一个具体ITask实现类,和一个具体的IScheduleI实现类,然后调用IScheduler的ScheduleAsync方法,这样内部的LocalScheduler就会安排好任务,并引用此任务。
后续根据需要,你可以调用IScheduler的ClearScheduleAsync方法来移除指定任务
本地任务调度小节
前面部分讲的基本上都是本地进程内调度,elsa引擎会使用它,你对elsa进行扩展也可以使用它,比如你自定义的Activity内部需要做任务调度时,可以注入IScheduler进行使用。下面部分是说专门针对工作流的调度器,它的默认实现会使用本地任务调度器。
IWorkflowScheduler工作流调度器
所谓的工作流调度器是指,在后台任务中,安排某个工作流在啥时候执行,有两个方面,
- 一个是可以恢复某个工作流实例执行,这个流程实例是之前执行卡住的,比如等待某个人审核,若超过3天没审核则自动继续执行。
- 另一个方面是指,是等待到指定时间点后执行?还是周期性的每隔多长时间自动执行?还是指定一个cron表达式,按此规则自动执行?
都是在后台安排工作流执行,可以用现有的三方框架来实现,比如:hangfire、quartz,本篇只讲elsa调度的执行原理,所以这两种实现不做分析,另外elsa也定义了默认实现DefaultWorkflowScheduler。它内部使用前面说的本地任务框架来安排任务。
这里以其中的,按指定时间点执行新流程和恢复之前的流程实例为例:
public class DefaultWorkflowScheduler(IScheduler scheduler) : IWorkflowScheduler { /// 执行一个全新流程 public async ValueTask ScheduleAtAsync(string taskName, ScheduleNewWorkflowInstanceRequest request, DateTimeOffset at, CancellationToken cancellationToken = default) { await scheduler.ScheduleAsync(taskName, new RunWorkflowTask(request), new SpecificInstantSchedule(at), cancellationToken); } ///恢复之前的流程实例继续执行 public async ValueTask ScheduleAtAsync(string taskName, ScheduleExistingWorkflowInstanceRequest request, DateTimeOffset at, CancellationToken cancellationToken = default) { var task = new ResumeWorkflowTask(request); var schedule = new SpecificInstantSchedule(at); await scheduler.ScheduleAsync(taskName, task, schedule, cancellationToken); }
可以看到,它首先创建ITask的实例,表示将来要执行的具体逻辑,上面代码中的RunWorkflowTask表示执行一个全新流程的任务,ResumeWorkflowTask表示恢复以前的指定流程实例执行。
然后创建一个具体的ISchedule的工厂类,在指定时间点执行任务的工厂类就是SpecificInstantSchedule。
最后调用调度器(也是任务容器)scheduler进行任务安排。
剩下的两组方法分别是按周期性执行和根据cron方式安排流程执行,其原理类似。
IBookmarkScheduler和ITriggerScheduler
针对周期性、cron表达式和指定时间点这几种情况,都有对应的触发器类型,针对这几种触发器内部会使用IWorkflowScheduler,进行流程任务安排,具体说明将在书签和触发器的专门章节说明。
总结
elsa中,后台安排任务有:周期性、cron表达式和指定时间点三种时间方式,安排的任务有两种类型,一种是恢复之前的指定流程实例恢复执行,另一种是安排某个类型的流程全新执行(产生新的流程实例)
流程安排是通过IWorkflowScheduler实现的,它有默认实现(基于进程内的本地任务调度器)、基于hangfire的实现和基于Quartz的实现。
默认流程调度器DefaultWorkflowScheduler使用基于进程内的本地任务调度器实现,这个调度器也可以用于我们自定义的Activity或其它代码,并不是仅用于流程任务安排。
标签:
相关文章
最新发布
- .NET 开发的分流抢票软件,不做广告、不收集隐私
- 深入理解ASP.NET Core 管道的工作原理
- DataV Note:让Jupyter Notebook绽放新活力
- Mysql身份认证过程
- .NET 9 new features-Microsoft.ML.Tokenizers 库
- 掌握设计模式--代理模式
- 前端实现 HTML 网页转 PDF 并导出
- 回顾 2024 年 12 个月的C#/.NET/.NET Core优秀项目和框架简报
- 《深入理解Mybatis原理》Mybatis中的缓存实现原理
- 互联网不景气了那就玩玩嵌入式吧,用纯.NET开发并制作一个智能桌面机器人(一):从.NET IoT入门开始