Skip to content

Commit 612da17

Browse files
committed
Merge pull request #59 from pirsquare/add-query-paging
Support reading query results with multiple pages and other dataset helper methods
2 parents b787883 + ca408f8 commit 612da17

File tree

3 files changed

+139
-34
lines changed

3 files changed

+139
-34
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,9 @@ client.update_dataset('mydataset', friendly_name="mon Dataset") # description is
225225

226226
# Patch dataset
227227
client.patch_dataset('mydataset', friendly_name="mon Dataset") # friendly_name changed; description is preserved
228+
229+
# Check if dataset exists.
230+
exists = client.check_dataset('mydataset')
228231
```
229232

230233
# Creating a schema from a sample record

bigquery/client.py

Lines changed: 56 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,6 @@ def __init__(self, bq_service, project_id, swallow_results=True):
121121
self.cache = {}
122122

123123
def _submit_query_job(self, query_data):
124-
125124
""" Submit a query job to BigQuery.
126125
127126
This is similar to BigQueryClient.query, but gives the user
@@ -172,7 +171,6 @@ def _submit_query_job(self, query_data):
172171
return job_id, [self._transform_row(row, schema) for row in rows]
173172

174173
def _insert_job(self, body_object):
175-
176174
""" Submit a job to BigQuery
177175
178176
Direct proxy to the insert() method of the official BigQuery
@@ -243,9 +241,7 @@ def get_query_schema(self, job_id):
243241
A list of dictionaries that represent the schema.
244242
"""
245243

246-
job_collection = self.bigquery.jobs()
247-
query_reply = self._get_query_results(
248-
job_collection, self.project_id, job_id, offset=0, limit=0)
244+
query_reply = self.get_query_results(job_id, offset=0, limit=0)
249245

250246
if not query_reply['jobComplete']:
251247
logging.warning('BigQuery job %s not complete' % job_id)
@@ -289,38 +285,72 @@ def check_job(self, job_id):
289285
included in the query table if it has completed.
290286
"""
291287

292-
job_collection = self.bigquery.jobs()
293-
query_reply = self._get_query_results(
294-
job_collection, self.project_id, job_id, offset=0, limit=0)
288+
query_reply = self.get_query_results(job_id, offset=0, limit=0)
295289

296290
return (query_reply.get('jobComplete', False),
297291
int(query_reply.get('totalRows', 0)))
298292

299-
def get_query_rows(self, job_id, offset=None, limit=None, timeout=0):
    """Retrieve a list of rows from a query table by job id.

    This method will append results from multiple pages together. If you
    want to manually page through results, you can use the
    `get_query_results` method directly.

    Args:
        job_id: The job id that references a BigQuery query.
        offset: The offset of the rows to pull from BigQuery.
        limit: The maximum number of rows to retrieve from the query
            table. A falsy value (None or 0) returns all rows.
        timeout: Timeout in seconds.

    Returns:
        A list of dictionaries that represent table rows.

    Raises:
        UnfinishedQueryException: If the query job has not completed.
    """

    # Fetch the first page of query results.
    query_reply = self.get_query_results(job_id, offset=offset,
                                         limit=limit, timeout=timeout)
    if not query_reply['jobComplete']:
        logging.warning('BigQuery job %s not complete' % job_id)
        raise UnfinishedQueryException()

    schema = query_reply["schema"]["fields"]
    rows = query_reply.get('rows', [])
    page_token = query_reply.get("pageToken")
    records = [self._transform_row(row, schema) for row in rows]

    # Append additional pages, but stop once `limit` rows have been
    # collected -- previously the loop paged past the requested limit
    # and could return arbitrarily more rows than asked for.
    while page_token and (not limit or len(records) < limit):
        query_reply = self.get_query_results(
            job_id, offset=offset, limit=limit, page_token=page_token,
            timeout=timeout)
        page_token = query_reply.get("pageToken")
        rows = query_reply.get('rows', [])
        records += [self._transform_row(row, schema) for row in rows]

    # Enforce the limit contract: never return more than `limit` rows.
    return records[:limit] if limit else records
327+
328+
def check_dataset(self, dataset_id):
    """Check to see if a dataset exists.

    Args:
        dataset_id: Unique id of the dataset to look up.

    Returns:
        bool indicating whether the dataset exists.
    """
    return bool(self.get_dataset(dataset_id))
322337

323-
return [self._transform_row(row, schema) for row in rows]
338+
def get_dataset(self, dataset_id):
    """Retrieve a dataset if it exists, otherwise return an empty dict.

    Args:
        dataset_id: Unique id of the dataset to look up.

    Returns:
        A dictionary containing the dataset object if it exists,
        otherwise an empty dictionary.
    """
    try:
        # A missing dataset surfaces as an HttpError from the API call.
        return self.bigquery.datasets().get(
            projectId=self.project_id,
            datasetId=dataset_id).execute()
    except HttpError:
        return {}
324354

325355
def check_table(self, dataset, table):
326356
"""Check to see if a table exists.
@@ -1039,27 +1069,28 @@ def _in_range(self, start_time, end_time, time):
10391069
time <= start_time <= time + ONE_MONTH or \
10401070
time <= end_time <= time + ONE_MONTH
10411071

1042-
def get_query_results(self, job_id, offset=None, limit=None,
                      page_token=None, timeout=0):
    """Execute the query job indicated by the given job id.

    This is a direct mapping to the bigquery api
    https://cloud.google.com/bigquery/docs/reference/v2/jobs/getQueryResults

    Args:
        job_id: The job id of the query to check.
        offset: The index the result set should start at.
        limit: The maximum number of results to retrieve.
        page_token: Page token, returned by a previous call, to request
            the next page of results.
        timeout: Timeout in seconds.

    Returns:
        The query reply.
    """
    # The API expects the timeout in milliseconds.
    return self.bigquery.jobs().getQueryResults(
        projectId=self.project_id,
        jobId=job_id,
        startIndex=offset,
        maxResults=limit,
        pageToken=page_token,
        timeoutMs=timeout * 1000).execute()
10631094

10641095
def _transform_row(self, row, schema):
10651096
"""Apply the given schema to the given BigQuery data row.

bigquery/tests/test_client.py

Lines changed: 80 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,6 @@ def test_get_response(self):
358358
"""Ensure that the query is executed and the query reply is returned.
359359
"""
360360

361-
project_id = 'foo'
362361
job_id = 'bar'
363362

364363
mock_query_job = mock.Mock()
@@ -368,14 +367,15 @@ def test_get_response(self):
368367

369368
offset = 5
370369
limit = 10
370+
page_token = "token"
371+
timeout = 1
371372

372-
actual = self.client._get_query_results(self.mock_job_collection,
373-
project_id, job_id,
374-
offset, limit)
373+
actual = self.client.get_query_results(job_id, offset, limit, page_token, timeout)
375374

376375
self.mock_job_collection.getQueryResults.assert_called_once_with(
377-
timeoutMs=0, projectId=project_id, jobId=job_id,
378-
startIndex=offset, maxResults=limit)
376+
projectId=self.project_id, jobId=job_id, startIndex=offset,
377+
maxResults=limit, pageToken=page_token, timeoutMs=1000)
378+
379379
mock_query_job.execute.assert_called_once()
380380
self.assertEquals(actual, mock_query_reply)
381381

@@ -458,7 +458,7 @@ def test_transform_row_with_nested_repeated(self):
458458
self.assertEquals(actual, expected)
459459

460460

461-
@mock.patch('bigquery.client.BigQueryClient._get_query_results')
461+
@mock.patch('bigquery.client.BigQueryClient.get_query_results')
462462
class TestCheckJob(unittest.TestCase):
463463

464464
def setUp(self):
@@ -1175,7 +1175,7 @@ def test_not_inside_range(self):
11751175
}
11761176

11771177

1178-
@mock.patch('bigquery.client.BigQueryClient._get_query_results')
1178+
@mock.patch('bigquery.client.BigQueryClient.get_query_results')
11791179
class TestGetQuerySchema(unittest.TestCase):
11801180

11811181
def test_query_complete(self, get_query_mock):
@@ -1251,7 +1251,7 @@ def test_table_does_not_exist(self):
12511251
self.mock_tables.get.return_value.execute.assert_called_once_with()
12521252

12531253

1254-
@mock.patch('bigquery.client.BigQueryClient._get_query_results')
1254+
@mock.patch('bigquery.client.BigQueryClient.get_query_results')
12551255
class TestGetQueryRows(unittest.TestCase):
12561256

12571257
def test_query_complete(self, get_query_mock):
@@ -1281,6 +1281,77 @@ def test_query_complete(self, get_query_mock):
12811281
{'foo': 'abc', 'spider': 'xyz'}]
12821282
self.assertEquals(result_rows, expected_rows)
12831283

1284+
def test_query_complete_with_page_token(self, get_query_mock):
    """Ensure that get_query_rows stitches rows together from multiple
    result pages when the first response carries a pageToken.
    """
    from bigquery.client import BigQueryClient

    # First page: two rows plus a pageToken pointing at page two.
    page_one_resp = {
        "jobComplete": True,
        "kind": "bigquery#getQueryResultsResponse",
        "pageToken": "TOKEN_TO_PAGE_2",
        "schema": {
            "fields": [{
                "name": "first_name",
                "type": "STRING",
            }, {
                "name": "last_name",
                "type": "STRING",
            }]
        },
        "rows": [{
            "f": [{
                "v": "foo",
            }, {
                "v": "bar"
            }]
        }, {
            "f": [{
                "v": "abc",
            }, {
                "v": "xyz"
            }]
        }],
        "totalRows": "4"
    }

    # Second (final) page: two more rows, no pageToken.
    page_two_resp = {
        "jobComplete": True,
        "kind": "bigquery#getQueryResultsResponse",
        "schema": {
            "fields": [{
                "name": "first_name",
                "type": "STRING",
            }, {
                "name": "last_name",
                "type": "STRING",
            }]
        },
        "rows": [{
            "f": [{
                "v": "the",
            }, {
                "v": "beatles"
            }]
        }, {
            "f": [{
                "v": "monty",
            }, {
                "v": "python"
            }]
        }],
        "totalRows": "4"
    }

    bq = BigQueryClient(mock.Mock(), 'project')
    get_query_mock.side_effect = [page_one_resp, page_two_resp]
    result_rows = bq.get_query_rows(job_id=123, offset=0, limit=0)

    expected_rows = [{'first_name': 'foo', 'last_name': 'bar'},
                     {'first_name': 'abc', 'last_name': 'xyz'},
                     {'first_name': 'the', 'last_name': 'beatles'},
                     {'first_name': 'monty', 'last_name': 'python'}]
    # assertEqual, not the deprecated assertEquals alias (removed in 3.12).
    self.assertEqual(result_rows, expected_rows)
1354+
12841355
def test_query_incomplete(self, get_query_mock):
12851356
"""Ensure that get_query_rows handles scenarios where the query is not
12861357
finished.

0 commit comments

Comments
 (0)