Skip to content

Commit 4648706

Browse files
committed
Adding implementation
1 parent 67a255f commit 4648706

File tree

8 files changed

+184
-1
lines changed

8 files changed

+184
-1
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
# redis-streams-python
1+
# redis-streams-python
2+
3+
Testing out Redis Streams using Python client

broadcast_redis.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import redis
2+
import random
3+
4+
r = redis.Redis(host="localhost", port=6379)
5+
6+
stream = "controller"
7+
group = f"group_uuid"
8+
consumer = "subscriber_uuid"
9+
10+
try:
11+
r.xgroup_create(stream, group, id="0", mkstream=True)
12+
except redis.exceptions.ResponseError as e:
13+
if "BUSYGROUP" not in str(e):
14+
raise
15+
16+
r.xadd(stream, {"value": "broadcast"})

docker-compose.yaml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
version: '3.8'
2+
3+
services:
4+
redis:
5+
image: redis:7
6+
ports:
7+
- 6379:6379
8+
stop_grace_period: 60s
9+
restart: always
10+
11+
12+
networks:
13+
default:
14+
name: redis-streams
15+
driver: bridge

exit_redis.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import redis
2+
import random
3+
4+
r = redis.Redis(host="localhost", port=6379)
5+
6+
stream = "controller"
7+
group = f"group_uuid"
8+
consumer = "subscriber_uuid"
9+
10+
try:
11+
r.xgroup_create(stream, group, id="0", mkstream=True)
12+
except redis.exceptions.ResponseError as e:
13+
if "BUSYGROUP" not in str(e):
14+
raise
15+
16+
r.xadd(stream, {"value": "exit"})

poetry.lock

Lines changed: 36 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

publish_redis.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import redis
2+
import time
3+
import random
4+
5+
r = redis.Redis(host="localhost", port=6379)
6+
7+
stream = "controller"
8+
group = f"group_uuid"
9+
consumer = "subscriber_uuid"
10+
11+
exit_condition = False
12+
13+
try:
14+
r.xgroup_create(stream, group, id="0", mkstream=True)
15+
except redis.exceptions.ResponseError as e:
16+
if "BUSYGROUP" not in str(e):
17+
raise
18+
19+
while not exit_condition:
20+
entries = r.xreadgroup(
21+
groupname=group,
22+
consumername=consumer,
23+
streams={stream: '>'},
24+
block=1,
25+
count=1
26+
)
27+
28+
for _, msgs in entries:
29+
for msg_id, data in msgs:
30+
value = data[b'value'].decode()
31+
print(f"Received controller value: {value}")
32+
r.xack(stream, group, msg_id)
33+
34+
if value == "exit":
35+
exit_condition = True
36+
r.xadd("temperature", {"value": "exit"})
37+
r.xtrim(stream, maxlen=0)
38+
r.xtrim("temperature", maxlen=0)
39+
break
40+
elif value == "broadcast":
41+
r.xadd("temperature", {"value": "broadcast"})
42+
43+
if not exit_condition:
44+
value = random.randint(0, 100)
45+
r.xadd("temperature", {"value": value})
46+
print(f"Published: {value}")
47+
48+
time.sleep(5)

pyproject.toml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[tool.poetry]
2+
name = "redis-streams-python"
3+
version = "0.1.0"
4+
description = ""
5+
authors = ["James Bristow <[email protected]>"]
6+
readme = "README.md"
7+
8+
[tool.poetry.dependencies]
9+
python = "^3.10"
10+
redis = "^6.2.0"
11+
12+
13+
[build-system]
14+
requires = ["poetry-core"]
15+
build-backend = "poetry.core.masonry.api"

subscribe_redis.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import redis
2+
import uuid
3+
4+
r = redis.Redis(host="localhost", port=6379)
5+
exit_condition = False
6+
7+
stream = "temperature"
8+
group = f"group_{str(uuid.uuid4())}"
9+
consumer = "subscriber_uuid"
10+
11+
try:
12+
r.xgroup_create(stream, group, id="0", mkstream=True)
13+
except redis.exceptions.ResponseError as e:
14+
if "BUSYGROUP" not in str(e):
15+
raise
16+
17+
print("Waiting...")
18+
19+
while not exit_condition:
20+
entries = r.xreadgroup(
21+
groupname=group,
22+
consumername=consumer,
23+
streams={stream: '>'},
24+
block=0
25+
)
26+
27+
for _, msgs in entries:
28+
for msg_id, data in msgs:
29+
value = data[b'value'].decode()
30+
print(f"Received value: {value}")
31+
r.xack(stream, group, msg_id)
32+
33+
if value == "exit":
34+
exit_condition = True
35+
break

0 commit comments

Comments
 (0)