Skip to content

Commit

Permalink
Merge pull request #3 from risclog-solution/fix-conn-init
Browse files Browse the repository at this point in the history
Ensure connection always get properly initialized.
  • Loading branch information
dataflake authored Dec 7, 2023
2 parents 4448d27 + 84af6d0 commit f0d9c94
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 17 deletions.
4 changes: 4 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ Changelog
4.2 (unreleased)
----------------

- Ensure connection always get properly initialized.

- Add support for PostgreSQL 13.


4.1 (2023-10-04)
----------------
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def _read(filename):
python_requires='>=3.7',
install_requires=[
'setuptools',
'psycopg2',
'psycopg2 >= 2.4.2',
'Zope >= 5',
'Products.ZSQLMethods',
],
Expand Down
33 changes: 24 additions & 9 deletions src/Products/ZPsycopgDA/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,23 @@ def __init__(self, dsn, tilevel, typecasts, enc='utf-8'):
self.calls = 0
self.make_mappings()

def getconn(self, init=True):
# if init is False we are trying to get hold on an already existing
# connection, so we avoid to (re)initialize it risking errors.
def getconn(self, init='ignored', retry=100):
conn = pool.getconn(self.dsn)
if init:
conn.set_session(isolation_level=int(self.tilevel))
_pool = pool.getpool(self.dsn, create=False)
if id(conn) not in _pool._initialized:
try:
conn.set_session(isolation_level=int(self.tilevel))
except psycopg2.InterfaceError:
# we got a closed connection from a poisoned pool ->
# close it and retry:
pool.putconn(self.dsn, conn, True)
if retry <= 0:
raise ConflictError("InterfaceError from psycopg2")
return self.getconn(retry=retry - 1)
conn.set_client_encoding(self.encoding)
for tc in self.typecasts:
register_type(tc, conn)
_pool._initialized.add(id(conn))
return conn

def putconn(self, close=False):
Expand All @@ -77,24 +85,26 @@ def putconn(self, close=False):
pool.putconn(self.dsn, conn, close)

def getcursor(self):
conn = self.getconn(False)
conn = self.getconn()
return conn.cursor()

def _finish(self, *ignored):
try:
conn = self.getconn(False)
conn = self.getconn()
conn.commit()
self.putconn()
except AttributeError:
pass

def _abort(self, *ignored):
try:
conn = self.getconn(False)
conn = self.getconn()
conn.rollback()
self.putconn()
except AttributeError:
pass
except (psycopg2.OperationalError, psycopg2.InterfaceError):
self.putconn(True)

def open(self):
# this will create a new pool for our DSN if not already existing,
Expand Down Expand Up @@ -189,7 +199,8 @@ def query(self, query_string, max_rows=None, query_data=None):
exc_info=True)
msg = 'TransactionRollbackError from psycopg2'
raise ConflictError(msg)
except psycopg2.OperationalError:
except (psycopg2.OperationalError,
psycopg2.InterfaceError) as e:
msg = 'Operational error on connection, closing it.'
logging.exception(msg)
try:
Expand All @@ -199,6 +210,10 @@ def query(self, query_string, max_rows=None, query_data=None):
logging.debug("Exception while closing pool",
exc_info=True)
pass
errmsg = str(e).replace("\n", " ")
raise ConflictError(
e.__class__.__name__ + " from psycopg2: " + errmsg
)
if c.description is not None:
nselects += 1
if c.description != desc and nselects > 1:
Expand Down
24 changes: 17 additions & 7 deletions src/Products/ZPsycopgDA/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self, minconn, maxconn, *args, **kwargs):
self._used = {}
self._rused = {} # id(conn) -> key map
self._keys = 0
self._initialized = set()

for i in range(self.minconn):
self._connect()
Expand Down Expand Up @@ -97,6 +98,10 @@ def _putconn(self, conn, key=None, close=False):
self._pool.append(conn)
else:
conn.close()
try:
self._initialized.remove(id(conn))
except KeyError:
pass

# here we check for the presence of key because it can happen that a
# thread tries to put back a connection after a call to close
Expand All @@ -118,6 +123,11 @@ def _closeall(self):
conn.close()
except Exception:
pass
finally:
try:
self._initialized.remove(id(conn))
except KeyError:
pass
self.closed = True


Expand Down Expand Up @@ -172,13 +182,13 @@ def closeall(self):


def getpool(dsn, create=True):
_connections_lock.acquire()
try:
if dsn not in _connections_pool and create:
_connections_pool[dsn] = \
PersistentConnectionPool(4, 200, dsn)
finally:
_connections_lock.release()
if create:
_connections_lock.acquire()
try:
if dsn not in _connections_pool:
_connections_pool[dsn] = PersistentConnectionPool(4, 200, dsn)
finally:
_connections_lock.release()
return _connections_pool[dsn]


Expand Down

0 comments on commit f0d9c94

Please sign in to comment.