add support for seg map and meshes
xgui3783 committed Aug 1, 2022
1 parent 4fe55b9 commit 01da099
Showing 4 changed files with 62 additions and 10 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -151,4 +151,6 @@ cython_debug/
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 #.idea/
 
-bla.py
+bla.py
+
+tmp/*
3 changes: 0 additions & 3 deletions README.md
@@ -20,9 +20,6 @@ user = User(auth_token=auth_token)
 bucket = DataProxyBucket(user, "MY_BUCKET_NAME")
 
 proxy = EbrainsDataproxyHttpReplicatorAccessor(dataproxybucket=bucket, prefix="bigbrain_2015")
-io = get_IO_for_existing_dataset(bigbrain_accessor)
-import json
-proxy.store_file("info", json.dumps(io.info).encode("utf-8"))
 bigbrain_accessor.mirror_to(proxy)
 ```

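With this commit, `mirror_to` gains a keyword-only `mesh_indicies` argument (see `accessor.py` below). A hedged usage sketch, assuming the `proxy` bucket accessor from the README snippet above; the source URL and segment ids are illustrative, and the base-URL constructor is assumed to be inherited unchanged from `HttpAccessor`:

```python
from chunk_replicator.accessor import HttpMirrorSrcAccessor

# Hypothetical precomputed segmentation source; mesh_indicies lists the
# segment labels whose meshes should be mirrored along with the chunks.
src = HttpMirrorSrcAccessor("https://example.org/precomputed/my_seg")
src.mirror_to(proxy, mesh_indicies=[1, 2, 3])
```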
61 changes: 55 additions & 6 deletions chunk_replicator/accessor.py
@@ -1,4 +1,4 @@
-from typing import Set
+from typing import List, Set
 from neuroglancer_scripts.accessor import Accessor, _CHUNK_PATTERN_FLAT
 from neuroglancer_scripts.http_accessor import HttpAccessor
 from neuroglancer_scripts.precomputed_io import get_IO_for_existing_dataset
@@ -10,6 +10,7 @@
 
 from .dataproxy import DataProxyBucket
 from .util import retry
+from .logger import logger
 
 try:
     WORKER_THREADS = int(os.getenv("WORKER_THREADS", "16"))
@@ -30,14 +31,62 @@ class HttpMirrorSrcAccessor(HttpAccessor, MirrorSrcAccessor):
     def mirror_chunk(self, dst: Accessor, key: str, chunk_coords):
         chunk = self.fetch_chunk(key, chunk_coords)
         dst.store_chunk(chunk, key, chunk_coords)
 
+    def mirror_file(self, dst: Accessor, relative_path: str, mime_type="application/octet-stream"):
+        file_content = self.fetch_file(relative_path)
+        dst.store_file(relative_path, file_content, mime_type)
+        if relative_path.endswith(":0"):
+            dst.store_file(relative_path[:-2], file_content, mime_type)
+
+
-    def mirror_to(self, dst: Accessor):
+    def mirror_to(self, dst: Accessor, *, mesh_indicies: List[int] = None):
         assert dst.can_write
         io = get_IO_for_existing_dataset(self)
 
-        print("Begin mirroring. Got info:", io.info)
+        logger.debug("Begin mirroring. Got info: %s", io.info)
+
+        if mesh_indicies is not None:
+            logger.debug("mesh_indicies provided, mirroring meshes...")
+            mesh_dir = io.info.get("mesh")
+            assert mesh_dir, "expecting 'mesh' key to be defined in info, but it is not."
+
+            with ThreadPoolExecutor(max_workers=WORKER_THREADS) as executor:
+                for progress in tqdm(
+                    executor.map(
+                        self.mirror_file,
+                        repeat(dst),
+                        (f"{mesh_dir}/{str(idx)}:0" for idx in mesh_indicies),
+                        repeat("application/json")
+                    ),
+                    total=len(mesh_indicies),
+                    desc="Fetching and writing mesh metadata...",
+                    unit="files",
+                    leave=True,
+                ):
+                    ...
+
+            fragments = [f"{mesh_dir}/{frag}"
+                         for idx in mesh_indicies
+                         for frag in json.loads(
+                             dst.fetch_file(f"{mesh_dir}/{str(idx)}:0").decode("utf-8")
+                         ).get("fragments")]
+
+            with ThreadPoolExecutor(max_workers=WORKER_THREADS) as executor:
+                for progress in tqdm(
+                    executor.map(
+                        self.mirror_file,
+                        repeat(dst),
+                        fragments,
+                    ),
+                    total=len(fragments),
+                    desc="Fetching and writing meshes...",
+                    unit="meshes",
+                    leave=True
+                ):
+                    ...
+
 
-        print("Mirroring info ...")
+        logger.debug("Mirroring info ...")
         dst.store_file("info", json.dumps(io.info).encode("utf-8"))
 
         for scale in io.info.get('scales'):
@@ -74,7 +123,7 @@ def mirror_to(self, dst: Accessor):
                 if not should_check_chunk_exists or not dst.chunk_exists(key, chunk_coord)
             ]
 
-            print(f"Mirroring data for key {key}")
+            logger.debug(f"Mirroring data for key {key}")
 
             with ThreadPoolExecutor(max_workers=WORKER_THREADS) as executor:
                 for progress in tqdm(
@@ -153,7 +202,7 @@ def store_chunk(self, buf, key, chunk_coords, mime_type="application/octet-stream"):
     def chunk_exists(self, key, chunk_coords):
         if not self._existing_obj_name_set:
 
-            print(f"Checking existing objects. Listing existing objects for {self.prefix}...")
+            logger.debug(f"Checking existing objects. Listing existing objects for {self.prefix}...")
 
             for obj in tqdm(
                 self.dataproxybucket.iterate_objects(prefix=self.prefix),
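For context on the mesh branch added above: in the Neuroglancer precomputed mesh layout, each segment id has a small JSON manifest stored at `{mesh_dir}/{id}:0` whose `fragments` array names the actual mesh files. `mirror_to` mirrors the manifests first (the `:0` special-case in `mirror_file` stores each one under both `name:0` and the bare `name`), then re-reads them from the destination to collect fragment paths. A minimal sketch of that resolution step, with a made-up manifest:

```python
import json

# Made-up manifest bytes, as they would come back from
# dst.fetch_file(f"{mesh_dir}/{idx}:0"); real manifests can list
# several fragment object names.
manifest = b'{"fragments": ["1:0:0-64_0-64_0-64"]}'

mesh_dir = "mesh"  # whatever io.info["mesh"] points at
fragments = [
    f"{mesh_dir}/{frag}"
    for frag in json.loads(manifest.decode("utf-8")).get("fragments", [])
]
print(fragments)  # ['mesh/1:0:0-64_0-64_0-64']
```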
4 changes: 4 additions & 0 deletions chunk_replicator/logger.py
@@ -0,0 +1,4 @@
+import logging
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
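One caveat worth noting: the new module pins the logger to INFO while every message above is emitted via `logger.debug`, so the output stays hidden unless a consuming script lowers the threshold. A sketch of how a caller might surface it (standard-library `logging` setup, nothing project-specific assumed):

```python
import logging

# Route log records to stderr and drop the package logger below INFO so the
# logger.debug(...) calls in chunk_replicator/accessor.py become visible.
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("chunk_replicator.logger").setLevel(logging.DEBUG)
```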
