首页 > 基础资料 博客日记
封装CompletionService的并发任务分发器(优化版)
2025-06-10 15:30:04基础资料围观18次
这篇文章介绍了封装CompletionService的并发任务分发器(优化版),分享给大家做个参考,收藏Java资料网收获更多编程知识
这个框架代码用了很长时间,使用场景也挺多,初衷是简化CompletionService的编程接口,尽量减少业务代码处的感知。
今天找deepseek做了一版优化,优化点:
- 整体的超时控制
- 超时、异常处理和封装
- 取消未完成的任务
核心代码
public class TaskDispatcher<T> {
private final CompletionService<T> completionService;
/**
* 待处理任务
*/
private final Set<Future<T>> pending = Sets.newHashSet();
/**
* 超时时间, 单位: s
*/
private long timeout = 10000;
public TaskDispatcher(Executor executor, long timeout) {
completionService = new ExecutorCompletionService<>(executor);
if (timeout > 0) {
this.timeout = timeout;
}
}
public void submit(Callable<T> task) {
Future<T> future = completionService.submit(task);
pending.add(future);
}
/**
* 仅获取执行的任务结果
*
* @param ignoreException 忽略执行时发生的异常
* @return
*/
public List<T> taskCompletedResult(boolean ignoreException) {
List<TaskResult<T>> taskResultList = taskCompleted();
List<T> res = Lists.newArrayList();
if (CollectionUtils.isEmpty(taskResultList)) {
return res;
}
boolean hasError = false;
for (TaskResult<T> taskResult : taskResultList) {
if (!taskResult.isTimeout() && taskResult.getError() == null) {
res.add(taskResult.getValue());
} else if (taskResult.isTimeout() && !ignoreException) {
LoggerUtils.error("执行任务时超时");
hasError = true;
} else if (taskResult.getError() != null && !ignoreException) {
LoggerUtils.error("执行任务时发生异常", taskResult.getError());
hasError = true;
}
}
if (hasError) {
throw new ZHException("任务并发处理时发生异常");
}
return res;
}
/**
* 获取执行的任务
*
* @return
*/
public List<TaskResult<T>> taskCompleted() {
long deadline = System.currentTimeMillis() + timeout;
List<TaskResult<T>> results = Lists.newArrayList();
int totalTasks = pending.size();
try {
for (int i = 0; i < totalTasks; i++) {
long remaining = Math.max(0, deadline - System.currentTimeMillis());
Future<T> future = completionService.poll(remaining, TimeUnit.MILLISECONDS);
TaskResult<T> result = new TaskResult<>();
if (future == null) {
result.setTimeout(true);
} else {
pending.remove(future);
try {
result.setValue(future.get());
} catch (ExecutionException e) {
result.setError(e.getCause());
}
}
results.add(result);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("任务结果收集中断", e);
} finally {
pending.forEach(f -> f.cancel(true));
pending.clear();
}
return results;
}
@Data
static class TaskResult<T> {
private T value;
private Throwable error;
private boolean isTimeout;
}
}
需要自己声明线程池bean,使用方式如下
TaskDispatcher<Integer> taskDispatcher = new TaskDispatcher<Integer>(threadExecutor, TIME_OUT);
for (long index: indexList) {
taskDispatcher.submit(() -> xxxService.count(index));
}
为了便于在计数求和场景使用,进一步实现了一个子类
public class IntSumTaskDispatcher extends TaskDispatcher<Integer> {
public IntSumTaskDispatcher(Executor executor, long timeout, boolean throwException) {
super(executor, timeout);
}
/**
* 对所有结果求和
*
* @return
*/
public int takeCompletedSum() {
List<Integer> countResList = taskCompletedResult(true);
int count = 0;
for (Integer countSingle : countResList) {
if (countSingle == null) {
continue;
}
count += countSingle;
}
return count;
}
}
文章来源:https://www.cnblogs.com/wuyuegb2312/p/18920444
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
标签: