Skip to content

Commit 3d0698b

Browse files
committed
feat: add MSG class for more info
1 parent 5f0e35d commit 3d0698b

File tree

1 file changed

+96
-38
lines changed

1 file changed

+96
-38
lines changed

Diff for: casbin_rabbitmq_watcher/watcher.py

+96-38
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
import json
12
import logging
23
import time
34
import threading
5+
import uuid
46

57
import casbin
68
import pika
@@ -18,12 +20,14 @@ def __init__(
1820
username="guest",
1921
password="guest",
2022
key="casbin-policy-updated",
21-
**kwargs
23+
local_id=None,
24+
**kwargs,
2225
):
2326
self.connection = None
2427
self.pub_channel = None
2528
self.key = key
2629
self.callback = None
30+
self.local_id = local_id if local_id is not None else str(uuid.uuid4())
2731
self.mutex = threading.Lock()
2832
self.subscribe_event = threading.Event()
2933
self.subscribe_thread = threading.Thread(target=self.start_watch, daemon=True)
@@ -33,7 +37,7 @@ def __init__(
3337
port=port,
3438
virtual_host=virtual_host,
3539
credentials=credentials,
36-
**kwargs
40+
**kwargs,
3741
)
3842

3943
def create_client(self):
@@ -62,8 +66,9 @@ def update(self):
6266
"""
6367
update the policy
6468
"""
69+
msg = MSG("Update", self.local_id, "", "", "")
6570
self.pub_channel.basic_publish(
66-
exchange=self.key, routing_key="", body=str(time.time())
71+
exchange=self.key, routing_key="", body=msg.marshal_binary()
6772
)
6873
return True
6974

@@ -75,9 +80,14 @@ def update_for_add_policy(self, section, ptype, *params):
7580
:param params: other params
7681
:return: True if updated
7782
"""
78-
message = "Update for add policy: " + section + " " + ptype + " " + str(params)
79-
LOGGER.info(message)
80-
return self.update()
83+
84+
def func():
85+
msg = MSG("UpdateForAddPolicy", self.local_id, section, ptype, params)
86+
return self.pub_channel.basic_publish(
87+
exchange=self.key, routing_key="", body=msg.marshal_binary()
88+
)
89+
90+
return self.log_record(func)
8191

8292
def update_for_remove_policy(self, section, ptype, *params):
8393
"""
@@ -87,11 +97,14 @@ def update_for_remove_policy(self, section, ptype, *params):
8797
:param params: other params
8898
:return: True if updated
8999
"""
90-
message = (
91-
"Update for remove policy: " + section + " " + ptype + " " + str(params)
92-
)
93-
LOGGER.info(message)
94-
return self.update()
100+
101+
def func():
102+
msg = MSG("UpdateForRemovePolicy", self.local_id, section, ptype, params)
103+
return self.pub_channel.basic_publish(
104+
exchange=self.key, routing_key="", body=msg.marshal_binary()
105+
)
106+
107+
return self.log_record(func)
95108

96109
def update_for_remove_filtered_policy(self, section, ptype, field_index, *params):
97110
"""
@@ -102,28 +115,41 @@ def update_for_remove_filtered_policy(self, section, ptype, field_index, *params
102115
:param params: other params
103116
:return:
104117
"""
105-
message = (
106-
"Update for remove filtered policy: "
107-
+ section
108-
+ " "
109-
+ ptype
110-
+ " "
111-
+ str(field_index)
112-
+ " "
113-
+ str(params)
114-
)
115-
LOGGER.info(message)
116-
return self.update()
118+
119+
def func():
120+
msg = MSG(
121+
"UpdateForRemoveFilteredPolicy",
122+
self.local_id,
123+
section,
124+
ptype,
125+
f"{field_index} {' '.join(params)}",
126+
)
127+
return self.pub_channel.basic_publish(
128+
exchange=self.key, routing_key="", body=msg.marshal_binary()
129+
)
130+
131+
return self.log_record(func)
117132

118133
def update_for_save_policy(self, model: casbin.Model):
119134
"""
120135
update for save policy
121136
:param model: casbin model
122137
:return:
123138
"""
124-
message = "Update for save policy: " + model.to_text()
125-
LOGGER.info(message)
126-
return self.update()
139+
140+
def func():
141+
msg = MSG(
142+
"UpdateForSavePolicy",
143+
self.local_id,
144+
"",
145+
"",
146+
model.to_text(),
147+
)
148+
return self.pub_channel.basic_publish(
149+
exchange=self.key, routing_key="", body=msg.marshal_binary()
150+
)
151+
152+
return self.log_record(func)
127153

128154
def update_for_add_policies(self, section, ptype, *params):
129155
"""
@@ -133,11 +159,14 @@ def update_for_add_policies(self, section, ptype, *params):
133159
:param params: other params
134160
:return:
135161
"""
136-
message = (
137-
"Update for add policies: " + section + " " + ptype + " " + str(params)
138-
)
139-
LOGGER.info(message)
140-
return self.update()
162+
163+
def func():
164+
msg = MSG("UpdateForAddPolicies", self.local_id, section, ptype, params)
165+
return self.pub_channel.basic_publish(
166+
exchange=self.key, routing_key="", body=msg.marshal_binary()
167+
)
168+
169+
return self.log_record(func)
141170

142171
def update_for_remove_policies(self, section, ptype, *params):
143172
"""
@@ -147,11 +176,23 @@ def update_for_remove_policies(self, section, ptype, *params):
147176
:param params: other params
148177
:return:
149178
"""
150-
message = (
151-
"Update for remove policies: " + section + " " + ptype + " " + str(params)
152-
)
153-
LOGGER.info(message)
154-
return self.update()
179+
180+
def func():
181+
msg = MSG("UpdateForRemovePolicies", self.local_id, section, ptype, params)
182+
return self.pub_channel.basic_publish(
183+
exchange=self.key, routing_key="", body=msg.marshal_binary()
184+
)
185+
186+
return self.log_record(func)
187+
188+
@staticmethod
189+
def log_record(f: callable):
190+
try:
191+
result = f()
192+
except Exception as e:
193+
print(f"Casbin Redis Watcher error: {e}")
194+
else:
195+
return result
155196

156197
def start_watch(self):
157198
"""
@@ -204,14 +245,31 @@ def _watch_callback(ch, method, properties, body):
204245
continue
205246

206247

248+
class MSG:
249+
def __init__(self, method="", id="", sec="", ptype="", *params):
250+
self.method: str = method
251+
self.id: str = id
252+
self.sec: str = sec
253+
self.ptype: str = ptype
254+
self.params = params
255+
256+
def marshal_binary(self):
257+
return json.dumps(self.__dict__)
258+
259+
@staticmethod
260+
def unmarshal_binary(data: bytes):
261+
loaded = json.loads(data)
262+
return MSG(**loaded)
263+
264+
207265
def new_watcher(
208266
host="localhost",
209267
port=5672,
210268
virtual_host="/",
211269
username="guest",
212270
password="guest",
213271
key="casbin-policy-updated",
214-
**kwargs
272+
**kwargs,
215273
):
216274
"""
217275
creates a new watcher
@@ -230,7 +288,7 @@ def new_watcher(
230288
username=username,
231289
password=password,
232290
key=key,
233-
**kwargs
291+
**kwargs,
234292
)
235293
rabbit.subscribe_thread.start()
236294
rabbit.subscribe_event.wait(timeout=5)

0 commit comments

Comments
 (0)