|
46 | 46 | import numpy as np |
47 | 47 | import pandas as pd |
48 | 48 | import astropy.table |
49 | | -from astro_metadata_translator.headers import merge_headers |
50 | 49 |
|
51 | 50 | import lsst.geom |
52 | 51 | import lsst.pex.config as pexConfig |
|
57 | 56 | import lsst.afw.table as afwTable |
58 | 57 | from lsst.afw.image import ExposureSummaryStats, ExposureF |
59 | 58 | from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig |
60 | | -from lsst.obs.base.utils import strip_provenance_from_fits_header |
| 59 | +from lsst.obs.base.utils import strip_provenance_from_fits_header, TableVStack |
61 | 60 |
|
62 | 61 | from .coaddBase import reorderRefs |
63 | 62 | from .functors import CompositeFunctor, Column |
@@ -87,113 +86,6 @@ def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inp |
87 | 86 | return newDf |
88 | 87 |
|
89 | 88 |
|
90 | | -class TableVStack: |
91 | | - """A helper class for stacking astropy tables without having them all in |
92 | | - memory at once. |
93 | | -
|
94 | | - Parameters |
95 | | - ---------- |
96 | | - capacity : `int` |
97 | | - Full size of the final table. |
98 | | -
|
99 | | - Notes |
100 | | - ----- |
101 | | - Unlike `astropy.table.vstack`, this class requires all tables to have the |
102 | | - exact same columns (it's slightly more strict than even the |
103 | | - ``join_type="exact"`` argument to `astropy.table.vstack`). |
104 | | - """ |
105 | | - |
106 | | - def __init__(self, capacity): |
107 | | - self.index = 0 |
108 | | - self.capacity = capacity |
109 | | - self.result = None |
110 | | - |
111 | | - @classmethod |
112 | | - def from_handles(cls, handles): |
113 | | - """Construct from an iterable of |
114 | | - `lsst.daf.butler.DeferredDatasetHandle`. |
115 | | -
|
116 | | - Parameters |
117 | | - ---------- |
118 | | - handles : `~collections.abc.Iterable` [ \ |
119 | | - `lsst.daf.butler.DeferredDatasetHandle` ] |
120 | | - Iterable of handles. Must have a storage class that supports the |
121 | | - "rowcount" component, which is all that will be fetched. |
122 | | -
|
123 | | - Returns |
124 | | - ------- |
125 | | - vstack : `TableVStack` |
126 | | - An instance of this class, initialized with capacity equal to the |
127 | | - sum of the rowcounts of all the given table handles. |
128 | | - """ |
129 | | - capacity = sum(handle.get(component="rowcount") for handle in handles) |
130 | | - return cls(capacity=capacity) |
131 | | - |
132 | | - def extend(self, table): |
133 | | - """Add a single table to the stack. |
134 | | -
|
135 | | - Parameters |
136 | | - ---------- |
137 | | - table : `astropy.table.Table` |
138 | | - An astropy table instance. |
139 | | - """ |
140 | | - if self.result is None: |
141 | | - self.result = astropy.table.Table() |
142 | | - for name in table.colnames: |
143 | | - column = table[name] |
144 | | - column_cls = type(column) |
145 | | - self.result[name] = column_cls.info.new_like([column], self.capacity, name=name) |
146 | | - self.result[name][:len(table)] = column |
147 | | - self.index = len(table) |
148 | | - self.result.meta = table.meta.copy() |
149 | | - else: |
150 | | - next_index = self.index + len(table) |
151 | | - if set(self.result.colnames) != set(table.colnames): |
152 | | - raise TypeError( |
153 | | - "Inconsistent columns in concatentation: " |
154 | | - f"{set(self.result.colnames).symmetric_difference(table.colnames)}" |
155 | | - ) |
156 | | - for name in table.colnames: |
157 | | - out_col = self.result[name] |
158 | | - in_col = table[name] |
159 | | - if out_col.dtype != in_col.dtype: |
160 | | - raise TypeError(f"Type mismatch on column {name!r}: {out_col.dtype} != {in_col.dtype}.") |
161 | | - self.result[name][self.index:next_index] = table[name] |
162 | | - self.index = next_index |
163 | | - # Butler provenance should be stripped on merge. It will be |
164 | | - # added by butler on write. No attempt is made here to combine |
165 | | - # provenance from multiple input tables. |
166 | | - self.result.meta = merge_headers([self.result.meta, table.meta], mode="drop") |
167 | | - strip_provenance_from_fits_header(self.result.meta) |
168 | | - |
169 | | - @classmethod |
170 | | - def vstack_handles(cls, handles): |
171 | | - """Vertically stack tables represented by deferred dataset handles. |
172 | | -
|
173 | | - Parameters |
174 | | - ---------- |
175 | | - handles : `~collections.abc.Iterable` [ \ |
176 | | - `lsst.daf.butler.DeferredDatasetHandle` ] |
177 | | - Iterable of handles. Must have the "ArrowAstropy" storage class |
178 | | - and identical columns. |
179 | | -
|
180 | | - Returns |
181 | | - ------- |
182 | | - table : `astropy.table.Table` |
183 | | - Concatenated table with the same columns as each input table and |
184 | | - the rows of all of them. |
185 | | - """ |
186 | | - handles = tuple(handles) # guard against single-pass iterators |
187 | | - # Ensure that zero length catalogs are not included |
188 | | - rowcount = tuple(handle.get(component="rowcount") for handle in handles) |
189 | | - handles = tuple(handle for handle, count in zip(handles, rowcount) if count > 0) |
190 | | - |
191 | | - vstack = cls.from_handles(handles) |
192 | | - for handle in handles: |
193 | | - vstack.extend(handle.get()) |
194 | | - return vstack.result |
195 | | - |
196 | | - |
197 | 89 | class WriteObjectTableConnections(pipeBase.PipelineTaskConnections, |
198 | 90 | defaultTemplates={"coaddName": "deep"}, |
199 | 91 | dimensions=("tract", "patch", "skymap")): |
|
0 commit comments