Skip to content

Commit

Permalink
日志优化,执行优化
Browse files Browse the repository at this point in the history
  • Loading branch information
JoeKerouac committed Jan 20, 2024
1 parent 04282b8 commit ba0cbf9
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
import com.github.joekerouac.async.task.model.ExecStatus;
import com.github.joekerouac.async.task.model.TaskFinishCode;
import com.github.joekerouac.async.task.model.TaskQueueConfig;
import com.github.joekerouac.async.task.service.InternalTraceService;
import com.github.joekerouac.async.task.spi.AsyncTaskRepository;
import com.github.joekerouac.async.task.spi.TaskCacheQueue;
import com.github.joekerouac.common.tools.collection.Pair;
import com.github.joekerouac.common.tools.constant.StringConst;
import com.github.joekerouac.common.tools.lock.LockTaskUtil;
import com.github.joekerouac.common.tools.scheduler.SimpleSchedulerTask;

Expand Down Expand Up @@ -206,6 +208,7 @@ public AsyncTask take() throws InterruptedException {

do {
taskRequestId = takeFromMem();
LOGGER.info("[taskExec] [{}] [{}], 从内存获取到任务", InternalTraceService.currentTrace(), taskRequestId);
} while (!lockTask(taskRequestId));

// 任务锁定后从数据库刷新任务状态,因为内存中的可能已经不对了
Expand Down Expand Up @@ -284,12 +287,13 @@ public void removeTask(Set<String> taskRequestIds) {
* @return true表示锁定成功,false表示锁定失败,任务不能执行
*/
private boolean lockTask(String taskRequestId) {
while (repository.casUpdate(taskRequestId, ExecStatus.READY, ExecStatus.RUNNING, Const.IP) <= 0) {
String currentExecIp = Const.IP + StringConst.DOT + InternalTraceService.currentTrace();
while (repository.casUpdate(taskRequestId, ExecStatus.READY, ExecStatus.RUNNING, currentExecIp) <= 0) {
// 如果CAS更新失败,则从数据库刷新任务,看任务是否已经不一致了
AsyncTask task = repository.selectByRequestId(taskRequestId);

if (task == null) {
LOGGER.info("[taskExec] 任务已经被删除, 忽略该任务 [{}]", taskRequestId);
LOGGER.info("[taskExec] [{}] 任务已经被删除, 忽略该任务 [{}]", InternalTraceService.currentTrace(), taskRequestId);
return false;
}

Expand All @@ -298,22 +302,23 @@ private boolean lockTask(String taskRequestId) {
// 如果任务已经不是READY状态,那么就无需处理了
if (status != ExecStatus.READY) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[taskExec] [{}] 任务 [{}] 已经在其他机器处理了,无需重复处理", taskRequestId, task);
LOGGER.debug("[taskExec] [{}] [{}] 任务 [{}] 已经在其他机器处理了,无需重复处理", InternalTraceService.currentTrace(),
taskRequestId, task);
}

String execIp = task.getExecIp();
// 理论上不应该出现
if (Objects.equals(execIp, Const.IP) && task.getTaskFinishCode() != TaskFinishCode.CANCEL) {
LOGGER.warn("[taskExec] [{}] 当前任务的执行IP与本主机一致,但是状态不是ready, status: [{}], task: [{}]", taskRequestId,
status, task);
if (Objects.equals(execIp, currentExecIp) && task.getTaskFinishCode() != TaskFinishCode.CANCEL) {
LOGGER.warn("[taskExec] [{}] [{}] 当前任务的执行IP与本主机一致,但是状态不是ready, status: [{}], task: [{}]",
InternalTraceService.currentTrace(), taskRequestId, status, task);
}

// 结束锁定循环,重新从内存队列中捞取数据
return false;
}
}

LOGGER.debug("[taskExec] [{}] 任务锁定成功, 准备执行", taskRequestId);
LOGGER.debug("[taskExec] [{}] [{}] 任务锁定成功, 准备执行", InternalTraceService.currentTrace(), taskRequestId);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ protected AsyncTaskProcessorEngine build(TaskGroupConfig taskGroupConfig, TaskCa
asyncTaskProcessorEngineConfig.setTraceService(taskGroupConfig.getTraceService());
asyncTaskProcessorEngineConfig.setMonitorService(taskGroupConfig.getMonitorService());
asyncTaskProcessorEngineConfig.setRepository(taskGroupConfig.getRepository());
asyncTaskProcessorEngineConfig.setInternalTraceService(taskGroupConfig.getInternalTraceService());
return config.getEngineFactory().create(asyncTaskProcessorEngineConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import javax.validation.constraints.NotNull;

import com.github.joekerouac.async.task.service.InternalTraceService;
import com.github.joekerouac.async.task.spi.AsyncTaskRepository;
import com.github.joekerouac.async.task.spi.MonitorService;
import com.github.joekerouac.async.task.spi.ProcessorRegistry;
Expand Down Expand Up @@ -65,4 +66,9 @@ public class AsyncTaskProcessorEngineConfig {
@NotNull
private AsyncTaskRepository repository;

/**
* 内部trace
*/
@NotNull
private InternalTraceService internalTraceService;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import com.github.joekerouac.async.task.service.InternalTraceService;
import com.github.joekerouac.async.task.spi.AsyncTaskProcessorEngineFactory;
import com.github.joekerouac.async.task.spi.AsyncTaskRepository;
import com.github.joekerouac.async.task.spi.AsyncTransactionManager;
Expand Down Expand Up @@ -101,4 +102,10 @@ public class TaskGroupConfig {
@NotNull
private AsyncTransactionManager transactionManager;

/**
* 内部trace生成器,只需要保证本实例唯一即可
*/
@NotNull
private InternalTraceService internalTraceService;

}
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,12 @@ public AsyncTaskServiceImpl(@NotNull AsyncServiceConfig config) {
this.taskGroupMap = new HashMap<>();
this.config = config;

InternalTraceService internalTraceService = new InternalTraceService();
Map<Set<String>, AsyncTaskExecutorConfig> executorConfigs = config.getExecutorConfigs();
Set<String> set = new HashSet<>();
if (!CollectionUtil.isEmpty(executorConfigs)) {
executorConfigs.forEach((processorNames, executorConfig) -> {
TaskGroup taskGroup = build(config, executorConfig, processorNames, true);
TaskGroup taskGroup = build(config, executorConfig, processorNames, true, internalTraceService);
for (String processorName : processorNames) {
Assert.assertTrue(set.add(processorName),
StringUtils.format("处理器有多个配置, processor: [{}]", processorName),
Expand All @@ -112,7 +113,7 @@ public AsyncTaskServiceImpl(@NotNull AsyncServiceConfig config) {
});
}

this.defaultGroup = build(config, config.getDefaultExecutorConfig(), set, false);
this.defaultGroup = build(config, config.getDefaultExecutorConfig(), set, false, internalTraceService);

TaskClearRunner taskClearRunner = new TaskClearRunner(config.getRepository(), config.getProcessorRegistry());
Thread taskClearThread = new Thread(taskClearRunner, "异步任务自动清理线程");
Expand All @@ -132,10 +133,12 @@ public AsyncTaskServiceImpl(@NotNull AsyncServiceConfig config) {
* 任务列表
* @param contain
* true表示异步任务引擎只处理processorGroup中包含的任务,false表示异步任务处理引擎不应该处理processorGroup中包含的任务,而应该处理所有其他任务
* @param internalTraceService
* 内部trace服务
* @return 任务组
*/
private TaskGroup build(AsyncServiceConfig asyncServiceConfig, AsyncTaskExecutorConfig executorConfig,
Set<String> taskTypeGroup, boolean contain) {
Set<String> taskTypeGroup, boolean contain, InternalTraceService internalTraceService) {
int cacheQueueSize = executorConfig.getCacheQueueSize();
int loadThreshold = executorConfig.getLoadThreshold();
Assert.assertTrue(
Expand Down Expand Up @@ -163,6 +166,7 @@ private TaskGroup build(AsyncServiceConfig asyncServiceConfig, AsyncTaskExecutor
taskGroupConfig.setMonitorInterval(executorConfig.getMonitorInterval());
taskGroupConfig.setRepository(asyncServiceConfig.getRepository());
taskGroupConfig.setTransactionManager(asyncServiceConfig.getTransactionManager());
taskGroupConfig.setInternalTraceService(internalTraceService);

return new TaskGroup(taskGroupConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public class DefaultAsyncTaskProcessorEngine implements AsyncTaskProcessorEngine

private final AsyncTaskRepository repository;

private final InternalTraceService internalTraceService;

/**
* 记录是否启动
*/
Expand All @@ -88,6 +90,7 @@ public DefaultAsyncTaskProcessorEngine(AsyncTaskProcessorEngineConfig engineConf
this.taskCacheQueue = engineConfig.getTaskCacheQueue();
this.monitorService = engineConfig.getMonitorService();
this.repository = engineConfig.getRepository();
this.internalTraceService = engineConfig.getInternalTraceService();
}

@Override
Expand All @@ -110,12 +113,14 @@ public synchronized void start() {
currentThread.setContextClassLoader(loader);
while (start) {
try {
AsyncTask task = taskCacheQueue.take();
if (task == null) {
return;
}

runTask(task);
InternalTraceService.runWithTrace(internalTraceService.generate(), () -> {
AsyncTask task = taskCacheQueue.take();
if (task == null) {
return;
}

runTask(task);
});
} catch (Throwable throwable) {
if (start || !(throwable instanceof InterruptedException)) {
monitorService.uncaughtException(currentThread, throwable);
Expand Down Expand Up @@ -151,7 +156,8 @@ public synchronized void stop() {
*/
protected void runTask(AsyncTask task) {
Long t0 = System.currentTimeMillis();
LOGGER.info("[taskExec] [{}] 准备执行任务: [{}]", task.getRequestId(), task);
LOGGER.info("[taskExec] [{}] [{}] 准备执行任务: [{}]", InternalTraceService.currentTrace(), task.getRequestId(),
task);

String taskRequestId = task.getRequestId();

Expand All @@ -163,7 +169,8 @@ protected void runTask(AsyncTask task) {

if (l > 0) {
// 理论上不会出现
LOGGER.warn("[taskExec] [{}] 任务 [{}] 未到执行时间,不执行,跳过执行, 当前时间:[{}]", task.getRequestId(), task, now);
LOGGER.warn("[taskExec] [{}] [{}] 任务 [{}] 未到执行时间,不执行,跳过执行, 当前时间:[{}]", InternalTraceService.currentTrace(),
task.getRequestId(), task, now);
// 注意,这里是专门设计为更新数据库而不把任务加入缓存的,防止任务加入队列中后立即再次到这里
repository.update(taskRequestId, ExecStatus.READY, null, null, null, null);
return;
Expand Down Expand Up @@ -220,8 +227,8 @@ protected void runTask(AsyncTask task) {

Long t2 = System.currentTimeMillis();

LOGGER.info(throwable, "[taskExec] [{}] 任务执行结果:[{}:{}], 总耗时: {}ms, 任务执行耗时: {}ms", requestId, result, context,
t2 - t0, t2 - t1);
LOGGER.info(throwable, "[taskExec] [{}] [{}] 任务执行结果:[{}:{}], 总耗时: {}ms, 任务执行耗时: {}ms",
InternalTraceService.currentTrace(), requestId, result, context, t2 - t0, t2 - t1);
try {
switch (result) {
case SUCCESS:
Expand Down Expand Up @@ -259,7 +266,8 @@ protected void runTask(AsyncTask task) {
task.setRetry(retryCount);

if (LOGGER.isDebugEnabled()) {
LOGGER.debug(throwable, "[taskExec] [{}] 任务重试, [{}:{}]", requestId, nextExecTime, context);
LOGGER.debug(throwable, "[taskExec] [{}] [{}] 任务重试, [{}:{}]",
InternalTraceService.currentTrace(), requestId, nextExecTime, context);
}
monitorService.processRetry(requestId, context, processor, throwable, nextExecTime);
// 任务重新加到内存队列中
Expand All @@ -273,7 +281,8 @@ protected void runTask(AsyncTask task) {
break;
default:
throw new IllegalStateException(
StringUtils.format("[taskExec] [{}] 不支持的结果状态: [{}], task: [{}]", requestId, result, task));
StringUtils.format("[taskExec] [{}] [{}] 不支持的结果状态: [{}], task: [{}]",
InternalTraceService.currentTrace(), requestId, result, task));
}
} finally {
if (traceService != null && traceContext != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.github.joekerouac.async.task.service;

import java.lang.management.ManagementFactory;
import java.lang.ref.WeakReference;
import java.util.Date;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.github.joekerouac.common.tools.date.DateUtil;
import com.github.joekerouac.common.tools.function.InterruptedTaskWithoutResult;
import com.github.joekerouac.common.tools.lock.LockTaskUtil;
import com.github.joekerouac.common.tools.string.StringUtils;

import lombok.CustomLog;

/**
 * Generates process-local trace ids and binds a "current trace" to the executing thread.
 * <p>
 * Internal use only; not intended for external callers.
 * <p>
 * A trace id is the concatenation of a {@code yyyyMMddHHmmss} logical timestamp, the zero-padded pid of the
 * current JVM and a zero-padded sequence number. The logical timestamp is derived from a base time plus one
 * second for every {@code seqMax} ids handed out, and is periodically re-aligned with the wall clock by a
 * background daemon thread.
 *
 * @author JoeKerouac
 * @date 2024-01-20 09:48:18
 * @since 4.0.0
 */
@CustomLog
public class InternalTraceService {

    /**
     * Trace bound to the current thread, managed by {@link #runWithTrace(String, InterruptedTaskWithoutResult)}.
     */
    private static final ThreadLocal<String> currentTrace = new ThreadLocal<>();

    /**
     * Pid of the current JVM, left-padded with zeros to the number of digits of the configured pid maximum.
     */
    private final String pid;

    /**
     * Base of the logical clock in epoch milliseconds; read under the read lock, rewritten under the write lock.
     */
    private long baseTime;

    /**
     * Number of digits used for the sequence part of a trace.
     */
    private final int seqLen;

    /**
     * Number of ids that share one logical second, i.e. {@code 10^seqLen}.
     */
    private final int seqMax;

    /**
     * Count of ids generated since {@link #baseTime} was last set.
     */
    private final AtomicLong counter;

    /**
     * Read lock: id generation; write lock: resetting {@link #baseTime} together with {@link #counter}.
     */
    private final ReadWriteLock lock;

    /**
     * Creates the service and starts the daemon thread that re-aligns the logical clock every five seconds.
     * <p>
     * The pid maximum defaults to 32768 and can be overridden with the system property
     * {@code async.task.pid.max}.
     *
     * @throws RuntimeException
     *             if the pid of the current JVM is greater than the configured pid maximum
     */
    InternalTraceService() {
        String pidMaxProp = System.getProperty("async.task.pid.max");
        int pidMax = 32768;
        if (StringUtils.isNotBlank(pidMaxProp)) {
            pidMax = Integer.parseInt(pidMaxProp);
        }

        // NOTE(review): relies on the runtime MXBean name having the form "<pid>@<host>" - confirm for the
        // target JVMs
        int pid = Integer.parseInt(ManagementFactory.getRuntimeMXBean().getName().split("@")[0]);
        if (pid > pidMax) {
            // the kernel exposes the limit under /proc (the previous message pointed at a non-existent /prod)
            throw new RuntimeException(String.format("当前pid max: %d, 当前pid: %d, 请使用启动参数-Dasync.task.pid.max=xxx"
                + "(将xxx替换为实际的pid最大值)来指定当前操作系统允许的最大pid值;PS: 可以通过cat /proc/sys/kernel/pid_max来查看", pidMax, pid));
        }

        this.baseTime = System.currentTimeMillis();
        // pad the pid to a fixed width so that every trace has the same length
        this.pid = String.format("%0" + Integer.toString(pidMax).length() + "d", pid);
        this.seqLen = 4;
        this.seqMax = (int)Math.pow(10, seqLen);
        this.counter = new AtomicLong(0);

        this.lock = new ReentrantReadWriteLock();

        // the thread only holds a weak reference, so it does not keep this instance alive and can stop once
        // the instance has been collected
        WeakReference<InternalTraceService> reference = new WeakReference<>(this);
        // wake up every 5 seconds and re-align the logical clock
        Thread refreshThread = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(1000 * 5);
                    InternalTraceService traceService = reference.get();
                    if (traceService != null) {
                        traceService.refresh();
                        // help gc
                        traceService = null;
                    } else {
                        break;
                    }
                } catch (Throwable throwable) {
                    // best effort: a failed refresh must not stop the thread; throwable-first form matches
                    // the other log calls of this project so the exception is reported as the cause
                    LOGGER.debug(throwable, "id刷新线程异常,忽略异常");
                }
            }
            LOGGER.info("id刷新线程停止");
        }, "id刷新线程");
        refreshThread.setDaemon(true);
        refreshThread.start();
    }

    /**
     * Generates an id to be used as a trace: logical timestamp + pid + sequence.
     *
     * @return trace
     */
    public String generate() {
        // take the sequence number and the matching base time as one consistent pair; refresh() replaces
        // both under the write lock
        long[] snapshot = LockTaskUtil.runWithLock(lock.readLock(), () -> {
            long sequence = counter.getAndIncrement();
            return new long[] {sequence, baseTime};
        });

        long value = snapshot[0];
        long base = snapshot[1];

        // the logical clock has second precision: every seqMax ids advance it by one second (1000 ms)
        long timestamp = base + (value / seqMax) * 1000;
        String time = DateUtil.getFormatDate(new Date(timestamp), "yyyyMMddHHmmss");
        // trace: time + pid + seq
        return time + pid + String.format("%0" + seqLen + "d", value % seqMax);
    }

    /**
     * Runs the task with the given trace bound to the current thread and restores the previous binding
     * afterwards, also when the task throws.
     *
     * @param trace
     *            trace to expose through {@link #currentTrace()} while the task runs
     * @param task
     *            task to run
     * @throws InterruptedException
     *             propagated from the task
     */
    public static void runWithTrace(String trace, InterruptedTaskWithoutResult task) throws InterruptedException {
        String old = currentTrace.get();
        currentTrace.set(trace);
        try {
            task.run();
        } finally {
            if (old == null) {
                // nothing was bound before: drop the entry instead of leaving a null value on the thread
                currentTrace.remove();
            } else {
                currentTrace.set(old);
            }
        }
    }

    /**
     * Returns the trace bound to the current thread.
     *
     * @return current trace, or {@code null} when the thread is not inside
     *         {@link #runWithTrace(String, InterruptedTaskWithoutResult)}
     */
    public static String currentTrace() {
        return currentTrace.get();
    }

    /**
     * Resets the logical clock to the wall clock when it has fallen more than one second behind.
     */
    private void refresh() {
        LockTaskUtil.runWithLock(lock.writeLock(), () -> {
            long value = counter.get();
            // the logical clock has second precision: every seqMax ids advance it by one second (1000 ms)
            long timestamp = baseTime + (value / seqMax) * 1000;
            long currentTimeMillis = System.currentTimeMillis();

            // one second of slack: reset only if the logical time plus one second is still before now
            if (currentTimeMillis > (timestamp + 1000)) {
                baseTime = currentTimeMillis;
                counter.set(0);
            }
        });
    }

}
Loading

0 comments on commit ba0cbf9

Please sign in to comment.