Skip to content

Commit

Permalink
pref: add GitHub actions release
Browse files Browse the repository at this point in the history
  • Loading branch information
Undertone0809 committed Jun 14, 2023
1 parent 6eef0e6 commit d1383f8
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 35 deletions.
39 changes: 39 additions & 0 deletions .github/workflows/python-publish.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# This workflow will upload a Python Package using Twine when a release is created
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python#publishing-to-package-registries

# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.

name: Upload Python Package

on:
release:
types: [published]

permissions:
contents: read

jobs:
deploy:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v3
with:
python-version: '3.7'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install build
- name: Build package
run: python -m build
- name: Publish package
uses: pypa/gh-action-pypi-publish@27b31702a0e7fc50959f5ad993c78deac1bdfc29
with:
user: __token__
password: ${{ secrets.PYPI_API_TOKEN }}
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
log
gpt_output.md
command
.idea
broadcast_service.egg-info/
Expand Down
229 changes: 194 additions & 35 deletions broadcast_service/_core.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,57 @@
# Copyright 2022 Zeeland(https://github.com/Undertone0809/). All Rights Reserved.
# Copyright (c) 2023 Zeeland
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Copyright Owner: Zeeland
# GitHub Link: https://github.com/Undertone0809/
# Project Link: https://github.com/Undertone0809/broadcast-service
# Contact Email: [email protected]

import logging
from typing import Optional, List, Callable
from pydantic import BaseModel, validator
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, List, Callable, Any, Union

from broadcast_service.singleton import Singleton
from broadcast_service.logger import enable_log, get_logger

__all__ = ['broadcast_service', 'BroadcastService', 'enable_log']


def enable_log():
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')


class BroadcastService(metaclass=Singleton):
logger = get_logger()


def _invoke_callback(
callback: Callable,
thread_pool: ThreadPoolExecutor,
enable_async: bool = True,
*args,
**kwargs
) -> Any:
if enable_async:
future_result = thread_pool.submit(callback, *args, **kwargs)
if future_result.result() is not None:
logger.debug(f"[broadcast-service invoke_callback result] {future_result.result()}")
return future_result.result()
else:
return callback(*args, **kwargs)


class BaseBroadcastService(metaclass=Singleton):
"""
This class implements broadcast mode, you can import the instance by single class.
By BroadcastService, you can send topic message,it will automatically execute the
callback function if some classes subscribe the topic.
example:
Example:
---------------------------------------------------------------------------------
from broadcast_service import broadcast_service
Expand Down Expand Up @@ -97,7 +118,7 @@ def listen_all(self, callback: Callable):

def broadcast(self, topics: str or List[str], *args, **kwargs):
"""
Launch broadcast on the specify topic
Launch broadcast on the specify topic. If all subscribe callback finish, it will call finish_callback.
"""
self.logger.debug(f"[broadcast-service] broadcast topic <{topics}>")
if type(topics) == str:
Expand All @@ -112,9 +133,9 @@ def broadcast(self, topics: str or List[str], *args, **kwargs):
def broadcast_all(self, *args, **kwargs):
"""
All topics listened on will be called back.
Attention: Not all callback function will called. If your publish
and your subscribe takes different arguments, your callback function
will not be executed.
Attention: Not all callback function will be called. If your publisher callback
and your subscriber callback takes different arguments, your callback function
will not be executed.
"""
for topic in self.pubsub_channels.keys():
self._invoke_broadcast_topic(topic, *args, **kwargs)
Expand All @@ -127,26 +148,19 @@ def _invoke_listen_topic(self, topic_name: str, callback: Callable):
self.pubsub_channels[topic_name].append(callback)

def _invoke_broadcast_topic(self, topic_name: str, *args, **kwargs):
"""
broadcast single topic.
TODO fix problem: There is no guarantee that every callback function will be executed unnecessarily in some cases.
"""
if topic_name not in self.pubsub_channels.keys():
self.pubsub_channels[topic_name] = []

for item in self.pubsub_channels[topic_name]:
if self.enable_async:
self.thread_pool.submit(
item, *args, **kwargs)
else:
item(*args, **kwargs)
self._final_invoke_listen_callback(item, *args, **kwargs)

for item in self.pubsub_channels['__all__']:
if self.enable_async:
self.thread_pool.submit(
item, *args, **kwargs)
else:
item(*args, **kwargs)
if item not in self.pubsub_channels[topic_name]:
self._final_invoke_listen_callback(item, *args, **kwargs)

def _final_invoke_listen_callback(self, callback: Callable, *args, **kwargs) -> Any:
self.logger.debug(f"[broadcast-service] {callback.__name__} is called")
return _invoke_callback(callback, self.thread_pool, self.enable_async, *args, **kwargs)

def stop_listen(self, topic_name: str, callback: Callable):
if topic_name not in self.pubsub_channels.keys():
Expand All @@ -157,11 +171,15 @@ def stop_listen(self, topic_name: str, callback: Callable):
self.pubsub_channels[topic_name].remove(callback)

def on_listen(self, topics: str or Optional[List[str]] = None) -> Callable:
"""
Decorator to listen specify topic. If topics is none, then listen all topics has exits.
:param topics: topic list, you can input topic like: ["topic1", "topic2"].
"""Decorator to listen specify topic. If topics is none, then listen all topics has exits.
Args:
topics: topic list, you can input topic like: ["topic1", "topic2"].
Returns:
return callback functions
Usage::
Examples:
@broadcast_service.on_listen('topic1')
def handle_all_msg():
# your code
Expand All @@ -178,8 +196,9 @@ def handle_all_msg():
def handle_all_msg(*args, **kwargs):
# your code
Attention: Your params should keep '*args, **kwargs'. If you publish a topic take arguments,
the callback function you handle should take arguments, otherwise it will not be called back.
Attention:
Your params should keep '*args, **kwargs'. If you publish a topic take arguments,
the callback function you handle should take arguments, otherwise it will not be called back.
"""

