From 78ad1da361f3db654e5cc95563f68ad0614ef775 Mon Sep 17 00:00:00 2001 From: Tim Paine <3105306+timkpaine@users.noreply.github.com> Date: Tue, 6 Feb 2024 12:46:34 -0500 Subject: [PATCH] Add a slack message adapter Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com> --- csp/adapters/slack.py | 368 +++++++++++++++++++++++++++++++ csp/tests/adapters/test_slack.py | 230 +++++++++++++++++++ pyproject.toml | 8 +- 3 files changed, 605 insertions(+), 1 deletion(-) create mode 100644 csp/adapters/slack.py create mode 100644 csp/tests/adapters/test_slack.py diff --git a/csp/adapters/slack.py b/csp/adapters/slack.py new file mode 100644 index 000000000..ae9ce06ed --- /dev/null +++ b/csp/adapters/slack.py @@ -0,0 +1,368 @@ +import threading +from logging import getLogger +from queue import Queue +from threading import Thread +from time import sleep +from typing import Dict, List, TypeVar + +import csp +from csp.impl.adaptermanager import AdapterManagerImpl +from csp.impl.outputadapter import OutputAdapter +from csp.impl.pushadapter import PushInputAdapter +from csp.impl.struct import Struct +from csp.impl.types.tstype import ts +from csp.impl.wiring import py_output_adapter_def, py_push_adapter_def + +try: + from slack_sdk.errors import SlackApiError + from slack_sdk.socket_mode import SocketModeClient + from slack_sdk.socket_mode.request import SocketModeRequest + from slack_sdk.socket_mode.response import SocketModeResponse + from slack_sdk.web import WebClient + + _HAVE_SLACK_SDK = True +except ImportError: + _HAVE_SLACK_SDK = False + +T = TypeVar("T") +log = getLogger(__file__) + + +__all__ = ("SlackMessage", "mention_user", "SlackAdapterManager", "SlackInputAdapterImpl", "SlackOutputAdapterImpl") + + +class SlackMessage(Struct): + user: str + user_email: str # email of the author + user_id: str # user id of the author + tags: List[str] # list of mentions + + channel: str # name of channel + channel_id: str # id of channel + channel_type: str # type of channel, in "message", "public" (app_mention), "private" (app_mention) + + msg: str # parsed text payload + reaction: str # emoji reacts + thread: str # thread id, if in thread + payload: dict # raw message payload + + +def mention_user(userid: str) -> str: + """Convenience method, more difficult to do in symphony but we want slack to be symmetric""" + return f"<@{userid}>" + + +class SlackAdapterManager(AdapterManagerImpl): + def __init__(self, app_token: str, bot_token: str): + if not _HAVE_SLACK_SDK: + raise RuntimeError("Could not find slack-sdk installation") + if not app_token.startswith("xapp-") or not bot_token.startswith("xoxb-"): + raise RuntimeError("Slack app token or bot token looks malformed") + + self._slack_client = SocketModeClient( + app_token=app_token, + web_client=WebClient(token=bot_token), + ) + self._slack_client.socket_mode_request_listeners.append(self._process_slack_message) + + # down stream edges + self._subscribers = [] + self._publishers = [] + + # message queues + self._inqueue: Queue[SlackMessage] = Queue() + self._outqueue: Queue[SlackMessage] = Queue() + + # handler thread + self._running: bool = False + self._thread: Thread = None + + # lookups for mentions and redirection + self._room_id_to_room_name: Dict[str, str] = {} + self._room_id_to_room_type: Dict[str, str] = {} + self._room_name_to_room_id: Dict[str, str] = {} + self._user_id_to_user_name: Dict[str, str] = {} + self._user_id_to_user_email: Dict[str, str] = {} + self._user_name_to_user_id: Dict[str, str] = {} + self._user_email_to_user_id: Dict[str, str] = {} + + def subscribe(self): + return _slack_input_adapter(self, push_mode=csp.PushMode.NON_COLLAPSING) + + def publish(self, msg: ts[SlackMessage]): + return _slack_output_adapter(self, msg) + + def _create(self, engine, memo): + # We'll avoid having a second class and make our AdapterManager and AdapterManagerImpl the same + super().__init__(engine) + return self + + def start(self, starttime, endtime): + self._running = True + self._thread = threading.Thread(target=self._run, daemon=True) + self._thread.start() + + def stop(self): + if self._running: + self._running = False + self._slack_client.close() + self._thread.join() + + def register_subscriber(self, adapter): + if adapter not in self._subscribers: + self._subscribers.append(adapter) + + def register_publisher(self, adapter): + if adapter not in self._publishers: + self._publishers.append(adapter) + + def _get_user_from_id(self, user_id): + # try to pull from cache + name = self._user_id_to_user_name.get(user_id, None) + email = self._user_id_to_user_email.get(user_id, None) + + # if none, refresh data via web client + if name is None or email is None: + ret = self._slack_client.web_client.users_info(user=user_id) + if ret.status_code == 200: + # TODO OAuth scopes required + name = ret.data["user"]["profile"].get("real_name_normalized", ret.data["user"]["name"]) + email = ret.data["user"]["profile"]["email"] + self._user_id_to_user_name[user_id] = name + self._user_name_to_user_id[name] = user_id # TODO is this 1-1 in slack? + self._user_id_to_user_email[user_id] = email + self._user_email_to_user_id[email] = user_id + return name, email + + def _get_user_from_name(self, user_name): + # try to pull from cache + user_id = self._user_name_to_user_id.get(user_name, None) + + # if none, refresh data via web client + if user_id is None: + # unfortunately the reverse lookup is not super nice... + # we need to pull all users and build the reverse mapping + ret = self._slack_client.web_client.users_list() + if ret.status_code == 200: + # TODO OAuth scopes required + for user in ret.data["members"]: + name = user["profile"].get("real_name_normalized", user["name"]) + user_id = user["profile"]["id"] + email = user["profile"]["email"] + self._user_id_to_user_name[user_id] = name + self._user_name_to_user_id[name] = user_id # TODO is this 1-1 in slack? + self._user_id_to_user_email[user_id] = email + self._user_email_to_user_id[email] = user_id + return self._user_name_to_user_id.get(user_name, None) + return user_id + + def _channel_data_to_channel_kind(self, data) -> str: + if data.get("is_im", False): + return "message" + if data.get("is_private", False): + return "private" + return "public" + + def _get_channel_from_id(self, channel_id): + # try to pull from cache + name = self._room_id_to_room_name.get(channel_id, None) + kind = self._room_id_to_room_type.get(channel_id, None) + + # if none, refresh data via web client + if name is None: + ret = self._slack_client.web_client.conversations_info(channel=channel_id) + if ret.status_code == 200: + # TODO OAuth scopes required + kind = self._channel_data_to_channel_kind(ret.data["channel"]) + if kind == "message": + # TODO use same behavior as symphony adapter + name = "DM" + else: + name = ret.data["channel"]["name"] + + self._room_id_to_room_name[channel_id] = name + self._room_name_to_room_id[name] = channel_id + self._room_id_to_room_type[channel_id] = kind + return name, kind + + def _get_channel_from_name(self, channel_name): + # try to pull from cache + channel_id = self._room_name_to_room_id.get(channel_name, None) + + # if none, refresh data via web client + if channel_id is None: + # unfortunately the reverse lookup is not super nice... + # we need to pull all channels and build the reverse mapping + ret = self._slack_client.web_client.conversations_list() + if ret.status_code == 200: + # TODO OAuth scopes required + for channel in ret.data["channels"]: + name = channel["name"] + channel_id = channel["id"] + kind = self._channel_data_to_channel_kind(channel) + self._room_id_to_room_name[channel_id] = name + self._room_name_to_room_id[name] = channel_id + self._room_id_to_room_type[channel_id] = kind + return self._room_name_to_room_id.get(channel_name, None) + return channel_id + + def _get_tags_from_message(self, blocks, authorizations=None) -> List[str]: + """extract tags from message, potentially excluding the bot's own @""" + authorizations = authorizations or [] + if len(authorizations) > 0: + bot_id = authorizations[0]["user_id"] # TODO more than one? + else: + bot_id = "" + + tags = [] + to_search = blocks.copy() + + while to_search: + element = to_search.pop() + # add subsections + if element.get("elements", []): + to_search.extend(element.get("elements")) + + if element.get("type", "") == "user": + tag_id = element.get("user_id") + if tag_id != bot_id: + # TODO tag with id or with name? + name, _ = self._get_user_from_id(tag_id) + if name: + tags.append(name) + return tags + + def _process_slack_message(self, client: SocketModeClient, req: SocketModeRequest): + log.info(req.payload) + if req.type == "events_api": + # Acknowledge the request anyway + response = SocketModeResponse(envelope_id=req.envelope_id) + client.send_socket_mode_response(response) + + if ( + req.payload["event"]["type"] in ("message", "app_mention") + and req.payload["event"].get("subtype") is None + ): + user, user_email = self._get_user_from_id(req.payload["event"]["user"]) + channel, channel_type = self._get_channel_from_id(req.payload["event"]["channel"]) + tags = self._get_tags_from_message(req.payload["event"]["blocks"], req.payload["authorizations"]) + slack_msg = SlackMessage( + user=user or "", + user_email=user_email or "", + user_id=req.payload["event"]["user"], + tags=tags, + channel=channel or "", + channel_id=req.payload["event"]["channel"], + channel_type=channel_type or "", + msg=req.payload["event"]["text"], + reaction="", + thread=req.payload["event"]["ts"], + payload=req.payload.copy(), + ) + self._inqueue.put(slack_msg) + + def _run(self): + self._slack_client.connect() + + while self._running: + # drain outbound + while not self._outqueue.empty(): + # pull SlackMessage from queue + slack_msg = self._outqueue.get() + + # refactor into slack command + # grab channel or DM + if hasattr(slack_msg, "channel_id") and slack_msg.channel_id: + channel_id = slack_msg.channel_id + elif hasattr(slack_msg, "channel") and slack_msg.channel: + # TODO DM + channel_id = self._get_channel_from_name(slack_msg.channel) + + # pull text or reaction + if ( + hasattr(slack_msg, "reaction") + and slack_msg.reaction + and hasattr(slack_msg, "thread") + and slack_msg.thread + ): + # TODO + self._slack_client.web_client.reactions_add( + channel=channel_id, + name=slack_msg.reaction, + timestamp=slack_msg.thread, + ) + elif hasattr(slack_msg, "msg") and slack_msg.msg: + try: + # send text to channel + self._slack_client.web_client.chat_postMessage( + channel=channel_id, + text=getattr(slack_msg, "msg", ""), + ) + except SlackApiError: + # TODO + ... + else: + # cannot send empty message, log an error + log.error(f"Received malformed SlackMessage instance: {slack_msg}") + + while not self._inqueue.empty(): + # pull SlackMessage from queue + slack_msg = self._inqueue.get() + + # push to all the subscribers + for adapter in self._subscribers: + adapter.push_tick(slack_msg) + + # do short sleep + sleep(.1) + + # liveness check + if not self._thread.is_alive(): + self._running = False + self._thread.join() + + # shut down socket client + try: + # TODO which one? + self._slack_client.close() + # self._slack_client.disconnect() + except AttributeError: + # TODO bug in slack sdk causes an exception to be thrown + # File "slack_sdk/socket_mode/builtin/connection.py", line 191, in disconnect + # self.sock.close() + # ^^^^^^^^^^^^^^^ + # AttributeError: 'NoneType' object has no attribute 'close' + ... + + def _on_tick(self, value): + self._outqueue.put(value) + + +class SlackInputAdapterImpl(PushInputAdapter): + def __init__(self, manager): + manager.register_subscriber(self) + super().__init__() + + +class SlackOutputAdapterImpl(OutputAdapter): + def __init__(self, manager): + manager.register_publisher(self) + self._manager = manager + super().__init__() + + def on_tick(self, time, value): + self._manager._on_tick(value) + + +_slack_input_adapter = py_push_adapter_def( + name="SlackInputAdapter", + adapterimpl=SlackInputAdapterImpl, + out_type=ts[SlackMessage], + manager_type=SlackAdapterManager, +) +_slack_output_adapter = py_output_adapter_def( + name="SlackOutputAdapter", + adapterimpl=SlackOutputAdapterImpl, + manager_type=SlackAdapterManager, + input=ts[SlackMessage], +) diff --git a/csp/tests/adapters/test_slack.py b/csp/tests/adapters/test_slack.py new file mode 100644 index 000000000..813bf36e1 --- /dev/null +++ b/csp/tests/adapters/test_slack.py @@ -0,0 +1,230 @@ +import pytest +from datetime import timedelta +from unittest.mock import MagicMock, call, patch + +import csp +from csp import ts +from csp.adapters.slack import SlackAdapterManager, SlackMessage, mention_user + + +@csp.node +def hello(msg: ts[SlackMessage]) -> ts[SlackMessage]: + if csp.ticked(msg): + text = f"Hello <@{msg.user_id}>!" + return SlackMessage( + channel="a new channel", + # reply in thread + thread=msg.thread, + msg=text, + ) + + +@csp.node +def react(msg: ts[SlackMessage]) -> ts[SlackMessage]: + if csp.ticked(msg): + return SlackMessage( + channel=msg.channel, + channel_id=msg.channel_id, + thread=msg.thread, + reaction="eyes", + ) + + +@csp.node +def send_fake_message(clientmock: MagicMock, requestmock: MagicMock, am: SlackAdapterManager) -> ts[bool]: + with csp.alarms(): + a_send = csp.alarm(bool) + with csp.start(): + csp.schedule_alarm(a_send, timedelta(seconds=1), True) + if csp.ticked(a_send): + if a_send: + am._process_slack_message(clientmock, requestmock) + csp.schedule_alarm(a_send, timedelta(seconds=1), False) + else: + return True + + +PUBLIC_CHANNEL_MENTION_PAYLOAD = { + "token": "ABCD", + "team_id": "EFGH", + "api_app_id": "HIJK", + "event": { + "client_msg_id": "1234-5678", + "type": "app_mention", + "text": "<@BOTID> <@USERID> <@USERID2>", + "user": "USERID", + "ts": "1.2", + "blocks": [ + { + "type": "rich_text", + "block_id": "tx381", + "elements": [ + { + "type": "rich_text_section", + "elements": [ + {"type": "user", "user_id": "BOTID"}, + {"type": "text", "text": " "}, + {"type": "user", "user_id": "USERID"}, + {"type": "text", "text": " "}, + {"type": "user", "user_id": "USERID2"}, + ], + } + ], + } + ], + "team": "ABCD", + "channel": "EFGH", + "event_ts": "1.2", + }, + "type": "event_callback", + "event_id": "ABCD", + "event_time": 1707423091, + "authorizations": [ + {"enterprise_id": None, "team_id": "ABCD", "user_id": "BOTID", "is_bot": True, "is_enterprise_install": False} + ], + "is_ext_shared_channel": False, + "event_context": "SOMELONGCONTEXT", +} +DIRECT_MESSAGE_PAYLOAD = { + "token": "ABCD", + "team_id": "EFGH", + "context_team_id": "ABCD", + "context_enterprise_id": None, + "api_app_id": "HIJK", + "event": { + "client_msg_id": "1234-5678", + "type": "message", + "text": "test", + "user": "USERID", + "ts": "2.1", + "blocks": [ + { + "type": "rich_text", + "block_id": "gB9fq", + "elements": [{"type": "rich_text_section", "elements": [{"type": "text", "text": "test"}]}], + } + ], + "team": "ABCD", + "channel": "EFGH", + "event_ts": "2.1", + "channel_type": "im", + }, + "type": "event_callback", + "event_id": "ABCD", + "event_time": 1707423220, + "authorizations": [ + {"enterprise_id": None, "team_id": "ABCD", "user_id": "BOTID", "is_bot": True, "is_enterprise_install": False} + ], + "is_ext_shared_channel": False, + "event_context": "SOMELONGCONTEXT", +} + + +class TestSlack: + def test_slack_tokens(self): + with pytest.raises(RuntimeError): + SlackAdapterManager("abc", "def") + + @pytest.mark.parametrize("payload", (PUBLIC_CHANNEL_MENTION_PAYLOAD, DIRECT_MESSAGE_PAYLOAD)) + def test_slack(self, payload): + with patch("csp.adapters.slack.SocketModeClient") as clientmock: + # mock out the event from the slack sdk + reqmock = MagicMock() + reqmock.type = "events_api" + reqmock.payload = payload + + # mock out the user/room lookup responses + mock_user_response = MagicMock(name="users_info_mock") + mock_user_response.status_code = 200 + mock_user_response.data = { + "user": {"profile": {"real_name_normalized": "johndoe", "email": "johndoe@some.email"}, "name": "blerg"} + } + clientmock.return_value.web_client.users_info.return_value = mock_user_response + mock_room_response = MagicMock(name="conversations_info_mock") + mock_room_response.status_code = 200 + mock_room_response.data = {"channel": {"is_im": False, "is_private": True, "name": "a private channel"}} + clientmock.return_value.web_client.conversations_info.return_value = mock_room_response + mock_list_response = MagicMock(name="conversations_list_mock") + mock_list_response.status_code = 200 + mock_list_response.data = { + "channels": [ + {"name": "a private channel", "id": "EFGH"}, + {"name": "a new channel", "id": "new_channel"}, + ] + } + clientmock.return_value.web_client.conversations_list.return_value = mock_list_response + + def graph(): + am = SlackAdapterManager("xapp-1-dummy", "xoxb-dummy") + + # send a fake slack message to the app + stop = send_fake_message(clientmock, reqmock, am) + + # send a response + resp = hello(am.subscribe()) + am.publish(resp) + + # do a react + rct = react(am.subscribe()) + am.publish(rct) + + csp.add_graph_output("response", resp) + csp.add_graph_output("react", rct) + + # stop after first messages + done_flag = (csp.count(stop) + csp.count(resp) + csp.count(rct)) == 3 + csp.stop_engine(stop) + + # run the graph + resp = csp.run(graph, realtime=True) + + # check outputs + if payload == PUBLIC_CHANNEL_MENTION_PAYLOAD: + assert resp["react"] + assert resp["response"] + + assert resp["react"][0][1] == SlackMessage( + channel="a private channel", channel_id="EFGH", reaction="eyes", thread="1.2" + ) + assert resp["response"][0][1] == SlackMessage( + channel="a new channel", msg="Hello <@USERID>!", thread="1.2" + ) + else: + assert resp["react"] + assert resp["response"] + + assert resp["react"][0][1] == SlackMessage( + channel="a private channel", channel_id="EFGH", reaction="eyes", thread="2.1" + ) + assert resp["response"][0][1] == SlackMessage( + channel="a new channel", msg="Hello <@USERID>!", thread="2.1" + ) + + # check all inbound mocks got called + if payload == PUBLIC_CHANNEL_MENTION_PAYLOAD: + assert clientmock.return_value.web_client.users_info.call_count == 2 + else: + assert clientmock.return_value.web_client.users_info.call_count == 1 + assert clientmock.return_value.web_client.conversations_info.call_count == 1 + + # check all outbound mocks got called + assert clientmock.return_value.web_client.reactions_add.call_count == 1 + assert clientmock.return_value.web_client.chat_postMessage.call_count == 1 + + if payload == PUBLIC_CHANNEL_MENTION_PAYLOAD: + assert clientmock.return_value.web_client.reactions_add.call_args_list == [ + call(channel="EFGH", name="eyes", timestamp="1.2") + ] + assert clientmock.return_value.web_client.chat_postMessage.call_args_list == [ + call(channel="new_channel", text="Hello <@USERID>!") + ] + else: + assert clientmock.return_value.web_client.reactions_add.call_args_list == [ + call(channel="EFGH", name="eyes", timestamp="2.1") + ] + assert clientmock.return_value.web_client.chat_postMessage.call_args_list == [ + call(channel="new_channel", text="Hello <@USERID>!") + ] + + def test_mention_user(self): + assert mention_user("ABCD") == "<@ABCD>" diff --git a/pyproject.toml b/pyproject.toml index f046f0063..d971bc6de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,7 @@ develop = [ "isort", "ruamel.yaml", "ruff", + "scikit-build", "twine", "wheel", # Test dependencies @@ -63,8 +64,13 @@ develop = [ "pytest-asyncio", "pytest-cov", "pytest-sugar", - "scikit-build", + "sqlalchemy", "tornado", + # Extras + "slack-sdk>=3", +] +slack = [ + "slack-sdk>=3", ] [tool.check-manifest]