Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "tap-hubspot",
"type": "debugpy",
"request": "launch",
"program": "${workspaceRoot}/tap_hubspot/tap.py",
"console": "integratedTerminal",
"args": [
"--config",
".secrets/config.json",
"--state",
".secrets/state.json"
],
"env": { "PYTHONPATH": "${workspaceRoot}" },
// false lets you debug and set breakpoints outside of your own code (e.g. in the singer-sdk package); change this to true to step through your own code only
"justMyCode": false
},
{
"name": "tap-hubspot full-refresh",
"type": "debugpy",
"request": "launch",
"program": "${workspaceRoot}/tap_hubspot/tap.py",
"console": "integratedTerminal",
"args": ["--config", ".secrets/config.json"],
"env": { "PYTHONPATH": "${workspaceRoot}" },
// false lets you debug and set breakpoints outside of your own code (e.g. in the singer-sdk package); change this to true to step through your own code only
"justMyCode": false
}
]
}
2 changes: 2 additions & 0 deletions poetry.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[virtualenvs]
in-project = true
89 changes: 50 additions & 39 deletions tap_hubspot/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@

from __future__ import annotations

import sys
import requests
import datetime
import sys
from typing import Any, Callable, Iterable

from typing import Any, Callable

import requests
from singer_sdk import typing as th
from singer_sdk._singerlib.utils import strptime_to_utc
from singer_sdk.pagination import BaseAPIPaginator
from singer_sdk.streams import RESTStream
from singer_sdk._singerlib.utils import strptime_to_utc

if sys.version_info >= (3, 8):
from functools import cached_property
Expand Down Expand Up @@ -133,7 +132,6 @@ def get_url_params(
return params



class DynamicHubspotStream(HubspotStream):
"""DynamicHubspotStream"""

Expand All @@ -150,13 +148,12 @@ def schema(self) -> dict:
hs_props = []
self.hs_properties = self._get_available_properties()
for name, type in self.hs_properties.items():
hs_props.append(
th.Property(name, self._get_datatype(type))
)
hs_props.append(th.Property(name, self._get_datatype(type)))
schema = th.PropertiesList(
th.Property("id", th.StringType),
th.Property(
"properties", th.ObjectType(*hs_props),
"properties",
th.ObjectType(*hs_props),
),
th.Property("createdAt", th.DateTimeType),
th.Property("updatedAt", th.DateTimeType),
Expand Down Expand Up @@ -198,25 +195,33 @@ def get_url_params(
class DynamicIncrementalHubspotStream(DynamicHubspotStream):
"""DynamicIncrementalHubspotStream"""

date_filter = None
record_id_filter = None
last_record_id = None

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def _is_incremental_search(self, context):
return self.replication_method == REPLICATION_INCREMENTAL and self.get_starting_replication_key_value(context) and hasattr(self, "incremental_path") and self.incremental_path
return (
self.replication_method == REPLICATION_INCREMENTAL
and self.get_starting_replication_key_value(context)
and hasattr(self, "incremental_path")
and self.incremental_path
)

@cached_property
def schema(self) -> dict:
"""Return a draft JSON schema for this stream."""
hs_props = []
self.hs_properties = self._get_available_properties()
for name, type in self.hs_properties.items():
hs_props.append(
th.Property(name, self._get_datatype(type))
)
hs_props.append(th.Property(name, self._get_datatype(type)))
schema = th.PropertiesList(
th.Property("id", th.StringType),
th.Property(
"properties", th.ObjectType(*hs_props),
"properties",
th.ObjectType(*hs_props),
),
th.Property("createdAt", th.DateTimeType),
th.Property("updatedAt", th.DateTimeType),
Expand Down Expand Up @@ -267,11 +272,13 @@ def post_process(
Returns:
The resulting record dict, or `None` if the record should be excluded.
"""

if self.replication_key:
val = None
if props := row.get("properties"):
val = props[self.replication_key]
row[self.replication_key] = val
self.last_record_id = row.get("id")
return row

def prepare_request(
Expand All @@ -285,7 +292,6 @@ def prepare_request(
self.rest_method = "POST"
return super().prepare_request(context, next_page_token)


def prepare_request_payload(
self,
context: dict | None,
Expand All @@ -305,47 +311,52 @@ def prepare_request_payload(
next page of data.
"""
body = {}

if self._is_incremental_search(context):
# Only filter in case we have a value to filter on
# https://developers.hubspot.com/docs/api/crm/search
ts = datetime.datetime.fromisoformat(self.get_starting_replication_key_value(context))
if self.date_filter is None:
self.date_filter = datetime.datetime.fromisoformat(self.get_starting_replication_key_value(context))

if next_page_token:
# Hubspot won't return more than 10k records, so when we hit 10k we
# need to reset our epoch to the most recent and not send the next_page_token
if int(next_page_token) + 100 >= 10000:
ts = strptime_to_utc(
self.get_context_state(context).get("progress_markers").get("replication_key_value")
self.logger.warning(
f"More than 10k objects in the search result. Updating record_id filter to {self.last_record_id} and date filter to {self.date_filter.isoformat()}."
)
self.record_id_filter = {
"propertyName": "hs_object_id",
"operator": "GTE",
"value": self.last_record_id,
}
else:
body["after"] = next_page_token
epoch_ts = str(int(ts.timestamp() * 1000))

epoch_ts = str(int(self.date_filter.timestamp() * 1000))

filters = [
{
"propertyName": self.replication_key,
"operator": "GTE",
"value": epoch_ts,
}
]
if self.record_id_filter:
filters.append(self.record_id_filter)

body.update(
{
"filterGroups": [
{
"filters": [
{
"propertyName": self.replication_key,
"operator": "GTE",
# Timestamps need to be in milliseconds
# https://legacydocs.hubspot.com/docs/faq/how-should-timestamps-be-formatted-for-hubspots-apis
"value": epoch_ts,
}
]
}
],
"filterGroups": [{"filters": filters}],
"sorts": [
{
# This is inside the properties object
"propertyName": self.replication_key,
"propertyName": "hs_object_id",
"direction": "ASCENDING",
}
],
# Hubspot sets a limit of at most 100 per request. Default is 10
"limit": 100,
"properties": list(self.hs_properties)
"limit": 200, # Hubspot sets a limit of at most 200 per request. Default is 10
"properties": list(self.hs_properties),
}
)

return body
return body
7 changes: 3 additions & 4 deletions tap_hubspot/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from singer_sdk import Tap
from singer_sdk import typing as th # JSON schema typing helpers

from tap_hubspot import streams


Expand Down Expand Up @@ -62,16 +61,16 @@ def discover_streams(self) -> list[streams.HubspotStream]:
streams.OwnersStream(self),
streams.TicketPipelineStream(self),
streams.DealPipelineStream(self),
streams.EmailSubscriptionStream(self),
# streams.EmailSubscriptionStream(self),
streams.PropertyNotesStream(self),
streams.CompanyStream(self),
streams.DealStream(self),
streams.FeedbackSubmissionsStream(self),
# streams.FeedbackSubmissionsStream(self),
streams.LineItemStream(self),
streams.ProductStream(self),
streams.TicketStream(self),
streams.QuoteStream(self),
streams.GoalStream(self),
# streams.GoalStream(self),
streams.CallStream(self),
streams.CommunicationStream(self),
streams.EmailStream(self),
Expand Down