-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TDL-19835: Adding new streams and discovery mode #38
base: master
Are you sure you want to change the base?
Changes from 7 commits
1522ee4
8112ab6
45a53aa
6be4e77
41f35f9
959eb09
bf0b898
251ba65
58cf292
332f3cf
a24dc67
c39f276
3974762
6a294ae
d2e9edf
1bc7d2a
426f555
485b2cb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,56 +1,19 @@ | ||||||
from asyncio.log import logger | ||||||
from singer.catalog import Catalog, CatalogEntry, Schema | ||||||
import os | ||||||
import json | ||||||
import singer | ||||||
from singer import metadata | ||||||
from singer.catalog import Catalog, CatalogEntry, Schema | ||||||
from tap_braintree.streams import STREAMS | ||||||
from tap_braintree.schema import get_schemas | ||||||
|
||||||
LOGGER = singer.get_logger() | ||||||
|
||||||
def get_abs_path(path): | ||||||
return os.path.join(os.path.dirname(os.path.realpath(__file__)), path) | ||||||
|
||||||
def get_schemas(): | ||||||
schemas = {} | ||||||
field_metadata = {} | ||||||
|
||||||
for stream_name, stream_metadata in STREAMS.items(): | ||||||
|
||||||
schema_path = get_abs_path("schemas/{}.json".format(stream_name)) | ||||||
with open(schema_path,"r") as file: | ||||||
schema = json.load(file) | ||||||
schemas[stream_name] = schema | ||||||
|
||||||
mdata = metadata.new() | ||||||
|
||||||
mdata = metadata.get_standard_metadata( | ||||||
schema=schema, | ||||||
key_properties=stream_metadata.key_properties, | ||||||
valid_replication_keys=stream_metadata.replication_keys, | ||||||
replication_method=stream_metadata.replication_method, | ||||||
) | ||||||
|
||||||
mdata = metadata.to_map(mdata) | ||||||
# Loop through all keys and make replication keys of automatic inclusion | ||||||
for field_name in schema["properties"].keys(): | ||||||
|
||||||
if stream_metadata.replication_keys and field_name in stream_metadata.replication_keys: | ||||||
mdata = metadata.write( | ||||||
mdata, ("properties", field_name), "inclusion", "automatic", | ||||||
) | ||||||
|
||||||
mdata = metadata.to_list(mdata) | ||||||
field_metadata[stream_name] = mdata | ||||||
|
||||||
return schemas, field_metadata | ||||||
|
||||||
def discover(): | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add function doc string for all the functions. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added doc strings for all the fuctions. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Divide this file into two files There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seperated discover.py's code into schema.py and discover.py |
||||||
""" | ||||||
Generate catalog for call the streams | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated doc strings |
||||||
""" | ||||||
|
||||||
schemas, field_metadata = get_schemas() | ||||||
catalog = Catalog([]) | ||||||
|
||||||
for stream_name, schema_dict in schemas.items(): | ||||||
|
||||||
schema = Schema.from_dict(schema_dict) | ||||||
mdata = field_metadata[stream_name] | ||||||
|
||||||
|
@@ -64,4 +27,4 @@ def discover(): | |||||
) | ||||||
) | ||||||
|
||||||
return catalog | ||||||
return catalog |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
import os | ||
import json | ||
from singer import metadata | ||
from tap_braintree.streams import STREAMS | ||
|
||
def get_abs_path(path): | ||
""" | ||
Return full path for given argument relative path | ||
""" | ||
|
||
return os.path.join(os.path.dirname(os.path.realpath(__file__)), path) | ||
|
||
def get_schemas(): | ||
""" | ||
Return metadata and schema for all the streams | ||
""" | ||
|
||
schemas = {} | ||
field_metadata = {} | ||
|
||
for stream_name, stream_metadata in STREAMS.items(): | ||
|
||
schema_path = get_abs_path("schemas/{}.json".format(stream_name)) | ||
with open(schema_path,"r") as file: | ||
schema = json.load(file) | ||
schemas[stream_name] = schema | ||
|
||
mdata = metadata.new() | ||
|
||
mdata = metadata.get_standard_metadata( | ||
schema=schema, | ||
key_properties=stream_metadata.key_properties, | ||
valid_replication_keys=stream_metadata.replication_keys, | ||
replication_method=stream_metadata.replication_method, | ||
) | ||
|
||
mdata = metadata.to_map(mdata) | ||
# Loop through all keys and make replication keys of automatic inclusion | ||
for field_name in schema["properties"].keys(): | ||
|
||
if stream_metadata.replication_keys and field_name in stream_metadata.replication_keys: | ||
mdata = metadata.write( | ||
mdata, ("properties", field_name), "inclusion", "automatic", | ||
) | ||
|
||
mdata = metadata.to_list(mdata) | ||
field_metadata[stream_name] = mdata | ||
|
||
return schemas, field_metadata |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated logger messages