-
Notifications
You must be signed in to change notification settings - Fork 26
Expand file tree
/
Copy pathasync_runner.py
More file actions
166 lines (137 loc) · 7.17 KB
/
Copy pathasync_runner.py
File metadata and controls
166 lines (137 loc) · 7.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
"""Async-safe wrapper for blocking document-generation calls.
Every Office-document generator in this project (``markdown_to_word``,
``markdown_to_excel``, ``create_presentation``, ``create_eml``,
``create_xml_file``, and every dynamically-registered template tool) is a
synchronous, blocking function. They perform:
* file I/O (opening .docx / .pptx templates, which are zip archives)
* CPU-bound markdown / mustache rendering
* synchronous network I/O (``requests.get`` for image downloads,
``boto3`` S3 uploads)
FastMCP runs on top of an asyncio event loop (uvicorn / Starlette).
Calling a blocking function directly from an ``async def`` MCP tool
handler freezes the loop for the full duration of the call — no other
request can be served, including the Kubernetes liveness / readiness
probes. Repeated probe timeouts cause kubelet to SIGTERM the pod, which
is the failure mode observed in EKS:
ERROR: ASGI callable returned without completing response.
ERROR: Cancel 0 running task(s), timeout graceful shutdown exceeded
(The "0 running tasks" line is the giveaway: there were no async tasks
because the work was running synchronously on the event loop itself.)
Whether blocking work is offloaded to a worker thread is controlled by
the ``RUN_BLOCKING_BY_ASYNCIO_THREAD_ENABLED`` environment variable
(see ``config.Config.run_blocking_by_asyncio_thread_enabled``):
* Disabled (default) — call the sync function inline on the event
loop. The loop is blocked until the call returns. This is the
legacy behaviour for static tools; dynamic tools (previously
registered as sync ``def`` and auto-threaded by FastMCP) will also
run on the event loop in this mode, since the dynamic handlers are
now ``async def`` wrappers around ``run_blocking``.
* Enabled — dispatch the sync function to asyncio's default thread
pool via ``asyncio.to_thread``. The event loop stays free to serve
health probes and concurrent requests. Recommended for EKS.
All tool call sites uniformly write ``await run_blocking(func, *args,
**kwargs)`` — the helper internally chooses the dispatch strategy based
on config, so call sites never branch on the flag.
This helper lives in its own module (not inside ``main.py``) so that
dynamically-registered tool modules can import it without creating a
circular dependency back through ``main``.
"""
from __future__ import annotations
import asyncio
import functools
import logging
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional, TypeVar
from config import get_config
logger = logging.getLogger(__name__)
T = TypeVar("T")
# ---------------------------------------------------------------------------
# Bounded ThreadPoolExecutor singleton
# ---------------------------------------------------------------------------
# Why a bounded executor (instead of `asyncio.to_thread`'s default)?
#
# `asyncio.to_thread` runs on Python's default executor whose
# ``max_workers = min(32, os.cpu_count() + 4)``. On EKS, ``os.cpu_count()``
# returns the **host's** core count (not the pod CPU limit), so the default
# can spawn ~32 worker threads on a 1-vCPU pod. For CPU-bound, pure-Python
# work (such as ``markdown_to_word`` parsing) the GIL serialises all of
# them onto a single core anyway, and each thread receives only a small
# slice of CPU. Concurrent requests pile up and every one of them takes
# 10×+ longer than it should — exceeding the client's timeout while the
# event loop stays "healthy".
#
# A bounded executor with a small ``max_workers`` (default 4 for a 1 vCPU
# pod) keeps concurrent work to a level the GIL can actually progress
# through:
#
# * 1 thread holds the GIL doing CPU work (markdown parsing)
# * up to 3 others can be in I/O (S3 upload / image download) — they
# release the GIL while blocked on syscalls
# * additional requests queue cleanly in the executor's work queue
# instead of fanning out and starving each other
#
# Tunable via ``RUN_BLOCKING_MAX_WORKERS`` (see config.Config). Recreate
# the executor by deleting this module from ``sys.modules`` (tests do
# this through their reload helper).
# ---------------------------------------------------------------------------
_EXECUTOR: Optional[ThreadPoolExecutor] = None
_EXECUTOR_LOCK = threading.Lock()
def _get_executor() -> ThreadPoolExecutor:
"""Return the process-wide bounded executor, creating it on first use.
Thread-safe double-checked locking pattern: the fast path requires no
lock acquisition once the executor exists.
"""
global _EXECUTOR
if _EXECUTOR is None:
with _EXECUTOR_LOCK:
if _EXECUTOR is None:
max_workers = get_config().run_blocking_max_workers
_EXECUTOR = ThreadPoolExecutor(
max_workers=max_workers,
thread_name_prefix="run_blocking",
)
logger.info(
"[run_blocking] bounded ThreadPoolExecutor created max_workers=%d",
max_workers,
)
return _EXECUTOR
async def run_blocking(func: Callable[..., T], /, *args, **kwargs) -> T:
"""Run a synchronous callable, optionally on a bounded worker thread.
Dispatch is governed by ``config.run_blocking_by_asyncio_thread_enabled``
(env var ``RUN_BLOCKING_BY_ASYNCIO_THREAD_ENABLED``):
* **Enabled** — the call is submitted to a process-wide
``ThreadPoolExecutor`` whose size is controlled by
``config.run_blocking_max_workers`` (default 4). The event loop
remains responsive to health probes and concurrent requests while
``func`` runs; when the pool is full, additional calls queue
cleanly instead of spawning unbounded threads that would all
contend for the GIL.
* **Disabled (default)** — the call runs inline on the event loop,
preserving the original blocking behaviour. ``func``'s return
value is returned without ever yielding to the loop.
The signature is identical in both modes — call sites always write
``await run_blocking(func, *args, **kwargs)``.
Args:
func: The blocking function to execute.
*args: Positional arguments forwarded to ``func``.
**kwargs: Keyword arguments forwarded to ``func``.
Returns:
Whatever ``func`` returns.
Raises:
Any exception ``func`` raises propagates to the caller. In
threaded mode the exception is marshalled back from the worker
thread automatically by ``loop.run_in_executor``.
"""
# Read the flag lazily, on every call, so that tests (or admins)
# that mutate the config singleton between calls see the new value
# without having to reload this module.
if get_config().run_blocking_by_asyncio_thread_enabled:
logger.debug("%s is running on bounded executor", func.__name__)
# functools.partial binds kwargs cleanly; run_in_executor only
# accepts positional args, so partial is required (not optional).
bound = functools.partial(func, *args, **kwargs)
loop = asyncio.get_running_loop()
return await loop.run_in_executor(_get_executor(), bound)
# Legacy path: call inline on the event loop (blocks until done).
return func(*args, **kwargs)