Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

source-salesforce-native: new connector #2519

Merged
merged 5 commits into from
Mar 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/python.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ on:
- "source-iterate/**"
- "source-google-analytics-data-api-native/**"
- "source-monday/**"
- "source-salesforce-native/**"

pull_request:
branches: [main]
Expand Down Expand Up @@ -76,6 +77,7 @@ on:
- "source-iterate/**"
- "source-google-analytics-data-api-native/**"
- "source-monday/**"
- "source-salesforce-native/**"

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
Expand Down Expand Up @@ -219,6 +221,10 @@ jobs:
type: capture
version: v1
usage_rate: "1.0"
- name: source-salesforce-native
type: capture
version: v1
usage_rate: "1.0"

steps:
- uses: actions/checkout@v4
Expand Down
1 change: 1 addition & 0 deletions source-salesforce-native/VERSION
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
v1
24 changes: 24 additions & 0 deletions source-salesforce-native/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
credentials:
credentials_title: OAuth Credentials
client_id_sops: ENC[AES256_GCM,data:fMXpKq4TrDG9vbkRFa7EYhOsdk6FtTLLT+jU5hokrHEB2FF8h39CquHiOp0+El0Mg/4zQMTdJ5wDp90WKSPa9yxzaMAi97ExkqfNWPM2FNAK2KmAYg==,iv:8lNVaJITypPnTuUZM4kMXUMdJglGFdkDYGNGhyPsxVU=,tag:yT1ENGiQzGZKM8DCT8gS0g==,type:str]
client_secret_sops: ENC[AES256_GCM,data:jJgwo0E280bY9JApzBDCXuFaEbnnMMhtT0zYrwybJcOtZc+6RXNkxtQSbXvw9DpAAC6jbGEXMMc6yONx4y21ww==,iv:KqFuPgJzjVgDdOdKGRrnvEda/ZUUBvvlBjUcEdiALeg=,tag:M2RoEtnWixP86GOv/uwMIw==,type:str]
refresh_token_sops: ENC[AES256_GCM,data:vqvATUauWsPBTFv1wTO7DljL4zOADONNj7+euRIQkXlB6nQgFIX11uH6tFrhhThAT99x5ipqRSxzzRgbGb/bd++AybniGSIy/Cw6AvPUDvDYAJ8yRX94,iv:4gQ10k3Z3oZ2/pKrP530yWpaha16KznbwXJilLmJGO0=,tag:VvwHjDwFNRkPeBGauLtajw==,type:str]
instance_url: https://estuary2-dev-ed.develop.my.salesforce.com
is_sandbox: false
start_date: "2024-07-26T00:00:00Z"
advanced:
window_size: 1000
sops:
kms: []
gcp_kms:
- resource_id: projects/estuary-theatre/locations/us-central1/keyRings/connector-keyring/cryptoKeys/connector-repository
created_at: "2024-07-26T15:53:37Z"
enc: CiQAdmEdwpmP0/Kcv6L1SnM3nPbsYLywIaUeOL1mz7cZjFUwWt0SSQD5lVw43Of1YI96anxUVo7d7ALi0DYnCNvxTHCgHX5ZeDBPlfXwLNpmYb6y3hJSsHG7NupwDQT/qIpUvlqQnq5IVkgJEFHPU1s=
azure_kv: []
hc_vault: []
age: []
lastmodified: "2025-03-14T18:11:44Z"
mac: ENC[AES256_GCM,data:2q6JA+SFc4w+MsfeuapyfuVZq9n/ATOdIJxEurebzR2TDToOsfqhSR+c2rEM9aIErD9UDL2xc6hYtfu8hCg5YCdsyqAQXYxFAfASa5xWiHodAZ5H4k2jaY9Yf8Yi0UM1IVW2tt04o8qc2yrLut7mnVoQtjPfzT/782LjgfZZt6s=,iv:qoMbdN3i4rO1S/EoqEZ4gTggf89FPKkOh3qNZgjmbfs=,tag:HUKjKKjBojrETyyk5oNiLg==,type:str]
pgp: []
encrypted_suffix: _sops
version: 3.7.3
1,583 changes: 1,583 additions & 0 deletions source-salesforce-native/poetry.lock

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions source-salesforce-native/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[tool.poetry]
name = "source_salesforce_native"
version = "0.1.0"
description = ""
authors = ["Alex Bair <[email protected]>"]

