Skip to content

Commit

Permalink
WIP adding slack message adapter
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Paine <[email protected]>
  • Loading branch information
timkpaine committed Feb 6, 2024
1 parent 03b1f51 commit 5355ebe
Showing 1 changed file with 311 additions and 0 deletions.
311 changes: 311 additions & 0 deletions csp/adapters/slack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,311 @@
import random
import threading
import time
from datetime import datetime, timedelta
from logging import getLogger
from queue import Queue
from threading import Event
from time import sleep
from typing import List, TypeVar

import csp
from csp.impl.adaptermanager import AdapterManagerImpl
from csp.impl.outputadapter import OutputAdapter
from csp.impl.pushadapter import PushInputAdapter
from csp.impl.struct import Struct
from csp.impl.types.tstype import ts
from csp.impl.wiring import py_output_adapter_def, py_push_adapter_def

try:
from slack_sdk.errors import SlackApiError
from slack_sdk.socket_mode import SocketModeClient
from slack_sdk.socket_mode.request import SocketModeRequest
from slack_sdk.socket_mode.response import SocketModeResponse
from slack_sdk.web import WebClient

_HAVE_SLACK_SDK = True
except ImportError:
_HAVE_SLACK_SDK = False

T = TypeVar("T")
log = getLogger(__file__)


class SlackMessage(csp.Struct):
user: str
user_email: str # email of the author
user_id: str # user id of the author
tags: [str] # list of mentions

channel: str # name of channel
channel_id: str # id of channel
channel_type: str # type of channel, in "message", "public" (app_mention), "private" (app_mention)

msg: str # parsed text payload
reaction: str # emoji reacts
thread: str # thread id, if in thread
payload: dict # raw message payload


def mention_user(email_or_userid: str = ""):
# TODO
return ""


class SlackAdapterManager(AdapterManagerImpl):
def __init__(self, app_token: str, bot_token: str):
if not _HAVE_SLACK_SDK:
raise RuntimeError("Could not find slack-sdk installation")
if not app_token.startswith("xapp-") or not bot_token.startswith("xoxb-"):
raise RuntimeError("Slack app token or bot token looks malformed")

self._slack_client = SocketModeClient(
app_token=app_token,
web_client=WebClient(token=bot_token),
)
self._slack_client.socket_mode_request_listeners.append(self._process_slack_message)

# down stream edges
self._subscribers = []
self._publishers = []

# message queues
self._inqueue: Queue[SlackMessage] = Queue()
self._outqueue: Queue[SlackMessage] = Queue()

# handler thread
self._running: bool = False
self._thread: Thread = None

# lookups for mentions and redirection
self._room_id_to_room_name: Dict[str, str] = {}
self._room_id_to_room_type: Dict[str, str] = {}
self._room_name_to_room_id: Dict[str, str] = {}
self._user_id_to_user_name: Dict[str, str] = {}
self._user_id_to_user_email: Dict[str, str] = {}
self._user_name_to_user_id: Dict[str, str] = {}
self._user_email_to_user_id: Dict[str, str] = {}

def subscribe(self):
return _slack_input_adapter(self, push_mode=csp.PushMode.NON_COLLAPSING)

def publish(self, msg: ts[SlackMessage]):
return _slack_output_adapter(self, msg)

def _create(self, engine, memo):
# We'll avoid having a second class and make our AdapterManager and AdapterManagerImpl the same
super().__init__(engine)
return self

def start(self, starttime, endtime):
self._running = True
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()

def stop(self):
if self._running:
self._running = False
self._slack_client.close()
self._thread.join()

def register_subscriber(self, adapter):
if adapter not in self._subscribers:
self._subscribers.append(adapter)

def register_publisher(self, adapter):
if adapter not in self._publishers:
self._publishers.append(adapter)

def _get_user_from_id(self, user_id):
# try to pull from cache
name = self._user_id_to_user_name.get(user_id, None)
email = self._user_id_to_user_email.get(user_id, None)

# if none, refresh data via web client
if name is None or email is None:
ret = self._slack_client.web_client.users_info(user=user_id)
if ret.status_code == 200:
# TODO OAuth scopes required
name = ret.data["user"]["profile"].get("real_name_normalized", ret.data["user"]["name"])
email = ret.data["user"]["profile"]["email"]
self._user_id_to_user_name[user_id] = name
self._user_name_to_user_id[name] = user_id # TODO is this 1-1 in slack?
self._user_id_to_user_email[user_id] = email
self._user_email_to_user_id[email] = user_id
return name, email

def _get_user_from_name(self, user_name):
# try to pull from cache
user_id = self._user_name_to_user_id.get(user_name, None)

