@@ -61,115 +61,152 @@ def metadata(ctx):
61
61
@click .argument ("name" )
62
62
@project_id_option ()
63
63
@sql_dir_option
64
- def update (name : str , sql_dir : Optional [str ], project_id : Optional [str ]) -> None :
64
+ @parallelism_option ()
65
+ def update (
66
+ name : str , sql_dir : Optional [str ], project_id : Optional [str ], parallelism : int
67
+ ) -> None :
65
68
"""Update metadata yaml file."""
66
69
table_metadata_files = paths_matching_name_pattern (
67
70
name , sql_dir , project_id = project_id , files = ["metadata.yaml" ]
68
71
)
72
+
69
73
retained_dataset_roles = ConfigLoader .get (
70
74
"deprecation" , "retain_dataset_roles" , fallback = []
71
75
)
72
76
73
- # create and populate the dataset metadata yaml file if it does not exist
77
+ # group table metadata files by dataset
78
+ datasets : dict [Path , list [Path ]] = {}
74
79
for table_metadata_file in table_metadata_files :
75
- dataset_metadata_path = (
76
- Path (table_metadata_file ).parent .parent / "dataset_metadata.yaml"
77
- )
78
- if not dataset_metadata_path .exists ():
79
- continue
80
- dataset_metadata = DatasetMetadata .from_file (dataset_metadata_path )
81
- table_metadata = Metadata .from_file (table_metadata_file )
82
-
83
- dataset_metadata_updated = False
84
- table_metadata_updated = False
85
-
86
- # set dataset metadata default_table_workgroup_access to table_workgroup_access if not set
87
- if not dataset_metadata .default_table_workgroup_access :
88
- dataset_metadata .default_table_workgroup_access = (
89
- dataset_metadata .workgroup_access
80
+ dataset_path = Path (table_metadata_file ).parent .parent
81
+ if dataset_path not in datasets :
82
+ datasets [dataset_path ] = []
83
+ datasets [dataset_path ].append (table_metadata_file )
84
+
85
+ # process each dataset in parallel
86
+ if parallelism > 1 :
87
+ with Pool (parallelism ) as pool :
88
+ pool .map (
89
+ partial (_update_dataset_metadata , retained_dataset_roles ),
90
+ datasets .items (),
91
+ )
92
+ else :
93
+ for dataset_path , dataset_table_files in datasets .items ():
94
+ _update_dataset_metadata (
95
+ retained_dataset_roles , (dataset_path , dataset_table_files )
90
96
)
91
- dataset_metadata_updated = True
92
97
93
- if table_metadata .deprecated :
94
- # filter table workgroup_access to only retained roles
95
- if table_metadata .workgroup_access is not None :
96
- table_metadata .workgroup_access = [
97
- workgroup
98
- for workgroup in table_metadata .workgroup_access
99
- if workgroup .role in retained_dataset_roles
100
- ]
101
- else :
102
- table_metadata .workgroup_access = []
103
- table_metadata_updated = True
104
-
105
- # filter dataset workgroup_access to only retained roles
106
- dataset_metadata .workgroup_access = [
107
- workgroup
108
- for workgroup in dataset_metadata .workgroup_access
109
- if workgroup .get ("role" ) in retained_dataset_roles
110
- ]
111
- dataset_metadata_updated = True
112
-
113
- # if dataViewer role exists in default_table_workgroup_access, ensure metadataViewer is present in workgroup_access
114
- # https://mozilla-hub.atlassian.net/browse/DENG-8843
115
- data_viewer = next (
116
- (
117
- workgroup
118
- for workgroup in dataset_metadata .default_table_workgroup_access
119
- if workgroup .get ("role" ) == "roles/bigquery.dataViewer"
120
- ),
121
- None ,
98
+ return None
99
+
100
+
101
+ def _update_dataset_metadata (retained_dataset_roles , dataset_info ):
102
+ """Update all metadata files for a single dataset."""
103
+ dataset_path , table_metadata_files = dataset_info
104
+
105
+ # process all table metadata files in this dataset
106
+ for table_metadata_file in table_metadata_files :
107
+ try :
108
+ dataset_metadata_path = (
109
+ Path (table_metadata_file ).parent .parent / "dataset_metadata.yaml"
122
110
)
123
- if data_viewer :
124
- has_metadata_viewer = any (
125
- workgroup .get ("role" ) == "roles/bigquery.metadataViewer"
111
+ if not dataset_metadata_path .exists ():
112
+ continue
113
+
114
+ dataset_metadata = DatasetMetadata .from_file (dataset_metadata_path )
115
+ table_metadata = Metadata .from_file (table_metadata_file )
116
+
117
+ dataset_metadata_updated = False
118
+ table_metadata_updated = False
119
+
120
+ # set dataset metadata default_table_workgroup_access to table_workgroup_access if not set
121
+ if not dataset_metadata .default_table_workgroup_access :
122
+ dataset_metadata .default_table_workgroup_access = (
123
+ dataset_metadata .workgroup_access
124
+ )
125
+ dataset_metadata_updated = True
126
+
127
+ if table_metadata .deprecated :
128
+ # filter table workgroup_access to only retained roles
129
+ if table_metadata .workgroup_access is not None :
130
+ table_metadata .workgroup_access = [
131
+ workgroup
132
+ for workgroup in table_metadata .workgroup_access
133
+ if workgroup .role in retained_dataset_roles
134
+ ]
135
+ else :
136
+ table_metadata .workgroup_access = []
137
+ table_metadata_updated = True
138
+
139
+ # filter dataset workgroup_access to only retained roles
140
+ dataset_metadata .workgroup_access = [
141
+ workgroup
126
142
for workgroup in dataset_metadata .workgroup_access
143
+ if workgroup .get ("role" ) in retained_dataset_roles
144
+ ]
145
+ dataset_metadata_updated = True
146
+
147
+ # if dataViewer role exists in default_table_workgroup_access, ensure metadataViewer is present in workgroup_access
148
+ # https://mozilla-hub.atlassian.net/browse/DENG-8843
149
+ data_viewer = next (
150
+ (
151
+ workgroup
152
+ for workgroup in dataset_metadata .default_table_workgroup_access
153
+ if workgroup .get ("role" ) == "roles/bigquery.dataViewer"
154
+ ),
155
+ None ,
127
156
)
128
- if not has_metadata_viewer :
129
- dataset_metadata .workgroup_access .append (
130
- {
131
- "role" : "roles/bigquery.metadataViewer" ,
132
- "members" : data_viewer .get ("members" , []),
133
- }
157
+ if data_viewer :
158
+ has_metadata_viewer = any (
159
+ workgroup .get ("role" ) == "roles/bigquery.metadataViewer"
160
+ for workgroup in dataset_metadata .workgroup_access
134
161
)
135
- else :
136
- if table_metadata .workgroup_access is None :
137
- table_metadata .workgroup_access = []
138
-
139
- for (
140
- default_workgroup_access
141
- ) in dataset_metadata .default_table_workgroup_access :
142
- role_exists = False
143
- for i , table_workgroup_access in enumerate (
144
- table_metadata .workgroup_access
145
- ):
146
- if table_workgroup_access .role == default_workgroup_access .get (
147
- "role"
162
+ if not has_metadata_viewer :
163
+ dataset_metadata .workgroup_access .append (
164
+ {
165
+ "role" : "roles/bigquery.metadataViewer" ,
166
+ "members" : sorted (data_viewer .get ("members" , [])),
167
+ }
168
+ )
169
+ else :
170
+ if table_metadata .workgroup_access is None :
171
+ table_metadata .workgroup_access = []
172
+
173
+ for (
174
+ default_workgroup_access
175
+ ) in dataset_metadata .default_table_workgroup_access :
176
+ role_exists = False
177
+ for i , table_workgroup_access in enumerate (
178
+ table_metadata .workgroup_access
148
179
):
149
- role_exists = True
150
- table_metadata .workgroup_access [i ].members = sorted (
151
- set (table_workgroup_access .members )
152
- | set (default_workgroup_access .get ("members" , []))
180
+ if table_workgroup_access .role == default_workgroup_access .get (
181
+ "role"
182
+ ):
183
+ role_exists = True
184
+ table_metadata .workgroup_access [i ].members = sorted (
185
+ set (table_workgroup_access .members )
186
+ | set (default_workgroup_access .get ("members" , []))
187
+ )
188
+ table_metadata_updated = True
189
+
190
+ if not role_exists :
191
+ table_metadata .workgroup_access .append (
192
+ WorkgroupAccessMetadata (
193
+ role = default_workgroup_access ["role" ],
194
+ members = sorted (
195
+ default_workgroup_access .get ("members" , [])
196
+ ),
197
+ )
153
198
)
154
199
table_metadata_updated = True
155
200
156
- if not role_exists :
157
- table_metadata .workgroup_access .append (
158
- WorkgroupAccessMetadata (
159
- role = default_workgroup_access ["role" ],
160
- members = default_workgroup_access .get ("members" , []),
161
- )
162
- )
163
- table_metadata_updated = True
164
-
165
- if dataset_metadata_updated :
166
- dataset_metadata .write (dataset_metadata_path )
167
- click .echo (f"Updated { dataset_metadata_path } " )
168
- if table_metadata_updated :
169
- table_metadata .write (table_metadata_file )
170
- click .echo (f"Updated { table_metadata_file } " )
171
-
172
- return None
201
+ if dataset_metadata_updated :
202
+ dataset_metadata .write (dataset_metadata_path )
203
+ click .echo (f"Updated { dataset_metadata_path } " )
204
+ if table_metadata_updated :
205
+ table_metadata .write (table_metadata_file )
206
+ click .echo (f"Updated { table_metadata_file } " )
207
+ except Exception as e :
208
+ click .echo (f"Error processing { table_metadata_file } : { e } " , err = True )
209
+ raise e
173
210
174
211
175
212
@metadata .command (
@@ -254,8 +291,12 @@ def deprecate(
254
291
deletion_date : datetime ,
255
292
) -> None :
256
293
"""Deprecate Bigquery table by updating metadata yaml file(s)."""
257
- table_metadata_files = paths_matching_name_pattern (
258
- name , sql_dir , project_id = project_id , files = ["metadata.yaml" ]
294
+ table_metadata_files = list (
295
+ set (
296
+ paths_matching_name_pattern (
297
+ name , sql_dir , project_id = project_id , files = ["metadata.yaml" ]
298
+ )
299
+ )
259
300
)
260
301
261
302
for metadata_file in table_metadata_files :
@@ -292,8 +333,12 @@ def validate_workgroups(
292
333
"""Validate workgroup_access and default_table_workgroup_access configuration."""
293
334
failed_files = set ()
294
335
295
- table_metadata_files = paths_matching_name_pattern (
296
- name , sql_dir , project_id = project_id , files = ["metadata.yaml" ]
336
+ table_metadata_files = list (
337
+ set (
338
+ paths_matching_name_pattern (
339
+ name , sql_dir , project_id = project_id , files = ["metadata.yaml" ]
340
+ )
341
+ )
297
342
)
298
343
skip_validation = ConfigLoader .get ("metadata" , "validation" , "skip" , fallback = [])
299
344
0 commit comments