[tool.poetry.dependencies]
estuary-cdk = {path="../estuary-cdk", develop = true}
pydantic = "^2"
python = "^3.11"
aiocsv = "^1.3.2"

[tool.poetry.group.dev.dependencies]
debugpy = "^1.8.0"
mypy = "^1.8.0"
pytest = "^7.4.3"
pytest-insta = "^0.3.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
64 changes: 64 additions & 0 deletions source-salesforce-native/source_salesforce_native/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from logging import Logger
from typing import Callable, Awaitable

from estuary_cdk.flow import (
ConnectorSpec,
)
from estuary_cdk.capture import (
BaseCaptureConnector,
Request,
Task,
common,
request,
response,
)
from estuary_cdk.http import HTTPMixin

from .resources import all_resources, enabled_resources
from .models import (
ConnectorState,
EndpointConfig,
OAUTH2_SPEC,
SalesforceResourceConfigWithSchedule,
)


class Connector(
    BaseCaptureConnector[EndpointConfig, SalesforceResourceConfigWithSchedule, ConnectorState],
    HTTPMixin,
):
    """Salesforce native capture connector built on the Estuary CDK.

    Implements the four capture protocol phases: spec, discover, validate,
    and open. Resource enumeration is delegated to `resources.py`.
    """

    def request_class(self):
        # Concrete Request type so the CDK can deserialize protocol messages
        # with this connector's config/state models.
        return Request[EndpointConfig, SalesforceResourceConfigWithSchedule, ConnectorState]

    async def spec(self, _: request.Spec, logger: Logger) -> ConnectorSpec:
        """Advertise endpoint/resource config schemas and the OAuth2 spec."""
        return ConnectorSpec(
            documentationUrl="https://go.estuary.dev/source-salesforce-native",
            configSchema=EndpointConfig.model_json_schema(),
            oauth2=OAUTH2_SPEC,
            resourceConfigSchema=SalesforceResourceConfigWithSchedule.model_json_schema(),
            resourcePathPointers=SalesforceResourceConfigWithSchedule.PATH_POINTERS,
        )

    async def discover(
        self, log: Logger, discover: request.Discover[EndpointConfig]
    ) -> response.Discovered[SalesforceResourceConfigWithSchedule]:
        """Enumerate every capturable Salesforce object for this account."""
        available = await all_resources(log, self, discover.config)
        return common.discovered(available)

    async def validate(
        self,
        log: Logger,
        validate: request.Validate[EndpointConfig, SalesforceResourceConfigWithSchedule],
    ) -> response.Validated:
        """Check that each requested binding matches a discoverable resource."""
        enabled = await enabled_resources(log, self, validate.config, validate.bindings)
        matched = common.resolve_bindings(validate.bindings, enabled)
        return common.validated(matched)

    async def open(
        self,
        log: Logger,
        open: request.Open[EndpointConfig, SalesforceResourceConfigWithSchedule, ConnectorState],
    ) -> tuple[response.Opened, Callable[[Task], Awaitable[None]]]:
        """Resolve bindings and hand back the capture task to the CDK runtime."""
        enabled = await enabled_resources(log, self, open.capture.config, open.capture.bindings)
        matched = common.resolve_bindings(open.capture.bindings, enabled)
        return common.open(open, matched)