def decorator(fn: Callable) -> Callable:
Expand All @@ -198,4 +217,144 @@ def inner(*args, **kwargs) -> Callable:
return decorator


PUBLISHER_CALLBACK_STATUS = {
"INIT": 'initialization',
"RUNNING": "running",
"END": "end"
}


class PublisherDispatchConfig(BaseModel):
status: str = PUBLISHER_CALLBACK_STATUS["INIT"]
"""life cycle of publisher callback"""
counter: int = 1
"""Record num of executions for publisher has call publish"""
num_of_executions: int = 1
"""It indicating the number of times the same topic is published at once"""
interval: float = 0
"""interval"""
subscriber_callback_results: Union[dict, List] = []
"""Used to store the return values of all callback functions for subscribers."""
callback: Optional[Callable] = None
"""Your publisher will obtain the callback and subscriber parameters after the callback function
of all subscribers callback is completed."""
enable_final_return: bool = False
"""This parameter indicates whether you want to call the publisher callback after calling the topic
n times, or call the publisher callback after each topic publishing."""

@property
def start_publisher_callback_or_not(self) -> bool:
if self.status == PUBLISHER_CALLBACK_STATUS["END"]:
return False
if not self.enable_final_return:
return True
if self.enable_final_return and self.counter == self.num_of_executions:
return True
return False

@validator("num_of_executions")
def must_be_positive_integer(cls, v):
if v <= 0 or type(v) != int:
raise ValueError('num_of_execution must be a positive integer')
return v

def get_num_of_executions(self) -> int:
if self.status == PUBLISHER_CALLBACK_STATUS["END"]:
return 1
return self.num_of_executions

def finish_callback(self):
logger.debug(f"[broadcast-service] publisher finish callback task")
self.status = PUBLISHER_CALLBACK_STATUS["END"]

def append_sub_callback_results(self, value: Any):
self.subscriber_callback_results.append(value)


class PublisherDispatchConfigManager(metaclass=Singleton):
def __init__(self):
self.publisher_callbacks: List[PublisherDispatchConfig] = []

def get_latest_publisher_callback(self) -> PublisherDispatchConfig:
return self.publisher_callbacks[-1]

def create_publisher_callback(self, **kwargs):
self.publisher_callbacks.append(PublisherDispatchConfig(**kwargs))


class BroadcastService(BaseBroadcastService):
def __init__(self):
super().__init__()
self.publish_dispatch_config_manager = PublisherDispatchConfigManager()
self.cur_publisher_dispatch_config: PublisherDispatchConfig = PublisherDispatchConfig()

self.enable_config = False
"""Enable_config is True when you use `broadcast_service.config(**config).publish(topic_name,**params)`
to publish topic. It indicates whether you need to enable complex configurations to schedule
publishing topics."""

def config(
self,
num_of_executions: int = 1,
callback: Optional[Callable] = None,
enable_final_return: bool = False,
interval: float = 0,
) -> 'BroadcastService':
"""Provide more complex topic publish mode
Args:
num_of_executions: default is 1, indicating the number of times the same topic is published at once
callback: default is None. You can get callback and the parameters of subscriber
after all subscribers' callback functions have been completed.
enable_final_return: default is False, it means you can get callback after you publish
n times topic. In this case, finish_callback params is store in *args rather than **kwargs.
interval: publish interval. Unit seconds.
Returns:
Returns current object, which is used to call broadcast with configuration.
"""
self.enable_config = True
self.publish_dispatch_config_manager.create_publisher_callback(
num_of_executions=num_of_executions,
callback=callback,
enable_final_return=enable_final_return,
interval=interval,
status=PUBLISHER_CALLBACK_STATUS['RUNNING']
)
return self

def broadcast(self, topics: str or List[str], *args, **kwargs):
if self.enable_config:
self.cur_publisher_dispatch_config = self.publish_dispatch_config_manager.get_latest_publisher_callback()

for i in range(self.cur_publisher_dispatch_config.get_num_of_executions()):
super().broadcast(topics, *args, **kwargs)
self.cur_publisher_dispatch_config.counter += 1
# time.sleep(0.01)

self.enable_config = False

def _invoke_finish_callback(self):
if self.cur_publisher_dispatch_config.callback:
self._final_invoke_listen_callback(
self.cur_publisher_dispatch_config.callback,
*self.cur_publisher_dispatch_config.subscriber_callback_results
)
if self.cur_publisher_dispatch_config.counter == self.cur_publisher_dispatch_config.num_of_executions:
self.cur_publisher_dispatch_config.finish_callback()

def _invoke_broadcast_topic(self, topic_name: str, *args, **kwargs):
super()._invoke_broadcast_topic(topic_name, *args, **kwargs)

logger.debug(
f"[broadcast-service] start_publisher_callback_or_not: {self.cur_publisher_dispatch_config.start_publisher_callback_or_not}")
if self.enable_config and self.cur_publisher_dispatch_config.start_publisher_callback_or_not:
self._invoke_finish_callback()

def _final_invoke_listen_callback(self, callback: Callable, *args, **kwargs):
result = super()._final_invoke_listen_callback(callback, *args, **kwargs)

if result:
self.cur_publisher_dispatch_config.append_sub_callback_results(result)


broadcast_service = BroadcastService()

0 comments on commit d1383f8

Please sign in to comment.