Skip to content

Commit

Permalink
- bug fix: 解决事务管理类的bug,例如事务回调执行完成后未删除,导致在其他事务上下文中重复执行,还有其他一系列事务嵌套时可能发生的问题;
Browse files Browse the repository at this point in the history

- bug fix: `flow_task`表`idx_status`索引unique修改为non unique;PS:实际应该就是non unique;
  • Loading branch information
JoeKerouac committed Mar 12, 2024
1 parent 5d85da5 commit 4599526
Show file tree
Hide file tree
Showing 16 changed files with 459 additions and 206 deletions.
2 changes: 1 addition & 1 deletion README.flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ create table if not exists `flow_task`
DEFAULT CHARSET = utf8mb4 comment '流式任务主任务表';

create unique index `idx_req` ON `flow_task` (`request_id`);
create unique index `idx_status` ON `flow_task` (`type`, `status`, `gmt_update_time`);
create index `idx_status` ON `flow_task` (`type`, `status`, `gmt_update_time`);


create table if not exists `task_node`
Expand Down
2 changes: 1 addition & 1 deletion checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@
<!-- 在方法中最多出现n个显式的return语句;return语句过多意味着方法过大或方法可能难以理解 -->
<module name="ReturnCount">
<!-- 非void返回值的方法(lambda方法)最多显式的出现15个return语句 -->
<property name="max" value="5"/>
<property name="max" value="15"/>
<!-- void返回值的方法最多显式的出现5个return语句 -->
<property name="maxForVoid" value="5"/>
</module>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,22 @@
*/
package com.github.joekerouac.async.task.exception;

import java.sql.SQLException;