4 changes: 4 additions & 0 deletions source-salesforce-native/source_salesforce_native/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Package entry point: run the Salesforce connector event loop."""
import asyncio

import source_salesforce_native

if __name__ == "__main__":
    # Guard so that merely importing this module (e.g. from tooling or tests)
    # does not start serving the connector as an import side effect.
    asyncio.run(source_salesforce_native.Connector().serve())
197 changes: 197 additions & 0 deletions source-salesforce-native/source_salesforce_native/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
from datetime import datetime, timedelta, UTC
from logging import Logger
from typing import Any, AsyncGenerator

from estuary_cdk.capture.common import LogCursor, PageCursor
from estuary_cdk.http import HTTPSession

from .bulk_job_manager import BulkJobError, BulkJobManager, NOT_SUPPORTED_BY_BULK_API, CANNOT_FETCH_COMPOUND_DATA
from .rest_query_manager import RestQueryManager
from .shared import dt_to_str, str_to_dt, now
from .models import (
FieldDetails,
FieldDetailsDict,
FullRefreshResource,
SalesforceResource,
CursorFields,
)

REST_CHECKPOINT_INTERVAL = 2_000
BULK_CHECKPOINT_INTERVAL = 200_000

def _determine_cursor_field(
    fields: FieldDetailsDict,
) -> CursorFields:
    """Pick the incremental cursor field for an object.

    Candidates are tried in order of preference; the first one present
    among the object's fields wins. Raises RuntimeError when none exist.
    """
    field_names = list(fields.keys())
    # Preference order: SystemModstamp covers the most change types, then
    # progressively weaker fallbacks for objects that lack it.
    for candidate in (
        CursorFields.SYSTEM_MODSTAMP,
        CursorFields.LAST_MODIFIED_DATE,
        CursorFields.CREATED_DATE,
        CursorFields.LOGIN_TIME,
    ):
        if candidate in field_names:
            return candidate

    raise RuntimeError("Attempted to find cursor field but no valid cursor field exists.")


def _filter_to_only_formula_fields(all_fields: FieldDetailsDict, cursor_field: str) -> tuple[bool, FieldDetailsDict]:
    """Reduce a field set to formula (calculated) fields plus Id and the cursor.

    Returns a (has_formula_fields, reduced_fields) pair. The Id and cursor
    fields are always retained so reduced documents can still be keyed and
    cursored; everything else is dropped unless it is a formula field.
    """
    required = ('Id', cursor_field)

    kept = {
        field: details
        for field, details in all_fields.items()
        if field in required or details.calculated
    }
    found_formula = any(details.calculated for details in all_fields.values())

    return (found_formula, FieldDetailsDict.model_validate(kept))


async def snapshot_resources(
    http: HTTPSession,
    bulk_job_manager: BulkJobManager,
    instance_url: str,
    name: str,
    fields: FieldDetailsDict,
    log: Logger,
) -> AsyncGenerator[FullRefreshResource, None]:
    """Yield every record of a full-refresh object via the Bulk API.

    NOTE(review): `http`, `instance_url`, and `log` are unused here —
    presumably kept for signature parity with the incremental fetch
    functions when these are partially bound; confirm against resources.py.
    """
    records = bulk_job_manager.execute(name, fields)
    async for raw in records:
        yield FullRefreshResource.model_validate(raw)


# _execution_wrapper centralizes the cursor management logic for incremental resources.
async def _execution_wrapper(
    record_generator: AsyncGenerator[dict[str, Any], None],
    cursor_field: CursorFields,
    start: datetime,
    end: datetime,
    checkpoint_interval: int,
) -> AsyncGenerator[dict[str, Any] | datetime, None]:
    """Interleave checkpoint datetimes into a stream of records.

    Yields raw record dicts from `record_generator` and, roughly every
    `checkpoint_interval` records, a `datetime` that is safe to persist as
    the cursor. Records are assumed to arrive ordered by `cursor_field`
    (ascending) — a checkpoint is only emitted once the cursor value has
    strictly advanced past the last yielded record's value, so re-emitting
    from that checkpoint cannot skip records sharing the same timestamp.
    Always ends by yielding a final cursor value.
    """
    last_seen_dt = start
    count = 0

    async for record in record_generator:
        record_dt = str_to_dt(record[cursor_field])

        # Emit a checkpoint *before* this record, at the previous record's
        # timestamp, and only when time has strictly advanced — records with
        # identical cursor values are never split across a checkpoint.
        if (
            count >= checkpoint_interval and
            record_dt > last_seen_dt
        ):
            yield last_seen_dt
            count = 0

        yield record
        count += 1
        last_seen_dt = record_dt

    # Final cursor: if any records were seen past `start`, checkpoint at the
    # last record's timestamp; otherwise the window was fully covered with
    # nothing (new) to emit, so advance straight to `end`.
    if last_seen_dt != start and count > 0:
        yield last_seen_dt
    else:
        yield end


async def backfill_incremental_resources(
    http: HTTPSession,
    is_supported_by_bulk_api: bool,
    bulk_job_manager: BulkJobManager,
    rest_query_manager: RestQueryManager,
    instance_url: str,
    name: str,
    fields: FieldDetailsDict,
    window_size: int,
    log: Logger,
    page: PageCursor | None,
    cutoff: LogCursor,
    is_connector_initiated: bool,
) -> AsyncGenerator[SalesforceResource | PageCursor, None]:
    """Backfill one date window of an incremental object, up to `cutoff`.

    `page` is the window start as a datetime string; `window_size` is the
    window length in days. Yields validated records interleaved with string
    page cursors (datetime strings) produced by `_execution_wrapper`.
    Prefers the Bulk API when supported, falling back to the REST API for
    objects the Bulk API rejects.
    """
    assert isinstance(page, str)
    assert isinstance(cutoff, datetime)

    start = str_to_dt(page)

    # Backfill is complete once the window start reaches the cutoff.
    if start >= cutoff:
        return

    end = min(cutoff, start + timedelta(days=window_size))

    cursor_field = _determine_cursor_field(fields)

    # On connector-initiated backfills, only fetch formula fields and rely on the top level
    # merge reduction strategy to merge in partial documents containing updated formula fields.
    if is_connector_initiated:
        has_formula_fields, fields = _filter_to_only_formula_fields(fields, cursor_field)
        # If there are no formula fields in this object, return early.
        if not has_formula_fields:
            return

    async def _execute(
        manager: BulkJobManager | RestQueryManager,
        checkpoint_interval: int
    ) -> AsyncGenerator[SalesforceResource | str, None]:
        # Run one query over [start, end) via the given manager, converting
        # checkpoint datetimes to strings (PageCursor) and raw records to
        # validated SalesforceResource documents.
        gen = manager.execute(
            name,
            fields,
            cursor_field,
            start,
            end,
        )

        async for record_or_dt in _execution_wrapper(gen, cursor_field, start, end, checkpoint_interval):
            if isinstance(record_or_dt, datetime):
                yield dt_to_str(record_or_dt)
            else:
                yield SalesforceResource.model_validate(record_or_dt)

    try:
        # The Bulk API tolerates far larger result sets between checkpoints
        # than the REST API, hence the different checkpoint intervals.
        gen = _execute(bulk_job_manager, BULK_CHECKPOINT_INTERVAL) if is_supported_by_bulk_api else _execute(rest_query_manager, REST_CHECKPOINT_INTERVAL)
        async for doc_or_str in gen:
            yield doc_or_str
    except BulkJobError as err:
        # If this object can't be queried via the Bulk API, fallback to using the REST API.
        # NOTE(review): if the bulk attempt fails after yielding some records,
        # the REST retry re-reads the whole window — duplicates are possible
        # (at-least-once); presumably downstream reduction absorbs them.
        if err.errors and (CANNOT_FETCH_COMPOUND_DATA in err.errors or NOT_SUPPORTED_BY_BULK_API in err.errors):
            log.info(f"{name} cannot be queried via the Bulk API. Attempting to use the REST API instead.", {"errors": err.errors})
            async for doc_or_str in _execute(rest_query_manager, REST_CHECKPOINT_INTERVAL):
                yield doc_or_str

        else:
            raise


async def fetch_incremental_resources(
    http: HTTPSession,
    is_supported_by_bulk_api: bool,
    bulk_job_manager: BulkJobManager,
    rest_query_manager: RestQueryManager,
    instance_url: str,
    name: str,
    fields: FieldDetailsDict,
    window_size: int,
    log: Logger,
    log_cursor: LogCursor,
) -> AsyncGenerator[SalesforceResource | LogCursor, None]:
    """Fetch incremental changes for one object from `log_cursor` forward.

    Queries at most `window_size` days (capped at the current time) via the
    REST API, yielding validated records interleaved with datetime cursors
    emitted by `_execution_wrapper`.
    """
    assert isinstance(log_cursor, datetime)

    cursor_field = _determine_cursor_field(fields)
    window_end = min(now(), log_cursor + timedelta(days=window_size))

    records = rest_query_manager.execute(
        name,
        fields,
        cursor_field,
        log_cursor,
        window_end,
    )

    wrapped = _execution_wrapper(records, cursor_field, log_cursor, window_end, REST_CHECKPOINT_INTERVAL)
    async for item in wrapped:
        if isinstance(item, datetime):
            # Checkpoint datetimes pass through unchanged as the log cursor.
            yield item
            continue
        yield SalesforceResource.model_validate(item)
Loading
Loading