Merged
1 change: 1 addition & 0 deletions .gitignore
@@ -31,3 +31,4 @@ __pycache__
/cdm_task_service_temp
/tmp_creds/
/.venv/
/refdata_mount/
3 changes: 2 additions & 1 deletion cdmtaskservice/app_state.py
@@ -149,14 +149,15 @@ async def build_refdata_app(app: FastAPI, cfg: CDMRefdataServiceConfig, service_
logr.info("Done")
logr.info("Initializing CTS refdata client... ")
refcli = await CTSRefdataClient.create(cfg.cts_root_url, cfg.cts_refdata_token)
dest.register("cts refdata client", refcli.close())
dest.register("CTS refdata client", refcli.close())
refman = RefdataManager(
refcli,
s3cfg.get_internal_client(),
coman,
Path(cfg.refdata_local_path).absolute(),
Path(cfg.refdata_meta_path).absolute(),
)
dest.register("refdata manager", refman.close)
app.state._cdmstate = RefdataState(
service_name=service_name,
auth=auth,
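The hunk above registers two different shapes of cleanup handler: `refcli.close()` is a coroutine object, while `refman.close` is a plain callable. The `register` signature isn't shown in this diff, so below is a minimal sketch, assuming a registry that accepts either form and runs handlers in reverse registration order on shutdown; the `Destroyer` name and behavior are hypothetical.

```python
# Minimal sketch of a cleanup registry like the `dest` object above.
# Assumption: the real register() accepts either an awaitable (refcli.close())
# or a plain callable (refman.close); that signature is not shown in this diff.
import asyncio
import inspect
import logging
from typing import Awaitable, Callable, Union


class Destroyer:
    """Runs registered cleanup handlers in reverse order on shutdown."""

    def __init__(self):
        self._handlers: list[tuple[str, Union[Awaitable, Callable[[], None]]]] = []

    def register(self, name: str, handler: Union[Awaitable, Callable[[], None]]):
        self._handlers.append((name, handler))

    async def destroy(self):
        # Tear down in reverse order so dependents close before dependencies.
        for name, handler in reversed(self._handlers):
            try:
                if inspect.isawaitable(handler):
                    await handler
                else:
                    handler()
            except Exception:
                logging.getLogger(__name__).exception(f"Failed to close {name}")


async def main():
    dest = Destroyer()
    dest.register("noop", lambda: print("closing noop"))
    await dest.destroy()

asyncio.run(main())
```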
72 changes: 67 additions & 5 deletions cdmtaskservice/refserv/refdata_manager.py
@@ -1,5 +1,9 @@
""" Manager for staging reference data based on CTS records. """

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
import json
import hashlib
import logging
from pathlib import Path
import traceback
@@ -12,6 +16,11 @@
from cdmtaskservice import sites
from cdmtaskservice.s3.client import S3Client
from cdmtaskservice.s3.paths import S3Paths
from cdmtaskservice.s3.remote import unpack_archive


_MD5_FILE = "md5.json"
_MD5_FILE_TMP = _MD5_FILE + ".tmp"


class RefdataManager:
@@ -26,13 +35,31 @@ def __init__(
refdata_meta_path: Path,

):
""" Create the manager. """
"""
Create the manager.

ctsrefcli - the CTS reference data client.
s3cli - the S3 client. Must be able to read all locations where CTS refdata is stored.
coman - a coroutine manager.
refdata_path - where refdata will be stored locally.
refdata_meta_path - where refdata metadata will be stored locally.
"""
self._cli = _not_falsy(ctsrefcli, "ctsrefcli")
self._s3cli = _not_falsy(s3cli, "s3cli")
self._coman = _not_falsy(coman, "coman")
self._refpath = _not_falsy(refdata_path, "refdata_path")
self._metapath = _not_falsy(refdata_meta_path, "refdata_meta_path")
self._logr = logging.getLogger(__name__)
# TODO CODE update to an InterpreterPoolExecutor when upgrading to Python 3.14
# is possible. Currently some dependencies aren't compatible.
# 10 workers seems like plenty, refdata staging should be rare.
self._exe = ProcessPoolExecutor(max_workers=10)

def close(self):
"""
Close any resources associated with the manager.
"""
self._exe.shutdown(cancel_futures=True)

async def stage_refdata(self, refdata_id: str, cluster: sites.Cluster):
"""
@@ -64,14 +91,18 @@ async def stage_refdata(self, refdata_id: str, cluster: sites.Cluster):

async def _stage_refdata(self, refdata: models.ReferenceData, cluster: sites.Cluster):
try:
refdir = self._refpath / refdata.id
refdir.mkdir(parents=True, exist_ok=True)
arcpath = refdir / Path(refdata.file).name
arcpath = self._refpath / refdata.id / Path(refdata.file).name
arcpath.parent.mkdir(parents=True, exist_ok=True)
self._logr.info(
f"Downloading refdata {refdata.id} from S3 {refdata.file} to {arcpath}"
)
await self._s3cli.download_objects_to_file(S3Paths([refdata.file]), [arcpath])
# TODO NEXT check crc64nvme, unzip, delete archive, calc md5s in interpreter thread
await asyncio.get_running_loop().run_in_executor(
self._exe, _unpack_refdata, refdata.id, arcpath, refdata.unpack, self._metapath
)
await self._cli.update_refdata_state(
refdata.id, cluster, models.ReferenceDataState.COMPLETE
)
except Exception as e:
self._logr.exception(
f"Failed to stage refdata {refdata.id} for cluster {cluster.value}"
@@ -84,3 +115,34 @@ async def _stage_refdata(self, refdata: models.ReferenceData, cluster: sites.Clu
admin_error=str(e),
traceback=traceback.format_exc()
)


# Expected to be run in a new process or interpreter
# Don't block the event loop with all this sync stuff
def _unpack_refdata(refdata_id: str, arcpath: Path, unpack: bool, metadata_dir: Path):
logging.basicConfig(level=logging.INFO)
logr = logging.getLogger(__name__)
if unpack:
logr.info(
f"Unpacking refdata {refdata_id} archive {arcpath}"
)
unpack_archive(arcpath)
refparent = arcpath.parent
logr.info(f"Generating md5s for refdata {refdata_id}")
files = [file for file in refparent.rglob('*') if file.is_file()]
file_md5s = []
for f in files:
with open(f, "rb") as fo:
file_md5s.append({
"file": str(f.relative_to(refparent)),
"md5": hashlib.file_digest(fo, "md5").hexdigest(),
"size": f.stat().st_size
})
tmpfile = metadata_dir / refdata_id / _MD5_FILE_TMP
tmpfile.parent.mkdir(parents=True, exist_ok=True)
with open(tmpfile, "w") as f:
f.write(json.dumps({"file_md5s": file_md5s}, indent=4))
# Ensure no reading of partially written files
finalfile = metadata_dir / refdata_id / _MD5_FILE
tmpfile.rename(finalfile)
logr.info(f"Wrote md5s for refdata {refdata_id} to {finalfile}")
31 changes: 18 additions & 13 deletions cdmtaskservice/s3/remote.py
@@ -321,19 +321,24 @@ async def _process_download(
if cache_dir:
_add_to_cache(outputpath, cache_path)
if unpack: # never used with a cache
try:
op = str(outputpath).lower()
if op.endswith(_EXT_TGZ) or op.endswith(_EXT_TARGZ):
return _extract_tar(outputpath)
elif op.endswith(_EXT_GZ):
return _extract_gz(outputpath)
else:
raise ValueError(
f"Unsupported unpack file type for file {outputpath}. " +
f"Only {', '.join(UNPACK_FILE_EXTENSIONS)} are supported."
)
finally:
outputpath.unlink(missing_ok=True)
unpack_archive(outputpath)


def unpack_archive(archive: Path):
""" Unpack a *.tgz, *.tar.gz, or *.gz file. """
try:
op = str(archive).lower()
if op.endswith(_EXT_TGZ) or op.endswith(_EXT_TARGZ):
return _extract_tar(archive)
elif op.endswith(_EXT_GZ):
return _extract_gz(archive)
else:
raise ValueError(
f"Unsupported unpack file type for file {archive}. " +
f"Only {', '.join(UNPACK_FILE_EXTENSIONS)} are supported."
)
finally:
archive.unlink(missing_ok=True)


async def _process_downloads(
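The refactor extracts the unpack-and-delete logic into a module-level `unpack_archive` so both the download path and the refdata process pool can share it. A usage sketch; the archive path is hypothetical, and note that the `finally` clause deletes the archive whether or not extraction succeeds:

```python
# Hypothetical usage of the extracted helper; the path is illustrative.
# unpack_archive's finally clause deletes the archive whether or not
# extraction succeeds, so callers should not reuse the path afterwards.
from pathlib import Path

from cdmtaskservice.s3.remote import unpack_archive

archive = Path("/refdata/ncbi_nt/nt.tar.gz")  # assumed location
unpack_archive(archive)  # extracts alongside the archive, then deletes it
assert not archive.exists()
```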
2 changes: 1 addition & 1 deletion docker-compose.yaml
@@ -113,7 +113,7 @@ services:
- KBCTS_REFDATA_SERVICE_TOKEN=token_with_refservice_role
- KBCTS_SERVICE_GROUP=localdev
volumes:
- <path_to_refdata_mount>:/refdata
- ./refdata_mount:/refdata

kafka:
# Note the image should sync with the test.cfg.example and GHA test files.