From 01da099f787ab42c0af396c5430e0499ff98427a Mon Sep 17 00:00:00 2001
From: Xiao Gui
Date: Mon, 1 Aug 2022 18:37:17 +0200
Subject: [PATCH] add support for seg map and meshes

---
 .gitignore                   |  4 ++-
 README.md                    |  3 --
 chunk_replicator/accessor.py | 65 ++++++++++++++++++++++++++++++++++----
 chunk_replicator/logger.py   |  4 +++
 4 files changed, 66 insertions(+), 10 deletions(-)
 create mode 100644 chunk_replicator/logger.py

diff --git a/.gitignore b/.gitignore
index 746fb95..6165502 100644
--- a/.gitignore
+++ b/.gitignore
@@ -151,4 +151,6 @@ cython_debug/
 #  option (not recommended) you can uncomment the following to ignore the entire idea folder.
 #.idea/
 
-bla.py
\ No newline at end of file
+bla.py
+
+tmp/*
\ No newline at end of file
diff --git a/README.md b/README.md
index 1c80092..abedf2e 100644
--- a/README.md
+++ b/README.md
@@ -20,9 +20,6 @@ user = User(auth_token=auth_token)
 bucket = DataProxyBucket(user, "MY_BUCKET_NAME")
 
 proxy = EbrainsDataproxyHttpReplicatorAccessor(dataproxybucket=bucket, prefix="bigbrain_2015")
-io = get_IO_for_existing_dataset(bigbrain_accessor)
-import json
-proxy.store_file("info", json.dumps(io.info).encode("utf-8"))
 
 bigbrain_accessor.mirror_to(proxy)
 ```
diff --git a/chunk_replicator/accessor.py b/chunk_replicator/accessor.py
index d1f6735..97cdcc1 100644
--- a/chunk_replicator/accessor.py
+++ b/chunk_replicator/accessor.py
@@ -1,4 +1,4 @@
-from typing import Set
+from typing import List, Optional, Set
 from neuroglancer_scripts.accessor import Accessor, _CHUNK_PATTERN_FLAT
 from neuroglancer_scripts.http_accessor import HttpAccessor
 from neuroglancer_scripts.precomputed_io import get_IO_for_existing_dataset
@@ -10,6 +10,7 @@
 
 from .dataproxy import DataProxyBucket
 from .util import retry
+from .logger import logger
 
 try:
     WORKER_THREADS = int(os.getenv("WORKER_THREADS", "16"))
@@ -30,14 +31,66 @@ class HttpMirrorSrcAccessor(HttpAccessor, MirrorSrcAccessor):
     def mirror_chunk(self, dst: Accessor, key: str, chunk_coords):
         chunk = self.fetch_chunk(key, chunk_coords)
         dst.store_chunk(chunk, key, chunk_coords)
+
+    def mirror_file(self, dst: Accessor, relative_path: str, mime_type="application/octet-stream"):
+        file_content = self.fetch_file(relative_path)
+        dst.store_file(relative_path, file_content, mime_type)
+        # Mesh metadata is addressed as "<segment_id>:0"; also store a copy
+        # under the unsuffixed name so either path resolves on the destination.
+        if relative_path.endswith(":0"):
+            dst.store_file(relative_path[:-2], file_content, mime_type)
 
-    def mirror_to(self, dst: Accessor):
+    def mirror_to(self, dst: Accessor, *, mesh_indices: Optional[List[int]] = None):
         assert dst.can_write
         io = get_IO_for_existing_dataset(self)
-        print("Begin mirroring. Got info:", io.info)
+        logger.debug("Begin mirroring. Got info: %s", io.info)
+
+        if mesh_indices is not None:
+            logger.debug("mesh_indices provided, mirroring meshes...")
+            mesh_dir = io.info.get("mesh")
+            assert mesh_dir, "expected 'mesh' key to be defined in info, but it is not."
+
+            with ThreadPoolExecutor(max_workers=WORKER_THREADS) as executor:
+                for progress in tqdm(
+                    executor.map(
+                        self.mirror_file,
+                        repeat(dst),
+                        (f"{mesh_dir}/{idx}:0" for idx in mesh_indices),
+                        repeat("application/json")
+                    ),
+                    total=len(mesh_indices),
+                    desc="Fetching and writing mesh metadata...",
+                    unit="files",
+                    leave=True,
+                ):
+                    ...
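+
+            # Read back the just-mirrored mesh metadata and collect every
+            # fragment file it references.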
+            fragments = [f"{mesh_dir}/{frag}"
+                         for idx in mesh_indices
+                         for frag in json.loads(
+                             dst.fetch_file(f"{mesh_dir}/{idx}:0").decode("utf-8")
+                         ).get("fragments")]
+
+            with ThreadPoolExecutor(max_workers=WORKER_THREADS) as executor:
+                for progress in tqdm(
+                    executor.map(
+                        self.mirror_file,
+                        repeat(dst),
+                        fragments,
+                    ),
+                    total=len(fragments),
+                    desc="Fetching and writing meshes...",
+                    unit="meshes",
+                    leave=True
+                ):
+                    ...
+
-        print("Mirroring info ...")
+        logger.debug("Mirroring info ...")
         dst.store_file("info", json.dumps(io.info).encode("utf-8"))
 
         for scale in io.info.get('scales'):
@@ -74,7 +127,7 @@ def mirror_to(self, dst: Accessor):
                 if not should_check_chunk_exists or not dst.chunk_exists(key, chunk_coord)
             ]
 
-            print(f"Mirroring data for key {key}")
+            logger.debug(f"Mirroring data for key {key}")
 
             with ThreadPoolExecutor(max_workers=WORKER_THREADS) as executor:
                 for progress in tqdm(
@@ -153,7 +206,7 @@ def store_chunk(self, buf, key, chunk_coords, mime_type="application/octet-strea
 
     def chunk_exists(self, key, chunk_coords):
         if not self._existing_obj_name_set:
-            print(f"Checking existing objects. Listing existing objects for {self.prefix}...")
+            logger.debug(f"Checking existing objects. Listing existing objects for {self.prefix}...")
 
             for obj in tqdm(
                 self.dataproxybucket.iterate_objects(prefix=self.prefix),
diff --git a/chunk_replicator/logger.py b/chunk_replicator/logger.py
new file mode 100644
index 0000000..9facd1d
--- /dev/null
+++ b/chunk_replicator/logger.py
@@ -0,0 +1,4 @@
+import logging
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
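
For illustration, a minimal sketch (not part of the patch) of how the new `mesh_indices` option could be called, following the setup in the README hunk above. The import paths, the source URL, the bucket name, the prefix, and the segment ids are placeholder assumptions, and `HttpMirrorSrcAccessor` is assumed to take a base URL like its `HttpAccessor` parent:

```python
# Sketch under the assumptions noted above; auth_token as in the README.
from chunk_replicator import User                        # assumed import path
from chunk_replicator.dataproxy import DataProxyBucket   # assumed import path
from chunk_replicator.accessor import (                  # assumed import path
    HttpMirrorSrcAccessor,
    EbrainsDataproxyHttpReplicatorAccessor,
)

user = User(auth_token=auth_token)
bucket = DataProxyBucket(user, "MY_BUCKET_NAME")

src = HttpMirrorSrcAccessor("https://example.org/precomputed/my_seg/")  # placeholder URL
dst = EbrainsDataproxyHttpReplicatorAccessor(dataproxybucket=bucket, prefix="my_seg")

# Mirrors info and chunks as before; with mesh_indices it additionally mirrors
# each segment's mesh metadata ("<id>:0") and the fragment files it references.
src.mirror_to(dst, mesh_indices=[10, 11, 12])  # placeholder segment ids
```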