Skip to content

Commit 0c0601e

Browse files
committed
增加静默模式,适用于无网络或者不想要接收消息场景
1 parent 5bdf254 commit 0c0601e

5 files changed

Lines changed: 109 additions & 25 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,10 @@
44

55

66
### 修复
7-
- MSG_PUSH_TOKEN环境变量读取问题
7+
- MSG_PUSH_TOKEN环境变量读取问题
8+
9+
## 2025年5月12日 TODO
10+
11+
1. CLI 命令增加一个 监听单个任务的功能:
12+
输入 taskflow monitor 'XXX', 其中'XXX'表示需要监听的任务的进程名称,针对已经有一个且只有一个任务已经在跑的场景,通过 monitor 在任务结束发送消息。
13+
<!-- 2. 发送消息改为 -->

README.md

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,14 +107,17 @@ python -m build
107107
- name: "任务1-数据准备"
108108
command: "python scripts/prepare_data.py --input data/raw --output data/processed"
109109
status: "pending"
110+
# silent: false # 默认会发送消息通知
110111

111112
- name: "任务2-模型训练"
112113
command: "python scripts/train_model.py --data data/processed --epochs 10"
113114
status: "pending"
115+
silent: true # 静默模式,不发送消息通知
114116

115117
- name: "任务3-结果评估"
116118
command: "python scripts/evaluate.py --model-path models/latest.pt"
117119
status: "pending"
120+
# silent: false # 默认会发送消息通知
118121
```
119122

120123
### 2. (方法一)使用Python API (推荐使用方法二、三)
@@ -133,7 +136,8 @@ task_manager.run()
133136
# 您也可以动态添加任务
134137
task_manager.add_task_by_config(
135138
name="额外任务",
136-
command="echo '这是一个动态添加的任务'"
139+
command="echo '这是一个动态添加的任务'",
140+
silent=True # 设置为静默模式,不发送消息通知
137141
)
138142
```
139143

