Commit

Move data generation code to notebook
troyraen committed Jan 25, 2025
1 parent 394b203 commit 347357f
Showing 11 changed files with 260 additions and 207 deletions.
Binary file not shown.
Binary file not shown.
Binary file modified tests/data/bad_schemas/dataset/_common_metadata
Binary file not shown.
Binary file modified tests/data/bad_schemas/dataset/_metadata
Binary file not shown.
250 changes: 248 additions & 2 deletions tests/data/generate_data.ipynb
@@ -28,7 +28,11 @@
"import hats_import.pipeline as runner\n",
"from hats_import.catalog.arguments import ImportArguments\n",
"import tempfile\n",
"import shutil\n",
"from pathlib import Path\n",
"import pyarrow as pa\n",
"import pyarrow.dataset as pds\n",
"import pyarrow.parquet as pq\n",
"from dask.distributed import Client\n",
"\n",
"tmp_path = tempfile.TemporaryDirectory()\n",
@@ -119,11 +123,253 @@
"client.close()\n",
"tmp_path.cleanup()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Malformed Catalogs: bad_schemas and wrong_files_and_rows\n",
"\n",
"These datasets are designed to fail verification tests.\n",
"They are generated by mangling `small_sky_object_catalog`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Load the input data that will be used to generate the malformed catalogs.\n",
"input_dataset_path = Path(hats_import_dir) / \"small_sky_object_catalog\" / \"dataset\"\n",
"input_ds = pds.parquet_dataset(input_dataset_path / \"_metadata\")\n",
"\n",
"# Unit tests expect the Npix=11 data file\n",
"input_frag = next(frag for frag in input_ds.get_fragments() if frag.path.endswith(\"Npix=11.parquet\"))\n",
"frag_key = Path(input_frag.path).relative_to(input_dataset_path)\n",
"input_tbl = input_frag.to_table()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def collect_and_write_metadata(output_dataset_path: Path, schema: pa.Schema | None = None) -> None:\n",
" schema = schema or input_tbl.schema\n",
" dataset = pds.dataset(output_dataset_path)\n",
" metadata_collector = []\n",
" for frag in dataset.get_fragments():\n",
" frag.ensure_complete_metadata()\n",
" frag.metadata.set_file_path(str(Path(frag.path).relative_to(output_dataset_path)))\n",
" metadata_collector.append(frag.metadata)\n",
" pq.write_metadata(\n",
" schema=schema, where=output_dataset_path / \"_metadata\", metadata_collector=metadata_collector\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### bad_schemas\n",
"\n",
"This dataset is designed to fail all schema verification tests.\n",
"\n",
"```\n",
"bad_schemas/\n",
"|- dataset/\n",
" |- _common_metadata.import_truth # mimics schema provided by user upon import\n",
" |- _common_metadata # wrong types\n",
" |- _metadata # wrong file-level metadata\n",
" |- Norder=0/Dir=0/\n",
" |- Npix=11.parquet # direct copy of input\n",
" |- Npix=11.extra_column.parquet # extra column\n",
" |- Npix=11.missing_column.parquet # missing column\n",
" |- Npix=11.wrong_dtypes.parquet # wrong types\n",
" |- Npix=11.wrong_metadata.parquet # wrong metadata\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"output_dataset_path = Path(\".\") / \"bad_schemas\" / \"dataset\"\n",
"\n",
"# Existing files may result in unexpected metadata output.\n",
"if output_dataset_path.parent.exists() and any(output_dataset_path.parent.iterdir()):\n",
" raise FileExistsError(\"bad_schemas directory exists and is not empty. Remove it and try again.\")\n",
"\n",
"# We will create the following files using input_frag\n",
"ffrag_out = output_dataset_path / frag_key\n",
"fextra_col = ffrag_out.with_suffix(\".extra_column.parquet\")\n",
"fmissing_col = ffrag_out.with_suffix(\".missing_column.parquet\")\n",
"fwrong_types = ffrag_out.with_suffix(\".wrong_dtypes.parquet\")\n",
"fwrong_metadata = ffrag_out.with_suffix(\".wrong_metadata.parquet\")\n",
"\n",
"ffrag_out.parent.mkdir(parents=True, exist_ok=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Make a direct copy of input_frag for all files that will be recorded in the _metadata file\n",
"for file_out in [ffrag_out, fmissing_col, fextra_col, fwrong_types]:\n",
" shutil.copy(input_frag.path, file_out)\n",
"\n",
"# Write a _metadata that has the correct schema except for file-level metadata\n",
"metadata = input_tbl.schema.metadata or {}\n",
"metadata.update({b\"extra key\": b\"extra value\"})\n",
"collect_and_write_metadata(output_dataset_path, schema=input_tbl.schema.with_metadata(metadata))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Write new data files using incorrect schemas.\n",
"\n",
"# Drop a column\n",
"pq.write_table(input_tbl.drop_columns(\"dec_error\"), fmissing_col)\n",
"\n",
"# Add an extra column\n",
"extra_col = pa.array(range(len(input_tbl)))\n",
"extra_col_tbl = input_tbl.add_column(5, pa.field(\"extra\", pa.int64()), extra_col)\n",
"pq.write_table(extra_col_tbl, fextra_col)\n",
"\n",
"# Mangle file-level metadata\n",
"wrong_metadata = {\"bad key\": \"bad value\"}\n",
"pq.write_table(input_tbl.replace_schema_metadata(wrong_metadata), fwrong_metadata)\n",
"\n",
"# Change some types\n",
"wrong_dtypes_fields = [\n",
" fld if not fld.name.startswith(\"ra\") else fld.with_type(pa.float16()) for fld in input_tbl.schema\n",
"]\n",
"wrong_dtypes_schema = pa.schema(wrong_dtypes_fields, metadata=input_tbl.schema.metadata)\n",
"pq.write_table(input_tbl.cast(wrong_dtypes_schema), fwrong_types)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Write a _common_metadata with the wrong dtypes.\n",
"pq.write_metadata(schema=wrong_dtypes_schema, where=output_dataset_path / \"_common_metadata\")\n",
"\n",
"# Write a _common_metadata with the correct schema but no hats columns.\n",
"# This mimics a schema that could have been passed as 'use_schema_file' upon import.\n",
"fimport_schema = (output_dataset_path / \"_common_metadata\").with_suffix(\".import_truth\")\n",
"hats_cols = [\"_healpix_29\", \"Norder\", \"Dir\", \"Npix\"]\n",
"import_schema = pa.schema([fld for fld in input_tbl.schema if fld.name not in hats_cols])\n",
"pq.write_metadata(schema=import_schema, where=fimport_schema)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### wrong_files_and_rows\n",
"\n",
"This dataset is designed to fail the following verification tests:\n",
"\n",
"- Files listed in metadata match files on disk.\n",
"- Row counts in metadata match row counts on disk and (if provided) user-supplied truth.\n",
"- `hats.io.validation.is_valid_catalog`\n",
"\n",
"```\n",
"wrong_files_and_rows/\n",
"|- properties # direct copy of input\n",
"|- dataset/\n",
" |- _common_metadata # direct copy of input\n",
" |- _metadata # missing file 'Npix=11.extra_file.parquet'\n",
" |- Norder=0/Dir=0/\n",
" |- Npix=11.parquet # direct copy of input\n",
" |- Npix=11.extra_file.parquet # added after _metadata generated\n",
" |- Npix=11.extra_rows.parquet # rows appended after _metadata generated\n",
" |- (Npix=11.missing_file.parquet) # dropped after _metadata generated\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"output_dataset_path = Path(\".\") / \"wrong_files_and_rows\" / \"dataset\"\n",
"\n",
"# Existing files may result in unexpected metadata output.\n",
"if output_dataset_path.parent.exists() and any(output_dataset_path.parent.iterdir()):\n",
" raise FileExistsError(\"wrong_files_and_rows directory exists and is not empty. Remove it and try again.\")\n",
"\n",
"# We will create the following files using input_frag\n",
"ffrag_out = output_dataset_path / frag_key\n",
"fmissing_file = ffrag_out.with_suffix(\".missing_file.parquet\")\n",
"fextra_file = ffrag_out.with_suffix(\".extra_file.parquet\")\n",
"fextra_rows = ffrag_out.with_suffix(\".extra_rows.parquet\")\n",
"\n",
"ffrag_out.parent.mkdir(parents=True, exist_ok=True)\n",
"\n",
"# Copy metadata files that we will not alter\n",
"shutil.copy(input_dataset_path.parent / \"properties\", output_dataset_path.parent / \"properties\")\n",
"shutil.copy(input_dataset_path / \"_common_metadata\", output_dataset_path / \"_common_metadata\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Make a direct copy of input_frag for all files that will be recorded in the _metadata file\n",
"for file_out in [ffrag_out, fmissing_file, fextra_rows]:\n",
" shutil.copy(input_frag.path, file_out)\n",
"\n",
"# Write _metadata\n",
"collect_and_write_metadata(output_dataset_path)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Mangle the dataset.\n",
"\n",
"# Add a file\n",
"shutil.copy(input_frag.path, fextra_file)\n",
"\n",
"# Remove a file\n",
"fmissing_file.unlink()\n",
"\n",
"# Add rows to an existing file\n",
"new_tbl = pa.concat_tables([input_tbl, input_tbl.take([1, 2, 3, 4])])\n",
"pq.write_table(new_tbl, fextra_rows)"
]
},
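{
"cell_type": "markdown",
"metadata": {},
"source": [
"The next cell is an optional sanity check, not used by the test suite: a minimal sketch that compares the file names recorded in the new `_metadata` with the parquet files actually on disk. For `wrong_files_and_rows` it should flag `Npix=11.missing_file.parquet` (listed but deleted) and `Npix=11.extra_file.parquet` (on disk but unlisted)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional sanity check (illustrative only): compare files listed in _metadata with files on disk.\n",
"check_path = Path(\".\") / \"wrong_files_and_rows\" / \"dataset\"\n",
"listed = {Path(frag.path).name for frag in pds.parquet_dataset(check_path / \"_metadata\").get_fragments()}\n",
"on_disk = {p.name for p in check_path.glob(\"Norder=*/Dir=*/*.parquet\")}\n",
"print(\"listed in _metadata but not on disk:\", listed - on_disk)\n",
"print(\"on disk but not listed in _metadata:\", on_disk - listed)"
]
},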
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "env",
"display_name": "hats312",
"language": "python",
"name": "python3"
},
@@ -137,7 +383,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.14"
"version": "3.12.7"
}
},
"nbformat": 4,
Binary file not shown.
Binary file not shown.
Binary file modified tests/data/wrong_files_and_rows/dataset/_metadata
Binary file not shown.
2 changes: 0 additions & 2 deletions tests/data/wrong_files_and_rows/partition_info.csv

This file was deleted.

18 changes: 12 additions & 6 deletions tests/data/wrong_files_and_rows/properties
@@ -1,8 +1,14 @@
 #HATS catalog
-obs_collection=wrong_files_and_rows
+obs_collection=small_sky_object_catalog
 dataproduct_type=object
-hats_nrows=600
-hats_col_ra=source_ra
-hats_col_dec=source_dec
-hats_order=2
+hats_nrows=131
+hats_col_ra=ra
+hats_col_dec=dec
+hats_max_rows=1000000
+hats_order=0
+moc_sky_fraction=0.08333
+hats_builder=hats-import v0.4.2.dev1+g57aaa9d
+hats_creation_date=2024-11-07T15\:20UTC
+hats_estsize=113
+hats_release_date=2024-09-18
+hats_version=v0.1