Skip to content

Commit

Permalink
feat: add publisher dispatch config
Browse files Browse the repository at this point in the history
  • Loading branch information
Undertone0809 committed Jun 14, 2023
1 parent d1383f8 commit b8c6e25
Show file tree
Hide file tree
Showing 12 changed files with 346 additions and 12 deletions.
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@
- [https://pypi.org/project/broadcast-service/](https://pypi.org/project/broadcast-service/)

## Setup

```sh
pip install broadcast-service
```


## Usage
There is a easy demo to show how to use broadcast-service.
There is an easy demo to show how to use broadcast-service.

```python
from broadcast_service import broadcast_service

Expand All @@ -50,7 +51,7 @@ def handle_msg(params):


# callback of decorator
@broadcast_service.on_listen(['my_topic'])
@broadcast_service.on_listen('my_topic')
def handle_decorator_msg(params):
print(f"handle_decorator_msg receive params: {params}")

Expand All @@ -67,6 +68,7 @@ if __name__ == '__main__':
- You can use `publish, emit, broadcast` to send your topic msg and use `listen, on, subscribe` to listen your topic msg.

- You can also add more arguments or no argument when you publish thr broadcast.

```python
from broadcast_service import broadcast_service

Expand Down Expand Up @@ -104,8 +106,12 @@ Moreover, you can see more example in [document](https://undertone0809.github.io
- optimize documents and show more examples.
- ~~optimize the syntax expression of broadcast-service~~
- provide more test cases
- privide the ability to subscribe the topic and callback once
- provide the ability to subscribe the topic and callback once
- Support for fuzzy subscriptions
- ~~the publisher of the topic can provide a return value~~
- optimize usage in class ('self' params problem)
- build observer mode
- ~~provide publisher callback when all subscriber have completed callback~~


## Contribution
Expand Down
9 changes: 7 additions & 2 deletions broadcast_service/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
# Copyright 2022 Zeeland(https://github.com/Undertone0809/). All Rights Reserved.
# Copyright (c) 2023 Zeeland
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Copyright Owner: Zeeland
# GitHub Link: https://github.com/Undertone0809/
# Project Link: https://github.com/Undertone0809/broadcast-service
# Contact Email: [email protected]

from ._core import *
3 changes: 2 additions & 1 deletion broadcast_service/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# Project Link: https://github.com/Undertone0809/broadcast-service
# Contact Email: [email protected]

import time
import logging
from pydantic import BaseModel, validator
from concurrent.futures import ThreadPoolExecutor
Expand Down Expand Up @@ -329,7 +330,7 @@ def broadcast(self, topics: str or List[str], *args, **kwargs):
for i in range(self.cur_publisher_dispatch_config.get_num_of_executions()):
super().broadcast(topics, *args, **kwargs)
self.cur_publisher_dispatch_config.counter += 1
# time.sleep(0.01)
time.sleep(self.cur_publisher_dispatch_config.interval)

self.enable_config = False

Expand Down
100 changes: 100 additions & 0 deletions broadcast_service/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Copyright (c) 2023 Zeeland
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Copyright Owner: Zeeland
# GitHub Link: https://github.com/Undertone0809/
# Project Link: https://github.com/Undertone0809/broadcast-service
# Contact Email: [email protected]

import os
import logging
import tempfile
import datetime
import platform

__all__ = ['get_logger', 'enable_log_no_file', 'enable_log']
logger = logging.getLogger("cushy-storage")


def get_logger():
return logger


def get_project_root_path() -> str:
"""get project root path"""
project_path = os.getcwd()
max_depth = 10
count = 0
while not os.path.exists(os.path.join(project_path, 'README.md')):
project_path = os.path.split(project_path)[0]
count += 1
if count > max_depth:
return os.getcwd()
return project_path


STORAGE_PATH = {
'PROJECT_ROOT': get_project_root_path(),
'CURRENT': "./"
}


def get_default_storage_path(file_name: str, root_path: str = STORAGE_PATH['PROJECT_ROOT']) -> str:
if platform.system() == 'Windows':
return f"{root_path}/{file_name}"
elif platform.system() == 'Linux' or 'Darwin':
dir_path = os.environ.get('TMPDIR')
if not dir_path:
dir_path = tempfile.gettempdir()
dir_path = os.path.join(dir_path, "broadcast_service")
return f"{dir_path}/{file_name}"
else:
return f"./{file_name}"


def get_default_log_path() -> str:
return get_default_storage_path("log")


def _check_log_path():
"""check whether log file exist"""
if not os.path.exists(get_default_log_path()):
os.makedirs(get_default_log_path())


def get_log_name() -> str:
_check_log_path()
cur_time = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
return f"{get_default_log_path()}/log_{cur_time}.log"


def enable_log():
"""enable logging to terminal and file"""
logging.basicConfig(
level=logging.DEBUG,
format='%(levelname)s - %(asctime)s:%(message)s -',
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[
logging.FileHandler(f"{get_log_name()}", mode='w', encoding='utf-8'),
logging.StreamHandler()
],
)


def enable_log_no_file():
logging.basicConfig(
level=logging.DEBUG,
format='[%(levelname)s] %(asctime)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
)
13 changes: 13 additions & 0 deletions docs/update.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,19 @@ Please update the latest version. The old version is shit.
```bash
pip install --upgrade broadcast-service
```
## v2.0.0 2023-06-14

#### feat
1. Add publisher dispatch config. It can publish topic with a complex mode.
- provide publisher callback
- provide the return value of subscriber callbacks
- provide multiple call publish at once
- provide multiple call time interval

## v1.3.1 2023-06-02

#### fix
1. Add singleton to keep only one `broadcast_service` instance in an application [#11](https://github.com/Undertone0809/broadcast-service/pull/11)

## v1.3.0 2023-03-21

Expand Down
28 changes: 28 additions & 0 deletions example/demo4_class.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
# @Time : 2023/6/13 17:19
# @Author : Zeeland
# @File : demo4_class.py
# @Software: PyCharm

import logging
from broadcast_service import broadcast_service

logging.basicConfig(level=logging.DEBUG)


class Component:
def __init__(self):
pass

@broadcast_service.on_listen("activate component")
def handle_callback(self, value):
print(self)
print(value)

def method(self):
broadcast_service.broadcast("activate component", self, "ohohohohoh")


if __name__ == '__main__':
c1 = Component()
c1.method()
42 changes: 42 additions & 0 deletions example/demo5_publisher_dispatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Copyright (c) 2023 Zeeland
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Copyright Owner: Zeeland
# GitHub Link: https://github.com/Undertone0809/
# Project Link: https://github.com/Undertone0809/broadcast-service
# Contact Email: [email protected]

from broadcast_service import broadcast_service


@broadcast_service.on_listen("topic")
def handle_subscriber_callback():
print("handle_subscriber_callback")


def handle_publisher_callback(*args):
print("handle_publisher_callback")


def main():
broadcast_service.config(
num_of_executions=5,
callback=handle_publisher_callback,
enable_final_return=True,
interval=0.1
).publish("topic")


if __name__ == '__main__':
main()
40 changes: 40 additions & 0 deletions example/demo6_publisher_rec_sub_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright (c) 2023 Zeeland
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Copyright Owner: Zeeland
# GitHub Link: https://github.com/Undertone0809/
# Project Link: https://github.com/Undertone0809/broadcast-service
# Contact Email: [email protected]

from broadcast_service import broadcast_service


@broadcast_service.on_listen("topic")
def handle_subscriber_callback():
print("handle_subscriber_callback")
return {"key", "value"}


def handle_publisher_callback(*args):
print(f"handle_publisher_callback {args}")


def main():
broadcast_service.config(
callback=handle_publisher_callback,
).publish("topic")


if __name__ == '__main__':
main()
6 changes: 4 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@

setuptools.setup(
name="broadcast_service",
version="1.3.1",
version="2.0.0",
author="Zeeland",
author_email="[email protected]",
description="A lightweight third-party broadcast/pubsub library",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/Undertone0809/broadcast-service",
packages=setuptools.find_packages(),
install_requires=['pydantic', 'cushy-storage'],
python_requires='>=3.6',
license="Apache 2.0",
classifiers=[
"Development Status :: 3 - Alpha",
Expand All @@ -42,5 +44,5 @@
"Operating System :: OS Independent",

],
keywords="broadcast, broadcast-service",
keywords="broadcast, broadcast-service, publisher, subscriber, pubsub",
)
2 changes: 1 addition & 1 deletion tests/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def test_async(self):
broadcast_service.listen("test_topic", handle)
broadcast_service.broadcast("test_topic")
used_time = time.time() - start_time
self.assertLessEqual(used_time, 1)
self.assertAlmostEqual(1, used_time, delta=0.1)

def test_sync(self):
start_time = time.time()
Expand Down
17 changes: 15 additions & 2 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@

import time
from unittest import TestCase
from broadcast_service import broadcast_service, enable_log
from broadcast_service import broadcast_service
from broadcast_service.logger import get_logger, enable_log

enable_log()
logger = get_logger()


def wait(seconds=0.1):
Expand Down Expand Up @@ -199,3 +200,15 @@ def handle_multi_topic2():
broadcast_service.publish_all()
wait()
self.assertEqual(8, self.counter)

def test_broadcast_multiple_call_one_topic(self):
self.counter = 0

@broadcast_service.on_listen("test_broadcast_multiple_call_one_topic")
def handle_multi_topic1():
self.counter += 1
self.test_broadcast_multi_topic1 = True

broadcast_service.config(num_of_executions=5).publish("test_broadcast_multiple_call_one_topic")
wait()
self.assertTrue(5, self.counter)
Loading

0 comments on commit b8c6e25

Please sign in to comment.