Skip to content

Commit

Permalink
[AIRFLOW-1579] Adds support for jagged rows in Bigquery hook for BQ l…
Browse files Browse the repository at this point in the history
…oad jobs

Closes apache#2582 from DannyLee12/master
  • Loading branch information
DannyLee12 authored and criccomini committed Sep 8, 2017
1 parent c2c5151 commit 5b978b2
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
9 changes: 9 additions & 0 deletions airflow/contrib/hooks/bigquery_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ def run_load(self,
max_bad_records=0,
quote_character=None,
allow_quoted_newlines=False,
allow_jagged_rows=False,
schema_update_options=(),
src_fmt_configs={}):
"""
Expand Down Expand Up @@ -429,6 +430,11 @@ def run_load(self,
:type quote_character: string
:param allow_quoted_newlines: Whether to allow quoted newlines (true) or not (false).
:type allow_quoted_newlines: boolean
:param allow_jagged_rows: Accept rows that are missing trailing optional columns.
The missing values are treated as nulls. If false, records with missing trailing columns
are treated as bad records, and if there are too many bad records, an invalid error is
returned in the job result. Only applicable when soure_format is CSV.
:type allow_jagged_rows: bool
:param schema_update_options: Allows the schema of the desitination
table to be updated as a side effect of the load job.
:type schema_update_options: list
Expand Down Expand Up @@ -527,6 +533,9 @@ def run_load(self,
if k in valid_configs}
configuration['load'].update(src_fmt_configs)

if allow_jagged_rows:
configuration['load']['allowJaggedRows'] = allow_jagged_rows

return self.run_with_configuration(configuration)

def run_with_configuration(self, configuration):
Expand Down
10 changes: 9 additions & 1 deletion airflow/contrib/operators/gcs_to_bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def __init__(
max_bad_records=0,
quote_character=None,
allow_quoted_newlines=False,
allow_jagged_rows=False,
max_id_key=None,
bigquery_conn_id='bigquery_default',
google_cloud_storage_conn_id='google_cloud_storage_default',
Expand Down Expand Up @@ -93,6 +94,11 @@ def __init__(
:type quote_character: string
:param allow_quoted_newlines: Whether to allow quoted newlines (true) or not (false).
:type allow_quoted_newlines: boolean
:param allow_jagged_rows: Accept rows that are missing trailing optional columns.
The missing values are treated as nulls. If false, records with missing trailing columns
are treated as bad records, and if there are too many bad records, an invalid error is
returned in the job result. Only applicable to CSV, ignored for other formats.
:type allow_jagged_rows: bool
:param max_id_key: If set, the name of a column in the BigQuery table
that's to be loaded. Thsi will be used to select the MAX value from
BigQuery after the load occurs. The results will be returned by the
Expand All @@ -109,7 +115,7 @@ def __init__(
work, the service account making the request must have domain-wide
delegation enabled.
:type delegate_to: string
:param schema_update_options: Allows the schema of the desitination
:param schema_update_options: Allows the schema of the desitination
table to be updated as a side effect of the load job.
:type schema_update_options: list
:param src_fmt_configs: configure optional fields specific to the source format
Expand All @@ -133,6 +139,7 @@ def __init__(
self.max_bad_records = max_bad_records
self.quote_character = quote_character
self.allow_quoted_newlines = allow_quoted_newlines
self.allow_jagged_rows = allow_jagged_rows

self.max_id_key = max_id_key
self.bigquery_conn_id = bigquery_conn_id
Expand Down Expand Up @@ -173,6 +180,7 @@ def execute(self, context):
max_bad_records=self.max_bad_records,
quote_character=self.quote_character,
allow_quoted_newlines=self.allow_quoted_newlines,
allow_jagged_rows=self.allow_jagged_rows,
schema_update_options=self.schema_update_options,
src_fmt_configs=self.src_fmt_configs)

Expand Down

0 comments on commit 5b978b2

Please sign in to comment.