Skip to content

Commit ab1d2e2

Browse files
committed
Merge pull request #51 from Kinto/39-retry-for-batch
Retry batch requests (fixes #39)
2 parents 36c2b99 + a50cde4 commit ab1d2e2

File tree

8 files changed

+279
-97
lines changed

8 files changed

+279
-97
lines changed

README.rst

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,12 +211,39 @@ It is possible to do batch requests using a Python context manager (``with``):
211211
.. code-block:: python
212212
213213
with client.batch() as batch:
214-
for idx in range(0,100):
214+
for idx in range(0,100):
215215
batch.update_record(data={'id': idx})
216216
217217
A batch object shares the same methods as another client.
218218

219219

220+
Retry on error
221+
--------------
222+
223+
When the server is throttled (under heavy load or maintenance) it can
224+
return error responses.
225+
226+
The client can hence retry to send the same request until it succeeds.
227+
To enable this, specify the number of retries on the client:
228+
229+
.. code-block:: python
230+
231+
client = Client(server_url='http://localhost:8888/v1',
232+
auth=credentials,
233+
retry=10)
234+
235+
The Kinto protocol lets the server `define the duration in seconds between retries
236+
<http://kinto.readthedocs.org/en/latest/api/1.x/cliquet/backoff.html#retry-after-indicators>`_.
237+
It is possible (but not recommended) to force this value in the clients:
238+
239+
.. code-block:: python
240+
241+
client = Client(server_url='http://localhost:8888/v1',
242+
auth=credentials,
243+
retry=10,
244+
retry_after=5)
245+
246+
220247
Run tests
221248
=========
222249

kinto_client/__init__.py

Lines changed: 12 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
import collections
2-
import requests
32
import uuid
43
from six import iteritems
5-
from six.moves.urllib.parse import urlparse
64

75
from contextlib import contextmanager
86

97

108
from kinto_client import utils
9+
from kinto_client.session import create_session, Session
1110
from kinto_client.batch import Batch
1211
from kinto_client.exceptions import BucketNotFound, KintoException
1312

@@ -27,31 +26,6 @@
2726
DO_NOT_OVERWRITE = {'If-None-Match': '*'}
2827

2928

30-
def create_session(server_url=None, auth=None, session=None):
31-
"""Returns a session from the passed arguments.
32-
33-
:param server_url:
34-
The URL of the server to use, with the prefix.
35-
:param auth:
36-
A requests authentication policy object.
37-
:param session:
38-
An optional session object to use, rather than creating a new one.
39-
"""
40-
# XXX Refactor the create_session to take place in the caller objects.
41-
# E.g. test if the session exists before calling create_session.
42-
if session is not None and (
43-
server_url is not None or auth is not None):
44-
msg = ("You cannot specify session and server_url or auth. "
45-
"Chose either session or (auth + server_url).")
46-
raise AttributeError(msg)
47-
if session is None and server_url is None and auth is None:
48-
msg = ("You need to either set session or auth + server_url")
49-
raise AttributeError(msg)
50-
if session is None:
51-
session = Session(server_url=server_url, auth=auth)
52-
return session
53-
54-
5529
class Endpoints(object):
5630
endpoints = {
5731
'root': '{root}/',
@@ -81,66 +55,26 @@ def get(self, endpoint, **kwargs):
8155
field=','.join(e.args)))
8256

8357

84-
class Session(object):
85-
"""Handles all the interactions with the network.
86-
"""
87-
def __init__(self, server_url, auth=None):
88-
self.server_url = server_url
89-
self.auth = auth
90-
91-
def request(self, method, endpoint, data=None, permissions=None,
92-
payload=None, **kwargs):
93-
parsed = urlparse(endpoint)
94-
if not parsed.scheme:
95-
actual_url = utils.urljoin(self.server_url, endpoint)
96-
else:
97-
actual_url = endpoint
98-
99-
if self.auth is not None:
100-
kwargs.setdefault('auth', self.auth)
101-
102-
payload = payload or {}
103-
# if data is not None:
104-
payload['data'] = data or {}
105-
if permissions is not None:
106-
if hasattr(permissions, 'as_dict'):
107-
permissions = permissions.as_dict()
108-
payload['permissions'] = permissions
109-
if payload:
110-
payload_kwarg = 'data' if 'files' in kwargs else 'json'
111-
kwargs.setdefault(payload_kwarg, payload)
112-
resp = requests.request(method, actual_url, **kwargs)
113-
if not (200 <= resp.status_code < 400):
114-
message = '{0} - {1}'.format(resp.status_code, resp.json())
115-
exception = KintoException(message)
116-
exception.request = resp.request
117-
exception.response = resp
118-
raise exception
119-
120-
if resp.status_code == 304:
121-
body = None
122-
else:
123-
body = resp.json()
124-
# XXX Add the status code.
125-
return body, resp.headers
126-
127-
12858
class Client(object):
12959

