Skip to content

Commit 94bd1a7

Browse files
authored
added ray serializer (#72)
1 parent eddc77d commit 94bd1a7

File tree

4 files changed

+119
-1
lines changed

4 files changed

+119
-1
lines changed

btrdb/conn.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
from btrdb.stream import Stream, StreamSet
2727
from btrdb.utils.general import unpack_stream_descriptor
2828
from btrdb.utils.conversion import to_uuid
29-
from btrdb.exceptions import NotFound
29+
from btrdb.exceptions import NotFound, InvalidOperation
3030

3131
##########################################################################
3232
## Module Variables
@@ -346,3 +346,6 @@ def collection_metadata(self, prefix):
346346
pyTags = {tag.key: tag.count for tag in tags}
347347
pyAnn = {ann.key: ann.count for ann in annotations}
348348
return pyTags, pyAnn
349+
350+
def __reduce__(self):
    """
    Refuse to be reduced (pickled).

    NOTE(review): presumably this forces callers to rebuild the connection
    through a registered serializer (see ``btrdb.utils.ray``) instead of
    pickling it directly -- confirm intent with the author.

    Raises
    ------
    InvalidOperation
        Always raised; this object does not support reduction.
    """
    raise InvalidOperation("BTrDB object cannot be reduced.")

btrdb/utils/ray.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from functools import partial
2+
3+
import ray
4+
5+
import btrdb
6+
from btrdb.conn import BTrDB
7+
8+
def register_serializer(conn_str=None, apikey=None, profile=None):
    """
    Register a Ray serializer for BTrDB connection objects.

    The registered serializer transmits no connection state; the registered
    deserializer creates a new connection with ``btrdb.connect`` using the
    parameters supplied here.

    Parameters
    ----------
    conn_str: str, default=None
        The address and port of the cluster to connect to, e.g. `192.168.1.1:4411`.
        If set to None, will look in the environment variable `$BTRDB_ENDPOINTS`
        (recommended).
    apikey: str, default=None
        The API key used to authenticate requests (optional). If None, the key
        is looked up from the environment variable `$BTRDB_API_KEY`.
    profile: str, default=None
        The name of a profile containing the required connection information as
        found in the user's predictive grid credentials file
        `~/.predictivegrid/credentials.yaml`.
    """
    # Bind the connection parameters now so the deserializer can be called
    # later with only the serialized payload.
    deserializer = partial(
        btrdb_deserializer, conn_str=conn_str, apikey=apikey, profile=profile
    )

    # Newer Ray releases expose registration as ``ray.util.register_serializer``;
    # fall back to the legacy top-level function when it is not available.
    register = getattr(getattr(ray, "util", None), "register_serializer", None)
    if register is None:
        register = ray.register_custom_serializer

    register(BTrDB, serializer=btrdb_serializer, deserializer=deserializer)
27+
28+
def btrdb_serializer(_):
    """
    Serialize function for BTrDB objects.

    The argument is ignored and nothing is transmitted.

    Returns
    -------
    None
    """
    return None
33+
34+
def btrdb_deserializer(_, conn_str=None, apikey=None, profile=None):
    """
    Deserialize function for BTrDB objects.

    The first (serialized payload) argument is ignored; a new connection is
    created from the supplied connection parameters.

    Parameters
    ----------
    conn_str: str, default=None
        The address and port of the cluster to connect to, e.g. `192.168.1.1:4411`.
        If set to None, will look in the environment variable `$BTRDB_ENDPOINTS`
        (recommended).
    apikey: str, default=None
        The API key used to authenticate requests (optional). If None, the key
        is looked up from the environment variable `$BTRDB_API_KEY`.
    profile: str, default=None
        The name of a profile containing the required connection information as
        found in the user's predictive grid credentials file
        `~/.predictivegrid/credentials.yaml`.

    Returns
    -------
    db : BTrDB
        An instance of the BTrDB context to directly interact with the database.
    """
    return btrdb.connect(conn_str=conn_str, apikey=apikey, profile=profile)

docs/source/working.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ to interact with the BTrDB database.
1515
working/stream-view-data
1616
working/streamsets
1717
working/multiprocessing
18+
working/ray

docs/source/working/ray.rst

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
Working with Ray
2+
================================
3+
4+
To use BTrDB connection, stream, and streamset objects with the parallelization library ray,
5+
a special serializer is required. BTrDB provides a utility function that registers the serializer with ray.
6+
An example is shown below.
7+
8+
Setting up the ray serializer
9+
-----------------------------
10+
.. code-block:: python
11+
12+
import btrdb
13+
import ray
14+
from btrdb.utils.ray import register_serializer
15+
16+
uuids = ["b19592fc-fb71-4f61-9d49-8646d4b1c2a1",
17+
"07b2cff3-e957-4fa9-b1b3-e14d5afb1e63"]
18+
ray.init()
19+
20+
conn_params = {"profile": "profile_name"}
21+
22+
# register serializer with the connection parameters
23+
register_serializer(**conn_params)
24+
25+
conn = btrdb.connect(**conn_params)
26+
27+
# BTrDB connection object can be passed as an argument
28+
# to a ray remote function
29+
@ray.remote
30+
def test_btrdb(conn):
31+
print(conn.info())
32+
33+
# Stream object can be passed as an argument
34+
# to a ray remote function
35+
@ray.remote
36+
def test_stream(stream):
37+
print(stream.earliest())
38+
39+
# StreamSet object can be passed as an argument
40+
# to a ray remote function
41+
@ray.remote
42+
def test_streamset(streamset):
43+
print(streamset.earliest())
44+
print(streamset)
45+
46+
47+
ids = [test_btrdb.remote(conn),
48+
test_stream.remote(conn.stream_from_uuid(uuids[0])),
49+
test_streamset.remote(conn.streams(*uuids))]
50+
51+
ray.get(ids)
52+
# output of test_btrdb
53+
>>(pid=28479) {'majorVersion': 5, 'build': '5.10.5', 'proxy': {'proxyEndpoints': []}}
54+
# output of test_stream
55+
>>(pid=28482) (RawPoint(1533210100000000000, 0.0), 0)
56+
# output of test_streamset
57+
>>(pid=28481) (RawPoint(1533210100000000000, 0.0), RawPoint(1533210100000000000, 0.0))
58+
>>(pid=28481) StreamSet with 2 streams

0 commit comments

Comments
 (0)