22 changes: 20 additions & 2 deletions README.md
@@ -10,8 +10,9 @@ This is a forked version of [tap-salesforce (v1.4.24)](https://github.com/singer
Main differences from the original version:

- Support for `username/password/security_token` authentication
-- Support for concurrent execution (8 threads by default) when accessing different API endpoints to speed up the extraction process
+- Support for concurrent execution (4 threads by default) when accessing different API endpoints to speed up the extraction process
- Support for much faster discovery
- **API consumption optimizations**: Reduced API calls through caching, optimized polling, and rate limiting

# Quickstart

@@ -71,12 +72,29 @@ The `api_type` is used to switch the behavior of the tap between using Salesforc

The `state_message_threshold` is used to throttle how often STATE messages are generated when the tap is using the "REST" API. It balances the slowdown from emitting too many STATE messages against the number of records that must be fetched again if the tap fails unexpectedly. Defaults to 1000 (generate a STATE message every 1000 records).
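
For intuition, here is a minimal sketch of the throttling this option controls (illustrative only: `records`, `stream_name`, and `state` are hypothetical stand-ins, not the tap's actual variable names; `singer.write_record` and `singer.write_state` are the Singer library's message emitters):

```python
import singer

# Minimal sketch, not tap code: emit a STATE checkpoint every `threshold`
# records so a failed run only re-fetches records since the last checkpoint.
def sync_with_throttled_state(records, stream_name, state, threshold=1000):
    for count, record in enumerate(records, start=1):
        singer.write_record(stream_name, record)
        if count % threshold == 0:
            singer.write_state(state)
    singer.write_state(state)  # final checkpoint at end of stream
```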

-The `max_workers` value is used to set the maximum number of threads used in order to concurrently extract data for streams. Defaults to 8 (extract data for 8 streams in paralel).
+The `max_workers` value is used to set the maximum number of threads used in order to concurrently extract data for streams. Defaults to 4 (extract data for 4 streams in parallel) to reduce API consumption. You can increase this if you have higher API limits.

The `streams_to_discover` value may contain a list of Salesforce streams (each ending up in a target table) for which discovery is performed.
By default, discovery runs for all existing streams, which can take several minutes; with just the handful of entities users typically need, it finishes in a few seconds.
The disadvantage is that you have to keep this list in sync with the `select` section, where you specify all properties (each ending up in a table column), as illustrated below.
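
As an illustration (the object names are hypothetical examples, and required fields such as credentials are omitted), the relevant config entries can be written out like so:

```python
import json

# Illustrative config fragment only: object names are hypothetical, and
# required fields such as credentials are omitted for brevity.
config = {
    "max_workers": 4,
    "streams_to_discover": ["Account", "Contact", "Opportunity"],
}
with open("config.json", "w") as f:
    json.dump(config, f, indent=2)
```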

## API Consumption Optimizations

This tap includes several optimizations to reduce Salesforce API consumption:

- **Caching**: Describe calls and quota checks are cached to avoid redundant API requests
  - Describe results are cached for 1 hour
  - Bulk quota checks are cached for 5 minutes
- **Optimized Polling**: Batch status polling uses exponential backoff with increased initial intervals
  - Regular batch polling: 30s initial (was 20s)
  - PK chunked batch polling: 90s initial (was 60s)
  - Polling intervals increase exponentially up to a maximum (see the sketch below)
- **Rate Limiting**: Built-in rate limiting ensures a minimum of 100ms between requests (at most 10 requests/sec)
- **Reduced Concurrency**: Default concurrent workers reduced from 8 to 4 to lower API pressure
- **Reduced Logging**: Request logging moved to debug level to reduce overhead

These optimizations can significantly reduce API consumption, especially during discovery and when syncing multiple streams.
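
As a rough illustration, the polling strategy amounts to something like the following sketch. This is not the tap's actual code: `check_batch_state` is a hypothetical callable standing in for the Bulk API status request, and the 300s cap is an assumed maximum; only the 30s/90s starting intervals come from the list above.

```python
import time

# Minimal sketch of exponential-backoff batch polling (illustrative only).
def poll_batch(check_batch_state, pk_chunked=False, max_interval=300):
    interval = 90 if pk_chunked else 30  # initial intervals from the list above
    while True:
        state = check_batch_state()
        if state in ("Completed", "Failed"):  # simplified terminal states
            return state
        time.sleep(interval)
        interval = min(interval * 2, max_interval)  # back off exponentially
```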

## Run Discovery

To run discovery mode, execute the tap with the config file.
6,258 changes: 6,258 additions & 0 deletions log-tap-api-pull.txt

Large diffs are not rendered by default.

19 changes: 12 additions & 7 deletions tap_salesforce/__init__.py
@@ -482,7 +482,10 @@ async def sync_catalog_entry(sf, catalog_entry, state):
def do_sync(sf, catalog, state):
LOGGER.info("Starting sync")

max_workers = CONFIG.get("max_workers", 8)
# Reduce default concurrent workers from 8 to 4 to reduce API pressure
# Users can still override this via config if needed
max_workers = CONFIG.get("max_workers", 4)
LOGGER.info("Using %d concurrent workers for sync", max_workers)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
loop = asyncio.get_event_loop()
loop.set_default_executor(executor)
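
A self-contained illustration of the pattern above (an assumed example, not tap code): installing a default `ThreadPoolExecutor` lets blocking calls run concurrently through `loop.run_in_executor(None, ...)`. The stream names and the `blocking_extract` helper are hypothetical.

```python
import asyncio
import concurrent.futures
import time

def blocking_extract(stream):
    time.sleep(0.2)  # stand-in for a blocking Salesforce request
    return f"{stream}: done"

async def main():
    loop = asyncio.get_running_loop()
    loop.set_default_executor(concurrent.futures.ThreadPoolExecutor(max_workers=4))
    # Each blocking call runs on the default executor's thread pool.
    tasks = [loop.run_in_executor(None, blocking_extract, s)
             for s in ("Account", "Contact", "Opportunity")]
    print(await asyncio.gather(*tasks))

asyncio.run(main())
```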
@@ -523,6 +526,9 @@ def main_impl():
)
sf.login()

# Note: We track quota usage from response headers instead of calling /limits endpoint
# to avoid wasting API calls on an endpoint that may require special permissions

if args.discover:
do_discover(sf, CONFIG.get("streams_to_discover", []))
elif args.properties or args.catalog:
@@ -531,13 +537,12 @@
do_sync(sf, catalog, state)
finally:
if sf:
-if sf.rest_requests_attempted > 0:
-LOGGER.debug(
-"This job used %s REST requests towards the Salesforce quota.",
-sf.rest_requests_attempted,
-)
+# Log how much quota this specific job consumed
+# (tracked from response headers, no additional API calls needed)
+sf.log_job_quota_usage()

if sf.jobs_completed > 0:
-LOGGER.debug(
+LOGGER.info(
"Replication used %s Bulk API jobs towards the Salesforce quota.",
sf.jobs_completed,
)
208 changes: 198 additions & 10 deletions tap_salesforce/salesforce/__init__.py
@@ -1,5 +1,6 @@
import json
import re
import time
from datetime import timedelta

import backoff
@@ -20,6 +21,9 @@

LOGGER = singer.get_logger()

# Cache describe results for 1 hour to reduce API calls during discovery
DESCRIBE_CACHE_TTL = 3600 # 1 hour in seconds

BULK_API_TYPE = "BULK"
BULK2_API_TYPE = "BULK2"
REST_API_TYPE = "REST"
@@ -249,6 +253,18 @@ def __init__(
self.pk_chunking = False
self.limit_tasks_month = limit_tasks_month
self.pull_config_objects = pull_config_objects
# Cache for describe calls to reduce API consumption
self._describe_cache = {}
self._describe_cache_timestamps = {}
# Rate limiting: track last request time to throttle requests
self._last_request_time = 0
self._min_request_interval = 0.1 # Minimum 100ms between requests (10 req/sec max)
# Track REST quota usage for this job (start and end values)
self._initial_quota_used = None
self._initial_quota_allotted = None
self._final_quota_used = None
self._final_quota_allotted = None
self._max_quota_used_seen = 0

self.auth = SalesforceAuth.from_credentials(credentials, is_sandbox=self.is_sandbox)

@@ -282,18 +298,145 @@ def _parse_objects_config(self, objects_config: str | list[dict], stream: str) -
LOGGER.warning(f"Failed to parse OBJECTS configuration: {e}")
return []

def get_api_quota_info(self):
"""Fetch and return current API quota information from Salesforce.

Note: This endpoint requires special permissions and may return 403 Forbidden.
If it fails, we fall back to header-based quota tracking.
"""
endpoint = "limits"
url = self.data_url.format(self.instance_url, endpoint)

try:
resp = self._make_request("GET", url, headers=self.auth.rest_headers)
quota_data = resp.json()

quota_info = {}

# REST API quota
if "DailyApiRequests" in quota_data:
rest_used = quota_data["DailyApiRequests"]["Max"] - quota_data["DailyApiRequests"]["Remaining"]
rest_max = quota_data["DailyApiRequests"]["Max"]
rest_remaining = quota_data["DailyApiRequests"]["Remaining"]
rest_percent = (rest_used / rest_max * 100) if rest_max > 0 else 0
quota_info["rest"] = {
"used": rest_used,
"max": rest_max,
"remaining": rest_remaining,
"percent_used": rest_percent,
}

# Capture initial quota if not already set (from limits endpoint)
if self._initial_quota_used is None:
self._initial_quota_used = rest_used
self._initial_quota_allotted = rest_max
self._max_quota_used_seen = rest_used

# Bulk API quota
if "DailyBulkApiBatches" in quota_data:
bulk_used = quota_data["DailyBulkApiBatches"]["Max"] - quota_data["DailyBulkApiBatches"]["Remaining"]
bulk_max = quota_data["DailyBulkApiBatches"]["Max"]
bulk_remaining = quota_data["DailyBulkApiBatches"]["Remaining"]
bulk_percent = (bulk_used / bulk_max * 100) if bulk_max > 0 else 0
quota_info["bulk"] = {
"used": bulk_used,
"max": bulk_max,
"remaining": bulk_remaining,
"percent_used": bulk_percent,
}

return quota_info
except requests.exceptions.HTTPError as e:
# 403 Forbidden is common - user doesn't have permission to access /limits endpoint
# This is fine, we'll use header-based tracking instead
if e.response and e.response.status_code == 403:
LOGGER.debug(
"Cannot access /limits endpoint (403 Forbidden) - using header-based quota tracking instead"
)
else:
LOGGER.debug("Failed to fetch API quota information from /limits endpoint: %s", e)
return None
except Exception as e:
LOGGER.debug("Failed to fetch API quota information: %s", e)
return None

def log_api_quota(self, context="Current"):
"""Log current API quota status."""
quota_info = self.get_api_quota_info()

if quota_info is None:
return

if "rest" in quota_info:
rest = quota_info["rest"]
LOGGER.info(
"%s REST API Quota: %d/%d used (%.1f%%), %d remaining",
context,
rest["used"],
rest["max"],
rest["percent_used"],
rest["remaining"],
)

if "bulk" in quota_info:
bulk = quota_info["bulk"]
LOGGER.info(
"%s Bulk API Quota: %d/%d used (%.1f%%), %d remaining",
context,
bulk["used"],
bulk["max"],
bulk["percent_used"],
bulk["remaining"],
)

def log_job_quota_usage(self):
"""Log how much API quota this job consumed from start to end."""
# Always show internal counter (most accurate for this job)
internal_count = self.rest_requests_attempted

# Also show quota header tracking if available
if self._initial_quota_used is not None and self._final_quota_used is not None:
quota_used_by_job = self._final_quota_used - self._initial_quota_used
LOGGER.info(
"This job used %d REST API requests (internal count: %d, system-wide quota change: %d from %d to %d out of %d total)", # noqa: E501
internal_count,
internal_count,
quota_used_by_job,
self._initial_quota_used,
self._final_quota_used,
self._final_quota_allotted,
)
else:
# No quota header tracking available (might be using Bulk API only or header not present)
LOGGER.info(
"This job used %d REST API requests (internal count)",
internal_count,
)

# pylint: disable=anomalous-backslash-in-string,line-too-long
def check_rest_quota_usage(self, headers):
match = re.search(r"^api-usage=(\d+)/(\d+)$", headers.get("Sforce-Limit-Info"))

if match is None:
return

-remaining, allotted = map(int, match.groups())
+used, allotted = map(int, match.groups())

# Track maximum quota used to get the most accurate end value
# (since header reflects system-wide usage, not just this job)
if used > self._max_quota_used_seen:
self._max_quota_used_seen = used

LOGGER.info("Used %s of %s daily REST API quota", remaining, allotted)
# Store initial quota on first check (if not already set)
if self._initial_quota_used is None:
self._initial_quota_used = used
self._initial_quota_allotted = allotted

-percent_used_from_total = (remaining / allotted) * 100
# Always update final quota to track the latest value
self._final_quota_used = self._max_quota_used_seen
self._final_quota_allotted = allotted

+percent_used_from_total = (used / allotted) * 100
max_requests_for_run = int((self.quota_percent_per_run * allotted) / 100)

if percent_used_from_total > self.quota_percent_total:
@@ -302,7 +445,7 @@ def check_rest_quota_usage(self, headers):
+ "used across all Salesforce Applications. Terminating "
+ "replication to not continue past configured percentage "
+ "of {}% total quota."
-).format(remaining, allotted, percent_used_from_total, self.quota_percent_total)
+).format(used, allotted, percent_used_from_total, self.quota_percent_total)
raise TapSalesforceQuotaExceededError(total_message)
elif self.rest_requests_attempted > max_requests_for_run:
partial_message = (
@@ -332,25 +475,63 @@ def instance_url(self):
on_backoff=log_backoff_attempt,
)
def _make_request(self, http_method, url, headers=None, body=None, stream=False, params=None):
# Rate limiting: ensure minimum time between requests
current_time = time.time()
time_since_last_request = current_time - self._last_request_time
if time_since_last_request < self._min_request_interval:
sleep_time = self._min_request_interval - time_since_last_request
time.sleep(sleep_time)

# Use debug level for request logging to reduce log verbosity
if http_method == "GET":
LOGGER.info("Making %s request to %s with params: %s", http_method, url, params)
LOGGER.debug("Making %s request to %s with params: %s", http_method, url, params)
resp = self.session.get(url, headers=headers, stream=stream, params=params)
elif http_method == "POST":
LOGGER.info("Making %s request to %s with body %s", http_method, url, body)
LOGGER.debug("Making %s request to %s", http_method, url)
resp = self.session.post(url, headers=headers, data=body)
else:
raise TapSalesforceExceptionError("Unsupported HTTP method")

self._last_request_time = time.time()

raise_for_status(resp)

# Increment counter for ALL successful API requests
# (not just those with Sforce-Limit-Info header, as many Bulk API calls don't include it)
self.rest_requests_attempted += 1

# Log API call details for debugging (only at debug level to avoid spam)
LOGGER.debug(
"API call #%d: %s %s (has Sforce-Limit-Info: %s)",
self.rest_requests_attempted,
http_method,
url.split("?")[0] if "?" in url else url, # Strip query params for readability
resp.headers.get("Sforce-Limit-Info") is not None,
)

# Check quota usage if header is present (for quota tracking)
if resp.headers.get("Sforce-Limit-Info") is not None:
-self.rest_requests_attempted += 1
self.check_rest_quota_usage(resp.headers)

return resp
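
The throttle at the top of `_make_request` can be exercised in isolation. Below is a self-contained toy version (hypothetical class name, same 100ms minimum interval), not code from the repository:

```python
import time

# Toy stand-in for the throttle in _make_request above: enforce a minimum
# gap between consecutive requests (100 ms => at most ~10 requests/sec).
class MinIntervalThrottle:
    def __init__(self, min_interval=0.1):
        self.min_interval = min_interval
        self.last_request = 0.0

    def wait(self):
        elapsed = time.monotonic() - self.last_request
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_request = time.monotonic()

throttle = MinIntervalThrottle()
for i in range(3):
    throttle.wait()
    print(f"request {i} sent")  # consecutive prints are >= 100 ms apart
```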

def describe(self, sobject=None):
"""Describes all objects or a specific object"""
"""Describes all objects or a specific object with caching to reduce API calls"""
current_time = time.time()
cache_key = str(sobject) if sobject else "all_objects"

# Check cache first
if cache_key in self._describe_cache:
cache_age = current_time - self._describe_cache_timestamps.get(cache_key, 0)
if cache_age < DESCRIBE_CACHE_TTL:
LOGGER.debug("Using cached describe result for %s (age: %.1fs)", cache_key, cache_age)
return self._describe_cache[cache_key]
else:
# Cache expired, remove it
LOGGER.debug("Cache expired for %s (age: %.1fs), fetching fresh data", cache_key, cache_age)
del self._describe_cache[cache_key]
del self._describe_cache_timestamps[cache_key]

headers = self.auth.rest_headers
instance_url = self.auth.instance_url
body = None
@@ -385,9 +566,16 @@ def describe(self, sobject=None):
resp = self._make_request(method, url, headers=headers, body=body)

if isinstance(sobject, list):
return resp.json()["results"]
result = resp.json()["results"]
else:
-return resp.json()
+result = resp.json()

# Cache the result
self._describe_cache[cache_key] = result
self._describe_cache_timestamps[cache_key] = current_time
LOGGER.debug("Cached describe result for %s", cache_key)

return result
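
As a usage note (illustrative only, not test code from the repository; `sf` stands for an already-constructed and authenticated `Salesforce` instance, construction and login omitted):

```python
# Within DESCRIBE_CACHE_TTL seconds, a repeated describe of the same
# object is a cache hit and makes no API request.
first = sf.describe("Account")   # network call; result cached under "Account"
second = sf.describe("Account")  # served from the in-memory cache
assert first is second           # the very same cached object is returned
```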

# pylint: disable=no-self-use
def _get_selected_properties(self, catalog_entry):