Skip to content

Commit 5fe5c06

Browse files
committed
update cells iterator to handle repeated records
Change-Id: I368b9ec7737e6cad018728b329d636de0a678816
1 parent f5b02ca commit 5fe5c06

File tree

4 files changed

+285
-37
lines changed

4 files changed

+285
-37
lines changed

core/bigquery.py

Lines changed: 67 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@
2828
from google.cloud.bigquery_storage import ReadSession
2929
from google.cloud.exceptions import NotFound
3030

31-
from .auth import Credentials
31+
from core.auth import Credentials
32+
from core.helpers import parse_column_path
3233

3334
BQ_SCOPES = ['https://www.googleapis.com/auth/bigquery']
3435

@@ -189,7 +190,6 @@ def get_readrows_iterator(
189190
read_session=requested_session,
190191
max_stream_count=1,
191192
)
192-
193193
# Use 0th stream because because max_stream_count=1
194194
stream_name = session.streams[0].name
195195

@@ -201,38 +201,69 @@ def get_readrows_iterator(
201201
return cast(Iterable[Mapping], rows)
202202

203203

204-
def get_cells_iterator(bq_read_client: BigQueryReadClient,
205-
table_metadata: TableMetadata,
206-
column: str) -> Generator[Any, None, None]:
207-
"""
208-
Get an Iterator of cell values with the requested columns of the table,
209-
using an authenticated BigQuery Storage API client.
210-
211-
Note: Does support nested columns.
212-
213-
Args:
214-
* bq_read_client: BigQuery Storage API Read client
215-
* table_metadata: TableMetadata object
216-
* column : Column name to select
217-
218-
Returns:
219-
* Iterator of cell values
220-
"""
221-
nested_columns = column.split('.')
222-
parent_column = nested_columns[0]
223-
224-
rows = get_readrows_iterator(bq_read_client,
225-
table_metadata, [parent_column],
226-
data_format=DataFormat.AVRO)
227-
228-
for row in rows:
229-
value = row
230-
for nested_column in nested_columns:
231-
try:
232-
value = value[nested_column]
233-
except KeyError:
234-
raise KeyError(f'{nested_column} was not found.')
235-
yield value
204+
def get_cells_iterator(
    bq_read_client: BigQueryReadClient,
    table_metadata: TableMetadata,
    column: str,
) -> Generator[Any, None, None]:
    """Yields cell values for the requested column, one per table row.

    Simple column names are streamed directly. Dotted paths descend into
    nested records, and a ``name[key]`` segment selects the first element
    of a repeated record whose ``name`` field equals ``key`` (parsed by
    ``parse_column_path``). Whenever a traversal step lands on a record
    containing a ``value`` field, the first non-None entry of that
    ``value`` mapping is unwrapped (the typed-value struct used by e.g.
    GA4 ``event_params``).

    Args:
        bq_read_client (BigQueryReadClient): The BigQuery Storage API Read
            client.
        table_metadata (TableMetadata): The table's metadata.
        column (str): The column name; supports nested fields and
            ``name[key]`` selectors for repeated records.

    Returns:
        Generator[Any, None, None]: An iterator over cell values. Yields
        None for a row when a path segment is missing or no repeated
        element matches the requested key.
    """
    # Fast path: a plain column name needs no path parsing or traversal.
    if "." not in column and "[" not in column:
        rows = get_readrows_iterator(bq_read_client,
                                     table_metadata, [column],
                                     data_format=DataFormat.AVRO)
        for row in rows:
            # NOTE(review): .get() yields None for a missing column rather
            # than raising (the pre-change code raised KeyError) — confirm
            # callers expect the silent None.
            yield row.get(column)
    else:
        # Nested path: only the top-level parent column is requested from
        # BigQuery; the rest of the path is resolved in memory per row.
        nested_columns = parse_column_path(column)
        parent_column = nested_columns[0][0]

        rows = get_readrows_iterator(
            bq_read_client,
            table_metadata,
            [parent_column],
            data_format=DataFormat.AVRO,
        )

        for current_value in rows:
            for column_name, key in nested_columns:
                if isinstance(current_value, dict):
                    current_value = current_value.get(column_name)
                elif isinstance(current_value, list) and key:
                    # Repeated record: pick the first element whose
                    # `column_name` field equals `key`, or None if absent.
                    current_value = next(
                        (item for item in current_value
                         if item.get(column_name) == key),
                        None,
                    )

                # Unwrap a {"value": {<type>_value: ...}} struct by taking
                # its first non-None typed entry; if all entries are None,
                # keep the struct itself.
                if isinstance(current_value, dict) and "value" in current_value:
                    extracted_value = next(
                        (value for key, value in current_value["value"].items()
                         if value is not None),
                        None,
                    )
                    current_value = (extracted_value if extracted_value
                                     is not None else current_value)

                if current_value is None:
                    # Path cannot be resolved further for this row; yield
                    # None instead of raising.
                    break
            yield current_value
236267

237268

238269
def get_table(bq_legacy_client: BigQueryLegacyClient,
@@ -293,8 +324,8 @@ def upload_rows(bq_legacy_client: BigQueryLegacyClient,
293324
* List of errors, if any
294325
"""
295326
if len(rows) > 0:
296-
result = bq_legacy_client.insert_rows_json(
297-
table_metadata.full_table_id, rows)
327+
result = bq_legacy_client.insert_rows_json(table_metadata.full_table_id,
328+
rows)
298329
# result is empty if no errors occurred
299330
for row in result:
300331
# Note:

core/helpers.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
See the License for the specific language governing permissions and
1414
limitations under the License.
1515
"""
16+
import re
1617
from typing import Any, Callable, Generic, List, NoReturn, TypeVar, Union
1718

1819

@@ -30,6 +31,33 @@ def get_function_name(function: Any) -> str:
3031
return str(function.__qualname__.split('.')[0])
3132

3233

34+
# Splits a path segment of the form "name[key]" into: the column name,
# the bracketed key, and any trailing text (trailing text is invalid and
# rejected by parse_column_path).
PATTERN = re.compile(r"(^.*?)\[([^\[\]]*?)\](.*?)$")


def parse_column_path(column: str) -> list:
    """Parse a column path into (column_name, key) segments.

    Splits the dot-separated path and, for each segment, extracts an
    optional bracketed key, e.g. ``"event_params.key[ga_session_id]"``
    becomes ``[("event_params", None), ("key", "ga_session_id")]``.

    Args:
      column (str): The column path, with optional nested fields and
        ``name[key]`` array-key selectors.

    Returns:
      list: Tuples of ``(column_name, key)``; ``key`` is None for segments
        without a bracketed selector.

    Raises:
      ValueError: If a segment has an empty column name, or has text after
        the closing bracket (e.g. ``"key[x]junk"`` or ``"a[b][c]"``),
        which would otherwise be silently dropped.
    """
    nested_columns = []
    for segment in column.split("."):
        match = PATTERN.match(segment)
        if match:
            column_name, key, trailing = match.groups()
            if trailing:
                # Previously this text was discarded silently, losing part
                # of the requested path; fail loudly instead.
                raise ValueError(f"Invalid column path: {column}")
        else:
            column_name, key = segment, None
        if not column_name:
            raise ValueError(f"Invalid column path: {column}")
        nested_columns.append((column_name, key))
    return nested_columns
59+
60+
3361
T = TypeVar('T')
3462
FlushFunction = Callable[[List[T]], Any]
3563

docs/config.md

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ An example is provided in `deployment/config_template.json`, and below:
8787
* `table_name`: BigQuery table name
8888

8989
* BigQuery table `columns`: (mapping)
90-
* key: BigQuery table column name
90+
* key: BigQuery table column name. Supports nested columns (Repeated and Nullable record BigQuery data types). More details in [Nested column logic for column name](#nested-column-logic-for-column-name) below.
9191
* value:
9292
* `parser`: [Parser name](#parsers)
9393
* `rules`: (list)
@@ -112,6 +112,40 @@ Advanced:
112112
* Grant Service Account Token Creator (`roles/iam.serviceAccountTokenCreator`) to the DQM Service Account.
113113
* Specify `service_account_email` in your config.
114114

115+
### Nested column logic for column name
116+
117+
The DQM configuration file allows you to target specific columns within complex data structures in your BigQuery tables. Here's how to handle nested columns and extract values from arrays of structs:
118+
119+
**1. Nested Columns**
120+
121+
* Use dot notation to reference columns within nested structures (STRUCTs in BigQuery).
122+
123+
**Example:**
124+
125+
```
126+
column2.nested_column1
127+
```
128+
129+
* This targets `nested_column1` within the `column2` struct.
130+
131+
**2. Key-Value Logic within Arrays**
132+
133+
* Employ the `key[some_value]` syntax to extract values from arrays of structs where the elements have a `key` field.
134+
135+
**Example:**
136+
137+
```
138+
event_params.key[ga_session_id]
139+
```
140+
141+
* This targets the `value` where the corresponding `key` is "ga_session_id" within the `event_params` array.
142+
143+
**Key Points:**
144+
145+
* **Matching Logic:** The `key[some_value]` syntax will extract the **first** matching value within the array.
146+
* **Parsers:** Choose parsers that are compatible with the data type of the values you are extracting (e.g., `parse_int` for integers, `parse_str` for strings).
147+
* **Custom Rules:** For more complex logic or to process multiple values with the same key, you can write custom Python rules.
148+
115149
## Parsers
116150

117151
DQM always treats values from BigQuery as non-typed, i.e. the required Type

test/core/test_bigquery.py

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
from collections.abc import Iterable
1818
import os
1919
import unittest
20+
from unittest.mock import MagicMock
21+
from unittest.mock import patch
2022

2123
from google.cloud.bigquery import Client as BigQueryLegacyClient
2224
from google.cloud.bigquery_storage import BigQueryReadClient
@@ -27,6 +29,7 @@
2729
from core.auth import OAuthCredentials
2830
from core.bigquery import get_bq_legacy_client
2931
from core.bigquery import get_bq_read_client
32+
from core.bigquery import get_cells_iterator
3033
from core.bigquery import get_readrows_iterator
3134
from core.bigquery import TableMetadata
3235

@@ -92,3 +95,155 @@ def test_get_readrows_iterator(self):
9295
table_name=os.environ['TEST_TABLE_NAME'])
9396
rows = get_readrows_iterator(self.bqs_client, table_metadata)
9497
self.assertTrue(isinstance(rows, Iterable))
98+
99+
100+
class TestGetCellsIterator(unittest.TestCase):
101+
102+
@patch('core.bigquery.get_readrows_iterator')
103+
def test_get_cells_iterator_single_column(self, mock_get_readrows_iterator):
104+
mock_get_readrows_iterator.return_value = iter([
105+
{
106+
"column_name": 10
107+
},
108+
{
109+
"column_name": 20
110+
},
111+
])
112+
mock_table_metadata = TableMetadata(project_id="test-project",
113+
dataset_id="test-dataset",
114+
table_name="test-table")
115+
result = list(
116+
get_cells_iterator(MagicMock(spec=BigQueryReadClient),
117+
mock_table_metadata, "column_name"))
118+
self.assertEqual(result, [10, 20])
119+
120+
@patch('core.bigquery.get_readrows_iterator')
121+
def test_get_cells_iterator_nested_column(self, mock_get_readrows_iterator):
122+
mock_get_readrows_iterator.return_value = iter([
123+
{
124+
"c": {
125+
"nested": {
126+
"column_name": "value1"
127+
}
128+
}
129+
},
130+
{
131+
"c": {
132+
"nested": {
133+
"column_name": "value2"
134+
}
135+
}
136+
},
137+
])
138+
mock_table_metadata = TableMetadata(project_id="test-project",
139+
dataset_id="test-dataset",
140+
table_name="test-table")
141+
result = list(
142+
get_cells_iterator(MagicMock(spec=BigQueryReadClient),
143+
mock_table_metadata, "c.nested.column_name"))
144+
self.assertEqual(result, ["value1", "value2"])
145+
146+
@patch('core.bigquery.get_readrows_iterator')
147+
def test_get_cells_iterator_extract_special_value(
148+
self, mock_get_readrows_iterator):
149+
mock_data = [{
150+
"event_params": [{
151+
"key": "ga_session_number",
152+
"value": {
153+
'string_value': None,
154+
'int_value': 1,
155+
'float_value': None,
156+
'double_value': None
157+
}
158+
},]
159+
}]
160+
mock_get_readrows_iterator.return_value = iter(mock_data)
161+
mock_table_metadata = TableMetadata(project_id="test-project",
162+
dataset_id="test-dataset",
163+
table_name="test-table")
164+
result = list(
165+
get_cells_iterator(MagicMock(spec=BigQueryReadClient),
166+
mock_table_metadata,
167+
"event_params.key[ga_session_number]"))
168+
self.assertEqual(result, [1])
169+
mock_get_readrows_iterator.assert_called_once()
170+
171+
@patch('core.bigquery.get_readrows_iterator')
172+
def test_get_cells_iterator_extract_double_nested_value(
173+
self, mock_get_readrows_iterator):
174+
mock_data = [{
175+
"event_params": [{
176+
"key": "ga_session_number",
177+
"value": {
178+
'string_value': {
179+
"deep_column": 1
180+
}
181+
}
182+
},]
183+
}]
184+
mock_get_readrows_iterator.return_value = iter(mock_data)
185+
mock_table_metadata = TableMetadata(project_id="test-project",
186+
dataset_id="test-dataset",
187+
table_name="test-table")
188+
result = list(
189+
get_cells_iterator(
190+
MagicMock(spec=BigQueryReadClient), mock_table_metadata,
191+
"event_params.key[ga_session_number].deep_column"))
192+
self.assertEqual(result, [1])
193+
mock_get_readrows_iterator.assert_called_once()
194+
195+
@patch('core.bigquery.get_readrows_iterator')
196+
def test_get_cells_iterator_key_not_found(self, mock_get_readrows_iterator):
197+
mock_data = [
198+
{
199+
"event_params": [{
200+
"key": "some_other_key",
201+
"value": {
202+
'int_value': 2
203+
}
204+
}]
205+
},
206+
]
207+
mock_get_readrows_iterator.return_value = iter(mock_data)
208+
mock_table_metadata = TableMetadata(project_id="test-project",
209+
dataset_id="test-dataset",
210+
table_name="test-table")
211+
result = list(
212+
get_cells_iterator(MagicMock(spec=BigQueryReadClient),
213+
mock_table_metadata,
214+
"event_params.key[non_existent_key]"))
215+
self.assertEqual(result, [None])
216+
mock_get_readrows_iterator.assert_called_once()
217+
218+
@patch('core.bigquery.get_readrows_iterator')
219+
def test_get_cells_iterator_return_full_record(self,
220+
mock_get_readrows_iterator):
221+
# Simulate data where the specified key is not present
222+
mock_data = [
223+
{
224+
"event_params": [{
225+
"key": "some_other_key",
226+
"value": {
227+
'int_value': 2
228+
}
229+
}]
230+
},
231+
]
232+
mock_get_readrows_iterator.return_value = iter(mock_data)
233+
mock_table_metadata = TableMetadata(project_id="test-project",
234+
dataset_id="test-dataset",
235+
table_name="test-table")
236+
result = list(
237+
get_cells_iterator(MagicMock(spec=BigQueryReadClient),
238+
mock_table_metadata, "event_params"))
239+
self.assertEqual(result, [[{
240+
"key": "some_other_key",
241+
"value": {
242+
'int_value': 2
243+
}
244+
}]])
245+
mock_get_readrows_iterator.assert_called_once()
246+
247+
248+
# TODO(psnel) add support for highly nested usecases alternating between
249+
# repeated and nullable structures.

0 commit comments

Comments
 (0)