Skip to content

Commit dcb7a3c

Browse files
committed
Initial work to get async functionality working (tested manually)
1 parent 56cdcc1 commit dcb7a3c

File tree

7 files changed

+183
-90
lines changed

7 files changed

+183
-90
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ repos:
55
- id: mypy
66
args: [--strict]
77
additional_dependencies:
8-
[pydantic, pytest, pytest_mock, types-requests, flagsmith-flag-engine, responses, sseclient-py]
8+
[flagsmith-flag-engine, httpx, pydantic, pytest, pytest_mock, responses, sseclient-py, types-requests]
99
- repo: https://github.com/PyCQA/isort
1010
rev: 5.13.2
1111
hooks:

flagsmith/analytics.py

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,28 @@
1-
import json
1+
import threading
2+
import time
23
import typing
3-
from datetime import datetime
44

5-
from requests_futures.sessions import FuturesSession # type: ignore
5+
import httpx
66

77
ANALYTICS_ENDPOINT: typing.Final[str] = "analytics/flags/"
88

99
# Used to control how often we send data(in seconds)
1010
ANALYTICS_TIMER: typing.Final[int] = 10
1111

12-
session = FuturesSession(max_workers=4)
1312

14-
15-
class AnalyticsProcessor:
13+
class AnalyticsProcessor(threading.Thread):
1614
"""
1715
AnalyticsProcessor is used to track how often individual Flags are evaluated within
1816
the Flagsmith SDK. Docs: https://docs.flagsmith.com/advanced-use/flag-analytics.
1917
"""
2018

2119
def __init__(
22-
self, environment_key: str, base_api_url: str, timeout: typing.Optional[int] = 3
20+
self,
21+
*args: typing.Any,
22+
environment_key: str,
23+
base_api_url: str,
24+
client: httpx.Client | None = None,
25+
**kwargs: typing.Any,
2326
):
2427
"""
2528
Initialise the AnalyticsProcessor to handle sending analytics on flag usage to
@@ -30,11 +33,16 @@ def __init__(
3033
:param timeout: used to tell requests to stop waiting for a response after a
3134
given number of seconds
3235
"""
36+
super().__init__(*args, **kwargs)
37+
3338
self.analytics_endpoint = base_api_url + ANALYTICS_ENDPOINT
3439
self.environment_key = environment_key
35-
self._last_flushed = datetime.now()
3640
self.analytics_data: typing.MutableMapping[str, typing.Any] = {}
37-
self.timeout = timeout or 3
41+
42+
self._client = client or httpx.Client()
43+
self._client.headers.update({"X-Environment-Key": self.environment_key})
44+
45+
self._stop_event = threading.Event()
3846

3947
def flush(self) -> None:
4048
"""
@@ -43,20 +51,28 @@ def flush(self) -> None:
4351

4452
if not self.analytics_data:
4553
return
46-
session.post(
54+
55+
self._client.post(
4756
self.analytics_endpoint,
48-
data=json.dumps(self.analytics_data),
49-
timeout=self.timeout,
57+
data=self.analytics_data,
5058
headers={
5159
"X-Environment-Key": self.environment_key,
5260
"Content-Type": "application/json",
5361
},
5462
)
5563

5664
self.analytics_data.clear()
57-
self._last_flushed = datetime.now()
5865

5966
def track_feature(self, feature_name: str) -> None:
6067
self.analytics_data[feature_name] = self.analytics_data.get(feature_name, 0) + 1
61-
if (datetime.now() - self._last_flushed).seconds > ANALYTICS_TIMER:
68+
69+
def run(self) -> None:
70+
while not self._stop_event.is_set():
71+
time.sleep(ANALYTICS_TIMER)
6272
self.flush()
73+
74+
def stop(self) -> None:
75+
self._stop_event.set()
76+
77+
def __del__(self) -> None:
78+
self._stop_event.set()

flagsmith/flagsmith.py

Lines changed: 50 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,14 @@
22
import typing
33
from datetime import timezone
44

5+
import httpx
56
import pydantic
6-
import requests
77
from flag_engine import engine
88
from flag_engine.environments.models import EnvironmentModel
99
from flag_engine.identities.models import IdentityModel
1010
from flag_engine.identities.traits.models import TraitModel
1111
from flag_engine.identities.traits.types import TraitValue
1212
from flag_engine.segments.evaluator import get_identity_segments
13-
from requests.adapters import HTTPAdapter
14-
from urllib3 import Retry
1513

1614
from flagsmith.analytics import AnalyticsProcessor
1715
from flagsmith.exceptions import FlagsmithAPIError, FlagsmithClientError
@@ -37,27 +35,25 @@ class Flagsmith:
3735
3836
>>> from flagsmith import Flagsmith
3937
>>> flagsmith = Flagsmith(environment_key="<your API key>")
40-
>>> environment_flags = flagsmith.get_environment_flags()
38+
>>> environment_flags = await flagsmith.get_environment_flags()
4139
>>> feature_enabled = environment_flags.is_feature_enabled("foo")
4240
>>> identity_flags = flagsmith.get_identity_flags("identifier", {"foo": "bar"})
4341
>>> feature_enabled_for_identity = identity_flags.is_feature_enabled("foo")
4442
"""
4543

