Skip to content

Commit 247c440

Browse files
committed
fixup: don't attempt to use the bulk api if we know it won't work
1 parent 75d1e49 commit 247c440

File tree

4 files changed

+59
-19
lines changed

4 files changed

+59
-19
lines changed

source-salesforce-native/source_salesforce_native/api.py

+17-18
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ async def _execution_wrapper(
8383

8484
async def backfill_incremental_resources(
8585
http: HTTPSession,
86+
is_supported_by_bulk_api: bool,
8687
bulk_job_manager: BulkJobManager,
8788
rest_query_manager: RestQueryManager,
8889
instance_url: str,
@@ -105,44 +106,42 @@ async def backfill_incremental_resources(
105106

106107
cursor_field = _determine_cursor_field(fields)
107108

108-
try:
109-
gen = bulk_job_manager.execute(
109+
async def _execute(
110+
manager: BulkJobManager | RestQueryManager,
111+
checkpoint_interval: int
112+
) -> AsyncGenerator[SalesforceResource | str]:
113+
gen = manager.execute(
110114
name,
111115
fields,
112116
cursor_field,
113117
start,
114118
end,
115119
)
116120

117-
async for record_or_dt in _execution_wrapper(gen, cursor_field, start, end, BULK_CHECKPOINT_INTERVAL):
121+
async for record_or_dt in _execution_wrapper(gen, cursor_field, start, end, checkpoint_interval):
118122
if isinstance(record_or_dt, datetime):
119123
yield dt_to_str(record_or_dt)
120124
else:
121125
yield SalesforceResource.model_validate(record_or_dt)
122126

127+
try:
128+
gen = _execute(bulk_job_manager, BULK_CHECKPOINT_INTERVAL) if is_supported_by_bulk_api else _execute(rest_query_manager, REST_CHECKPOINT_INTERVAL)
129+
async for doc_or_str in gen:
130+
yield doc_or_str
123131
except BulkJobError as err:
124-
# If this object can't be queried via the Bulk API, we use the REST API.
132+
# If this object can't be queried via the Bulk API, fallback to using the REST API.
125133
if err.errors and (CANNOT_FETCH_COMPOUND_DATA in err.errors or NOT_SUPPORTED_BY_BULK_API in err.errors):
126-
rest_gen = rest_query_manager.execute(
127-
name,
128-
fields,
129-
cursor_field,
130-
start,
131-
end,
132-
)
133-
134-
async for record_or_dt in _execution_wrapper(rest_gen, cursor_field, start, end, REST_CHECKPOINT_INTERVAL):
135-
if isinstance(record_or_dt, datetime):
136-
yield dt_to_str(record_or_dt)
137-
else:
138-
yield SalesforceResource.model_validate(record_or_dt)
139-
134+
log.info(f"{name} cannot be queried via the Bulk API. Attempting to use the REST API instead.", {"errors": err.errors})
135+
async for doc_or_str in _execute(rest_query_manager, REST_CHECKPOINT_INTERVAL):
136+
yield doc_or_str
137+
140138
else:
141139
raise
142140

143141

144142
async def fetch_incremental_resources(
145143
http: HTTPSession,
144+
is_supported_by_bulk_api: bool,
146145
bulk_job_manager: BulkJobManager,
147146
rest_query_manager: RestQueryManager,
148147
instance_url: str,

source-salesforce-native/source_salesforce_native/models.py

+13
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,19 @@ class SoapTypes(StrEnum):
134134
SEARCH_LAYOUT_BUTTON = "urn:SearchLayoutButton"
135135
SEARCH_LAYOUT_FIELD = "urn:SearchLayoutField"
136136

137+
138+
SOAP_TYPES_NOT_SUPPORTED_BY_BULK_API = [
139+
SoapTypes.BASE64,
140+
SoapTypes.JSON,
141+
SoapTypes.ADDRESS,
142+
SoapTypes.LOCATION,
143+
SoapTypes.SEARCH_LAYOUT_BUTTON,
144+
SoapTypes.SEARCH_LAYOUT_BUTTONS_DISPLAYED,
145+
SoapTypes.SEARCH_LAYOUT_FIELD,
146+
SoapTypes.SEARCH_LAYOUT_FIELDS_DISPLAYED,
147+
]
148+
149+
137150
# FieldDetails is used by the connector to convert field types in Bulk API responses.
138151
class FieldDetails(BaseModel, extra="allow"):
139152
soapType: SoapTypes # Type of field

source-salesforce-native/source_salesforce_native/resources.py

+14
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
ResourceState,
2020
AccessTokenResponse,
2121
GlobalDescribeObjectsResponse,
22+
SOAP_TYPES_NOT_SUPPORTED_BY_BULK_API,
2223
FieldDetailsDict,
2324
SObject,
2425
OAUTH2_SPEC,
@@ -139,6 +140,7 @@ def incremental_resource(
139140
log: Logger,
140141
http: HTTPMixin,
141142
config: EndpointConfig,
143+
is_supported_by_bulk_api: bool,
142144
bulk_job_manager: BulkJobManager,
143145
rest_query_manager: RestQueryManager,
144146
instance_url: str,
@@ -167,6 +169,7 @@ def open(
167169
fetch_page=functools.partial(
168170
backfill_incremental_resources,
169171
http,
172+
is_supported_by_bulk_api,
170173
bulk_job_manager,
171174
rest_query_manager,
172175
instance_url,
@@ -177,6 +180,7 @@ def open(
177180
fetch_changes=functools.partial(
178181
fetch_incremental_resources,
179182
http,
183+
is_supported_by_bulk_api,
180184
bulk_job_manager,
181185
rest_query_manager,
182186
instance_url,
@@ -225,14 +229,24 @@ async def _object_to_resource(
225229

226230
cursor_field = details.get('cursor_field', None)
227231
enable = details.get('enabled_by_default', False)
232+
is_supported_by_bulk_api = details.get('is_supported_by_bulk_api', True)
228233
assert isinstance(enable, bool)
234+
assert isinstance(is_supported_by_bulk_api, bool)
229235
fields = await _fetch_object_fields(log, http, instance_url, name) if should_fetch_fields else None
230236

237+
if fields:
238+
field_soap_types = {fields[field].soapType for field in fields}
239+
for soap_type in field_soap_types:
240+
if soap_type in SOAP_TYPES_NOT_SUPPORTED_BY_BULK_API:
241+
is_supported_by_bulk_api = False
242+
243+
231244
if is_custom_object or cursor_field is not None:
232245
return incremental_resource(
233246
log,
234247
http,
235248
config,
249+
is_supported_by_bulk_api,
236250
bulk_job_manager,
237251
rest_query_manager,
238252
instance_url,

source-salesforce-native/source_salesforce_native/supported_standard_objects.py

+15-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515

1616
class ObjectDetails(TypedDict, total=False):
1717
cursor_field: Optional[CursorFields] # If absent, the object is full refresh. If present, the object is incremental.
18-
enabled_by_default: Optional[bool]
18+
enabled_by_default: Optional[bool] # If absent, the object is not enabled by default. If present and True, the object is enabled by default.
19+
is_supported_by_bulk_api: Optional[bool] # If absent, the object is supported by the Bulk API. If present and False, the object is not supported by the Bulk API.
1920

2021

2122
COMMON_CUSTOM_OBJECT_DETAILS: ObjectDetails = {
@@ -47,6 +48,7 @@ class ObjectDetails(TypedDict, total=False):
4748
},
4849
"AcceptedEventRelation": {
4950
"cursor_field": CursorFields.SYSTEM_MODSTAMP,
51+
"is_supported_by_bulk_api": False,
5052
},
5153
"Account": {
5254
"cursor_field": CursorFields.SYSTEM_MODSTAMP,
@@ -419,6 +421,7 @@ class ObjectDetails(TypedDict, total=False):
419421
},
420422
"CaseStatus": {
421423
"cursor_field": CursorFields.SYSTEM_MODSTAMP,
424+
"is_supported_by_bulk_api": False,
422425
},
423426
"CaseTeamMember": {
424427
"cursor_field": CursorFields.SYSTEM_MODSTAMP
@@ -683,6 +686,7 @@ class ObjectDetails(TypedDict, total=False):
683686
},
684687
"ContractStatus": {
685688
"cursor_field": CursorFields.SYSTEM_MODSTAMP,
689+
"is_supported_by_bulk_api": False,
686690
},
687691
"Conversation": {
688692
"cursor_field": CursorFields.LAST_MODIFIED_DATE
@@ -834,6 +838,7 @@ class ObjectDetails(TypedDict, total=False):
834838
},
835839
"DeclinedEventRelation": {
836840
"cursor_field": CursorFields.SYSTEM_MODSTAMP,
841+
"is_supported_by_bulk_api": False,
837842
},
838843
"DeleteEvent": {
839844
"cursor_field": CursorFields.SYSTEM_MODSTAMP
@@ -995,6 +1000,7 @@ class ObjectDetails(TypedDict, total=False):
9951000
},
9961001
"FieldSecurityClassification": {
9971002
"cursor_field": CursorFields.SYSTEM_MODSTAMP,
1003+
"is_supported_by_bulk_api": False,
9981004
},
9991005
"FileSearchActivity": {
10001006
"cursor_field": CursorFields.SYSTEM_MODSTAMP
@@ -1447,6 +1453,7 @@ class ObjectDetails(TypedDict, total=False):
14471453
},
14481454
"OrderStatus": {
14491455
"cursor_field": CursorFields.SYSTEM_MODSTAMP,
1456+
"is_supported_by_bulk_api": False,
14501457
},
14511458
"OrgDeleteRequest": {
14521459
"cursor_field": CursorFields.SYSTEM_MODSTAMP
@@ -1477,6 +1484,7 @@ class ObjectDetails(TypedDict, total=False):
14771484
},
14781485
"PartnerRole": {
14791486
"cursor_field": CursorFields.SYSTEM_MODSTAMP,
1487+
"is_supported_by_bulk_api": False,
14801488
},
14811489
"PartyConsent": {
14821490
"cursor_field": CursorFields.SYSTEM_MODSTAMP
@@ -1855,6 +1863,7 @@ class ObjectDetails(TypedDict, total=False):
18551863
},
18561864
"ServiceAppointmentStatus": {
18571865
"cursor_field": CursorFields.SYSTEM_MODSTAMP,
1866+
"is_supported_by_bulk_api": False,
18581867
},
18591868
"ServiceContract": {
18601869
"cursor_field": CursorFields.SYSTEM_MODSTAMP,
@@ -1955,6 +1964,7 @@ class ObjectDetails(TypedDict, total=False):
19551964
},
19561965
"ShiftStatus": {
19571966
"cursor_field": CursorFields.SYSTEM_MODSTAMP,
1967+
"is_supported_by_bulk_api": False,
19581968
},
19591969
"Shipment": {
19601970
"cursor_field": CursorFields.SYSTEM_MODSTAMP
@@ -2018,6 +2028,7 @@ class ObjectDetails(TypedDict, total=False):
20182028
},
20192029
"SolutionStatus": {
20202030
"cursor_field": CursorFields.SYSTEM_MODSTAMP,
2031+
"is_supported_by_bulk_api": False,
20212032
},
20222033
"Stamp": {
20232034
"cursor_field": CursorFields.SYSTEM_MODSTAMP
@@ -2044,9 +2055,11 @@ class ObjectDetails(TypedDict, total=False):
20442055
},
20452056
"TaskPriority": {
20462057
"cursor_field": CursorFields.SYSTEM_MODSTAMP,
2058+
"is_supported_by_bulk_api": False,
20472059
},
20482060
"TaskStatus": {
20492061
"cursor_field": CursorFields.SYSTEM_MODSTAMP,
2062+
"is_supported_by_bulk_api": False,
20502063
},
20512064
"TenantUsageEntitlement": {
20522065
"cursor_field": CursorFields.SYSTEM_MODSTAMP
@@ -2096,6 +2109,7 @@ class ObjectDetails(TypedDict, total=False):
20962109
},
20972110
"UndecidedEventRelation": {
20982111
"cursor_field": CursorFields.SYSTEM_MODSTAMP,
2112+
"is_supported_by_bulk_api": False,
20992113
},
21002114
"User": {
21012115
"cursor_field": CursorFields.SYSTEM_MODSTAMP,

0 commit comments

Comments
 (0)