Skip to content
This repository was archived by the owner on Jan 24, 2023. It is now read-only.

Commit 132c751

Browse files
Ariel Shtulgkorland
authored andcommitted
Pipeline (#30)
* Added pipeline + tests
1 parent b2e46d9 commit 132c751

File tree

2 files changed

+46
-8
lines changed

2 files changed

+46
-8
lines changed

redistimeseries/client.py

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,32 @@
11
import six
2-
import redis
3-
from redis import Redis, RedisError
2+
from redis import Redis
3+
from redis.client import Pipeline
44
from redis.client import bool_ok
5-
from redis.client import int_or_none
6-
from redis._compat import (long, nativestr)
7-
from redis.exceptions import DataError
5+
from redis._compat import nativestr
86

97
class TSInfo(object):
108
rules = []
119
labels = []
1210
sourceKey = None
1311
chunk_count = None
14-
last_time_stamp = None
12+
memory_usage = None
13+
total_samples = None
1514
retention_msecs = None
15+
last_time_stamp = None
16+
first_time_stamp = None
1617
max_samples_per_chunk = None
1718

1819
def __init__(self, args):
1920
response = dict(zip(map(nativestr, args[::2]), args[1::2]))
2021
self.rules = response['rules']
2122
self.sourceKey = response['sourceKey']
2223
self.chunkCount = response['chunkCount']
24+
self.memory_usage = response['memoryUsage']
25+
self.total_samples = response['totalSamples']
2326
self.labels = list_to_dict(response['labels'])
24-
self.lastTimeStamp = response['lastTimestamp']
2527
self.retention_msecs = response['retentionTime']
28+
self.lastTimeStamp = response['lastTimestamp']
29+
self.first_time_stamp = response['firstTimestamp']
2630
self.maxSamplesPerChunk = response['maxSamplesPerChunk']
2731

2832
def list_to_dict(aList):
@@ -262,3 +266,22 @@ def info(self, key):
262266
def queryindex(self, filters):
263267
"""Get all the keys matching the ``filter`` list."""
264268
return self.execute_command(self.QUERYINDEX_CMD, *filters)
269+
270+
def pipeline(self, transaction=True, shard_hint=None):
271+
"""
272+
Return a new pipeline object that can queue multiple commands for
273+
later execution. ``transaction`` indicates whether all commands
274+
should be executed atomically. Apart from making a group of operations
275+
atomic, pipelines are useful for reducing the back-and-forth overhead
276+
between the client and server.
277+
Overridden in order to provide the right client through the pipeline.
278+
"""
279+
p = Pipeline(
280+
connection_pool=self.connection_pool,
281+
response_callbacks=self.response_callbacks,
282+
transaction=transaction,
283+
shard_hint=shard_hint)
284+
return p
285+
286+
class Pipeline(Pipeline, Client):
287+
"Pipeline for Redis TimeSeries Client"

test_commands.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ def testAlter(self):
3535
rts.alter(1, labels={'Time':'Series'})
3636
self.assertEqual('Series', rts.info(1).labels['Time'])
3737
self.assertEqual(10, rts.info(1).retention_msecs)
38-
38+
pipe = rts.pipeline()
39+
self.assertTrue(pipe.create(2))
3940
def testAdd(self):
4041
'''Test TS.ADD calls'''
4142

@@ -141,11 +142,25 @@ def testInfo(self):
141142
self.assertEqual(info.labels['currentLabel'], 'currentData')
142143

143144
def testQueryIndex(self):
145+
'''Test TS.QUERYINDEX calls'''
144146
rts.create(1, labels={'Test':'This'})
145147
rts.create(2, labels={'Test':'This', 'Taste':'That'})
146148
self.assertEqual(2, len(rts.queryindex(['Test=This'])))
147149
self.assertEqual(1, len(rts.queryindex(['Taste=That'])))
148150
self.assertEqual(['2'], rts.queryindex(['Taste=That']))
149151

152+
def testPipeline(self):
153+
'''Test pipeline'''
154+
pipeline = rts.pipeline()
155+
pipeline.create('with_pipeline')
156+
for i in range(100):
157+
pipeline.add('with_pipeline', i, 1.1 * i)
158+
pipeline.execute()
159+
160+
info = rts.info('with_pipeline')
161+
self.assertEqual(info.lastTimeStamp, 99)
162+
self.assertEqual(info.total_samples, 100)
163+
self.assertEqual(rts.get('with_pipeline')[1], 99 * 1.1)
164+
150165
if __name__ == '__main__':
151166
unittest.main()

0 commit comments

Comments
 (0)