11import warnings
2+ from dataclasses import dataclass
23from datetime import datetime
34from pathlib import Path
45from typing import Any , List , Mapping , Optional , Sequence , Tuple
3334 Filter ,
3435 GetDatabaseConnectionRequest ,
3536 GetDatabaseConnectionResponse ,
37+ Order ,
3638 RemoveBinaryDataFromDatasetByIDsRequest ,
3739 RemoveBoundingBoxFromImageByIDRequest ,
3840 RemoveTagsFromBinaryDataByFilterRequest ,
@@ -109,6 +111,7 @@ async def main():
109111
110112 """
111113
114+ @dataclass
112115 class TabularData :
113116 """Class representing a piece of tabular data and associated metadata.
114117
@@ -119,16 +122,17 @@ class TabularData:
119122 time_received (datetime): the time the requested data was received.
120123 """
121124
122- def __init__ (self , data : Mapping [str , Any ], metadata : CaptureMetadata , time_requested : datetime , time_received : datetime ) -> None :
123- self .data = data
124- self .metadata = metadata
125- self .time_requested = time_requested
126- self .time_received = time_received
127-
128125 data : Mapping [str , Any ]
126+ """The requested data"""
127+
129128 metadata : CaptureMetadata
129+ """The metadata associated with the data"""
130+
130131 time_requested : datetime
132+ """The time the data were requested"""
133+
131134 time_received : datetime
135+ """The time the data were received"""
132136
133137 def __str__ (self ) -> str :
134138 return f"{ self .data } \n { self .metadata } Time requested: { self .time_requested } \n Time received: { self .time_received } \n "
@@ -139,6 +143,7 @@ def __eq__(self, other: object) -> bool:
139143 return False
140144
141145 # TODO (RSDK-6684): Revisit if this shadow type is necessary
146+ @dataclass
142147 class BinaryData :
143148 """Class representing a piece of binary data and associated metadata.
144149
@@ -147,12 +152,11 @@ class BinaryData:
147152 metadata (viam.proto.app.data.BinaryMetadata): the metadata from the request.
148153 """
149154
150- def __init__ (self , data : bytes , metadata : BinaryMetadata ) -> None :
151- self .data = data
152- self .metadata = metadata
153-
154155 data : bytes
156+ """The request data"""
157+
155158 metadata : BinaryMetadata
159+ """The metadata associated with the data"""
156160
157161 def __str__ (self ) -> str :
158162 return f"{ self .data } \n { self .metadata } "
@@ -184,47 +188,68 @@ def __init__(self, channel: Channel, metadata: Mapping[str, str]):
184188 async def tabular_data_by_filter (
185189 self ,
186190 filter : Optional [Filter ] = None ,
191+ limit : Optional [int ] = None ,
192+ sort_order : Optional [Order .ValueType ] = None ,
193+ last : Optional [str ] = None ,
194+ count_only : bool = False ,
195+ include_internal_data : bool = False ,
187196 dest : Optional [str ] = None ,
188- ) -> List [TabularData ]:
189- """Filter and download tabular data.
197+ ) -> Tuple [List [TabularData ], int , str ]:
198+ """Filter and download tabular data. The data will be paginated into pages of `limit` items, and the pagination ID will be included
199+ in the returned tuple. If a destination is provided, the data will be saved to that file.
200+ If the file is not empty, it will be overwritten.
190201
191202 ::
192203
193204 from viam.proto.app.data import Filter
194205
206+ my_data = []
207+ last = None
195208 my_filter = Filter(component_name="left_motor")
196- tabular_data = await data_client.tabular_data_by_filter(my_filter)
209+ while True:
210+ tabular_data, last = await data_client.tabular_data_by_filter(my_filter, last)
211+ if not tabular_data:
212+ break
213+ my_data.extend(tabular_data)
214+
197215
198216 Args:
199217 filter (viam.proto.app.data.Filter): Optional `Filter` specifying tabular data to retrieve. No `Filter` implies all tabular
200218 data.
219+ limit (int): The maximum number of entries to include in a page. Defaults to 50 if unspecified.
220+ sort_order (viam.proto.app.data.Order): The desired sort order of the data.
221+ last (str): Optional string indicating the ID of the last-returned data.
222+ If provided, the server will return the next data entries after the `last` ID.
223+ count_only (bool): Whether to return only the total count of entries.
224+ include_internal_data (bool): Whether to return the internal data. Internal data is used for Viam-specific data ingestion,
225+ like cloud SLAM. Defaults to `False`
201226 dest (str): Optional filepath for writing retrieved data.
202227
203228 Returns:
204229 List[TabularData]: The tabular data.
230+ int: The count (number of entries)
231+ str: The last-returned page ID.
205232 """
206233 filter = filter if filter else Filter ()
207- last = ""
208- data = []
209-
210- # `DataRequest`s are limited to 100 pieces of data, so we loop through calls until
211- # we are certain we've received everything.
212- while True :
213- data_request = DataRequest (filter = filter , limit = 100 , last = last )
214- request = TabularDataByFilterRequest (data_request = data_request , count_only = False )
215- response : TabularDataByFilterResponse = await self ._data_client .TabularDataByFilter (request , metadata = self ._metadata )
216- if not response .data or len (response .data ) == 0 :
217- break
218- data += [
219- DataClient .TabularData (
220- struct_to_dict (struct .data ),
221- response .metadata [struct .metadata_index ],
222- struct .time_requested .ToDatetime (),
223- struct .time_received .ToDatetime (),
224- )
225- for struct in response .data
226- ]
227- last = response .last
234+
235+ data_request = DataRequest (filter = filter )
236+ if limit :
237+ data_request .limit = limit
238+ if sort_order :
239+ data_request .sort_order = sort_order
240+ if last :
241+ data_request .last = last
242+ request = TabularDataByFilterRequest (data_request = data_request , count_only = count_only , include_internal_data = include_internal_data )
243+ response : TabularDataByFilterResponse = await self ._data_client .TabularDataByFilter (request , metadata = self ._metadata )
244+ data = [
245+ DataClient .TabularData (
246+ struct_to_dict (struct .data ),
247+ response .metadata [struct .metadata_index ],
248+ struct .time_requested .ToDatetime (),
249+ struct .time_received .ToDatetime (),
250+ )
251+ for struct in response .data
252+ ]
228253
229254 if dest :
230255 try :
@@ -233,59 +258,72 @@ async def tabular_data_by_filter(
233258 file .flush ()
234259 except Exception as e :
235260 LOGGER .error (f"Failed to write tabular data to file { dest } " , exc_info = e )
236- return data
261+ return data , response . count , response . last
237262
238263 async def binary_data_by_filter (
239- self , filter : Optional [Filter ] = None , dest : Optional [str ] = None , include_file_data : bool = True , num_files : Optional [int ] = None
240- ) -> List [BinaryData ]:
241- """Filter and download binary data.
264+ self ,
265+ filter : Optional [Filter ] = None ,
266+ limit : Optional [int ] = None ,
267+ sort_order : Optional [Order .ValueType ] = None ,
268+ last : Optional [str ] = None ,
269+ include_binary_data : bool = True ,
270+ count_only : bool = False ,
271+ include_internal_data : bool = False ,
272+ dest : Optional [str ] = None ,
273+ ) -> Tuple [List [BinaryData ], int , str ]:
274+ """Filter and download binary data. The data will be paginated into pages of `limit` items, and the pagination ID will be included
275+ in the returned tuple. If a destination is provided, the data will be saved to that file.
276+ If the file is not empty, it will be overwritten.
242277
243278 ::
244279
245280 from viam.proto.app.data import Filter
246281
247- my_filter = Filter(component_type="camera")
248- binary_data = await data_client.binary_data_by_filter(my_filter)
249282
250- Args:
251- filter (Optional[viam.proto.app.data.Filter]): Optional `Filter` specifying binary data to retrieve. No `Filter` implies all
252- binary data.
253- dest (Optional[str]): Optional filepath for writing retrieved data.
254- include_file_data (bool): Boolean specifying whether to actually include the binary file data with each retrieved file. Defaults
255- to true (i.e., both the files' data and metadata are returned).
256- num_files (Optional[str]): Number of binary data to return. Passing 0 returns all binary data matching the filter no matter.
257- Defaults to 100 if no binary data is requested, otherwise 10. All binary data or the first `num_files` will be returned,
258- whichever comes first.
283+ my_data = []
284+ last = None
285+ my_filter = Filter(component_name="camera")
286+ while True:
287+ data, last = await data_client.binary_data_by_filter(my_filter, last)
288+ if not data:
289+ break
290+ my_data.extend(data)
259291
260- Raises:
261- ValueError: If `num_files` is less than 0.
292+ Args:
293+ filter (viam.proto.app.data.Filter): Optional `Filter` specifying tabular data to retrieve. No `Filter` implies all binary
294+ data.
295+ limit (int): The maximum number of entries to include in a page. Defaults to 50 if unspecified.
296+ sort_order (viam.proto.app.data.Order): The desired sort order of the data.
297+ last (str): Optional string indicating the ID of the last-returned data.
298+ If provided, the server will return the next data entries after the `last` ID.
299+ include_binary_data (bool): Boolean specifying whether to actually include the binary file data with each retrieved file.
300+ Defaults to true (i.e., both the files' data and metadata are returned).
301+ count_only (bool): Whether to return only the total count of entries.
302+ include_internal_data (bool): Whether to return the internal data. Internal data is used for Viam-specific data ingestion,
303+ like cloud SLAM. Defaults to `False`
304+ dest (str): Optional filepath for writing retrieved data.
262305
263306 Returns:
264307 List[BinaryData]: The binary data.
308+ int: The count (number of entries)
309+ str: The last-returned page ID.
265310 """
266- num_files = num_files if num_files else 10 if include_file_data else 100
267- if num_files < 0 :
268- raise ValueError ("num_files must be at least 0." )
269- filter = filter if filter else Filter ()
270- limit = 1 if include_file_data else 100
271- last = ""
272- data = []
273-
274- # `DataRequest`s are limited in pieces of data, so we loop through calls until
275- # we are certain we've received everything.
276- while True :
277- new_data , last = await self ._binary_data_by_filter (filter = filter , limit = limit , include_binary = include_file_data , last = last )
278- if not new_data or len (new_data ) == 0 :
279- break
280- elif num_files != 0 and len (new_data ) > num_files :
281- data += new_data [0 :num_files ]
282- break
283- else :
284- data += new_data
285- num_files -= len (new_data )
286- if num_files == 0 :
287- break
288311
312+ data_request = DataRequest (filter = filter )
313+ if limit :
314+ data_request .limit = limit
315+ if sort_order :
316+ data_request .sort_order = sort_order
317+ if last :
318+ data_request .last = last
319+ request = BinaryDataByFilterRequest (
320+ data_request = data_request ,
321+ include_binary = include_binary_data ,
322+ count_only = count_only ,
323+ include_internal_data = include_internal_data ,
324+ )
325+ response : BinaryDataByFilterResponse = await self ._data_client .BinaryDataByFilter (request , metadata = self ._metadata )
326+ data = [DataClient .BinaryData (data .binary , data .metadata ) for data in response .data ]
289327 if dest :
290328 try :
291329 file = open (dest , "w" )
@@ -294,13 +332,7 @@ async def binary_data_by_filter(
294332 except Exception as e :
295333 LOGGER .error (f"Failed to write binary data to file { dest } " , exc_info = e )
296334
297- return data
298-
299- async def _binary_data_by_filter (self , filter : Filter , limit : int , include_binary : bool , last : str ) -> Tuple [List [BinaryData ], str ]:
300- data_request = DataRequest (filter = filter , limit = limit , last = last )
301- request = BinaryDataByFilterRequest (data_request = data_request , count_only = False , include_binary = include_binary )
302- response : BinaryDataByFilterResponse = await self ._data_client .BinaryDataByFilter (request , metadata = self ._metadata )
303- return [DataClient .BinaryData (data .binary , data .metadata ) for data in response .data ], response .last
335+ return data , response .count , response .last
304336
305337 async def binary_data_by_ids (
306338 self ,
0 commit comments