diff --git a/CMakeLists.txt b/CMakeLists.txt index 572e4e0ac..4bb693281 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -172,6 +172,7 @@ else() if(CSP_BUILD_NO_CXX_ABI) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_GLIBCXX_USE_CXX11_ABI=0") endif() + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fvisibility=hidden") if (COVERAGE) # TODO windows add_compile_options(--coverage) diff --git a/Makefile b/Makefile index d37aa8f64..6c8bf66f0 100644 --- a/Makefile +++ b/Makefile @@ -26,6 +26,9 @@ build-debug: ## build the library ( DEBUG ) - May need a make clean when switch build-conda: ## build the library in Conda python setup.py build build_ext --csp-no-vcpkg --inplace +build-conda-debug: ## build the library ( DEBUG ) - in Conda + SKBUILD_CONFIGURE_OPTIONS="" DEBUG=1 python setup.py build build_ext --csp-no-vcpkg --inplace + install: ## install library python -m pip install . diff --git a/conda/dev-environment-unix.yml b/conda/dev-environment-unix.yml index fa3740a5b..d6ad291c8 100644 --- a/conda/dev-environment-unix.yml +++ b/conda/dev-environment-unix.yml @@ -45,7 +45,6 @@ dependencies: - ruamel.yaml - ruff>=0.3,<0.4 - scikit-build - - slack-sdk - sqlalchemy - tar - threadpoolctl diff --git a/conda/dev-environment-win.yml b/conda/dev-environment-win.yml index 8ca2482d8..8c1ce5fdb 100644 --- a/conda/dev-environment-win.yml +++ b/conda/dev-environment-win.yml @@ -44,7 +44,6 @@ dependencies: - ruamel.yaml - ruff>=0.3,<0.4 - scikit-build - - slack-sdk - sqlalchemy - threadpoolctl - tornado diff --git a/cpp/csp/adapters/kafka/KafkaAdapterManager.h b/cpp/csp/adapters/kafka/KafkaAdapterManager.h index 234f5fc11..bf4e23158 100644 --- a/cpp/csp/adapters/kafka/KafkaAdapterManager.h +++ b/cpp/csp/adapters/kafka/KafkaAdapterManager.h @@ -47,7 +47,7 @@ struct KafkaStatusMessageTypeTraits using KafkaStatusMessageType = csp::Enum; //Top level AdapterManager object for all kafka adapters in the engine -class KafkaAdapterManager final : public csp::AdapterManager +class CSP_PUBLIC KafkaAdapterManager final : public csp::AdapterManager { public: KafkaAdapterManager( csp::Engine * engine, const Dictionary & properties ); diff --git a/cpp/csp/adapters/parquet/DialectGenericListReaderInterface.h b/cpp/csp/adapters/parquet/DialectGenericListReaderInterface.h index c88097f6f..61b77cd85 100644 --- a/cpp/csp/adapters/parquet/DialectGenericListReaderInterface.h +++ b/cpp/csp/adapters/parquet/DialectGenericListReaderInterface.h @@ -25,7 +25,7 @@ class DialectGenericListReaderInterface }; template< typename T > -class TypedDialectGenericListReaderInterface : public DialectGenericListReaderInterface +class CSP_PUBLIC TypedDialectGenericListReaderInterface : public DialectGenericListReaderInterface { public: using Ptr = std::shared_ptr>; @@ -45,4 +45,4 @@ class TypedDialectGenericListReaderInterface : public DialectGenericListReaderIn } -#endif \ No newline at end of file +#endif diff --git a/cpp/csp/adapters/parquet/ParquetInputAdapterManager.h b/cpp/csp/adapters/parquet/ParquetInputAdapterManager.h index baa67e19c..8f3effa1e 100644 --- a/cpp/csp/adapters/parquet/ParquetInputAdapterManager.h +++ b/cpp/csp/adapters/parquet/ParquetInputAdapterManager.h @@ -18,7 +18,7 @@ namespace csp::adapters::parquet //Top level AdapterManager object for all parquet adapters in the engine -class ParquetInputAdapterManager final : public csp::AdapterManager +class CSP_PUBLIC ParquetInputAdapterManager final : public csp::AdapterManager { public: using GeneratorPtr = csp::Generator::Ptr; diff --git a/cpp/csp/adapters/parquet/ParquetOutputAdapterManager.h b/cpp/csp/adapters/parquet/ParquetOutputAdapterManager.h index a2b5da200..b7fe029f4 100644 --- a/cpp/csp/adapters/parquet/ParquetOutputAdapterManager.h +++ b/cpp/csp/adapters/parquet/ParquetOutputAdapterManager.h @@ -21,7 +21,7 @@ class ParquetOutputFilenameAdapter; class ParquetDictBasketOutputWriter; //Top level AdapterManager object for all parquet adapters in the engine -class ParquetOutputAdapterManager final : public csp::AdapterManager +class CSP_PUBLIC ParquetOutputAdapterManager final : public csp::AdapterManager { public: using FileVisitorCallback = std::function; diff --git a/cpp/csp/adapters/parquet/ParquetReaderColumnAdapter.cpp b/cpp/csp/adapters/parquet/ParquetReaderColumnAdapter.cpp index 3fe763a60..0525ffbf9 100644 --- a/cpp/csp/adapters/parquet/ParquetReaderColumnAdapter.cpp +++ b/cpp/csp/adapters/parquet/ParquetReaderColumnAdapter.cpp @@ -734,7 +734,7 @@ void ListColumnAdapter::readCurValue() if( this -> m_curChunkArray -> IsValid( curRow ) ) { auto values = this -> m_curChunkArray -> value_slice( curRow ); - auto typedValues = std::dynamic_pointer_cast( values ); + auto typedValues = std::static_pointer_cast( values ); auto arrayValue = m_listReader -> create( typedValues -> length() ); auto* internalBuffer = m_listReader -> getRawDataBuffer( arrayValue ); diff --git a/cpp/csp/adapters/websocket/ClientAdapterManager.h b/cpp/csp/adapters/websocket/ClientAdapterManager.h index 62577d769..b2b15fa78 100644 --- a/cpp/csp/adapters/websocket/ClientAdapterManager.h +++ b/cpp/csp/adapters/websocket/ClientAdapterManager.h @@ -40,10 +40,8 @@ struct WebsocketClientStatusTypeTraits using ClientStatusType = Enum; -class ClientAdapterManager final : public AdapterManager +class CSP_PUBLIC ClientAdapterManager final : public AdapterManager { - - public: ClientAdapterManager( Engine * engine, @@ -78,4 +76,4 @@ class ClientAdapterManager final : public AdapterManager } -#endif \ No newline at end of file +#endif diff --git a/cpp/csp/core/Exception.h b/cpp/csp/core/Exception.h index 074143de7..5227acf34 100644 --- a/cpp/csp/core/Exception.h +++ b/cpp/csp/core/Exception.h @@ -10,7 +10,7 @@ namespace csp { -class Exception : public std::exception +class CSP_PUBLIC Exception : public std::exception { public: Exception( const char * exType, const std::string & description, const char * file, const char * func, int line ) : @@ -59,7 +59,7 @@ class Exception : public std::exception }; #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__) -#define CSP_DECLARE_EXCEPTION( DerivedException, BaseException ) class DerivedException : public BaseException { public: DerivedException( const char * exType, const std::string &r, const char * file, const char * func, int line ) : BaseException( exType, r, file, func, line ) {} }; +#define CSP_DECLARE_EXCEPTION( DerivedException, BaseException ) class CSP_PUBLIC DerivedException : public BaseException { public: DerivedException( const char * exType, const std::string &r, const char * file, const char * func, int line ) : BaseException( exType, r, file, func, line ) {} }; CSP_DECLARE_EXCEPTION( AssertionError, Exception ) CSP_DECLARE_EXCEPTION( RuntimeException, Exception ) diff --git a/cpp/csp/core/Platform.h b/cpp/csp/core/Platform.h index 37474faf6..3ef861259 100644 --- a/cpp/csp/core/Platform.h +++ b/cpp/csp/core/Platform.h @@ -14,7 +14,8 @@ #undef ERROR #undef GetMessage -#define DLL_LOCAL +#define CSP_LOCAL +#define CSP_PUBLIC __declspec(dllexport) #ifdef CSPTYPESIMPL_EXPORTS #define CSPTYPESIMPL_EXPORT __declspec(dllexport) @@ -90,10 +91,11 @@ inline uint8_t ffs(uint64_t n) #else -#define CSPIMPL_EXPORT -#define CSPTYPESIMPL_EXPORT +#define CSPIMPL_EXPORT __attribute__ ((visibility ("default"))) +#define CSPTYPESIMPL_EXPORT __attribute__ ((visibility ("default"))) -#define DLL_LOCAL __attribute__ ((visibility ("hidden"))) +#define CSP_LOCAL __attribute__ ((visibility ("hidden"))) +#define CSP_PUBLIC __attribute__ ((visibility ("default"))) #define START_PACKED #define END_PACKED __attribute__((packed)) diff --git a/cpp/csp/engine/AdapterManager.h b/cpp/csp/engine/AdapterManager.h index a0c1531ee..fd71126c9 100644 --- a/cpp/csp/engine/AdapterManager.h +++ b/cpp/csp/engine/AdapterManager.h @@ -93,7 +93,7 @@ bool ManagedSimInputAdapter::pushNullTick() return true; } -class AdapterManager : public EngineOwned +class CSP_PUBLIC AdapterManager : public EngineOwned { public: AdapterManager( csp::Engine * ); diff --git a/cpp/csp/engine/Feedback.h b/cpp/csp/engine/Feedback.h index 0194d9257..ae02925aa 100644 --- a/cpp/csp/engine/Feedback.h +++ b/cpp/csp/engine/Feedback.h @@ -28,7 +28,7 @@ class FeedbackOutputAdapter final : public OutputAdapter }; template -class FeedbackInputAdapter final : public InputAdapter +class CSP_PUBLIC FeedbackInputAdapter final : public InputAdapter { public: using InputAdapter::InputAdapter; diff --git a/cpp/csp/engine/Struct.h b/cpp/csp/engine/Struct.h index e0653e4a3..1b09cb97b 100644 --- a/cpp/csp/engine/Struct.h +++ b/cpp/csp/engine/Struct.h @@ -756,13 +756,13 @@ class Struct void decref() { //Work around GCC12 bug mis-identifying this code as use-after-free -#ifdef __linux__ +#if defined(__linux__) && !defined(__clang__) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wuse-after-free" #endif if( --hidden() -> refcount == 0 ) delete this; -#ifdef __linux__ +#if defined(__linux__) && !defined(__clang__) #pragma GCC diagnostic pop #endif } diff --git a/cpp/csp/python/Exception.h b/cpp/csp/python/Exception.h index 5c7c2e80a..6040c79a2 100644 --- a/cpp/csp/python/Exception.h +++ b/cpp/csp/python/Exception.h @@ -8,7 +8,7 @@ namespace csp::python { -class PythonPassthrough : public csp::Exception +class CSP_PUBLIC PythonPassthrough : public csp::Exception { public: PythonPassthrough( const char * exType, const std::string &r, const char * file, diff --git a/cpp/csp/python/InitHelper.h b/cpp/csp/python/InitHelper.h index e62dd05b7..1665f847f 100644 --- a/cpp/csp/python/InitHelper.h +++ b/cpp/csp/python/InitHelper.h @@ -10,7 +10,7 @@ namespace csp::python { -class DLL_LOCAL InitHelper +class CSP_LOCAL InitHelper { public: ~InitHelper() {} @@ -111,4 +111,21 @@ inline bool InitHelper::execute( PyObject * module ) } } + +//PyMODINIT_FUNC in Python <3.9 doesn't export the function/make visible +//this is required since we build with hidden visibility by default +//the below macro code can be removed once 3.8 support is dropped +// +//see similar issues: +//https://github.com/scipy/scipy/issues/15996 +//https://github.com/mesonbuild/meson/pull/10369 + +#if PY_VERSION_HEX < 0x03090000 +#ifdef PyMODINIT_FUNC +#undef PyMODINIT_FUNC +#endif + +#define PyMODINIT_FUNC extern "C" CSP_PUBLIC PyObject* +#endif + #endif diff --git a/cpp/csp/python/PyCspEnum.h b/cpp/csp/python/PyCspEnum.h index 0da9e9dbc..fb098735c 100644 --- a/cpp/csp/python/PyCspEnum.h +++ b/cpp/csp/python/PyCspEnum.h @@ -27,7 +27,7 @@ struct CSPTYPESIMPL_EXPORT PyCspEnumMeta : public PyHeapTypeObject static PyTypeObject PyType; }; -//TODO Windows - need to figure out why adding DLL_PUBLIC to this class leads to weird compilation errors on CspEnumMeta's unordered_map... +//TODO Windows - need to figure out why adding CSP_PUBLIC to this class leads to weird compilation errors on CspEnumMeta's unordered_map... //This is an extension of csp::CspEnumMeta for python dialect, we need it in order to //keep a reference to the python enum type from conversion to/from csp::CspEnumMeta <-> PyObject properly diff --git a/cpp/csp/python/adapters/CMakeLists.txt b/cpp/csp/python/adapters/CMakeLists.txt index abcae763e..8cb34950f 100644 --- a/cpp/csp/python/adapters/CMakeLists.txt +++ b/cpp/csp/python/adapters/CMakeLists.txt @@ -35,7 +35,7 @@ if(CSP_BUILD_PARQUET_ADAPTER) endif() target_link_libraries(parquetadapterimpl csp_core csp_engine cspimpl csp_parquet_adapter) target_include_directories(parquetadapterimpl PUBLIC ${ARROW_INCLUDE_DIR} ${PARQUET_INCLUDE_DIR} "${VENDORED_PYARROW_ROOT}") - target_compile_definitions(parquetadapterimpl PUBLIC ARROW_PYTHON_STATIC) + target_compile_definitions(parquetadapterimpl PUBLIC ARROW_PYTHON_STATIC -DARROW_PYTHON_EXPORT=) install(TARGETS parquetadapterimpl RUNTIME DESTINATION ${CSP_RUNTIME_INSTALL_SUBDIR} ) endif() diff --git a/csp/adapters/slack.py b/csp/adapters/slack.py index 0998e6983..6838504b4 100644 --- a/csp/adapters/slack.py +++ b/csp/adapters/slack.py @@ -1,372 +1,4 @@ -import threading -from logging import getLogger -from queue import Queue -from ssl import SSLContext -from threading import Thread -from time import sleep -from typing import Dict, List, Optional, TypeVar - -import csp -from csp.impl.adaptermanager import AdapterManagerImpl -from csp.impl.outputadapter import OutputAdapter -from csp.impl.pushadapter import PushInputAdapter -from csp.impl.struct import Struct -from csp.impl.types.tstype import ts -from csp.impl.wiring import py_output_adapter_def, py_push_adapter_def - try: - from slack_sdk.errors import SlackApiError - from slack_sdk.socket_mode import SocketModeClient - from slack_sdk.socket_mode.request import SocketModeRequest - from slack_sdk.socket_mode.response import SocketModeResponse - from slack_sdk.web import WebClient - - _HAVE_SLACK_SDK = True + from csp_adapter_slack import * # noqa: F403 except ImportError: - _HAVE_SLACK_SDK = False - -T = TypeVar("T") -log = getLogger(__file__) - - -__all__ = ("SlackMessage", "mention_user", "SlackAdapterManager", "SlackInputAdapterImpl", "SlackOutputAdapterImpl") - - -class SlackMessage(Struct): - user: str - user_email: str # email of the author - user_id: str # user id of the author - tags: List[str] # list of mentions - - channel: str # name of channel - channel_id: str # id of channel - channel_type: str # type of channel, in "message", "public" (app_mention), "private" (app_mention) - - msg: str # parsed text payload - reaction: str # emoji reacts - thread: str # thread id, if in thread - payload: dict # raw message payload - - -def mention_user(userid: str) -> str: - """Convenience method, more difficult to do in symphony but we want slack to be symmetric""" - return f"<@{userid}>" - - -class SlackAdapterManager(AdapterManagerImpl): - def __init__(self, app_token: str, bot_token: str, ssl: Optional[SSLContext] = None): - if not _HAVE_SLACK_SDK: - raise RuntimeError("Could not find slack-sdk installation") - if not app_token.startswith("xapp-") or not bot_token.startswith("xoxb-"): - raise RuntimeError("Slack app token or bot token looks malformed") - - self._slack_client = SocketModeClient( - app_token=app_token, - web_client=WebClient(token=bot_token, ssl=ssl), - ) - self._slack_client.socket_mode_request_listeners.append(self._process_slack_message) - - # down stream edges - self._subscribers = [] - self._publishers = [] - - # message queues - self._inqueue: Queue[SlackMessage] = Queue() - self._outqueue: Queue[SlackMessage] = Queue() - - # handler thread - self._running: bool = False - self._thread: Thread = None - - # lookups for mentions and redirection - self._room_id_to_room_name: Dict[str, str] = {} - self._room_id_to_room_type: Dict[str, str] = {} - self._room_name_to_room_id: Dict[str, str] = {} - self._user_id_to_user_name: Dict[str, str] = {} - self._user_id_to_user_email: Dict[str, str] = {} - self._user_name_to_user_id: Dict[str, str] = {} - self._user_email_to_user_id: Dict[str, str] = {} - - def subscribe(self): - return _slack_input_adapter(self, push_mode=csp.PushMode.NON_COLLAPSING) - - def publish(self, msg: ts[SlackMessage]): - return _slack_output_adapter(self, msg) - - def _create(self, engine, memo): - # We'll avoid having a second class and make our AdapterManager and AdapterManagerImpl the same - super().__init__(engine) - return self - - def start(self, starttime, endtime): - self._running = True - self._thread = threading.Thread(target=self._run, daemon=True) - self._thread.start() - - def stop(self): - if self._running: - self._running = False - self._slack_client.close() - self._thread.join() - - def register_subscriber(self, adapter): - if adapter not in self._subscribers: - self._subscribers.append(adapter) - - def register_publisher(self, adapter): - if adapter not in self._publishers: - self._publishers.append(adapter) - - def _get_user_from_id(self, user_id): - # try to pull from cache - name = self._user_id_to_user_name.get(user_id, None) - email = self._user_id_to_user_email.get(user_id, None) - - # if none, refresh data via web client - if name is None or email is None: - ret = self._slack_client.web_client.users_info(user=user_id) - if ret.status_code == 200: - # TODO OAuth scopes required - name = ret.data["user"]["profile"].get("real_name_normalized", ret.data["user"]["name"]) - email = ret.data["user"]["profile"]["email"] - self._user_id_to_user_name[user_id] = name - self._user_name_to_user_id[name] = user_id # TODO is this 1-1 in slack? - self._user_id_to_user_email[user_id] = email - self._user_email_to_user_id[email] = user_id - return name, email - - def _get_user_from_name(self, user_name): - # try to pull from cache - user_id = self._user_name_to_user_id.get(user_name, None) - - # if none, refresh data via web client - if user_id is None: - # unfortunately the reverse lookup is not super nice... - # we need to pull all users and build the reverse mapping - ret = self._slack_client.web_client.users_list() - if ret.status_code == 200: - # TODO OAuth scopes required - for user in ret.data["members"]: - name = user["profile"].get("real_name_normalized", user["name"]) - user_id = user["profile"]["id"] - email = user["profile"]["email"] - self._user_id_to_user_name[user_id] = name - self._user_name_to_user_id[name] = user_id # TODO is this 1-1 in slack? - self._user_id_to_user_email[user_id] = email - self._user_email_to_user_id[email] = user_id - return self._user_name_to_user_id.get(user_name, None) - return user_id - - def _channel_data_to_channel_kind(self, data) -> str: - if data.get("is_im", False): - return "message" - if data.get("is_private", False): - return "private" - return "public" - - def _get_channel_from_id(self, channel_id): - # try to pull from cache - name = self._room_id_to_room_name.get(channel_id, None) - kind = self._room_id_to_room_type.get(channel_id, None) - - # if none, refresh data via web client - if name is None: - ret = self._slack_client.web_client.conversations_info(channel=channel_id) - if ret.status_code == 200: - # TODO OAuth scopes required - kind = self._channel_data_to_channel_kind(ret.data["channel"]) - if kind == "message": - # TODO use same behavior as symphony adapter - name = "DM" - else: - name = ret.data["channel"]["name"] - - self._room_id_to_room_name[channel_id] = name - self._room_name_to_room_id[name] = channel_id - self._room_id_to_room_type[channel_id] = kind - return name, kind - - def _get_channel_from_name(self, channel_name): - # try to pull from cache - channel_id = self._room_name_to_room_id.get(channel_name, None) - - # if none, refresh data via web client - if channel_id is None: - # unfortunately the reverse lookup is not super nice... - # we need to pull all channels and build the reverse mapping - ret = self._slack_client.web_client.conversations_list() - if ret.status_code == 200: - # TODO OAuth scopes required - for channel in ret.data["channels"]: - name = channel["name"] - channel_id = channel["id"] - kind = self._channel_data_to_channel_kind(channel) - self._room_id_to_room_name[channel_id] = name - self._room_name_to_room_id[name] = channel_id - self._room_id_to_room_type[channel_id] = kind - return self._room_name_to_room_id.get(channel_name, None) - return channel_id - - def _get_tags_from_message(self, blocks, authorizations=None) -> List[str]: - """extract tags from message, potentially excluding the bot's own @""" - authorizations = authorizations or [] - if len(authorizations) > 0: - bot_id = authorizations[0]["user_id"] # TODO more than one? - else: - bot_id = "" - - tags = [] - to_search = blocks.copy() - - while to_search: - element = to_search.pop() - # add subsections - if element.get("elements", []): - to_search.extend(element.get("elements")) - - if element.get("type", "") == "user": - tag_id = element.get("user_id") - if tag_id != bot_id: - # TODO tag with id or with name? - name, _ = self._get_user_from_id(tag_id) - if name: - tags.append(name) - return tags - - def _process_slack_message(self, client: SocketModeClient, req: SocketModeRequest): - log.info(req.payload) - if req.type == "events_api": - # Acknowledge the request anyway - response = SocketModeResponse(envelope_id=req.envelope_id) - client.send_socket_mode_response(response) - - if ( - req.payload["event"]["type"] in ("message", "app_mention") - and req.payload["event"].get("subtype") is None - ): - user, user_email = self._get_user_from_id(req.payload["event"]["user"]) - channel, channel_type = self._get_channel_from_id(req.payload["event"]["channel"]) - tags = self._get_tags_from_message(req.payload["event"]["blocks"], req.payload["authorizations"]) - slack_msg = SlackMessage( - user=user or "", - user_email=user_email or "", - user_id=req.payload["event"]["user"], - tags=tags, - channel=channel or "", - channel_id=req.payload["event"]["channel"], - channel_type=channel_type or "", - msg=req.payload["event"]["text"], - reaction="", - thread=req.payload["event"]["ts"], - payload=req.payload.copy(), - ) - self._inqueue.put(slack_msg) - - def _run(self): - self._slack_client.connect() - - while self._running: - # drain outbound - while not self._outqueue.empty(): - # pull SlackMessage from queue - slack_msg = self._outqueue.get() - - # refactor into slack command - # grab channel or DM - if hasattr(slack_msg, "channel_id") and slack_msg.channel_id: - channel_id = slack_msg.channel_id - elif hasattr(slack_msg, "channel") and slack_msg.channel: - # TODO DM - channel_id = self._get_channel_from_name(slack_msg.channel) - - # pull text or reaction - if ( - hasattr(slack_msg, "reaction") - and slack_msg.reaction - and hasattr(slack_msg, "thread") - and slack_msg.thread - ): - # TODO - self._slack_client.web_client.reactions_add( - channel=channel_id, - name=slack_msg.reaction, - timestamp=slack_msg.thread, - ) - elif hasattr(slack_msg, "msg") and slack_msg.msg: - try: - # send text to channel - self._slack_client.web_client.chat_postMessage( - channel=channel_id, - text=getattr(slack_msg, "msg", ""), - ) - except SlackApiError: - # TODO - ... - else: - # cannot send empty message, log an error - log.error(f"Received malformed SlackMessage instance: {slack_msg}") - - if not self._inqueue.empty(): - # pull all SlackMessages from queue - # do as burst to match SymphonyAdapter - slack_msgs = [] - while not self._inqueue.empty(): - slack_msgs.append(self._inqueue.get()) - - # push to all the subscribers - for adapter in self._subscribers: - adapter.push_tick(slack_msgs) - - # do short sleep - sleep(0.1) - - # liveness check - if not self._thread.is_alive(): - self._running = False - self._thread.join() - - # shut down socket client - try: - # TODO which one? - self._slack_client.close() - # self._slack_client.disconnect() - except AttributeError: - # TODO bug in slack sdk causes an exception to be thrown - # File "slack_sdk/socket_mode/builtin/connection.py", line 191, in disconnect - # self.sock.close() - # ^^^^^^^^^^^^^^^ - # AttributeError: 'NoneType' object has no attribute 'close' - ... - - def _on_tick(self, value): - self._outqueue.put(value) - - -class SlackInputAdapterImpl(PushInputAdapter): - def __init__(self, manager): - manager.register_subscriber(self) - super().__init__() - - -class SlackOutputAdapterImpl(OutputAdapter): - def __init__(self, manager): - manager.register_publisher(self) - self._manager = manager - super().__init__() - - def on_tick(self, time, value): - self._manager._on_tick(value) - - -_slack_input_adapter = py_push_adapter_def( - name="SlackInputAdapter", - adapterimpl=SlackInputAdapterImpl, - out_type=ts[List[SlackMessage]], - manager_type=SlackAdapterManager, -) -_slack_output_adapter = py_output_adapter_def( - name="SlackOutputAdapter", - adapterimpl=SlackOutputAdapterImpl, - manager_type=SlackAdapterManager, - input=ts[SlackMessage], -) + raise ModuleNotFoundError("Install `csp-adapter-slack` to use csp's Slack adapter") diff --git a/csp/build/csp_autogen.py b/csp/build/csp_autogen.py index 4e121da5b..71b570dd3 100644 --- a/csp/build/csp_autogen.py +++ b/csp/build/csp_autogen.py @@ -152,7 +152,7 @@ def _generate_enum_class(self, enum_type): cspenum_decls = "\n".join(f" static {enum_name} {x.name};" for x in enum_type) out = f""" -class {enum_name} : public csp::CspEnum +class CSP_PUBLIC {enum_name} : public csp::CspEnum {{ public: // Raw value quick access @@ -315,7 +315,7 @@ def _generate_struct_class(self, struct_type): ) out = f""" -class {struct_name} : public {base_class} +class CSP_PUBLIC {struct_name} : public {base_class} {{ public: diff --git a/csp/tests/adapters/test_slack.py b/csp/tests/adapters/test_slack.py deleted file mode 100644 index a05feb892..000000000 --- a/csp/tests/adapters/test_slack.py +++ /dev/null @@ -1,231 +0,0 @@ -import pytest -from datetime import timedelta -from ssl import create_default_context -from unittest.mock import MagicMock, call, patch - -import csp -from csp import ts -from csp.adapters.slack import SlackAdapterManager, SlackMessage, mention_user - - -@csp.node -def hello(msg: ts[SlackMessage]) -> ts[SlackMessage]: - if csp.ticked(msg): - text = f"Hello <@{msg.user_id}>!" - return SlackMessage( - channel="a new channel", - # reply in thread - thread=msg.thread, - msg=text, - ) - - -@csp.node -def react(msg: ts[SlackMessage]) -> ts[SlackMessage]: - if csp.ticked(msg): - return SlackMessage( - channel=msg.channel, - channel_id=msg.channel_id, - thread=msg.thread, - reaction="eyes", - ) - - -@csp.node -def send_fake_message(clientmock: MagicMock, requestmock: MagicMock, am: SlackAdapterManager) -> ts[bool]: - with csp.alarms(): - a_send = csp.alarm(bool) - with csp.start(): - csp.schedule_alarm(a_send, timedelta(seconds=1), True) - if csp.ticked(a_send): - if a_send: - am._process_slack_message(clientmock, requestmock) - csp.schedule_alarm(a_send, timedelta(seconds=1), False) - else: - return True - - -PUBLIC_CHANNEL_MENTION_PAYLOAD = { - "token": "ABCD", - "team_id": "EFGH", - "api_app_id": "HIJK", - "event": { - "client_msg_id": "1234-5678", - "type": "app_mention", - "text": "<@BOTID> <@USERID> <@USERID2>", - "user": "USERID", - "ts": "1.2", - "blocks": [ - { - "type": "rich_text", - "block_id": "tx381", - "elements": [ - { - "type": "rich_text_section", - "elements": [ - {"type": "user", "user_id": "BOTID"}, - {"type": "text", "text": " "}, - {"type": "user", "user_id": "USERID"}, - {"type": "text", "text": " "}, - {"type": "user", "user_id": "USERID2"}, - ], - } - ], - } - ], - "team": "ABCD", - "channel": "EFGH", - "event_ts": "1.2", - }, - "type": "event_callback", - "event_id": "ABCD", - "event_time": 1707423091, - "authorizations": [ - {"enterprise_id": None, "team_id": "ABCD", "user_id": "BOTID", "is_bot": True, "is_enterprise_install": False} - ], - "is_ext_shared_channel": False, - "event_context": "SOMELONGCONTEXT", -} -DIRECT_MESSAGE_PAYLOAD = { - "token": "ABCD", - "team_id": "EFGH", - "context_team_id": "ABCD", - "context_enterprise_id": None, - "api_app_id": "HIJK", - "event": { - "client_msg_id": "1234-5678", - "type": "message", - "text": "test", - "user": "USERID", - "ts": "2.1", - "blocks": [ - { - "type": "rich_text", - "block_id": "gB9fq", - "elements": [{"type": "rich_text_section", "elements": [{"type": "text", "text": "test"}]}], - } - ], - "team": "ABCD", - "channel": "EFGH", - "event_ts": "2.1", - "channel_type": "im", - }, - "type": "event_callback", - "event_id": "ABCD", - "event_time": 1707423220, - "authorizations": [ - {"enterprise_id": None, "team_id": "ABCD", "user_id": "BOTID", "is_bot": True, "is_enterprise_install": False} - ], - "is_ext_shared_channel": False, - "event_context": "SOMELONGCONTEXT", -} - - -class TestSlack: - def test_slack_tokens(self): - with pytest.raises(RuntimeError): - SlackAdapterManager("abc", "def") - - @pytest.mark.parametrize("payload", (PUBLIC_CHANNEL_MENTION_PAYLOAD, DIRECT_MESSAGE_PAYLOAD)) - def test_slack(self, payload): - with patch("csp.adapters.slack.SocketModeClient") as clientmock: - # mock out the event from the slack sdk - reqmock = MagicMock() - reqmock.type = "events_api" - reqmock.payload = payload - - # mock out the user/room lookup responses - mock_user_response = MagicMock(name="users_info_mock") - mock_user_response.status_code = 200 - mock_user_response.data = { - "user": {"profile": {"real_name_normalized": "johndoe", "email": "johndoe@some.email"}, "name": "blerg"} - } - clientmock.return_value.web_client.users_info.return_value = mock_user_response - mock_room_response = MagicMock(name="conversations_info_mock") - mock_room_response.status_code = 200 - mock_room_response.data = {"channel": {"is_im": False, "is_private": True, "name": "a private channel"}} - clientmock.return_value.web_client.conversations_info.return_value = mock_room_response - mock_list_response = MagicMock(name="conversations_list_mock") - mock_list_response.status_code = 200 - mock_list_response.data = { - "channels": [ - {"name": "a private channel", "id": "EFGH"}, - {"name": "a new channel", "id": "new_channel"}, - ] - } - clientmock.return_value.web_client.conversations_list.return_value = mock_list_response - - def graph(): - am = SlackAdapterManager("xapp-1-dummy", "xoxb-dummy", ssl=create_default_context()) - - # send a fake slack message to the app - stop = send_fake_message(clientmock, reqmock, am) - - # send a response - resp = hello(csp.unroll(am.subscribe())) - am.publish(resp) - - # do a react - rct = react(csp.unroll(am.subscribe())) - am.publish(rct) - - csp.add_graph_output("response", resp) - csp.add_graph_output("react", rct) - - # stop after first messages - done_flag = (csp.count(stop) + csp.count(resp) + csp.count(rct)) == 3 - csp.stop_engine(stop) - - # run the graph - resp = csp.run(graph, realtime=True) - - # check outputs - if payload == PUBLIC_CHANNEL_MENTION_PAYLOAD: - assert resp["react"] - assert resp["response"] - - assert resp["react"][0][1] == SlackMessage( - channel="a private channel", channel_id="EFGH", reaction="eyes", thread="1.2" - ) - assert resp["response"][0][1] == SlackMessage( - channel="a new channel", msg="Hello <@USERID>!", thread="1.2" - ) - else: - assert resp["react"] - assert resp["response"] - - assert resp["react"][0][1] == SlackMessage( - channel="a private channel", channel_id="EFGH", reaction="eyes", thread="2.1" - ) - assert resp["response"][0][1] == SlackMessage( - channel="a new channel", msg="Hello <@USERID>!", thread="2.1" - ) - - # check all inbound mocks got called - if payload == PUBLIC_CHANNEL_MENTION_PAYLOAD: - assert clientmock.return_value.web_client.users_info.call_count == 2 - else: - assert clientmock.return_value.web_client.users_info.call_count == 1 - assert clientmock.return_value.web_client.conversations_info.call_count == 1 - - # check all outbound mocks got called - assert clientmock.return_value.web_client.reactions_add.call_count == 1 - assert clientmock.return_value.web_client.chat_postMessage.call_count == 1 - - if payload == PUBLIC_CHANNEL_MENTION_PAYLOAD: - assert clientmock.return_value.web_client.reactions_add.call_args_list == [ - call(channel="EFGH", name="eyes", timestamp="1.2") - ] - assert clientmock.return_value.web_client.chat_postMessage.call_args_list == [ - call(channel="new_channel", text="Hello <@USERID>!") - ] - else: - assert clientmock.return_value.web_client.reactions_add.call_args_list == [ - call(channel="EFGH", name="eyes", timestamp="2.1") - ] - assert clientmock.return_value.web_client.chat_postMessage.call_args_list == [ - call(channel="new_channel", text="Hello <@USERID>!") - ] - - def test_mention_user(self): - assert mention_user("ABCD") == "<@ABCD>" diff --git a/docs/wiki/api-references/Base-Nodes-API.md b/docs/wiki/api-references/Base-Nodes-API.md index 2df3ec982..6fac0b2ee 100644 --- a/docs/wiki/api-references/Base-Nodes-API.md +++ b/docs/wiki/api-references/Base-Nodes-API.md @@ -174,6 +174,7 @@ csp.unroll(x: ts[['T']]) → ts['T'] Given a timeseries of a *list* of values, unroll will "unroll" the values in the list into a timeseries of the elements. `unroll` will ensure to preserve the order across all list ticks. Ticks will be unrolled in subsequent engine cycles. +For a detailed explanation of this behavior, see the documentation on [duplicate timestamps](Execution-Modes#handling-duplicate-timestamps). ## `csp.collect` diff --git a/docs/wiki/api-references/Input-Output-Adapters-API.md b/docs/wiki/api-references/Input-Output-Adapters-API.md index 7ed5976e7..edecae372 100644 --- a/docs/wiki/api-references/Input-Output-Adapters-API.md +++ b/docs/wiki/api-references/Input-Output-Adapters-API.md @@ -15,7 +15,6 @@ - [Publishing](#publishing) - [DBReader](#dbreader) - [TimeAccessor](#timeaccessor) -- [Slack](#slack) ## Kafka @@ -349,7 +348,3 @@ Both of these calls expect `typ` to be a `csp.Struct` type. `subscribe` is used to subscribe to a stream for the given symbol (symbol_column is required when creating DBReader) `subscribe_all` is used to retrieve all the data resulting from the request as a single timeseries. - -## Slack - -The Slack adapter allows for reading and writing of messages from the [Slack](https://slack.com) message platform using the [Slack Python SDK](https://slack.dev/python-slack-sdk/). diff --git a/docs/wiki/concepts/Common-Mistakes.md b/docs/wiki/concepts/Common-Mistakes.md index 6012e04fa..4958fbd85 100644 --- a/docs/wiki/concepts/Common-Mistakes.md +++ b/docs/wiki/concepts/Common-Mistakes.md @@ -74,7 +74,7 @@ from typing import List def next_movie_showing(show_times: ts[List[datetime]]) -> ts[datetime]: next_showing = None for time in show_times: - if time >= csp.now(): # list may include some shows today that have already past, so let's filter those out + if time >= datetime.now(): # list may include some shows today that have already past, so let's filter those out if next_showing is None or time < next_showing: next_showing = time diff --git a/docs/wiki/concepts/Execution-Modes.md b/docs/wiki/concepts/Execution-Modes.md index 5d288f8c0..0e06d223f 100644 --- a/docs/wiki/concepts/Execution-Modes.md +++ b/docs/wiki/concepts/Execution-Modes.md @@ -6,7 +6,7 @@ All inputs in simulation are driven off the provided timestamped data of its inp In realtime mode, the engine runs in wallclock time as of "now". Realtime engines can get data from realtime adapters which source data on separate threads and pass them through to the engine (ie think of activeMQ events happening on an activeMQ thread and being passed along to the engine in "realtime"). -Since engines can run in both simulated and realtime mode, users should **always** use **`csp.now()`** to get the current time in `csp.node`s. +Since engines can run in both simulated and realtime mode, users should **always** use **`csp.now()`** to get the current time in a `csp.node`. ## Table of Contents @@ -14,6 +14,7 @@ Since engines can run in both simulated and realtime mode, users should **always - [Simulation Mode](#simulation-mode) - [Realtime Mode](#realtime-mode) - [csp.PushMode](#csppushmode) +- [Handling Duplicate Timestamps](#handling-duplicate-timestamps) - [Realtime Group Event Synchronization](#realtime-group-event-synchronization) ## Simulation Mode @@ -50,6 +51,72 @@ When consuming data from input adapters there are three choices on how one can c | **BURST** | Simulation | all ticks from input source with duplicate timestamps (on the same timeseries) will tick once with a list of all values | | | Realtime | all ticks that occurred since previous engine cycle will tick once with a list of all the values | +## Handling duplicate timestamps + +In `csp`, there can be multiple engine cycles that occur at the same engine time. This is often the case when using nodes with internal alarms (e.g. [`csp.unroll`](Base-Nodes-API#cspunroll)) or using feedback edges ([`csp.feedback`](Feedback-and-Delayed-Edge#cspfeedback)). +If multiple events are scheduled at the same timestamp on a single time-series edge, they will be executed on separate cycles *in the order* they were scheduled. For example, consider the code snippet below: + +```python +import csp +from csp import ts +from datetime import datetime, timedelta + +@csp.node +def ticks_n_times(x: ts[int], n: int) -> ts[int]: + # Ticks out a value n times, incrementing it each time + with csp.alarms(): + alarm = csp.alarm(int) + + if csp.ticked(x): + for i in range(n): + csp.schedule_alarm(alarm, timedelta(), x+i) + + if csp.ticked(alarm): + return alarm + +@csp.graph +def duplicate_timestamps(): + v = csp.const(1) + csp.print('ticks_once', ticks_n_times(v, 1)) + csp.print('ticks_twice', ticks_n_times(v, 2)) + csp.print('ticks_thrice', ticks_n_times(v, 3)) + +csp.run(duplicate_timestamps, starttime=datetime(2020,1,1)) +``` + +When we run this graph, the output is: + +```raw +2020-01-01 00:00:00 ticks_once:1 +2020-01-01 00:00:00 ticks_twice:1 +2020-01-01 00:00:00 ticks_thrice:1 +2020-01-01 00:00:00 ticks_twice:2 +2020-01-01 00:00:00 ticks_thrice:2 +2020-01-01 00:00:00 ticks_thrice:3 +``` + +A real life example is when using `csp.unroll` to tick out a list of values on separate engine cycles. If we were to use `csp.sample` on the output, we would get the *first* value that is unrolled at each timestamp. Why? +The event that is scheduled on the sampling timer is its first (and only) event at that time; thus, it is executed on the first engine cycle, and samples the first unrolled value. + +```python +def sampling_unroll(): + u = csp.unroll(csp.const.using(T=[int])([1, 2, 3])) + s = csp.sample(csp.const(True), u) + csp.print('unrolled', u) + csp.print('sampled', s) + +csp.run(sampling_unroll, starttime=datetime(2020,1,1)) +``` + +Output: + +```raw +2020-01-01 00:00:00 unrolled:1 +2020-01-01 00:00:00 sampled:1 +2020-01-01 00:00:00 unrolled:2 +2020-01-01 00:00:00 unrolled:3 +``` + ## Realtime Group Event Synchronization The CSP framework supports properly synchronizing events across multiple timeseries that are sourced from the same realtime adapter. diff --git a/docs/wiki/dev-guides/Roadmap.md b/docs/wiki/dev-guides/Roadmap.md index 25bb34c4d..694336fb7 100644 --- a/docs/wiki/dev-guides/Roadmap.md +++ b/docs/wiki/dev-guides/Roadmap.md @@ -2,7 +2,6 @@ We do not have a formal roadmap, but we're happy to discuss features, improvemen Here are some high level items we hope to accomplish in the next few months: -- Support `msvc` compiler and full Windows support ([#109](https://github.com/Point72/csp/issues/109)) - Establish a better pattern for adapters ([#165](https://github.com/Point72/csp/discussions/165)) - Parallelization to improve runtime, for historical/offline distributions - Support for cross-process communication in realtime distributions @@ -10,6 +9,7 @@ Here are some high level items we hope to accomplish in the next few months: ## Adapters and Extensions - C++-based HTTP/SSE adapter +- C++-based Redis adapter - Add support for other graph viewers, including interactive / standalone / Jupyter ## Other Open Source Projects diff --git a/pyproject.toml b/pyproject.toml index 511405e8e..fb02c2fb5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -79,7 +79,6 @@ develop = [ "httpx>=0.20,<1", # kafka "polars", # parquet "psutil", # test_engine/test_history - "slack-sdk>=3", # slack "sqlalchemy", # db "threadpoolctl", # test_random "tornado", # profiler, perspective, websocket @@ -108,7 +107,7 @@ symphony = [ "csp-adapter-symphony", ] slack = [ - "slack-sdk>=3", + "csp-adapter-slack", ] [tool.check-manifest]