/**
* Unchecked exception (extends {@link RuntimeException}) used to report database-related failures.
*
* @author JoeKerouac
* @date 2022-10-14 14:37:00
* @since 1.0.0
*/
public class DBException extends RuntimeException {

/**
* Wraps the given SQL exception as the cause.
*
* @param sqlException
*            the underlying SQL exception
*/
public DBException(final SQLException sqlException) {
super(sqlException);
/**
* Creates the exception with a detail message and no cause.
*
* @param message
*            the detail message
*/
public DBException(String message) {
super(message);
}

/**
* Creates the exception with a detail message and a cause.
*
* @param message
*            the detail message
* @param cause
*            the underlying cause
*/
public DBException(String message, Throwable cause) {
super(message, cause);
}

/**
* Creates the exception with a cause only.
*
* @param cause
*            the underlying cause
*/
public DBException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.github.joekerouac.async.task.spi.AsyncTransactionManager;
import com.github.joekerouac.async.task.spi.IDGenerator;
import com.github.joekerouac.async.task.spi.ProcessorRegistry;
import com.github.joekerouac.async.task.spi.TraceService;
import com.github.joekerouac.common.tools.scheduler.SchedulerSystem;

import lombok.CustomLog;
Expand Down Expand Up @@ -119,6 +120,11 @@ public class FlowServiceConfig {
@NotNull
private Map<String, ExecuteStrategy> executeStrategies = new HashMap<>();

/**
* Trace service; optional (not annotated {@code @NotNull}), so it may be {@code null}.
*/
private TraceService traceService;

public FlowServiceConfig() {
// 注册默认执行策略
executeStrategies.put(StrategyConst.ALL_PARENT_FINISH, new AllParentFinishExecuteStrategy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import com.github.joekerouac.async.task.spi.AsyncTransactionManager;
import com.github.joekerouac.async.task.spi.IDGenerator;
import com.github.joekerouac.async.task.spi.ProcessorRegistry;
import com.github.joekerouac.async.task.spi.TransactionCallback;
import com.github.joekerouac.async.task.spi.TraceService;
import com.github.joekerouac.common.tools.collection.CollectionUtil;
import com.github.joekerouac.common.tools.collection.Pair;
import com.github.joekerouac.common.tools.constant.ExceptionProviderConst;
Expand Down Expand Up @@ -137,6 +137,8 @@ public class FlowServiceImpl implements FlowService {

private final ProcessorRegistry processorRegistry;

private final TraceService traceService;

public FlowServiceImpl(FlowServiceConfig config, StreamTaskEngine streamTaskEngine) {
Const.VALIDATION_SERVICE.validate(config);
schedulerSystem = config.getSchedulerSystem();
Expand All @@ -155,6 +157,7 @@ public FlowServiceImpl(FlowServiceConfig config, StreamTaskEngine streamTaskEngi
executeStrategies = config.getExecuteStrategies();

starter = new Starter();
traceService = config.getTraceService();

this.streamTaskEngine = streamTaskEngine;

Expand Down Expand Up @@ -258,7 +261,6 @@ public TaskNodeStatus queryNodeStatus(final String nodeRequestId) {

/**
* Notifies the node with the given request id by delegating to {@code asyncTaskService.notifyTask}.
*
* @param nodeRequestId
*            request id of the node to notify; passed through unchanged
*/
@Override
public void notifyNode(String nodeRequestId) {

asyncTaskService.notifyTask(nodeRequestId);
}

Expand All @@ -271,18 +273,34 @@ public void notifyNode(String nodeRequestId) {
private void registerStreamTaskBuildTask(String flowTaskRequestId) {
// Registers a scheduler task, keyed by the flow task request id, whose body calls buildNodeMap for that flow
// task. NOTE(review): 1000 * 10 is presumably a 10-second interval in milliseconds — confirm against
// TaskDescriptor.
// Registration may fail here, but we do not care
schedulerSystem.registerTask(new TaskDescriptor(flowTaskRequestId, 1000 * 10, () -> {
// Captured so the finally block can hand the failure (or null on success) to the trace service
Throwable throwable = null;
// Trace context returned by traceService.newTrace(); stays null when no trace service is configured
Object trace = null;
try {
if (traceService != null) {
trace = traceService.newTrace();
}
long start = System.currentTimeMillis();
buildNodeMap(flowTaskRequestId);
LOGGER.debug("无限流任务构建完毕, [{}:{}]", flowTaskRequestId, (System.currentTimeMillis() - start));
} catch (Throwable e) {
throwable = e;
Throwable rootCause = ExceptionUtil.getRootCause(e);
// A lock-contention exception should be ignored
if (!(rootCause instanceof SQLException)
|| SqlUtil.causeForUpdateNowaitError((SQLException)rootCause)) {
|| !SqlUtil.causeForUpdateNowaitError((SQLException)rootCause)) {
// If the failure was caused by e.g. network jitter, a fast retry should be possible; this is not
// handled for now
LOGGER.warn(e, "流式任务批量添加处理失败,等待下次处理");
}
} finally {
if (traceService != null) {
try {
traceService.finish(trace, false, null, throwable);
} catch (Throwable e1) {
// A failure while finishing the trace is only logged, never propagated
LOGGER.warn(e1, "trace结束异常");
}
}
}
}));
}, true));
}

/**
Expand Down Expand Up @@ -394,7 +412,6 @@ private boolean buildNodeMap(String taskRequestId) {

List<String> taskNodeRequestIds = taskNodes.stream().map(TaskNode::getRequestId)
.filter(str -> !str.equals(newFirstNode.getRequestId())).collect(Collectors.toList());

/*
* 4、开始更新数据库
*/
Expand Down Expand Up @@ -475,7 +492,7 @@ private void addStreamTask(StreamTaskModel taskModel) {
*/
if (createFlow) {
// 有可能第一次添加仅仅添加了一个节点,所以此时没有node map,也不应该保存
if (pair.getValue().size() > 0) {
if (!pair.getValue().isEmpty()) {
// 创建了主任务,那么也应该把任务节点关系构建出来
taskNodeMapRepository.save(pair.getValue());
}
Expand All @@ -500,18 +517,10 @@ private void addStreamTask(StreamTaskModel taskModel) {
}
});

if (transactionManager.isActualTransactionActive()) {
transactionManager.registerCallback(new TransactionCallback() {
@Override
public void afterCommit() throws RuntimeException {
registerStreamTaskBuildTask(streamId);
schedulerSystem.scheduler(streamId, false);
}
});
} else {
transactionManager.runAfterCommit(() -> {
registerStreamTaskBuildTask(streamId);
schedulerSystem.scheduler(streamId, false);
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public boolean addTask(@NotNull AsyncTask task) {
if (result) {
transactionManager.runAfterCommit(() -> {
taskCacheQueue.addTask(task);
LOGGER.info("将任务[{}]添加到内存队列中", task);
LOGGER.info("任务添加, 将任务[{}]添加到内存队列中", task);
});
}
return result;
Expand Down Expand Up @@ -133,8 +133,10 @@ public boolean notifyTask(String requestId) {
transactionManager.runAfterCommit(() -> {
AsyncTask task = repository.selectByRequestId(requestId);
taskCacheQueue.addTask(task);
LOGGER.info("将任务[{}]添加到内存队列中", task);
LOGGER.info("任务通知, 将任务[{}]添加到内存队列中", task);
});
} else {
LOGGER.info("任务通知失败: [{}]", requestId);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,18 @@ public void addTaskWithWait(final String requestId, final Object task, final int
public void notifyTask(final String requestId, TransStrategy transStrategy) {
// Looks the task up by request id and, if it is still in WAIT status, notifies its task group under the given
// transaction strategy; otherwise only logs a warning.
AsyncTask task = config.getRepository().selectByRequestId(requestId);

if (task == null) {
// The database may use read/write splitting; querying again with selectForUpdate should force the query to
// go to the primary
task = config.getTransactionManager().runWithTrans(TransStrategy.NOT_SUPPORTED,
() -> config.getRepository().selectForUpdate(requestId));
}

if (task != null && task.getStatus() == ExecStatus.WAIT) {
// task is reassigned above, so it is not effectively final; copy the processor name into a local that the
// lambda can capture
String processor = task.getProcessor();
config.getTransactionManager().runWithTrans(transStrategy,
() -> getTaskGroup(task.getProcessor()).notifyTask(requestId));
() -> getTaskGroup(processor).notifyTask(requestId));
} else {
LOGGER.warn("当前要唤醒的任务不存在或者状态已经变更: [{}], [{}]", requestId, task);
}
}

Expand All @@ -252,6 +261,13 @@ public CancelStatus cancelTask(String requestId, TransStrategy transStrategy) {

return config.getTransactionManager().runWithTrans(transStrategy, () -> {
AsyncTask task = config.getRepository().selectByRequestId(requestId);

if (task == null) {
// 数据库可能是读写的,这里应该能强制让查询走主库
task = config.getTransactionManager().runWithTrans(TransStrategy.NOT_SUPPORTED,
() -> config.getRepository().selectForUpdate(requestId));
}

if (task == null) {
return CancelStatus.NOT_EXIST;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ default Connection get(String requestId) throws SQLException {
}

/**
* 根据async task的requestId选择要使用的Connection,无论当前上下文是否存在连接,都新建一个连接返回
* 根据async task的requestId选择要使用的Connection,无论当前上下文是否存在连接,都新建一个连接返回,新建连接应该是没有开启事务的
*
* @param requestId
* async task的requestId,对于分页查询,这个将为null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@
*/
public interface TraceService {

/**
* Creates a new trace.
*
* @return the trace context, to be passed in when the trace is finished; the default implementation returns
*         {@code null}
*/
default Object newTrace() {
// Not supported by default
return null;
}

/**
* 将当前trace上下文dump出来
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import javax.sql.DataSource;

import com.github.joekerouac.async.task.impl.MonitorServiceAdaptor;
import org.testng.Assert;

import com.alibaba.druid.pool.DruidDataSource;
Expand Down Expand Up @@ -105,6 +106,12 @@ public void init() throws Exception {
asyncServiceConfig.setRepository(repository);
asyncServiceConfig.setIdGenerator(() -> UUID.randomUUID().toString());
asyncServiceConfig.setProcessorRegistry(processorRegistry);
asyncServiceConfig.setMonitorService(new MonitorServiceAdaptor() {
@Override
public void uncaughtException(Thread thread, Throwable e) {
e.printStackTrace();
}
});

AsyncTaskService service = new AsyncTaskServiceImpl(asyncServiceConfig);
service.start();
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/resources/init-sql-script/flow-task-init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ create table if not exists `flow_task`
);

create unique index `idx_req` ON `flow_task` (`request_id`);
create unique index `idx_status` ON `flow_task` (`type`);
create index `idx_status` ON `flow_task` (`type`, `status`, `gmt_update_time`);
2 changes: 1 addition & 1 deletion design/flow/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ create table if not exists `flow_task`
DEFAULT CHARSET = utf8mb4 comment '流式任务主任务表';

create unique index `idx_req` ON `flow_task` (`request_id`);
create unique index `idx_status` ON `flow_task` (`type`, `status`, `gmt_update_time`);
create index `idx_status` ON `flow_task` (`type`, `status`, `gmt_update_time`);


create table if not exists `task_node`
Expand Down
2 changes: 2 additions & 0 deletions doc/upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,5 @@ queue = new TreeSet<>((t0, t1) -> (int)(t0.getValue().atZone(ZoneOffset.systemDe
- 当数据库使用读写分离,并且对应用透明时(例如云数据库提供的中间件自动读写分离),可能出现任务挂在RUNNING状态的情况,现在查询时使用`select for update`对这种场景强制走主库查询;
- 解决优雅关机问题;
- 优化任务捞取;
- bug fix: 解决事务管理类的bug,例如事务回调执行完成后未删除,导致在其他事务上下文中重复执行,还有其他一系列事务嵌套时可能发生的问题;
- bug fix: `flow_task`表`idx_status`索引unique修改为non unique;PS:实际应该就是non unique;
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import com.github.joekerouac.async.task.spi.TraceService;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand Down Expand Up @@ -130,7 +131,8 @@ public FlowService flowService(@Autowired FlowServiceConfig flowServiceConfig,
@ConditionalOnMissingBean
public FlowServiceConfig flowServiceConfig(@Autowired FlowServiceConfigModel flowServiceConfigModel,
@Autowired FlowMonitorService flowMonitorService, @Autowired FlowTaskRepository flowTaskRepository,
@Autowired TaskNodeRepository taskNodeRepository, @Autowired TaskNodeMapRepository taskNodeMapRepository) {
@Autowired TaskNodeRepository taskNodeRepository, @Autowired TaskNodeMapRepository taskNodeMapRepository,
@Autowired(required = false) TraceService traceService) {
// 下面这几个bean都是async系统提供的,没有用auto wired
AsyncTaskService asyncTaskService = context.getBean(AsyncTaskService.class);
IDGenerator idGenerator = context.getBean(IDGenerator.class);
Expand Down Expand Up @@ -158,6 +160,7 @@ public FlowServiceConfig flowServiceConfig(@Autowired FlowServiceConfigModel flo
config.setTaskNodeMapRepository(taskNodeMapRepository);
config.setTransactionManager(transactionManager);
config.setSchedulerSystem(schedulerSystem);
config.setTraceService(traceService);

Map<String, ExecuteStrategy> strategies = context.getBeansOfType(ExecuteStrategy.class);
for (final ExecuteStrategy strategy : strategies.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ create table if not exists `flow_task`
);

create unique index `idx_req` ON `flow_task` (`request_id`);
create unique index `idx_status` ON `flow_task` (`type`);
create index `idx_status` ON `flow_task` (`type`, `status`, `gmt_update_time`);

0 comments on commit 4599526

Please sign in to comment.