1
1
import json
2
2
from apscheduler .events import EVENT_ALL
3
- from apscheduler .executors .pool import ThreadPoolExecutor , ProcessPoolExecutor
4
- from apscheduler .schedulers . background import BackgroundScheduler
3
+ from apscheduler .executors .asyncio import AsyncIOExecutor
4
+ from apscheduler .executors . pool import ProcessPoolExecutor
5
5
from apscheduler .jobstores .memory import MemoryJobStore
6
6
from apscheduler .jobstores .redis import RedisJobStore
7
7
from apscheduler .jobstores .sqlalchemy import SQLAlchemyJobStore
8
+ from apscheduler .schedulers .asyncio import AsyncIOScheduler
8
9
from apscheduler .triggers .cron import CronTrigger
10
+ from asyncio import iscoroutinefunction
9
11
from datetime import datetime , timedelta
10
12
from sqlalchemy .engine import create_engine
11
13
from sqlalchemy .orm import sessionmaker
@@ -109,9 +111,9 @@ def __find_recent_workday(cls, day: int):
109
111
)
110
112
),
111
113
}
112
- executors = {'default' : ThreadPoolExecutor ( 20 ), 'processpool' : ProcessPoolExecutor (5 )}
114
+ executors = {'default' : AsyncIOExecutor ( ), 'processpool' : ProcessPoolExecutor (5 )}
113
115
job_defaults = {'coalesce' : False , 'max_instance' : 1 }
114
- scheduler = BackgroundScheduler ()
116
+ scheduler = AsyncIOScheduler ()
115
117
scheduler .configure (jobstores = job_stores , executors = executors , job_defaults = job_defaults )
116
118
117
119
@@ -132,9 +134,7 @@ async def init_system_scheduler(cls):
132
134
async with AsyncSessionLocal () as session :
133
135
job_list = await JobDao .get_job_list_for_scheduler (session )
134
136
for item in job_list :
135
- query_job = cls .get_scheduler_job (job_id = str (item .job_id ))
136
- if query_job :
137
- cls .remove_scheduler_job (job_id = str (item .job_id ))
137
+ cls .remove_scheduler_job (job_id = str (item .job_id ))
138
138
cls .add_scheduler_job (item )
139
139
scheduler .add_listener (cls .scheduler_event_listener , EVENT_ALL )
140
140
logger .info ('系统初始定时任务加载成功' )
@@ -169,6 +169,10 @@ def add_scheduler_job(cls, job_info: JobModel):
169
169
:param job_info: 任务对象信息
170
170
:return:
171
171
"""
172
+ job_func = eval (job_info .invoke_target )
173
+ job_executor = job_info .job_executor
174
+ if iscoroutinefunction (job_func ):
175
+ job_executor = 'default'
172
176
scheduler .add_job (
173
177
func = eval (job_info .invoke_target ),
174
178
trigger = MyCronTrigger .from_crontab (job_info .cron_expression ),
@@ -180,7 +184,7 @@ def add_scheduler_job(cls, job_info: JobModel):
180
184
coalesce = True if job_info .misfire_policy == '2' else False ,
181
185
max_instances = 3 if job_info .concurrent == '0' else 1 ,
182
186
jobstore = job_info .job_group ,
183
- executor = job_info . job_executor ,
187
+ executor = job_executor ,
184
188
)
185
189
186
190
@classmethod
@@ -191,6 +195,10 @@ def execute_scheduler_job_once(cls, job_info: JobModel):
191
195
:param job_info: 任务对象信息
192
196
:return:
193
197
"""
198
+ job_func = eval (job_info .invoke_target )
199
+ job_executor = job_info .job_executor
200
+ if iscoroutinefunction (job_func ):
201
+ job_executor = 'default'
194
202
scheduler .add_job (
195
203
func = eval (job_info .invoke_target ),
196
204
trigger = 'date' ,
@@ -203,7 +211,7 @@ def execute_scheduler_job_once(cls, job_info: JobModel):
203
211
coalesce = True if job_info .misfire_policy == '2' else False ,
204
212
max_instances = 3 if job_info .concurrent == '0' else 1 ,
205
213
jobstore = job_info .job_group ,
206
- executor = job_info . job_executor ,
214
+ executor = job_executor ,
207
215
)
208
216
209
217
@classmethod
@@ -214,7 +222,9 @@ def remove_scheduler_job(cls, job_id: Union[str, int]):
214
222
:param job_id: 任务id
215
223
:return:
216
224
"""
217
- scheduler .remove_job (job_id = str (job_id ))
225
+ query_job = cls .get_scheduler_job (job_id = job_id )
226
+ if query_job :
227
+ scheduler .remove_job (job_id = str (job_id ))
218
228
219
229
@classmethod
220
230
def scheduler_event_listener (cls , event ):
0 commit comments