13060
def __init__(self, server_url=None, session=None, auth=None,
131-
bucket="default", collection=None):
61+
bucket="default", collection=None, retry=0, retry_after=None):
13262
self.endpoints = Endpoints()
133-
self.session = create_session(server_url, auth, session)
63+
self.session_kwargs = dict(server_url=server_url,
64+
auth=auth,
65+
session=session,
66+
retry=retry,
67+
retry_after=retry_after)
68+
self.session = create_session(**self.session_kwargs)
13469
self._bucket_name = bucket
13570
self._collection_name = collection
13671
self._server_settings = None
13772

13873
def clone(self, **kwargs):
139-
return Client(**{
140-
'session': kwargs.get('session', self.session),
141-
'bucket': kwargs.get('bucket', self._bucket_name),
142-
'collection': kwargs.get('collection', self._collection_name),
143-
})
74+
kwargs.setdefault('session', self.session)
75+
kwargs.setdefault('bucket', self._bucket_name)
76+
kwargs.setdefault('collection', self._collection_name)
77+
return Client(**kwargs)
14478

14579
@contextmanager
14680
def batch(self, **kwargs):

kinto_client/batch.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,10 @@ def send(self):
4646
result = []
4747
requests = self._build_requests()
4848
for chunk in utils.chunks(requests, self.batch_max_requests):
49-
resp, headers = self.session.request(
50-
'POST',
51-
self.endpoints.get('batch'),
52-
payload={'requests': chunk}
53-
)
49+
kwargs = dict(method='POST',
50+
endpoint=self.endpoints.get('batch'),
51+
payload={'requests': chunk})
52+
resp, headers = self.session.request(**kwargs)
5453
for i, response in enumerate(resp['responses']):
5554
status_code = response['status']
5655
if not (200 <= status_code < 400):

kinto_client/session.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import time
2+
3+
import requests
4+
from six.moves.urllib.parse import urlparse
5+
6+
from kinto_client import utils
7+
from kinto_client.exceptions import KintoException
8+
9+
10+
def create_session(server_url=None, auth=None, session=None, retry=0,
11+
retry_after=None):
12+
"""Returns a session from the passed arguments.
13+
14+
:param server_url:
15+
The URL of the server to use, with the prefix.
16+
:param auth:
17+
A requests authentication policy object.
18+
:param session:
19+
An optional session object to use, rather than creating a new one.
20+
"""
21+
# XXX Refactor the create_session to take place in the caller objects.
22+
# E.g. test if the session exists before calling create_session.
23+
if session is not None and (
24+
server_url is not None or auth is not None):
25+
msg = ("You cannot specify session and server_url or auth. "
26+
"Chose either session or (auth + server_url).")
27+
raise AttributeError(msg)
28+
if session is None and server_url is None and auth is None:
29+
msg = ("You need to either set session or auth + server_url")
30+
raise AttributeError(msg)
31+
if session is None:
32+
session = Session(server_url=server_url, auth=auth, retry=retry,
33+
retry_after=retry_after)
34+
return session
35+
36+
37+
class Session(object):
38+
"""Handles all the interactions with the network.
39+
"""
40+
def __init__(self, server_url, auth=None, retry=0, retry_after=None):
41+
self.server_url = server_url
42+
self.auth = auth
43+
self.nb_retry = retry
44+
self.retry_after = retry_after
45+
46+
def request(self, method, endpoint, data=None, permissions=None,
47+
payload=None, **kwargs):
48+
parsed = urlparse(endpoint)
49+
if not parsed.scheme:
50+
actual_url = utils.urljoin(self.server_url, endpoint)
51+
else:
52+
actual_url = endpoint
53+
54+
if self.auth is not None:
55+
kwargs.setdefault('auth', self.auth)
56+
57+
payload = payload or {}
58+
# if data is not None:
59+
payload['data'] = data or {}
60+
if permissions is not None:
61+
if hasattr(permissions, 'as_dict'):
62+
permissions = permissions.as_dict()
63+
payload['permissions'] = permissions
64+
if payload:
65+
payload_kwarg = 'data' if 'files' in kwargs else 'json'
66+
kwargs.setdefault(payload_kwarg, payload)
67+
68+
retry = self.nb_retry
69+
while retry >= 0:
70+
resp = requests.request(method, actual_url, **kwargs)
71+
retry = retry - 1
72+
if not (200 <= resp.status_code < 400):
73+
if resp.status_code >= 500 and retry >= 0:
74+
# Wait and try again.
75+
# If not forced, use retry-after header and wait.
76+
if self.retry_after is None:
77+
retry_after = resp.headers.get("Retry-After", 0)
78+
else:
79+
retry_after = self.retry_after
80+
time.sleep(retry_after)
81+
continue
82+
83+
# Retries exhausted, raise expection.
84+
message = '{0} - {1}'.format(resp.status_code, resp.json())
85+
exception = KintoException(message)
86+
exception.request = resp.request
87+
exception.response = resp
88+
raise exception
89+
90+
if resp.status_code == 304:
91+
body = None
92+
else:
93+
body = resp.json()
94+
# XXX Add the status code.
95+
return body, resp.headers

