Skip to content

Commit

Permalink
任务加载优化,仅加载可以处理的任务
Browse files Browse the repository at this point in the history
  • Loading branch information
JoeKerouac committed Feb 5, 2025
1 parent 2f2f53e commit d126816
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 65 deletions.
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
<properties>
<common-tools.version>2.1.4</common-tools.version>
<slf4j.version>1.7.5</slf4j.version>
<lombok.version>1.18.10</lombok.version>
<lombok.version>1.18.26</lombok.version>
<testng.version>6.14.3</testng.version>
<druid.version>1.1.21</druid.version>
<sqlite.version>3.36.0.3</sqlite.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,58 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

import com.github.joekerouac.async.task.spi.AbstractAsyncTaskProcessor;
import com.github.joekerouac.async.task.spi.ProcessorRegistry;

import lombok.CustomLog;

/**
* @author JoeKerouac
* @date 2023-11-11 16:22
* @since 4.0.0
*/
@CustomLog
public class DefaultProcessorRegistry implements ProcessorRegistry {

private final Map<String, AbstractAsyncTaskProcessor<?>> processors = new ConcurrentHashMap<>();

private final Set<TaskProcessorListener> listeners = new CopyOnWriteArraySet<>();

@Override
public AbstractAsyncTaskProcessor<?> registerProcessor(String taskType, AbstractAsyncTaskProcessor<?> processor) {
return processors.put(taskType, processor);
AbstractAsyncTaskProcessor<?> old = processors.put(taskType, processor);
if (old != null) {
LOGGER.warn("当前已经注册了[{}]的处理器[{}], 新处理器[{}]将替代老处理器", taskType, old.getClass(), processor.getClass());
}

for (TaskProcessorListener listener : listeners) {
try {
listener.onRegister(taskType, old, processor);
} catch (Throwable throwable) {
LOGGER.warn(throwable, "processor添加回调处理失败: [{}:{}]", listener.getClass(), listener);
}
}
return old;
}

@SuppressWarnings("unchecked")
@Override
public <T, P extends AbstractAsyncTaskProcessor<T>> P removeProcessor(String taskType) {
return (P)processors.remove(taskType);
P processor = (P)processors.remove(taskType);
if (processor == null) {
return null;
}

for (TaskProcessorListener listener : listeners) {
try {
listener.onRemove(taskType, processor);
} catch (Throwable throwable) {
LOGGER.warn(throwable, "processor移除回调处理失败: [{}:{}]", listener.getClass(), listener);
}
}
return processor;
}

@Override
Expand All @@ -50,4 +80,16 @@ public Set<String> getAllTaskType() {
public <T, P extends AbstractAsyncTaskProcessor<T>> P getProcessor(String taskType) {
return (P)processors.get(taskType);
}

@Override
public void addListener(TaskProcessorListener listener) {
listeners.add(listener);
processors.forEach((key, value) -> {
try {
listener.onRegister(key, null, value);
} catch (Throwable throwable) {
LOGGER.warn(throwable, "processor添加回调处理失败: [{}:{}]", listener.getClass(), listener);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public class DefaultTaskCacheQueue implements TaskCacheQueue {

private volatile boolean start;

private volatile Set<String> taskTypeGroup;

public DefaultTaskCacheQueue(TaskQueueConfig config, AsyncTaskRepository repository) {
this.cacheQueueSize = config.getCacheQueueSize();
this.loadThreshold = config.getLoadThreshold();
Expand Down Expand Up @@ -121,12 +123,12 @@ public DefaultTaskCacheQueue(TaskQueueConfig config, AsyncTaskRepository reposit

long loadInterval = config.getLoadInterval();
boolean loadTaskFromRepository = config.isLoadTaskFromRepository();
Set<String> taskTypeGroup = config.getTaskTypeGroup();
Set<String> allTaskTypeGroup = config.getTaskTypeGroup();
boolean contain = config.isContain();

if (loadTaskFromRepository) {
LOGGER.info("当前需要从数据库中捞取任务执行, taskTypeGroup: [{}], contain: [{}], loadInterval: [{}], cacheQueueSize: [{}]",
taskTypeGroup, contain, loadInterval, cacheQueueSize);
allTaskTypeGroup, contain, loadInterval, cacheQueueSize);
SimpleSchedulerTask schedulerTask = new SimpleSchedulerTask(() -> {
// 捞取未来指定时间内的任务
LocalDateTime now = LocalDateTime.now();
Expand All @@ -140,10 +142,14 @@ public DefaultTaskCacheQueue(TaskQueueConfig config, AsyncTaskRepository reposit
return;
}

if (taskTypeGroup.isEmpty()) {
LOGGER.info("当前需要处理的任务{}为: [{}], 当前实际无法处理任何任务", contain ? "白名单" : "黑名单", allTaskTypeGroup);
return;
}

// 从任务仓库中捞取任务
List<AsyncTask> tasks =
repository.selectPage(ExecStatus.READY, now.plusSeconds(Math.max(MAX_TIME, loadInterval * 3 / 2)),
0, cacheQueueSize, taskTypeGroup, contain);
List<AsyncTask> tasks = repository.selectPage(ExecStatus.READY,
now.plusSeconds(Math.max(MAX_TIME, loadInterval * 3 / 2)), 0, cacheQueueSize, taskTypeGroup, true);

if (tasks.isEmpty()) {
// 没有捞取到任务,记录下本次捞取
Expand All @@ -164,7 +170,7 @@ public DefaultTaskCacheQueue(TaskQueueConfig config, AsyncTaskRepository reposit
schedulerTask.setInitialDelay(0);
loadTask = schedulerTask;
} else {
LOGGER.warn("当前不需要从数据库中捞取任务执行, taskTypeGroup: [{}], contain: [{}]", taskTypeGroup, contain);
LOGGER.warn("当前不需要从数据库中捞取任务执行, taskTypeGroup: [{}], contain: [{}]", allTaskTypeGroup, contain);
loadTask = null;
}
}
Expand Down Expand Up @@ -193,6 +199,14 @@ public synchronized void stop() {
}
}

@Override
public void refreshTaskTypes(Set<String> taskGroup) {
this.taskTypeGroup = taskGroup;
if (loadTask != null) {
loadTask.scheduler();
}
}

@Override
public AsyncTask take() throws InterruptedException {
if (!start) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,36 @@
*/
package com.github.joekerouac.async.task.impl;

import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import javax.validation.constraints.NotNull;

import com.github.joekerouac.async.task.Const;
import com.github.joekerouac.async.task.entity.AsyncTask;
import com.github.joekerouac.async.task.model.AsyncTaskProcessorEngineConfig;
import com.github.joekerouac.async.task.model.ExecStatus;
import com.github.joekerouac.async.task.model.TaskFinishCode;
import com.github.joekerouac.async.task.model.TaskGroupConfig;
import com.github.joekerouac.async.task.model.TaskQueueConfig;
import com.github.joekerouac.async.task.service.InternalTraceService;
import com.github.joekerouac.async.task.spi.AbstractAsyncTaskProcessor;
import com.github.joekerouac.async.task.spi.AsyncTaskProcessorEngine;
import com.github.joekerouac.async.task.spi.AsyncTaskRepository;
import com.github.joekerouac.async.task.spi.AsyncTransactionManager;
import com.github.joekerouac.async.task.spi.MonitorService;
import com.github.joekerouac.async.task.spi.ProcessorRegistry;
import com.github.joekerouac.async.task.spi.TaskCacheQueue;
import com.github.joekerouac.common.tools.collection.CollectionUtil;
import com.github.joekerouac.common.tools.constant.StringConst;
import lombok.CustomLog;

import javax.validation.constraints.NotNull;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.CustomLog;

/**
* @author JoeKerouac
Expand Down Expand Up @@ -212,6 +218,36 @@ public synchronized void start() {
engine = build(config, taskCacheQueue);
engine.start();

ProcessorRegistry processorRegistry = config.getProcessorRegistry();
processorRegistry.addListener(new ProcessorRegistry.TaskProcessorListener() {

private void refreshTaskTypes() {
Set<String> allTaskType = processorRegistry.getAllTaskType();
if (CollectionUtil.isEmpty(allTaskType)) {
taskCacheQueue.refreshTaskTypes(Collections.emptySet());
return;
}

Set<String> taskTypeGroup = allTaskType.stream().filter(taskType -> {
TaskQueueConfig taskQueueConfig = config.getTaskQueueConfig();
Set<String> set = taskQueueConfig.getTaskTypeGroup();
return taskQueueConfig.isContain() == set.contains(taskType);
}).collect(Collectors.toSet());
taskCacheQueue.refreshTaskTypes(taskTypeGroup);
}

@Override
public void onRegister(String taskType, AbstractAsyncTaskProcessor<?> oldProcessor,
AbstractAsyncTaskProcessor<?> newProcessor) {
refreshTaskTypes();
}

@Override
public void onRemove(String taskType, AbstractAsyncTaskProcessor<?> processor) {
refreshTaskTypes();
}
});

Thread monitorThread = new Thread(() -> {
MonitorService monitorService = config.getMonitorService();
InternalTraceService internalTraceService = config.getInternalTraceService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@
*/
package com.github.joekerouac.async.task.service;

import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import com.github.joekerouac.async.task.Const;
import com.github.joekerouac.async.task.entity.AsyncTask;
import com.github.joekerouac.async.task.model.AsyncTaskProcessorEngineConfig;
Expand All @@ -29,16 +38,8 @@
import com.github.joekerouac.common.tools.constant.ExceptionProviderConst;
import com.github.joekerouac.common.tools.string.StringUtils;
import com.github.joekerouac.common.tools.util.Assert;
import lombok.CustomLog;

import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import lombok.CustomLog;

/**
* 异步任务执行引擎
Expand Down Expand Up @@ -212,8 +213,8 @@ protected void runTask(AsyncTask task) {

if (processor == null) {
monitorService.noProcessor(requestId, task.getTask(), task.getProcessor());
// 更新状态为没有处理器,无法处理
repository.update(requestId, ExecStatus.FINISH, TaskFinishCode.NO_PROCESSOR, null, null, Const.IP);
// 如果没有处理器则不处理,可能是处理器还未注册或者被移除了,等待下次处理器被注册后执行
repository.update(requestId, ExecStatus.READY, null, null, null, null);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,43 @@ public interface ProcessorRegistry {
*/
Set<String> getAllTaskType();

/**
* 添加监听
*
* @param listener
* 监听
*/
void addListener(TaskProcessorListener listener);

interface TaskProcessorListener {

/**
* 任务处理器注册时回调,处理异常时忽略
*
* @param taskType
* 任务类型
* @param oldProcessor
* 旧任务处理器
* @param newProcessor
* 任务处理器
*/
default void onRegister(String taskType, AbstractAsyncTaskProcessor<?> oldProcessor,
AbstractAsyncTaskProcessor<?> newProcessor) {

}

/**
* 移除任务处理器时回调,处理异常时忽略
*
* @param taskType
* 任务类型
* @param processor
* 任务处理器
*/
default void onRemove(String taskType, AbstractAsyncTaskProcessor<?> processor) {

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ public interface TaskCacheQueue {
*/
void stop();

/**
* 刷新当前可以处理的taskType
*
* @param taskGroup
* taskGroup
*/
void refreshTaskTypes(Set<String> taskGroup);

/**
* 将任务尝试加到缓存
*
Expand Down
31 changes: 31 additions & 0 deletions doc/问题排查.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# 流式任务问题排查
## 常用sql

查看指定任务流当前执行节点
```mysql

SELECT
ft.type '任务流类型',
tn.request_id '任务流节点requestId',
tn.`status` '任务流节点状态',
task.`status` '异步任务状态',
task.processor '任务处理器'
FROM
flow_task ft
LEFT JOIN task_node tn ON tn.task_request_id = ft.request_id
LEFT JOIN async_task task ON tn.request_id = task.request_id
WHERE
ft.request_id = 'uat_1805487973127421954' and tn.`status` = 'RUNNING';

```

查询指定节点的子节点
```mysql
SELECT
tn.*
FROM
task_node_map tnm
LEFT JOIN task_node tn ON tnm.child_node = tn.request_id
WHERE
tnm.parent_node = '1812794903311155201';
```
2 changes: 1 addition & 1 deletion starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

<properties>
<slf4j.version>1.7.5</slf4j.version>
<lombok.version>1.18.10</lombok.version>
<lombok.version>1.18.26</lombok.version>

<testng.version>6.14.3</testng.version>
<druid.version>1.1.21</druid.version>
Expand Down
Loading

0 comments on commit d126816

Please sign in to comment.