Skip to content

Commit 233fab3

Browse files
committed
accept since param for db_exports
1 parent 1d6cc9e commit 233fab3

File tree

1 file changed

+12
-2
lines changed

1 file changed

+12
-2
lines changed

masterbase/lib.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def make_minio_client(is_secure: bool = False) -> Minio:
5656
return Minio(f"{host}:{port}", access_key=access_key, secret_key=secret_key, secure=is_secure)
5757

5858

59-
def db_export_chunks(engine: Engine, table: str) -> Generator[bytes, None, None]:
59+
def db_export_chunks(engine: Engine, table: str, since: datetime | None = None) -> Generator[bytes, None, None]:
6060
"""Export the given table as an iterable of csv chunks."""
6161

6262
class Shunt:
@@ -72,7 +72,17 @@ def worker():
7272
try:
7373
with engine.connect() as txn:
7474
cursor = txn.connection.dbapi_connection.cursor()
75-
cursor.copy_expert(f"COPY {table} TO STDOUT DELIMITER ',' CSV HEADER", shunt)
75+
if since:
76+
# make a best-effort attempt to match the postgres timestamp format
77+
# this only works up to hourly precision
78+
tzoffset = int(since.utcoffset().total_seconds() / 3600)
79+
sign = "+" if tzoffset >= 0 else "-"
80+
stamp = since.strftime("%Y-%m-%d %H:%M:%S")
81+
stamp += f"{sign}{tzoffset}"
82+
query = f"(SELECT * FROM {table} WHERE created_at >= '{stamp}')"
83+
else:
84+
query = table
85+
cursor.copy_expert(f"COPY {query} TO STDOUT DELIMITER ',' CSV HEADER", shunt)
7686
queue.put(b"")
7787
except Exception as err:
7888
queue.put(err)

0 commit comments

Comments
 (0)