
Commit 57ddd41

Merge pull request #17 from aneilbaboo/master
Enable data import from Google Cloud Storage files
2 parents 10abd8e + 2fc643a commit 57ddd41

6 files changed: +503 -60 lines


README.md

Lines changed: 13 additions & 0 deletions
@@ -139,6 +139,19 @@ rows = [
 inserted = client.push_rows('dataset', 'table', rows, 'id')
 ```
 
+# Import data from Google Cloud Storage
+```python
+schema = [{"name": "username", "type": "string", "mode": "nullable"}]
+job = client.import_data_from_uris(['gs://mybucket/mydata.json'],
+                                   'dataset',
+                                   'table',
+                                   schema,
+                                   source_format=JOB_SOURCE_FORMAT_NEWLINE_DELIMITED_JSON)
+
+job = client.wait_for_job(job, timeout=60)
+print(job)
+```
+
 # Managing Datasets
 
 The client provides an API for listing, creating, deleting, updating and patching datasets.
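
For reference, a sketch of the same call against a CSV source, exercising the CSV-only options the new method accepts; the bucket, file name, and header-row assumption are placeholders, not part of this commit:

```python
# Hypothetical CSV variant of the README example above.
schema = [{"name": "username", "type": "string", "mode": "nullable"}]
job = client.import_data_from_uris(['gs://mybucket/mydata.csv'],  # placeholder URI
                                   'dataset',
                                   'table',
                                   schema,
                                   source_format=JOB_SOURCE_FORMAT_CSV,
                                   skip_leading_rows=1,  # assumes one header row
                                   field_delimiter=',',
                                   quote='"')
job = client.wait_for_job(job, timeout=60)
print(job)
```

Passing any of the CSV-only options (field_delimiter, allow_jagged_rows, allow_quoted_newlines, quote, skip_leading_rows) without source_format=JOB_SOURCE_FORMAT_CSV raises an exception, as the bigquery/client.py diff below shows.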

bigquery/__init__.py

Lines changed: 0 additions & 1 deletion
@@ -3,4 +3,3 @@
 
 logger = logging.getLogger('bigquery')
 logger.setLevel(logging.DEBUG)
-
bigquery/client.py

Lines changed: 204 additions & 5 deletions
@@ -1,6 +1,8 @@
 import calendar
 from collections import defaultdict
 from datetime import datetime
+from time import sleep, time
+from hashlib import sha256
 import json
 
 from apiclient.discovery import build
@@ -11,9 +13,21 @@
 from bigquery.errors import UnfinishedQueryException
 from bigquery.schema_builder import schema_from_record
 
+
 BIGQUERY_SCOPE = 'https://www.googleapis.com/auth/bigquery'
 BIGQUERY_SCOPE_READ_ONLY = 'https://www.googleapis.com/auth/bigquery.readonly'
 
+JOB_CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
+JOB_CREATE_NEVER = 'CREATE_NEVER'
+JOB_SOURCE_FORMAT_NEWLINE_DELIMITED_JSON = 'NEWLINE_DELIMITED_JSON'
+JOB_SOURCE_FORMAT_DATASTORE_BACKUP = 'DATASTORE_BACKUP'
+JOB_SOURCE_FORMAT_CSV = 'CSV'
+JOB_WRITE_TRUNCATE = 'WRITE_TRUNCATE'
+JOB_WRITE_APPEND = 'WRITE_APPEND'
+JOB_WRITE_EMPTY = 'WRITE_EMPTY'
+JOB_ENCODING_UTF_8 = 'UTF-8'
+JOB_ENCODING_ISO_8859_1 = 'ISO-8859-1'
+
 
 def get_client(project_id, credentials=None, service_account=None,
                private_key=None, readonly=True):
@@ -67,11 +81,11 @@ def _get_bq_service(credentials=None, service_account=None, private_key=None,
 def _credentials():
     """Import and return SignedJwtAssertionCredentials class"""
     from oauth2client.client import SignedJwtAssertionCredentials
+
     return SignedJwtAssertionCredentials
 
 
 class BigQueryClient(object):
-
     def __init__(self, bq_service, project_id):
         self.bigquery = bq_service
         self.project_id = project_id
@@ -309,6 +323,191 @@ def get_tables(self, dataset_id, app_id, start_time, end_time):
 
         return self._filter_tables_by_time(app_tables, start_time, end_time)
 
+    def import_data_from_uris(
+            self,
+            source_uris,
+            dataset,
+            table,
+            schema=None,
+            job=None,
+            source_format=None,
+            create_disposition=None,
+            write_disposition=None,
+            encoding=None,
+            ignore_unknown_values=None,
+            max_bad_records=None,
+            allow_jagged_rows=None,
+            allow_quoted_newlines=None,
+            field_delimiter=None,
+            quote=None,
+            skip_leading_rows=None,
+    ):
+        """
+        Imports data into a BigQuery table from Google Cloud Storage.
+
+        Args:
+            source_uris: required string or list of strings representing
+                         the uris on cloud storage of the form:
+                         gs://bucket/filename
+            dataset: required string id of the dataset
+            table: required string id of the table
+            schema: optional list representing the bigquery schema
+            job: optional string identifying the job (a unique job id
+                 is automatically generated if not provided)
+            source_format: optional string
+                           (one of the JOB_SOURCE_FORMAT_* constants)
+            create_disposition: optional string
+                                (one of the JOB_CREATE_* constants)
+            write_disposition: optional string
+                               (one of the JOB_WRITE_* constants)
+            encoding: optional string
+                      (one of the JOB_ENCODING_* constants)
+            ignore_unknown_values: optional boolean
+            max_bad_records: optional int
+            allow_jagged_rows: optional boolean, csv only
+            allow_quoted_newlines: optional boolean, csv only
+            field_delimiter: optional string, csv only
+            quote: optional string, the quote character, csv only
+            skip_leading_rows: optional int, csv only
+
+        Optional arguments with value None are determined by
+        BigQuery as described:
+        https://developers.google.com/bigquery/docs/reference/v2/jobs
+
+        Returns:
+            dict, a BigQuery job resource, or None on failure
+        """
+        source_uris = source_uris if isinstance(source_uris, list) \
+            else [source_uris]
+
+        configuration = {
+            "destinationTable": {
+                "projectId": self.project_id,
+                "tableId": table,
+                "datasetId": dataset
+            },
+            "sourceUris": source_uris,
+        }
+
+        if max_bad_records:
+            configuration['maxBadRecords'] = max_bad_records
+
+        if ignore_unknown_values:
+            configuration['ignoreUnknownValues'] = ignore_unknown_values
+
+        if create_disposition:
+            configuration['createDisposition'] = create_disposition
+
+        if write_disposition:
+            configuration['writeDisposition'] = write_disposition
+
+        if encoding:
+            configuration['encoding'] = encoding
+
+        if schema:
+            configuration['schema'] = schema
+
+        if source_format:
+            configuration['sourceFormat'] = source_format
+
+        if not job:
+            digest = sha256(":".join(source_uris) + str(time())).hexdigest()
+            job = "{dataset}-{table}-{digest}".format(
+                dataset=dataset,
+                table=table,
+                digest=digest
+            )
+
+        if source_format == JOB_SOURCE_FORMAT_CSV:
+            if field_delimiter:
+                configuration['fieldDelimiter'] = field_delimiter
+
+            if allow_jagged_rows:
+                configuration['allowJaggedRows'] = allow_jagged_rows
+
+            if allow_quoted_newlines:
+                configuration['allowQuotedNewlines'] = allow_quoted_newlines
+
+            if quote:
+                configuration['quote'] = quote
+
+            if skip_leading_rows:
+                configuration['skipLeadingRows'] = skip_leading_rows
+
+        elif field_delimiter or allow_jagged_rows \
+                or allow_quoted_newlines or quote or skip_leading_rows:
+            all_values = dict(field_delimiter=field_delimiter,
+                              allow_jagged_rows=allow_jagged_rows,
+                              allow_quoted_newlines=allow_quoted_newlines,
+                              skip_leading_rows=skip_leading_rows,
+                              quote=quote)
+            non_null_values = dict((k, v) for k, v
+                                   in all_values.items()
+                                   if v)
+            raise Exception("Parameters field_delimiter, allow_jagged_rows, "
+                            "allow_quoted_newlines, quote and "
+                            "skip_leading_rows are only allowed when "
+                            "source_format=JOB_SOURCE_FORMAT_CSV: %s"
+                            % non_null_values)
+
+        body = {
+            "configuration": {
+                'load': configuration
+            },
+            "jobReference": {
+                "projectId": self.project_id,
+                "jobId": job
+            }
+        }
+
+        try:
+            logger.info("Creating load job %s" % body)
+            job_resource = self.bigquery.jobs() \
+                .insert(projectId=self.project_id, body=body) \
+                .execute()
+            return job_resource
+        except Exception as e:
+            logger.error("Failed while starting uri import job: {0}"
+                         .format(e))
+            return None
+
+    def wait_for_job(self, job, interval=5, timeout=None):
+        """
+        Waits until the given job is done or has failed.
+
+        Args:
+            job: dict, representing a BigQuery job resource, or a string
+                 jobId
+            interval: optional float polling interval in seconds, default = 5
+            timeout: optional float timeout in seconds, default = None
+
+        Returns:
+            dict, final state of the job resource, as described here:
+            https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/python/latest/bigquery_v2.jobs.html#get
+
+        Raises:
+            standard exceptions on http / auth failures (you must retry)
+        """
+        if isinstance(job, dict):  # job is a job resource
+            complete = job.get('jobComplete')
+            job_id = job['jobReference']['jobId']
+        else:  # job is the jobId
+            complete = False
+            job_id = job
+        job_resource = None
+
+        start_time = time()
+        elapsed_time = 0
+        while not (complete
+                   or (timeout is not None and elapsed_time > timeout)):
+            sleep(interval)
+            request = self.bigquery.jobs().get(projectId=self.project_id,
+                                               jobId=job_id)
+            job_resource = request.execute()
+            error = job_resource.get('error')
+            if error:
+                raise Exception("{message} ({code}). Errors: {errors}"
+                                .format(**error))
+            complete = job_resource.get('status').get('state') == u'DONE'
+            elapsed_time = time() - start_time
+
+        return job_resource
+
     def push_rows(self, dataset, table, rows, insert_id_key=None):
         """Upload rows to BigQuery table.
 
@@ -471,8 +670,8 @@ def _in_range(self, start_time, end_time, time):
         ONE_MONTH = 2764800  # 32 days
 
         return start_time <= time <= end_time or \
-            time <= start_time <= time + ONE_MONTH or \
-            time <= end_time <= time + ONE_MONTH
+               time <= start_time <= time + ONE_MONTH or \
+               time <= end_time <= time + ONE_MONTH
 
     def _get_query_results(self, job_collection, project_id, job_id,
                            offset=None, limit=None):
@@ -594,8 +793,8 @@ def create_dataset(self, dataset_id, friendly_name=None, description=None,
             datasets.insert(projectId=self.project_id,
                             body=dataset_data).execute()
             return True
-        except:
-            logger.error('Cannot create dataset %s' % dataset_id)
+        except Exception as e:
+            logger.error('Cannot create dataset %s, %s' % (dataset_id, e))
             return False
 
     def get_datasets(self):
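
Taken together, the two new methods support a start-then-poll workflow. A minimal sketch, assuming a configured `client` and reusing the placeholder names from the README example above:

```python
# Start the load job. import_data_from_uris logs the error and returns
# None if the jobs().insert() request itself fails.
job = client.import_data_from_uris(
    ['gs://mybucket/mydata.json'],  # placeholder URI
    'dataset', 'table', schema,
    source_format=JOB_SOURCE_FORMAT_NEWLINE_DELIMITED_JSON)

if job is not None:
    try:
        # Poll every 10 seconds, giving up after 5 minutes; on timeout the
        # most recently fetched job resource is returned.
        job = client.wait_for_job(job, interval=10, timeout=300)
        print(job)
    except Exception as e:
        # wait_for_job raises if the polled job resource reports an error;
        # per its docstring, http / auth failures also propagate and you
        # must retry.
        print("Load failed: %s" % e)
```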

bigquery/schema_builder.py

Lines changed: 0 additions & 1 deletion
@@ -102,4 +102,3 @@ def bigquery_type(o, timestamp_parser=default_timestamp_parser):
     else:
         raise Exception(
             "Invalid object for BigQuery schema: {0} ({1}".format(o, t))
-
