-
Notifications
You must be signed in to change notification settings - Fork 1
/
hubspot_pipeline.py
146 lines (112 loc) · 4.43 KB
/
hubspot_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from typing import List
import dlt
from dlt_plus.dbt_generator.utils import table_reference_adapter
from hubspot import hubspot, hubspot_events_for_objects, THubspotObjectType
def load_crm_data() -> None:
    """
    Load every resource of the HubSpot CRM source into BigQuery, then declare
    table references for the "deals" and "tickets" tables.

    Returns:
        None
    """
    # Pass dev_mode=(True or False) to dlt.pipeline if you need the pipeline
    # to create the dataset in your destination.
    pipeline = dlt.pipeline(
        pipeline_name="hubspot",
        dataset_name="hubspot_dataset",
        destination="bigquery",
    )

    # Run the HubSpot source and report the outcome of the load.
    load_info = pipeline.run(hubspot())
    print(load_info)

    # Each entry: (table, referenced table, column in the referenced table
    # that corresponds to this table's "id" column).
    reference_specs = (
        ("deals", "contacts__deals", "deals_id"),
        ("tickets", "contacts__tickets", "tickets_id"),
    )
    for table_name, referenced_table, referenced_column in reference_specs:
        table_reference_adapter(
            pipeline,
            table_name,
            references=[
                {
                    "referenced_table": referenced_table,
                    "columns": ["id"],
                    "referenced_columns": [referenced_column],
                }
            ],
        )
def load_crm_data_with_history() -> None:
    """
    Load all HubSpot CRM resources together with the property change history
    of each entity.

    History entries are written to one table per resource, named
    `{resource_name}_property_history` (e.g. `contacts_property_history`).

    Returns:
        None
    """
    # Pass dev_mode=(True or False) to dlt.pipeline if you need the pipeline
    # to create the dataset in your destination.
    pipeline = dlt.pipeline(
        pipeline_name="hubspot",
        dataset_name="hubspot_dataset",
        destination="bigquery",
    )
    # Property history is disabled by default; include_history=True enables it.
    print(pipeline.run(hubspot(include_history=True)))
def load_crm_objects_with_custom_properties() -> None:
    """
    Load HubSpot CRM objects, restricting the `contacts` resource to an
    explicit list of user-chosen properties.

    Returns:
        None
    """
    # Pass dev_mode=(True or False) to dlt.pipeline if you need the pipeline
    # to create the dataset in your destination.
    pipeline = dlt.pipeline(
        pipeline_name="hubspot",
        dataset_name="hubspot_dataset",
        destination="bigquery",
    )
    crm_source = hubspot()

    # By default, all custom properties of a CRM object are extracted, while
    # HubSpot-driven ones (prefixed with `hs_`) are ignored.
    # To read the listed fields in addition to the custom ones:
    #     crm_source.contacts.bind(props=["date_of_birth", "degree"])
    # Here only the two listed fields are read:
    crm_source.contacts.bind(
        props=["date_of_birth", "degree"],
        include_custom_props=False,
    )

    # Run the pipeline and report the outcome of the load.
    print(pipeline.run(crm_source))
def load_web_analytics_events(
    object_type: THubspotObjectType, object_ids: List[str]
) -> None:
    """
    Load web analytics events for a list of objects of a single type.

    Args:
        object_type: HubSpot object type shared by all IDs (e.g. "company").
        object_ids: IDs of the objects whose events should be loaded.

    Returns:
        None
    """
    pipeline = dlt.pipeline(
        pipeline_name="hubspot",
        dataset_name="hubspot_dataset",
        destination="bigquery",
        dev_mode=False,
    )
    # Calling hubspot_events_for_objects for several object types yields
    # several resources; put them all in the list to load them in one run.
    events_resource = hubspot_events_for_objects(object_type, object_ids)
    print(pipeline.run([events_resource]))
if __name__ == "__main__":
    # Run the full HubSpot CRM load. To run one of the other loaders in this
    # script instead (or as well), uncomment the matching line below.
    load_crm_data()
    # load_crm_data_with_history()
    # load_web_analytics_events("company", ["7086461639", "7086464459"])
    # load_crm_objects_with_custom_properties()