|
21 | 21 | # along with this program. If not, see <https://www.gnu.org/licenses/>. |
22 | 22 |
|
23 | 23 |
|
24 | | -"""Curate template_coadd datasets for LSSTCam with a new TAGGED collection. |
| 24 | +"""Curate template_coadd datasets for LSSTCam. |
25 | 25 |
|
26 | | -This script copies a selected set of template_coadd datasets to a new location |
27 | | -at a butler repository, and tag them to a new collection. |
28 | | -
|
29 | | -The input collections are ideally collections with only vetted templates. |
30 | | -
|
31 | | -The scripts leaves a ecsv file used by butler ingest-files. |
32 | | -
|
33 | | -This script provides a short-term workaround to store duplicated templates |
34 | | -with a special prefix, before DM-50699 provides a better mechanism. |
| 26 | +This script performs a curation cut on a set of template_coadd datasets and |
| 27 | +prepares them for manual vetting. |
35 | 28 | """ |
36 | 29 |
|
37 | 30 | import argparse |
38 | 31 | import logging |
| 32 | +import numpy as np |
| 33 | +import os |
39 | 34 | import sys |
40 | 35 |
|
41 | 36 | from astropy.table import Table |
42 | 37 |
|
43 | 38 | from lsst.daf.butler import Butler, CollectionType |
44 | | -from lsst.daf.butler.script import ingest_files |
45 | 39 |
|
46 | 40 |
|
47 | 41 | def _make_parser(): |
48 | 42 | parser = argparse.ArgumentParser() |
49 | 43 | parser.add_argument( |
50 | 44 | "repo", |
51 | | - help="An existing data repository.", |
| 45 | + help="An existing data repository containing the input collections.", |
| 46 | + ) |
| 47 | + parser.add_argument( |
| 48 | + "tag", |
| 49 | + help="A Jira ticket number for the new template collection name.", |
52 | 50 | ) |
53 | 51 | parser.add_argument( |
54 | | - "collections", |
55 | | - help="The input collections to search for template_coadd datasets.", |
| 52 | + "release_num", |
| 53 | + help="The release number (##) for the given tag.", |
| 54 | + ) |
| 55 | + parser.add_argument( |
| 56 | + "--collections", |
| 57 | + action="extend", |
| 58 | + nargs="+", |
| 59 | + help="The input collections to search for template_coadd and coadd_depth_table datasets.", |
56 | 60 | ) |
57 | 61 | parser.add_argument( |
58 | 62 | "--where", |
59 | 63 | default="", |
60 | 64 | help="A string expression to select datasets in the input collections.", |
61 | 65 | ) |
62 | 66 | parser.add_argument( |
63 | | - "tag", |
64 | | - help="A Jira ticket number for the new template collection name.", |
| 67 | + "--records_path", |
| 68 | + required=False, |
| 69 | + default="", |
| 70 | + help="An absolute filepath to save records to.", |
65 | 71 | ) |
66 | 72 | parser.add_argument( |
67 | | - "--records", |
| 73 | + "--stat_records", |
68 | 74 | required=False, |
69 | | - default="records.ecsv", |
70 | | - help="An output table file with records of selected template files." |
71 | | - " The file can be used by butler ingest-files.", |
| 75 | + help="An output table file with accepted/rejected stats on templates that pass" |
| 76 | + " curation. Default is release_{release_num}_stat_records.csv.", |
| 77 | + ) |
| 78 | + parser.add_argument( |
| 79 | + "--filter_by", |
| 80 | + required=False, |
| 81 | + default="depth_above_threshold_3", |
| 82 | + help="The coadd_depth_table column to use for filtering." |
| 83 | + " Default is depth_above_threshold_3.", |
| 84 | + ) |
| 85 | + parser.add_argument( |
| 86 | + "--cutoff", |
| 87 | + required=False, |
| 88 | + default=95, |
| 89 | + help="The curation process will filter out anything below this cutoff." |
| 90 | + " Default is 95.", |
72 | 91 | ) |
73 | 92 | return parser |
74 | 93 |
|
75 | 94 |
|
| 95 | +def get_tracts(butler, where): |
| 96 | + tracts = [] |
| 97 | + coadd_depth_tables = butler.registry.queryDatasets(datasetType='coadd_depth_table', where=where) |
| 98 | + for item in coadd_depth_tables: |
| 99 | + tract = item.dataId['tract'] |
| 100 | + tracts.append(tract) |
| 101 | + tracts = set(tracts) |
| 102 | + return tracts |
| 103 | + |
| 104 | + |
| 105 | +def make_threshold_cuts(butler, template_coadds, n_images, tracts, filter_by, cutoff): |
| 106 | + accepted_drefs = [] |
| 107 | + accepted_n_image_refs = [] |
| 108 | + rejected_drefs = [] |
| 109 | + |
| 110 | + for tract in tracts: |
| 111 | + coadd_depth_table = butler.get('coadd_depth_table', tract=tract) |
| 112 | + mask = (coadd_depth_table[filter_by] > cutoff) |
| 113 | + |
| 114 | + accepted_coadds = coadd_depth_table[mask] |
| 115 | + for patch_band in accepted_coadds['patch', 'band']: |
| 116 | + patch = patch_band[0] |
| 117 | + band = patch_band[1] |
| 118 | + dref = [d for d in template_coadds |
| 119 | + if d.dataId['tract'] == tract |
| 120 | + and d.dataId['patch'] == patch |
| 121 | + and d.dataId['band'] == band |
| 122 | + ] |
| 123 | + n_image_dref = [d for d in n_images |
| 124 | + if d.dataId['tract'] == tract |
| 125 | + and d.dataId['patch'] == patch |
| 126 | + and d.dataId['band'] == band |
| 127 | + ] |
| 128 | + |
| 129 | + if len(dref) > 1: |
| 130 | + sorted_dupe_entry = sorted(dref, key=lambda ref: ref.run) |
| 131 | + ref = sorted_dupe_entry[-1] |
| 132 | + else: |
| 133 | + ref = dref[0] |
| 134 | + accepted_drefs.append(ref) |
| 135 | + |
| 136 | + if len(n_image_dref) > 1: |
| 137 | + sorted_dupe_entry = sorted(n_image_dref, key=lambda ref: ref.run) |
| 138 | + n_image_ref = sorted_dupe_entry[-1] |
| 139 | + else: |
| 140 | + n_image_ref = n_image_dref[0] |
| 141 | + accepted_n_image_refs.append(n_image_ref) |
| 142 | + |
| 143 | + rejected_coadds = coadd_depth_table[~mask] |
| 144 | + for patch_band in rejected_coadds['patch', 'band']: |
| 145 | + patch = patch_band[0] |
| 146 | + band = patch_band[1] |
| 147 | + dref = [d for d in template_coadds |
| 148 | + if d.dataId['tract'] == tract |
| 149 | + and d.dataId['patch'] == patch |
| 150 | + and d.dataId['band'] == band |
| 151 | + ] |
| 152 | + |
| 153 | + if len(dref) > 1: |
| 154 | + sorted_dupe_entry = sorted(dref, key=lambda ref: ref.run) |
| 155 | + ref = sorted_dupe_entry[-1] |
| 156 | + else: |
| 157 | + ref = dref[0] |
| 158 | + rejected_drefs.append(ref) |
| 159 | + return accepted_drefs, rejected_drefs, accepted_n_image_refs |
| 160 | + |
| 161 | + |
| 162 | +def run_stats(accepted_drefs, rejected_drefs, tracts, stats_records_file): |
| 163 | + # Create table of accepted drefs |
| 164 | + accepted = Table() |
| 165 | + accepted_tracts = [] |
| 166 | + accepted_patches = [] |
| 167 | + accepted_bands = [] |
| 168 | + bands = ['u', 'g', 'r', 'i', 'z', 'y'] |
| 169 | + |
| 170 | + for ref in accepted_drefs: |
| 171 | + accepted_tracts.append(ref.dataId['tract']) |
| 172 | + accepted_patches.append(ref.dataId['patch']) |
| 173 | + accepted_bands.append(ref.dataId['band']) |
| 174 | + |
| 175 | + accepted_table_data = [accepted_tracts, accepted_patches, accepted_bands] |
| 176 | + accepted = Table(data=accepted_table_data, names=['tract', 'patch', 'band']) |
| 177 | + |
| 178 | + # Create table of rejected drefs |
| 179 | + rejected = Table() |
| 180 | + rejected_tracts = [] |
| 181 | + rejected_patches = [] |
| 182 | + rejected_bands = [] |
| 183 | + |
| 184 | + for ref in rejected_drefs: |
| 185 | + rejected_tracts.append(ref.dataId['tract']) |
| 186 | + rejected_patches.append(ref.dataId['patch']) |
| 187 | + rejected_bands.append(ref.dataId['band']) |
| 188 | + |
| 189 | + rejected_table_data = [rejected_tracts, rejected_patches, rejected_bands] |
| 190 | + rejected = Table(data=rejected_table_data, names=['tract', 'patch', 'band']) |
| 191 | + |
| 192 | + # Run stats |
| 193 | + by_band_stats = [] |
| 194 | + for tract in tracts: |
| 195 | + tract_band_stats = [] |
| 196 | + for band in bands: |
| 197 | + accepted_bands = ((accepted['tract'] == tract) & (accepted['band'] == band)).sum() |
| 198 | + rejected_bands = ((rejected['tract'] == tract) & (rejected['band'] == band)).sum() |
| 199 | + total_bands = accepted_bands + rejected_bands |
| 200 | + if total_bands == 0: |
| 201 | + tract_band_stats.append(["0 / 0", np.nan]) |
| 202 | + else: |
| 203 | + tract_band_stats.append([f"{accepted_bands} / {total_bands}", |
| 204 | + accepted_bands / total_bands * 100]) |
| 205 | + by_band_stats.append(tract_band_stats) |
| 206 | + |
| 207 | + # Compile stats into a table and save |
| 208 | + accepted_col_names = [f"{band}_{suffix}" for band in bands for suffix |
| 209 | + in ("num_accepted", "percent_accepted")] |
| 210 | + by_tract_names = ['tract'] + accepted_col_names |
| 211 | + |
| 212 | + stat_table_data = {col: [] for col in by_tract_names} |
| 213 | + |
| 214 | + for tract_index, tract in enumerate(tracts): |
| 215 | + band_stats = by_band_stats[tract_index] |
| 216 | + |
| 217 | + stat_table_data['tract'].append(tract) |
| 218 | + |
| 219 | + for band_idx, band in enumerate(bands): |
| 220 | + accepted_str, percent = band_stats[band_idx] |
| 221 | + stat_table_data[f"{band}_num_accepted"].append(accepted_str) |
| 222 | + stat_table_data[f"{band}_percent_accepted"].append(percent) |
| 223 | + by_tract_stats = Table(stat_table_data) |
| 224 | + by_tract_stats.write(stats_records_file, format='ascii.csv', overwrite=True) |
| 225 | + |
| 226 | + |
76 | 227 | def main(): |
77 | 228 | logging.basicConfig(level=logging.INFO, stream=sys.stdout) |
78 | 229 |
|
| 230 | + # Hide spurious messages from numexpr by setting the numexpr env var. |
| 231 | + os.environ["NUMEXPR_MAX_THREADS"] = "8" |
| 232 | + |
79 | 233 | args = _make_parser().parse_args() |
80 | | - butler = Butler(args.repo, writeable=True) |
| 234 | + butler = Butler(args.repo, collections=args.collections) |
| 235 | + butler_write = Butler(args.repo, writeable=True) |
| 236 | + |
| 237 | + # Create directory for records |
| 238 | + directory = args.records_path |
| 239 | + if directory: |
| 240 | + if not os.path.exists(directory): |
| 241 | + os.makedirs(directory) |
| 242 | + |
| 243 | +    # Set stat_records default based on release_num if not provided |
| 244 | + if args.stat_records is None: |
| 245 | + args.stat_records = f"release_{args.release_num}_stat_records.csv" |
81 | 246 |
|
82 | | - tagged_collection = "LSSTCam/templates/" + args.tag |
83 | | - registered = butler.collections.register( |
| 247 | + # Create tagged collection, abort if it already exists. |
| 248 | + tagged_collection = f"LSSTCam/templates/candidates/{args.tag}/release_{args.release_num}" |
| 249 | + logging.info(f"Creating tagged collection {tagged_collection}.") |
| 250 | + registered = butler_write.collections.register( |
84 | 251 | tagged_collection, type=CollectionType.TAGGED |
85 | 252 | ) |
86 | 253 | if not registered: |
87 | 254 | logging.error(f"Collection {tagged_collection} already exists. Aborting.") |
88 | 255 | sys.exit(1) |
89 | 256 |
|
90 | | - refs = butler.query_datasets( |
91 | | - "template_coadd", collections=args.collections, where=args.where, limit=None |
92 | | - ) |
| 257 | + logging.info("Collecting template_coadd and template_coadd_n_image refs.") |
| 258 | + refs = butler.query_datasets("template_coadd", where=args.where, limit=None) |
| 259 | + n_image_refs = butler.query_datasets("template_coadd_n_image", where=args.where, limit=None) |
93 | 260 | logging.info(f"Found {len(refs)} template_coadd datasets in {args.collections}.") |
94 | 261 |
|
95 | | - columns = ("filename", "band", "skymap", "tract", "patch") |
96 | | - data = Table(names=columns, dtype=("str", "str", "str", "int", "int")) |
97 | | - for ref in refs: |
98 | | - uri_path = butler.getURI(ref).geturl() |
99 | | - data_id_values = tuple(ref.dataId[col] for col in columns[1:]) |
100 | | - data.add_row((uri_path, *data_id_values)) |
101 | | - data.write(args.records, overwrite=True) |
102 | | - logging.info(f"Data records written to {args.records}.") |
103 | | - |
104 | | - run_collection = tagged_collection + "/run" |
105 | | - ingest_files( |
106 | | - args.repo, |
107 | | - "template_coadd", |
108 | | - run_collection, |
109 | | - table_file=args.records, |
110 | | - data_id=("instrument=LSSTCam",), |
111 | | - transfer="copy", |
112 | | - ) |
| 262 | + # Get a list of the tracts inside the template collection. |
| 263 | + tracts = get_tracts(butler, args.where) |
113 | 264 |
|
114 | | - refs = butler.query_datasets( |
115 | | - "template_coadd", collections=run_collection, limit=None |
116 | | - ) |
117 | | - logging.info(f"Associating {len(refs)} datasets to {tagged_collection}.") |
118 | | - butler.registry.associate(tagged_collection, refs) |
| 265 | +    # Filter out template_coadds that don't meet the cutoff and save them to record. |
| 266 | + logging.info("Starting curation.") |
| 267 | + accepted_drefs, rejected_drefs, accepted_n_image_refs = make_threshold_cuts(butler, refs, |
| 268 | + n_image_refs, tracts, |
| 269 | + args.filter_by, args.cutoff |
| 270 | + ) |
| 271 | + logging.info(f"Curation complete. Accepted {len(accepted_drefs)} out of {len(refs)}" |
| 272 | + f" template_coadd datasets in {args.collections}.") |
| 273 | + |
| 274 | + # Run accepted/rejected statistics and save them to record. |
| 275 | + logging.info("Starting stat generation.") |
| 276 | + stats_records_file = os.path.join(directory, args.stat_records) |
| 277 | + run_stats(accepted_drefs, rejected_drefs, tracts, stats_records_file) |
| 278 | + logging.info("Stat generation complete. Accepted/rejected stat records written to" |
| 279 | + f" {stats_records_file}.") |
| 280 | + |
| 281 | + # Associate accepted template_coadds to tagged collection. |
| 282 | + logging.info(f"Associating {len(accepted_drefs)} template_coadds to {tagged_collection}.") |
| 283 | + butler_write.registry.associate(tagged_collection, accepted_drefs) |
| 284 | + logging.info("Association complete.") |
| 285 | + |
| 286 | + # Associate accepted template_coadd_n_images to tagged collection. |
| 287 | + logging.info(f"Associating {len(accepted_n_image_refs)} template_coadd_n_images to {tagged_collection}.") |
| 288 | + butler_write.registry.associate(tagged_collection, accepted_n_image_refs) |
| 289 | + logging.info("Association complete.") |
119 | 290 |
|
120 | 291 |
|
121 | 292 | if __name__ == "__main__": |
|
0 commit comments