Skip to content

Commit b759d4a

Browse files
committed
concurrent: move request submission off event loop thread
ConcurrentExecutorListResults now uses a dedicated submitter thread instead of calling _execute_next inline from the event loop callback. This decouples I/O completion processing from new request serialization and enqueuing, yielding ~6-9% higher write throughput. The callback now just signals a threading.Event; the submitter thread drains a deque and calls session.execute_async in batches. This avoids blocking the libev event loop thread with request preparation work (query plan, serialization, tablet lookup) that takes ~27us per request. Bug fixes included: - Start submitter thread after initial batch (avoid race on _exec_count) - Write _exec_count under _condition lock (avoid race with _current) - Use exhausted flag to avoid repeated StopIteration on iterator
1 parent 3aa5935 commit b759d4a

1 file changed

Lines changed: 174 additions & 18 deletions

File tree

cassandra/concurrent.py

Lines changed: 174 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313
# limitations under the License.
1414

1515

16-
from collections import namedtuple
16+
from collections import deque, namedtuple
1717
from heapq import heappush, heappop
1818
from itertools import cycle
19-
from threading import Condition
19+
from threading import Condition, Event, Thread
2020
import sys
2121

2222
from cassandra.cluster import ResultSet, EXEC_PROFILE_DEFAULT
@@ -193,28 +193,184 @@ class ConcurrentExecutorListResults(_ConcurrentExecutor):
193193

194194
def execute(self, concurrency, fail_fast):
    """Run the statements with at most ``concurrency`` in flight.

    Dispatches the initial batch inline on the calling thread, then
    delegates all follow-up submission to the dedicated submitter
    thread started by ``_results()``.
    """
    self._exception = None
    self._submit_ready = deque()
    self._submit_event = Event()
    self._stop_event = Event()
    self._exhausted = False
    self._fail_fast = fail_fast
    self._results_queue = []
    self._current = 0
    self._exec_count = 0
    # The submitter thread is not running yet, so nothing else touches
    # the statement iterator; dispatch the first batch here and note
    # whether it already drained every statement.
    with self._condition:
        for _ in range(concurrency):
            if not self._execute_next():
                self._exhausted = True
                break
    return self._results()
213+
214+
def _results(self):
215+
# Always start the submitter thread: it owns ``_current`` accounting
216+
# (incrementing from drained completion signals) so the event-loop
217+
# callback path can stay lock-free in the success case. Even when
218+
# the iterator was fully consumed by the initial batch, the
219+
# submitter still needs to run to record completions.
220+
self._submitter = Thread(target=self._submitter_loop,
221+
daemon=True, name="concurrent-submitter")
222+
self._submitter.start()
223+
224+
try:
225+
with self._condition:
226+
while not self._exhausted or self._current < self._exec_count:
227+
self._condition.wait()
228+
if self._exception and self._fail_fast:
229+
break
230+
finally:
231+
self._stop_event.set()
232+
self._submit_event.set() # wake submitter so it sees the stop
233+
self._submitter.join()
234+
if self._exception and self._fail_fast:
235+
raise self._exception
236+
return [r[1] for r in sorted(self._results_queue)]
197237

198238
def _put_result(self, result, idx, success):
    """Record one completion and wake the submitter thread.

    Invoked on the event-loop callback thread (or on the submitter
    thread itself when ``execute_async`` raises synchronously).

    The success / non-fail-fast path takes no lock: ``_current`` is
    advanced later by the submitter thread, under the same lock
    acquisition that advances ``_exec_count``, which keeps ~0.5-1us of
    lock cost off every callback on the event-loop thread.

    The bare ``append`` calls on ``_results_queue`` and ``_submit_ready``
    rely on CPython's GIL making list/deque appends atomic; this module
    assumes a GIL build (free-threaded PEP 703 builds are out of scope
    for the driver's supported Python versions).
    """
    self._results_queue.append((idx, ExecutionResult(success, result)))
    failed_fast = not success and self._fail_fast
    if failed_fast:
        # Cold path: take the lock to record the first exception and
        # wake the main thread immediately so it can stop waiting.
        with self._condition:
            if not self._exception:
                self._exception = result
            self._condition.notify()
    # Hand the completion to the submitter thread.  It will:
    # 1) bump _current under the lock from the drained signal count,
    # 2) submit a replacement request,
    # 3) notify _results() once all completions have been accounted for.
    self._submit_ready.append(1)
    self._submit_event.set()
