Commit fb3ce33
session: fix pool renewal race causing double statement execution
When two or more nodes are bootstrapped concurrently, the Python driver
can execute the same CQL statement twice, causing spurious "already
exists" errors in the caller. This has been observed as flaky test
failures across the ScyllaDB test suite for the past two years, and
worked around by using idempotent DDL forms (IF NOT EXISTS / IF EXISTS)
in dozens of tests.

Root cause
----------

The race unfolds as follows:

1. Two on_add notifications arrive at roughly the same time, one for
   each new node. Each one calls session.add_or_renew_pool(), which
   submits run_add_or_renew_pool() to the thread pool and returns. Both
   submissions are in-flight concurrently.

2. The first add_or_renew_pool() finishes and calls _finalize_add(),
   which notifies load-balancing policies and then calls
   session.update_created_pools() for every live session.

3. update_created_pools() iterates all known hosts. For the second
   host, whose run_add_or_renew_pool() has not yet completed, it sees
   self._pools.get(host) == None (or a shut-down pool) and therefore
   submits *another* run_add_or_renew_pool() for that host.

4. Now two tasks are connecting to the same host. The first one
   finishes and installs pool-A in self._pools, then runs a statement
   (e.g. CREATE ROLE) that is in-flight on pool-A.

5. The second task finishes, reads the stale
   `previous = self._pools.get(host)` value (captured *before* the lock
   was taken, another bug), installs pool-B and then shuts down pool-A.
   The in-flight CREATE ROLE request is orphaned; the driver retries it
   on pool-B. The server executes it a second time and returns
   "Role ... already exists".

Fix
---

Three coordinated changes to cassandra/cluster.py:

* Session.__init__: add self._pending_pool_futures = {}, a dict mapping
  host -> Future for any in-flight pool creation, guarded by _lock.

* add_or_renew_pool: before submitting run_add_or_renew_pool(), check
  _pending_pool_futures under _lock. If an in-flight future already
  exists for the host, return it immediately; this is the primary fix
  that prevents the duplicate submission from update_created_pools.

  Additionally, move the `previous = self._pools.get(host)` read inside
  the lock so the live-pool check is atomic with the installation of
  the new pool: if a concurrent creation has already installed a live
  pool by the time we finish connecting, discard our new pool instead
  of replacing the live one (defense-in-depth).

  Cleanup of _pending_pool_futures is handled by a done_callback
  registered on the future immediately after it is stored, both
  operations performed under _lock. The callback only removes the entry
  if it still points at the same future it was registered on, so a
  concurrent remove_pool followed by a new add_or_renew_pool is not
  affected. This guarantees cleanup under all exit paths, including
  unhandled exceptions inside run_add_or_renew_pool, and avoids the
  race where a fast-completing task pops the key before the outer code
  has stored the future.

* remove_pool: clear _pending_pool_futures[host] under _lock so that if
  a host is removed and immediately re-added, add_or_renew_pool submits
  a fresh creation rather than reusing a stale done future.

Tests
-----

Five new unit tests are added in PoolRenewalRaceTest
(tests/unit/test_cluster.py). They exercise the new code paths without
requiring a real cluster connection by constructing a minimal Session
via object.__new__ and mocking the executor and profile manager:

* test_add_or_renew_pool_reuses_inflight_future: places a pending
  Future in _pending_pool_futures and verifies that add_or_renew_pool
  returns it without submitting a new task to the executor.

* test_add_or_renew_pool_discards_duplicate_when_live_pool_exists:
  exercises the real production code path by patching HostConnection to
  a lightweight stub and using a synchronous executor shim that runs
  the submitted callable inline. Pre-installs a live pool for the host,
  then calls add_or_renew_pool() and asserts that the live pool is not
  replaced and the newly connected stub pool is shut down.

* test_remove_pool_clears_pending_future: verifies that remove_pool
  clears _pending_pool_futures so the next add_or_renew_pool call
  submits a fresh task.

* test_done_callback_clears_pending_future: verifies that the
  done_callback fires and removes the entry from _pending_pool_futures
  once the future completes.

* test_done_callback_does_not_clear_newer_future: verifies the identity
  guard: an old future's callback does not evict a newer future that
  was installed in its place after a remove_pool + add_or_renew_pool.

Fixes: #317
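The dedup-by-pending-future scheme the commit message describes can be sketched as a standalone pattern. This is a minimal illustration, not driver code; `PoolCreator`, `ManualExecutor`, and `add_or_renew` are hypothetical names chosen for the sketch:

```python
from concurrent.futures import Future
from threading import RLock


class ManualExecutor:
    """Test double: records submissions and lets the caller resolve them."""
    def __init__(self):
        self.calls = []

    def submit(self, fn, *args):
        f = Future()
        self.calls.append((fn, args, f))
        return f


class PoolCreator:
    """Deduplicates concurrent per-key creation tasks, analogous to the
    per-host _pending_pool_futures dict in the commit."""
    def __init__(self, executor):
        self._lock = RLock()
        self._executor = executor
        self._pending = {}  # key -> in-flight Future

    def _create(self, key):
        return "pool-for-%s" % key

    def add_or_renew(self, key):
        with self._lock:
            pending = self._pending.get(key)
            if pending is not None and not pending.done():
                return pending  # reuse the in-flight creation
            future = self._executor.submit(self._create, key)
            self._pending[key] = future

            def _clear(f, _key=key, _future=future):
                with self._lock:
                    # identity guard: only pop the entry we registered on,
                    # so a newer future installed in the meantime survives
                    if self._pending.get(_key) is _future:
                        self._pending.pop(_key, None)

            future.add_done_callback(_clear)
            return future
```

Two concurrent calls for the same key share one future; once it resolves, the done-callback clears the entry so a later call schedules a fresh creation.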
1 parent cd9f525 commit fb3ce33

2 files changed

Lines changed: 255 additions & 4 deletions

cassandra/cluster.py

Lines changed: 60 additions & 4 deletions
@@ -2615,6 +2615,12 @@ def __init__(self, cluster, hosts, keyspace=None):
         self._lock = RLock()
         self._pools = {}
+        # Tracks in-flight pool creation futures keyed by host, guarded by
+        # _lock. Used by add_or_renew_pool to detect and reuse concurrent
+        # creations so that update_created_pools does not schedule a duplicate
+        # run_add_or_renew_pool for a host whose pool creation is already
+        # in-flight (scylladb/python-driver#317).
+        self._pending_pool_futures = {}
         self._profile_manager = cluster.profile_manager
         self._metrics = cluster.metrics
         self._request_init_callbacks = []
@@ -3256,7 +3262,6 @@ def run_add_or_renew_pool():
                     host, conn_exc, is_host_addition, expect_host_to_be_down=True)
                 return False
 
-            previous = self._pools.get(host)
             with self._lock:
                 while new_pool._keyspace != self.keyspace:
                     self._lock.release()
@@ -3276,18 +3281,69 @@ def callback(pool, errors):
                         self._lock.acquire()
                         return False
                     self._lock.acquire()
-                self._pools[host] = new_pool
+
+                # Read the current pool state inside the lock so the check is
+                # atomic with the installation of our new pool.
+                previous = self._pools.get(host)
+                if previous is not None and not previous.is_shutdown:
+                    # A concurrent add_or_renew_pool already installed a live
+                    # pool for this host while we were connecting. Discard our
+                    # new pool to avoid replacing it and dropping in-flight
+                    # requests (scylladb/python-driver#317).
+                    # Set discard_new_pool and shut it down *after* releasing
+                    # the lock to avoid holding the session lock during
+                    # connection teardown.
+                    log.debug("Discarding duplicate connection pool for host %s "
+                              "(live pool already present)", host)
+                    discard_new_pool = True
+                else:
+                    discard_new_pool = False
+                    self._pools[host] = new_pool
 
             log.debug("Added pool for host %s to session", host)
+            if discard_new_pool:
+                new_pool.shutdown()
+                return True
             if previous:
                 previous.shutdown()
 
             return True
 
-        return self.submit(run_add_or_renew_pool)
+        with self._lock:
+            if self.is_shutdown:
+                return None
+            # If there is already an in-flight pool creation for this host,
+            # return that future instead of scheduling a duplicate. This
+            # prevents update_created_pools from creating a second pool when
+            # the first one has not yet finished connecting
+            # (scylladb/python-driver#317).
+            pending = self._pending_pool_futures.get(host)
+            if pending is not None and not pending.done():
+                log.debug("Reusing in-flight pool creation for host %s", host)
+                return pending
+            future = self.submit(run_add_or_renew_pool)
+            if future is not None:
+                self._pending_pool_futures[host] = future
+                # Remove the entry once the future finishes, regardless of how
+                # run_add_or_renew_pool exits (including unhandled exceptions).
+                # The callback acquires _lock and only clears the entry if it
+                # still points at *this* future, so a concurrent remove_pool
+                # followed by a new add_or_renew_pool is not affected
+                # (scylladb/python-driver#317).
+                def _clear_pending(f, _host=host, _future=future):
+                    with self._lock:
+                        if self._pending_pool_futures.get(_host) is _future:
+                            self._pending_pool_futures.pop(_host, None)
+                future.add_done_callback(_clear_pending)
+            return future
 
     def remove_pool(self, host):
-        pool = self._pools.pop(host, None)
+        with self._lock:
+            pool = self._pools.pop(host, None)
+            # Invalidate any in-flight pool creation for this host so that a
+            # subsequent update_created_pools call can schedule a fresh one if
+            # needed (scylladb/python-driver#317).
+            self._pending_pool_futures.pop(host, None)
         if pool:
             log.debug("Removed connection pool for %r", host)
             return self.submit(pool.shutdown)
tests/unit/test_cluster.py

Lines changed: 195 additions & 0 deletions
@@ -15,6 +15,8 @@
 
 import logging
 import socket
+from concurrent.futures import Future
+from threading import RLock
 
 from unittest.mock import patch, Mock
 import uuid
@@ -339,6 +341,199 @@ def test_set_keyspace_escapes_quotes(self, *_):
         assert query == 'USE simple_ks', (
             "Simple keyspace names should not be quoted, got: %r" % query)
 
+
+class PoolRenewalRaceTest(unittest.TestCase):
+    """
+    Regression tests for scylladb/python-driver#317: connection pool renewal
+    after concurrent node bootstraps causes double statement execution.
+    """
+
+    def _make_session(self):
+        """
+        Return a minimal Session with the attributes needed to exercise
+        add_or_renew_pool / remove_pool, without actually opening any network
+        connections.
+        """
+        s = object.__new__(Session)
+        s._lock = RLock()
+        s._pools = {}
+        s._pending_pool_futures = {}
+        s.is_shutdown = False
+        s.keyspace = None
+        s._profile_manager = Mock()
+        s._profile_manager.distance.return_value = HostDistance.LOCAL
+        s.cluster = Mock()
+        s.cluster.executor = Mock()
+        # submit() delegates to cluster.executor.submit; return a done future
+        # by default so callers that inspect the result don't hang.
+        done_future = Future()
+        done_future.set_result(True)
+        s.cluster.executor.submit.return_value = done_future
+        return s
+
+    def test_add_or_renew_pool_reuses_inflight_future(self):
+        """
+        When add_or_renew_pool is called for a host that already has an
+        in-flight pool creation (tracked in _pending_pool_futures), it must
+        return the existing future instead of submitting a duplicate task.
+        Without this fix, a concurrent call from update_created_pools would
+        create a second HostConnection pool, then shut down the first one
+        while requests were still in-flight, causing those requests to be
+        retried and executed twice on the server side.
+        """
+        s = self._make_session()
+        host = Mock()
+        host.is_up = True
+
+        # Simulate an in-flight pool creation by placing a pending (not-yet-
+        # resolved) future directly in _pending_pool_futures.
+        inflight_future = Future()  # not set_result yet → still in-flight
+        s._pending_pool_futures[host] = inflight_future
+
+        returned = s.add_or_renew_pool(host, is_host_addition=False)
+
+        # The call must reuse the existing in-flight future, not submit a new one.
+        assert returned is inflight_future, (
+            "add_or_renew_pool should return the existing in-flight future, "
+            "not create a duplicate pool creation task"
+        )
+        s.cluster.executor.submit.assert_not_called()
+
+    def test_add_or_renew_pool_discards_duplicate_when_live_pool_exists(self):
+        """
+        Defense-in-depth for scylladb/python-driver#317.
+
+        When run_add_or_renew_pool finishes creating a new pool but finds that
+        a live pool has already been installed for the host by a concurrent
+        creation, the new pool must be discarded (shut down) rather than
+        replacing the live one. Replacing a live pool would close it while
+        requests are still in-flight, causing server-side double execution.
+
+        This test exercises the real production code path by stubbing
+        HostConnection and running the submitted callable synchronously.
+        """
+        s = self._make_session()
+        host = Mock()
+        host.is_up = True
+
+        # Pre-install a live pool for this host to simulate the state left by
+        # a concurrent add_or_renew_pool that finished first.
+        live_pool = Mock()
+        live_pool.is_shutdown = False
+        s._pools[host] = live_pool
+
+        # Make the executor run the submitted callable synchronously so the
+        # test does not need threads.
+        def sync_submit(fn, *args, **kwargs):
+            result = fn(*args, **kwargs)
+            f = Future()
+            f.set_result(result)
+            return f
+        s.cluster.executor.submit = sync_submit
+
+        # Stub HostConnection so no real TCP connection is opened.
+        # _keyspace must equal s.keyspace (None) so the keyspace-sync loop
+        # inside run_add_or_renew_pool is skipped.
+        stub_pool = Mock()
+        stub_pool._keyspace = None
+
+        with patch('cassandra.cluster.HostConnection', return_value=stub_pool):
+            s.add_or_renew_pool(host, is_host_addition=False)
+
+        # The pre-installed live pool must not have been replaced.
+        assert s._pools[host] is live_pool, (
+            "add_or_renew_pool must not replace a live pool that is already "
+            "present when the new connection finishes"
+        )
+        # The newly created pool stub must have been shut down.
+        stub_pool.shutdown.assert_called_once()
+
+    def test_remove_pool_clears_pending_future(self):
+        """
+        remove_pool must clear _pending_pool_futures for the host so that a
+        subsequent update_created_pools call can schedule a fresh pool
+        creation if needed (instead of reusing a now-stale in-flight future
+        for a host that has been removed and re-added).
+        """
+        s = self._make_session()
+        host = Mock()
+
+        stale_future = Future()
+        s._pending_pool_futures[host] = stale_future
+
+        pool = Mock()
+        s._pools[host] = pool
+
+        s.remove_pool(host)
+
+        assert host not in s._pending_pool_futures, (
+            "remove_pool must clear _pending_pool_futures so the next "
+            "add_or_renew_pool call submits a fresh task"
+        )
+
+    def test_done_callback_clears_pending_future(self):
+        """
+        The done-callback registered by add_or_renew_pool must remove the host
+        entry from _pending_pool_futures once the future completes, so that
+        update_created_pools can schedule a fresh creation on the next call
+        rather than treating a stale done future as in-flight.
+        """
+        s = self._make_session()
+        host = Mock()
+        host.is_up = True
+
+        returned = s.add_or_renew_pool(host, is_host_addition=False)
+        assert returned is not None
+
+        # The future submitted by add_or_renew_pool is already done (our mock
+        # executor returns a pre-resolved future), so the done-callback has
+        # already fired.
+        assert host not in s._pending_pool_futures, (
+            "done-callback should have cleared _pending_pool_futures once "
+            "the future completed"
+        )
+
+    def test_done_callback_does_not_clear_newer_future(self):
+        """
+        The done-callback must only clear _pending_pool_futures[host] if the
+        entry still points at the *same* future it was registered on. If a
+        newer future has been installed in the meantime (e.g. after remove_pool
+        + add_or_renew_pool), the callback must leave the new entry alone.
+        """
+        s = self._make_session()
+        host = Mock()
+        host.is_up = True
+
+        # Place a future manually and register the callback as the real code
+        # would, but keep the future pending so the callback has not fired yet.
+        old_future = Future()
+        new_future = Future()
+
+        with s._lock:
+            s._pending_pool_futures[host] = old_future
+
+        def _clear_pending(f, _host=host, _future=old_future):
+            with s._lock:
+                if s._pending_pool_futures.get(_host) is _future:
+                    s._pending_pool_futures.pop(_host, None)
+
+        old_future.add_done_callback(_clear_pending)
+
+        # Simulate remove_pool + a new add_or_renew_pool: replace with a newer
+        # pending future before old_future completes.
+        with s._lock:
+            s._pending_pool_futures[host] = new_future
+
+        # Now complete the old future; its callback must not evict new_future.
+        old_future.set_result(True)
+
+        with s._lock:
+            assert s._pending_pool_futures.get(host) is new_future, (
+                "done-callback of an old future must not remove a newer "
+                "pending future from _pending_pool_futures"
+            )
+
+
 class ProtocolVersionTests(unittest.TestCase):
 
     def test_protocol_downgrade_test(self):
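The synchronous `sync_submit` shim these tests use is a reusable pattern for testing executor-based code without threads. A minimal standalone version (the `SyncExecutor` name is hypothetical), extended with exception capture so failing callables also resolve the returned future:

```python
from concurrent.futures import Future


class SyncExecutor:
    """Runs submitted callables inline on the caller's thread and wraps
    the outcome in an already-resolved Future, so code written against
    executor.submit() can be exercised deterministically in unit tests."""

    def submit(self, fn, *args, **kwargs):
        f = Future()
        try:
            f.set_result(fn(*args, **kwargs))
        except Exception as exc:
            # Mirror real executor behaviour: exceptions are stored on
            # the future rather than propagating to the submitter.
            f.set_exception(exc)
        return f
```

Because the returned future is always done, callbacks registered with `add_done_callback` fire immediately, which is exactly what test_done_callback_clears_pending_future relies on.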
