Skip to content

Commit aa29e08

Browse files
committed
source-salesforce-native: speed up building resources with scatter-gather & improve access token handling
Previously, all resources in enabled_bindings were built in series. Since the fields for each resource must be fetched from Salesforce before they're built, building resources was a very slow process. Using the scatter-gather technique to build multiple resources concurrently significantly speeds up the process. I also figured out how to store instance_url in the config's credentials. We no longer need to make a duplicate request outside of the standard OAuth method to fetch the instance URL. Also around OAuth, I decided to add SalesforceTokenSource, a subclass of TokenSource that overrides the default access token expiration duration of 0 seconds. This lets us reuse our cached access token instead of always requesting a new one when we make an HTTP request. This also gets around intermittent failures when exchanging the same refresh token for an access token multiple times within a small time window.
1 parent bf25faf commit aa29e08

File tree

2 files changed

+75
-44
lines changed

2 files changed

+75
-44
lines changed

source-salesforce-native/source_salesforce_native/models.py

+41-7
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
from estuary_cdk.capture.common import (
1616
ConnectorState as GenericConnectorState,
1717
)
18-
18+
from estuary_cdk.http import (
19+
TokenSource,
20+
)
1921

2022
EARLIEST_VALID_DATE_IN_SALESFORCE = datetime(1700, 1, 1, tzinfo=UTC)
2123

@@ -46,6 +48,7 @@
4648
),
4749
accessTokenResponseMap={
4850
"refresh_token": "/refresh_token",
51+
"instance_url": "/instance_url",
4952
},
5053
)
5154

@@ -54,10 +57,45 @@ def update_oauth_spec(is_sandbox: bool):
5457
OAUTH2_SPEC.accessTokenUrlTemplate = f"https://{'test' if is_sandbox else 'login'}.salesforce.com/services/oauth2/token"
5558

5659

60+
# The access token response does not contain any field indicating if/when the access token we receive expires.
61+
# The default TokenSource.AccessTokenResponse.expires_in is 0, causing every request to fetch a new access token.
62+
# The actual expires_in value depends on the session settings in the user's Salesforce account. The default seems
63+
# to be 2 hours. More importantly, each time the access token is used, its validity is extended. Meaning that as long
64+
# as the access token is actively used, it shouldn't expire. Setting an expires_in of 1 hour should be a
65+
# safe fallback in case a user changes all enabled bindings' intervals to an hour or greater.
66+
class SalesforceTokenSource(TokenSource):
67+
class AccessTokenResponse(TokenSource.AccessTokenResponse):
68+
expires_in: int = 1 * 60 * 60
69+
70+
71+
class SalesforceOAuth2Credentials(BaseOAuth2Credentials):
72+
instance_url: str = Field(
73+
title="Instance URL",
74+
)
75+
76+
@staticmethod
77+
def for_provider(provider: str) -> type["SalesforceOAuth2Credentials"]:
78+
"""
79+
Builds an OAuth2Credentials model for the given OAuth2 `provider`.
80+
This routine is only available in Pydantic V2 environments.
81+
"""
82+
from pydantic import ConfigDict
83+
84+
class _OAuth2Credentials(SalesforceOAuth2Credentials):
85+
model_config = ConfigDict(
86+
json_schema_extra={"x-oauth2-provider": provider},
87+
title="OAuth",
88+
)
89+
90+
def _you_must_build_oauth2_credentials_for_a_provider(self): ...
91+
92+
return _OAuth2Credentials
93+
94+
5795
if TYPE_CHECKING:
58-
OAuth2Credentials = BaseOAuth2Credentials
96+
OAuth2Credentials = SalesforceOAuth2Credentials
5997
else:
60-
OAuth2Credentials = BaseOAuth2Credentials.for_provider(OAUTH2_SPEC.provider)
98+
OAuth2Credentials = SalesforceOAuth2Credentials.for_provider(OAUTH2_SPEC.provider)
6199

62100

63101
def default_start_date():
@@ -97,10 +135,6 @@ class Advanced(BaseModel):
97135
ConnectorState = GenericConnectorState[ResourceState]
98136

99137

100-
class AccessTokenResponse(BaseModel, extra="allow"):
101-
instance_url: str
102-
103-
104138
class PartialSObject(BaseModel, extra="allow"):
105139
name: str
106140
queryable: bool

source-salesforce-native/source_salesforce_native/resources.py

+34-37
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1+
import asyncio
12
from datetime import datetime, timedelta, UTC
23
import functools
34
from logging import Logger
45
from typing import Any
56

67
from estuary_cdk.flow import CaptureBinding
78
from estuary_cdk.capture import common, Task
8-
from estuary_cdk.http import HTTPMixin, TokenSource
9+
from estuary_cdk.http import HTTPMixin
910

1011
from .supported_standard_objects import SUPPORTED_STANDARD_OBJECTS, COMMON_CUSTOM_OBJECT_DETAILS
1112

@@ -17,7 +18,7 @@
1718
EndpointConfig,
1819
ResourceConfig,
1920
ResourceState,
20-
AccessTokenResponse,
21+
SalesforceTokenSource,
2122
GlobalDescribeObjectsResponse,
2223
SOAP_TYPES_NOT_SUPPORTED_BY_BULK_API,
2324
FieldDetailsDict,
@@ -35,22 +36,7 @@
3536

3637

