Skip to content

Commit

Permalink
feat: extend logging in db.py and dbview.py (#852)
Browse files: browse the repository at this point in the history
* feat: extend logging of db.py and dbview.py

* deps: pin pyright==1.1.285

* Upgrade pyright and add some `# type: ignore` comments

---------

Co-authored-by: Michael Puehringer <[email protected]>
  • Loading branch information
thinkh and puehringer authored Jul 24, 2023
1 parent 494a27d commit 524d283
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 15 deletions.
2 changes: 1 addition & 1 deletion requirements_dev.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
black~=22.12.0
pyright~=1.1.285
pyright==1.1.308
pytest-runner~=6.0.0
pytest~=7.2.0
ruff==0.0.218
57 changes: 44 additions & 13 deletions tdp_core/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Any

from flask import abort
from sqlalchemy.exc import OperationalError
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.orm import Session
from visyn_core import manager
from werkzeug.datastructures import MultiDict
Expand Down Expand Up @@ -107,9 +107,18 @@ def __init__(self, engine):
session wrapper of sql alchemy with auto cleanup
:param engine:
"""
_log.info("creating session")
self._engine = engine
import uuid

self._name = uuid.uuid4()
_log.debug("%s - engine status before: %s", self._name, engine.pool.status())
_log.debug("%s - creating session", self._name)
# add connection count and session count with SQLALCHEMY_POOL_SIZE and SQLALCHEMY_MAX_OVERFLOW
# https://stackoverflow.com/questions/34775501/how-could-i-check-the-number-of-active-sqlalchemy-connections-in-a-pool-at-any-g
self._session: Session = manager.db.create_session(engine)
_log.debug("%s - session created", self._name)
self._supports_array_parameter = _supports_sql_parameters(engine.name)
_log.debug("%s - supports array parameter: %s", self._name, self._supports_array_parameter)

def execute(self, sql, **kwargs):
"""
Expand All @@ -118,12 +127,17 @@ def execute(self, sql, **kwargs):
:param kwargs: additional args to replace
:return: the session result
"""
_log.debug("%s - replace array parameter in sql query: %s", self._name, sql)
parsed = to_query(sql, self._supports_array_parameter, kwargs)
_log.info("%s (%s)", parsed, kwargs)
_log.debug("%s - execute the given query with the given args: %s", self._name, sql)
_log.debug("%s (%s)", parsed, kwargs)
try:
return self._session.execute(parsed, kwargs)
except OperationalError as error:
_log.error("OperationalError: %s", error)
abort(408, error)
except SQLAlchemyError as error:
_log.error("SQLAlchemyError: %s", error)

def run(self, sql, **kwargs):
"""
Expand All @@ -132,9 +146,11 @@ def run(self, sql, **kwargs):
:param kwargs: args for this query
:return: list of dicts
"""
_log.debug("%s - run sql statement: %s", self._name, sql)
result = self.execute(sql, **kwargs)
columns = result.keys()
return [{c: r[c] for c in columns} for r in result]
_log.debug("%s - ran sql statement: %s", self._name, sql)
columns = result.keys() # type: ignore
return [{c: r[c] for c in columns} for r in result] # type: ignore

def __call__(self, sql, **kwargs):
return self.run(sql, **kwargs)
Expand All @@ -153,9 +169,11 @@ def rollback(self):

def _destroy(self):
if self._session:
_log.info("removing session again")
_log.debug("%s - removing session", self._name)
self._session.close()
self._session = None # type: ignore
_log.debug("%s - removed session", self._name)
_log.debug("%s - engine status after destroy: %s", self._name, self._engine.pool.status())

def __del__(self):
self._destroy()
Expand Down Expand Up @@ -342,14 +360,18 @@ def get_data(
query = view.query

if callable(query):
_log.debug("GET DATA with callback variant")
# callback variant
return query(engine, arguments, filters), view

with session(engine) as sess:
_log.debug("%s - GET DATA with session", sess._name)
if config.statement_timeout and config.statement_timeout_query:
_log.info("set statement_timeout to {}".format(config.statement_timeout))
_log.debug("set statement_timeout to {}".format(config.statement_timeout))
sess.execute(config.statement_timeout_query.format(config.statement_timeout))
_log.debug("%s - GET DATA before run", sess._name)
r = sess.run(query.format(**replace), **kwargs)
_log.debug("%s - GET DATA after run", sess._name)
return r, view


Expand Down Expand Up @@ -434,10 +456,13 @@ def get_count(database, view_name, args):
return count_query(engine, processed_args, where_clause)

with session(engine) as sess:
_log.debug("%s - GET COUNT with session", sess._name)
if config.statement_timeout and config.statement_timeout_query:
_log.info("set statement_timeout to {}".format(config.statement_timeout))
_log.debug("set statement_timeout to {}".format(config.statement_timeout))
sess.execute(config.statement_timeout_query.format(config.statement_timeout))
_log.debug("%s - GET COUNT before run", sess._name)
r = sess.run(count_query.format(**replace), **kwargs)
_log.debug("%s - GET COUNT after run", sess._name)
if r:
return r[0]["count"]
return 0
Expand Down Expand Up @@ -483,11 +508,14 @@ def derive_columns(table_name, engine, columns=None):
k for k, col in columns.items() if (col["type"] == "categorical" or col["type"] == "set") and "categories" not in col
]
if number_columns or categorical_columns:
with session(engine) as s:
with session(engine) as sess:
_log.debug("%s - DERIVE COLUMNS with session", sess._name)
if number_columns:
template = "min({col}) as {col}_min, max({col}) as {col}_max"
minmax = ", ".join(template.format(col=col) for col in number_columns)
row = next(iter(s.execute("""SELECT {minmax} FROM {table}""".format(table=table_name, minmax=minmax))))
_log.debug("%s - DERIVE COLUMNS number columns before run", sess._name)
row = next(iter(sess.execute("""SELECT {minmax} FROM {table}""".format(table=table_name, minmax=minmax)))) # type: ignore
_log.debug("%s - DERIVE COLUMNS number columns after run", sess._name)
for num_col in number_columns:
columns[num_col]["min"] = row[num_col + "_min"]
columns[num_col]["max"] = row[num_col + "_max"]
Expand All @@ -496,21 +524,24 @@ def derive_columns(table_name, engine, columns=None):
if _differentiates_empty_string_and_null(engine.name):
template += """ AND {col} <> ''"""
template += """ ORDER BY {col} ASC"""
cats = s.execute(template.format(col=col, table=table_name))
categories = [str(r["cat"]) for r in cats if r["cat"] is not None]
_log.debug("%s - DERIVE COLUMNS categorical columns before run: %s, %s", sess._name, table_name, col)
cats = sess.execute(template.format(col=col, table=table_name))
_log.debug("%s - DERIVE COLUMNS categorical columns after run: %s, %s", sess._name, table_name, col)
categories = [str(r["cat"]) for r in cats if r["cat"] is not None] # type: ignore
if columns[col]["type"] == "set":
separator = getattr(columns[col], "separator", ";")
separated_categories = [category.split(separator) for category in categories]
# flatten array
categories = list({category for sublist in separated_categories for category in sublist})
categories.sort() # sort list to avoid random order with each run
columns[col]["categories"] = categories
_log.debug("%s - DERIVE COLUMNS done", sess._name)

return columns


def _fill_up_columns(view, engine):
_log.info("fill up view")
_log.debug("fill up view %s", view)
# update the real object
view.columns = derive_columns(view.table, engine, view.columns)
view.columns_filled_up = True
Expand Down
3 changes: 3 additions & 0 deletions tdp_core/dbview.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ def __init__(self, views=None, agg_score=None, mappings=None):
:param agg_score: optional specify how aggregation should be handled
:param mappings: optional database mappings
"""
_log.debug("create db connector")
self.agg_score = agg_score or default_agg_score
self.views = views or {}
self.dburl: str = None # type: ignore
Expand All @@ -625,7 +626,9 @@ def create_engine(self, config) -> Engine:
"pool_pre_ping": True,
}
engine_options.update(config.get("engine", {}))
_log.debug("db connector: create engine with options %s", engine_options)
return sqlalchemy.create_engine(self.dburl, **engine_options)

def create_sessionmaker(self, engine) -> sessionmaker:
_log.debug("db connector: create_sessionmaker")
return sessionmaker(bind=engine)
2 changes: 1 addition & 1 deletion tdp_core/mapping_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __call__(self, ids):
mapped = session.execute(self._query, ids=ids)

# handle multi mappings
data = sorted(mapped, key=lambda x: x["f"])
data = sorted(mapped, key=lambda x: x["f"]) # type: ignore
grouped = {k: [r["t"] for r in g] for k, g in itertools.groupby(data, lambda x: x["f"])}
# Return according to the given ids to ensure that we are preserving the order correctly
return [grouped.get(id, []) for id in ids]
Expand Down

0 comments on commit 524d283

Please sign in to comment.