Skip to content

Commit cc77b7d

Browse files
authored
Merge pull request #337 from Point72/tkp/rmslack
Move slack adapter to separate package
2 parents baa37d5 + 296a3f4 commit cc77b7d

File tree

6 files changed

+3
-610
lines changed

6 files changed

+3
-610
lines changed

conda/dev-environment-unix.yml

-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ dependencies:
4545
- ruamel.yaml
4646
- ruff>=0.3,<0.4
4747
- scikit-build
48-
- slack-sdk
4948
- sqlalchemy
5049
- tar
5150
- threadpoolctl

conda/dev-environment-win.yml

-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ dependencies:
4444
- ruamel.yaml
4545
- ruff>=0.3,<0.4
4646
- scikit-build
47-
- slack-sdk
4847
- sqlalchemy
4948
- threadpoolctl
5049
- tornado

csp/adapters/slack.py

+2-370
Original file line numberDiff line numberDiff line change
@@ -1,372 +1,4 @@
1-
import threading
2-
from logging import getLogger
3-
from queue import Queue
4-
from ssl import SSLContext
5-
from threading import Thread
6-
from time import sleep
7-
from typing import Dict, List, Optional, TypeVar
8-
9-
import csp
10-
from csp.impl.adaptermanager import AdapterManagerImpl
11-
from csp.impl.outputadapter import OutputAdapter
12-
from csp.impl.pushadapter import PushInputAdapter
13-
from csp.impl.struct import Struct
14-
from csp.impl.types.tstype import ts
15-
from csp.impl.wiring import py_output_adapter_def, py_push_adapter_def
16-
171
try:
18-
from slack_sdk.errors import SlackApiError
19-
from slack_sdk.socket_mode import SocketModeClient
20-
from slack_sdk.socket_mode.request import SocketModeRequest
21-
from slack_sdk.socket_mode.response import SocketModeResponse
22-
from slack_sdk.web import WebClient
23-
24-
_HAVE_SLACK_SDK = True
2+
from csp_adapter_slack import * # noqa: F403
253
except ImportError:
26-
_HAVE_SLACK_SDK = False
27-
28-
T = TypeVar("T")
29-
log = getLogger(__file__)
30-
31-
32-
__all__ = ("SlackMessage", "mention_user", "SlackAdapterManager", "SlackInputAdapterImpl", "SlackOutputAdapterImpl")
33-
34-
35-
class SlackMessage(Struct):
36-
user: str
37-
user_email: str # email of the author
38-
user_id: str # user id of the author
39-
tags: List[str] # list of mentions
40-
41-
channel: str # name of channel
42-
channel_id: str # id of channel
43-
channel_type: str # type of channel, in "message", "public" (app_mention), "private" (app_mention)
44-
45-
msg: str # parsed text payload
46-
reaction: str # emoji reacts
47-
thread: str # thread id, if in thread
48-
payload: dict # raw message payload
49-
50-
51-
def mention_user(userid: str) -> str:
52-
"""Convenience method, more difficult to do in symphony but we want slack to be symmetric"""
53-
return f"<@{userid}>"
54-
55-
56-
class SlackAdapterManager(AdapterManagerImpl):
57-
def __init__(self, app_token: str, bot_token: str, ssl: Optional[SSLContext] = None):
58-
if not _HAVE_SLACK_SDK:
59-
raise RuntimeError("Could not find slack-sdk installation")
60-
if not app_token.startswith("xapp-") or not bot_token.startswith("xoxb-"):
61-
raise RuntimeError("Slack app token or bot token looks malformed")
62-
63-
self._slack_client = SocketModeClient(
64-
app_token=app_token,
65-
web_client=WebClient(token=bot_token, ssl=ssl),
66-
)
67-
self._slack_client.socket_mode_request_listeners.append(self._process_slack_message)
68-
69-
# down stream edges
70-
self._subscribers = []
71-
self._publishers = []
72-
73-
# message queues
74-
self._inqueue: Queue[SlackMessage] = Queue()
75-
self._outqueue: Queue[SlackMessage] = Queue()
76-
77-
# handler thread
78-
self._running: bool = False
79-
self._thread: Thread = None
80-
81-
# lookups for mentions and redirection
82-
self._room_id_to_room_name: Dict[str, str] = {}
83-
self._room_id_to_room_type: Dict[str, str] = {}
84-
self._room_name_to_room_id: Dict[str, str] = {}
85-
self._user_id_to_user_name: Dict[str, str] = {}
86-
self._user_id_to_user_email: Dict[str, str] = {}
87-
self._user_name_to_user_id: Dict[str, str] = {}
88-
self._user_email_to_user_id: Dict[str, str] = {}
89-
90-
def subscribe(self):
91-
return _slack_input_adapter(self, push_mode=csp.PushMode.NON_COLLAPSING)
92-
93-
def publish(self, msg: ts[SlackMessage]):
94-
return _slack_output_adapter(self, msg)
95-
96-
def _create(self, engine, memo):
97-
# We'll avoid having a second class and make our AdapterManager and AdapterManagerImpl the same
98-
super().__init__(engine)
99-
return self
100-
101-
def start(self, starttime, endtime):
102-
self._running = True
103-
self._thread = threading.Thread(target=self._run, daemon=True)
104-
self._thread.start()
105-
106-
def stop(self):
107-
if self._running:
108-
self._running = False
109-
self._slack_client.close()
110-
self._thread.join()
111-
112-
def register_subscriber(self, adapter):
113-
if adapter not in self._subscribers:
114-
self._subscribers.append(adapter)
115-
116-
def register_publisher(self, adapter):
117-
if adapter not in self._publishers:
118-
self._publishers.append(adapter)
119-
120-
def _get_user_from_id(self, user_id):
121-
# try to pull from cache
122-
name = self._user_id_to_user_name.get(user_id, None)
123-
email = self._user_id_to_user_email.get(user_id, None)
124-
125-
# if none, refresh data via web client
126-
if name is None or email is None:
127-
ret = self._slack_client.web_client.users_info(user=user_id)
128-
if ret.status_code == 200:
129-
# TODO OAuth scopes required
130-
name = ret.data["user"]["profile"].get("real_name_normalized", ret.data["user"]["name"])
131-
email = ret.data["user"]["profile"]["email"]
132-
self._user_id_to_user_name[user_id] = name
133-
self._user_name_to_user_id[name] = user_id # TODO is this 1-1 in slack?
134-
self._user_id_to_user_email[user_id] = email
135-
self._user_email_to_user_id[email] = user_id
136-
return name, email
137-
138-
def _get_user_from_name(self, user_name):
139-
# try to pull from cache
140-
user_id = self._user_name_to_user_id.get(user_name, None)
141-
142-
# if none, refresh data via web client
143-
if user_id is None:
144-
# unfortunately the reverse lookup is not super nice...
145-
# we need to pull all users and build the reverse mapping
146-
ret = self._slack_client.web_client.users_list()
147-
if ret.status_code == 200:
148-
# TODO OAuth scopes required
149-
for user in ret.data["members"]:
150-
name = user["profile"].get("real_name_normalized", user["name"])
151-
user_id = user["profile"]["id"]
152-
email = user["profile"]["email"]
153-
self._user_id_to_user_name[user_id] = name
154-
self._user_name_to_user_id[name] = user_id # TODO is this 1-1 in slack?
155-
self._user_id_to_user_email[user_id] = email
156-
self._user_email_to_user_id[email] = user_id
157-
return self._user_name_to_user_id.get(user_name, None)
158-
return user_id
159-
160-
def _channel_data_to_channel_kind(self, data) -> str:
161-
if data.get("is_im", False):
162-
return "message"
163-
if data.get("is_private", False):
164-
return "private"
165-
return "public"
166-
167-
def _get_channel_from_id(self, channel_id):
168-
# try to pull from cache
169-
name = self._room_id_to_room_name.get(channel_id, None)
170-
kind = self._room_id_to_room_type.get(channel_id, None)
171-
172-
# if none, refresh data via web client
173-
if name is None:
174-
ret = self._slack_client.web_client.conversations_info(channel=channel_id)
175-
if ret.status_code == 200:
176-
# TODO OAuth scopes required
177-
kind = self._channel_data_to_channel_kind(ret.data["channel"])
178-
if kind == "message":
179-
# TODO use same behavior as symphony adapter
180-
name = "DM"
181-
else:
182-
name = ret.data["channel"]["name"]
183-
184-
self._room_id_to_room_name[channel_id] = name
185-
self._room_name_to_room_id[name] = channel_id
186-
self._room_id_to_room_type[channel_id] = kind
187-
return name, kind
188-
189-
def _get_channel_from_name(self, channel_name):
190-
# try to pull from cache
191-
channel_id = self._room_name_to_room_id.get(channel_name, None)
192-
193-
# if none, refresh data via web client
194-
if channel_id is None:
195-
# unfortunately the reverse lookup is not super nice...
196-
# we need to pull all channels and build the reverse mapping
197-
ret = self._slack_client.web_client.conversations_list()
198-
if ret.status_code == 200:
199-
# TODO OAuth scopes required
200-
for channel in ret.data["channels"]:
201-
name = channel["name"]
202-
channel_id = channel["id"]
203-
kind = self._channel_data_to_channel_kind(channel)
204-
self._room_id_to_room_name[channel_id] = name
205-
self._room_name_to_room_id[name] = channel_id
206-
self._room_id_to_room_type[channel_id] = kind
207-
return self._room_name_to_room_id.get(channel_name, None)
208-
return channel_id
209-
210-
def _get_tags_from_message(self, blocks, authorizations=None) -> List[str]:
211-
"""extract tags from message, potentially excluding the bot's own @"""
212-
authorizations = authorizations or []
213-
if len(authorizations) > 0:
214-
bot_id = authorizations[0]["user_id"] # TODO more than one?
215-
else:
216-
bot_id = ""
217-
218-
tags = []
219-
to_search = blocks.copy()
220-
221-
while to_search:
222-
element = to_search.pop()
223-
# add subsections
224-
if element.get("elements", []):
225-
to_search.extend(element.get("elements"))
226-
227-
if element.get("type", "") == "user":
228-
tag_id = element.get("user_id")
229-
if tag_id != bot_id:
230-
# TODO tag with id or with name?
231-
name, _ = self._get_user_from_id(tag_id)
232-
if name:
233-
tags.append(name)
234-
return tags
235-
236-
def _process_slack_message(self, client: SocketModeClient, req: SocketModeRequest):
237-
log.info(req.payload)
238-
if req.type == "events_api":
239-
# Acknowledge the request anyway
240-
response = SocketModeResponse(envelope_id=req.envelope_id)
241-
client.send_socket_mode_response(response)
242-
243-
if (
244-
req.payload["event"]["type"] in ("message", "app_mention")
245-
and req.payload["event"].get("subtype") is None
246-
):
247-
user, user_email = self._get_user_from_id(req.payload["event"]["user"])
248-
channel, channel_type = self._get_channel_from_id(req.payload["event"]["channel"])
249-
tags = self._get_tags_from_message(req.payload["event"]["blocks"], req.payload["authorizations"])
250-
slack_msg = SlackMessage(
251-
user=user or "",
252-
user_email=user_email or "",
253-
user_id=req.payload["event"]["user"],
254-
tags=tags,
255-
channel=channel or "",
256-
channel_id=req.payload["event"]["channel"],
257-
channel_type=channel_type or "",
258-
msg=req.payload["event"]["text"],
259-
reaction="",
260-
thread=req.payload["event"]["ts"],
261-
payload=req.payload.copy(),
262-
)
263-
self._inqueue.put(slack_msg)
264-
265-
def _run(self):
266-
self._slack_client.connect()
267-
268-
while self._running:
269-
# drain outbound
270-
while not self._outqueue.empty():
271-
# pull SlackMessage from queue
272-
slack_msg = self._outqueue.get()
273-
274-
# refactor into slack command
275-
# grab channel or DM
276-
if hasattr(slack_msg, "channel_id") and slack_msg.channel_id:
277-
channel_id = slack_msg.channel_id
278-
elif hasattr(slack_msg, "channel") and slack_msg.channel:
279-
# TODO DM
280-
channel_id = self._get_channel_from_name(slack_msg.channel)
281-
282-
# pull text or reaction
283-
if (
284-
hasattr(slack_msg, "reaction")
285-
and slack_msg.reaction
286-
and hasattr(slack_msg, "thread")
287-
and slack_msg.thread
288-
):
289-
# TODO
290-
self._slack_client.web_client.reactions_add(
291-
channel=channel_id,
292-
name=slack_msg.reaction,
293-
timestamp=slack_msg.thread,
294-
)
295-
elif hasattr(slack_msg, "msg") and slack_msg.msg:
296-
try:
297-
# send text to channel
298-
self._slack_client.web_client.chat_postMessage(
299-
channel=channel_id,
300-
text=getattr(slack_msg, "msg", ""),
301-
)
302-
except SlackApiError:
303-
# TODO
304-
...
305-
else:
306-
# cannot send empty message, log an error
307-
log.error(f"Received malformed SlackMessage instance: {slack_msg}")
308-
309-
if not self._inqueue.empty():
310-
# pull all SlackMessages from queue
311-
# do as burst to match SymphonyAdapter
312-
slack_msgs = []
313-
while not self._inqueue.empty():
314-
slack_msgs.append(self._inqueue.get())
315-
316-
# push to all the subscribers
317-
for adapter in self._subscribers:
318-
adapter.push_tick(slack_msgs)
319-
320-
# do short sleep
321-
sleep(0.1)
322-
323-
# liveness check
324-
if not self._thread.is_alive():
325-
self._running = False
326-
self._thread.join()
327-
328-
# shut down socket client
329-
try:
330-
# TODO which one?
331-
self._slack_client.close()
332-
# self._slack_client.disconnect()
333-
except AttributeError:
334-
# TODO bug in slack sdk causes an exception to be thrown
335-
# File "slack_sdk/socket_mode/builtin/connection.py", line 191, in disconnect
336-
# self.sock.close()
337-
# ^^^^^^^^^^^^^^^
338-
# AttributeError: 'NoneType' object has no attribute 'close'
339-
...
340-
341-
def _on_tick(self, value):
342-
self._outqueue.put(value)
343-
344-
345-
class SlackInputAdapterImpl(PushInputAdapter):
346-
def __init__(self, manager):
347-
manager.register_subscriber(self)
348-
super().__init__()
349-
350-
351-
class SlackOutputAdapterImpl(OutputAdapter):
352-
def __init__(self, manager):
353-
manager.register_publisher(self)
354-
self._manager = manager
355-
super().__init__()
356-
357-
def on_tick(self, time, value):
358-
self._manager._on_tick(value)
359-
360-
361-
_slack_input_adapter = py_push_adapter_def(
362-
name="SlackInputAdapter",
363-
adapterimpl=SlackInputAdapterImpl,
364-
out_type=ts[List[SlackMessage]],
365-
manager_type=SlackAdapterManager,
366-
)
367-
_slack_output_adapter = py_output_adapter_def(
368-
name="SlackOutputAdapter",
369-
adapterimpl=SlackOutputAdapterImpl,
370-
manager_type=SlackAdapterManager,
371-
input=ts[SlackMessage],
372-
)
4+
raise ModuleNotFoundError("Install `csp-adapter-slack` to use csp's Slack adapter")

0 commit comments

Comments
 (0)