270+
271+
def _submitter_loop(self):
272+
"""Drain completion signals and submit follow-up requests.
273+
274+
Runs on a dedicated thread so that the libev event-loop thread
275+
only needs to do the lightweight ``deque.append`` + ``Event.set``
276+
in ``_put_result`` rather than the full execute_async cycle
277+
(query-plan, borrow connection, serialise, enqueue).
278+
279+
Owns ``_current`` accounting: each drained completion signal
280+
increments ``_current`` by one under the same lock acquisition
281+
that bumps ``_exec_count`` for the new batch. This keeps the
282+
event-loop callback path lock-free in the success case.
283+
"""
284+
ready = self._submit_ready
285+
ready_event = self._submit_event
286+
stop_event = self._stop_event
287+
enum_stmts = self._enum_statements
288+
session = self.session
289+
profile = self._execution_profile
290+
on_success = self._on_success
291+
on_error = self._on_error
292+
condition = self._condition
293+
while not stop_event.is_set():
294+
ready_event.wait()
295+
ready_event.clear()
296+
# Drain all pending completion signals.
297+
count = 0
298+
while True:
299+
try:
300+
ready.popleft()
301+
count += 1
302+
except IndexError:
303+
break
304+
if count == 0:
305+
continue
306+
if stop_event.is_set():
307+
# Main thread is shutting down (e.g. fail-fast). Do the
308+
# accounting for already-completed requests but skip
309+
# dispatching new ones.
310+
with condition:
311+
self._current += count
312+
if self._exhausted and self._current >= self._exec_count:
313+
condition.notify()
314+
continue
315+
if self._exhausted:
316+
# No more statements to dispatch -- just account for the
317+
# completions we just drained and notify the waiter if
318+
# everything has caught up.
319+
with condition:
320+
self._current += count
321+
if self._current >= self._exec_count:
322+
condition.notify()
323+
continue
324+
# Submit follow-up requests directly (fast path).
325+
# The iterator is only consumed from this thread (the initial
326+
# batch was fully dispatched before this thread started).
327+
#
328+
# Pull statements from the iterator first, then bump _current
329+
# and _exec_count for the entire batch in one lock acquisition,
330+
# then dispatch. This avoids per-request lock overhead while
331+
# ensuring _results() never sees _current >= _exec_count
332+
# prematurely.
333+
batch = []
334+
iterator_done = False
335+
for _ in range(count):
336+
try:
337+
batch.append(next(enum_stmts))
338+
except StopIteration:
339+
iterator_done = True
340+
break
341+
# Single lock acquisition: bump both _current (from the
342+
# drained completion count) and _exec_count (from the new
343+
# batch size) atomically. Setting _exhausted in the same
344+
# critical section ensures the main thread never sees
345+
# _exhausted=True with a stale _exec_count.
346+
with condition:
347+
self._current += count
348+
self._exec_count += len(batch)
349+
if iterator_done:
350+
self._exhausted = True
351+
# Wake the waiter if all completions have caught up.
352+
if self._exhausted and self._current >= self._exec_count:
353+
condition.notify()
354+
# Re-check stop after the lock release: fail-fast may have
355+
# arrived while we were holding the lock; avoid dispatching
356+
# requests we know will be discarded.
357+
if stop_event.is_set():
358+
continue
359+
for idx, (statement, params) in batch:
360+
try:
361+
future = session.execute_async(statement, params,
362+
timeout=None,
363+
execution_profile=profile)
364+
args = (future, idx)
365+
future.add_callbacks(
366+
callback=on_success, callback_args=args,
367+
errback=on_error, errback_args=args)
368+
except Exception as exc:
369+
# Record the failure directly. _put_result handles
370+
# _current accounting and will enqueue another signal
371+
# to _submit_ready -- but that is fine because the
372+
# next drain will attempt another next(enum_stmts).
373+
self._put_result(exc, idx, False)
218374

219375

220376

0 commit comments

Comments
 (0)