Skip to content

Commit 813ede3

Browse files
authored
feat:optimize thread pool (#1326)
* feat:optimize pool * feat:optimize pool * feat:optimize pool
1 parent 1fba901 commit 813ede3

File tree

1 file changed

+17
-15
lines changed

1 file changed

+17
-15
lines changed

src/memos/graph_dbs/polardb.py

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ def __init__(self, config: PolarDBGraphDBConfig):
165165

166166
# Create connection pool
167167
self.connection_pool = psycopg2.pool.ThreadedConnectionPool(
168-
minconn=5,
168+
minconn=2,
169169
maxconn=maxconn,
170170
host=host,
171171
port=port,
@@ -176,6 +176,7 @@ def __init__(self, config: PolarDBGraphDBConfig):
176176
keepalives_idle=120, # Seconds of inactivity before sending keepalive (should be < server idle timeout)
177177
keepalives_interval=15, # Seconds between keepalive retries
178178
keepalives_count=5, # Number of keepalive retries before considering connection dead
179+
keepalives=1,
179180
options=f"-c search_path={self.db_name}_graph,ag_catalog,$user,public",
180181
)
181182

@@ -250,39 +251,40 @@ def _get_connection(self):
250251
import psycopg2
251252

252253
timeout = self._connection_wait_timeout
253-
if not self._semaphore.acquire(timeout=max(timeout, 0)):
254+
if timeout is None or timeout <= 0:
255+
self._semaphore.acquire()
256+
elif not self._semaphore.acquire(timeout=timeout):
254257
logger.warning(f"Timeout waiting for connection slot ({timeout}s)")
255258
raise RuntimeError("Connection pool busy")
256-
logger.info(
257-
"Connection pool usage: %s/%s",
258-
self.connection_pool.maxconn - self._semaphore._value,
259-
self.connection_pool.maxconn,
260-
)
259+
261260
conn = None
262261
broken = False
263262
try:
264-
conn = self.connection_pool.getconn()
265-
conn.autocommit = True
266263
for attempt in range(2):
264+
conn = self.connection_pool.getconn()
265+
conn.autocommit = True
267266
try:
268267
with conn.cursor() as cur:
269268
cur.execute("SELECT 1")
270269
break
271270
except psycopg2.Error:
272-
logger.warning("Dead connection detected, recreating (attempt %d)", attempt + 1)
271+
logger.warning(f"Dead connection detected, recreating (attempt {attempt + 1})")
273272
self.connection_pool.putconn(conn, close=True)
274-
conn = self.connection_pool.getconn()
275-
conn.autocommit = True
273+
conn = None
276274
else:
277275
raise RuntimeError("Cannot obtain valid DB connection after 2 attempts")
278276
with conn.cursor() as cur:
279277
cur.execute(f'SET search_path = {self.db_name}_graph, ag_catalog, "$user", public;')
280278
yield conn
281-
except Exception:
279+
except (psycopg2.Error, psycopg2.OperationalError) as e:
282280
broken = True
281+
logger.exception(f"Database connection busy : {e}")
282+
raise
283+
except Exception as e:
284+
logger.exception(f"Unexpected error: {e}")
283285
raise
284286
finally:
285-
if conn:
287+
if conn is not None:
286288
try:
287289
self.connection_pool.putconn(conn, close=broken)
288290
logger.debug(f"Returned connection {id(conn)} to pool (broken={broken})")
@@ -2194,7 +2196,7 @@ def get_grouped_counts(
21942196
SELECT {", ".join(cte_select_list)}
21952197
FROM "{self.db_name}_graph"."Memory"
21962198
{where_clause}
2197-
LIMIT 1000
2199+
LIMIT 100
21982200
)
21992201
SELECT {outer_select}, count(*) AS count
22002202
FROM t

0 commit comments

Comments
 (0)