From d1383f876a83011d791304de5da44b8694def31d Mon Sep 17 00:00:00 2001 From: Zeeland Date: Wed, 14 Jun 2023 15:58:12 +0800 Subject: [PATCH] pref: add GitHub actions release --- .github/workflows/python-publish.yml | 39 +++++ .gitignore | 2 + broadcast_service/_core.py | 229 +++++++++++++++++++++++---- 3 files changed, 235 insertions(+), 35 deletions(-) create mode 100644 .github/workflows/python-publish.yml diff --git a/.github/workflows/python-publish.yml b/.github/workflows/python-publish.yml new file mode 100644 index 0000000..a9ba2bc --- /dev/null +++ b/.github/workflows/python-publish.yml @@ -0,0 +1,39 @@ +# This workflow will upload a Python Package using Twine when a release is created +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python#publishing-to-package-registries + +# This workflow uses actions that are not certified by GitHub. +# They are provided by a third-party and are governed by +# separate terms of service, privacy policy, and support +# documentation. + +name: Upload Python Package + +on: + release: + types: [published] + +permissions: + contents: read + +jobs: + deploy: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - name: Set up Python + uses: actions/setup-python@v3 + with: + python-version: '3.7' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install build + - name: Build package + run: python -m build + - name: Publish package + uses: pypa/gh-action-pypi-publish@27b31702a0e7fc50959f5ad993c78deac1bdfc29 + with: + user: __token__ + password: ${{ secrets.PYPI_API_TOKEN }} diff --git a/.gitignore b/.gitignore index 63f8683..f2208d4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +log +gpt_output.md command .idea broadcast_service.egg-info/ diff --git a/broadcast_service/_core.py b/broadcast_service/_core.py index fcda19b..fc28a49 100644 --- a/broadcast_service/_core.py +++ b/broadcast_service/_core.py @@ -1,36 +1,57 @@ -# Copyright 2022 Zeeland(https://github.com/Undertone0809/). All Rights Reserved. +# Copyright (c) 2023 Zeeland # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# +# Copyright Owner: Zeeland +# GitHub Link: https://github.com/Undertone0809/ +# Project Link: https://github.com/Undertone0809/broadcast-service +# Contact Email: zeeland@foxmail.com import logging -from typing import Optional, List, Callable +from pydantic import BaseModel, validator from concurrent.futures import ThreadPoolExecutor +from typing import Optional, List, Callable, Any, Union + from broadcast_service.singleton import Singleton +from broadcast_service.logger import enable_log, get_logger __all__ = ['broadcast_service', 'BroadcastService', 'enable_log'] - - -def enable_log(): - logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') - - -class BroadcastService(metaclass=Singleton): +logger = get_logger() + + +def _invoke_callback( + callback: Callable, + thread_pool: ThreadPoolExecutor, + enable_async: bool = True, + *args, + **kwargs +) -> Any: + if enable_async: + future_result = thread_pool.submit(callback, *args, **kwargs) + if future_result.result() is not None: + logger.debug(f"[broadcast-service invoke_callback result] {future_result.result()}") + return future_result.result() + else: + return callback(*args, **kwargs) + + +class BaseBroadcastService(metaclass=Singleton): """ This class implements broadcast mode, you can import the instance by single class. By BroadcastService, you can send topic message,it will automatically execute the callback function if some classes subscribe the topic. - example: + Example: --------------------------------------------------------------------------------- from broadcast_service import broadcast_service @@ -97,7 +118,7 @@ def listen_all(self, callback: Callable): def broadcast(self, topics: str or List[str], *args, **kwargs): """ - Launch broadcast on the specify topic + Launch broadcast on the specify topic. If all subscribe callback finish, it will call finish_callback. """ self.logger.debug(f"[broadcast-service] broadcast topic <{topics}>") if type(topics) == str: @@ -112,9 +133,9 @@ def broadcast(self, topics: str or List[str], *args, **kwargs): def broadcast_all(self, *args, **kwargs): """ All topics listened on will be called back. - Attention: Not all callback function will called. If your publish - and your subscribe takes different arguments, your callback function - will not be executed. + Attention: Not all callback function will be called. If your publisher callback + and your subscriber callback takes different arguments, your callback function + will not be executed. """ for topic in self.pubsub_channels.keys(): self._invoke_broadcast_topic(topic, *args, **kwargs) @@ -127,26 +148,19 @@ def _invoke_listen_topic(self, topic_name: str, callback: Callable): self.pubsub_channels[topic_name].append(callback) def _invoke_broadcast_topic(self, topic_name: str, *args, **kwargs): - """ - broadcast single topic. - TODO fix problem: There is no guarantee that every callback function will be executed unnecessarily in some cases. - """ if topic_name not in self.pubsub_channels.keys(): self.pubsub_channels[topic_name] = [] for item in self.pubsub_channels[topic_name]: - if self.enable_async: - self.thread_pool.submit( - item, *args, **kwargs) - else: - item(*args, **kwargs) + self._final_invoke_listen_callback(item, *args, **kwargs) for item in self.pubsub_channels['__all__']: - if self.enable_async: - self.thread_pool.submit( - item, *args, **kwargs) - else: - item(*args, **kwargs) + if item not in self.pubsub_channels[topic_name]: + self._final_invoke_listen_callback(item, *args, **kwargs) + + def _final_invoke_listen_callback(self, callback: Callable, *args, **kwargs) -> Any: + self.logger.debug(f"[broadcast-service] {callback.__name__} is called") + return _invoke_callback(callback, self.thread_pool, self.enable_async, *args, **kwargs) def stop_listen(self, topic_name: str, callback: Callable): if topic_name not in self.pubsub_channels.keys(): @@ -157,11 +171,15 @@ def stop_listen(self, topic_name: str, callback: Callable): self.pubsub_channels[topic_name].remove(callback) def on_listen(self, topics: str or Optional[List[str]] = None) -> Callable: - """ - Decorator to listen specify topic. If topics is none, then listen all topics has exits. - :param topics: topic list, you can input topic like: ["topic1", "topic2"]. + """Decorator to listen specify topic. If topics is none, then listen all topics has exits. + + Args: + topics: topic list, you can input topic like: ["topic1", "topic2"]. + + Returns: + return callback functions - Usage:: + Examples: @broadcast_service.on_listen('topic1') def handle_all_msg(): # your code @@ -178,8 +196,9 @@ def handle_all_msg(): def handle_all_msg(*args, **kwargs): # your code - Attention: Your params should keep '*args, **kwargs'. If you publish a topic take arguments, - the callback function you handle should take arguments, otherwise it will not be called back. + Attention: + Your params should keep '*args, **kwargs'. If you publish a topic take arguments, + the callback function you handle should take arguments, otherwise it will not be called back. """ def decorator(fn: Callable) -> Callable: @@ -198,4 +217,144 @@ def inner(*args, **kwargs) -> Callable: return decorator +PUBLISHER_CALLBACK_STATUS = { + "INIT": 'initialization', + "RUNNING": "running", + "END": "end" +} + + +class PublisherDispatchConfig(BaseModel): + status: str = PUBLISHER_CALLBACK_STATUS["INIT"] + """life cycle of publisher callback""" + counter: int = 1 + """Record num of executions for publisher has call publish""" + num_of_executions: int = 1 + """It indicating the number of times the same topic is published at once""" + interval: float = 0 + """interval""" + subscriber_callback_results: Union[dict, List] = [] + """Used to store the return values of all callback functions for subscribers.""" + callback: Optional[Callable] = None + """Your publisher will obtain the callback and subscriber parameters after the callback function + of all subscribers callback is completed.""" + enable_final_return: bool = False + """This parameter indicates whether you want to call the publisher callback after calling the topic + n times, or call the publisher callback after each topic publishing.""" + + @property + def start_publisher_callback_or_not(self) -> bool: + if self.status == PUBLISHER_CALLBACK_STATUS["END"]: + return False + if not self.enable_final_return: + return True + if self.enable_final_return and self.counter == self.num_of_executions: + return True + return False + + @validator("num_of_executions") + def must_be_positive_integer(cls, v): + if v <= 0 or type(v) != int: + raise ValueError('num_of_execution must be a positive integer') + return v + + def get_num_of_executions(self) -> int: + if self.status == PUBLISHER_CALLBACK_STATUS["END"]: + return 1 + return self.num_of_executions + + def finish_callback(self): + logger.debug(f"[broadcast-service] publisher finish callback task") + self.status = PUBLISHER_CALLBACK_STATUS["END"] + + def append_sub_callback_results(self, value: Any): + self.subscriber_callback_results.append(value) + + +class PublisherDispatchConfigManager(metaclass=Singleton): + def __init__(self): + self.publisher_callbacks: List[PublisherDispatchConfig] = [] + + def get_latest_publisher_callback(self) -> PublisherDispatchConfig: + return self.publisher_callbacks[-1] + + def create_publisher_callback(self, **kwargs): + self.publisher_callbacks.append(PublisherDispatchConfig(**kwargs)) + + +class BroadcastService(BaseBroadcastService): + def __init__(self): + super().__init__() + self.publish_dispatch_config_manager = PublisherDispatchConfigManager() + self.cur_publisher_dispatch_config: PublisherDispatchConfig = PublisherDispatchConfig() + + self.enable_config = False + """Enable_config is True when you use `broadcast_service.config(**config).publish(topic_name,**params)` + to publish topic. It indicates whether you need to enable complex configurations to schedule + publishing topics.""" + + def config( + self, + num_of_executions: int = 1, + callback: Optional[Callable] = None, + enable_final_return: bool = False, + interval: float = 0, + ) -> 'BroadcastService': + """Provide more complex topic publish mode + + Args: + num_of_executions: default is 1, indicating the number of times the same topic is published at once + callback: default is None. You can get callback and the parameters of subscriber + after all subscribers' callback functions have been completed. + enable_final_return: default is False, it means you can get callback after you publish + n times topic. In this case, finish_callback params is store in *args rather than **kwargs. + interval: publish interval. Unit seconds. + Returns: + Returns current object, which is used to call broadcast with configuration. + """ + self.enable_config = True + self.publish_dispatch_config_manager.create_publisher_callback( + num_of_executions=num_of_executions, + callback=callback, + enable_final_return=enable_final_return, + interval=interval, + status=PUBLISHER_CALLBACK_STATUS['RUNNING'] + ) + return self + + def broadcast(self, topics: str or List[str], *args, **kwargs): + if self.enable_config: + self.cur_publisher_dispatch_config = self.publish_dispatch_config_manager.get_latest_publisher_callback() + + for i in range(self.cur_publisher_dispatch_config.get_num_of_executions()): + super().broadcast(topics, *args, **kwargs) + self.cur_publisher_dispatch_config.counter += 1 + # time.sleep(0.01) + + self.enable_config = False + + def _invoke_finish_callback(self): + if self.cur_publisher_dispatch_config.callback: + self._final_invoke_listen_callback( + self.cur_publisher_dispatch_config.callback, + *self.cur_publisher_dispatch_config.subscriber_callback_results + ) + if self.cur_publisher_dispatch_config.counter == self.cur_publisher_dispatch_config.num_of_executions: + self.cur_publisher_dispatch_config.finish_callback() + + def _invoke_broadcast_topic(self, topic_name: str, *args, **kwargs): + super()._invoke_broadcast_topic(topic_name, *args, **kwargs) + + logger.debug( + f"[broadcast-service] start_publisher_callback_or_not: {self.cur_publisher_dispatch_config.start_publisher_callback_or_not}") + if self.enable_config and self.cur_publisher_dispatch_config.start_publisher_callback_or_not: + self._invoke_finish_callback() + + def _final_invoke_listen_callback(self, callback: Callable, *args, **kwargs): + result = super()._final_invoke_listen_callback(callback, *args, **kwargs) + + if result: + self.cur_publisher_dispatch_config.append_sub_callback_results(result) + + broadcast_service = BroadcastService()