Skip to content

Commit 7edee2b

Browse files
authored
adds error handling for grpc errors (#86)
* adds error handling for grpc errors * create custom btrdb exceptions, add decorator for error handling, update tests * fixes trailing whitespace * catch and reformat permissions denied error * simplified error handling decorator * minor error handling decorator fixes, standardized and more descriptive exception docstrings * fix grpc imports
1 parent fc174e6 commit 7edee2b

File tree

6 files changed

+416
-140
lines changed

6 files changed

+416
-140
lines changed

btrdb/conn.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
from btrdb.stream import Stream, StreamSet
2727
from btrdb.utils.general import unpack_stream_descriptor
2828
from btrdb.utils.conversion import to_uuid
29-
from btrdb.exceptions import NotFound, InvalidOperation
29+
from btrdb.exceptions import StreamNotFoundError, InvalidOperation
3030

3131
##########################################################################
3232
## Module Variables
@@ -187,7 +187,7 @@ def streams(self, *identifiers, versions=None, is_collection_prefix=False):
187187
if len(found) == 1:
188188
streams.append(found[0])
189189
continue
190-
raise NotFound(f"Could not identify stream `{ident}`")
190+
raise StreamNotFoundError(f"Could not identify stream `{ident}`")
191191

192192
raise ValueError(f"Could not identify stream based on `{ident}`. Identifier must be UUID or collection/name.")
193193

btrdb/endpoint.py

Lines changed: 128 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -28,44 +28,68 @@
2828
from btrdb.grpcinterface import btrdb_pb2
2929
from btrdb.grpcinterface import btrdb_pb2_grpc
3030
from btrdb.point import RawPoint
31-
from btrdb.exceptions import BTrDBError
31+
from btrdb.exceptions import BTrDBError, error_handler, check_proto_stat
3232
from btrdb.utils.general import unpack_stream_descriptor
3333

34+
3435
class Endpoint(object):
3536
def __init__(self, channel):
3637
self.stub = btrdb_pb2_grpc.BTrDBStub(channel)
3738

38-
def rawValues(self, uu, start, end, version = 0):
39-
params = btrdb_pb2.RawValuesParams(uuid = uu.bytes, start = start, end = end, versionMajor = version)
39+
@error_handler
40+
def rawValues(self, uu, start, end, version=0):
41+
params = btrdb_pb2.RawValuesParams(
42+
uuid=uu.bytes, start=start, end=end, versionMajor=version
43+
)
4044
for result in self.stub.RawValues(params):
41-
BTrDBError.checkProtoStat(result.stat)
45+
check_proto_stat(result.stat)
4246
yield result.values, result.versionMajor
4347

44-
def alignedWindows(self, uu, start, end, pointwidth, version = 0):
45-
params = btrdb_pb2.AlignedWindowsParams(uuid = uu.bytes, start = start, end = end, versionMajor = version, pointWidth = int(pointwidth))
48+
@error_handler
49+
def alignedWindows(self, uu, start, end, pointwidth, version=0):
50+
params = btrdb_pb2.AlignedWindowsParams(
51+
uuid=uu.bytes,
52+
start=start,
53+
end=end,
54+
versionMajor=version,
55+
pointWidth=int(pointwidth),
56+
)
4657
for result in self.stub.AlignedWindows(params):
47-
BTrDBError.checkProtoStat(result.stat)
58+
check_proto_stat(result.stat)
4859
yield result.values, result.versionMajor
4960

50-
def windows(self, uu, start, end, width, depth, version = 0):
51-
params = btrdb_pb2.WindowsParams(uuid = uu.bytes, start = start, end = end, versionMajor = version, width = width, depth = depth)
61+
@error_handler
62+
def windows(self, uu, start, end, width, depth, version=0):
63+
params = btrdb_pb2.WindowsParams(
64+
uuid=uu.bytes,
65+
start=start,
66+
end=end,
67+
versionMajor=version,
68+
width=width,
69+
depth=depth,
70+
)
5271
for result in self.stub.Windows(params):
53-
BTrDBError.checkProtoStat(result.stat)
72+
check_proto_stat(result.stat)
5473
yield result.values, result.versionMajor
5574

75+
@error_handler
5676
def streamInfo(self, uu, omitDescriptor, omitVersion):
57-
params = btrdb_pb2.StreamInfoParams(uuid = uu.bytes, omitVersion = omitVersion, omitDescriptor = omitDescriptor)
77+
params = btrdb_pb2.StreamInfoParams(
78+
uuid=uu.bytes, omitVersion=omitVersion, omitDescriptor=omitDescriptor
79+
)
5880
result = self.stub.StreamInfo(params)
5981
desc = result.descriptor
60-
BTrDBError.checkProtoStat(result.stat)
82+
check_proto_stat(result.stat)
6183
tagsanns = unpack_stream_descriptor(desc)
6284
return desc.collection, desc.propertyVersion, tagsanns[0], tagsanns[1], result.versionMajor
6385

86+
@error_handler
6487
def obliterate(self, uu):
65-
params = btrdb_pb2.ObliterateParams(uuid = uu.bytes)
88+
params = btrdb_pb2.ObliterateParams(uuid=uu.bytes)
6689
result = self.stub.Obliterate(params)
67-
BTrDBError.checkProtoStat(result.stat)
90+
check_proto_stat(result.stat)
6891

92+
@error_handler
6993
def setStreamAnnotations(self, uu, expected, changes, removals):
7094
annkvlist = []
7195
for k, v in changes.items():
@@ -74,13 +98,19 @@ def setStreamAnnotations(self, uu, expected, changes, removals):
7498
else:
7599
if isinstance(v, str):
76100
v = v.encode("utf-8")
77-
ov = btrdb_pb2.OptValue(value = v)
78-
kv = btrdb_pb2.KeyOptValue(key = k, val = ov)
101+
ov = btrdb_pb2.OptValue(value=v)
102+
kv = btrdb_pb2.KeyOptValue(key=k, val=ov)
79103
annkvlist.append(kv)
80-
params = btrdb_pb2.SetStreamAnnotationsParams(uuid=uu.bytes, expectedPropertyVersion=expected, changes=annkvlist, removals=removals)
104+
params = btrdb_pb2.SetStreamAnnotationsParams(
105+
uuid=uu.bytes,
106+
expectedPropertyVersion=expected,
107+
changes=annkvlist,
108+
removals=removals,
109+
)
81110
result = self.stub.SetStreamAnnotations(params)
82-
BTrDBError.checkProtoStat(result.stat)
111+
check_proto_stat(result.stat)
83112

113+
@error_handler
84114
def setStreamTags(self, uu, expected, tags, collection):
85115
tag_data = []
86116
for k, v in tags.items():
@@ -89,13 +119,19 @@ def setStreamTags(self, uu, expected, tags, collection):
89119
else:
90120
if isinstance(v, str):
91121
v = v.encode("utf-8")
92-
ov = btrdb_pb2.OptValue(value = v)
93-
kv = btrdb_pb2.KeyOptValue(key = k, val = ov)
122+
ov = btrdb_pb2.OptValue(value=v)
123+
kv = btrdb_pb2.KeyOptValue(key=k, val=ov)
94124
tag_data.append(kv)
95-
params = btrdb_pb2.SetStreamTagsParams(uuid=uu.bytes, expectedPropertyVersion=expected, tags=tag_data, collection=collection)
125+
params = btrdb_pb2.SetStreamTagsParams(
126+
uuid=uu.bytes,
127+
expectedPropertyVersion=expected,
128+
tags=tag_data,
129+
collection=collection,
130+
)
96131
result = self.stub.SetStreamTags(params)
97-
BTrDBError.checkProtoStat(result.stat)
132+
check_proto_stat(result.stat)
98133

134+
@error_handler
99135
def create(self, uu, collection, tags, annotations):
100136
tagkvlist = []
101137
for k, v in tags.items():
@@ -105,10 +141,13 @@ def create(self, uu, collection, tags, annotations):
105141
for k, v in annotations.items():
106142
kv = btrdb_pb2.KeyOptValue(key = k, val = btrdb_pb2.OptValue(value=v))
107143
annkvlist.append(kv)
108-
params = btrdb_pb2.CreateParams(uuid = uu.bytes, collection = collection, tags = tagkvlist, annotations = annkvlist)
144+
params = btrdb_pb2.CreateParams(
145+
uuid=uu.bytes, collection=collection, tags=tagkvlist, annotations=annkvlist
146+
)
109147
result = self.stub.Create(params)
110-
BTrDBError.checkProtoStat(result.stat)
148+
check_proto_stat(result.stat)
111149

150+
@error_handler
112151
def listCollections(self, prefix):
113152
"""
114153
Returns a generator for windows of collection paths matching search
@@ -119,9 +158,10 @@ def listCollections(self, prefix):
119158
"""
120159
params = btrdb_pb2.ListCollectionsParams(prefix=prefix)
121160
for msg in self.stub.ListCollections(params):
122-
BTrDBError.checkProtoStat(msg.stat)
161+
check_proto_stat(msg.stat)
123162
yield msg.collections
124163

164+
@error_handler
125165
def lookupStreams(self, collection, isCollectionPrefix, tags, annotations):
126166
tagkvlist = []
127167
for k, v in tags.items():
@@ -130,8 +170,8 @@ def lookupStreams(self, collection, isCollectionPrefix, tags, annotations):
130170
else:
131171
if isinstance(v, str):
132172
v = v.encode("utf-8")
133-
ov = btrdb_pb2.OptValue(value = v)
134-
kv = btrdb_pb2.KeyOptValue(key = k, val = ov)
173+
ov = btrdb_pb2.OptValue(value=v)
174+
kv = btrdb_pb2.KeyOptValue(key=k, val=ov)
135175
tagkvlist.append(kv)
136176
annkvlist = []
137177
for k, v in annotations.items():
@@ -140,87 +180,113 @@ def lookupStreams(self, collection, isCollectionPrefix, tags, annotations):
140180
else:
141181
if isinstance(v, str):
142182
v = v.encode("utf-8")
143-
ov = btrdb_pb2.OptValue(value = v)
144-
kv = btrdb_pb2.KeyOptValue(key = k, val = ov)
183+
ov = btrdb_pb2.OptValue(value=v)
184+
kv = btrdb_pb2.KeyOptValue(key=k, val=ov)
145185
annkvlist.append(kv)
146-
params = btrdb_pb2.LookupStreamsParams(collection = collection, isCollectionPrefix = isCollectionPrefix, tags = tagkvlist, annotations = annkvlist)
186+
params = btrdb_pb2.LookupStreamsParams(
187+
collection=collection,
188+
isCollectionPrefix=isCollectionPrefix,
189+
tags=tagkvlist,
190+
annotations=annkvlist,
191+
)
147192
for result in self.stub.LookupStreams(params):
148-
BTrDBError.checkProtoStat(result.stat)
193+
check_proto_stat(result.stat)
149194
yield result.results
150195

196+
@error_handler
151197
def nearest(self, uu, time, version, backward):
152-
params = btrdb_pb2.NearestParams(uuid = uu.bytes, time = time, versionMajor = version, backward = backward)
198+
params = btrdb_pb2.NearestParams(
199+
uuid=uu.bytes, time=time, versionMajor=version, backward=backward
200+
)
153201
result = self.stub.Nearest(params)
154-
BTrDBError.checkProtoStat(result.stat)
202+
check_proto_stat(result.stat)
155203
return result.value, result.versionMajor
156-
204+
205+
@error_handler
157206
def changes(self, uu, fromVersion, toVersion, resolution):
158-
params = btrdb_pb2.ChangesParams(uuid = uu.bytes, fromMajor = fromVersion, toMajor = toVersion, resolution = resolution)
207+
params = btrdb_pb2.ChangesParams(
208+
uuid=uu.bytes,
209+
fromMajor=fromVersion,
210+
toMajor=toVersion,
211+
resolution=resolution,
212+
)
159213
for result in self.stub.Changes(params):
160-
BTrDBError.checkProtoStat(result.stat)
214+
check_proto_stat(result.stat)
161215
yield result.ranges, result.versionMajor
162216

217+
@error_handler
163218
def insert(self, uu, values, policy):
164219
policy_map = {
165-
'never': btrdb_pb2.MergePolicy.NEVER,
166-
'equal': btrdb_pb2.MergePolicy.EQUAL,
167-
'retain': btrdb_pb2.MergePolicy.RETAIN,
168-
'replace': btrdb_pb2.MergePolicy.REPLACE,
220+
"never": btrdb_pb2.MergePolicy.NEVER,
221+
"equal": btrdb_pb2.MergePolicy.EQUAL,
222+
"retain": btrdb_pb2.MergePolicy.RETAIN,
223+
"replace": btrdb_pb2.MergePolicy.REPLACE,
169224
}
170225
protoValues = RawPoint.to_proto_list(values)
171-
params = btrdb_pb2.InsertParams(uuid = uu.bytes, sync = False, values = protoValues, merge_policy = policy_map[policy])
226+
params = btrdb_pb2.InsertParams(
227+
uuid=uu.bytes,
228+
sync=False,
229+
values=protoValues,
230+
merge_policy=policy_map[policy],
231+
)
172232
result = self.stub.Insert(params)
173-
BTrDBError.checkProtoStat(result.stat)
233+
check_proto_stat(result.stat)
174234
return result.versionMajor
175235

236+
@error_handler
176237
def deleteRange(self, uu, start, end):
177-
params = btrdb_pb2.DeleteParams(uuid = uu.bytes, start = start, end = end)
238+
params = btrdb_pb2.DeleteParams(uuid=uu.bytes, start=start, end=end)
178239
result = self.stub.Delete(params)
179-
BTrDBError.checkProtoStat(result.stat)
240+
check_proto_stat(result.stat)
180241
return result.versionMajor
181242

243+
@error_handler
182244
def info(self):
183245
params = btrdb_pb2.InfoParams()
184246
result = self.stub.Info(params)
185-
BTrDBError.checkProtoStat(result.stat)
247+
check_proto_stat(result.stat)
186248
return result
187249

250+
@error_handler
188251
def faultInject(self, typ, args):
189-
params = btrdb_pb2.FaultInjectParams(type = typ, params = args)
252+
params = btrdb_pb2.FaultInjectParams(type=typ, params=args)
190253
result = self.stub.FaultInject(params)
191-
BTrDBError.checkProtoStat(result.stat)
254+
check_proto_stat(result.stat)
192255
return result.rv
193256

257+
@error_handler
194258
def flush(self, uu):
195-
params = btrdb_pb2.FlushParams(uuid = uu.bytes)
259+
params = btrdb_pb2.FlushParams(uuid=uu.bytes)
196260
result = self.stub.Flush(params)
197-
BTrDBError.checkProtoStat(result.stat)
261+
check_proto_stat(result.stat)
198262

263+
@error_handler
199264
def getMetadataUsage(self, prefix):
200-
params = btrdb_pb2.MetadataUsageParams(prefix = prefix)
265+
params = btrdb_pb2.MetadataUsageParams(prefix=prefix)
201266
result = self.stub.GetMetadataUsage(params)
202-
BTrDBError.checkProtoStat(result.stat)
267+
check_proto_stat(result.stat)
203268
return result.tags, result.annotations
204269

270+
@error_handler
205271
def generateCSV(self, queryType, start, end, width, depth, includeVersions, *streams):
206272
protoStreams = [btrdb_pb2.StreamCSVConfig(version = stream[0],
207-
label = stream[1],
208-
uuid = stream[2].bytes)
273+
label = stream[1],
274+
uuid = stream[2].bytes)
209275
for stream in streams]
210276
params = btrdb_pb2.GenerateCSVParams(queryType = queryType.to_proto(),
211-
startTime = start,
212-
endTime = end,
213-
windowSize = width,
214-
depth = depth,
215-
includeVersions = includeVersions,
216-
streams = protoStreams)
217-
result = self.stub.GenerateCSV(params)
277+
startTime = start,
278+
endTime = end,
279+
windowSize = width,
280+
depth = depth,
281+
includeVersions = includeVersions,
282+
streams = protoStreams)
218283
for result in self.stub.GenerateCSV(params):
219-
BTrDBError.checkProtoStat(result.stat)
284+
check_proto_stat(result.stat)
220285
yield result.row
221286

287+
@error_handler
222288
def sql_query(self, stmt, params=[]):
223289
request = btrdb_pb2.SQLQueryParams(query=stmt, params=params)
224290
for page in self.stub.SQLQuery(request):
225-
BTrDBError.checkProtoStat(page.stat)
291+
check_proto_stat(page.stat)
226292
yield page.SQLQueryRow

0 commit comments

Comments
 (0)