# OSS.PipeLine **Repository Path**: osscore/OSS.PipeLine ## Basic Information - **Project Name**: OSS.PipeLine - **Description**: 流式事件处理,微服务下业务生命周期管理,强化业务的流程管理,建立业务操作边界,打造标准化的业务执行单元,提高代码复用。 - **Primary Language**: Unknown - **License**: GPL-3.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 4 - **Forks**: 3 - **Created**: 2021-06-01 - **Last Updated**: 2026-04-02 ## Categories & Tags **Categories**: process-engine **Tags**: None ## README # OSS.PipeLine 轻量级 .NET 流程引擎与数据管道库,用于构建异步数据处理管道和业务流程。 ## 项目组成 | 库 | 版本 | 描述 | |---|---|---| | **OSS.DataPipe** | v3.5.1 | 轻量级消息管道库,支持生产-消费模式和自动重试 | | **OSS.Pipeline** | v3.6.2 | 轻量级流程引擎,提供 Activity、Branch、Convertor、Enumerator 组件 | ## 架构关系 ``` ┌─────────────────────────────────────────────────────────────┐ │ OSS.Pipeline │ │ (Workflow Engine - Activity, Branch, Convertor, Enumerator)│ └───────────────────────┬─────────────────────────────────────┘ │ depends on ▼ ┌─────────────────────────────────────────────────────────────┐ │ OSS.DataPipe │ │ (Message Pipe - Producer/Consumer with Retry Support) │ └─────────────────────────────────────────────────────────────┘ ``` ## 安装 ```bash # NuGet 安装 Install-Package OSS.DataPipe Install-Package OSS.Pipeline ``` --- ## OSS.DataPipe 使用指南 ### 核心接口 | 接口 | 描述 | |------|------| | `IDataProducer` | 数据生产者,通过 `Push()` 推送数据 | | `IDataConsumer` | 数据消费者,通过 `Pop()` 消费数据 | | `IDataPipeProvider` | 扩展接口,支持自定义队列实现 | ### 基本使用 ```csharp using OSS.DataPipe; // 创建数据生产者 var producer = DataPipeFactory.CreateProducer(async (data) => { Console.WriteLine($"处理数据: {data}"); return true; // 返回 true 表示成功,false 会触发重试 }); // 推送数据 await producer.Push("测试数据"); // 重要:使用完毕后必须释放资源,防止内存泄漏 DataPipeFactory.Release(producer); ``` ### 配置重试策略 ```csharp var policy = new RetryPolicy { RetryTimes = 3, // 最大重试次数 AutoReleaseDataPipe = true, // 自动释放管道 SourceCode = "MyRetryTask" // 数据源标识 }; var producer = DataPipeFactory.CreateProducer( async (data) => { // 业务逻辑 return true; }, policy); // 传入重试策略 ``` ### 自定义队列提供器 实现 `IDataPipeProvider` 接口可集成 RabbitMQ、Kafka 等消息队列: ```csharp public class RabbitMqPipeProvider : IDataPipeProvider { public IDataProducer? CreateProducer( IDataConsumer consumer, DataPipeOption option) { if (option.SourceCode == "RabbitMQ") return new RabbitMqProducer(consumer); return null; // 返回 null 使用默认内存队列 } public bool Release(IDataProducer producer) => true; } // 全局注册 DataPipeFactory.PipeProvider = new RabbitMqPipeProvider(); ``` ### RetryProcessor 直接使用 ```csharp using OSS.DataPipe.Event; // 方式1:使用扩展方法 var processor = new Func>(async (input) => { return await ProcessAsync(input); }) .ToRetryProcessor( new RetryPolicy(3, true), // 重试3次,失败后释放 finallyFailedFunc: (input, result) => { Console.WriteLine($"处理失败,已重试 {result.executed_times} 次"); return Task.CompletedTask; } ); var result = await processor.Process("输入数据"); // 方式2:实现 IRetryEvent 接口 public class MyRetryEvent : IRetryEvent { public async Task> Execute(string para, CancellationToken ct) { var result = await DoWorkAsync(para); return new EventResult(result); // 成功 // return new EventResult(true, "错误信息"); // 需要重试 } public Task FinallyFailed(string para, RetryOutput result, CancellationToken ct) => Task.CompletedTask; public Task FinallySucceeded(string para, string result, CancellationToken ct) => Task.CompletedTask; } ``` --- ## OSS.Pipeline 使用指南 ### 组件类型 | 组件 | PipeType | 描述 | |------|----------|------| | **Activity** | `Activity` | 基本执行单元,处理业务逻辑 | | **BranchGateway** | `BranchGateway` | 条件分支,根据条件路由到不同路径 | | **Convertor** | `Convertor` | 类型转换器,转换数据类型 | | **Enumerator** | `Enumerator` | 枚举器,将集合展开为单个元素 | ### 核心接口 | 接口 | 描述 | |------|------| | `IPipeHead` | 流程入口,接收初始输入 | | `IPipeTail` | 流程出口,产生最终输出 | | `IPipeConnector` | 可连接组件 | | `IPipeLine` | 完整流程 | | `IPipeMonitor` | 流程监控 | ### 构建简单流程 ```csharp using OSS.Pipeline; // 方式1:显式指定头尾组件 var step1 = new SimpleActivity("步骤1", async (input) => { return $"{input} -> 步骤1完成"; }); var step2 = step1.Append("步骤2", async (input) => { return $"{input} -> 步骤2完成"; }); var pipeline = PipelineBuilder.Build("简单流程", step1, step2); var result = await pipeline.Run("开始"); ``` ### 使用流畅 API 构建 ```csharp // 方式2:使用定义函数(推荐) var pipeline = PipelineBuilder.Build( "订单处理流程", new SimpleActivity("验证订单", async (order) => { return await ValidateOrderAsync(order); }), start => start .Append("检查库存", async (order) => await CheckInventoryAsync(order)) .Append("扣减库存", async (order) => await DeductInventoryAsync(order)) .Append("创建支付", async (order) => await CreatePaymentAsync(order)) ); await pipeline.Run(order); ``` ### 内联定义头组件 ```csharp // 方式3:内联定义头组件 var pipeline = PipelineBuilder.Build( "订单流程", "验证订单", // 头组件名称 async (order) => ValidateOrderAsync(order), // 头组件逻辑 start => start .Append("处理订单", async (order) => order) ); await pipeline.Run(order); ``` ### 分支流程 ```csharp // 创建分支网关 var branchGate = new BranchPipe("审核结果分支"); // 连接到主流程 createActivity .Append(reduceInventoryActivity) .Append(auditActivity) .Append(branchGate); // 分支1:审核通过 branchGate.Append( result => result.IsSuccess(), // 条件 new NotifyEmailActivity()); // 满足条件时执行 // 分支2:审核失败 branchGate.Append( "退回库存", result => !result.IsSuccess(), // 条件 async (result) => { await ReturnInventoryAsync(result); return new NotifyResult("已退回库存"); }); var pipeline = PipelineBuilder.Build("订单流程", createActivity, tailActivity); ``` ### 类型转换器 ```csharp var convertor = orderActivity.Append( order => new OrderDto { Id = order.Id, Amount = order.Amount }, "OrderToDto"); ``` ### 配置组件重试 ```csharp var activity = new SimpleActivity("外部API", async (input) => { return await CallExternalApiAsync(input); }) .SetRetryPolicy(new RetryPolicy { RetryTimes = 3, AutoReleaseDataPipe = true }); ``` ### 流程监控 ```csharp public class MyMonitor : IPipeMonitor { public async Task Monitor(MonitorDataItem data) { Console.WriteLine($"[{data.stage}] {data.pipe_code}"); // data.pipe_type - 组件类型 // data.input - 输入参数 // data.output - 输出结果 // data.executed_times - 执行次数 // data.exception - 异常信息 } } pipeline.SetMonitor(new MyMonitor()); ``` ### 自定义 Activity 组件 ```csharp public class CreateOrderActivity : BaseActivity { public CreateOrderActivity() : base("CreateOrder") { } protected override async Task> Executing( CreateOrderReq para, CancellationToken ct) { var order = await _orderService.CreateAsync(para); return new EventResult(order); // 成功 // return new EventResult(true, "创建失败"); // 需要重试 } protected override Task FinallySucceeded(CreateOrderReq para, OrderInfo result, CancellationToken ct) { // 成功后的处理 return Task.CompletedTask; } protected override Task FinallyFailed(CreateOrderReq para, RetryOutput result, CancellationToken ct) { // 最终失败后的处理 return Task.CompletedTask; } } ``` ### 被动 Activity 被动 Activity 不需要输入参数,通过 `AppendPassive` 添加到分支: ```csharp branchGate.AppendPassive("清理缓存", async (result) => { await ClearCacheAsync(result); return result; }); ``` --- ## 应用场景 | 场景 | DataPipe | Pipeline | |------|:--------:|:--------:| | 消息队列处理 | ✓ | | | 事件驱动架构 | ✓ | ✓ | | 订单处理流程 | | ✓ | | 审批工作流 | | ✓ | | 数据同步/ETL | ✓ | ✓ | | 任务编排 | | ✓ | --- ## 构建与测试 ```bash # 构建解决方案 dotnet build src/OSS.PipeLine.sln # 运行测试示例 dotnet run --project src/Tests/OSS.Pipeline.ConsoleTests/OSS.Pipeline.ConsoleTests.csproj ``` ## 目录结构 ``` src/ ├── OSS.DataPipe/ # 消息管道库 │ ├── Interface/ # 核心接口 │ ├── DefaultQueue/ # 默认内存队列 │ └── Event/ # 重试系统 ├── OSS.Pipeline/ # 流程引擎库 │ ├── Base/ # 基础抽象 │ ├── Pipeline/ # 流程构建器 │ └── Pipes/ # 组件实现 │ ├── Activity/ # 活动组件 │ ├── Branch/ # 分支组件 │ ├── Convertor/ # 转换器组件 │ └── Enumerator/ # 枚举器组件 └── Tests/ # 测试项目 ``` ## 详细文档 | 文档 | 描述 | |------|------| | [OSS.DataPipe 详细文档](docs/DataPipe.md) | 核心组件、重试机制、自定义队列实现 | | [OSS.Pipeline 详细文档](docs/Pipeline.md) | 组件类型、流程构建、监控配置 | ## 许可证 MIT License