forked from airbytehq/airbyte-python-cdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdpath_extractor.py
109 lines (93 loc) · 3.84 KB
/
dpath_extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, MutableMapping, Union
import dpath
import requests
from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
from airbyte_cdk.sources.declarative.extractors.record_extractor import (
SERVICE_KEY_PREFIX,
RecordExtractor,
)
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config
# The name of the service field used to bind the whole decoded response body
# ("root") into each dict record produced by the extractor below.
# Built from SERVICE_KEY_PREFIX (imported from record_extractor), so downstream
# code presumably treats it as an internal/service key — TODO confirm there.
RECORD_ROOT_KEY = SERVICE_KEY_PREFIX + "root"
def update_record(record: Any, root: Any) -> Any:
    """
    Bind the response root to a single extracted record.

    Dict records are shallow-copied, so the caller's object is never mutated. The
    copy gets ``root`` stored under ``RECORD_ROOT_KEY``. Any other value (lists,
    scalars, non-dict mappings, ``None``) is returned unchanged.

    Args:
        record: One record extracted from the response.
        root: The decoded response body the record was extracted from.

    Returns:
        A new dict with the root bound, or ``record`` itself when it is not a dict.
    """
    if not isinstance(record, dict):
        return record
    # dict() over the items view builds a plain-dict shallow copy via .items().
    snapshot = dict(record.items())
    # Assigning overwrites an existing RECORD_ROOT_KEY in place, like dict.update would.
    snapshot[RECORD_ROOT_KEY] = root
    return snapshot
@dataclass
class DpathExtractor(RecordExtractor):
    """
    Record extractor that searches a decoded response over a path defined as an array of fields.

    If the field path points to an array, that array is returned.
    If the field path points to an object, that object is returned wrapped as an array.
    If the field path points to an empty object, an empty array is returned.
    If the field path points to a non-existing path, an empty array is returned.

    Every extracted record that is a dict is shallow-copied, and the whole decoded
    response body is attached to it under ``RECORD_ROOT_KEY`` (see ``update_record``).
    Non-dict records are yielded unchanged. A decoded body equal to ``{}`` yields a
    single empty record.

    Examples of instantiating this transform:
    ```
      extractor:
        type: DpathExtractor
        field_path:
          - "root"
          - "data"
    ```

    ```
      extractor:
        type: DpathExtractor
        field_path:
          - "root"
          - "{{ parameters['field'] }}"
    ```

    ```
      extractor:
        type: DpathExtractor
        field_path: []
    ```

    Attributes:
        field_path (List[Union[InterpolatedString, str]]): Path to the field that should be extracted
        config (Config): The user-provided configuration as specified by the source's spec
        decoder (Decoder): The decoder responsible to transform the response in a Mapping
    """

    field_path: List[Union[InterpolatedString, str]]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Every segment (str or InterpolatedString) goes through InterpolatedString.create
        # exactly once, so extract_records can call .eval() on each of them.
        self._field_path = [
            InterpolatedString.create(segment, parameters=parameters)
            for segment in self.field_path
        ]

    def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
        """
        Decode ``response`` and yield the records found at ``field_path``.

        Args:
            response: The HTTP response, decoded with ``self.decoder``.

        Yields:
            - one record per element when the path resolves to a list;
            - the object itself when it resolves to a non-empty dict;
            - the raw value when it resolves to any other truthy value.

            Dict records carry the decoded body under ``RECORD_ROOT_KEY``. Nothing is
            yielded for an empty or missing path. The exception is a decoded body equal
            to ``{}``, which yields a single ``{}``.
        """
        for body in self.decoder.decode(response):
            if body == {}:
                # An empty/invalid JSON parsed, keep the contract
                yield {}
                continue

            if not self._field_path:
                extracted = body
            else:
                resolved_path = [segment.eval(self.config) for segment in self._field_path]
                if "*" in resolved_path:
                    # A "*" segment is a glob: dpath.values returns every matched value as a list.
                    extracted = dpath.values(body, resolved_path)
                else:
                    # A missing path falls back to [], which yields nothing below.
                    extracted = dpath.get(body, resolved_path, default=[])  # type: ignore # extracted will be a MutableMapping, given input data structure

            if isinstance(extracted, list):
                for record in extracted:
                    yield update_record(record, body)
            elif isinstance(extracted, dict):
                # An empty object at the path means "no records" (class contract).
                # Binding the root to it would fabricate a record holding only the
                # service key.
                if extracted:
                    yield update_record(extracted, body)
            elif extracted:
                yield extracted