Skip to content
Open
75 changes: 62 additions & 13 deletions crmprtd/insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from sqlalchemy import and_
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.exc import IntegrityError, DBAPIError
from sqlalchemy.engine import URL, make_url
import functools

from crmprtd.constants import InsertStrategy
from crmprtd.db_exceptions import InsertionError
Expand Down Expand Up @@ -54,6 +56,13 @@ def max_power_of_two(num):
return 2 ** floor(mathlog(num, 2))


@functools.lru_cache(maxsize=32)
def sanitize_connection(sesh):
    """Return a log-safe description of the database ``sesh`` is bound to.

    The URL of the session's bound engine is rendered with the password
    masked, and the leading ``<drivername>://`` scheme is removed.

    Results are memoized per session object. The cache holds strong
    references to the sessions it has seen, so it is bounded: a long-running
    process that creates many sessions will not keep all of them alive.

    :param sesh: SQLAlchemy session whose ``bind`` exposes a ``url``
    :return: Connection string without driver name and with password hidden
    """
    url = sesh.bind.url
    rendered = url.render_as_string(hide_password=True)
    scheme = url.drivername + "://"
    # Strip the scheme only where it leads the string; a blanket
    # ``str.replace`` would also delete any later occurrence of the same text
    # (e.g. inside a database name or query parameter).
    if rendered.startswith(scheme):
        rendered = rendered[len(scheme):]
    return rendered


def get_bisection_chunk_sizes(remainder):
chunk_list = []
while remainder != 0:
Expand Down Expand Up @@ -124,32 +133,48 @@ def insert_single_obs(sesh, obs):
try:
# Create a nested SAVEPOINT context manager to rollback to in the
# event of unique constraint errors
log.debug("New SAVEPOINT for single observation")
# `extra` must be a mapping: logging copies each key/value onto the LogRecord.
# The previous `{"database:", ...}` was a set literal and raised TypeError
# whenever DEBUG logging was enabled.
# NOTE(review): other call sites in this module spell the key "datebase";
# they should be renamed to "database" together.
log.debug(
    "New SAVEPOINT for single observation",
    extra={"database": sanitize_connection(sesh)},
)
with sesh.begin_nested():
sesh.add(obs)
except IntegrityError as e:
log.debug(
"Failure, observation already exists",
extra={"observation": obs, "exception": e},
extra={
"observation": obs,
"exception": e,
"datebase": sanitize_connection(sesh),
},
)
db_metrics = DBMetrics(0, 1, 0)
except InsertionError as e:
# TODO: InsertionError is an exception defined by crmprtd. It can't be raised by
# SQLAlchemy unless something very tricky is going on. Why is this here?
log.warning(
"Failure occured during insertion",
extra={"observation": obs, "exception": e},
extra={
"observation": obs,
"exception": e,
"datebase": sanitize_connection(sesh),
},
)
db_metrics = DBMetrics(0, 0, 1)
else:
log.info("Successfully inserted observations: 1")
log.info(
"Successfully inserted observations: 1",
extra={"datebase": sanitize_connection(sesh)},
)
db_metrics = DBMetrics(1, 0, 0)
sesh.commit()
return db_metrics


def single_insert_strategy(sesh, observations):
log.info("Using Single Insert Strategy")
log.info(
"Using Single Insert Strategy", extra={"datebase": sanitize_connection(sesh)}
)
dbm = DBMetrics(0, 0, 0)
for obs in observations:
dbm += insert_single_obs(sesh, obs)
Expand Down Expand Up @@ -187,7 +212,10 @@ def bisect_insert_strategy(sesh, observations):
but in the optimal case it reduces the transactions to a constant
1.
"""
log.debug("Begin mass observation insertion", extra={"num_obs": len(observations)})
log.debug(
"Begin mass observation insertion",
extra={"num_obs": len(observations), "datebase": sanitize_connection(sesh)},
)

# Base cases
if len(observations) < 1:
Expand All @@ -198,7 +226,13 @@ def bisect_insert_strategy(sesh, observations):
else:
try:
with sesh.begin_nested():
log.debug("New SAVEPOINT", extra={"num_obs": len(observations)})
log.debug(
"New SAVEPOINT",
extra={
"num_obs": len(observations),
"datebase": sanitize_connection(sesh),
},
)
sesh.add_all(observations)
except IntegrityError:
log.debug("Failed, splitting observations.")
Expand All @@ -211,15 +245,21 @@ def bisect_insert_strategy(sesh, observations):
else:
log.info(
f"Successfully inserted observations: {len(observations)}",
extra={"num_obs": len(observations)},
extra={
"num_obs": len(observations),
"datebase": sanitize_connection(sesh),
},
)
db_metrics = DBMetrics(len(observations), 0, 0)
sesh.commit()
return db_metrics


def chunk_bisect_insert_strategy(sesh, observations):
log.info("Using Chunk + Bisection Strategy")
log.info(
"Using Chunk + Bisection Strategy",
extra={"datebase": sanitize_connection(sesh)},
)
dbm = DBMetrics(0, 0, 0)
for chunk in bisection_chunks(observations):
dbm += bisect_insert_strategy(sesh, chunk)
Expand Down Expand Up @@ -269,7 +309,10 @@ def insert_bulk_obs(sesh, observations):
except DBAPIError as e:
# Something really unanticipated happened. Duplicate rows do not trigger an
# exception.
log.exception("Unexpected error during bulk insertion")
log.exception(
"Unexpected error during bulk insertion",
extra={"datebase": sanitize_connection(sesh)},
)
return DBMetrics(0, 0, num_to_insert)
sesh.commit()
num_inserted = len(result)
Expand All @@ -287,16 +330,22 @@ def bulk_insert_strategy(sesh, observations, chunk_size=1000):
:param chunk_size: Size of chunks.
:return: DBMetrics describing result of insertion
"""
log.info("Using Bulk Insert Strategy")
log.info(
"Using Bulk Insert Strategy", extra={"datebase": sanitize_connection(sesh)}
)
dbm = DBMetrics(0, 0, 0)
for chunk in fixed_length_chunks(observations, chunk_size=chunk_size):
chunk_dbm = insert_bulk_obs(sesh, chunk)
dbm += chunk_dbm
log.info(
f"Bulk insert progress: "
f"{dbm.successes} inserted, {dbm.skips} skipped, {dbm.failures} failed"
f"{dbm.successes} inserted, {dbm.skips} skipped, {dbm.failures} failed",
extra={"datebase": sanitize_connection(sesh)},
)
log.info(f"Successfully inserted observations: {dbm.successes}")
log.info(
f"Successfully inserted observations: {dbm.successes}",
extra={"datebase": sanitize_connection(sesh)},
)
return dbm


Expand Down
12 changes: 10 additions & 2 deletions crmprtd/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.engine import URL, make_url

from crmprtd.constants import InsertStrategy
from crmprtd.align import align
from crmprtd.insert import insert
from crmprtd.insert import insert, sanitize_connection
from crmprtd.download_utils import verify_date
from crmprtd.infer import infer
from crmprtd import add_version_arg, add_logging_args, setup_logging, NETWORKS
Expand Down Expand Up @@ -205,7 +206,14 @@ def process(
sample_size=sample_size,
)
log.info("Insert: done")
log.info("Data insertion results", extra={"results": results, "network": network})
log.info(
"Data insertion results",
extra={
"results": results,
"network": network,
"datebase": sanitize_connection(sesh),
},
)


# Note: this function was buried in crmprtd.__init__.py but is
Expand Down