首页>行情>正文
.Net Core工作流WorkFlowCore
2023-04-13 09:30:51    来源:博客园
前言

WorkFlowCore是一个针对.NetCore的轻量级的工作流引擎,提供了FluentAPI、多任务、持久化以及并行处理的功能,适合于小型工作流、责任链的需求开发。支持工作流长期运行,提供了各种持久化方式。


(资料图)

本篇开发环境为.Net7,此处不演示Jsonyaml配置,详细文档请查看官方文档和项目源码地址

一、安装与基础使用

通过以下命令安装

Install-Package WorkflowCore

然后注入WorkFlowCore

builder.Services.AddWorkflow();

WorkFlowCore主要分为两部分:步骤工作流

步骤

多个步骤组成一个工作流,每个步骤都可以有输入并产生输出,这些输出可以传递回其所在的工作流。通过创建继承抽象类StepBody或StepBodyAsync的类,并且实现Run或RunAsync方法来定义步骤,很明显它们的区别是是否异步

public class FirstStepBody: StepBody    {        public override ExecutionResult Run(IStepExecutionContext context)        {            Console.WriteLine("Hello world!First");            return ExecutionResult.Next();        }    }

工作流

通过继承IWorkflow接口定义一个工作流,接口只有IdVersionBuild方法(内部可以执行多个步骤),工作流主机使用这些信息来标识工作流

public class MyWorkflow :IWorkflow    {        public string Id => "HelloWorld";        public int Version => 1;        public void Build(IWorkflowBuilder builder)        {            builder                .StartWith()                .Then();        }    }

工作流如果想使用必须在工作流主机中通过RegisterWorkflow()方法注册,并且通过Start()方法启动主机,当然也可以通过Stop()方法停止工作流。执行工作流需要使用StartWorkflow()方法,参数为工作流类的Id,如下

