|
21 | 21 | # along with this program. If not, see <https://www.gnu.org/licenses/>. |
22 | 22 |
|
23 | 23 |
|
24 | | -"""Curate template_coadd datasets for LSSTCam with a new TAGGED collection. |
| 24 | +"""Curate template_coadd datasets for LSSTCam. |
25 | 25 |
|
26 | | -This script copies a selected set of template_coadd datasets to a new location |
27 | | -at a butler repository, and tag them to a new collection. |
28 | | -
|
29 | | -The input collections are ideally collections with only vetted templates. |
30 | | -
|
31 | | -The scripts leaves a ecsv file used by butler ingest-files. |
32 | | -
|
33 | | -This script provides a short-term workaround to store duplicated templates |
34 | | -with a special prefix, before DM-50699 provides a better mechanism. |
| 26 | +This script performs a curation cut on a set of template_coadd datasets and |
| 27 | +prepares them for manual vetting. |
35 | 28 | """ |
36 | 29 |
|
37 | 30 | import argparse |
38 | 31 | import logging |
| 32 | +import os |
39 | 33 | import sys |
40 | 34 |
|
41 | | -from astropy.table import Table |
| 35 | +from astropy.table import Table, vstack |
42 | 36 |
|
43 | 37 | from lsst.daf.butler import Butler, CollectionType |
44 | | -from lsst.daf.butler.script import ingest_files |
45 | 38 |
|
46 | 39 |
|
47 | 40 | def _make_parser(): |
48 | 41 | parser = argparse.ArgumentParser() |
49 | 42 | parser.add_argument( |
50 | 43 | "repo", |
51 | | - help="An existing data repository.", |
| 44 | + help="An existing data repository containing the input collections.", |
| 45 | + ) |
| 46 | + parser.add_argument( |
| 47 | + "tag", |
| 48 | + help="A Jira ticket number for the new template collection name.", |
52 | 49 | ) |
53 | 50 | parser.add_argument( |
54 | | - "collections", |
55 | | - help="The input collections to search for template_coadd datasets.", |
| 51 | + "release_num", |
| 52 | + help="The release number (##) for the given tag.", |
| 53 | + ) |
| 54 | + parser.add_argument( |
| 55 | + "--collections", |
| 56 | + action="extend", |
| 57 | + nargs="+", |
| 58 | + required=True, |
| 59 | + help="The input collections to search for template_coadd and coadd_depth_table datasets.", |
56 | 60 | ) |
57 | 61 | parser.add_argument( |
58 | 62 | "--where", |
59 | 63 | default="", |
60 | 64 | help="A string expression to select datasets in the input collections.", |
61 | 65 | ) |
62 | 66 | parser.add_argument( |
63 | | - "tag", |
64 | | - help="A Jira ticket number for the new template collection name.", |
| 67 | + "--records_path", |
| 68 | + required=False, |
| 69 | + default="", |
| 70 | + help="An absolute filepath to save records to.", |
65 | 71 | ) |
66 | 72 | parser.add_argument( |
67 | | - "--records", |
| 73 | + "--stat_records", |
68 | 74 | required=False, |
69 | | - default="records.ecsv", |
70 | | - help="An output table file with records of selected template files." |
71 | | - " The file can be used by butler ingest-files.", |
| 75 | + help="An output table file with accepted/rejected stats on templates that pass" |
| 76 | + " curation. Default is release_{release_num}_stat_records.csv.", |
| 77 | + ) |
| 78 | + parser.add_argument( |
| 79 | + "--filter_by", |
| 80 | + required=False, |
| 81 | + default="depth_above_threshold_3", |
| 82 | + help="The coadd_depth_table column to use for filtering." |
| 83 | + " Default is depth_above_threshold_3.", |
| 84 | + ) |
| 85 | + parser.add_argument( |
| 86 | + "--cutoff", |
| 87 | + required=False, |
| 88 | + default=95, |
| 89 | + type=int, |
| 90 | + help="The curation process will filter out anything below this cutoff." |
| 91 | + " Default is 95.", |
72 | 92 | ) |
73 | 93 | return parser |
74 | 94 |
|
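For reference, a hypothetical invocation (the repo path, ticket number, and collection name below are invented, not taken from a real campaign):

```python
# Hypothetical command line, shown as a comment:
#
#   python curate_templates.py /repo/embargo DM-12345 01 \
#       --collections LSSTCam/runs/template_coadds/w_2025_30 \
#       --where "band IN ('g', 'r', 'i')" \
#       --cutoff 95

# The same arguments driven through the parser directly:
args = _make_parser().parse_args([
    "/repo/embargo", "DM-12345", "01",
    "--collections", "LSSTCam/runs/template_coadds/w_2025_30",
    "--cutoff", "95",
])
assert args.cutoff == 95 and args.stat_records is None
```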
75 | 95 |
|
| 96 | +def select_ref(drefs, tract, patch, band, dtype="template_coadd"): |
| 97 | +    """Return the preferred ref from drefs, or None if drefs is empty. |
| 98 | + |
| 99 | +    If multiple refs share a data ID, the one whose run collection sorts |
| 100 | +    last (typically the newest timestamped run) is kept. |
| 101 | +    """ |
| 102 | +    if not drefs: |
| 103 | +        logging.warning(f"No {dtype} found for tract {tract}, patch {patch}, band {band}. Skipping.") |
| 104 | +        return None |
| 105 | +    if len(drefs) > 1: |
| 106 | +        return sorted(drefs, key=lambda ref: ref.run)[-1] |
| 107 | +    return drefs[0] |
| 103 | + |
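A quick illustration of the run-based tie-breaking in `select_ref`. The `SimpleNamespace` objects are hypothetical stand-ins for `DatasetRef`s; only the `.run` attribute matters here:

```python
from types import SimpleNamespace

dupes = [
    SimpleNamespace(run="LSSTCam/runs/templates/20250101T000000Z"),
    SimpleNamespace(run="LSSTCam/runs/templates/20250301T000000Z"),
]
best = select_ref(dupes, tract=5063, patch=17, band="r")
assert best.run.endswith("20250301T000000Z")  # lexicographically last run wins
```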
| 104 | + |
| 105 | +def make_threshold_cuts(butler, template_coadds, n_images, tracts, filter_by, cutoff): |
| 106 | +    """Split template_coadd refs into accepted and rejected lists using the |
| 107 | +    per-patch depth cutoff, keeping matching template_coadd_n_image refs for |
| 108 | +    accepted templates. |
| 109 | +    """ |
| 110 | +    accepted_drefs = [] |
| 107 | + accepted_n_image_refs = [] |
| 108 | + rejected_drefs = [] |
| 109 | + |
| 110 | + for tract in tracts: |
| 111 | + coadd_depth_table = butler.get('coadd_depth_table', tract=tract) |
| 112 | + mask = (coadd_depth_table[filter_by] > cutoff) |
| 113 | + |
| 114 | + accepted_coadds = coadd_depth_table[mask] |
| 115 | + for patch_band in accepted_coadds['patch', 'band']: |
| 116 | + patch = patch_band[0] |
| 117 | + band = patch_band[1] |
| 118 | + dref = [d for d in template_coadds |
| 119 | + if d.dataId['tract'] == tract |
| 120 | + and d.dataId['patch'] == patch |
| 121 | + and d.dataId['band'] == band |
| 122 | + ] |
| 123 | + n_image_dref = [d for d in n_images |
| 124 | + if d.dataId['tract'] == tract |
| 125 | + and d.dataId['patch'] == patch |
| 126 | + and d.dataId['band'] == band |
| 127 | + ] |
| 128 | + |
| 129 | +            ref = select_ref(dref, tract, patch, band) |
| 130 | +            if ref is None: |
| 131 | +                continue |
| 132 | +            accepted_drefs.append(ref) |
| 139 | + |
| 140 | +            n_image_ref = select_ref(n_image_dref, tract, patch, band, |
| 141 | +                                     dtype="template_coadd_n_image") |
| 142 | +            if n_image_ref is None: |
| 143 | +                continue |
| 144 | +            accepted_n_image_refs.append(n_image_ref) |
| 150 | + |
| 151 | + rejected_coadds = coadd_depth_table[~mask] |
| 152 | + for patch_band in rejected_coadds['patch', 'band']: |
| 153 | + patch = patch_band[0] |
| 154 | + band = patch_band[1] |
| 155 | + dref = [d for d in template_coadds |
| 156 | + if d.dataId['tract'] == tract |
| 157 | + and d.dataId['patch'] == patch |
| 158 | + and d.dataId['band'] == band |
| 159 | + ] |
| 160 | + |
| 161 | +            ref = select_ref(dref, tract, patch, band) |
| 162 | +            if ref is None: |
| 163 | +                continue |
| 164 | +            rejected_drefs.append(ref) |
| 171 | + return accepted_drefs, rejected_drefs, accepted_n_image_refs |
| 172 | + |
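The cut itself is a plain column threshold on each per-tract depth table. A minimal sketch with a toy table (the values are invented; the real `coadd_depth_table` is fetched from the butler):

```python
from astropy.table import Table

depth = Table({
    "patch": [16, 17, 18],
    "band": ["r", "r", "i"],
    "depth_above_threshold_3": [99.2, 80.0, 96.5],
})
mask = depth["depth_above_threshold_3"] > 95
print(depth[mask]["patch", "band"])   # accepted: (16, r) and (18, i)
print(depth[~mask]["patch", "band"])  # rejected: (17, r)
```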
| 173 | + |
| 174 | +def run_stats(accepted_drefs, rejected_drefs, tracts, stats_records_file): |
| 175 | + """ |
| 176 | + Compute per-tract and per-band accepted/rejected statistics and save to CSV. |
| 177 | +
|
| 178 | + Parameters |
| 179 | + ---------- |
| 180 | + accepted_drefs : list of DatasetRef |
| 181 | + Template coadd references that passed curation. |
| 182 | + rejected_drefs : list of DatasetRef |
| 183 | + Template coadd references that failed curation. |
| 184 | + tracts : iterable of int |
| 185 | + List of tract IDs to include in the stats. |
| 186 | + stats_records_file : str |
| 187 | + Path to save the resulting CSV file. |
| 188 | + """ |
| 189 | + |
| 190 | + bands = ['u', 'g', 'r', 'i', 'z', 'y'] |
| 191 | + |
| 192 | + # Build accepted table |
| 193 | + if accepted_drefs: |
| 194 | + accepted = Table( |
| 195 | + { |
| 196 | + 'tract': [int(r.dataId['tract']) for r in accepted_drefs], |
| 197 | + 'patch': [int(r.dataId['patch']) for r in accepted_drefs], |
| 198 | + 'band': [str(r.dataId['band']) for r in accepted_drefs], |
| 199 | + 'status': ['accepted'] * len(accepted_drefs) |
| 200 | + } |
| 201 | + ) |
| 202 | + else: |
| 203 | +        accepted = Table(names=('tract', 'patch', 'band', 'status'), |
| 204 | +                         dtype=('int64', 'int64', 'U32', 'U32')) |
| 204 | + |
| 205 | + # Build rejected table |
| 206 | + if rejected_drefs: |
| 207 | + rejected = Table( |
| 208 | + { |
| 209 | + 'tract': [int(r.dataId['tract']) for r in rejected_drefs], |
| 210 | + 'patch': [int(r.dataId['patch']) for r in rejected_drefs], |
| 211 | + 'band': [str(r.dataId['band']) for r in rejected_drefs], |
| 212 | + 'status': ['rejected'] * len(rejected_drefs) |
| 213 | + } |
| 214 | + ) |
| 215 | + else: |
| 216 | +        rejected = Table(names=('tract', 'patch', 'band', 'status'), |
| 217 | +                         dtype=('int64', 'int64', 'U32', 'U32')) |
| 217 | + |
| 218 | + # Combine tables |
| 219 | + all_refs = vstack([accepted, rejected]) |
| 220 | + |
| 221 | + # Group by tract and band |
| 222 | + grouped = all_refs.group_by(['tract', 'band']) |
| 223 | + |
| 224 | + # Prepare output table |
| 225 | + stat_table_data = {'tract': [], } |
| 226 | + for band in bands: |
| 227 | + stat_table_data[f'{band}_num_accepted'] = [] |
| 228 | + stat_table_data[f'{band}_percent_accepted'] = [] |
| 229 | + |
| 230 | + for tract in tracts: |
| 231 | + stat_table_data['tract'].append(tract) |
| 232 | + for band in bands: |
| 233 | + mask = (grouped['tract'] == tract) & (grouped['band'] == band) |
| 234 | + subset = grouped[mask] |
| 235 | + n_total = len(subset) |
| 236 | + n_accepted = (subset['status'] == 'accepted').sum() if n_total > 0 else 0 |
| 237 | + percent = (n_accepted / n_total * 100) if n_total > 0 else float('nan') |
| 238 | + stat_table_data[f'{band}_num_accepted'].append(f"{n_accepted} / {n_total}") |
| 239 | + stat_table_data[f'{band}_percent_accepted'].append(percent) |
| 240 | + |
| 241 | + # Create final table |
| 242 | + stat_table = Table(stat_table_data) |
| 243 | + stat_table.write(stats_records_file, format='ascii.csv', overwrite=True) |
| 244 | + |
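A toy run of `run_stats`, again with hypothetical `SimpleNamespace` stand-ins for `DatasetRef` (real refs expose the same `dataId` mapping):

```python
from types import SimpleNamespace

accepted = [SimpleNamespace(dataId={"tract": 5063, "patch": p, "band": "r"})
            for p in (16, 18)]
rejected = [SimpleNamespace(dataId={"tract": 5063, "patch": 17, "band": "r"})]
run_stats(accepted, rejected, tracts=[5063], stats_records_file="demo_stats.csv")
# demo_stats.csv gets one row for tract 5063: r_num_accepted == "2 / 3" and
# r_percent_accepted ~= 66.7; the other bands show "0 / 0" with NaN percents.
```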
| 245 | + |
76 | 246 | def main(): |
77 | 247 | logging.basicConfig(level=logging.INFO, stream=sys.stdout) |
78 | 248 |
|
| 249 | +    # Cap numexpr threads to silence its spurious startup messages. |
| 250 | + os.environ["NUMEXPR_MAX_THREADS"] = "8" |
| 251 | + |
79 | 252 | args = _make_parser().parse_args() |
80 | | - butler = Butler(args.repo, writeable=True) |
| 253 | + butler = Butler(args.repo, collections=args.collections) |
| 254 | + butler_write = Butler(args.repo, writeable=True) |
81 | 255 |
|
82 | | - tagged_collection = "LSSTCam/templates/" + args.tag |
83 | | - registered = butler.collections.register( |
| 256 | +    # Create the records directory if needed. |
| 257 | +    directory = args.records_path |
| 258 | +    if directory: |
| 259 | +        os.makedirs(directory, exist_ok=True) |
| 261 | + |
| 262 | +    # Default stat_records based on release_num if not provided. |
| 263 | + if args.stat_records is None: |
| 264 | + args.stat_records = f"release_{args.release_num}_stat_records.csv" |
| 265 | + |
| 266 | + # Create tagged collection, abort if it already exists. |
| 267 | + tagged_collection = f"LSSTCam/templates/candidates/{args.tag}/release_{args.release_num}" |
| 268 | + logging.info(f"Creating tagged collection {tagged_collection}.") |
| 269 | + registered = butler_write.collections.register( |
84 | 270 | tagged_collection, type=CollectionType.TAGGED |
85 | 271 | ) |
86 | 272 | if not registered: |
87 | 273 | logging.error(f"Collection {tagged_collection} already exists. Aborting.") |
88 | 274 | sys.exit(1) |
89 | 275 |
|
90 | | - refs = butler.query_datasets( |
91 | | - "template_coadd", collections=args.collections, where=args.where, limit=None |
92 | | - ) |
93 | | - logging.info(f"Found {len(refs)} template_coadd datasets in {args.collections}.") |
94 | | - |
95 | | - columns = ("filename", "band", "skymap", "tract", "patch") |
96 | | - data = Table(names=columns, dtype=("str", "str", "str", "int", "int")) |
97 | | - for ref in refs: |
98 | | - uri_path = butler.getURI(ref).geturl() |
99 | | - data_id_values = tuple(ref.dataId[col] for col in columns[1:]) |
100 | | - data.add_row((uri_path, *data_id_values)) |
101 | | - data.write(args.records, overwrite=True) |
102 | | - logging.info(f"Data records written to {args.records}.") |
103 | | - |
104 | | - run_collection = tagged_collection + "/run" |
105 | | - ingest_files( |
106 | | - args.repo, |
107 | | - "template_coadd", |
108 | | - run_collection, |
109 | | - table_file=args.records, |
110 | | - data_id=("instrument=LSSTCam",), |
111 | | - transfer="copy", |
112 | | - ) |
| 276 | + logging.info("Collecting coadd_depth_table, template_coadd, and template_coadd_n_image refs.") |
| 277 | + coadd_depth_table_refs = butler.query_datasets("coadd_depth_table", where=args.where, limit=None) |
| 278 | + if not coadd_depth_table_refs: |
| 279 | + logging.error("No coadd_depth_table datasets found in the given collections.") |
| 280 | + sys.exit(1) |
113 | 281 |
|
114 | | - refs = butler.query_datasets( |
115 | | - "template_coadd", collections=run_collection, limit=None |
116 | | - ) |
117 | | - logging.info(f"Associating {len(refs)} datasets to {tagged_collection}.") |
118 | | - butler.registry.associate(tagged_collection, refs) |
| 282 | +    # Get a list of relevant tracts. |
| 283 | + tracts = {item.dataId['tract'] for item in coadd_depth_table_refs} |
| 284 | + |
| 285 | +    # Amend the where argument to restrict refs to relevant tracts. |
| 286 | + tracts_str = ",".join(str(t) for t in tracts) |
| 287 | + tract_restriction = f"tract IN ({tracts_str})" |
| 288 | + args.where = f"({args.where}) AND ({tract_restriction})" if args.where else tract_restriction |
| 289 | + |
| 290 | +    # Get relevant template_coadd and template_coadd_n_image refs. |
| 291 | + coadd_refs = butler.query_datasets("template_coadd", where=args.where, limit=None) |
| 292 | + if not coadd_refs: |
| 293 | + logging.error("No template_coadd datasets found in the given collections.") |
| 294 | + sys.exit(1) |
| 295 | + n_image_refs = butler.query_datasets("template_coadd_n_image", where=args.where, limit=None) |
| 296 | + if not n_image_refs: |
| 297 | + logging.error("No template_coadd_n_image datasets found in the given collections.") |
| 298 | + sys.exit(1) |
| 299 | + logging.info(f"Found {len(coadd_refs)} template_coadd datasets with coadd_depth_tables " |
| 300 | + f"in {args.collections}.") |
| 301 | + |
| 302 | +    # Filter out template_coadds that don't meet the cutoff and record them. |
| 303 | + logging.info("Starting curation.") |
| 304 | + accepted_drefs, rejected_drefs, accepted_n_image_refs = make_threshold_cuts(butler, coadd_refs, |
| 305 | + n_image_refs, tracts, |
| 306 | + args.filter_by, args.cutoff |
| 307 | + ) |
| 308 | + logging.info(f"Curation complete. Accepted {len(accepted_drefs)} out of {len(coadd_refs)}" |
| 309 | + f" template_coadd datasets in {args.collections}.") |
| 310 | + |
| 311 | +    # Compute accepted/rejected statistics and write them to the records file. |
| 312 | + logging.info("Starting stat generation.") |
| 313 | + stats_records_file = os.path.join(directory, args.stat_records) |
| 314 | + run_stats(accepted_drefs, rejected_drefs, tracts, stats_records_file) |
| 315 | + logging.info("Stat generation complete. Accepted/rejected stat records written to" |
| 316 | + f" {stats_records_file}.") |
| 317 | + |
| 318 | + # Associate accepted template_coadds and template_coadd_n_images to tagged collection. |
| 319 | + logging.info(f"Associating {len(accepted_drefs)} template_coadds and " |
| 320 | + f"{len(accepted_n_image_refs)} template_coadd_n_images to {tagged_collection}.") |
| 321 | + butler_write.registry.associate(tagged_collection, accepted_drefs) |
| 322 | + butler_write.registry.associate(tagged_collection, accepted_n_image_refs) |
| 323 | + logging.info("Association complete.") |
119 | 324 |
|
120 | 325 |
|
121 | 326 | if __name__ == "__main__": |
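Once the script has run, the new tagged collection can be spot-checked with a read-only butler; the repo path and collection name below are hypothetical (the `query_datasets` usage matches the script above):

```python
from lsst.daf.butler import Butler

butler = Butler("/repo/embargo")
refs = butler.query_datasets(
    "template_coadd",
    collections="LSSTCam/templates/candidates/DM-12345/release_01",
    limit=None,
)
print(f"{len(refs)} curated template_coadd datasets tagged")
```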
|