Skip to content

Commit 79860a9

Browse files
committed
source-salesforce-native: backfill formula fields on a schedule
Salesforce formula fields are calculated at query time - they are not saved in Salesforce's database, so they don't have any prior state. This means that any updates to formula fields don't always cause an object's cursor field to update, and formula fields can be outdated. This is known to happen when formula fields are calculated based on data external to its object (ex: other objects, a global variable, specific times, etc.). To fix this, we leverage the recently added `RecurringFetchPageFn` type of `fetch_page` & the `ResourceConfigWithSchedule` to backfill formula fields at some cadence. A top level merge reduction strategy is used for these collections in order to merge in the updated formula fields. The default schedule is currently once a day at 23:55 UTC, but that can be changed as we learn more about how frequently users want these scheduled formula field backfills to occur & what the typical API limit impact looks like.
1 parent 2a57adc commit 79860a9

File tree

8 files changed

+175
-54
lines changed

8 files changed

+175
-54
lines changed

source-salesforce-native/config.yaml

+3-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ credentials:
33
client_id_sops: ENC[AES256_GCM,data:fMXpKq4TrDG9vbkRFa7EYhOsdk6FtTLLT+jU5hokrHEB2FF8h39CquHiOp0+El0Mg/4zQMTdJ5wDp90WKSPa9yxzaMAi97ExkqfNWPM2FNAK2KmAYg==,iv:8lNVaJITypPnTuUZM4kMXUMdJglGFdkDYGNGhyPsxVU=,tag:yT1ENGiQzGZKM8DCT8gS0g==,type:str]
44
client_secret_sops: ENC[AES256_GCM,data:jJgwo0E280bY9JApzBDCXuFaEbnnMMhtT0zYrwybJcOtZc+6RXNkxtQSbXvw9DpAAC6jbGEXMMc6yONx4y21ww==,iv:KqFuPgJzjVgDdOdKGRrnvEda/ZUUBvvlBjUcEdiALeg=,tag:M2RoEtnWixP86GOv/uwMIw==,type:str]
55
refresh_token_sops: ENC[AES256_GCM,data:vqvATUauWsPBTFv1wTO7DljL4zOADONNj7+euRIQkXlB6nQgFIX11uH6tFrhhThAT99x5ipqRSxzzRgbGb/bd++AybniGSIy/Cw6AvPUDvDYAJ8yRX94,iv:4gQ10k3Z3oZ2/pKrP530yWpaha16KznbwXJilLmJGO0=,tag:VvwHjDwFNRkPeBGauLtajw==,type:str]
6+
instance_url: https://estuary2-dev-ed.develop.my.salesforce.com
67
is_sandbox: false
78
start_date: "2024-07-26T00:00:00Z"
89
advanced:
@@ -16,8 +17,8 @@ sops:
1617
azure_kv: []
1718
hc_vault: []
1819
age: []
19-
lastmodified: "2025-03-12T02:46:56Z"
20-
mac: ENC[AES256_GCM,data:0NacPbDKrJcFItn8Pa0sh3pFUEcjubauI3dsLofT4csBqO5+VHg23mCMDcYB5oYGb3SK58RMn1r8Cx1WunChczQdoeWiE41ga2p92u8Vag5enZQomk7cqV/d6/eLb38KbV7YR1MkXg5XW3OorTkcVnqXfDUrIWOTMddM9WDQh8I=,iv:F7lYY8p5HoWUv6qSeKA14kwfA4LVLEksB0RqniMmwEw=,tag:n6/2qysw4FEQHhz60059uw==,type:str]
20+
lastmodified: "2025-03-14T18:11:44Z"
21+
mac: ENC[AES256_GCM,data:2q6JA+SFc4w+MsfeuapyfuVZq9n/ATOdIJxEurebzR2TDToOsfqhSR+c2rEM9aIErD9UDL2xc6hYtfu8hCg5YCdsyqAQXYxFAfASa5xWiHodAZ5H4k2jaY9Yf8Yi0UM1IVW2tt04o8qc2yrLut7mnVoQtjPfzT/782LjgfZZt6s=,iv:qoMbdN3i4rO1S/EoqEZ4gTggf89FPKkOh3qNZgjmbfs=,tag:HUKjKKjBojrETyyk5oNiLg==,type:str]
2122
pgp: []
2223
encrypted_suffix: _sops
2324
version: 3.7.3

source-salesforce-native/source_salesforce_native/__init__.py

+8-8
Original file line numberDiff line numberDiff line change
@@ -19,36 +19,36 @@
1919
ConnectorState,
2020
EndpointConfig,
2121
OAUTH2_SPEC,
22-
ResourceConfig,
22+
SalesforceResourceConfigWithSchedule,
2323
)
2424

2525

2626
class Connector(
27-
BaseCaptureConnector[EndpointConfig, ResourceConfig, ConnectorState],
27+
BaseCaptureConnector[EndpointConfig, SalesforceResourceConfigWithSchedule, ConnectorState],
2828
HTTPMixin,
2929
):
3030
def request_class(self):
31-
return Request[EndpointConfig, ResourceConfig, ConnectorState]
31+
return Request[EndpointConfig, SalesforceResourceConfigWithSchedule, ConnectorState]
3232

