source-salesforce-native: new connector #2519

Open · Alex-Bair wants to merge 4 commits into main from bair/source-salesforce-native

Conversation

@Alex-Bair (Member) commented Mar 12, 2025

Description:

This is an initial version of a native Salesforce capture connector. There are a lot of details I'd like to mention, so this will be a fairly long PR description.

Overall Strategy

Salesforce's REST API and Bulk API 2.0 are used for capturing data. Interaction with each API is abstracted into its own class: RestQueryManager for the REST API and BulkJobManager for the Bulk API 2.0.

The queryAll query option is used to capture soft deletes (i.e. when a record is in Salesforce's Recycle Bin for ~15 days before being hard deleted). Salesforce sometimes nulls out fields when a record is soft deleted, and those nulled fields are reflected in the records we get from a queryAll.

Since the actual pagination of results is abstracted into RestQueryManager/BulkJobManager, it would be messy to restrict checkpointing to page boundaries. Instead, I used the "checkpoint after N results have been yielded" strategy, keeping N close to the max page size for each API.
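
A minimal sketch of the "checkpoint after N results" pattern, with assumed names (CHECKPOINT_INTERVAL, the emit_* callables, and the query manager's execute method are illustrative, not the connector's actual API):

    # Hypothetical sketch: yield documents from a query manager and emit a
    # checkpoint after roughly every CHECKPOINT_INTERVAL documents, rather than
    # only at page boundaries.
    CHECKPOINT_INTERVAL = 2_000  # close to the REST API's maximum page size

    async def capture_object(query_manager, emit_document, emit_checkpoint):
        count = 0
        last_cursor_value = None

        async for record in query_manager.execute():  # pagination handled internally
            emit_document(record)
            last_cursor_value = record.get("SystemModstamp")
            count += 1
            if count >= CHECKPOINT_INTERVAL:
                emit_checkpoint(last_cursor_value)
                count = 0

        if count > 0:
            emit_checkpoint(last_cursor_value)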

Discovery

The complete list of possible bindings the connector could support can be determined by hitting the /sobjects endpoint and checking the list of objects & their abbreviated metadata. However, to determine if the object has a valid field we can use as a cursor & incrementally replicate it, we would have to hit the /sobjects/:sobjectName/describe endpoint for each possible object & check the list of available fields. There can be hundreds to thousands of possible objects, and sending a request for each one in order to determine if it has a valid cursor field quickly churns through the API limit. For reference, the ATF connector's discovery is like this, and it uses 800+ requests during discovery for my hardly customized dev Salesforce account. With auto-discovers happening multiple times per day, we need to use some other way to determine if we can incrementally replicate an object.

To avoid those excessive requests during discovery, I compiled a list of supported standard Salesforce objects in supported_standard_objects.py that lists a cursor field for each object that should have one. This means we'll need to maintain this list manually & add new objects as needed, but that cost seems worth it to avoid burning API requests during each discovery. Also, Salesforce says that custom objects always have the SystemModstamp field, so we should always be able to incrementally replicate those.
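
As an illustration only (the actual shape of supported_standard_objects.py may differ), the mapping could look roughly like this, with a None cursor field marking objects that can only be fully refreshed:

    # Hypothetical sketch of the structure: each standard object maps to the
    # cursor field used for incremental replication, or None if the object has
    # no valid cursor field and must be fully refreshed.
    SUPPORTED_STANDARD_OBJECTS: dict[str, dict[str, str | None]] = {
        "Account": {"cursor_field": "SystemModstamp"},
        "Contact": {"cursor_field": "SystemModstamp"},
        "SomeObjectWithoutCursor": {"cursor_field": None},  # full refresh only
    }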

We will still need to hit the /sobjects/:sobjectName/describe endpoint to fetch the complete list of fields to use when querying the APIs. The connector does this during validate and open, and only for enabled bindings (aligning with the strategy implemented for source-oracle-flashback in #2013).
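
Per the testing notes below, these per-binding describe requests are issued concurrently with a scatter-gather technique bounded by a semaphore. A minimal sketch under assumed names (the http.get_json helper, semaphore limit, and API version are illustrative, not the connector's actual code):

    # Hypothetical sketch: fetch the field list for each enabled binding
    # concurrently, bounded by a semaphore so we don't flood Salesforce.
    import asyncio

    BUILD_RESOURCE_SEMAPHORE_LIMIT = 15  # assumed value

    async def fetch_fields_for_enabled_bindings(http, instance_url, enabled_object_names):
        semaphore = asyncio.Semaphore(BUILD_RESOURCE_SEMAPHORE_LIMIT)

        async def describe(name: str):
            async with semaphore:
                # /sobjects/:sobjectName/describe returns the object's full field list.
                url = f"{instance_url}/services/data/v62.0/sobjects/{name}/describe"
                return name, await http.get_json(url)

        results = await asyncio.gather(*(describe(name) for name in enabled_object_names))
        return {name: body.get("fields", []) for name, body in results}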

Similar to the ATF source-salesforce connector, only a small subset of the available bindings are enabled by default (approx. 17). This keeps the number of enabled bindings reasonable in the garden path where users don't do much configuration.

API Limitations

Bulk API 2.0

Some objects and fields (ex: non-scalar fields) cannot be queried via the Bulk API 2.0. When that is the case, we attempt to query via the REST API instead.

Results fetched via the Bulk API 2.0 are returned in a CSV file, and field values are always read as strings. The connector attempts to use Salesforce's reported soapType for the field to coerce it into the appropriate type before yielding the document. However, we have observed that custom fields don't always adhere to the soapType Salesforce says they should have, so if coercion fails, the field is left as a string & we'll rely on schema inference to do its job. Along the same lines, if the CSV-read value is an empty string, the connector assumes it's actually null in Salesforce & converts it to None.
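
A rough sketch of that coercion logic (the helper name and the handled soapType values are assumptions):

    # Hypothetical sketch: coerce a CSV string value using the field's reported
    # soapType, treating empty strings as null and falling back to the raw
    # string if coercion fails.
    def coerce_csv_value(value: str, soap_type: str):
        if value == "":
            # Bulk API 2.0 CSVs represent null values as empty strings.
            return None
        try:
            if soap_type == "xsd:boolean":
                return value.lower() == "true"
            elif soap_type == "xsd:int":
                return int(value)
            elif soap_type == "xsd:double":
                return float(value)
        except ValueError:
            # Custom fields don't always match their reported soapType; leave the
            # value as a string and let schema inference do its job.
            pass
        return value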

The aiocsv package is leveraged to asynchronously read CSV result files. While this capability may be useful for future connectors, handling all the variations of CSVs (different delimiters, line endings, quotes, etc.) beyond what Salesforce uses would be additional scope I didn't want to tackle right now. If this capability is needed later for different connectors, we could investigate improving the flexibility of _process_csv_lines and moving it into the CDK.
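
For reference, a minimal sketch of reading Salesforce's CSV results with aiocsv (the adapter class and function names are assumptions; the connector's actual _process_csv_lines may differ):

    # Hypothetical sketch: adapt an async generator of byte chunks into the
    # async `read` interface aiocsv expects, then parse rows as dicts.
    from typing import AsyncGenerator

    from aiocsv import AsyncDictReader


    class _AsyncChunkReader:
        def __init__(self, chunks: AsyncGenerator[bytes, None]):
            self.chunks = chunks
            self.buffer = ""

        async def read(self, size: int) -> str:
            while len(self.buffer) < size:
                try:
                    self.buffer += (await anext(self.chunks)).decode("utf-8")
                except StopAsyncIteration:
                    break
            result, self.buffer = self.buffer[:size], self.buffer[size:]
            return result


    async def process_csv_rows(chunks: AsyncGenerator[bytes, None]):
        # Salesforce Bulk API 2.0 results use standard comma-delimited CSV.
        async for row in AsyncDictReader(_AsyncChunkReader(chunks)):
            yield row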

REST API

When querying via the REST API, the fields are explicitly specified in the query params. The alternative is to use the FIELDS(ALL) syntax to avoid needing to explicitly specify fields, but that requires using a page size of 200 with LIMIT 200. By explicitly specifying fields, the maximum page size is 2,000.

As a result of explicitly specifying fields in the URL, we have to ensure the combined length of the URI and headers stays below Salesforce's limit of 16,384 bytes. To handle this, we chunk fields across multiple API requests & merge the results together using a strategy very similar to the one in source-hubspot-native. When there are not many custom fields, we shouldn't need to chunk fields at all. Doing some rough math with the maximum number of custom fields allowed per object, the field chunking strategy should still be more API request-efficient than using the FIELDS(ALL) syntax with its 10x smaller page size.
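
A rough sketch of the chunk-merge idea (hypothetical helper; the connector's actual implementation may differ): results from each field-chunk query are keyed on the record Id, which is requested in every chunk, and merged into complete documents.

    # Hypothetical sketch: merge the results of several field-chunk queries into
    # complete records, keyed on the record's Id.
    def merge_chunked_results(chunk_results: list[list[dict]]) -> list[dict]:
        merged: dict[str, dict] = {}
        for records in chunk_results:
            for record in records:
                merged.setdefault(record["Id"], {}).update(record)
        return list(merged.values())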

Records returned by the REST API are slightly different from those returned by the Bulk API 2.0. Datetimes from the REST API are returned in the format 2024-09-24T03:26:07.000+0000, whereas the Bulk API 2.0 returns them as 2024-09-24T03:26:07.000Z. The connector attempts to convert datetimes to the format returned by the Bulk API, falling back to the raw value if the conversion fails since custom fields aren't always the type Salesforce says they are (a sketch of this conversion appears after the snippet below). The REST API also automatically includes additional metadata for each record:

      "attributes": {
        "type": "SetupAuditTrail",
        "url": "/services/data/v62.0/sobjects/SetupAuditTrail/{record_id}"
      }

This metadata can be derived from the binding name & record Id, so I opted to remove it to stay consistent with what's returned by the Bulk API 2.0.
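
A minimal sketch of the datetime normalization mentioned above (the helper name is an assumption):

    # Hypothetical sketch: normalize REST API datetimes (2024-09-24T03:26:07.000+0000)
    # to the Bulk API 2.0 format (2024-09-24T03:26:07.000Z), leaving the value
    # untouched if parsing fails.
    from datetime import datetime, timezone

    def normalize_rest_datetime(value: str) -> str:
        try:
            dt = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f%z").astimezone(timezone.utc)
            return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond // 1000:03d}Z"
        except ValueError:
            # Custom fields aren't always the type Salesforce reports; keep the raw value.
            return value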

Formula Fields

Salesforce has a concept called formula fields, which are fields calculated based on some other values. They are calculated at query time, and per Salesforce's docs, changes to formula fields do not cause any of the possible cursor fields to update. This makes it possible for us to capture a record and then have its formula fields change without any change to the cursor field, leaving the previously captured record with outdated formula field values.

To address this issue, the connector initiates a backfill on some schedule & yields the most recent formula field values for previously captured records. A top-level merge reduction strategy is used to combine these more recent formula field values with the associated previously yielded record.
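
As a rough illustration (field names are assumptions), each scheduled backfill document carries only the primary key, cursor field, and formula fields, and the top-level merge reduction folds it into the previously captured record:

    # Hypothetical example of a scheduled formula field backfill document.
    formula_refresh_document = {
        "Id": "001XXXXXXXXXXXXXXX",                     # primary key
        "SystemModstamp": "2025-03-14T18:31:00.000Z",   # cursor field
        "Days_Since_Last_Activity__c": 12,              # formula field (assumed name)
    }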

Assumptions

  • All full refresh bindings can be queried via the Bulk API 2.0. Since these objects do not have any valid cursor fields, fetching them via the REST API is tricky - we can't be sure that a record wasn't updated between queries for separate field chunks. We could decide that's ok (we'll eventually snapshot all records again on the next sweep), but I figured we could wait and see if this is an actual issue before adding this capability. There are relatively few Salesforce objects that don't have a valid cursor field, so I feel comfortable deferring this decision.

Workflow steps:

(How does one use this feature, and how has it changed)

Documentation links affected:

Documentation should be created for the connector. In particular, we've done a lot of troubleshooting around the following with the ATF connector & it would be beneficial to explain these behaviors in the docs:

Notes for reviewers:

This PR is still in development. Most of the core connector is complete, but there's still scope that needs to be completed before the PR is ready for review & could be merged. The remaining scope is all around supporting periodically refreshing formula fields on some schedule (i.e. running a connector-initiated backfill, yielding records that only contain formula fields, and using a top-level merge reduction strategy). The remaining scope I anticipate to support this is:

Tested on a local stack with a couple Salesforce accounts. Confirmed:

  • The connector works with both sandbox and production Salesforce accounts.
  • OAuth works for authentication with an OAuth app I created in my dev Salesforce account. Using our prod OAuth app credentials, the connector's OAuth process succeeds up to the expected "invalid redirect uri" error (i.e. http://localhost:3000 isn't a valid redirect URI for our prod app).
  • Documents yielded between the REST and Bulk APIs are identical in terms of field types & datetime precisions.
  • Results from the REST and Bulk APIs are received in ascending order of the cursor field, and the connector checkpoints after approximately every page of results.
  • The connector does not try to query the Bulk API if it knows that querying the object isn't supported by the Bulk API.
  • Custom objects and custom fields are captured fine, like standard objects and fields.
  • Reading from a collection & materializing into a dockerized PostgreSQL instance works.
  • If an object has formula fields and there's a schedule set, the formula fields are backfilled per the schedule & documents containing just the primary key, cursor field, and formula fields are yielded. Materializing into PostgreSQL results in a single row per primary key with the most recently captured formula field values represented in the row.
  • If an object does not have formula fields and there's a schedule set, the fetch_page function returns early before sending any API requests.
  • Cached access tokens are reused until they're almost expired. Effectively, we get a new access token every 45 minutes instead of for every request.
  • Discoveries do not send API requests to fetch objects' fields. Validating and opening bindings use a scatter-gather technique to fetch objects' fields & only do so for enabled bindings.


@williamhbaker (Member) commented:

Only just started looking at this and will give it a high-level go-through later, but one initial thing I'd suggest is paring down the recovery snapshot somehow. Either omit it completely or post-process it so that it is quite a bit less than 45k lines. Maybe prune out all but the bindings that are enabled by default (if that's still a thing)?

@Alex-Bair (Member Author) commented Mar 13, 2025

Yep, we still only enable a handful (~17) of the possible bindings by default. I'll update the PR description to mention that we disable most of the bindings by default, and I'll cut down how many bindings are included in the discover snapshot so it's not as large.

@williamhbaker (Member) left a comment:

A few more comments for consideration.


except BulkJobError as err:
    # If this object can't be queried via the Bulk API, we use the REST API.
    if err.errors and (CANNOT_FETCH_COMPOUND_DATA in err.errors or NOT_SUPPORTED_BY_BULK_API in err.errors):

williamhbaker (Member): Can we cache this somehow? That way every time this is called it doesn't have to run a failed bulk job request.

Alex-Bair (Member Author): Yep. For the objects that plain can't be queried via the Bulk API, I can add another property to the associated dict in SUPPORTED_STANDARD_OBJECTS to indicate that. For everything else, I should be able to figure it out from their fields' soapTypes (ex: if there's a urn:address type field, we can't use the Bulk API). I'll try that out.


async def _process_csv_lines(
    self,
    line_generator: AsyncGenerator[bytes, None],

williamhbaker (Member): Less of a line_generator now and more of a bytes_generator?

return
break
case BulkJobStates.UPLOAD_COMPLETE | BulkJobStates.IN_PROGRESS:
await asyncio.sleep(delay)

williamhbaker (Member): Some kind of logging here might be nice, perhaps after X attempts or something like that.

@Alex-Bair (Member Author) commented Mar 14, 2025

I've added commits that start with "fixup" that address the comments so far. After approval & before merging, I'll rebase all the "fixup" commits into the main "new connector" commit, but they're left as-is for now to make reviewing easier.

@Alex-Bair Alex-Bair force-pushed the bair/source-salesforce-native branch from 247c440 to aa29e08 Compare March 14, 2025 13:37

@Alex-Bair (Member Author) commented:

After testing out those "fixup" changes a bit more & testing with multiple bindings enabled, I found room for improvement. The improvements are mostly around speeding up the resource building process & re-using the cached access token we receive after exchanging the refresh token.

@Alex-Bair Alex-Bair marked this pull request as ready for review March 14, 2025 18:31
@Alex-Bair (Member Author) commented:

I've added support for scheduled formula field backfills. I'll update the PR description to reflect this feature soon, but otherwise it's ready for review.

@Alex-Bair Alex-Bair force-pushed the bair/source-salesforce-native branch from 79860a9 to 3ab120f Compare March 14, 2025 19:27

@williamhbaker (Member) left a comment:

LGTM, with some non-blocking comments

"properties": {
"start_date": {
"default": "1700-01-01T00:00:00Z",
"description": "UTC data and time in the format YYYY-MM-DDTHH:MM:SSZ. Any data generated before this date will not be replicated. If left blank, all data will be replicated.",

williamhbaker (Member): Typo in UTC data

Alex-Bair (Member Author): 🤦 Thanks for catching that, I'll get it fixed before merging.

"type": "string"
},
"schedule": {
"default": "",

williamhbaker (Member): This default being an empty string seems kind of strange. Not sure if this is how it is elsewhere though. Remind me...does an empty string make it never refresh?

Alex-Bair (Member Author), Mar 14, 2025: Yes, an empty string makes it never refresh. I tried using None when I originally introduced the feature, but it was rendered in the UI as something like <object Object>, so I went with an empty string instead since the UI showed it as an empty input field.

chunk: list[str] = [*mandatory_fields]
for field in field_names:
    if (
        len(chunk) > 0 and

williamhbaker (Member): The len(chunk) > 0 seems unnecessary here, since chunk always has at least the two mandatory fields.

Alex-Bair (Member Author): Ah, another good catch. That condition was a relic from before I had the mandatory fields included in each chunk; I'll remove it.

headers, body = await self.http.request_stream(self.log, url, params=params, headers=request_headers)
count = headers.get('Sforce-NumberOfRecords')

if count is None or count == 0:

williamhbaker (Member): Since we've got a count of records here, it would be nice to sanity check that the number of records we read matches up. A cute way to do it would be to decrement count for every yielded record, and then make sure it is 0 at the end.

Alex-Bair (Member Author): Good idea, will do.

For my own selfish future reference, the Sforce-NumberOfRecords header is the number of records in the current set/page, not the total number of records for the bulk job. There's also numberRecordsProcessed included in the response that returns the current job status, and I suspect that represents the total number of results we should expect for the overall bulk job; I'll look into that & consider doing a similar count check.
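
A minimal sketch of the suggested check (hypothetical names): decrement the reported count for every record read from the result set and confirm it reaches zero.

    # Hypothetical sketch: decrement the Sforce-NumberOfRecords value for every
    # record read from the current result set and confirm it ends at zero.
    async def stream_result_set(rows, reported_count: int):
        remaining = reported_count
        async for row in rows:
            remaining -= 1
            yield row

        if remaining != 0:
            raise RuntimeError(
                f"Expected {reported_count} records but read {reported_count - remaining}."
            )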

# To avoid this, we make a noop request to set the token_source's access token before using the scatter-gather
# technique to make multiple requests concurrently. This prevents the first BUILD_RESOURCE_SEMAPHORE_LIMIT
# requests from all exchanging the same access token and encountering that intermittent error.
await _fetch_queryable_objects(log, http, config.credentials.instance_url)

williamhbaker (Member): I remember looking at this a little bit in the CDK HTTP module. We should probably add some synchronization there. Doesn't have to be right now though, since what you have here should work.

@Alex-Bair (Member Author) commented:

Support for surfacing response headers to connectors was reverted by #2538. source-salesforce-native needs to be able to read response header values when paging through Bulk API results, so that functionality will need to be re-implemented before merging this PR.

…ther & improve access token handling

Previously, all resources in enabled_bindings were built in series. Since the fields for each resource must be fetched from Salesforce before they're built, building resources was a very slow process. Using the scatter-gather technique to build multiple resources concurrently significantly speeds up the process.

I also figured out how to store instance_url in the config's credentials. We no longer need to make a duplicate request outside of the standard OAuth method to fetch the instance url.

Also around OAuth, I decided to use SalesforceTokenSource to subclass TokenSource and override the default access token expiration duration of 0 seconds. This lets us reuse our cached access token instead of always requesting a new one when we make an HTTP request. This also gets around intermittent failures when exchanging the same refresh token for an access token multiple times within a small time window.

Salesforce formula fields are calculated at query time - they are not
saved in Salesforce's database, so they don't have any prior state. This
means that any updates to formula fields don't always cause an object's
cursor field to update, and formula fields can be outdated. This is known
to happen when formula fields are calculated based on data external to
their object (ex: other objects, a global variable, specific times, etc.).

To fix this, we leverage the recently added `RecurringFetchPageFn` type
of `fetch_page` & the `ResourceConfigWithSchedule` to backfill formula
fields at some cadence. A top level merge reduction strategy is used for
these collections in order to merge in the updated formula fields. The
default schedule is currently once a day at 23:55 UTC, but that can be
changed as we learn more about how frequently users want these scheduled
formula field backfills to occur & what the typical API limit impact
looks like.
@Alex-Bair Alex-Bair force-pushed the bair/source-salesforce-native branch from 3ab120f to dd64b95 Compare March 18, 2025 20:49