ze(),
executor.getKeepAliveTime(TimeUnit.SECONDS),
TimeUnit.SECONDS,
ReflectTool.createInstance(executor.getQueue().getClass()),
executor.getThreadFactory(),
ReflectTool.createInstance(executor.getRejectedExecutionHandler().getClass())
);
this.txManager = txManager;
this.txStatus = txManager.getTransaction(definition);
this.txResource = TransactionSynchronizationManager.getResource(txManager.getResourceFactory());
}
public DataSourceTransactionExecutor(DataSourceTransactionManager txManager) {
this(txManager, new DefaultTransactionDefinition());
}
public void addTask(Runnable task) {
callableList.add(DataSourceTransactionTask.Builder.aTask()
.runnable(task).txManager(txManager)
.resource(txResource).definition(new DefaultTransactionDefinition())
.build()
);
}
public void addTask(Runnable task, TransactionDefinition def) {
callableList.add(DataSourceTransactionTask.Builder.aTask()
.runnable(task).txManager(txManager)
.resource(txResource).definition(def)
.build()
);
}
public void execute() throws InterruptedException {
List<Future<TransactionStatus>> futures = new ArrayList<>();
for (Callable<TransactionStatus> callable : callableList) {
futures.add(executor.submit(callable));
}
executor.shutdown();
List<TransactionStatus> statusList = new ArrayList<>();
for (Future<TransactionStatus> future : futures) {
try {
statusList.add(future.get());
} catch (ExecutionException e) {
log.error("任务执行出现异常", e);
statusList.add(null);
}
}
Object[] statusArgs = new Object[statusList.size()];
statusList.toArray(statusArgs);
mergeTaskResult(statusArgs); // 合并每个任务的事务信息
}
/**
* 以 Reactor 异步的方式执行这些任务,需要注意的是,当使用这个方法时,由于
* Reactor 的异步特性,如果业务方法使用了 @Transactional 注解修饰,Spring 的事务处理会发生在实际处理
* 事务之前,可能会导致数据库连接被释放,从而无法绑定对应的事务对象,使用时需要注意这一点
*/
public void asyncExecute() {
List<Mono<TransactionStatus>> monoList = new ArrayList<>();
Scheduler scheduler = Schedulers.fromExecutor(this.executor);
for (Callable<TransactionStatus> callable : callableList) {
monoList.add(Mono.fromCallable(callable)
.subscribeOn(scheduler));
}
Flux.zip(monoList, Tuples::fromArray)
.single()
.flatMap(tuple2 -> Mono.fromRunnable(() -> {
TransactionSynchronizationManager.bindResource(txManager.getResourceFactory(), txResource);
mergeTaskResult(tuple2.toArray());
}))
.subscribeOn(scheduler)
.doOnSubscribe(any -> log.info("开始执行事务的合并操作"))
.doFinally(any -> {
log.debug("合并事务处理执行完成");
scheduler.dispose();
executor.shutdown();
})
.subscribe();
}
private void mergeTaskResult(Object... statusList) {
boolean exFlag = false;
for (Object obj : statusList) {
if (obj == null) {
exFlag = true;
continue;
}
// 在当前上下文中一定是 TransactionStatus 类型的对象
TransactionStatus status = (TransactionStatus) obj;
if (status.isRollbackOnly()) exFlag = true;
}
if (exFlag) {
log.debug("由于任务执行时出现异常,因此会将整个业务进操作进行回滚");
txManager.rollback(txStatus);
/*
这里抛出异常的原因是因为相关的业务方法可能被 @Transactional 修饰过,
从而导致提交只能回滚的事务而导致的提交异常,具体使用时可以考虑替换掉这个异常类型
*/
throw new RuntimeException("需要回滚的异常");
} else {
txManager.commit(txStatus);
}
}
}
|