diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/PercentageOfParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/PercentageOfParallelExecutor.java index 181613b270f18cd525b3567687f2b864da0b1215..bb27c59018702874d4fff36873c3bd05ac202000 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/PercentageOfParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/PercentageOfParallelExecutor.java @@ -1,12 +1,13 @@ package com.yomahub.liteflow.flow.parallel.strategy; +import cn.hutool.core.collection.CollUtil; import com.yomahub.liteflow.flow.element.condition.WhenCondition; import com.yomahub.liteflow.flow.parallel.WhenFutureObj; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.atomic.AtomicInteger; /** * 完成指定阈值任务 @@ -22,6 +23,8 @@ public class PercentageOfParallelExecutor extends ParallelStrategyExecutor { // 获取所有 CompletableFuture 任务 List> whenAllTaskList = this.getWhenAllTaskList(whenCondition, slotIndex); + if (CollUtil.isEmpty(whenAllTaskList)) return; + int total = whenAllTaskList.size(); // 计算阈值数量(向上取整),为 0 时取 1,表示只等待一个完成,即 any @@ -34,15 +37,13 @@ public class PercentageOfParallelExecutor extends ParallelStrategyExecutor { CompletableFuture thresholdFuture = new CompletableFuture<>(); // 原子计数器 - LongAdder completedCount = new LongAdder(); + AtomicInteger completedCount = new AtomicInteger(); // 为每个任务添加回调 whenAllTaskList.forEach(future -> future.whenComplete((result, ex) -> { // 计数 +1 - completedCount.increment(); - - int currentCount = completedCount.intValue(); + int currentCount = completedCount.incrementAndGet(); if (currentCount <= thresholdCount) { // 添加已完成任务 @@ -50,7 +51,7 @@ public class PercentageOfParallelExecutor extends ParallelStrategyExecutor { } // 达到阈值时触发门闩(确保只触发一次) - if (currentCount >= thresholdCount && !thresholdFuture.isDone()) { + if (currentCount == thresholdCount) { thresholdFuture.complete(null); } })