44from typing import Any , BinaryIO , Optional , Union
55
66import numpy as np
7+ import numpy .typing as npt
78import pandas as pd
89from pandas ._typing import Axes
910
11+ from harp .typing import BufferLike
12+
1013REFERENCE_EPOCH = datetime (1904 , 1 , 1 )
1114"""The reference epoch for UTC harp time."""
1215
@@ -21,8 +24,9 @@ class MessageType(IntEnum):
2124
2225
2326_SECONDS_PER_TICK = 32e-6
27+ _PAYLOAD_TIMESTAMP_MASK = 0x10
2428_messagetypes = [type .name for type in MessageType ]
25- _payloadtypes = {
29+ _dtypefrompayloadtype = {
2630 1 : np .dtype (np .uint8 ),
2731 2 : np .dtype (np .uint16 ),
2832 4 : np .dtype (np .uint32 ),
@@ -33,6 +37,7 @@ class MessageType(IntEnum):
3337 136 : np .dtype (np .int64 ),
3438 68 : np .dtype (np .float32 ),
3539}
40+ _payloadtypefromdtype = {v : k for k , v in _dtypefrompayloadtype .items ()}
3641
3742
3843def read (
@@ -73,8 +78,66 @@ def read(
7378 A pandas data frame containing message data, sorted by time.
7479 """
7580 data = np .fromfile (file , dtype = np .uint8 )
81+ return _fromraw (data , address , dtype , length , columns , epoch , keep_type )
82+
83+
84+ def parse (
85+ buffer : BufferLike ,
86+ address : Optional [int ] = None ,
87+ dtype : Optional [np .dtype ] = None ,
88+ length : Optional [int ] = None ,
89+ columns : Optional [Axes ] = None ,
90+ epoch : Optional [datetime ] = None ,
91+ keep_type : bool = False ,
92+ ):
93+ """Parse single-register Harp data from the specified buffer.
94+
95+ Parameters
96+ ----------
97+ buffer
98+ An object that exposes a buffer interface containing binary data from
99+ a single device register.
100+ address
101+ Expected register address. If specified, the address of
102+ the first message in the buffer is used for validation.
103+ dtype
104+ Expected data type of the register payload. If specified, the
105+ payload type of the first message in the buffer is used for validation.
106+ length
107+ Expected number of elements in register payload. If specified, the
108+ payload length of the first message in the buffer is used for validation.
109+ columns
110+ The optional column labels to use for the data values.
111+ epoch
112+ Reference datetime at which time zero begins. If specified,
113+ the result data frame will have a datetime index.
114+ keep_type
115+ Specifies whether to include a column with the message type.
116+
117+ Returns
118+ -------
119+ A pandas data frame containing message data, sorted by time.
120+ """
121+ data = np .frombuffer (buffer , dtype = np .uint8 )
122+ return _fromraw (data , address , dtype , length , columns , epoch , keep_type )
123+
124+
125+ def _fromraw (
126+ data : npt .NDArray [np .uint8 ],
127+ address : Optional [int ] = None ,
128+ dtype : Optional [np .dtype ] = None ,
129+ length : Optional [int ] = None ,
130+ columns : Optional [Axes ] = None ,
131+ epoch : Optional [datetime ] = None ,
132+ keep_type : bool = False ,
133+ ):
76134 if len (data ) == 0 :
77- return pd .DataFrame (columns = columns , index = pd .Index ([], dtype = np .float64 , name = "Time" ))
135+ return pd .DataFrame (
136+ columns = columns ,
137+ index = pd .DatetimeIndex ([], name = "Time" )
138+ if epoch
139+ else pd .Index ([], dtype = np .float64 , name = "Time" ),
140+ )
78141
79142 if address is not None and address != data [2 ]:
80143 raise ValueError (f"expected address { address } but got { data [2 ]} " )
@@ -84,20 +147,20 @@ def read(
84147 nrows = len (data ) // stride
85148 payloadtype = data [4 ]
86149 payloadoffset = 5
87- if payloadtype & 0x10 != 0 :
150+ if payloadtype & _PAYLOAD_TIMESTAMP_MASK != 0 :
88151 seconds = np .ndarray (nrows , dtype = np .uint32 , buffer = data , offset = payloadoffset , strides = stride )
89152 payloadoffset += 4
90153 micros = np .ndarray (nrows , dtype = np .uint16 , buffer = data , offset = payloadoffset , strides = stride )
91154 payloadoffset += 2
92155 time = micros * _SECONDS_PER_TICK + seconds
93- payloadtype = payloadtype & ~ np .uint8 (0x10 )
156+ payloadtype = payloadtype & ~ np .uint8 (_PAYLOAD_TIMESTAMP_MASK )
94157 if epoch is not None :
95158 time = epoch + pd .to_timedelta (time , "s" ) # type: ignore
96159 index = pd .Series (time )
97160 index .name = "Time"
98161
99162 payloadsize = stride - payloadoffset - 1
100- payloadtype = _payloadtypes [payloadtype ]
163+ payloadtype = _dtypefrompayloadtype [payloadtype ]
101164 if dtype is not None and dtype != payloadtype :
102165 raise ValueError (f"expected payload type { dtype } but got { payloadtype } " )
103166
@@ -120,3 +183,127 @@ def read(
120183 msgtype = pd .Categorical .from_codes (msgtype , categories = _messagetypes ) # type: ignore
121184 result [MessageType .__name__ ] = msgtype
122185 return result
186+
187+
188+ def write (
189+ file : Union [str , bytes , PathLike [Any ], BinaryIO ],
190+ data : pd .DataFrame ,
191+ address : int ,
192+ dtype : Optional [np .dtype ] = None ,
193+ port : Optional [int ] = None ,
194+ epoch : Optional [datetime ] = None ,
195+ message_type : Optional [MessageType ] = None ,
196+ ):
197+ """Write single-register Harp data to the specified file.
198+
199+ Parameters
200+ ----------
201+ file
202+ Open file object or filename where to store binary data from
203+ a single device register.
204+ data
205+ Pandas data frame containing message payload.
206+ address
207+ Register address used to identify all formatted Harp messages.
208+ dtype
209+ Data type of the register payload. If specified, all data will
210+ be converted before formatting the binary payload.
211+ port
212+ Optional port value used for all formatted Harp messages.
213+ epoch
214+ Reference datetime at which time zero begins. If specified,
215+ the input data frame must have a datetime index.
216+ message_type
217+ Optional message type used for all formatted Harp messages.
218+ If not specified, data must contain a MessageType column.
219+ """
220+ buffer = format (data , address , dtype , port , epoch , message_type )
221+ buffer .tofile (file )
222+
223+
224+ def format (
225+ data : pd .DataFrame ,
226+ address : int ,
227+ dtype : Optional [np .dtype ] = None ,
228+ port : Optional [int ] = None ,
229+ epoch : Optional [datetime ] = None ,
230+ message_type : Optional [MessageType ] = None ,
231+ ) -> npt .NDArray [np .uint8 ]:
232+ """Format single-register Harp data as a flat binary buffer.
233+
234+ Parameters
235+ ----------
236+ data
237+ Pandas data frame containing message payload.
238+ address
239+ Register address used to identify all formatted Harp messages.
240+ dtype
241+ Data type of the register payload. If specified, all data will
242+ be converted before formatting the binary payload.
243+ port
244+ Optional port value used for all formatted Harp messages.
245+ epoch
246+ Reference datetime at which time zero begins. If specified,
247+ the input data frame must have a datetime index.
248+ message_type
249+ Optional message type used for all formatted Harp messages.
250+ If not specified, data must contain a MessageType column.
251+
252+ Returns
253+ -------
254+ An array object containing message data formatted according
255+ to the Harp binary protocol.
256+ """
257+ if len (data ) == 0 :
258+ return np .empty (0 , dtype = np .uint8 )
259+
260+ if "MessageType" in data .columns :
261+ msgtype = data ["MessageType" ].cat .codes
262+ payload = data [data .columns .drop ("MessageType" )].values
263+ elif message_type is not None :
264+ msgtype = message_type
265+ payload = data .values
266+ else :
267+ raise ValueError (f"message type must be specified either in the data or as argument" )
268+
269+ time = data .index
270+ is_timestamped = True
271+ if epoch is not None :
272+ if not isinstance (time , pd .DatetimeIndex ):
273+ raise ValueError (f"expected datetime index to encode with epoch but got { time .inferred_type } " )
274+ time = (time - epoch ).total_seconds ()
275+ elif isinstance (time , pd .RangeIndex ):
276+ is_timestamped = False
277+
278+ if dtype is not None :
279+ payload = payload .astype (dtype , copy = False )
280+
281+ if port is None :
282+ port = 255
283+
284+ payloadtype = _payloadtypefromdtype [payload .dtype ]
285+ payloadlength = payload .shape [1 ] * payload .dtype .itemsize
286+ stride = payloadlength + 6
287+ if is_timestamped :
288+ payloadtype |= _PAYLOAD_TIMESTAMP_MASK
289+ stride += 6
290+
291+ nrows = len (data )
292+ buffer = np .empty ((nrows , stride ), dtype = np .uint8 )
293+ buffer [:, 0 ] = msgtype
294+ buffer [:, 1 :5 ] = [stride - 2 , address , port , payloadtype ]
295+
296+ payloadoffset = 5
297+ if is_timestamped :
298+ seconds = time .astype (np .uint32 )
299+ micros = np .around (((time - seconds ) / _SECONDS_PER_TICK ).values ).astype (np .uint16 )
300+ buffer [:, 5 :9 ] = np .ndarray ((nrows , 4 ), dtype = np .uint8 , buffer = seconds .values )
301+ buffer [:, 9 :11 ] = np .ndarray ((nrows , 2 ), dtype = np .uint8 , buffer = micros )
302+ payloadoffset += 6
303+
304+ payloadstop = payloadoffset + payloadlength
305+ buffer [:, payloadoffset :payloadstop ] = np .ndarray (
306+ (nrows , payloadlength ), dtype = np .uint8 , buffer = np .ascontiguousarray (payload )
307+ )
308+ buffer [:, - 1 ] = np .sum (buffer [:, 0 :- 1 ], axis = 1 , dtype = np .uint8 )
309+ return buffer .reshape (- 1 )
0 commit comments