|
1 | 1 | import bz2
|
| 2 | +import concurrent.futures |
2 | 3 | import io
|
3 | 4 | import json
|
4 | 5 | import os
|
5 | 6 | import shutil
|
6 | 7 | import subprocess
|
7 | 8 | import sys
|
8 | 9 | import tarfile
|
| 10 | +import time |
| 11 | + |
| 12 | +import requests |
9 | 13 |
|
10 | 14 | from metaflow.metaflow_config import DATASTORE_LOCAL_DIR
|
11 | 15 | from metaflow.plugins import DATASTORES
|
|
15 | 19 |
|
16 | 20 | # Bootstraps a valid conda virtual environment composed of conda and pypi packages
|
17 | 21 |
|
| 22 | + |
def print_timer(operation, start_time):
    """Print the wall-clock time elapsed since ``start_time``.

    ``operation`` is a label interpolated into the message; ``start_time`` is
    a timestamp previously obtained from ``time.time()``.
    """
    elapsed = time.time() - start_time
    message = f"Time taken for {operation}: {elapsed:.2f} seconds"
    print(message)
| 26 | + |
| 27 | + |
18 | 28 | if __name__ == "__main__":
|
| 29 | + total_start_time = time.time() |
19 | 30 | if len(sys.argv) != 5:
|
20 | 31 | print("Usage: bootstrap.py <flow_name> <id> <datastore_type> <architecture>")
|
21 | 32 | sys.exit(1)
|
|
47 | 58 |
|
48 | 59 | prefix = os.path.join(os.getcwd(), architecture, id_)
|
49 | 60 | pkgs_dir = os.path.join(os.getcwd(), ".pkgs")
|
| 61 | + conda_pkgs_dir = os.path.join(pkgs_dir, "conda") |
| 62 | + pypi_pkgs_dir = os.path.join(pkgs_dir, "pypi") |
50 | 63 | manifest_dir = os.path.join(os.getcwd(), DATASTORE_LOCAL_DIR, flow_name)
|
51 | 64 |
|
52 | 65 | datastores = [d for d in DATASTORES if d.TYPE == datastore_type]
|
|
64 | 77 | os.path.join(os.getcwd(), MAGIC_FILE),
|
65 | 78 | os.path.join(manifest_dir, MAGIC_FILE),
|
66 | 79 | )
|
67 |
| - |
68 | 80 | with open(os.path.join(manifest_dir, MAGIC_FILE)) as f:
|
69 | 81 | env = json.load(f)[id_][architecture]
|
70 | 82 |
|
71 |
| - # Download Conda packages. |
72 |
| - conda_pkgs_dir = os.path.join(pkgs_dir, "conda") |
73 |
| - with storage.load_bytes([package["path"] for package in env["conda"]]) as results: |
74 |
| - for key, tmpfile, _ in results: |
75 |
| - # Ensure that conda packages go into architecture specific folders. |
76 |
| - # The path looks like REPO/CHANNEL/CONDA_SUBDIR/PACKAGE. We trick |
77 |
| - # Micromamba into believing that all packages are coming from a local |
78 |
| - # channel - the only hurdle is ensuring that packages are organised |
79 |
| - # properly. |
80 |
| - |
81 |
| - # TODO: consider RAM disk |
82 |
| - dest = os.path.join(conda_pkgs_dir, "/".join(key.split("/")[-2:])) |
83 |
| - os.makedirs(os.path.dirname(dest), exist_ok=True) |
84 |
| - shutil.move(tmpfile, dest) |
85 |
| - |
86 |
| - # Create Conda environment. |
87 |
| - cmds = [ |
def run_cmd(cmd):
    """Run ``cmd`` through the shell, report its duration, and abort on failure.

    The command's stdout and stderr are captured as text. On a non-zero exit
    status they are printed together with the command and the process exits
    with status 1 via ``sys.exit``.
    """
    started_at = time.time()
    # capture_output=True is shorthand for stdout=PIPE, stderr=PIPE.
    completed = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    print_timer(f"Command: {cmd}", started_at)
    if completed.returncode == 0:
        return
    print(f"Bootstrap failed while executing: {cmd}")
    print("Stdout:", completed.stdout)
    print("Stderr:", completed.stderr)
    sys.exit(1)
| 94 | + |
def install_micromamba(architecture):
    """Make a ``micromamba`` executable available and return its path.

    Resolution order:

    1. A ``micromamba`` already discoverable on ``PATH`` is returned as-is.
    2. A binary previously extracted to ``<cwd>/micromamba/bin/micromamba`` is
       reused; its directory is added to ``PATH`` so that child processes
       can find it.
    3. Otherwise version 1.5.7 for ``architecture`` is downloaded from
       micro.mamba.pm, the ``bin/micromamba`` member is extracted under
       ``<cwd>/micromamba``, marked executable, and its directory is added
       to ``PATH``.

    Raises:
        Exception: if the download does not return HTTP 200 or the binary is
            missing after extraction.
    """
    # TODO: check if mamba or conda are already available on the image
    micromamba_timer = time.time()
    micromamba_dir = os.path.join(os.getcwd(), "micromamba")
    micromamba_path = os.path.join(micromamba_dir, "bin", "micromamba")
    micromamba_bin_dir = os.path.dirname(micromamba_path)

    def _add_bin_dir_to_path():
        # Child processes inherit os.environ, so this is what lets the later
        # shell commands resolve `micromamba`.
        entries = [os.environ.get("PATH"), micromamba_bin_dir]
        os.environ["PATH"] = os.pathsep.join(entry for entry in entries if entry)

    # Return the executable that was actually found rather than a local path
    # that may not exist.
    found_on_path = shutil.which("micromamba")
    if found_on_path:
        return found_on_path

    if os.path.exists(micromamba_path):
        # A local copy is already present, but its directory still has to be
        # put on PATH for the commands that follow.
        _add_bin_dir_to_path()
        return micromamba_path

    os.makedirs(micromamba_dir, exist_ok=True)
    # TODO: download micromamba from datastore
    url = f"https://micro.mamba.pm/api/micromamba/{architecture}/1.5.7"
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception(
            f"Failed to download micromamba: HTTP {response.status_code}"
        )
    # `.content` applies any HTTP content decoding and releases the
    # connection; the raw stream does neither.
    tar_content = bz2.decompress(response.content)
    with tarfile.open(fileobj=io.BytesIO(tar_content), mode="r:") as tar:
        tar.extract("bin/micromamba", path=micromamba_dir, set_attrs=False)

    # Verify before chmod so a missing binary yields the intended error
    # message instead of a FileNotFoundError from chmod.
    if not os.path.exists(micromamba_path):
        raise Exception("Failed to install Micromamba!")
    # set_attrs=False skips restoring the archived permissions, so the
    # executable bit is set explicitly.
    os.chmod(micromamba_path, 0o755)

    _add_bin_dir_to_path()
    print_timer("Downloading micromamba", micromamba_timer)
    return micromamba_path
| 123 | + |
def download_conda_packages(storage, packages, dest_dir):
    """Fetch conda packages from ``storage`` into ``dest_dir`` and return it.

    Each entry of ``packages`` supplies the key to load under ``"path"``.
    Files are moved to ``dest_dir/<last two key components>``.
    """
    started_at = time.time()
    os.makedirs(dest_dir, exist_ok=True)
    keys = [package["path"] for package in packages]
    with storage.load_bytes(keys) as results:
        for key, tmpfile, _ in results:
            # Ensure that conda packages go into architecture specific folders.
            # The path looks like REPO/CHANNEL/CONDA_SUBDIR/PACKAGE. We trick
            # Micromamba into believing that all packages are coming from a
            # local channel - the only hurdle is ensuring that packages are
            # organised properly.

            # TODO: consider RAM disk
            relative_path = "/".join(key.rsplit("/", 2)[-2:])
            target = os.path.join(dest_dir, relative_path)
            os.makedirs(os.path.dirname(target), exist_ok=True)
            shutil.move(tmpfile, target)
    print_timer("Downloading conda packages", started_at)
    return dest_dir
121 | 141 |
|
122 |
| - # Install PyPI packages. |
123 |
| - cmds.extend( |
124 |
| - [ |
125 |
| - f"""set -e; |
126 |
| - export PATH=$PATH:$(pwd)/micromamba; |
127 |
| - export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs; |
128 |
| - micromamba run --prefix {prefix} python -m pip --disable-pip-version-check install --root-user-action=ignore --no-compile {pypi_pkgs_dir}/*.whl --no-user""" |
129 |
| - ] |
130 |
| - ) |
def download_pypi_packages(storage, packages, dest_dir):
    """Fetch PyPI packages from ``storage`` into ``dest_dir`` and return it.

    Each entry of ``packages`` supplies the key to load under ``"path"``.
    Files land directly in ``dest_dir`` under the key's base name.
    """
    started_at = time.time()
    os.makedirs(dest_dir, exist_ok=True)
    keys = [package["path"] for package in packages]
    with storage.load_bytes(keys) as results:
        for key, tmpfile, _ in results:
            filename = os.path.basename(key)
            shutil.move(tmpfile, os.path.join(dest_dir, filename))
    print_timer("Downloading pypi packages", started_at)
    return dest_dir
| 151 | + |
def create_conda_environment(prefix, conda_pkgs_dir):
    """Create the conda environment at ``prefix`` from local packages.

    Writes an ``@EXPLICIT`` spec file listing every entry matching
    ``conda_pkgs_dir/*/*`` and passes it to ``micromamba create`` in offline
    mode. The command is executed through ``run_cmd``.
    """
    # PATH gets both $(pwd)/micromamba (the former install location) and
    # $(pwd)/micromamba/bin (where the binary is extracted now), so the
    # command does not depend on the parent process having patched PATH.
    # The EXIT trap removes the spec file even when `set -e` aborts early.
    cmd = f'''set -e;
        tmpfile=$(mktemp);
        trap 'rm -f "$tmpfile"' EXIT;
        echo "@EXPLICIT" > "$tmpfile";
        ls -d {conda_pkgs_dir}/*/* >> "$tmpfile";
        export PATH=$PATH:$(pwd)/micromamba:$(pwd)/micromamba/bin;
        export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs;
        micromamba create --yes --offline --no-deps --safety-checks=disabled --no-extra-safety-checks --prefix {prefix} --file "$tmpfile"'''
    run_cmd(cmd)
131 | 162 |
|
132 |
| - for cmd in cmds: |
133 |
| - result = subprocess.run( |
134 |
| - cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE |
def install_pypi_packages(prefix, pypi_pkgs_dir):
    """Install every wheel in ``pypi_pkgs_dir`` into the environment at ``prefix``.

    Runs ``pip install`` inside the environment via ``micromamba run``, with
    the package index disabled and ``pypi_pkgs_dir`` as the only
    ``--find-links`` source. The command is executed through ``run_cmd``.
    """
    # PATH gets both $(pwd)/micromamba (the former install location) and
    # $(pwd)/micromamba/bin (where the binary is extracted now), so the
    # command does not depend on the parent process having patched PATH.
    cmd = f"""set -e;
        export PATH=$PATH:$(pwd)/micromamba:$(pwd)/micromamba/bin;
        export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs;
        micromamba run --prefix {prefix} python -m pip --disable-pip-version-check install --root-user-action=ignore --no-compile --no-index --no-cache-dir --no-deps --prefer-binary --find-links={pypi_pkgs_dir} {pypi_pkgs_dir}/*.whl --no-user"""
    run_cmd(cmd)
| 169 | + |
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Install micromamba and download conda and pypi packages in parallel.
    future_install_micromamba = executor.submit(install_micromamba, architecture)
    future_download_conda_packages = executor.submit(
        download_conda_packages, storage, env["conda"], conda_pkgs_dir
    )
    future_download_pypi_packages = None
    if "pypi" in env:
        future_download_pypi_packages = executor.submit(
            download_pypi_packages, storage, env["pypi"], pypi_pkgs_dir
        )

    # Future.result() blocks like wait() does, but also re-raises whatever
    # the worker raised, so a failed install or download stops the bootstrap
    # here instead of being silently dropped.
    future_install_micromamba.result()
    future_download_conda_packages.result()

    # Create the conda environment once micromamba and the conda packages
    # are in place. This runs on the calling thread, which would otherwise
    # only be waiting; the pypi download keeps going in the pool.
    create_conda_environment(prefix, conda_pkgs_dir)

    if future_download_pypi_packages is not None:
        # Install pypi packages after the environment exists and the pypi
        # packages have been downloaded.
        future_download_pypi_packages.result()
        install_pypi_packages(prefix, pypi_pkgs_dir)

total_time = time.time() - total_start_time
print(f"{total_time:.2f}")
0 commit comments