diff --git a/.gitignore b/.gitignore index 2546081..f7a22cd 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,4 @@ __pycache__ /cdm_task_service_temp /tmp_creds/ /.venv/ +/refdata_mount/ diff --git a/cdmtaskservice/app_state.py b/cdmtaskservice/app_state.py index 0641036..e0a1acf 100644 --- a/cdmtaskservice/app_state.py +++ b/cdmtaskservice/app_state.py @@ -149,7 +149,7 @@ async def build_refdata_app(app: FastAPI, cfg: CDMRefdataServiceConfig, service_ logr.info("Done") logr.info("Initializing CTS refdata client... ") refcli = await CTSRefdataClient.create(cfg.cts_root_url, cfg.cts_refdata_token) - dest.register("cts refdata client", refcli.close()) + dest.register("CTS refdata client", refcli.close()) refman = RefdataManager( refcli, s3cfg.get_internal_client(), @@ -157,6 +157,7 @@ async def build_refdata_app(app: FastAPI, cfg: CDMRefdataServiceConfig, service_ Path(cfg.refdata_local_path).absolute(), Path(cfg.refdata_meta_path).absolute(), ) + dest.register("refdata manager", refman.close) app.state._cdmstate = RefdataState( service_name=service_name, auth=auth, diff --git a/cdmtaskservice/refserv/refdata_manager.py b/cdmtaskservice/refserv/refdata_manager.py index 05d2ecd..ab0a4e0 100644 --- a/cdmtaskservice/refserv/refdata_manager.py +++ b/cdmtaskservice/refserv/refdata_manager.py @@ -1,5 +1,9 @@ """ Manager for staging reference data based on CTS records. """ +import asyncio +from concurrent.futures.process import ProcessPoolExecutor +import json +import hashlib import logging from pathlib import Path import traceback @@ -12,6 +16,11 @@ from cdmtaskservice import sites from cdmtaskservice.s3.client import S3Client from cdmtaskservice.s3.paths import S3Paths +from cdmtaskservice.s3.remote import unpack_archive + + +_MD5_FILE = "md5.json" +_MD5_FILE_TMP = _MD5_FILE + ".tmp" class RefdataManager: @@ -26,13 +35,31 @@ def __init__( refdata_meta_path: Path, ): - """ Create the manager. """ + """ + Create the manager. + + ctsrefcli - the CTS reference data client. + s3cli - the S3 client. Must be able to read all locations where CTS refdata is stored. + coman - a coroutine manager. + refdata_path - where refdata will be stored locally. + refdata_meta_path - where refdata metadata will be stored locally. + """ self._cli = _not_falsy(ctsrefcli, "ctsrefcli") self._s3cli = _not_falsy(s3cli, "s3cli") self._coman = _not_falsy(coman, "coman") self._refpath = _not_falsy(refdata_path, "refdata_path") self._metapath = _not_falsy(refdata_meta_path, "refdata_meta_path") self._logr = logging.getLogger(__name__) + # TODO CODE update to an InterpreterPoolExecutor when upgrading to Python 3.14 + # is possible. Currently some dependencies aren't compatible. + # 10 workers seems like plenty, refdata staging should be rare. + self._exe = ProcessPoolExecutor(max_workers=10) + + def close(self): + """ + Close any resources associated with the manager. + """ + self._exe.shutdown(cancel_futures=True) async def stage_refdata(self, refdata_id: str, cluster: sites.Cluster): """ @@ -64,14 +91,18 @@ async def stage_refdata(self, refdata_id: str, cluster: sites.Cluster): async def _stage_refdata(self, refdata: models.ReferenceData, cluster: sites.Cluster): try: - refdir = self._refpath / refdata.id - refdir.mkdir(parents=True, exist_ok=True) - arcpath = refdir / Path(refdata.file).name + arcpath = self._refpath / refdata.id / Path(refdata.file).name + arcpath.parent.mkdir(parents=True, exist_ok=True) self._logr.info( f"Downloading refdata {refdata.id} from S3 {refdata.file} to {arcpath}" ) await self._s3cli.download_objects_to_file(S3Paths([refdata.file]), [arcpath]) - # TODO NEXT check crc64nve, unzip, delete archive, calc md5s in interpreter thread + await asyncio.get_running_loop().run_in_executor( + self._exe, _unpack_refdata, refdata.id, arcpath, refdata.unpack, self._metapath + ) + await self._cli.update_refdata_state( + refdata.id, cluster, models.ReferenceDataState.COMPLETE + ) except Exception as e: self._logr.exception( f"Failed to stage refdata {refdata.id} for cluster {cluster.value}" @@ -84,3 +115,34 @@ async def _stage_refdata(self, refdata: models.ReferenceData, cluster: sites.Clu admin_error=str(e), traceback=traceback.format_exc() ) + + +# Expected to be run in a new process or interpreter +# Don't block the event loop with all this sync stuff +def _unpack_refdata(refdata_id: str, arcpath: Path, unpack: bool, metadata_dir: Path): + logging.basicConfig(level=logging.INFO) + logr = logging.getLogger(__name__) + if unpack: + logr.info( + f"Unpacking refdata {refdata_id} archive {arcpath}" + ) + unpack_archive(arcpath) + refparent = arcpath.parent + logr.info(f"Generating md5s for refdata {refdata_id}") + files = [file for file in refparent.rglob('*') if file.is_file()] + file_md5s = [] + for f in files: + with open(f, "rb") as fo: + file_md5s.append({ + "file": str(f.relative_to(refparent)), + "md5": hashlib.file_digest(fo, "md5").hexdigest(), + "size": f.stat().st_size + }) + tmpfile = metadata_dir / refdata_id / _MD5_FILE_TMP + tmpfile.parent.mkdir(parents=True, exist_ok=True) + with open(tmpfile, "w") as f: + f.write(json.dumps({"file_md5s": file_md5s}, indent=4)) + # Ensure no reading of partially written files + finalfile = metadata_dir / refdata_id / _MD5_FILE + tmpfile.rename(finalfile) + logr.info(f"Wrote md5s for refdata {refdata_id} to {finalfile}") diff --git a/cdmtaskservice/s3/remote.py b/cdmtaskservice/s3/remote.py index f301d19..abd27b1 100644 --- a/cdmtaskservice/s3/remote.py +++ b/cdmtaskservice/s3/remote.py @@ -321,19 +321,24 @@ async def _process_download( if cache_dir: _add_to_cache(outputpath, cache_path) if unpack: # never used with a cache - try: - op = str(outputpath).lower() - if op.endswith(_EXT_TGZ) or op.endswith(_EXT_TARGZ): - return _extract_tar(outputpath) - elif op.endswith(_EXT_GZ): - return _extract_gz(outputpath) - else: - raise ValueError( - f"Unsupported unpack file type for file {outputpath}. " + - f"Only {', '.join(UNPACK_FILE_EXTENSIONS)} are supported." - ) - finally: - outputpath.unlink(missing_ok=True) + unpack_archive(outputpath) + + +def unpack_archive(archive: Path): + """ Unpack a *.tgz, *.tar.gz, or *.gz file. """ + try: + op = str(archive).lower() + if op.endswith(_EXT_TGZ) or op.endswith(_EXT_TARGZ): + return _extract_tar(archive) + elif op.endswith(_EXT_GZ): + return _extract_gz(archive) + else: + raise ValueError( + f"Unsupported unpack file type for file {archive}. " + + f"Only {', '.join(UNPACK_FILE_EXTENSIONS)} are supported." + ) + finally: + archive.unlink(missing_ok=True) async def _process_downloads( diff --git a/docker-compose.yaml b/docker-compose.yaml index bf59355..83c7743 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -113,7 +113,7 @@ services: - KBCTS_REFDATA_SERVICE_TOKEN=token_with_refservice_role - KBCTS_SERVICE_GROUP=localdev volumes: - - :/refdata + - ./refdata_mount:/refdata kafka: # Note the image should sync with the test.cfg.example and GHA test files.