kinto_client/tests/support.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,35 @@ def get_http_error(status):
4040
exception.response.status_code = status
4141
exception.request = mock.sentinel.request
4242
return exception
43+
44+
45+
def get_200():
46+
response_200 = mock.MagicMock()
47+
response_200.status_code = 200
48+
response_200.json().return_value = mock.sentinel.resp,
49+
response_200.headers = mock.sentinel.headers
50+
return response_200
51+
52+
53+
def get_503():
54+
body_503 = {
55+
"message": "Service temporary unavailable due to overloading",
56+
"code": 503,
57+
"error": "Service Unavailable",
58+
"errno": 201
59+
}
60+
headers_503 = {
61+
"Content-Type": "application/json; charset=UTF-8",
62+
"Content-Length": 151
63+
}
64+
response_503 = mock.MagicMock()
65+
response_503.status_code = 503
66+
response_503.json.return_value = body_503
67+
response_503.headers = headers_503
68+
return response_503
69+
70+
71+
def get_403():
72+
response_403 = mock.MagicMock()
73+
response_403.status_code = 403
74+
return response_403

kinto_client/tests/test_batch.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ def test_send_adds_data_attribute(self):
2525
batch.send()
2626

2727
self.client.session.request.assert_called_with(
28-
'POST',
29-
self.client.endpoints.get('batch'),
28+
method='POST',
29+
endpoint=self.client.endpoints.get('batch'),
3030
payload={'requests': [{
3131
'method': 'GET',
3232
'path': '/foobar/baz',
@@ -41,8 +41,8 @@ def test_send_adds_permissions_attribute(self):
4141
batch.send()
4242

4343
self.client.session.request.assert_called_with(
44-
'POST',
45-
self.client.endpoints.get('batch'),
44+
method='POST',
45+
endpoint=self.client.endpoints.get('batch'),
4646
payload={'requests': [{
4747
'method': 'GET',
4848
'path': '/foobar/baz',
@@ -56,8 +56,8 @@ def test_send_adds_headers_if_specified(self):
5656
batch.send()
5757

5858
self.client.session.request.assert_called_with(
59-
'POST',
60-
self.client.endpoints.get('batch'),
59+
method='POST',
60+
endpoint=self.client.endpoints.get('batch'),
6161
payload={'requests': [{
6262
'method': 'GET',
6363
'path': '/foobar/baz',

kinto_client/tests/test_client.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ def test_context_manager_works_as_expected(self):
2222
batch.create_record(id=5678, data={'bar': 'baz'})
2323

2424
self.session.request.assert_called_with(
25-
'POST',
26-
'/batch',
25+
method='POST',
26+
endpoint='/batch',
2727
payload={'requests': [
2828
{'body': {'data': {'foo': 'bar'}},
2929
'path': '/buckets/mozilla/collections/test/records/1234',
@@ -65,6 +65,16 @@ def test_batch_raises_exception_if_subrequest_failed(self):
6565
batch.create_record(id=1234, data={'foo': 'bar'})
6666
batch.create_record(id=5678, data={'tutu': 'toto'})
6767

68+
def test_batch_options_are_transmitted(self):
69+
settings = {"batch_max_requests": 25}
70+
self.session.request.side_effect = [({"settings": settings}, [])]
71+
with mock.patch('kinto_client.create_session') as create_session:
72+
with self.client.batch(bucket='moz', collection='test', retry=12,
73+
retry_after=20):
74+
_, last_call_kwargs = create_session.call_args_list[-1]
75+
self.assertEqual(last_call_kwargs['retry'], 12)
76+
self.assertEqual(last_call_kwargs['retry_after'], 20)
77+
6878
def test_client_is_represented_properly(self):
6979
client = Client(
7080
server_url="https://kinto.notmyidea.org/v1",

0 commit comments

Comments
 (0)