Skip to content

Commit dbdd7eb

Browse files
Adds multilevel index, deprecation warnings, custom column name as callable (#67)
1 parent bb737b5 commit dbdd7eb

File tree

2 files changed

+242
-20
lines changed

2 files changed

+242
-20
lines changed

btrdb/transformers.py

Lines changed: 105 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,36 @@
1818
import csv
1919
import contextlib
2020
from collections import OrderedDict
21+
from warnings import warn
2122

2223
##########################################################################
2324
## Helper Functions
2425
##########################################################################
2526

27+
# Names of the StatPoint attributes that are expanded into one column each
# (in this order) when agg == "all" is requested.
_STAT_PROPERTIES = ('min', 'mean', 'max', 'count', 'stddev')
28+
2629
def _get_time_from_row(row):
    """
    private convenience function to find the time code of a row of points.

    Parameters
    ----------
    row : iterable
        the items of a single row; entries may be falsy (e.g. None) when
        there is no point at that position.

    Returns
    -------
    The ``time`` attribute of the first truthy item in the row.

    Raises
    ------
    ValueError
        If every item in the row is falsy. ValueError is a subclass of
        Exception, so callers that catch Exception keep working.
    """
    for item in row:
        if item:
            return item.time
    raise ValueError("Row contains no data")
3033

3134

32-
def _stream_names(streamset, func=None):
    """
    private convenience function to come up with proper final stream names
    before sending a collection of streams (dataframe, etc.) back to the
    user.

    Parameters
    ----------
    streamset : object
        container whose ``_streams`` attribute is iterated to obtain the
        individual streams.

    func : callable, optional
        called once per stream to produce its name. When omitted (or not
        callable) the name defaults to ``collection + "/" + name``, which
        keeps the earlier one-argument call form working.

    Returns
    -------
    tuple
        one name per stream, in the order of ``streamset._streams``.
    """
    if not callable(func):
        func = lambda s: s.collection + "/" + s.name

    return tuple(func(s) for s in streamset._streams)
3744

3845

3946
##########################################################################
4047
## Transform Functions
4148
##########################################################################
4249

43-
def to_series(streamset, datetime64_index=True, agg="mean"):
50+
def to_series(streamset, datetime64_index=True, agg="mean", name_callable=None):
4451
"""
4552
Returns a list of Pandas Series objects indexed by time
4653
@@ -55,14 +62,26 @@ def to_series(streamset, datetime64_index=True, agg="mean"):
5562
from. Must be one of "min", "mean", "max", "count", or "stddev". This
5663
argument is ignored if RawPoint values are passed into the function.
5764
65+
name_callable : lambda, default: lambda s: s.collection + "/" + s.name
66+
Specify a callable that can be used to determine the series name given a
67+
Stream object.
68+
5869
"""
5970
try:
6071
import pandas as pd
6172
except ImportError:
6273
raise ImportError("Please install Pandas to use this transformation function.")
6374

75+
# TODO: allow this at some future point
76+
if agg == "all":
77+
raise AttributeError("cannot use 'all' as aggregate at this time")
78+
79+
if not callable(name_callable):
80+
name_callable = lambda s: s.collection + "/" + s.name
81+
82+
6483
result = []
65-
stream_names = _stream_names(streamset)
84+
stream_names = _stream_names(streamset, name_callable)
6685

6786
for idx, output in enumerate(streamset.values()):
6887
times, values = [], []
@@ -82,30 +101,59 @@ def to_series(streamset, datetime64_index=True, agg="mean"):
82101
return result
83102

84103

85-
def to_dataframe(streamset, columns=None, agg="mean", name_callable=None):
    """
    Returns a Pandas DataFrame object indexed by time and using the values of a
    stream for each column.

    Parameters
    ----------
    columns: sequence
        column names to use for DataFrame. Deprecated and not compatible with name_callable.

    agg : str, default: "mean"
        Specify the StatPoint field (e.g. aggregating function) to create the Series
        from. Must be one of "min", "mean", "max", "count", "stddev", or "all". This
        argument is ignored if not using StatPoints.

    name_callable : lambda, default: lambda s: s.collection + "/" + s.name
        Specify a callable that can be used to determine the series name given a
        Stream object. This is not compatible with agg == "all" at this time

    Returns
    -------
    pandas.DataFrame
        indexed by "time". When agg == "all" is applied, the columns are a
        MultiIndex of (collection, name, stat). If the streamset yields no
        rows, an empty DataFrame is returned as-is.

    Raises
    ------
    ImportError
        if Pandas is not installed.
    AttributeError
        if name_callable is provided together with agg == "all".
    """
    try:
        import pandas as pd
    except ImportError:
        raise ImportError("Please install Pandas to use this transformation function.")

    # deprecation warning added in v5.8
    if columns:
        warn(
            "the columns argument is deprecated and will be removed in a future release",
            DeprecationWarning,
            stacklevel=2,
        )

    # TODO: allow this at some future point
    if agg == "all" and name_callable is not None:
        raise AttributeError("cannot provide name_callable when using 'all' as aggregate at this time")

    # do not allow agg="all" with RawPoints; after this reset, agg == "all"
    # implies streamset.allow_window was falsy
    if agg == "all" and streamset.allow_window:
        agg = ""

    # default arg values
    if not callable(name_callable):
        name_callable = lambda s: s.collection + "/" + s.name

    df = pd.DataFrame(to_dict(streamset, agg=agg))

    # Without rows there is no "time" column, so set_index("time") would
    # raise KeyError; hand back the empty frame instead.
    if df.empty:
        return df

    df = df.set_index("time")

    if agg == "all":
        # one (collection, name, stat) column per stream and statistic
        stream_names = [
            [s.collection, s.name, prop]
            for s in streamset._streams
            for prop in _STAT_PROPERTIES
        ]
        df.columns = pd.MultiIndex.from_tuples(stream_names)
    else:
        df.columns = columns if columns else _stream_names(streamset, name_callable)

    return df
109157

110158

111159
def to_array(streamset, agg="mean"):
@@ -126,6 +174,10 @@ def to_array(streamset, agg="mean"):
126174
except ImportError:
127175
raise ImportError("Please install Numpy to use this transformation function.")
128176

177+
# TODO: allow this at some future point
178+
if agg == "all":
179+
raise AttributeError("cannot use 'all' as aggregate at this time")
180+
129181
results = []
130182
for points in streamset.values():
131183
segment = []
@@ -138,7 +190,7 @@ def to_array(streamset, agg="mean"):
138190
return np.array(results)
139191

140192

141-
def to_dict(streamset, agg="mean"):
193+
def to_dict(streamset, agg="mean", name_callable=None):
142194
"""
143195
Returns a list of OrderedDict for each time code with the appropriate
144196
stream data attached.
@@ -150,9 +202,17 @@ def to_dict(streamset, agg="mean"):
150202
keys. Must be one of "min", "mean", "max", "count", or "stddev". This
151203
argument is ignored if RawPoint values are passed into the function.
152204
205+
name_callable : lambda, default: lambda s: s.collection + "/" + s.name
206+
Specify a callable that can be used to determine the series name given a
207+
Stream object.
208+
153209
"""
210+
if not callable(name_callable):
211+
name_callable = lambda s: s.collection + "/" + s.name
212+
154213
data = []
155-
stream_names = _stream_names(streamset)
214+
stream_names = _stream_names(streamset, name_callable)
215+
156216
for row in streamset.rows():
157217
item = OrderedDict({
158218
"time": _get_time_from_row(row),
@@ -161,12 +221,16 @@ def to_dict(streamset, agg="mean"):
161221
if row[idx].__class__.__name__ == "RawPoint":
162222
item[col] = row[idx].value if row[idx] else None
163223
else:
164-
item[col] = getattr(row[idx], agg) if row[idx] else None
224+
if agg == "all":
225+
for stat in _STAT_PROPERTIES:
226+
item["{}-{}".format(col, stat)] = getattr(row[idx], stat) if row[idx] else None
227+
else:
228+
item[col] = getattr(row[idx], agg) if row[idx] else None
165229
data.append(item)
166230
return data
167231

168232

169-
def to_csv(streamset, fobj, dialect=None, fieldnames=None, agg="mean"):
233+
def to_csv(streamset, fobj, dialect=None, fieldnames=None, agg="mean", name_callable=None):
170234
"""
171235
Saves stream data as a CSV file.
172236
@@ -187,8 +251,19 @@ def to_csv(streamset, fobj, dialect=None, fieldnames=None, agg="mean"):
187251
Specify the StatPoint field (e.g. aggregating function) to return when
188252
limiting results. Must be one of "min", "mean", "max", "count", or "stddev".
189253
This argument is ignored if RawPoint values are passed into the function.
254+
255+
name_callable : lambda, default: lambda s: s.collection + "/" + s.name
256+
Specify a callable that can be used to determine the series name given a
257+
Stream object.
190258
"""
191259

260+
# TODO: allow this at some future point
261+
if agg == "all":
262+
raise AttributeError("cannot use 'all' as aggregate at this time")
263+
264+
if not callable(name_callable):
265+
name_callable = lambda s: s.collection + "/" + s.name
266+
192267
@contextlib.contextmanager
193268
def open_path_or_file(path_or_file):
194269
if isinstance(path_or_file, str):
@@ -203,7 +278,7 @@ def open_path_or_file(path_or_file):
203278
file_to_close.close()
204279

205280
with open_path_or_file(fobj) as csvfile:
206-
stream_names = _stream_names(streamset)
281+
stream_names = _stream_names(streamset, name_callable)
207282
fieldnames = fieldnames if fieldnames else ["time"] + list(stream_names)
208283

209284
writer = csv.DictWriter(csvfile, fieldnames=fieldnames, dialect=dialect)
@@ -213,7 +288,7 @@ def open_path_or_file(path_or_file):
213288
writer.writerow(item)
214289

215290

216-
def to_table(streamset, agg="mean"):
291+
def to_table(streamset, agg="mean", name_callable=None):
217292
"""
218293
Returns string representation of the data in tabular form using the tabulate
219294
library.
@@ -225,13 +300,24 @@ def to_table(streamset, agg="mean"):
225300
from. Must be one of "min", "mean", "max", "count", or "stddev". This
226301
argument is ignored if RawPoint values are passed into the function.
227302
303+
name_callable : lambda, default: lambda s: s.collection + "/" + s.name
304+
Specify a callable that can be used to determine the column name given a
305+
Stream object.
306+
228307
"""
229308
try:
230309
from tabulate import tabulate
231310
except ImportError:
232311
raise ImportError("Please install tabulate to use this transformation function.")
233312

234-
return tabulate(streamset.to_dict(agg=agg), headers="keys")
313+
# TODO: allow this at some future point
314+
if agg == "all":
315+
raise AttributeError("cannot use 'all' as aggregate at this time")
316+
317+
if not callable(name_callable):
318+
name_callable = lambda s: s.collection + "/" + s.name
319+
320+
return tabulate(streamset.to_dict(agg=agg, name_callable=name_callable), headers="keys")
235321

236322

237323
##########################################################################

0 commit comments

Comments
 (0)