# if none, refresh data via web client
if user_id is None:
# unfortunately the reverse lookup is not super nice...
# we need to pull all users and build the reverse mapping
ret = self._slack_client.web_client.users_list()
if ret.status_code == 200:
# TODO OAuth scopes required
for user in ret.data["members"]:
name = user["profile"].get("real_name_normalized", user["name"])
user_id = user["profile"]["id"]
email = user["profile"]["email"]
self._user_id_to_user_name[user_id] = name
self._user_name_to_user_id[name] = user_id # TODO is this 1-1 in slack?
self._user_id_to_user_email[user_id] = email
self._user_email_to_user_id[email] = user_id
return self._user_name_to_user_id.get(user_name, None)
return user_id

def _channel_data_to_channel_kind(self, data) -> str:
if data.get("is_im", False):
return "message"
if data.get("is_private", False):
return "private"
return "public"

def _get_channel_from_id(self, channel_id):
# try to pull from cache
name = self._room_id_to_room_name.get(channel_id, None)
kind = self._room_id_to_room_type.get(channel_id, None)

# if none, refresh data via web client
if name is None:
ret = self._slack_client.web_client.conversations_info(channel=channel_id)
if ret.status_code == 200:
# TODO OAuth scopes required
name = ret.data["channel"]["name"]
kind = self._channel_data_to_channel_kind(ret.data["channel"])
self._room_id_to_room_name[channel_id] = name
self._room_name_to_room_id[name] = channel_id
self._room_id_to_room_type[channel_id] = kind
return name, kind

def _get_channel_from_name(self, channel_name):
# try to pull from cache
channel_id = self._room_name_to_room_id.get(channel_name, None)

# if none, refresh data via web client
if channel_id is None:
# unfortunately the reverse lookup is not super nice...
# we need to pull all channels and build the reverse mapping
ret = client.web_client.conversations_list()
if ret.status_code == 200:
# TODO OAuth scopes required
for channel in ret.data["channels"]:
name = channel["name"]
channel_id = channel["id"]
kind = self._channel_data_to_channel_kind(channel)
self._room_id_to_room_name[channel_id] = name
self._room_name_to_room_id[name] = channel_id
self._room_id_to_room_type[channel_id] = kind
return self._room_name_to_room_id.get(channel_name, None)
return channel_id

def _process_slack_message(self, client: SocketModeClient, req: SocketModeRequest):
log.info(req.payload)
if req.type == "events_api":
# Acknowledge the request anyway
response = SocketModeResponse(envelope_id=req.envelope_id)
client.send_socket_mode_response(response)

if (
req.payload["event"]["type"] in ("message", "app_mention")
and req.payload["event"].get("subtype") is None
):
user, user_email = self._get_user_from_id(req.payload["event"]["user"])
channel, channel_type = self._get_channel_from_id(req.payload["event"]["channel"])
slack_msg = SlackMessage(
user=user or "",
user_email=user_email or "",
user_id=req.payload["event"]["user"],
tags=[],
channel=channel or "",
channel_id=req.payload["event"]["channel"],
channel_type=channel_type or "",
msg=req.payload["event"]["text"],
reaction="",
thread="TODO",
payload=req.payload.copy(),
)
self._inqueue.put(slack_msg)

def _run(self):
self._slack_client.connect()

while self._running:
# drain outbound
while not self._outqueue.empty():
# pull SlackMessage from queue
slack_msg = self._outqueue.get()

# refactor into slack command
# grab channel or DM
if hasattr(slack_msg, "channel_id") and slack_msg.channel_id:
channel_id = slack_msg.channel_id
elif hasattr(slack_msg, "channel") and slack_msg.channel:
# TODO DM
channel_id = self._get_channel_from_name(slack_msg.channel)

# pull text or reaction
if hasattr(slack_msg, "reaction") and slack_msg.reaction:
# TODO
...
else:
# send text to channel
self._slack_client.web_client.chat_postMessage(
channel=channel_id,
text=getattr(slack_msg, "msg", ""),
)

while not self._inqueue.empty():
# pull SlackMessage from queue
slack_msg = self._inqueue.get()

# push to all the subscribers
for adapter in self._subscribers:
adapter.push_tick(slack_msg)

# do short sleep
sleep(1)

# liveness check
if not self._thread.is_alive():
self._running = False
self._thread.join()

# shut down socket client
self._slack_client.close()
# self._slack_client.disconnect()

def _on_tick(self, value):
self._outqueue.put(value)


class SlackInputAdapterImpl(PushInputAdapter):
def __init__(self, manager):
manager.register_subscriber(self)
super().__init__()


class SlackOutputAdapterImpl(OutputAdapter):
def __init__(self, manager):
manager.register_publisher(self)
self._manager = manager
super().__init__()

def on_tick(self, time, value):
self._manager._on_tick(value)


_slack_input_adapter = py_push_adapter_def(
name="SlackInputAdapter",
adapterimpl=SlackInputAdapterImpl,
out_type=ts[SlackMessage],
manager_type=SlackAdapterManager,
)
_slack_output_adapter = py_output_adapter_def(
name="SlackOutputAdapter",
adapterimpl=SlackOutputAdapterImpl,
manager_type=SlackAdapterManager,
input=ts[SlackMessage],
)

0 comments on commit 5355ebe

Please sign in to comment.