3333
async def spec(self, _: request.Spec, logger: Logger) -> ConnectorSpec:
3434
return ConnectorSpec(
3535
configSchema=EndpointConfig.model_json_schema(),
3636
oauth2=OAUTH2_SPEC,
3737
documentationUrl="https://go.estuary.dev/source-salesforce-native",
38-
resourceConfigSchema=ResourceConfig.model_json_schema(),
39-
resourcePathPointers=ResourceConfig.PATH_POINTERS,
38+
resourceConfigSchema=SalesforceResourceConfigWithSchedule.model_json_schema(),
39+
resourcePathPointers=SalesforceResourceConfigWithSchedule.PATH_POINTERS,
4040
)
4141

4242
async def discover(
4343
self, log: Logger, discover: request.Discover[EndpointConfig]
44-
) -> response.Discovered[ResourceConfig]:
44+
) -> response.Discovered[SalesforceResourceConfigWithSchedule]:
4545
resources = await all_resources(log, self, discover.config)
4646
return common.discovered(resources)
4747

4848
async def validate(
4949
self,
5050
log: Logger,
51-
validate: request.Validate[EndpointConfig, ResourceConfig],
51+
validate: request.Validate[EndpointConfig, SalesforceResourceConfigWithSchedule],
5252
) -> response.Validated:
5353
resources = await enabled_resources(log, self, validate.config, validate.bindings)
5454
resolved = common.resolve_bindings(validate.bindings, resources)
@@ -57,7 +57,7 @@ async def validate(
5757
async def open(
5858
self,
5959
log: Logger,
60-
open: request.Open[EndpointConfig, ResourceConfig, ConnectorState],
60+
open: request.Open[EndpointConfig, SalesforceResourceConfigWithSchedule, ConnectorState],
6161
) -> tuple[response.Opened, Callable[[Task], Awaitable[None]]]:
6262
resources = await enabled_resources(log, self, open.capture.config, open.capture.bindings)
6363
resolved = common.resolve_bindings(open.capture.bindings, resources)

source-salesforce-native/source_salesforce_native/api.py

+29-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@
77

88
from .bulk_job_manager import BulkJobError, BulkJobManager, NOT_SUPPORTED_BY_BULK_API, CANNOT_FETCH_COMPOUND_DATA
99
from .rest_query_manager import RestQueryManager
10-
from .shared import dt_to_str, str_to_dt
10+
from .shared import dt_to_str, str_to_dt, now
1111
from .models import (
12+
FieldDetails,
1213
FieldDetailsDict,
1314
FullRefreshResource,
1415
SalesforceResource,
@@ -34,6 +35,21 @@ def _determine_cursor_field(
3435
raise RuntimeError("Attempted to find cursor field but no valid cursor field exists.")
3536

3637

38+
def _filter_to_only_formula_fields(all_fields: FieldDetailsDict, cursor_field: str) -> tuple[bool, FieldDetailsDict]:
39+
mandatory_fields: list[str] = ['Id', cursor_field]
40+
41+
mandatory_and_formula_fields: dict[str, FieldDetails] = {}
42+
has_formula_fields = False
43+
for field, details in all_fields.items():
44+
if field in mandatory_fields or details.calculated:
45+
mandatory_and_formula_fields[field] = details
46+
47+
if details.calculated:
48+
has_formula_fields = True
49+
50+
return (has_formula_fields, FieldDetailsDict.model_validate(mandatory_and_formula_fields))
51+
52+
3753
async def snapshot_resources(
3854
http: HTTPSession,
3955
bulk_job_manager: BulkJobManager,
@@ -93,6 +109,7 @@ async def backfill_incremental_resources(
93109
log: Logger,
94110
page: PageCursor | None,
95111
cutoff: LogCursor,
112+
is_connector_initiated: bool,
96113
) -> AsyncGenerator[SalesforceResource | PageCursor, None]:
97114
assert isinstance(page, str)
98115
assert isinstance(cutoff, datetime)
@@ -106,6 +123,16 @@ async def backfill_incremental_resources(
106123

107124
cursor_field = _determine_cursor_field(fields)
108125

126+
# On connector-initiated backfills, only fetch formula fields and rely on the top level
127+
# merge reduction strategy to merge in partial documents containing updated formula fields.
128+
if is_connector_initiated:
129+
log.info('ALEX - saw connector_initiated argument! only fetching formula fields')
130+
has_formula_fields, fields = _filter_to_only_formula_fields(fields, cursor_field)
131+
# If there are no formula fields in this object, return early.
132+
if not has_formula_fields:
133+
log.info("HEYA - there aren't any formula fields, finishing early")
134+
return
135+
109136
async def _execute(
110137
manager: BulkJobManager | RestQueryManager,
111138
checkpoint_interval: int
@@ -153,7 +180,7 @@ async def fetch_incremental_resources(
153180
) -> AsyncGenerator[SalesforceResource | LogCursor, None]:
154181
assert isinstance(log_cursor, datetime)
155182

156-
end = min(datetime.now(tz=UTC), log_cursor + timedelta(days=window_size))
183+
end = min(now(), log_cursor + timedelta(days=window_size))
157184

158185
cursor_field = _determine_cursor_field(fields)
159186

source-salesforce-native/source_salesforce_native/models.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
BaseDocument,
1010
BaseOAuth2Credentials,
1111
OAuth2Spec,
12-
ResourceConfig,
12+
ResourceConfigWithSchedule,
13+
CRON_REGEX,
1314
ResourceState,
1415
)
1516
from estuary_cdk.capture.common import (
@@ -98,6 +99,15 @@ def _you_must_build_oauth2_credentials_for_a_provider(self): ...
9899
OAuth2Credentials = SalesforceOAuth2Credentials.for_provider(OAUTH2_SPEC.provider)
99100

100101

102+
class SalesforceResourceConfigWithSchedule(ResourceConfigWithSchedule):
103+
schedule: str = Field(
104+
default="",
105+
title="Formula Field Refresh Schedule",
106+
description="Schedule to automatically refresh formula fields. Accepts a cron expression.",
107+
pattern=CRON_REGEX
108+
)
109+
110+
101111
class EndpointConfig(BaseModel):
102112
start_date: AwareDatetime = Field(
103113
description="UTC data and time in the format YYYY-MM-DDTHH:MM:SSZ. Any data generated before this date will not be replicated. If left blank, all data will be replicated.",

source-salesforce-native/source_salesforce_native/resources.py

+13-8
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,18 @@
66

77
from estuary_cdk.flow import CaptureBinding
88
from estuary_cdk.capture import common, Task
9+
from estuary_cdk.capture.common import ReductionStrategy
910
from estuary_cdk.http import HTTPMixin
1011

1112
from .supported_standard_objects import SUPPORTED_STANDARD_OBJECTS, COMMON_CUSTOM_OBJECT_DETAILS
1213

1314
from .bulk_job_manager import BulkJobManager
1415
from .rest_query_manager import RestQueryManager
15-
from .shared import dt_to_str, VERSION
16+
from .shared import dt_to_str, now, VERSION
1617

1718
from .models import (
1819
EndpointConfig,
19-
ResourceConfig,
20+
SalesforceResourceConfigWithSchedule,
2021
ResourceState,
2122
SalesforceTokenSource,
2223
GlobalDescribeObjectsResponse,
@@ -79,7 +80,7 @@ def full_refresh_resource(
7980
) -> common.Resource:
8081

8182
def open(
82-
binding: CaptureBinding[ResourceConfig],
83+
binding: CaptureBinding[SalesforceResourceConfigWithSchedule],
8384
binding_index: int,
8485
state: ResourceState,
8586
task: Task,
@@ -112,7 +113,7 @@ def open(
112113
model=FullRefreshResource,
113114
open=open,
114115
initial_state=ResourceState(),
115-
initial_config=ResourceConfig(
116+
initial_config=SalesforceResourceConfigWithSchedule(
116117
name=name, interval=timedelta(minutes=5)
117118
),
118119
schema_inference=True,
@@ -136,7 +137,7 @@ def incremental_resource(
136137
) -> common.Resource:
137138

138139
def open(
139-
binding: CaptureBinding[ResourceConfig],
140+
binding: CaptureBinding[SalesforceResourceConfigWithSchedule],
140141
binding_index: int,
141142
state: ResourceState,
142143
task: Task,
@@ -176,7 +177,7 @@ def open(
176177
),
177178
)
178179

179-
cutoff = datetime.now(tz=UTC).replace(microsecond=0)
180+
cutoff = now()
180181

181182
resource = common.Resource(
182183
name=name,
@@ -187,11 +188,15 @@ def open(
187188
inc=ResourceState.Incremental(cursor=cutoff),
188189
backfill=ResourceState.Backfill(next_page=dt_to_str(config.start_date), cutoff=cutoff)
189190
),
190-
initial_config=ResourceConfig(
191-
name=name, interval=timedelta(minutes=5)
191+
initial_config=SalesforceResourceConfigWithSchedule(
192+
name=name,
193+
interval=timedelta(minutes=5),
194+
# Default to performing a formula field refresh at 23:55 UTC every day for every enabled binding.
195+
schedule="55 23 * * *"
192196
),
193197
schema_inference=True,
194198
disable=not enable,
199+
reduction_strategy=ReductionStrategy.MERGE,
195200
)
196201

197202
return resource

source-salesforce-native/source_salesforce_native/shared.py

+6
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ def str_to_dt(string: str) -> datetime:
1414
return datetime.fromisoformat(string)
1515

1616

17+
# Salesforce's datetimes have millisecond precision. now helps ensure
18+
# we are always working with millisecond precision datetimes.
19+
def now() -> datetime:
20+
return str_to_dt(dt_to_str(datetime.now(tz=UTC)))
21+
22+
1723
def build_query(
1824
object_name: str,
1925
fields: list[str],

0 commit comments

Comments
 (0)