diff --git a/.gitignore b/.gitignore index ed6220d..237039a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,9 @@ .prod.env .test.env +# Certificates +certs/ + # IDE .idea/ diff --git a/Dockerfile b/Dockerfile index 22a65e2..a7b8d96 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,16 @@ FROM python:3.13.7-trixie +# Install CA infrastructure +RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* + +# Add AIoD cert and build custom bundle +RUN mkdir -p /certs +COPY certs/aiod-insight-centre.crt /certs/aiod-insight-centre.crt +RUN cat /etc/ssl/certs/ca-certificates.crt /certs/aiod-insight-centre.crt > /certs/custom-ca-bundle.crt + +# Make Python requests use the custom CA bundle +ENV REQUESTS_CA_BUNDLE=/certs/custom-ca-bundle.crt + RUN useradd -m appuser USER appuser WORKDIR /home/appuser diff --git a/README.md b/README.md index 8a03401..4512f87 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ -# AI-on-Demand Hugging Face connector -Collects dataset metadata from [Hugging Face](https://huggingface.co) and uploads it to AI-on-Demand. +# AI-on-Demand OpenML connector +Collects dataset metadata from [OpenML](https://www.openml.org) and uploads it to AI-on-Demand. This package is not intended to be used directly by others, but may serve as an example of how to build a connector for the AI-on-Demand platform. For more information on how to test this connector locally as a springboard for developing your own connector, reference the [Development](#Development) section below. @@ -27,7 +27,7 @@ the default configuration can be found in the [`/script/config.prod.toml`](/scri You will also need to have the 'Client Secret' for the client, which can be obtained from the keycloak administrator. The client secret must be provided to the Docker container as an environment variable or in a dotenv file *similar to* [`script/.local.env`](script/.local.env) but named `script/.prod.env`. 
-Please contact the Keycloak service maintainer to obtain said credentials you need if you are in charge of deploying this Hugging Face connector. +Please contact the Keycloak service maintainer to obtain the credentials you need if you are in charge of deploying this OpenML connector. ## Running the Connector You will need to mount the aiondemand configuration to `/home/appuser/.aiod/config.toml` and provide environment variables directly with `-e` or through mounting the dotfile in `/home/appuser/.aiod/openml/.env`. The [`script/run.sh`](script/run.sh) script provides a convenience that automatically does this. @@ -36,8 +36,8 @@ Any following arguments are interpreted as arguments to the main script. For the latest commandline arguments, use `docker run aiondemand/openml-connector --help`. Some example invocations that use the `script/run.sh` script: - - `script/run.sh local --mode id --value 61 --app-log-level debug` syncs one specific openml dataset, and produces debug logs for the connector only. - - `script/run.sh test --mode since --value 100 --root-log-level debug` syncs all datasets with identifier 100 or greater (in ascending order). + - `script/run.sh local --mode id --value 61 --app-log-level debug` syncs one specific OpenML dataset, and produces debug logs for the connector only. + - `script/run.sh test --mode since --value 100 --root-log-level debug` syncs all datasets with identifier `>= 100` (in ascending identifier order). - `script/run.sh prod --mode all --root-log-level info` indexes all datasets on OpenML, producing info logs for the connector and all its dependencies (this is the default). ## Development @@ -45,8 +45,8 @@ You can test the connector when running the [metadata catalogue](https://github. The default configurations for this setup can be found in the [`.local.env`](script/.local.env) and [`config.local.toml`](script/config.local.toml) files. 
When connecting to the AI-on-Demand test or production server, you will need to a dedicated client registered in the keycloak instance which is connected to the REST API you want to upload data to. -See [this form]() to apply for a client. The client will need to have a `platform_X` role attached, where `X` is the name of the platform from which you register assets. +See [this form]() to apply for a client. The client will need to have a `platform_X` role attached, where `X` is the name of the platform from which you register assets. For this connector that will typically be `platform_openml`. When a client is created, you will need its 'Client ID' and 'Client Secret' and update the relevant configuration and environment files accordingly. ## Disclaimer -This project is not affiliated with OpenML in any way. +This project is not affiliated with OpenML in any way. \ No newline at end of file diff --git a/script/config.test.toml b/script/config.test.toml index 2a09c04..c2fe2f0 100644 --- a/script/config.test.toml +++ b/script/config.test.toml @@ -1,3 +1,3 @@ -api_server = 'https://test.openml.org/aiod/' +api_server = 'https://aiod.insight-centre.org/' auth_server = 'https://ai4europe.test.fedcloud.eu/aiod-auth/' -client_id = 'platform_openml' +client_id = 'platform_openml' \ No newline at end of file diff --git a/src/connector.py b/src/connector.py index 0cb9c73..8d731a9 100644 --- a/src/connector.py +++ b/src/connector.py @@ -14,6 +14,7 @@ from aiod.authentication import set_token, Token from dotenv import load_dotenv import requests +from requests.exceptions import JSONDecodeError as RequestsJSONDecodeError class Modes(StrEnum): @@ -42,30 +43,52 @@ class ServerError(Exception): def list_datasets(from_: int | None = None): def paginate_all_datasets(items_per_page: int = 50): - url_data = f"https://www.openml.org/api/v1/json/data/list/limit/{items_per_page}/offset/{{offset}}" + url_template = ( + "https://www.openml.org/api/v1/json/data/list/limit/" + 
f"{items_per_page}/offset/{{offset}}" + ) for offset in range(0, 1_000_000, items_per_page): - response = requests.get(url_data.format(offset=offset), timeout=REQUEST_TIMEOUT) + url = url_template.format(offset=offset) + response = requests.get(url, timeout=REQUEST_TIMEOUT) + if not response.ok: status_code = response.status_code + if status_code == 412: + logger.warning( + "OpenML returned 412 (database connection error) for %s; " + "skipping this page and continuing with the next offset.", + url, + ) + continue + try: msg = response.json()["error"]["message"] except Exception as e: - logger.error("Error while paginating, cannot continue.") + logger.error("Error while paginating datasets, cannot continue.") logger.exception(e) msg = response.content - err_msg = f"Error while fetching {url_data} from OpenML: ({status_code}) {msg}" + err_msg = ( + f"Error while fetching {url} from OpenML: " + f"({status_code}) {msg}" + ) raise ServerError(err_msg) + logger.debug(f"Paging through datasets (offset {offset})") try: dataset_summaries = response.json()["data"]["dataset"] if dataset_summaries: yield from dataset_summaries - logger.debug(f"Paged through datasets (total {len(dataset_summaries)})") + logger.debug( + f"Paged through datasets (total {len(dataset_summaries)})" + ) else: break - except Exception: - raise ParsingError(f"Could not parse response ({response.status_code}): {response.content}", exc_info=True) + except Exception as e: + raise ParsingError( + f"Could not parse response ({response.status_code}): " + f"{response.content}" + ) from e from_ = from_ or 0 for dataset in paginate_all_datasets(): @@ -78,48 +101,75 @@ def paginate_all_datasets(items_per_page: int = 50): continue try: - yield fetch_openml_dataset(identifier, dataset["quality"]) + # dataset["quality"] is the pre-fetched qualities summary, if present. 
+ yield fetch_openml_dataset(identifier, dataset.get("quality")) except Exception as e: logger.error(f"Exception when processing dataset {identifier}") logger.exception(e) def fetch_openml_dataset(identifier_: int, qualities: dict | None = None): + # Only hit /data/qualities if we don’t already have them from the list call. if not qualities: - qualities_url = f"https://www.openml.org/api/v1/json/data/qualities/{identifier_}" + qualities_url = ( + f"https://www.openml.org/api/v1/json/data/qualities/{identifier_}" + ) qualities_response = requests.get( qualities_url, timeout=REQUEST_TIMEOUT, ) + if not qualities_response.ok: status_code = qualities_response.status_code + if status_code == 412: + logger.warning( + "OpenML returned 412 for qualities of dataset %s; " + "continuing without qualities.", + identifier_, + ) + qualities = [] + else: + try: + msg = qualities_response.json()["error"]["message"] + except Exception as e: + logger.exception(e) + msg = qualities_response.content + err_msg = ( + f"Error while fetching {qualities_url} from OpenML: " + f"({status_code}) {msg}" + ) + raise ServerError(err_msg) + else: try: - msg = qualities_response.json()["error"]["message"] + qualities = qualities_response.json()["data_qualities"]["quality"] except Exception as e: - logger.exception(e) - msg = qualities_response.content - err_msg = f"Error while fetching {qualities_url} from OpenML: ({status_code}) {msg}" - raise ServerError(err_msg) - try: - qualities = qualities_response.json()["data_qualities"]["quality"] - except: - raise ParsingError(f"Error parsing JSON of qualities of dataset {qualities_response.content}") + raise ParsingError( + f"Error parsing JSON of qualities of dataset " + f"{qualities_response.content}" + ) from e url_data = f"https://www.openml.org/api/v1/json/data/{identifier_}" response = requests.get(url_data, timeout=REQUEST_TIMEOUT) if not response.ok: status_code = response.status_code - msg = response.json()["error"]["message"] + try: + msg = 
response.json()["error"]["message"] + except Exception: + msg = response.content err_msg = f"Error while fetching {url_data} from OpenML: ({status_code}) {msg}" raise ServerError(err_msg) try: dataset_json = response.json()["data_set_description"] - qualities_json = {quality["name"]: quality["value"] for quality in qualities} + qualities_json = { + quality["name"]: quality["value"] for quality in (qualities or []) + } return dataset_json | {"qualities": qualities_json} - except Exception: - raise ParsingError(f"Error parsing JSON of dataset {response.content}") + except Exception as e: + raise ParsingError( + f"Error parsing JSON of dataset {response.content}" + ) from e def _convert_dataset_to_aiod(dataset: dict) -> dict: @@ -129,11 +179,15 @@ def _convert_dataset_to_aiod(dataset: dict) -> dict: if isinstance(description, list) and len(description) == 0: description = "" if not isinstance(description, str): - logger.warning(f"Ignoring description {description} of dataset {identifier}.") + logger.warning( + f"Ignoring description {description} of dataset {identifier}." 
+ ) description = "" if len(description) > MAX_DESCRIPTION_LENGTH: text_break = " [...]" - description = description[: MAX_DESCRIPTION_LENGTH - len(text_break)] + text_break + description = ( + description[: MAX_DESCRIPTION_LENGTH - len(text_break)] + text_break + ) size = None if n_rows := dataset["qualities"].get("NumberOfInstances"): @@ -152,9 +206,13 @@ def _convert_dataset_to_aiod(dataset: dict) -> dict: version=dataset["version"], same_as=f"https://www.openml.org/api/v1/json/data/{identifier}", description=dict(plain=description), - date_published=dateutil.parser.parse(dataset["upload_date"]).isoformat(), + date_published=dateutil.parser.parse( + dataset["upload_date"] + ).isoformat(), license=dataset.get("licence"), - distribution=[dict(content_url=dataset["url"], encoding_format=dataset["format"])], + distribution=[ + dict(content_url=dataset["url"], encoding_format=dataset["format"]) + ], is_accessible_for_free=True, keyword=keyword, size=size, @@ -172,29 +230,51 @@ def upsert_dataset(dataset: dict) -> int: platform_identifier=identifier, data_format="json", ) - except KeyError: + except (KeyError, RequestsJSONDecodeError) as e: + logger.debug( + "No existing AI-on-Demand dataset for OpenML id %s " + "or response was non-JSON (%s). 
Registering new asset.", + identifier, + e, + ) response = aiod.datasets.register(metadata=local_dataset) if isinstance(response, str): logger.debug(f"Indexed dataset {identifier}: {response}") return HTTPStatus.OK elif isinstance(response, requests.Response): - logger.warning(f"Error uploading dataset ({response.status_code}: {response.content}") - breakpoint() + logger.warning( + "Error uploading dataset %s (%s): %s", + identifier, + response.status_code, + response.content, + ) return response.status_code raise if "identifier" not in aiod_dataset: raise RuntimeError( - f"Unexpected server response retrieving OpenML dataset {identifier} " - f"from AI-on-Demand: {aiod_dataset}" + "Unexpected server response retrieving OpenML dataset " + f"{identifier} from AI-on-Demand: {aiod_dataset}" ) - response = aiod.datasets.replace(identifier=aiod_dataset['identifier'], metadata=local_dataset) + response = aiod.datasets.replace( + identifier=aiod_dataset["identifier"], + metadata=local_dataset, + ) if response.status_code == HTTPStatus.OK: - logger.debug(f"Updated dataset {identifier}: {aiod_dataset['identifier']}") + logger.debug( + "Updated dataset %s: %s", + identifier, + aiod_dataset["identifier"], + ) else: - logger.warning(f"Could not update {aiod_dataset['identifier']} for openml dataset {identifier} " - f"({response.status_code}): {response.content}") + logger.warning( + "Could not update %s for OpenML dataset %s (%s): %s", + aiod_dataset["identifier"], + identifier, + response.status_code, + response.content, + ) except Exception as e: logger.exception( msg=f"Exception encountered when upserting dataset {identifier}.", @@ -225,20 +305,23 @@ def parse_args(): "set to 'auto', in which case the last inserted dataset on AI-on-Demand " "will be determined and only datasets uploaded after that one are indexed. " "Cannot be set with mode 'ALL'." 
- ) + ), ) log_levels = [level.lower() for level in logging.getLevelNamesMapping()] parser.add_argument( "--app-log-level", choices=log_levels, - default='info', - help="Emit all log messages generated of at least this level by the app." + default="info", + help="Emit all log messages generated of at least this level by the app.", ) parser.add_argument( - '--root-log-level', + "--root-log-level", choices=log_levels, - default='error', - help="Emit all log messages generated of at least this level by the app's dependencies." + default="error", + help=( + "Emit all log messages generated of at least this level " + "by the app's dependencies." + ), ) args = parser.parse_args() if args.mode == Modes.ALL and args.value: @@ -255,18 +338,20 @@ def configure_connector(): if dot_file.exists() and load_dotenv(dot_file): logger.info(f"Loaded variables from {dot_file}") else: - reason = "unknown reason" if dot_file else "file does not exist" + reason = "file does not exist" if not dot_file else "unknown reason" logger.info(f"No environment variables loaded from {dot_file}: {reason}.") BATCH_SIZE = os.getenv("AIOD_BATCH_SIZE", 25) PLATFORM_NAME = os.getenv("PLATFORM_NAME", PLATFORM_NAME) PER_DATASET_DELAY = float(delay) if (delay := os.getenv("PER_DATASET_DELAY")) else None - STOP_ON_UNEXPECTED_ERROR = os.getenv("STOP_ON_UNEXPECTED_ERROR", STOP_ON_UNEXPECTED_ERROR) + STOP_ON_UNEXPECTED_ERROR = os.getenv( + "STOP_ON_UNEXPECTED_ERROR", STOP_ON_UNEXPECTED_ERROR + ) token = os.getenv("CLIENT_SECRET") assert token, "CLIENT_SECRET environment variable not set" - masked_token = '*' * (len(token) + 4) + token[-4:] + masked_token = "*" * (len(token) + 4) + token[-4:] logger.info(f"{'aiondemand version:':25} {aiod.version}") logger.info(f"{'STOP_ON_UNEXPECTED_ERROR:':25} {STOP_ON_UNEXPECTED_ERROR}") logger.info(f"{'PER_DATASET_DELAY:':25} {PER_DATASET_DELAY}") @@ -277,17 +362,29 @@ def configure_connector(): logger.info(f"{'Using secret:':25} {masked_token}") 
set_token(Token(client_secret=token)) - user = aiod.get_current_user() - - required_role = f"platform_{PLATFORM_NAME}" - wrong_platform_msg = ( - f"Client roles {user.roles} do not include required {required_role!r} role." - "Please make sure the `PLATFORM_NAME` environment variable is configured correctly, " - "or contact your Keycloak administrator." - ) - assert required_role in user.roles, wrong_platform_msg - logger.info("Successfully authenticated and connected to AI-on-Demand.") + try: + user = aiod.get_current_user() + except Exception as e: + logger.warning( + "Could not verify AI-on-Demand connectivity via authorization_test (%s). " + "Continuing with configured client credentials; if subsequent calls fail " + "with 401/403, please check AIoD/Keycloak configuration.", + e, + ) + else: + roles = getattr(user, "roles", []) or [] + required_role = f"platform_{PLATFORM_NAME}" + if required_role not in roles: + logger.warning( + "Client roles %s do not include required %r role. " + "Please make sure the `PLATFORM_NAME` environment variable is " + "configured correctly, or contact your Keycloak administrator.", + roles, + required_role, + ) + else: + logger.info("Successfully authenticated and connected to AI-on-Demand.") def get_newest_indexed_dataset() -> str: @@ -295,12 +392,25 @@ def get_newest_indexed_dataset() -> str: last_dataset = 0 batch_size = 100 for offset in range(0, 1_000_000, batch_size): - openml_datasets = aiod.datasets.get_list(platform=PLATFORM_NAME, data_format="json", offset=offset, limit=batch_size) + openml_datasets = aiod.datasets.get_list( + platform=PLATFORM_NAME, + data_format="json", + offset=offset, + limit=batch_size, + ) if not openml_datasets: break - last_dataset = max(int(d["platform_resource_identifier"]) for d in openml_datasets) - logger.info(f"Found dataset {last_dataset} was already indexed on AI-on-Demand.") - logger.info(f"Dataset {last_dataset} is the last dataset indexed on AI-on-Demand.") + last_dataset = max( + 
int(d["platform_resource_identifier"]) for d in openml_datasets + ) + logger.info( + "Found dataset %s was already indexed on AI-on-Demand.", + last_dataset, + ) + logger.info( + "Dataset %s is the last dataset indexed on AI-on-Demand.", + last_dataset, + ) return str(last_dataset) @@ -315,28 +425,36 @@ def main(): match (args.mode, args.value): case Modes.ID, id_: if not id_.isdigit(): - logger.error(f"Identifier specified should be an integer, is {id_!r}") + logger.error( + f"Identifier specified should be an integer, is {id_!r}" + ) quit(1) dataset = fetch_openml_dataset(int(id_)) upsert_dataset(dataset) case Modes.SINCE, id_: - if id_ == 'auto': + if id_ == "auto": id_ = get_newest_indexed_dataset() if not id_.isdigit(): - logger.error(f"Identifier specified should be an integer, is {id_!r}") + logger.error( + f"Identifier specified should be an integer, is {id_!r}" + ) quit(1) for dataset in list_datasets(from_=int(id_)): try: upsert_dataset(dataset) errors.append(None) except Exception as e: - logger.error(f"Unrecoverable error upserting dataset {dataset}") + logger.error( + f"Unrecoverable error upserting dataset {dataset}" + ) logger.exception(e) errors.append(e) if len(errors) > 10: - errors.pop() + errors.pop(0) if sum(e is not None for e in errors) > 5: - logger.error("Quiting because we are encountering too many errors") + logger.error( + "Quitting because we are encountering too many errors" + ) quit(1) if PER_DATASET_DELAY: time.sleep(PER_DATASET_DELAY) @@ -346,8 +464,10 @@ def main(): if PER_DATASET_DELAY: time.sleep(PER_DATASET_DELAY) case _: - raise NotImplemented(f"Unexpected arguments: {args}") + raise NotImplemented( + f"Unexpected arguments: {args}" + ) -if __name__ == '__main__': - main() +if __name__ == "__main__": + main() \ No newline at end of file