首页 > 基础资料 博客日记

封装CompletionService的并发任务分发器(优化版)

2025-06-10 15:30:04基础资料围观18

这篇文章介绍了封装CompletionService的并发任务分发器(优化版),分享给大家做个参考,收藏Java资料网收获更多编程知识

这个框架代码用了很长时间,使用场景也挺多,初衷是简化CompletionService的编程接口,尽量减少业务代码处的感知。
今天找deepseek做了一版优化,优化点:

  • 整体的超时控制
  • 超时、异常处理和封装
  • 取消未完成的任务

核心代码

public class TaskDispatcher<T> {

    private final CompletionService<T> completionService;

    /**
     * 待处理任务
     */
    private final Set<Future<T>> pending = Sets.newHashSet();

    /**
     * 超时时间, 单位: s
     */
    private long timeout = 10000;

    public TaskDispatcher(Executor executor, long timeout) {
        completionService = new ExecutorCompletionService<>(executor);
        if (timeout > 0) {
            this.timeout = timeout;
        }
    }

    public void submit(Callable<T> task) {
        Future<T> future = completionService.submit(task);
        pending.add(future);
    }

    /**
     * 仅获取执行的任务结果
     *
     * @param ignoreException 忽略执行时发生的异常
     * @return
     */
    public List<T> taskCompletedResult(boolean ignoreException) {
        List<TaskResult<T>> taskResultList = taskCompleted();
        List<T> res = Lists.newArrayList();
        if (CollectionUtils.isEmpty(taskResultList)) {
            return res;
        }
        boolean hasError = false;
        for (TaskResult<T> taskResult : taskResultList) {
            if (!taskResult.isTimeout() && taskResult.getError() == null) {
                res.add(taskResult.getValue());
            } else if (taskResult.isTimeout() && !ignoreException) {
                LoggerUtils.error("执行任务时超时");
                hasError = true;
            } else if (taskResult.getError() != null && !ignoreException) {
                LoggerUtils.error("执行任务时发生异常", taskResult.getError());
                hasError = true;
            }
        }
        if (hasError) {
            throw new ZHException("任务并发处理时发生异常");
        }
        return res;
    }

    /**
     * 获取执行的任务
     *
     * @return
     */
    public List<TaskResult<T>> taskCompleted() {
        long deadline = System.currentTimeMillis() + timeout;
        List<TaskResult<T>> results = Lists.newArrayList();
        int totalTasks = pending.size();

        try {
            for (int i = 0; i < totalTasks; i++) {
                long remaining = Math.max(0, deadline - System.currentTimeMillis());
                Future<T> future = completionService.poll(remaining, TimeUnit.MILLISECONDS);
                TaskResult<T> result = new TaskResult<>();
                if (future == null) {
                    result.setTimeout(true);
                } else {
                    pending.remove(future);
                    try {
                        result.setValue(future.get());
                    } catch (ExecutionException e) {
                        result.setError(e.getCause());
                    }
                }
                results.add(result);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("任务结果收集中断", e);
        } finally {
            pending.forEach(f -> f.cancel(true));
            pending.clear();
        }
        return results;
    }

    @Data
    static class TaskResult<T> {
        private T value;
        private Throwable error;
        private boolean isTimeout;
    }
}

需要自己声明线程池bean,使用方式如下

        TaskDispatcher<Integer> taskDispatcher = new TaskDispatcher<Integer>(threadExecutor, TIME_OUT);
        for (long index: indexList) {
            taskDispatcher.submit(() -> xxxService.count(index));
        }

为了便于在计数求和场景使用,进一步实现了一个子类

public class IntSumTaskDispatcher extends TaskDispatcher<Integer> {
    public IntSumTaskDispatcher(Executor executor, long timeout, boolean throwException) {
        super(executor, timeout);
    }

    /**
     * 对所有结果求和
     *
     * @return
     */
    public int takeCompletedSum() {
        List<Integer> countResList = taskCompletedResult(true);
        int count = 0;
        for (Integer countSingle : countResList) {
            if (countSingle == null) {
                continue;
            }
            count += countSingle;
        }
        return count;
    }
}


文章来源:https://www.cnblogs.com/wuyuegb2312/p/18920444
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐

标签云