它是由京东零售开源的项目,作者是天涯泪小武。
该框架目前正在 京东App后台 接受苛刻、高并发、海量用户等复杂场景业务的检验测试,随时会根据实际情况发布更新和bugFix。目前来说,2019年底上线已经稳定运行2年+
为什么会学习这个框架
最近在学习java并发中的CompletableFuture,它除了提供了更为好用和强大的 Future
特性之外,还提供了函数式编程、异步任务编排组合(可以将多个异步任务串联起来,组成一个完整的链式调用)等能力。
在上网查阅资料的过程中发现一款可支持线程编排的并行框架 asyncTool,在Gitee官网找到后发现该项目不仅开源作者还写了文档去一行一行代码进行教学,这让一个对并发编排任务感兴趣但又没有基础的人来了兴趣,因此我结合源码以及作者的介绍以及其他博客的分析去学习了该框架。
框架的作用
这个模块我特别喜欢作者的介绍,先引出问题然后回答,让我一个并没有并发项目经验的人深刻理解这个框架究竟是要做什么,以下是对作者介绍的借鉴和自己的总结:
为什么需要一个这样的带任务编排的框架
1.当用户请求某个功能时,可能需要去调用各种各样的接口最后汇总结果去展示给用户。
2.在一些需求中可能不同块代码之间有依赖关系,后执行的需要依赖先执行的,以及超时、阻塞、异常等情况的处理
用图片来解释就是(图片来自作者):
并发框架的监控
任务编排其实可以通过CompletableFuture的api经过复杂的组装后实现,但对于该类来说,使用者在supplyAsync()后无法知道该异步任务的执行情况,虽然也有一些方法可以获取执行结果以及执行中的异常,但如果该任务没有执行被跳过或者其他情况那么就不知道了,因此并发框架应该能对每一步任务进行监控,无论成功失败都应该有回调。
每个任务之间应该有强弱依赖
例如上面的图片中的一些情况:
- A执行完后才能执行B
- A B全执行完后才能执行C
- A B其中一个执行后执行C
- A B其中某一个指定执行后执行C
所以在写该框架时每个任务都有自己的依赖任务和下级任务,依赖任务按要求执行后才能执行该任务
任务可以使用依赖任务中的结果
既然任务之间有执行顺序,那么一定想使用上级的结果来处理本级的任务吧。
超时时间设置
虽然每一个任务单独的时间无法设置,但可以设置全组任务的timeout,来控制该任务的执行时间
框架高性能低线程
该框架中全程无锁
AB执行后才能执行C这种情况下,C会运行在AB中后执行完的那个线程上,严格贯彻低线程理念
框架的应用
串行
串行比较好设计,那就来三个任务依次执行即可
在该框架中任务需要实现IWorker接口和ICallback接口,并重写以下方法:
- begin() 任务开始前执行的方法
- action() 任务中耗时操作执行的位置
- result() 任务执行后的结果,可以在此处理action中的结果值
- defalutValue() 当执行中有异常后的默认返回值
模拟串行场景:A任务对参数+1,之后B任务对参数+2,之后C任务对参数+3。
A任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class WorkerA implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {
@Override public void begin() { System.out.println("A - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now()); }
@Override public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) { Integer res = object + 1; return res; }
@Override public void result(boolean success, Integer param, WorkResult<Integer> workResult) { System.out.println("A - param:" + JSON.toJSONString(param)); System.out.println("A - result:" + JSON.toJSONString(workResult));
System.out.println("A - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now()); }
@Override public Integer defaultValue() { System.out.println("A - defaultValue"); return 101; }
}
|
B任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class WorkerB implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {
@Override public void begin() { System.out.println("B - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now()); }
@Override public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) { Integer res = object + 2; return res; }
@Override public void result(boolean success, Integer param, WorkResult<Integer> workResult) { System.out.println("B - param:" + JSON.toJSONString(param)); System.out.println("B - result:" + JSON.toJSONString(workResult));
System.out.println("B - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now()); }
@Override public Integer defaultValue() { System.out.println("B - defaultValue"); return 102; } }
|
任务3:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class WorkerC implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {
@Override public void begin() { System.out.println("C - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now()); }
@Override public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) { Integer res = object + 3; return res; }
@Override public void result(boolean success, Integer param, WorkResult<Integer> workResult) { System.out.println("C - param:" + JSON.toJSONString(param)); System.out.println("C - result:" + JSON.toJSONString(workResult));
System.out.println("C - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now()); }
@Override public Integer defaultValue() { System.out.println("C - defaultValue"); return 103; } }
|
编写WorkerWrapper包装类,该类就是任务编排的core类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>() .id("workerA") .worker(workerA) .callback(workerA) .param(1) .build();
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>() .id("workerB") .worker(workerB) .callback(workerB) .param(2) .depend(wrapperA) .build();
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>() .id("workerC") .worker(workerC) .callback(workerC) .param(3) .depend(wrapperB) .build();
Async.beginWork(1000, wrapperA);
|
通过WorkerWrapper中的静态内部类Build去构建一个WorkerWrapper对象,从而实现任务的编排
最后通过Async类提交任务执行
并行
并行只需3个任务一并丢进beginWork中即可
1
| Async.beginWork(1000, wrapperA,wrapperB,wrapperC);
|
阻塞等待 - 先串行,后并行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { Worker1 worker1 = new Worker1(); Worker2 worker2 = new Worker2(); Worker3 worker3 = new Worker3();
WorkerWrapper wrapper1 = new WorkerWrapper.Builder<Integer,Integer>() .id("worker1") .worker(worker1) .callback(worker1) .param(1) .build();
WorkerWrapper wrapper2 = new WorkerWrapper.Builder<Integer,Integer>() .id("worker2") .worker(worker2) .depend(wrapper1) .callback(worker2) .param(2) .build();
WorkerWrapper wrapper3 = new WorkerWrapper.Builder<Integer,Integer>() .id("worker3") .worker(worker3) .depend(wrapper1) .callback(worker3) .param(3) .build();
Async.beginWork(5000,wrapper1); } }
|
让BC都依赖于A然后把A丢进beginWork即可.
阻塞等待 - 先并行,后串行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { Worker1 worker1 = new Worker1(); Worker2 worker2 = new Worker2(); Worker3 worker3 = new Worker3();
WorkerWrapper wrapper1 = new WorkerWrapper.Builder<Integer,Integer>() .id("worker1") .worker(worker1) .callback(worker1) .param(1) .build();
WorkerWrapper wrapper2 = new WorkerWrapper.Builder<Integer,Integer>() .id("worker2") .worker(worker2) .callback(worker2) .param(2) .build();
WorkerWrapper wrapper3 = new WorkerWrapper.Builder<Integer,Integer>() .id("worker3") .worker(worker3) .depend(wrapper1,wrapper2) .callback(worker3) .param(3) .build();
Async.beginWork(5000,wrapper1,wrapper2); } }
|
C依赖AB,把AB 一起丢入即可
异常/超时回调
这2种场景,可以基于以上场景微调,即可debug调试。
1
| Async.beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper)
|
1.基于全组设定的timeout,如果超时了,则worker中的返回值使用defaultValue()
2.如果当前Worker任务异常了,则当前任务使用defaultValue(),并且depend当前任务的,也FastFail,返回defaultValue()
框架的实现
包结构
图片来自涛声依旧叭
回调包即为实现作者对并发框架每一步任务监控的想法的实现
包装类即为编排任务核心类
执行器即对编排任务根据依赖关系去执行的实现
简单调用流程
Async执行器触发WorkerWrapper包装器类执行,WorkerWrapper中包装了任务类IWorker和回调类ICallback以及任务之间的依赖关系还有异常返回
回调接口:IWorker、ICallback
==IWorker==
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
public interface IWorker<T, V> {
V action(T object, Map<String, WorkerWrapper> allWrappers);
V defaultValue(); }
|
<T, V> T入参泛型 V出参泛型
==ICallback==
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
public interface ICallback<T, V> {
void begin();
void result(boolean success, T param, WorkResult<V> workResult); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
public class DefaultCallback<T, V> implements ICallback<T, V> { @Override public void begin() { }
@Override public void result(boolean success, T param, WorkResult<V> workResult) {
}
}
|
不传回调类默认给一个空的回调
包装类WorkerWrapper
有WorkerWrapper和DependWrapper两个类,因为依赖任务中还有个必要属性must,即判断是否某个任务执行后才能往后执行,还是只要任一执行即能向后执行。
源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
public class WorkerWrapper<T, V> {
private String id;
private T param; private IWorker<T, V> worker; private ICallback<T, V> callback;
private List<WorkerWrapper<?, ?>> nextWrappers;
private List<DependWrapper> dependWrappers;
private AtomicInteger state = new AtomicInteger(0);
private Map<String, WorkerWrapper> forParamUseWrappers;
private volatile WorkResult<V> workResult = WorkResult.defaultResult();
private volatile boolean needCheckNextWrapperResult = true;
private static final int FINISH = 1; private static final int ERROR = 2; private static final int WORKING = 3; private static final int INIT = 0; }
|
几个重要属性:
- worker/callback 任务类和回调类 param action方法中的参数(要处理的参数)
- nextWrappers 在该任务后执行的任务
- dependWrappers 该任务依赖的任务
构建
在阅读源码的过程中发现WorkerWrapper类中有一个静态内部类Builder,看了分析文章以及查阅资料后才知道这是建造者模式
建造者模式:建造者模式将对象的创建过程和表现分离,并且调用方通过指挥者调用方法对对象进行构建,使得调用方不再关心对象构建过程,构建对象的具体过程可以根据传入类型的不同而改变。
1 2 3 4 5 6 7 8
| WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>() .id("workerA") .worker(workerA) .callback(workerA) .param(null) .depend(wrapperB, wrapperC) .build();
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public static class Builder<W, C> {
private String id = UUID.randomUUID().toString();
private W param; private IWorker<W, C> worker; private ICallback<W, C> callback;
private List<WorkerWrapper<?, ?>> nextWrappers;
private List<DependWrapper> dependWrappers;
private Set<WorkerWrapper<?, ?>> selfIsMustSet;
private boolean needCheckNextWrapperResult = true; }
|
内部类Builder中的属性和WorkerWrapper中的一样,主要是通过build()方法去构建WorkerWrapper对象。
不传入线程池默认
1 2 3 4 5 6 7
| public static final ThreadPoolExecutor COMMON_POOL = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, 1024, 15L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), (ThreadFactory) Thread::new);
|
第一个参数:表示线程池的核心线程数是系统可用处理器数量的两倍。这样做可以确保在大多数情况下,有足够的线程来并行处理任务,充分利用多核CPU的计算能力。
第二个参数:最大线程数
第三/四个参数:存活时间,当线程数大于核心线程数时,多余的空闲线程在终止前等待新任务的最长时间。
第五个参数:LinkedBlockingQueue
实例,这是一个基于链表结构的阻塞队列。它的大小没有限制(或者说是Integer.MAX_VALUE)
第六个参数:线程工厂
核心方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public static boolean beginWork(long timeout, ThreadPoolExecutor pool, List<WorkerWrapper> workerWrappers) throws ExecutionException, InterruptedException { if(workerWrappers == null || workerWrappers.size() == 0) { return false; } Map<String, WorkerWrapper> forParamUseWrappers = new ConcurrentHashMap<>(); CompletableFuture[] futures = new CompletableFuture[workerWrappers.size()]; for (int i = 0; i < workerWrappers.size(); i++) { WorkerWrapper wrapper = workerWrappers.get(i); futures[i] = CompletableFuture.runAsync(() -> wrapper.work(pool, timeout, forParamUseWrappers), pool); } try { CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS); return true; } catch (TimeoutException e) { Set<WorkerWrapper> set = new HashSet<>(); totalWorkers(workerWrappers, set); for (WorkerWrapper wrapper : set) { wrapper.stopNow(); } return false; } }
|
WorkerWrapper处理任务
核心work()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
|
private void work(ThreadPoolExecutor poolExecutor, WorkerWrapper fromWrapper, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) { this.forParamUseWrappers = forParamUseWrappers; forParamUseWrappers.put(id, this); long now = SystemClock.now(); if (remainTime <= 0) { fastFail(INIT, null); beginNext(poolExecutor, now, remainTime); return; } if (getState() == FINISH || getState() == ERROR) { beginNext(poolExecutor, now, remainTime); return; }
if (needCheckNextWrapperResult) { if (!checkNextWrapperResult()) { fastFail(INIT, new SkippedException()); beginNext(poolExecutor, now, remainTime); return; } }
if (dependWrappers == null || dependWrappers.size() == 0) { fire(); beginNext(poolExecutor, now, remainTime); return; }
if (dependWrappers.size() == 1) { doDependsOneJob(fromWrapper); beginNext(poolExecutor, now, remainTime); } else { doDependsJobs(poolExecutor, dependWrappers, fromWrapper, now, remainTime); }
}
|
作者的注释十分清晰
工作步骤是:
- 保存所有任务forParamUseWrappers
- 判断是否超时
- 检查是否已经执行过,避免重复处理
- 检查next是否已经开始执行了,避免多余的处理
- 没有任何依赖的话自己直接fire()执行
- 有依赖的话doDependsOneJob/doDependsJobs
只有一个依赖的话,执行完就可以执行自己了
多个依赖的话,需要判断must依赖是否已经全执行才可以执行自己
执行fire()
1 2 3 4 5 6 7
|
private void fire() { workResult = workerDoJob(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
|
private WorkResult<V> workerDoJob() {
if (!checkIsNullResult()) { return workResult; } try { if (!compareAndSetState(INIT, WORKING)) { return workResult; }
callback.begin();
V resultValue = worker.action(param, forParamUseWrappers);
if (!compareAndSetState(WORKING, FINISH)) { return workResult; }
workResult.setResultState(ResultState.SUCCESS); workResult.setResult(resultValue); callback.result(true, param, workResult);
return workResult; } catch (Exception e) { fastFail(WORKING, e); return workResult; } }
|
beginNext()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
private void beginNext(ExecutorService executorService, long now, long remainTime) { long costTime = SystemClock.now() - now;
if (nextWrappers == null) { return; }
if (nextWrappers.size() == 1) { nextWrappers.get(0).work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers); return; }
CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()]; for (int i = 0; i < nextWrappers.size(); i++) { int finalI = i; futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI) .work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers), executorService); }
try { CompletableFuture.allOf(futures).get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }
|
开启下一个任务,如果没有就结束程序,有一个就启动一个线程,有多个就启动多个。
doDependsOneJob()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| private void doDependsOneJob(WorkerWrapper dependWrapper) { if (ResultState.TIMEOUT == dependWrapper.getWorkResult().getResultState()) { workResult = defaultResult(); fastFail(INIT, null); } else if (ResultState.EXCEPTION == dependWrapper.getWorkResult().getResultState()) { workResult = defaultExResult(dependWrapper.getWorkResult().getEx()); fastFail(INIT, null); } else { fire(); } }
|
如果判断到只有一个依赖的时候就要进入这个方法,看自己依赖的方法执行情况,如果依赖的方法超时那自己也超时
doDependsJobs()
先判断依赖的任务是否是must,如果一个都不是那就执行自己。
如果存在需要必须完成的,且fromWrapper不是必须的,就什么也不干,等到必须完成的完成了的时候就会让该任务进行的
如果fromWrapper是必须的,就判断是否所有的必须都完成了,并检查他们的情况,有超时即自己也超时
如何确保多依赖任务不被重复执行
1 2 3 4 5
| if (getState() == FINISH || getState() == ERROR) { beginNext(poolExecutor, now, remainTime); return; }
|
框架的特点
SystemClock
在项目中获取当前时间没有使用System.currentTimeMillis()而是自己写了一个SystemClock
项目中通过单例模式创建了一个时钟类SystemClock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
|
public class SystemClock {
private final int period;
private final AtomicLong now;
private static class InstanceHolder { private static final SystemClock INSTANCE = new SystemClock(1); }
private SystemClock(int period) { this.period = period; this.now = new AtomicLong(System.currentTimeMillis()); scheduleClockUpdating(); }
private static SystemClock instance() { return InstanceHolder.INSTANCE; }
private void scheduleClockUpdating() { ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> { Thread thread = new Thread(runnable, "System Clock"); thread.setDaemon(true); return thread; }); scheduler.scheduleAtFixedRate(() -> now.set(System.currentTimeMillis()), period, period, TimeUnit.MILLISECONDS); }
private long currentTimeMillis() { return now.get(); }
public static long now() { return instance().currentTimeMillis(); } }
|
创建了一个守护线程,每1ms对 AtomicLong now
进行更新 System.currentTimeMillis()
,因为守护线程的执行周期是每1ms执行一次,这里是有1ms的延迟。
ScheduledExecutorService
是 ExecutorService
的子类,它基于 ExecutorService
功能实现周期执行的任务。
参考
手写中间件之——并发框架_天涯泪小武的博客-CSDN博客
并行编排AsyncTool框架源码解析_涛声依旧叭的博客-CSDN博客
评论区
欢迎你留下宝贵的意见,昵称输入QQ号会显示QQ头像哦~