3738
CUSTOM_OBJECT_SUFFIX = '__c'
38-
39-
40-
async def _fetch_instance_url(log: Logger, http: HTTPMixin, config: EndpointConfig) -> str:
41-
url = OAUTH2_SPEC.accessTokenUrlTemplate
42-
body = {
43-
"grant_type": "refresh_token",
44-
"client_id": config.credentials.client_id,
45-
"client_secret": config.credentials.client_secret,
46-
"refresh_token": config.credentials.refresh_token,
47-
}
48-
49-
response = AccessTokenResponse.model_validate_json(
50-
await http.request(log, url, method="POST", form=body, _with_token=False)
51-
)
52-
53-
return response.instance_url
39+
BUILD_RESOURCE_SEMAPHORE_LIMIT = 15
5440

5541

5642
async def _fetch_queryable_objects(log: Logger, http: HTTPMixin, instance_url: str) -> list[str]:
@@ -272,45 +258,56 @@ async def enabled_resources(
272258
log: Logger, http: HTTPMixin, config: EndpointConfig, bindings: list[common._ResolvableBinding]
273259
) -> list[common.Resource]:
274260
update_oauth_spec(config.is_sandbox)
275-
http.token_source = TokenSource(oauth_spec=OAUTH2_SPEC, credentials=config.credentials)
276-
277-
instance_url = await _fetch_instance_url(log, http, config)
261+
http.token_source = SalesforceTokenSource(oauth_spec=OAUTH2_SPEC, credentials=config.credentials)
278262

279263
enabled_binding_names: list[str] = []
280264

281265
for binding in bindings:
282266
path: list[str] = binding.resourceConfig.path()
283267
enabled_binding_names.append(path[0])
284268

285-
bulk_job_manager = BulkJobManager(http, log, instance_url)
286-
rest_query_manager = RestQueryManager(http, log, instance_url)
287-
resources: list[common.Resource] = []
288-
289-
for name in enabled_binding_names:
290-
r = await _object_to_resource(log, http, config, bulk_job_manager, rest_query_manager, instance_url, name, True)
291-
if r:
292-
resources.append(r)
269+
bulk_job_manager = BulkJobManager(http, log, config.credentials.instance_url)
270+
rest_query_manager = RestQueryManager(http, log, config.credentials.instance_url)
271+
272+
# If we concurrently send multiple requests that exchange the same refresh token for an access token,
273+
# some of those requests intermittently fail.
274+
# https://help.salesforce.com/s/articleView?id=xcloud.remoteaccess_oauth_refresh_token_flow.htm&type=5#:~:text=Avoid%20sending%20simultaneous%20requests%20that%20contain%20the%20same%20refresh%20token.%20If%20your%20client%20sends%20identical%20requests%20at%20the%20same%20time%2C%20some%20of%20the%20requests%20fail%20intermittently%20and%20the%20Status%20column%20in%20the%20Login%20History%20displays%20Failed%3A%20Token%20request%20is%20already%20being%20processed.
275+
# To avoid this, we make a noop request to set the token_source's access token before using the scatter-gather
276+
# technique to make multiple requests concurrently. This prevents the first BUILD_RESOURCE_SEMAPHORE_LIMIT
277+
# requests from all exchanging the same refresh token and encountering that intermittent error.
278+
await _fetch_queryable_objects(log, http, config.credentials.instance_url)
279+
280+
semaphore = asyncio.Semaphore(BUILD_RESOURCE_SEMAPHORE_LIMIT)
281+
async def build_resource(name: str) -> common.Resource | None:
282+
async with semaphore:
283+
return await _object_to_resource(
284+
log, http, config, bulk_job_manager, rest_query_manager, config.credentials.instance_url, name, True
285+
)
286+
287+
task_results = await asyncio.gather(
288+
*(
289+
build_resource(name) for name in enabled_binding_names
290+
)
291+
)
293292

294-
return resources
293+
return [resource for resource in task_results if resource is not None]
295294

296295

297296
# all_resources returns resources for all possible supported bindings.
298297
async def all_resources(
299298
log: Logger, http: HTTPMixin, config: EndpointConfig
300299
) -> list[common.Resource]:
301300
update_oauth_spec(config.is_sandbox)
302-
http.token_source = TokenSource(oauth_spec=OAUTH2_SPEC, credentials=config.credentials)
303-
304-
instance_url = await _fetch_instance_url(log, http, config)
301+
http.token_source = SalesforceTokenSource(oauth_spec=OAUTH2_SPEC, credentials=config.credentials)
305302

306-
queryable_object_names = await _fetch_queryable_objects(log, http, instance_url)
303+
queryable_object_names = await _fetch_queryable_objects(log, http, config.credentials.instance_url)
307304

308-
bulk_job_manager = BulkJobManager(http, log, instance_url)
309-
rest_query_manager = RestQueryManager(http, log, instance_url)
305+
bulk_job_manager = BulkJobManager(http, log, config.credentials.instance_url)
306+
rest_query_manager = RestQueryManager(http, log, config.credentials.instance_url)
310307
resources: list[common.Resource] = []
311308

312309
for name in queryable_object_names:
313-
r = await _object_to_resource(log, http, config, bulk_job_manager, rest_query_manager, instance_url, name)
310+
r = await _object_to_resource(log, http, config, bulk_job_manager, rest_query_manager, config.credentials.instance_url, name)
314311
if r:
315312
resources.append(r)
316313

0 commit comments

Comments
 (0)