Improve HTTPS downloads (#277)
* Cleaned up the keyword arguments and docstring

- Only one keyword argument is actually used, so the broad (and
  undocumented) flexibility of **kwargs is unneeded
- The docstring mixed NumPy and Google styles
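Roughly, the change trades an opaque **kwargs for explicit, typed keyword arguments documented in a single (Google) style. A minimal sketch of the pattern, with hypothetical function names rather than the actual Foundry method:

from typing import Any


def download_options(globus: bool = True, **kwargs: Any) -> dict:
    """Before: the one real option, `interval`, hides inside an opaque **kwargs."""
    return {"globus": globus, "interval": kwargs.get("interval", 20)}


def download_options_explicit(globus: bool = True, interval: int = 20,
                              parallel_https: int = 4, verbose: bool = False) -> dict:
    """After: every option is an explicit, typed keyword argument.

    Args:
        globus: if True, use Globus to download the data, else try HTTPS
        interval: seconds to wait between Globus transfer status checks
        parallel_https: number of files to download in parallel over HTTPS
        verbose: produce more progress output
    Returns:
        The resolved options (illustrative only)
    """
    return {"globus": globus, "interval": interval,
            "parallel_https": parallel_https, "verbose": verbose}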

* Add a test requirements file, simplify test YAML

Sorry, a little housekeeping while I'm at it

* Fix how parallel downloads are implemented

The previous version used an executor in a way that could spawn an
unbounded number of threads, which can cause problems for large
datasets
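A minimal sketch of the bounded-pool pattern this moves to (the fetch helper, download_all wrapper, and files mapping are hypothetical, not the code in this commit):

from concurrent.futures import ThreadPoolExecutor, as_completed

import requests


def fetch(url: str, dest: str) -> str:
    """Download one file to disk and return its destination (hypothetical helper)."""
    response = requests.get(url)
    response.raise_for_status()
    with open(dest, "wb") as fp:
        fp.write(response.content)
    return dest


def download_all(files: dict, max_workers: int = 4) -> list:
    """Download every url -> destination pair with a fixed number of worker threads."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submitting everything up front is safe: only `max_workers` threads
        # ever run at once, no matter how many files the dataset contains
        futures = [executor.submit(fetch, url, dest) for url, dest in files.items()]
        return [future.result() for future in as_completed(futures)]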

* Add a progress bar and error checking
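The shape of the progress and error handling, mirroring the foundry.py diff below (the wait_with_progress wrapper is a hypothetical name; futures is assumed to come from a bounded executor as above):

from concurrent.futures import Future, as_completed
from typing import List

from tqdm.auto import tqdm


def wait_with_progress(futures: List[Future], verbose: bool = False) -> None:
    """Show download progress and fail fast if any transfer raised an error."""
    for future in tqdm(as_completed(futures), total=len(futures),
                       disable=not verbose, desc="Downloading files"):
        if future.exception() is not None:
            # Cancel anything still queued, then surface the first error
            for other in futures:
                other.cancel()
            raise future.exception()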

* Removed deprecated code

It will live in git and our hearts forever

* Flake8 fixes
WardLT authored Oct 24, 2022
1 parent 48d5baf commit 83fcee2
Showing 5 changed files with 75 additions and 180 deletions.
8 changes: 2 additions & 6 deletions .github/workflows/tests.yml
@@ -33,15 +33,11 @@ jobs:
env:
GLOBUS_CONFIG: "${{ secrets.GLOBUS_CONFIG }}"



- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install flake8 pytest
pip install pytest-cov~=2.12.1
pip install jsonschema
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
pip install -r requirements.txt
pip install -r test-requirements.txt
- name: Lint with flake8
run: |
71 changes: 42 additions & 29 deletions foundry/foundry.py
@@ -8,11 +8,13 @@
import logging
import warnings
import os
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed

from mdf_connect_client import MDFConnectClient
from mdf_forge import Forge
from dlhub_sdk import DLHubClient
from tqdm.auto import tqdm

from .utils import is_pandas_pytable, is_doi
from .utils import _read_csv, _read_json, _read_excel

@@ -23,7 +25,6 @@
)
from foundry.https_download import download_file, recursive_ls


logger = logging.getLogger(__name__)


@@ -44,7 +45,7 @@ class Foundry(FoundryMetadata):
xtract_tokens: Any

def __init__(
self, no_browser=False, no_local_server=False, index="mdf", authorizers=None, **data
self, no_browser=False, no_local_server=False, index="mdf", authorizers=None, **data
):
"""Initialize a Foundry client
Args:
@@ -67,16 +68,16 @@ def __init__(
auths = authorizers
else:
services = [
"data_mdf",
"mdf_connect",
"search",
"petrel",
"transfer",
"dlhub",
"funcx",
"openid",
"https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/all",
]
"data_mdf",
"mdf_connect",
"search",
"petrel",
"transfer",
"dlhub",
"funcx",
"openid",
"https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/all",
]
auths = mdf_toolbox.login(
services=services,
app_name="Foundry",
@@ -321,7 +322,7 @@ def get_citation(self) -> str:
return bibtex

def publish(self, foundry_metadata, data_source, title, authors, update=False,
publication_year=None, **kwargs,):
publication_year=None, **kwargs, ):
"""Submit a dataset for publication
Args:
foundry_metadata (dict): Dict of metadata describing data package
@@ -444,19 +445,20 @@ def configure(self, **kwargs):
self.config = FoundryConfig(**kwargs)
return self

def download(self, globus=True, verbose=False, **kwargs):
def download(self, globus: bool = True, interval: int = 20, parallel_https: int = 4, verbose: bool = False) -> 'Foundry':
"""Download a Foundry dataset
Args:
globus (bool): if True, use Globus to download the data else try HTTPS
verbose (bool): if True print out debug information during the download
globus: if True, use Globus to download the data else try HTTPS
interval: How often to wait before checking Globus transfer status
parallel_https: Number of files to download in parallel if using HTTPS
verbose: Produce more debug messages to screen
Returns
-------
(Foundry): self: for chaining
Returns:
self, for chaining
"""
# Check if the dir already exists
path = os.path.join(self.config.local_cache_dir, self.mdf["source_id"])

if os.path.isdir(path):
# if directory is present, but doesn't have the correct number of files inside,
# dataset will attempt to redownload
@@ -472,10 +474,12 @@ def download(self, globus=True, verbose=False, **kwargs):
if len(missing_files) > 0:
logger.info(f"Dataset will be redownloaded, following files are missing: {missing_files}")
else:
logger.info("Dataset has already been downloaded and contains all the desired files")
return self
else:
# in the case of no splits, ensure the directory contains at least one file
if (len(os.listdir(path)) >= 1):
if len(os.listdir(path)) >= 1:
logger.info("Dataset has already been downloaded and contains all the desired files")
return self
else:
logger.info("Dataset will be redownloaded, expected file is missing")
@@ -488,7 +492,7 @@ def download(self, globus=True, verbose=False, **kwargs):
res,
dest=self.config.local_cache_dir,
dest_ep=self.config.destination_endpoint,
interval=kwargs.get("interval", 20),
interval=interval,
download_datasets=True,
)
else:
@@ -499,12 +503,21 @@
"source_id": self.mdf["source_id"]
}

task_list = list(recursive_ls(self.transfer_client,
                              https_config['source_ep_id'],
                              https_config['folder_to_crawl']))
for task in task_list:
    with ThreadPoolExecutor() as executor:
        executor.submit(download_file, task, https_config)

# Begin finding files to download
task_generator = recursive_ls(self.transfer_client,
                              https_config['source_ep_id'],
                              https_config['folder_to_crawl'])
with ThreadPoolExecutor(parallel_https) as executor:
    # First submit all files
    futures = [executor.submit(lambda x: download_file(x, https_config), f)
               for f in tqdm(task_generator, disable=not verbose, desc="Finding files")]

    # Check that they completed successfully
    for result in tqdm(as_completed(futures), disable=not verbose, desc="Downloading files"):
        if result.exception() is not None:
            for f in futures:
                f.cancel()
            raise result.exception()

# after download check making sure directory exists, contains all indicated files
if os.path.isdir(path):
@@ -521,7 +534,7 @@ def download(self, globus=True, verbose=False, **kwargs):
raise FileNotFoundError(f"Downloaded directory does not contain the following files: {missing_files}")

else:
if (len(os.listdir(path)) < 1):
if len(os.listdir(path)) < 1:
raise FileNotFoundError("Downloaded directory does not contain the expected file")
else:
raise NotADirectoryError("Unable to create directory to download data")
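A hedged usage sketch of the reworked download() keywords (assumes an authenticated client and an already-selected dataset; the option values are placeholders):

from foundry import Foundry

f = Foundry(no_browser=True, no_local_server=True)  # signature shown in the diff above
# ... select a dataset with the project's usual search/load API ...
f.download(globus=False, parallel_https=8, interval=20, verbose=True)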
169 changes: 25 additions & 144 deletions foundry/https_download.py
@@ -1,14 +1,24 @@
import urllib3
import json
import time
import os
import multiprocessing
import requests
from collections import deque
from joblib import Parallel, delayed

import requests
from globus_sdk import TransferClient


def recursive_ls(tc, ep, path, max_depth=3):
def recursive_ls(tc: TransferClient, ep: str, path: str, max_depth: int = 3):
    """Find all files in a Globus directory recursively
    Args:
        tc: TransferClient authorized to access the directory
        ep: Endpoint on which the files reside
        path: Path to the files being downloaded
        max_depth: Maximum recurse depth
    Yields:
        Dictionaries describing the location of the files. Each includes at least
            "name": Name of the file
            "path": Absolute path to the file's location
    """
queue = deque()
queue.append((path, "", 0))
yield from _get_files(tc, ep, queue, max_depth)
@@ -38,7 +48,14 @@ def _get_files(tc, ep, queue, max_depth):
yield item


# TODO (wardlt): Avoid passing dictionaries, as documenting their content is tedious
def download_file(item, https_config):
"""Download a file to disk
Args:
item: Dictionary defining the path to the file
https_config: Configuration defining the URL of the server and the name of the dataset
"""
url = f"{https_config['base_url']}{item['path']}{item['name']}"

# build destination path for data file
@@ -56,141 +73,5 @@ def download_file(item, https_config):
with open(destination, "wb") as f:
f.write(response.content)

# TODO (wardlt): Should we just return the key?
return {destination + " status": True}


# NOTE: deprecated with deprecation of Xtract Aug 2022; saved for posterity
# TODO: reassess if there's a better way than passing self in
def xtract_https_download(foundryObj, verbose=False, **kwargs):
source_id = foundryObj.mdf["source_id"]
xtract_base_url = kwargs.get("xtract_base_url")
# MDF Materials Data at NCSA
source_ep_id = kwargs.get("source_ep_id")
folder_to_crawl = kwargs.get("folder_to_crawl")

# This only matters if you want files grouped together.
grouper = kwargs.get("grouper")

auth_token = foundryObj.xtract_tokens["auth_token"]
transfer_token = foundryObj.xtract_tokens["transfer_token"]
funcx_token = foundryObj.xtract_tokens["funcx_token"]

headers = {
"Authorization": auth_token,
"Transfer": transfer_token,
"FuncX": funcx_token,
"Petrel": auth_token,
}
if verbose:
print(f"Headers: {headers}")

# Initialize the crawl. This kicks off the Globus EP crawling service on the backend.
crawl_url = f"{xtract_base_url}/crawl"
if verbose:
print(f"Crawl URL is : {crawl_url}")

first_ep_dict = {
"repo_type": "GLOBUS",
"eid": source_ep_id,
"dir_paths": [folder_to_crawl],
"grouper": grouper,
}
tokens = {"Transfer": transfer_token, "FuncX": funcx_token, "Authorization": auth_token}
crawl_req = requests.post(
f"{xtract_base_url}/crawl",
json={"endpoints": [first_ep_dict], "tokens": tokens},
)

if verbose:
print("Crawl response:", crawl_req)
crawl_id = json.loads(crawl_req.content)["crawl_id"]
if verbose:
print(f"Crawl ID: {crawl_id}")

# Wait for the crawl to finish before we can start fetching our metadata.
while True:
crawl_status = requests.get(
f"{xtract_base_url}/get_crawl_status", json={"crawl_id": crawl_id}
)
if verbose:
print(crawl_status)
crawl_content = json.loads(crawl_status.content)
if verbose:
print(f"Crawl Status: {crawl_content}")

if crawl_content["crawl_status"] == "complete":
files_crawled = crawl_content["files_crawled"]
if verbose:
print("Our crawl has succeeded!")
break
else:
if verbose:
print("Sleeping before re-polling...")
time.sleep(2)

# Now we fetch our metadata. Here you can configure n to be maximum number of
# messages you want at once.

file_ls = []
fetched_files = 0
while fetched_files < files_crawled:
fetch_mdata = requests.get(
f"{xtract_base_url}/fetch_crawl_mdata",
json={"crawl_id": crawl_id, "n": 2},
)
fetch_content = json.loads(fetch_mdata.content)

for file_path in fetch_content["file_ls"]:
file_ls.append(file_path)
fetched_files += 1

if fetch_content["queue_empty"]:
if verbose:
print("Queue is empty! Continuing...")
time.sleep(2)

source_path = os.path.join(
foundryObj.config.local_cache_dir, foundryObj.mdf["source_id"]
)

if not os.path.exists(foundryObj.config.local_cache_dir):
os.mkdir(foundryObj.config.local_cache_dir)
os.mkdir(source_path)

elif not os.path.exists(source_path):
os.mkdir(source_path)

num_cores = multiprocessing.cpu_count()

@delayed
def download_file(file):
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

url = "https://data.materialsdatafacility.org" + file["path"]

# removes data source (eg MDF) parent directories, leaving the split path only
datasplit_subpath = file["path"].split(source_id + "/")[-1]

# build destination path for data file
destination = os.path.join("data/", source_id, datasplit_subpath)

parent_path = os.path.split(destination)[0]

# if parent directories don't exist, create them
if not os.path.exists(parent_path):
os.makedirs(parent_path)

response = requests.get(url, verify=False)

# write file to local destination
with open(destination, "wb") as f:
f.write(response.content)

return {file["path"] + " status": True}

results = Parallel(n_jobs=num_cores)(
download_file(file) for file in file_ls
)

print("Done curling.")
print(results)
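For completeness, a sketch of driving the HTTPS helpers directly (the endpoint ID, folder, and transfer client below are placeholder assumptions; the https_config keys match those used in foundry.py above):

from concurrent.futures import ThreadPoolExecutor, as_completed

from foundry.https_download import download_file, recursive_ls

https_config = {
    "base_url": "https://data.materialsdatafacility.org",  # value used by the deprecated path above
    "source_ep_id": "<globus-endpoint-uuid>",               # placeholder
    "folder_to_crawl": "/foundry/<source_id>/",             # placeholder
    "source_id": "<source_id>",                             # placeholder
}
transfer_client = ...  # an authorized globus_sdk TransferClient, construction omitted

files = recursive_ls(transfer_client, https_config["source_ep_id"],
                     https_config["folder_to_crawl"])
with ThreadPoolExecutor(4) as executor:
    futures = [executor.submit(download_file, item, https_config) for item in files]
    for future in as_completed(futures):
        future.result()  # re-raises any download error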
3 changes: 2 additions & 1 deletion requirements.txt
@@ -12,4 +12,5 @@ mdf_forge>=0.8.0
mdf-connect-client>=0.4.0
json2table>=1.1.5
torch>=1.8.0
tensorflow>=2
tensorflow>=2
tqdm>=4.64
4 changes: 4 additions & 0 deletions test-requirements.txt
@@ -0,0 +1,4 @@
pytest>=7
pytest-cov>=2.12
flake8
jsonschema