46-
def __init__(
44+
def __init__( # noqa: C901
4745
self,
4846
environment_key: typing.Optional[str] = None,
4947
api_url: typing.Optional[str] = None,
5048
realtime_api_url: typing.Optional[str] = None,
51-
custom_headers: typing.Optional[typing.Dict[str, typing.Any]] = None,
52-
request_timeout_seconds: typing.Optional[int] = None,
49+
client: httpx.Client | None = None,
50+
aclient: httpx.AsyncClient | None = None,
5351
enable_local_evaluation: bool = False,
5452
environment_refresh_interval_seconds: typing.Union[int, float] = 60,
55-
retries: typing.Optional[Retry] = None,
5653
enable_analytics: bool = False,
5754
default_flag_handler: typing.Optional[
5855
typing.Callable[[str], DefaultFlag]
5956
] = None,
60-
proxies: typing.Optional[typing.Dict[str, str]] = None,
6157
offline_mode: bool = False,
6258
offline_handler: typing.Optional[BaseOfflineHandler] = None,
6359
enable_realtime_updates: bool = False,
@@ -67,21 +63,15 @@ def __init__(
6763
Required unless offline_mode is True.
6864
:param api_url: Override the URL of the Flagsmith API to communicate with
6965
:param realtime_api_url: Override the URL of the Flagsmith real-time API
70-
:param custom_headers: Additional headers to add to requests made to the
71-
Flagsmith API
72-
:param request_timeout_seconds: Number of seconds to wait for a request to
73-
complete before terminating the request
66+
:param client: a httpx.AsyncClient instance to use for HTTP requests
7467
:param enable_local_evaluation: Enables local evaluation of flags
7568
:param environment_refresh_interval_seconds: If using local evaluation,
7669
specify the interval period between refreshes of local environment data
77-
:param retries: a urllib3.Retry object to use on all http requests to the
78-
Flagsmith API
7970
:param enable_analytics: if enabled, sends additional requests to the Flagsmith
8071
API to power flag analytics charts
8172
:param default_flag_handler: callable which will be used in the case where
8273
flags cannot be retrieved from the API or a non-existent feature is
8374
requested
84-
:param proxies: as per https://requests.readthedocs.io/en/latest/api/#requests.Session.proxies
8575
:param offline_mode: sets the client into offline mode. Relies on offline_handler for
8676
evaluating flags.
8777
:param offline_handler: provide a handler for offline logic. Used to get environment
@@ -120,12 +110,10 @@ def __init__(
120110
if not environment_key:
121111
raise ValueError("environment_key is required.")
122112

123-
self.session = requests.Session()
124-
self.session.headers.update(
125-
**{"X-Environment-Key": environment_key}, **(custom_headers or {})
126-
)
127-
self.session.proxies.update(proxies or {})
128-
retries = retries or Retry(total=3, backoff_factor=0.1)
113+
self._client = client or httpx.Client()
114+
self._aclient = aclient or httpx.AsyncClient()
115+
for c in (self._client, self._aclient):
116+
c.headers.update({"X-Environment-Key": environment_key})
129117

130118
api_url = api_url or DEFAULT_API_URL
131119
self.api_url = api_url if api_url.endswith("/") else f"{api_url}/"
@@ -137,9 +125,6 @@ def __init__(
137125
else f"{realtime_api_url}/"
138126
)
139127

140-
self.request_timeout_seconds = request_timeout_seconds
141-
self.session.mount(self.api_url, HTTPAdapter(max_retries=retries))
142-
143128
self.environment_flags_url = f"{self.api_url}flags/"
144129
self.identities_url = f"{self.api_url}identities/"
145130
self.environment_url = f"{self.api_url}environment-document/"
@@ -155,8 +140,12 @@ def __init__(
155140

156141
if enable_analytics:
157142
self._analytics_processor = AnalyticsProcessor(
158-
environment_key, self.api_url, timeout=self.request_timeout_seconds
143+
daemon=True,
144+
environment_key=environment_key,
145+
base_api_url=self.api_url,
146+
client=self._client,
159147
)
148+
self._analytics_processor.start()
160149

161150
def _initialise_local_evaluation(self) -> None:
162151
# To ensure that the environment is set before allowing subsequent
@@ -199,17 +188,17 @@ def handle_stream_event(self, event: StreamEvent) -> None:
199188
if event.updated_at > environment_updated_at:
200189
self.update_environment()
201190

202-
def get_environment_flags(self) -> Flags:
191+
async def get_environment_flags(self) -> Flags:
203192
"""
204193
Get all the default for flags for the current environment.
205194
206195
:return: Flags object holding all the flags for the current environment.
207196
"""
208197
if (self.offline_mode or self.enable_local_evaluation) and self._environment:
209198
return self._get_environment_flags_from_document()
210-
return self._get_environment_flags_from_api()
199+
return await self._get_environment_flags_from_api()
211200

212-
def get_identity_flags(
201+
async def get_identity_flags(
213202
self,
214203
identifier: str,
215204
traits: typing.Optional[TraitMapping] = None,
@@ -233,7 +222,7 @@ def get_identity_flags(
233222
traits = traits or {}
234223
if (self.offline_mode or self.enable_local_evaluation) and self._environment:
235224
return self._get_identity_flags_from_document(identifier, traits)
236-
return self._get_identity_flags_from_api(
225+
return await self._get_identity_flags_from_api(
237226
identifier,
238227
traits,
239228
transient=transient,
@@ -276,7 +265,13 @@ def update_environment(self) -> None:
276265
}
277266

278267
def _get_environment_from_api(self) -> EnvironmentModel:
279-
environment_data = self._get_json_response(self.environment_url, method="GET")
268+
try:
269+
environment_data = self._client.get(self.environment_url).json()
270+
except httpx.HTTPError as e:
271+
raise FlagsmithAPIError(
272+
"Unable to get valid response from Flagsmith API."
273+
) from e
274+
280275
return EnvironmentModel.model_validate(environment_data)
281276

282277
def _get_environment_flags_from_document(self) -> Flags:
@@ -304,10 +299,10 @@ def _get_identity_flags_from_document(
304299
default_flag_handler=self.default_flag_handler,
305300
)
306301

307-
def _get_environment_flags_from_api(self) -> Flags:
302+
async def _get_environment_flags_from_api(self) -> Flags:
308303
try:
309-
json_response: typing.List[typing.Mapping[str, JsonType]] = (
310-
self._get_json_response(url=self.environment_flags_url, method="GET")
304+
json_response = await self._get_json_response(
305+
url=self.environment_flags_url, method="GET"
311306
)
312307
return Flags.from_api_flags(
313308
api_flags=json_response,
@@ -321,7 +316,7 @@ def _get_environment_flags_from_api(self) -> Flags:
321316
return Flags(default_flag_handler=self.default_flag_handler)
322317
raise
323318

324-
def _get_identity_flags_from_api(
319+
async def _get_identity_flags_from_api(
325320
self,
326321
identifier: str,
327322
traits: TraitMapping,
@@ -334,12 +329,10 @@ def _get_identity_flags_from_api(
334329
transient=transient,
335330
)
336331
try:
337-
json_response: typing.Dict[str, typing.List[typing.Dict[str, JsonType]]] = (
338-
self._get_json_response(
339-
url=self.identities_url,
340-
method="POST",
341-
body=request_body,
342-
)
332+
json_response = await self._get_json_response(
333+
url=self.identities_url,
334+
method="POST",
335+
body=request_body,
343336
)
344337
return Flags.from_api_flags(
345338
api_flags=json_response["flags"],
@@ -353,20 +346,27 @@ def _get_identity_flags_from_api(
353346
return Flags(default_flag_handler=self.default_flag_handler)
354347
raise
355348

356-
def _get_json_response(
349+
async def _get_json_response(
357350
self,
358351
url: str,
359352
method: str,
360353
body: typing.Optional[JsonType] = None,
361354
) -> typing.Any:
362355
try:
363-
request_method = getattr(self.session, method.lower())
364-
response = request_method(
365-
url, json=body, timeout=self.request_timeout_seconds
366-
)
356+
client = self._aclient
357+
request_method = getattr(client, method.lower())
358+
359+
# TODO: can we simplify this? I had to split it out because
360+
# client.get (understandably) does not support the json
361+
# argument
362+
kwargs: dict[str, typing.Any] = {"url": url}
363+
if body is not None:
364+
kwargs["json"] = body
365+
366+
response = await request_method(**kwargs)
367367
response.raise_for_status()
368368
return response.json()
369-
except requests.RequestException as e:
369+
except httpx.HTTPError as e:
370370
raise FlagsmithAPIError(
371371
"Unable to get valid response from Flagsmith API."
372372
) from e
@@ -405,3 +405,6 @@ def __del__(self) -> None:
405405

406406
if hasattr(self, "event_stream_thread"):
407407
self.event_stream_thread.stop()
408+
409+
if self._analytics_processor:
410+
self._analytics_processor.stop()

flagsmith/streaming_manager.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
import typing
44
from typing import Callable, Optional
55

6+
import httpx
67
import pydantic
7-
import requests
88
import sseclient
99

1010
logger = logging.getLogger(__name__)
@@ -32,17 +32,19 @@ def __init__(
3232
def run(self) -> None:
3333
while not self._stop_event.is_set():
3434
try:
35-
with requests.get(
35+
with httpx.stream(
36+
"GET",
3637
self.stream_url,
37-
stream=True,
3838
headers={"Accept": "application/json, text/event-stream"},
3939
timeout=self.request_timeout_seconds,
4040
) as response:
41-
sse_client = sseclient.SSEClient(chunk for chunk in response)
41+
sse_client = sseclient.SSEClient(
42+
chunk for chunk in response.iter_bytes()
43+
)
4244
for event in sse_client.events():
4345
self.on_event(StreamEvent.model_validate_json(event.data))
4446

45-
except (requests.RequestException, pydantic.ValidationError):
47+
except (httpx.HTTPError, pydantic.ValidationError):
4648
logger.exception("Error opening or reading from the event stream")
4749

4850
def stop(self) -> None:

flagsmith/types.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
]
1313
JsonType: TypeAlias = typing.Union[
1414
_JsonScalarType,
15-
typing.Dict[str, "JsonType"],
16-
typing.List["JsonType"],
15+
typing.Mapping[str, "JsonType"],
16+
typing.Sequence["JsonType"],
1717
]
1818

1919

0 commit comments

Comments
 (0)