[ApiController]    [Route("[controller]")]    public class WeatherForecastController : ControllerBase    {        private readonly IWorkflowHost _workflowHost;        public WeatherForecastController(IWorkflowHost workflowHost)        {            _workflowHost = workflowHost;        }        [HttpGet(Name = "get")]        public ContentResult Get()        {            if (!_workflowHost.Registry.IsRegistered("HelloWorld",1))            {                _workflowHost.RegisterWorkflow();            }            _workflowHost.Start();            _workflowHost.StartWorkflow("HelloWorld");            //host.Stop();            return Content("ok");        }    }

当然也可以在构建web服务的时候统一注册,然后就可以直接执行啦

var host = app.Services.GetService();host.RegisterWorkflow();host.Start();
二、在步骤之间传递参数

每个步骤都是一个黑盒,因此它们支持输入和输出。这些输入和输出可以映射到一个数据类,该数据类定义与每个工作流实例相关的自定义数据。

以下示例显示了如何定义步骤的输入和输出,然后显示了如何使用内部数据的类型化类定义工作流,以及如何将输入和输出映射到自定义数据类的属性。

//步骤包含属性,并且计算    public class FirstStepBody: StepBody    {        public int Input1 { get; set; }        public int Input2 { get; set; }        public int Output { get; set; }        public override ExecutionResult Run(IStepExecutionContext context)        {            Output = Input1 + Input2;            Console.WriteLine(Output);            return ExecutionResult.Next();        }    }    //工作流包含输入输出的赋值    public class MyWorkflow :IWorkflow    {        public string Id => "HelloWorld";        public int Version => 1;        public void Build(IWorkflowBuilder builder)        {            builder                .StartWith()                .Input(step => step.Input1,data => data.Value1)                .Input(step => step.Input2, data => 100)                .Output(data => data.Answer, step => step.Output)                .Then()                .Input(step => step.Input1, data => data.Value1)                .Input(step => step.Input2, data => data.Answer)                .Output(data => data.Answer, step => step.Output);        }    }    //工作流的属性类    public class MyDataClass    {        public int Value1 { get; set; }        public int Value2 { get; set; }        public int Answer { get; set; }    }    //执行工作流传入参数    MyDataClass myDataClass = new MyDataClass();    myDataClass.Value1 = 100;    myDataClass.Value2 = 200;    //不传入myDataClass则每次执行都是新的数据对象    _workflowHost.StartWorkflow("HelloWorld", myDataClass);

从上述例子可以看到工作流可以定义一个初始的类作为参数传入,每个步骤可以有自己的属性字段去接收参数(可以是工作流类的字段,也可以是固定值),可以用Input方法传入,Output方法输出赋值。如果在工作流执行时不传入参数每次执行都是新的对象的默认值,比如在StartWorkflow方法中不传myDataClass,运行结果是100100,否则是200300

三、外部事件

工作流可以使用WaitFor方法进行等待,通过外部触发此事件,将事件产生的数据传递给工作流,并且让工作流继续执行下面的步骤。示例如下:

public class MyWorkflow :IWorkflow    {        //省略。。。。        public void Build(IWorkflowBuilder builder)        {            builder                .StartWith()                .Input(step => step.Input1,data => data.Value1)                .Input(step => step.Input2, data => 100)                .Output(data => data.Answer, step => step.Output)                .WaitFor("MyEvent",key => "EventKey")                .Output(data => data.Answer,step => step.EventData)                .Then()                .Input(step => step.Input1, data => data.Value1)                .Input(step => step.Input2, data => data.Answer)                .Output(data => data.Answer, step => step.Output);        }    }    //。。。    [HttpGet(Name = "get")]    public ContentResult Get()    {        MyDataClass myDataClass = new MyDataClass();        myDataClass.Value1 = 100;        myDataClass.Value2 = 200;        _workflowHost.StartWorkflow("HelloWorld", myDataClass);            return Content("ok");        }  [HttpPost(Name = "event")]  public ContentResult PublishEvent()  {    _workflowHost.PublishEvent("MyEvent", "EventKey", 200);    return Content("ok");  }

使用WaitFor方法可以使工作流等待监听指定事件的执行,有两个入参事件名称事件关键字。通过工作流主机去触发PublishEvent执行指定的事件,有三个入参触发事件名称触发事件关键字和事件参数

需要执行事件,工作流才会继续下一步,如下动图演示:

可以为等待事件设置有效时间,在有效时间之前执行事件是不会继续下一步流程的,只有当大于有效时间之后执行事件才会继续下一步步骤。如下代码设置,为工作流执行时间一天后执行事件才会继续执行,否则就等待不动。

WaitFor("MyEvent",key => "EventKey", data => DateTime.Now.AddDays(1))
四、活动

活动被定义为在工作流中可以被等待的外部工作队列中的步骤。

在本例中,工作流将等待活动activity-1,直到活动完成才继续工作流。它还将data.Value1的值传递给活动,然后将活动的结果映射到data.Value2

然后我们创建一个worker来处理活动项的队列。它使用GetPendingActivity方法来获取工作流正在等待的活动和数据。

//.....    builder    .StartWith()    .Input(step => step.Input1,data => data.Value1)    .Input(step => step.Input2, data => 100)    .Output(data => data.Answer, step => step.Output)    .Activity("activity-1", (data) => data.Value1)    .Output(data => data.Value2, step => step.Result)    .Then()    .Input(step => step.Input1, data => data.Value1)    .Input(step => step.Input2, data => data.Answer)    .Output(data => data.Answer, step => step.Output);    //....    [HttpPost(Name = "active")]   public ContentResult PublishEvent()   {    var activity = _workflowHost.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;    if (activity != null)    {      Console.WriteLine(activity.Parameters);      _workflowHost.SubmitActivitySuccess(activity.Token, 100);    }    return Content("ok");   }

活动可以看作一个等待的步骤可以传入参数和输出参数,和事件的区别是事件不能输入参数而是单纯的等待。

五、错误处理

每个步骤都可以配置自己的错误处理行为,可以在以后重试、挂起工作流或终止工作流。

public void Build(IWorkflowBuilder builder)    {        builder                            .StartWith()                .OnError(WorkflowErrorHandling.Retry,TimeSpan.FromMinutes(10))            .Then();    }六、流程控制

工作流的流程控制包括分支、循环等各种操作

决策分支

在工作流中定义多个独立分支,并根据表达式值选择满足条件的分支执行。

使用IWorkflowBuilderCreateBranch方法定义分支。然后我们可以使用branch方法选择一个分支。

选择表达式将与通过branch方法列出的分支相匹配,匹配的分支将安排执行。匹配多个分支将导致并行分支运行。

如果data.Value1的值为1,则此工作流将选择branch1,如果为2,则选择branch2

var branch1 = builder.CreateBranch()    .StartWith()        .Input(step => step.Message, data => "hi from 1")    .Then()        .Input(step => step.Message, data => "bye from 1");  var branch2 = builder.CreateBranch()    .StartWith()        .Input(step => step.Message, data => "hi from 2")    .Then()        .Input(step => step.Message, data => "bye from 2");  builder    .StartWith()    .Decide(data => data.Value1)        .Branch((data, outcome) => data.Value1 == "one", branch1)        .Branch((data, outcome) => data.Value1 == "two", branch2);

并行ForEach

使用ForEach方法启动并行for循环

public class ForEachWorkflow : IWorkflow  {      public string Id => "Foreach";      public int Version => 1;      public void Build(IWorkflowBuilder builder)      {          builder              .StartWith()              .ForEach(data => new List() { 1, 2, 3, 4 })                  .Do(x => x                      .StartWith()                          .Input(step => step.Message, (data, context) => context.Item)                      .Then())              .Then();      }          }

While循环

使用While方法启动while循环

public class WhileWorkflow : IWorkflow  {      public string Id => "While";      public int Version => 1;      public void Build(IWorkflowBuilder builder)      {          builder              .StartWith()              .While(data => data.Counter < 3)                  .Do(x => x                      .StartWith()                      .Then()                          .Input(step => step.Value1, data => data.Counter)                          .Output(data => data.Counter, step => step.Value2))              .Then();      }          }

If判断

使用If方法执行if判断

public class IfWorkflow : IWorkflow  {       public void Build(IWorkflowBuilder builder)      {          builder              .StartWith()              .If(data => data.Counter < 3).Do(then => then                  .StartWith()                      .Input(step => step.Message, data => "Value is less than 3")              )              .If(data => data.Counter < 5).Do(then => then                  .StartWith()                      .Input(step => step.Message, data => "Value is less than 5")              )              .Then();      }          }

并行

使用Parallel方法并行执行任务

public class ParallelWorkflow : IWorkflow  {      public string Id => "parallel-sample";      public int Version => 1;      public void Build(IWorkflowBuilder builder)      {          builder              .StartWith()              .Parallel()                  .Do(then =>                       then.StartWith()                          .Then()                  .Do(then =>                      then.StartWith()                          .Then()              .Join()              .Then();    }        }

Schedule

使用Schedule方法在工作流中注册在指定时间后执行的异步方法

builder    .StartWith(context => Console.WriteLine("Hello"))    .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule        .StartWith(context => Console.WriteLine("Doing scheduled tasks"))    )    .Then(context => Console.WriteLine("Doing normal tasks"));

Recur

使用Recure方法在工作流中设置一组重复的后台步骤,直到满足特定条件为止

builder    .StartWith(context => Console.WriteLine("Hello"))    .Recur(data => TimeSpan.FromSeconds(5), data => data.Counter > 5).Do(recur => recur        .StartWith(context => Console.WriteLine("Doing recurring task"))    )    .Then(context => Console.WriteLine("Carry on"));
七、Saga transaction

saga允许在saga transaction中封装一系列步骤,并为每一个步骤提供补偿步骤,使用CompensateWith方法在对应的步骤后面添加补偿步骤,补偿步骤将会在步骤抛出异常的时候触发。

如下示例,步骤Task2如果抛出一个异常,那么补偿步骤UndoTask2UndoTask1将被触发。

builder    .StartWith(context => Console.WriteLine("Begin"))    .Saga(saga => saga        .StartWith()            .CompensateWith()        .Then()            .CompensateWith()        .Then()            .CompensateWith()    )        .CompensateWith()    .Then(context => Console.WriteLine("End"));

也可以指定重试策略,在指定时间间隔后重试。

builder    .StartWith(context => Console.WriteLine("Begin"))    .Saga(saga => saga        .StartWith()            .CompensateWith()        .Then()            .CompensateWith()        .Then()            .CompensateWith()    )    .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5))    .Then(context => Console.WriteLine("End"));
八、持久化

可以使用RedisMongdbSqlserver等持久化,具体可以看文档,此处使用Redis,先安装nuget

Install-Package WorkflowCore.Providers.Redis

然后注入就可以了

builder.Services.AddWorkflow(cfg =>{    cfg.UseRedisPersistence("localhost:6379", "app-name");    cfg.UseRedisLocking("localhost:6379");    cfg.UseRedisQueues("localhost:6379", "app-name");    cfg.UseRedisEventHub("localhost:6379", "channel-name");    //cfg.UseMongoDB(@"mongodb://mongo:27017", "workflow");    //cfg.UseElasticsearch(new ConnectionSettings(new Uri("http://elastic:9200")), "workflows");});

运行打开可以看到

关键词:

.Net Core工作流WorkFlowCore

前言WorkFlowCore是一个针对 NetCore的轻量级的工作流引擎,提供了FluentAPI、多任务、持久化以及并行处理的功能,适合于小型工作流、责任链的2023-04-13

欧佩克秘书长会见俄罗斯代表 双方表示将加强能源合作-看热讯

当地时间12日,石油输出国组织(欧佩克)秘书长海赛姆·盖斯会见了俄罗斯驻奥地利大使兼常驻维也纳国际组织代表米哈伊尔·乌里扬诺夫,双方讨2023-04-13

每日信息:监管趋严 私募管理人登记“降温”

有债券私募人士表示,在上报产品时,收到监管反馈,要求限制单一债券的投资比例等2023-04-13

云梦县气象台发布雷电黄色预警【III级/较重】【2023-04-13】

云梦县气象台2023年04月13日03时03分发布雷电黄色预警信号:预计未来6小时,云梦地区部分乡镇将有雷电活动,局部雨量10-30毫米,阵风6-8级,请注意防2023-04-13

18年,浙江男孩下课误伤女同学,被其父亲连捅12刀,法院判了 当前聚焦

18年,浙江男孩下课误伤女同学,被其父亲连捅12刀,法院判了,违法犯罪2023-04-13

大美新疆丨来巴里坤高家湖湿地 一览天高“海”阔!

大河网讯近年来,巴里坤哈萨克自治县一直利用“旅游+生态”模式发展乡村旅游。这里有着新疆东部保存最完整的湿地生态系统——2023-04-12

黑暗之魂3特大剑怎么用 黑暗之魂3特大剑流加点

在黑暗之魂3游戏中,有很多的玩家喜欢使用特大剑,毕竟特大剑的武器伤害非常之高。今天小编就为大家带来了一篇关于游戏内特大剑的评测及其推荐2023-04-12

海上反恐题材影片《深海危机》4月15日上映

中国青年报客户端讯(中青报·中青网记者沈杰群)日前,中国首部海上反恐题材影片《深海危机》在北京举行首映礼。影片讲述海军2023-04-12

2023年成都夏天时间长吗

要说一年中大家最为关注的除了各种节假日剩下的就是天气了,这不时下已经进入了春末,很多地方的温度都出现了阶梯状的上升,成都也不例外,大2023-04-12

城市观察|“优待志愿者”盼更实的措施_新消息

北京日报客户端|记者王琪鹏张楠在北京当志愿者,都有哪些福利?根据现行《北京市志愿服务促进条例》,本市鼓励企业在同2023-04-12

黑龙江颈动脉狭窄治疗哪做的好-颈动脉狭窄的早期症状

动脉硬化所致颈动脉狭窄主要发生于中老年人,往往与多种心血管危险因素有关。头臂大动脉炎所致颈动脉狭窄常见于青少年,特别是年轻女性。颈动2023-04-12

每日速读!铜库存

4月12日LME铜库存减5475吨至56800吨。4月12日LME铝库存增14725吨至5274502023-04-12

【全球报资讯】纽威数控(688697)4月12日主力资金净卖出80.13万元

截至2023年4月12日收盘,纽威数控(688697)报收于28 74元,上涨2 9%,换手率5 26%,成交量4 13万手,成交额1 17亿元。2023-04-12

163相册

1、在相册中可以设立独立的域名。2、用户的相册拥有可独立访问的域名。3、非常便于分享!。本文(163相册,关于163相册2023-04-12

低阻抗喷油器阻值一般为多少欧_昆明民办高中排名一览表

1、高中是对孩子发展最有影响的时期,选择好的高中对孩子的将来是很有帮助的,我家的孩子现在在昆三中空港实验学校念高二呢。22023-04-12

伊拉克战争20周年丨假面掉落时 谎言终被戳穿-全球最新

(点击图片查看详情)入侵伊拉克前,美国声称将“解放伊拉克人民”“帮伊拉克人建立一个团结、稳定和自由的国家”。然而,谎言终究会被戳穿。2023-04-12

隆基绿能5亿元高溢价增资氢能子公司

4月10日晚,隆基绿能公告称,拟通过全资子公司西安隆基绿能投资有限公司向后者控股子公司西安隆基氢能科技有限公司(“隆基氢能”)增资4 6亿元2023-04-12

三星手机双卡_三星s4双卡双待 焦点短讯

1、尊敬的三星用户您好:根据您的描述:S4系列手机包括I9500、I9502、I9508、I959、I9507V、I952023-04-12

欧科亿营收、净利增速创三年来最低 数控刀具、海外市场成突破口_环球观速讯

这是过去三年里,欧科亿营收、净利润增速最低的一年,公司2021年曾实现营收增长41 04%,净利润涨超100%。2023-04-12

今日快讯:高考语文试卷作文押题素材

1、高考语文试卷作文2021年高考语文考试已经结束,同学们都十分关心试卷试题和答案。本文将为大家奉上全国一卷、全国二卷、全国三卷、四川、辽2023-04-12

资讯推荐:国家港口能源物流产业计量测试中心落户广州南沙

国家港口能源物流产业计量测试中心落户广州南沙国家港口能源物流产业计量测试中心(下称“物流计量中心”)揭牌仪式11日在广州南沙举行。据介绍2023-04-12

中国船舶:融资净买入8879.23万元,融资余额21.64亿元(04-11)

2023年4月11日中国船舶融资净买入8879 23万元,融资余额21 64亿元2023-04-12

热议:智通全球财经日志|4月12日

全球财经日志|2023年4月12日2023-04-12

我,16岁就打工,干过工地摆过摊,现在非洲带黑人干活,月入两万|每日视讯

这是我们讲述的第1884位真人故事我叫马海军,14岁辍学,16岁开始打工。我干过工地卖过馒头,还在火柴厂拣选过火柴杆。18岁后学会了开旋切机切2023-04-12

全球快播:女高管不服从工作安排被炒获赔98万 法院判决其未违反劳动纪律

福州一名梁姓员工与公司发生劳动纠纷。梁某自1995年起在该公司工作,曾担任文员、副经理、分公司经理等职位,签订了无固定期限的劳动合同。公2023-04-11