@@ -214,14 +218,43 @@ taskflow examples/tasks.yaml
214218
- name: "示例任务"
215219
command: "python script.py"
216220
status: "pending" # pending, running, completed, failed
217-
retry: 3 # 失败后重试次数
218-
timeout: 3600 # 任务超时时间(秒)
219-
depends_on: ["前置任务名称"] # 依赖的任务
220-
env: # 环境变量设置
221-
KEY1: "VALUE1"
222-
KEY2: "VALUE2"
221+
retry: 3 # 失败后重试次数 (TODO)
222+
timeout: 3600 # 任务超时时间(秒)(TODO)
223+
depends_on: ["前置任务名称"] # 依赖的任务 (TODO)
224+
silent: false # 是否静默执行(不发送消息通知)
223225
```
224226
227+
### 静默模式
228+
229+
MultiTaskFlow 支持静默模式,可以通过配置让某些任务不发送消息通知。这对于以下场景非常有用:
230+
231+
- **中间过程任务**:对于工作流中的中间步骤,可能不需要收到每个步骤的通知
232+
- **调试阶段任务**:在开发和调试阶段,可以关闭消息通知以避免干扰
233+
- **高频执行任务**:对于频繁执行的任务,可以只关注最终结果而不是每次执行
234+
235+
#### 配置静默模式:
236+
237+
1. **在YAML配置文件中**:
238+
```yaml
239+
- name: "静默任务"
240+
command: "python script.py"
241+
silent: true # 设置为静默模式
242+
```
243+
244+
2. **通过API动态添加**:
245+
```python
246+
task_manager.add_task_by_config(
247+
name="静默任务",
248+
command="echo '这是静默任务'",
249+
silent=True
250+
)
251+
```
252+
253+
当任务设置为静默模式时:
254+
- 任务执行时不会发送消息通知
255+
- 任务信息仍会记录在日志文件中
256+
- 如果所有任务都是静默模式,任务流管理器结束时也不会发送总结报告
257+
225258
### 自定义通知
226259

227260
您可以配置系统在任务状态变更时发送通知:

examples/tasks.yaml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,30 @@
66
- name: "数据预处理"
77
command: "python examples/scripts/data_preprocess.py --dataset coco --output-dir data/processed"
88
status: "pending"
9+
# silent: false # 默认会发送消息通知
910

1011
- name: "模型训练-阶段1"
1112
command: "python examples/scripts/train_model.py --model yolov8 --epochs 5 --batch-size 32 --dataset data/processed"
1213
status: "pending"
14+
silent: true # 静默模式,不发送消息通知
1315

1416
- name: "模型评估-阶段1"
1517
command: "python examples/scripts/evaluate_model.py --model-path checkpoints/yolov8_epoch_5.pt --dataset data/val"
1618
status: "pending"
19+
# silent: false # 默认会发送消息通知
1720

1821
- name: "模型训练-阶段2"
1922
command: "python examples/scripts/train_model.py --model yolov8 --epochs 5 --batch-size 16 --resume checkpoints/yolov8_epoch_5.pt"
2023
status: "pending"
24+
silent: true # 静默模式,不发送消息通知
2125

2226
- name: "模型评估-阶段2"
2327
command: "python examples/scripts/evaluate_model.py --model-path checkpoints/yolov8_epoch_5.pt --dataset data/val"
2428
status: "pending"
29+
# silent: false # 默认会发送消息通知
2530

2631
# 非Python任务示例
2732
- name: "数据归档"
2833
command: "bash examples/scripts/archive_data.sh checkpoints/yolov8_epoch_5.pt results/"
29-
status: "pending"
34+
status: "pending"
35+
# silent: false # 默认会发送消息通知

multitaskflow/process_monitor.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ class ProcessMonitor(Thread):
161161
process_cmd: 进程命令,用于查找进程
162162
pid: 进程ID
163163
start_time: 进程开始时间
164+
silent: 是否静默执行(不发送消息通知)
164165
165166
Methods:
166167
set_result: 设置任务执行结果和错误信息
@@ -182,7 +183,7 @@ class ProcessMonitor(Thread):
182183
"""
183184
}
184185

185-
def __init__(self, process_name: str, process_cmd: str, logger: logging.Logger, start_time: datetime = None):
186+
def __init__(self, process_name: str, process_cmd: str, logger: logging.Logger, start_time: datetime = None, silent: bool = False):
186187
"""
187188
初始化进程监控器
188189
@@ -191,6 +192,7 @@ def __init__(self, process_name: str, process_cmd: str, logger: logging.Logger,
191192
process_cmd: 进程命令(用于查找进程)
192193
logger: 日志记录器
193194
start_time: 开始时间,默认为当前时间
195+
silent: 是否静默执行(不发送消息通知),默认为False
194196
"""
195197
super().__init__()
196198
self.process_name = process_name
@@ -200,6 +202,7 @@ def __init__(self, process_name: str, process_cmd: str, logger: logging.Logger,
200202
self.daemon = True # 设置为守护线程,随主线程退出
201203
self.return_code = None
202204
self.error_message = None
205+
self.silent = silent
203206
load_dotenv()
204207
self.pid = self._find_process_pid()
205208

@@ -302,6 +305,8 @@ def send_notification(self):
302305
"""
303306
发送进程结束通知
304307
308+
如果设置了silent=True,将跳过消息发送,只记录日志
309+
305310
Returns:
306311
bool: 通知是否发送成功
307312
"""
@@ -317,6 +322,14 @@ def send_notification(self):
317322
error_msg=error_msg
318323
)
319324

325+
# 记录任务完成日志
326+
self.logger.info(f"任务 {self.process_name}{status},运行时长: {self.get_duration()}")
327+
328+
# 如果设置了静默模式,跳过消息发送
329+
if self.silent:
330+
self.logger.info("任务设置为静默模式,跳过消息发送")
331+
return True
332+
320333
return Msg_push(self.MESSAGE_TEMPLATE["title"], content, self.logger)
321334

322335
def setup_logger(name: str, log_dir: str = "logs") -> logging.Logger:

multitaskflow/task_flow.py

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -63,21 +63,23 @@ class Task:
6363
end_time: 结束时间
6464
return_code: 命令返回值
6565
duration: 执行时长
66+
silent: 是否静默执行(不发送消息通知)
6667
"""
6768

6869
STATUS_PENDING = "pending"
6970
STATUS_RUNNING = "running"
7071
STATUS_COMPLETED = "completed"
7172
STATUS_FAILED = "failed"
7273

73-
def __init__(self, name: str, command: str, status: str = STATUS_PENDING):
74+
def __init__(self, name: str, command: str, status: str = STATUS_PENDING, silent: bool = False):
7475
"""
7576
初始化任务实例
7677
7778
Args:
7879
name: 任务名称
7980
command: 要执行的命令行字符串
8081
status: 初始状态,默认为"pending"
82+
silent: 是否静默执行(不发送消息通知),默认为False
8183
"""
8284
self.name = name
8385
self.command = command
@@ -89,6 +91,7 @@ def __init__(self, name: str, command: str, status: str = STATUS_PENDING):
8991
self.error_message = None
9092
self.duration = None
9193
self.monitor = None
94+
self.silent = silent
9295

9396
def to_dict(self) -> Dict[str, Any]:
9497
"""
@@ -105,7 +108,8 @@ def to_dict(self) -> Dict[str, Any]:
105108
"end_time": self.end_time.strftime('%Y-%m-%d %H:%M:%S') if self.end_time else None,
106109
"duration": str(self.duration) if self.duration else "未完成",
107110
"return_code": self.return_code,
108-
"error_message": self.error_message
111+
"error_message": self.error_message,
112+
"silent": self.silent
109113
}
110114

111115
def update_duration(self):
@@ -241,6 +245,11 @@ def load_tasks(self):
241245
从配置文件加载初始任务
242246
243247
配置文件应为YAML格式,包含任务列表,每个任务需指定名称和命令
248+
任务可以包含以下参数:
249+
- name: 任务名称(必需)
250+
- command: 要执行的命令(必需)
251+
- status: 任务状态(可选,默认为"pending")
252+
- silent: 是否静默执行(可选,默认为False,设为True时不发送消息通知)
244253
"""
245254
try:
246255
with open(self.config_path, 'r', encoding='utf-8') as f:
@@ -253,7 +262,8 @@ def load_tasks(self):
253262
task = Task(
254263
name=task_config['name'],
255264
command=task_config['command'],
256-
status=task_config.get('status', 'pending')
265+
status=task_config.get('status', 'pending'),
266+
silent=task_config.get('silent', False)
257267
)
258268
self.add_task(task)
259269

@@ -275,18 +285,19 @@ def add_task(self, task: Task):
275285
self.total_tasks += 1
276286
self.logger.info(f"新任务已添加: {task.name}")
277287

278-
def add_task_by_config(self, name: str, command: str):
288+
def add_task_by_config(self, name: str, command: str, silent: bool = False):
279289
"""
280290
通过参数添加新任务
281291
282292
Args:
283293
name: 任务名称
284294
command: 要执行的命令
295+
silent: 是否静默执行(不发送消息通知),默认为False
285296
286297
Returns:
287298
Task: 新添加的任务实例
288299
"""
289-
task = Task(name=name, command=command)
300+
task = Task(name=name, command=command, silent=silent)
290301
self.add_task(task)
291302
return task
292303

@@ -381,6 +392,9 @@ def execute_task(self, task: Task) -> bool:
381392
self.logger.info(f"开始执行任务: {task.name}")
382393
self.logger.info(f"执行命令: {task.command}")
383394

395+
if task.silent:
396+
self.logger.info("任务设置为静默模式,不会发送消息通知")
397+
384398
task.start()
385399

386400
try:
@@ -399,7 +413,8 @@ def execute_task(self, task: Task) -> bool:
399413
process_name=task.name,
400414
process_cmd=task.command,
401415
logger=self.logger,
402-
start_time=task.start_time
416+
start_time=task.start_time,
417+
silent=task.silent # 传递静默模式设置
403418
)
404419
task.monitor.start()
405420

@@ -459,7 +474,8 @@ def check_new_tasks(self):
459474
task = Task(
460475
name=task_config['name'],
461476
command=task_config['command'],
462-
status=task_config.get('status', 'pending')
477+
status=task_config.get('status', 'pending'),
478+
silent=task_config.get('silent', False)
463479
)
464480
self.add_task(task)
465481

@@ -518,6 +534,8 @@ def run(self):
518534
def stop(self):
519535
"""
520536
停止任务流管理器并发送总结报告
537+
538+
如果所有任务都是静默模式,则不发送总结报告
521539
"""
522540
self.stop_event.set()
523541
self.logger.info("正在停止任务流管理器...")
@@ -526,15 +544,23 @@ def stop(self):
526544
if self.running:
527545
self.end_time = datetime.now()
528546
summary = self.generate_summary()
529-
self.logger.info("发送任务总结报告...")
530-
self.logger.info(summary) # 在日志中也记录摘要
547+
self.logger.info("任务总结报告:")
548+
self.logger.info(summary) # 在日志中记录摘要
531549

532-
# 发送总结报告
533-
Msg_push(
534-
title="任务流管理器执行报告",
535-
content=summary,
536-
logger=self.logger
537-
)
550+
# 检查是否所有任务都是静默模式
551+
all_silent = all(task.silent for task in self.tasks if len(self.tasks) > 0)
552+
553+
# 只有在非静默模式下才发送总结报告
554+
if not all_silent:
555+
# 发送总结报告
556+
self.logger.info("发送任务总结报告...")
557+
Msg_push(
558+
title="任务流管理器执行报告",
559+
content=summary,
560+
logger=self.logger
561+
)
562+
else:
563+
self.logger.info("所有任务都设置为静默模式,跳过发送总结报告")
538564

539565
def is_running(self) -> bool:
540566
"""

0 commit comments

Comments
 (0)