Skip to content

Commit 83121bf

Browse files
chengzeyiclaude
andcommitted
Apply concurrency modifier at init and improve job logging
- Call concurrency_modifier immediately in JobScaler.__init__ and resize queue if concurrency changes - Add timeout to asyncio.wait in run_jobs for non-blocking job polling - Add detailed logging for job start/completion counts 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 0b1f89e commit 83121bf

File tree

1 file changed

+22
-3
lines changed

1 file changed

+22
-3
lines changed

src/wavespeed/serverless/modules/scaler.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ def __init__(self, config: Dict[str, Any]) -> None:
6262
if concurrency_modifier := config.get("concurrency_modifier"):
6363
self.concurrency_modifier = concurrency_modifier
6464

65+
# Apply concurrency modifier immediately
66+
old_concurrency = self.current_concurrency
67+
self.current_concurrency = self.concurrency_modifier(self.current_concurrency)
68+
if self.current_concurrency != old_concurrency:
69+
self.jobs_queue = asyncio.Queue(maxsize=self.current_concurrency)
70+
6571
# Allow overriding jobs_fetcher and handler in local test mode
6672
if not IS_LOCAL_TEST:
6773
return
@@ -207,20 +213,33 @@ async def run_jobs(self, session: aiohttp.ClientSession) -> None:
207213
tasks: list[asyncio.Task[None]] = []
208214

209215
while self.is_alive() or not self.jobs_queue.empty():
216+
num_tasks = len(tasks)
217+
210218
while len(tasks) < self.current_concurrency and not self.jobs_queue.empty():
211219
job = await self.jobs_queue.get()
212220
task = asyncio.create_task(self.handle_job(session, job))
213221
tasks.append(task)
214222

215-
if tasks:
216-
log.info(f"Jobs in progress: {len(tasks)}")
223+
if len(tasks) > num_tasks:
224+
log.info(
225+
f"Started {len(tasks) - num_tasks} new job(s), "
226+
f"total jobs in progress: {len(tasks)}"
227+
)
217228

229+
if tasks:
218230
done, pending = await asyncio.wait(
219-
tasks, return_when=asyncio.FIRST_COMPLETED
231+
tasks, timeout=0.1, return_when=asyncio.FIRST_COMPLETED
220232
)
221233

234+
num_tasks = len(tasks)
222235
tasks = [t for t in tasks if t not in done]
223236

237+
if len(tasks) < num_tasks:
238+
log.info(
239+
f"Completed {num_tasks - len(tasks)} job(s), "
240+
f"total jobs in progress: {len(tasks)}"
241+
)
242+
224243
await asyncio.sleep(0)
225244

226245
await asyncio.gather(*tasks)

0 commit comments

Comments
 (0)