深入解析BPMN2.0四大网关:Activiti/Flowable实战与日志分析
在业务流程管理领域,BPMN2.0标准中的网关(Gateways)是控制流程走向的核心元素。本文将基于Spring Boot环境,通过Activiti/Flowable引擎的实际代码演示,结合执行日志和数据库表变化,深度剖析排他网关、并行网关、包容网关和基于事件网关的工作原理。不同于单纯的概念讲解,我们将从工程实践角度,用可复现的测试案例展示每种网关的分支合并行为。
1. 环境准备与基础配置
在开始网关实验前,我们需要准备一个标准的Spring Boot项目集成Activiti/Flowable。以下是Maven依赖配置示例:
<dependencies> <dependency> <groupId>org.activiti</groupId> <artifactId>activiti-spring-boot-starter</artifactId> <version>7.1.0.M6</version> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency> </dependencies>为方便观察网关行为,建议在application.properties中开启调试日志:
logging.level.org.activiti=DEBUG创建基础测试类模板,后续所有网关测试都将继承此类:
@SpringBootTest public abstract class GatewayTestBase { @Autowired private RuntimeService runtimeService; @Autowired private TaskService taskService; @Autowired private RepositoryService repositoryService; protected ProcessInstance startProcess(String processDefinitionKey, Map<String, Object> variables) { return runtimeService.startProcessInstanceByKey(processDefinitionKey, variables); } protected void deployProcess(String bpmnFile) { repositoryService.createDeployment() .addClasspathResource(bpmnFile) .deploy(); } }2. 排他网关(Exclusive Gateway)实战解析
排他网关是最常用的决策节点,其特点是仅选择一条符合条件的路径。我们构建一个订单审核流程来演示其特性:
<process id="exclusiveGatewayDemo"> <startEvent id="start"/> <sequenceFlow id="flow1" sourceRef="start" targetRef="exclusiveGw"/> <exclusiveGateway id="exclusiveGw" name="Approval Gateway"/> <sequenceFlow id="flow2" sourceRef="exclusiveGw" targetRef="approveTask"> <conditionExpression xsi:type="tFormalExpression"> ${order.amount < 1000} </conditionExpression> </sequenceFlow> <sequenceFlow id="flow3" sourceRef="exclusiveGw" targetRef="managerApproveTask"> <conditionExpression xsi:type="tFormalExpression"> ${order.amount >= 1000 && order.amount < 5000} </conditionExpression> </sequenceFlow> <sequenceFlow id="flow4" sourceRef="exclusiveGw" targetRef="directorApproveTask"> <conditionExpression xsi:type="tFormalExpression"> ${order.amount >= 5000} </conditionExpression> </sequenceFlow> <!-- 各审批任务定义省略 --> </process>关键测试案例展示不同金额订单的路由逻辑:
public class ExclusiveGatewayTest extends GatewayTestBase { @Test public void testLowAmountApproval() { Map<String, Object> vars = new HashMap<>(); vars.put("order", new Order(800)); // 测试金额800元 ProcessInstance pi = startProcess("exclusiveGatewayDemo", vars); Task task = taskService.createTaskQuery().processInstanceId(pi.getId()).singleResult(); assertEquals("Approve Task", task.getName()); // 验证进入普通审批 } @Test public void testHighAmountApproval() { Map<String, Object> vars = new HashMap<>(); vars.put("order", new Order(6000)); // 测试金额6000元 ProcessInstance pi = startProcess("exclusiveGatewayDemo", vars); Task task = taskService.createTaskQuery().processInstanceId(pi.getId()).singleResult(); assertEquals("Director Approve Task", task.getName()); // 验证进入总监审批 } }观察引擎日志可以发现关键行为:
DEBUG o.a.e.i.bpmn.behavior.ExclusiveGatewayActivityBehavior - Leaving exclusive gateway 'exclusiveGw' with sequence flow 'flow4'数据库表ACT_RU_EXECUTION的变化显示,无论选择哪条路径,都只会有一条执行记录继续流动。
提示:排他网关的条件表达式应互斥,否则引擎会选择XML中第一个符合条件的路径
3. 并行网关(Parallel Gateway)并发机制揭秘
并行网关用于创建同步的并发路径,典型应用场景如订单处理中的支付与发货并行:
<process id="parallelGatewayDemo"> <startEvent id="start"/> <sequenceFlow id="flow1" sourceRef="start" targetRef="fork"/> <parallelGateway id="fork"/> <sequenceFlow id="flow2" sourceRef="fork" targetRef="paymentTask"/> <sequenceFlow id="flow3" sourceRef="fork" targetRef="shippingTask"/> <userTask id="paymentTask" name="Receive Payment"/> <userTask id="shippingTask" name="Ship Order"/> <sequenceFlow id="flow4" sourceRef="paymentTask" targetRef="join"/> <sequenceFlow id="flow5" sourceRef="shippingTask" targetRef="join"/> <parallelGateway id="join"/> <sequenceFlow id="flow6" sourceRef="join" targetRef="archiveTask"/> <userTask id="archiveTask" name="Archive Order"/> </process>测试代码验证并行执行特性:
public class ParallelGatewayTest extends GatewayTestBase { @Test public void testForkJoinBehavior() { ProcessInstance pi = startProcess("parallelGatewayDemo", new HashMap<>()); // 验证同时创建两个任务 List<Task> tasks = taskService.createTaskQuery() .processInstanceId(pi.getId()) .orderByTaskName() .asc() .list(); assertEquals(2, tasks.size()); assertEquals("Receive Payment", tasks.get(0).getName()); assertEquals("Ship Order", tasks.get(1).getName()); // 模拟完成支付任务 taskService.complete(tasks.get(0).getId()); // 此时归档任务仍未创建 assertNull(taskService.createTaskQuery() .processInstanceId(pi.getId()) .taskName("Archive Order") .singleResult()); // 完成发货任务后,归档任务出现 taskService.complete(tasks.get(1).getId()); assertNotNull(taskService.createTaskQuery() .processInstanceId(pi.getId()) .taskName("Archive Order") .singleResult()); } }数据库观察发现,当流程到达fork网关时,ACT_RU_EXECUTION表会新增两条记录:
| ID | PROC_INST_ID_ | ACT_ID_ | IS_ACTIVE_ |
|---|---|---|---|
| 1 | 1001 | fork | 0 |
| 2 | 1001 | paymentTask | 1 |
| 3 | 1001 | shippingTask | 1 |
日志中的关键条目:
DEBUG o.a.e.i.bpmn.behavior.ParallelGatewayActivityBehavior - Forking execution for parallel gateway 'fork' into 2 branches4. 包容网关(Inclusive Gateway)动态路由剖析
包容网关结合了排他与并行网关的特性,允许动态决定并行路径数量。典型应用如多条件审批流程:
<process id="inclusiveGatewayDemo"> <startEvent id="start"/> <sequenceFlow id="flow1" sourceRef="start" targetRef="inclusiveGw"/> <inclusiveGateway id="inclusiveGw" name="Check Requirements"/> <sequenceFlow id="flow2" sourceRef="inclusiveGw" targetRef="legalReview"> <conditionExpression xsi:type="tFormalExpression"> ${contract.needLegalReview} </conditionExpression> </sequenceFlow> <sequenceFlow id="flow3" sourceRef="inclusiveGw" targetRef="financeReview"> <conditionExpression xsi:type="tFormalExpression"> ${contract.value > 10000} </conditionExpression> </sequenceFlow> <sequenceFlow id="flow4" sourceRef="inclusiveGw" targetRef="techReview"> <conditionExpression xsi:type="tFormalExpression"> ${contract.hasTechnicalClause} </conditionExpression> </sequenceFlow> <!-- 各审核任务定义省略 --> <inclusiveGateway id="joinGw"/> <sequenceFlow id="flow5" sourceRef="legalReview" targetRef="joinGw"/> <sequenceFlow id="flow6" sourceRef="financeReview" targetRef="joinGw"/> <sequenceFlow id="flow7" sourceRef="techReview" targetRef="joinGw"/> <sequenceFlow id="flow8" sourceRef="joinGw" targetRef="finalizeTask"/> </process>测试案例展示不同场景下的路由组合:
public class InclusiveGatewayTest extends GatewayTestBase { @Test public void testSinglePath() { Contract contract = new Contract(); contract.setNeedLegalReview(true); Map<String, Object> vars = new HashMap<>(); vars.put("contract", contract); ProcessInstance pi = startProcess("inclusiveGatewayDemo", vars); List<Task> tasks = taskService.createTaskQuery() .processInstanceId(pi.getId()) .list(); assertEquals(1, tasks.size()); assertEquals("Legal Review", tasks.get(0).getName()); } @Test public void testMultiplePaths() { Contract contract = new Contract(); contract.setValue(15000); contract.setHasTechnicalClause(true); Map<String, Object> vars = new HashMap<>(); vars.put("contract", contract); ProcessInstance pi = startProcess("inclusiveGatewayDemo", vars); List<Task> tasks = taskService.createTaskQuery() .processInstanceId(pi.getId()) .orderByTaskName() .asc() .list(); assertEquals(2, tasks.size()); assertEquals("Finance Review", tasks.get(0).getName()); assertEquals("Technical Review", tasks.get(1).getName()); } }日志中可以看到条件计算过程:
DEBUG o.a.e.i.bpmn.behavior.InclusiveGatewayActivityBehavior - Evaluating condition on sequence flow 'flow3': ${contract.value > 10000} -> true5. 基于事件的网关(Event-based Gateway)异步处理机制
基于事件的网关适用于需要等待外部事件触发的场景,如订单取消或超时处理:
<process id="eventGatewayDemo"> <startEvent id="start"/> <sequenceFlow id="flow1" sourceRef="start" targetRef="eventGw"/> <eventBasedGateway id="eventGw"/> <sequenceFlow id="flow2" sourceRef="eventGw" targetRef="cancelEvent"/> <sequenceFlow id="flow3" sourceRef="eventGw" targetRef="timeoutEvent"/> <intermediateCatchEvent id="cancelEvent"> <messageEventDefinition messageRef="orderCancelMsg"/> </intermediateCatchEvent> <intermediateCatchEvent id="timeoutEvent"> <timerEventDefinition> <timeDuration>PT2H</timeDuration> </timerEventDefinition> </intermediateCatchEvent> <sequenceFlow id="flow4" sourceRef="cancelEvent" targetRef="handleCancel"/> <sequenceFlow id="flow5" sourceRef="timeoutEvent" targetRef="handleTimeout"/> <!-- 各处理任务定义省略 --> </process>测试案例展示事件触发机制:
public class EventGatewayTest extends GatewayTestBase { @Test public void testMessageEventTrigger() throws InterruptedException { ProcessInstance pi = startProcess("eventGatewayDemo", new HashMap<>()); // 初始状态无任务 assertNull(taskService.createTaskQuery() .processInstanceId(pi.getId()) .singleResult()); // 模拟发送取消消息 runtimeService.messageEventReceived("orderCancelMsg", pi.getId()); // 验证处理取消的任务被创建 Task task = taskService.createTaskQuery() .processInstanceId(pi.getId()) .singleResult(); assertEquals("Handle Cancellation", task.getName()); } @Test public void testTimerEventTrigger() { // 需要配置时间偏移测试 } }引擎日志显示事件订阅创建:
DEBUG o.a.e.i.bpmn.behavior.EventBasedGatewayActivityBehavior - Created event subscription for MessageEventDefinition with message 'orderCancelMsg'数据库表ACT_RU_EVENT_SUBSCR会记录当前等待的事件订阅。