@@ -117,7 +117,7 @@ def run(argv=None):
   pipeline_options.view_as(SetupOptions).save_main_session = True  # see https://beam.apache.org/releases/pydoc/2.7.0/_modules/apache_beam/io/gcp/pubsub_it_pipeline.html
   p = beam.Pipeline(options=pipeline_options)
 
-  # Retrieve list of files to process.
+  # Retrieve list of Avro files to process.
   # If another analysis has already been run, only the new files are analyzed and the results are then merged
   # with those from the last analysis in an incremental way.
   files_to_process = ['gs://{}'.format(fpath) for fpath in fs.walk(known_args.input_bucket)]
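Note: `fs` is not defined anywhere in this hunk. A plausible construction, assuming the third-party gcsfs package (whose early releases returned a flat list of object paths from walk()); the project id and bucket name below are placeholders:

    # Hypothetical setup for `fs`; not part of the diff.
    import gcsfs

    fs = gcsfs.GCSFileSystem(project='my-project')  # placeholder project id
    # early gcsfs walk() -> flat list like ['my-input-bucket/part-0001.avro', ...]
    files_to_process = ['gs://{}'.format(fpath) for fpath in fs.walk('my-input-bucket')]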
@@ -141,22 +141,22 @@ def run(argv=None):
   (p
    | 'get_files_list' >> beam.Create(files_to_process)  # create PCollection containing the names of all files to process
    | 'read_files' >> beam.io.avroio.ReadAllFromAvro()  # returns all rows in all files as dictionaries
-   #   {<column name> --> <column value at the row>}
-   | 'parse_and_classify_rows' >> beam.ParDo(ParseRowDoFn()).with_outputs()  # applies method ParseRowDoFn to each row
+   #   {column name : column value at the row}
+   | 'parse_and_classify_rows' >> beam.ParDo(ParseRowDoFn()).with_outputs()  # applies method process in ParseRowDoFn to each row
   )
 
   valid_inputs = processed_input[None]  # main output
   invalid_times = processed_input[ParseRowDoFn.OUTPUT_TAG_INVALID]  # secondary output: list of invalid insertion times
 
-  # PIPELINE BRANCH 1: count distinct values with min/max insertion time; filter 10 most frequent values for each column
+  # PIPELINE BRANCH 1: count distinct values with min/max insertion time; filter 10 most frequent values for each column.
   # This performs the equivalent of:
   # select
   #   col, vtype, value,
   #   count(*),
-  #   min(record_insertion_time),  # first time when value has appeared in this column
+  #   min(record_insertion_time),  # first time when value has appeared in column col
   #   max(record_insertion_time)
   # from
-  #   {source data}
+  #   valid_inputs
   # group by
   #   col, vtype, value
   # Inspired by: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/combiners.py
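The diff only shows fragments of CountMinMaxCombineFn (see the next hunk). A minimal sketch of what the full combiner could look like, with create_accumulator and add_input assumed from the apache_beam CombineFn interface:

    import apache_beam as beam

    class CountMinMaxCombineFn(beam.CombineFn):
      def create_accumulator(self):
        return (0, None, None)  # (count, min_time, max_time); None = nothing seen yet

      def add_input(self, accumulator, element):  # element = record_insertion_time
        count, tmin, tmax = accumulator
        tmin = element if tmin is None else min(tmin, element)
        tmax = element if tmax is None else max(tmax, element)
        return (count + 1, tmin, tmax)

      def merge_accumulators(self, accumulators):
        counts, mins, maxs = zip(*accumulators)
        mins = [t for t in mins if t is not None]
        maxs = [t for t in maxs if t is not None]
        return (sum(counts), min(mins) if mins else None, max(maxs) if maxs else None)

      def extract_output(self, accumulator):
        return accumulator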
@@ -180,9 +180,9 @@ def merge_accumulators(self, accumulators):
     def extract_output(self, accumulator):
       return accumulator
 
-  # Input: ((col, vtype, value), record_insertion_time)
+  # Input: elements in valid_inputs, which have the form ((col, vtype, value), record_insertion_time)
   # CountMinMaxCombineFn is called for each (col, vtype, value) and operates on element = record_insertion_time.
-  # Output: ((col, vtype, value), (count, min_record_insertion_time, max_record_insertion_time))
+  # Output: ((col, vtype, value), (counts, min_record_insertion_time, max_record_insertion_time))
   # (the output is grouped by key = (col, vtype, value))
   distinct_values = (valid_inputs
    | 'value_counts' >> beam.CombinePerKey(CountMinMaxCombineFn())
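To make the Input/Output comment concrete, here is an illustrative run of the 'value_counts' step with fabricated data (the column name and timestamps are made up, and CountMinMaxCombineFn is the sketch above; written for a current Python 3 Beam install, although the original script is Python 2). ISO-8601 strings compare lexicographically, so min/max yield the earliest/latest insertion time:

    import apache_beam as beam

    with beam.Pipeline() as p:
      _ = (p
           | beam.Create([
               (('customer_id', 'str', '42'), '2018-01-03'),
               (('customer_id', 'str', '42'), '2018-05-07'),
             ])
           | beam.CombinePerKey(CountMinMaxCombineFn())
           | beam.Map(print))
    # prints: (('customer_id', 'str', '42'), (2, '2018-01-03', '2018-05-07'))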
@@ -239,8 +239,8 @@ def merge_valuecounts(x):
   def remap_for_filter(x):  # input: ((col, vtype, value), (counts, mintime, maxtime))
     ctv, values = x
     return (ctv[0], ctv[1]), (values[0], (ctv[2], values[1], values[2]))  # <--- beam.combiners.Top.LargestPerKey requires arguments
-                                                                          #      to be provided in this form (in order to filter largest
-                                                                          #      by counts):
+                                                                          #      to be provided in this form
+                                                                          #      (in order to filter largest by counts):
                                                                           #      ((col, vtype), (counts, (value, mindate, maxdate)))
 
   top_values = (distinct_values
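The top_values chain is cut off by the hunk boundary. A hedged guess at how it continues (the step names are invented; Top.LargestPerKey(10) matches the comments above):

    top_values = (distinct_values
        | 'remap_for_filter' >> beam.Map(remap_for_filter)
        # per (col, vtype), keep the 10 entries with the largest counts; tuples
        # compare element-wise, so counts (the first slot) drives the ordering
        | 'top10_per_column' >> beam.combiners.Top.LargestPerKey(10))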
@@ -260,7 +260,7 @@ def format_top_values(x):
       result += header + value + '\t' + unicode(counts) + '\t' + unicode(mindate) + '\t' + unicode(maxdate) + '\r\n'
     result = result[:-2]  # remove last \r\n
     return result
-  # result looks like this (for one single input)
+  # result looks like this (for one single input x): top 10 values
   #   col  vtype  value  counts1  mindate1  maxdate1
   #   col  vtype  value  counts2  mindate2  maxdate2
   #   ...
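Only the tail of format_top_values is visible, so here is a hedged reconstruction of the whole function; the unpacking and header layout are assumptions consistent with the visible lines (unicode() is Python 2; on Python 3 it would be str()):

    def format_top_values(x):
      (col, vtype), tops = x  # tops: list of (counts, (value, mindate, maxdate))
      header = col + '\t' + vtype + '\t'
      result = ''
      for counts, (value, mindate, maxdate) in tops:
        result += header + value + '\t' + unicode(counts) + '\t' \
                  + unicode(mindate) + '\t' + unicode(maxdate) + '\r\n'
      result = result[:-2]  # remove last \r\n
      return result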
@@ -271,15 +271,15 @@ def format_top_values(x):
    | 'save_top_values' >> beam.io.WriteToText(known_args.output_bucket + 'topvalues')
   )
 
-  # PIPELINE BRANCH 2: count total and null values by period (= year/month in this case)
+  # PIPELINE BRANCH 2: count total and null values by period (= year/month in this case).
   # Performs the equivalent of:
   # select
   #   col,
   #   year-month,
   #   count(*),
   #   sum(if(value is null or value = '', 1, 0))
   # from
-  #   {input data}
+  #   valid_inputs
   # group by
   #   col,
   #   year-month
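The Beam code for branch 2 is not part of this diff. One possible rendering, assuming record_insertion_time is an ISO-8601 string so that its first 7 characters are the 'YYYY-MM' period (the step and variable names are invented):

    null_counts = (valid_inputs
        # ((col, vtype, value), time) -> ((col, 'YYYY-MM'), (1, is_null))
        | 'remap_by_period' >> beam.Map(
            lambda x: ((x[0][0], x[1][:7]),
                       (1, 1 if x[0][2] in (None, '') else 0)))
        # element-wise sums per (col, year-month) give (count(*), null count)
        | 'count_by_period' >> beam.CombinePerKey(
            lambda vals: tuple(map(sum, zip(*vals)))))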