diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..90846a1 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,32 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "tap-hubspot", + "type": "debugpy", + "request": "launch", + "program": "${workspaceRoot}/tap_hubspot/tap.py", + "console": "integratedTerminal", + "args": [ + "--config", + ".secrets/config.json", + "--state", + ".secrets/state.json" + ], + "env": { "PYTHONPATH": "${workspaceRoot}" }, + // Change this to false if you wish to debug and add breakpoints outside of your code e.g. the singer-sdk package + "justMyCode": false + }, + { + "name": "tap-hubspot full-refresh", + "type": "debugpy", + "request": "launch", + "program": "${workspaceRoot}/tap_hubspot/tap.py", + "console": "integratedTerminal", + "args": ["--config", ".secrets/config.json"], + "env": { "PYTHONPATH": "${workspaceRoot}" }, + // Change this to false if you wish to debug and add breakpoints outside of your code e.g. the singer-sdk package + "justMyCode": false + } + ] +} diff --git a/poetry.toml b/poetry.toml new file mode 100644 index 0000000..efa46ec --- /dev/null +++ b/poetry.toml @@ -0,0 +1,2 @@ +[virtualenvs] +in-project = true \ No newline at end of file diff --git a/tap_hubspot/client.py b/tap_hubspot/client.py index 4d07454..4882ff4 100644 --- a/tap_hubspot/client.py +++ b/tap_hubspot/client.py @@ -2,16 +2,15 @@ from __future__ import annotations -import sys -import requests import datetime +import sys +from typing import Any, Callable, Iterable -from typing import Any, Callable - +import requests from singer_sdk import typing as th +from singer_sdk._singerlib.utils import strptime_to_utc from singer_sdk.pagination import BaseAPIPaginator from singer_sdk.streams import RESTStream -from singer_sdk._singerlib.utils import strptime_to_utc if sys.version_info >= (3, 8): from functools import cached_property @@ -133,7 +132,6 @@ def get_url_params( return params - class DynamicHubspotStream(HubspotStream): """DynamicHubspotStream""" @@ -150,13 +148,12 @@ def schema(self) -> dict: hs_props = [] self.hs_properties = self._get_available_properties() for name, type in self.hs_properties.items(): - hs_props.append( - th.Property(name, self._get_datatype(type)) - ) + hs_props.append(th.Property(name, self._get_datatype(type))) schema = th.PropertiesList( th.Property("id", th.StringType), th.Property( - "properties", th.ObjectType(*hs_props), + "properties", + th.ObjectType(*hs_props), ), th.Property("createdAt", th.DateTimeType), th.Property("updatedAt", th.DateTimeType), @@ -198,11 +195,20 @@ def get_url_params( class DynamicIncrementalHubspotStream(DynamicHubspotStream): """DynamicIncrementalHubspotStream""" + date_filter = None + record_id_filter = None + last_record_id = None + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def _is_incremental_search(self, context): - return self.replication_method == REPLICATION_INCREMENTAL and self.get_starting_replication_key_value(context) and hasattr(self, "incremental_path") and self.incremental_path + return ( + self.replication_method == REPLICATION_INCREMENTAL + and self.get_starting_replication_key_value(context) + and hasattr(self, "incremental_path") + and self.incremental_path + ) @cached_property def schema(self) -> dict: @@ -210,13 +216,12 @@ def schema(self) -> dict: hs_props = [] self.hs_properties = self._get_available_properties() for name, type in self.hs_properties.items(): - hs_props.append( - th.Property(name, self._get_datatype(type)) - ) + hs_props.append(th.Property(name, self._get_datatype(type))) schema = th.PropertiesList( th.Property("id", th.StringType), th.Property( - "properties", th.ObjectType(*hs_props), + "properties", + th.ObjectType(*hs_props), ), th.Property("createdAt", th.DateTimeType), th.Property("updatedAt", th.DateTimeType), @@ -267,11 +272,13 @@ def post_process( Returns: The resulting record dict, or `None` if the record should be excluded. """ + if self.replication_key: val = None if props := row.get("properties"): val = props[self.replication_key] row[self.replication_key] = val + self.last_record_id = row.get("id") return row def prepare_request( @@ -285,7 +292,6 @@ def prepare_request( self.rest_method = "POST" return super().prepare_request(context, next_page_token) - def prepare_request_payload( self, context: dict | None, @@ -305,47 +311,52 @@ def prepare_request_payload( next page of data. """ body = {} + if self._is_incremental_search(context): # Only filter in case we have a value to filter on # https://developers.hubspot.com/docs/api/crm/search - ts = datetime.datetime.fromisoformat(self.get_starting_replication_key_value(context)) + if self.date_filter is None: + self.date_filter = datetime.datetime.fromisoformat(self.get_starting_replication_key_value(context)) + if next_page_token: # Hubspot wont return more than 10k records so when we hit 10k we # need to reset our epoch to most recent and not send the next_page_token if int(next_page_token) + 100 >= 10000: - ts = strptime_to_utc( - self.get_context_state(context).get("progress_markers").get("replication_key_value") + self.logger.warning( + f"More than 10k objects in the search result. Updating record_id filter to {self.last_record_id} and date filter to {self.date_filter.isoformat()}." ) + self.record_id_filter = { + "propertyName": "hs_object_id", + "operator": "GTE", + "value": self.last_record_id, + } else: body["after"] = next_page_token - epoch_ts = str(int(ts.timestamp() * 1000)) + + epoch_ts = str(int(self.date_filter.timestamp() * 1000)) + + filters = [ + { + "propertyName": self.replication_key, + "operator": "GTE", + "value": epoch_ts, + } + ] + if self.record_id_filter: + filters.append(self.record_id_filter) body.update( { - "filterGroups": [ - { - "filters": [ - { - "propertyName": self.replication_key, - "operator": "GTE", - # Timestamps need to be in milliseconds - # https://legacydocs.hubspot.com/docs/faq/how-should-timestamps-be-formatted-for-hubspots-apis - "value": epoch_ts, - } - ] - } - ], + "filterGroups": [{"filters": filters}], "sorts": [ { - # This is inside the properties object - "propertyName": self.replication_key, + "propertyName": "hs_object_id", "direction": "ASCENDING", } ], - # Hubspot sets a limit of most 100 per request. Default is 10 - "limit": 100, - "properties": list(self.hs_properties) + "limit": 200, # Hubspot sets a limit of most 200 per request. Default is 10 + "properties": list(self.hs_properties), } ) - return body \ No newline at end of file + return body diff --git a/tap_hubspot/tap.py b/tap_hubspot/tap.py index 42aa59a..9e6b26e 100644 --- a/tap_hubspot/tap.py +++ b/tap_hubspot/tap.py @@ -4,7 +4,6 @@ from singer_sdk import Tap from singer_sdk import typing as th # JSON schema typing helpers - from tap_hubspot import streams @@ -62,16 +61,16 @@ def discover_streams(self) -> list[streams.HubspotStream]: streams.OwnersStream(self), streams.TicketPipelineStream(self), streams.DealPipelineStream(self), - streams.EmailSubscriptionStream(self), + # streams.EmailSubscriptionStream(self), streams.PropertyNotesStream(self), streams.CompanyStream(self), streams.DealStream(self), - streams.FeedbackSubmissionsStream(self), + # streams.FeedbackSubmissionsStream(self), streams.LineItemStream(self), streams.ProductStream(self), streams.TicketStream(self), streams.QuoteStream(self), - streams.GoalStream(self), + # streams.GoalStream(self), streams.CallStream(self), streams.CommunicationStream(self), streams.EmailStream(self),