Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-19841 request timeout and backoff implementation #40

Open
wants to merge 6 commits into
base: TDL-19835-Adding-New-Streams-And-Discovery-Mode
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion tap_braintree/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
import sys
import json
import braintree
from requests import request
import singer
from singer import utils
from tap_braintree.discover import discover
from tap_braintree.sync import sync as _sync
import datetime

REQUEST_TIME_OUT = 300

REQUIRED_CONFIG_KEYS = [
"merchant_id",
Expand Down Expand Up @@ -40,12 +44,23 @@ def main():

environment = getattr(braintree.Environment, config.pop("environment", "Production"))

try:
# Take value of request_timeout if provided in config else take default value
request_timeout = float(config.get("request_timeout", REQUEST_TIME_OUT))

if request_timeout == 0:
# Raise error when request_timeout is given as 0 in config
raise ValueError()
except ValueError:
raise ValueError('Please provide a value greater than 0 for the request_timeout parameter in config')

gateway = braintree.BraintreeGateway(
braintree.Configuration(
environment,
merchant_id = config['merchant_id'],
public_key= config["public_key"],
private_key=config["private_key"]
private_key=config["private_key"],
timeout = request_timeout
)
)
try:
Expand Down
37 changes: 36 additions & 1 deletion tap_braintree/streams.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import pytz
import singer
import braintree
import backoff
from singer import utils
from datetime import datetime, timedelta
from .transform import transform_row

from braintree.exceptions.too_many_requests_error import TooManyRequestsError
from braintree.exceptions.server_error import ServerError
from braintree.exceptions.service_unavailable_error import ServiceUnavailableError
from braintree.exceptions.gateway_timeout_error import GatewayTimeoutError


TRAILING_DAYS = timedelta(days=30)
DEFAULT_TIMESTAMP = "1970-01-01T00:00:00Z"
LOGGER = singer.get_logger()
Expand Down Expand Up @@ -86,6 +93,14 @@ class SyncWithoutWindow(Stream):
replication_keys = "updated_at"
replication_method = "INCREMENTAL"

# Backoff the request for 5 times when ConnectionError, TooManyRequestsError (status code = 429),
# ServerError(status code = 500) , ServiceUnavailableError (status code = 503) ,
# or GatewayTimeoutError (status code = 504) occurs
@backoff.on_exception(
backoff.expo,
(ConnectionError, TooManyRequestsError, ServerError, ServiceUnavailableError, GatewayTimeoutError),
max_tries=5,
factor=2)
def sync(self, gateway, config, schema, state, selected_streams):
"""
Sync function for incremental stream without window logic
Expand Down Expand Up @@ -133,6 +148,18 @@ class SyncWithWindow(Stream):
replication_keys = "created_at"
replication_method = "INCREMENTAL"

# Backoff the request for 5 times when ConnectionError, TooManyRequestsError (status code = 429),
# ServerError(status code = 500) , ServiceUnavailableError (status code = 503) ,
# or GatewayTimeoutError (status code = 504) occurs
@backoff.on_exception(
backoff.expo,
(ConnectionError, TooManyRequestsError, ServerError, ServiceUnavailableError, GatewayTimeoutError),
max_tries=5,
factor=2)
Comment on lines +154 to +158

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move the API call to some function and apply backoff over it? Currently, tap is doing date_window, so if any API failed, it will retry from the start.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def GetRecords(self, gateway, start, end):
"""Return records between given date window"""
return self.sdk_call(gateway, start, end)

def sync(self, gateway, config, schema, state, selected_streams):
"""
Sync function for incremental stream with window logic
Expand Down Expand Up @@ -163,7 +190,7 @@ def sync(self, gateway, config, schema, state, selected_streams):
for start, end in self.daterange(period_start, period_end):
end = min(end, period_end)

data = self.sdk_call(gateway, start, end)
data = self.GetRecords(gateway, start, end)
time_extracted = utils.now()

row_written_count = 0
Expand Down Expand Up @@ -247,6 +274,14 @@ class FullTableSync(Stream):
sdk_call = None
key_properties = ["id"]

# Backoff the request for 5 times when ConnectionError, TooManyRequestsError (status code = 429),
# ServerError(status code = 500) , ServiceUnavailableError (status code = 503) ,
# or GatewayTimeoutError (status code = 504) occurs
@backoff.on_exception(
backoff.expo,
(ConnectionError, TooManyRequestsError, ServerError, ServiceUnavailableError, GatewayTimeoutError),
max_tries=5,
factor=2)
def sync(self, gateway, config, schema, state, selected_streams):
"""
Sync function for full_table stream
Expand Down
57 changes: 57 additions & 0 deletions tests/unittests/test_backoff.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import unittest
from unittest import mock
from parameterized import parameterized
from braintree.exceptions.too_many_requests_error import TooManyRequestsError
from braintree.exceptions.server_error import ServerError
from braintree.exceptions.service_unavailable_error import ServiceUnavailableError
from braintree.exceptions.gateway_timeout_error import GatewayTimeoutError
from tap_braintree.streams import AddOn, Transaction


class TestBackoff(unittest.TestCase):
'''
Test that backoff logic works properly
'''

gateway = "test"
config = {"start_date":"2022-08-01T00:00:00Z"}
schema = {}
state = {}

@parameterized.expand([
['connection_error', ConnectionError, 5],
['too_many_requests_error', TooManyRequestsError, 5],
['server_error', ServerError, 5],
['service_unavailable_error', ServiceUnavailableError, 5],
['gateway_timeout_error', GatewayTimeoutError, 5]
])
@mock.patch("tap_braintree.streams.AddOn.sdk_call")
@mock.patch("time.sleep")
def test_backoff_for_sync_without_window(self, name, test_exception, expected_count, mocked_time, mocked_sdk_call):
'''Test function to verify working of backoff for sync of incremental streams without window'''

stream_obj = AddOn()
mocked_sdk_call.side_effect = test_exception('exception')

with self.assertRaises(test_exception):
stream_obj.sync(self.gateway, self.config, self.schema, self.state, ["add_ons"])
self.assertEqual(mocked_sdk_call.call_count, expected_count)

@parameterized.expand([
['connection_error', ConnectionError, 5],
['too_many_requests_error', TooManyRequestsError, 5],
['server_error', ServerError, 5],
['service_unavailable_error', ServiceUnavailableError, 5],
['gateway_timeout_error', GatewayTimeoutError, 5]
])
@mock.patch("tap_braintree.streams.Transaction.sdk_call")
@mock.patch("time.sleep")
def test_backoff_for_sync_with_window(self, name, test_exception, expected_count, mocked_time, mocked_sdk_call):
'''Test function to verify working of backoff for sync of incremental streams with window'''

stream_obj = Transaction()
mocked_sdk_call.side_effect = test_exception('exception')

with self.assertRaises(test_exception):
stream_obj.GetRecords(self.gateway, self.config["start_date"], self.config["start_date"])
self.assertEqual(mocked_sdk_call.call_count, expected_count)
13 changes: 0 additions & 13 deletions tests/unittests/test_discover.py

This file was deleted.

36 changes: 0 additions & 36 deletions tests/unittests/test_exception_handling.py

This file was deleted.

43 changes: 0 additions & 43 deletions tests/unittests/test_main.py

This file was deleted.

24 changes: 0 additions & 24 deletions tests/unittests/test_schema.py

This file was deleted.

Loading