From 432d7f6e9b7a49c7dd6a6af0039267b82e94864e Mon Sep 17 00:00:00 2001 From: Pastukhov Nikita Date: Mon, 3 Jul 2023 22:22:49 +0300 Subject: [PATCH] NatsJS release (#92, close #65) --- .github/workflows/tests.yml | 15 +- README.md | 24 +- docs/docs/en/CHANGELOG.md | 8 + docs/docs/en/index.md | 2 +- docs/docs/en/nats/4_nats-js.md | 30 ++ docs/docs/ru/CHANGELOG.md | 8 + docs/docs/ru/index.md | 2 +- docs/docs/ru/nats/4_nats-js.md | 31 ++ docs/docs_src/nats/js.py | 23 ++ docs/mkdocs.yml | 1 + propan/__about__.py | 2 +- propan/__init__.py | 5 +- propan/annotations.py | 12 +- propan/brokers/nats/__init__.py | 5 + propan/brokers/nats/consts.py | 23 ++ propan/brokers/nats/nats_js_broker.py | 353 +++++++++++++++++++--- propan/brokers/nats/nats_js_broker.pyi | 225 ++++++++++++++ propan/cli/app.py | 121 ++++++-- propan/cli/startproject/async_app/app.py | 9 +- propan/cli/startproject/async_app/nats.py | 19 +- propan/fastapi/__init__.py | 13 +- propan/fastapi/kafka/router.py | 2 +- propan/fastapi/nats/__init__.py | 7 +- propan/fastapi/nats/router.py | 6 +- propan/fastapi/nats/router.pyi | 148 ++++++++- propan/fastapi/rabbit/router.py | 2 +- propan/fastapi/redis/router.py | 2 +- propan/fastapi/sqs/router.py | 2 +- propan/test/nats.py | 27 +- scripts/test-cov.sh | 10 +- tests/brokers/nats_js/__init__.py | 0 tests/brokers/nats_js/conftest.py | 47 +++ tests/brokers/nats_js/test_connect.py | 23 ++ tests/brokers/nats_js/test_consume.py | 8 + tests/brokers/nats_js/test_publish.py | 8 + tests/brokers/nats_js/test_router.py | 6 + tests/brokers/nats_js/test_rpc.py | 8 + tests/brokers/nats_js/test_test_client.py | 6 + tests/cli/conftest.py | 7 + tests/cli/test_creation.py | 22 +- tests/cli/test_run.py | 23 ++ 41 files changed, 1184 insertions(+), 111 deletions(-) create mode 100644 docs/docs/en/nats/4_nats-js.md create mode 100644 docs/docs/ru/nats/4_nats-js.md create mode 100644 docs/docs_src/nats/js.py create mode 100644 propan/brokers/nats/consts.py create mode 100644 
propan/brokers/nats/nats_js_broker.pyi create mode 100644 tests/brokers/nats_js/__init__.py create mode 100644 tests/brokers/nats_js/conftest.py create mode 100644 tests/brokers/nats_js/test_connect.py create mode 100644 tests/brokers/nats_js/test_consume.py create mode 100644 tests/brokers/nats_js/test_publish.py create mode 100644 tests/brokers/nats_js/test_router.py create mode 100644 tests/brokers/nats_js/test_rpc.py create mode 100644 tests/brokers/nats_js/test_test_client.py diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index f4f2dbd8..f5dabdf4 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -44,7 +44,7 @@ jobs: - 9324:9324 nats: - image: nats + image: diementros/nats:js ports: - 4222:4222 @@ -91,9 +91,10 @@ jobs: runs-on: ubuntu-latest services: nats: - image: nats + image: diementros/nats:js ports: - 4222:4222 + steps: - uses: actions/checkout@v3 - name: Set up Python @@ -107,7 +108,7 @@ jobs: run: pip install -e .[testsuite,async-nats] - run: mkdir coverage - name: Test - run: coverage run -m pytest -m "run" -k test_run_nats_correct tests/cli/test_run.py + run: coverage run -m pytest -m "run and nats" tests/cli/test_run.py env: COVERAGE_FILE: coverage/.coverage.${{ runner.os }}-py${{ matrix.python-version }} CONTEXT: ${{ runner.os }}-py${{ matrix.python-version }} @@ -137,7 +138,7 @@ jobs: run: pip install -e .[testsuite,async-rabbit] - run: mkdir coverage - name: Test - run: coverage run -m pytest -m "run" -k test_run_rabbit_correct tests/cli/test_run.py + run: coverage run -m pytest -m "run and rabbit" tests/cli/test_run.py env: COVERAGE_FILE: coverage/.coverage.${{ runner.os }}-py${{ matrix.python-version }} CONTEXT: ${{ runner.os }}-py${{ matrix.python-version }} @@ -178,7 +179,7 @@ jobs: run: pip install -e .[testsuite,async-kafka] - run: mkdir coverage - name: Test - run: coverage run -m pytest -m "run" -k test_run_kafka_correct tests/cli/test_run.py + run: coverage run -m pytest -m "run and 
kafka" tests/cli/test_run.py env: COVERAGE_FILE: coverage/.coverage.${{ runner.os }}-py${{ matrix.python-version }} CONTEXT: ${{ runner.os }}-py${{ matrix.python-version }} @@ -208,7 +209,7 @@ jobs: run: pip install -e .[testsuite,async-sqs] - run: mkdir coverage - name: Test - run: coverage run -m pytest -m "run" -k test_run_sqs_correct tests/cli/test_run.py + run: coverage run -m pytest -m "run and sqs" tests/cli/test_run.py env: COVERAGE_FILE: coverage/.coverage.${{ runner.os }}-py${{ matrix.python-version }} CONTEXT: ${{ runner.os }}-py${{ matrix.python-version }} @@ -238,7 +239,7 @@ jobs: run: pip install -e .[testsuite,async-redis] - run: mkdir coverage - name: Test - run: coverage run -m pytest -m "run" -k test_run_redis_correct tests/cli/test_run.py + run: coverage run -m pytest -m "run and redis" tests/cli/test_run.py env: COVERAGE_FILE: coverage/.coverage.${{ runner.os }}-py${{ matrix.python-version }} CONTEXT: ${{ runner.os }}-py${{ matrix.python-version }} diff --git a/README.md b/README.md index 06e1f382..92528970 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,7 @@ It is a modern, high-level framework on top of popular specific Python brokers l | **Nats** | :heavy_check_mark: **stable** :heavy_check_mark: | :mag: planning :mag: | | **Kafka** | :warning: **beta** :warning: | :mag: planning :mag: | | **SQS** | :warning: **beta** :warning: | :mag: planning :mag: | -| **NatsJS** | :hammer_and_wrench: **WIP** :hammer_and_wrench: | :mag: planning :mag: | +| **NatsJS** | :warning: **beta** :warning: | :mag: planning :mag: | | **MQTT** | :mag: planning :mag: | :mag: planning :mag: | | **Redis Streams** | :mag: planning :mag: | :mag: planning :mag: | | **Pulsar** | :mag: planning :mag: | :mag: planning :mag: | @@ -232,7 +232,6 @@ and [more](https://github.com/Lancetnik/Propan/tree/main/examples/dependencies). 
from propan import PropanApp, RabbitBroker, Context, Depends rabbit_broker = RabbitBroker("amqp://guest:guest@localhost:5672/") - app = PropanApp(rabbit_broker) async def dependency(user_id: int) -> bool: @@ -248,6 +247,27 @@ async def base_handler(user_id: int, --- +## RPC over MQ + +Also, **Propan** allows you to use **RPC** requests over your broker in a simple way: + +```python +from propan import PropanApp, RabbitBroker + +broker = RabbitBroker("amqp://guest:guest@localhost:5672/") +app = PropanApp(broker) + +@broker.handle("ping") +async def base_handler(): + return "pong" + +@app.after_startup +async def self_ping(): + assert (await broker.publish("", "ping")) == "pong" +``` + +--- + ## Project Documentation **Propan** automatically generates documentation for your project according to the **AsyncAPI** specification. You can work with both generated artifacts and place a Web view of your documentation on resources available to related teams. diff --git a/docs/docs/en/CHANGELOG.md b/docs/docs/en/CHANGELOG.md index 442fe20f..09470a93 100644 --- a/docs/docs/en/CHANGELOG.md +++ b/docs/docs/en/CHANGELOG.md @@ -1,5 +1,13 @@ # CHANGELOG +## 2023-07-03 **0.1.5.0** NatsJS + +This update adds **NATS JetStream** (a persistent layer of **NATS**) support. + +Now you can work with this great broker without fear of losing messages, using the acknowledgment confirmation mechanism and the built-in `key-value` and `object` storages. + +Also, some internal classes were changed to further create synchronous interfaces based on them. + ## 2023-06-26 **0.1.4.0** PydanticV2 The main change in this update is the support for the **PydanticV2** beta version. diff --git a/docs/docs/en/index.md b/docs/docs/en/index.md index 1271a35a..6568bef6 100644 --- a/docs/docs/en/index.md +++ b/docs/docs/en/index.md @@ -120,7 +120,7 @@ This is the **Propan** declarative way to write the same code. That is so much e
That is so much e | **Nats** | :heavy_check_mark: **stable** :heavy_check_mark: | :mag: planning :mag: | | **Kafka** | :warning: **beta** :warning: | :mag: planning :mag: | | **SQS** | :warning: **beta** :warning: | :mag: planning :mag: | -| **NatsJS** | :hammer_and_wrench: **WIP** :hammer_and_wrench: | :mag: planning :mag: | +| **NatsJS** | :warning: **beta** :warning: | :mag: planning :mag: | | **MQTT** | :mag: planning :mag: | :mag: planning :mag: | | **Redis Streams** | :mag: planning :mag: | :mag: planning :mag: | | **Pulsar** | :mag: planning :mag: | :mag: planning :mag: | diff --git a/docs/docs/en/nats/4_nats-js.md b/docs/docs/en/nats/4_nats-js.md new file mode 100644 index 00000000..5269addb --- /dev/null +++ b/docs/docs/en/nats/4_nats-js.md @@ -0,0 +1,30 @@ +# NATS JetStream + +The default **NATS** usage is suitable for scenarios where: + +* `publisher` and `consumer` are always online; +* a system tolerate to the messages loss. + +If you need stricter restrictions, like: + +* an availability of a message processing confirmation mechanism (`ack`/`nack'); +* message persistence (will accumulate in the queue when `consumer` is offline). + +You should use the **NATS JetStream** extension. + +In fact, the **JetStream** extension is the same **NATS** with the addition a persistent layer above the file system. Therefore, all interfaces for publishing and consuming messages are similar to the refular **NATS** usage. + +However, the **JetStream** layer has many possibilities for configuration: from the deleting old messages policy to the maximum stored messages number limit. You can find out more about all **JetStream** features in the official [documentation](https://docs.nats.io/using-nats/developer/develop_jetstream ){.external-link target="_blank"}. + +!!! 
tip "" + If you have worked with other message brokers, then you should know that the logic of **JS** is closer to **Kafka** than to **RabbitMQ**: messages after confirmation are not deleted from the queue, but remain there until the queue is full and it will start deleting old messages (or in accordance with other logic that you can configure yourself). + + When connecting a `consumer` (and, especially, when reconnecting), you must determine for yourself, according to what logic it will consume messages: from the subject beginning, starting with some message, starting from some time, only new ones, etc. Don't be surprised if a connection is restored, your `consumer` starts to process all messages received earlier again - you haven't defined the rule. + +Also **NATS JetStream** has built-in `key-value` (close to **Redis**) and `object` (close to **Minio**) storages, which, in addition to interface *put/get* have the ability to subscribe to events, which can be extremely useful in vary scenarios. + +**Propan** does not provide access to this functionality directly, but it is covered by the [nats-py] library used({{urls.nats_py }}){target="_blank"}. You can access the **JS** object from the application context: + +```python linenums="1" hl_lines="2 8" +{!> docs_src/nats/js.py !} +``` diff --git a/docs/docs/ru/CHANGELOG.md b/docs/docs/ru/CHANGELOG.md index b61070ee..ac78bfac 100644 --- a/docs/docs/ru/CHANGELOG.md +++ b/docs/docs/ru/CHANGELOG.md @@ -1,5 +1,13 @@ # CHANGELOG +## 2023-07-03 **0.1.5.0** NastJS + +В этом обновлении добавлена и протестирована полная поддержка **NATS JetStream** - персистентного слоя **NATS**. + +Теперь вы можете работать с этим великолепным брокером, не боясь потери сообщений, используя механизм подтверждения получения, а также встроенные `key-value` и `object` хранилища. + +Также, в этом обновлении были изменены некоторые внутренние классы для дальнейшего создания на их основе синхронных интерфейсов. 
+ ## 2023-06-26 **0.1.4.0** PydanticV2 Основное изменение в этом обновлении - поддержка бета-версии **PydanticV2**. diff --git a/docs/docs/ru/index.md b/docs/docs/ru/index.md index 0e7037ba..fa4c7395 100644 --- a/docs/docs/ru/index.md +++ b/docs/docs/ru/index.md @@ -123,7 +123,7 @@ async def base_handler(body): | **Nats** | :heavy_check_mark: **stable** :heavy_check_mark: | :mag: planning :mag: | | **Kafka** | :warning: **beta** :warning: | :mag: planning :mag: | | **SQS** | :warning: **beta** :warning: | :mag: planning :mag: | -| **NatsJS** | :hammer_and_wrench: **WIP** :hammer_and_wrench: | :mag: planning :mag: | +| **NatsJS** | :warning: **beta** :warning: | :mag: planning :mag: | | **MQTT** | :mag: planning :mag: | :mag: planning :mag: | | **Redis Streams** | :mag: planning :mag: | :mag: planning :mag: | | **Pulsar** | :mag: planning :mag: | :mag: planning :mag: | diff --git a/docs/docs/ru/nats/4_nats-js.md b/docs/docs/ru/nats/4_nats-js.md new file mode 100644 index 00000000..f6f68d26 --- /dev/null +++ b/docs/docs/ru/nats/4_nats-js.md @@ -0,0 +1,31 @@ +# NATS JetStream + +Обычное использование **NATS** идеально подходит для сценариев, в которых: + +* `publisher` и `consumer` всегда находятся онлайн; +* система допускает потерю сообщений. + +Если вам нужны более строгие ограничения, а именно: + +* наличие механизма подтверждения обработки сообщений (`ack`/`nack`); +* персистентность сообщений (при отсутствии `consumer`'а сообщения будут накапливаться в очереди). + +Вам следует использование расширение **NATS JetStream**. + +На самом деле расширение **JetStream** - это тот же самый **NATS** с добавлением +персистентного слоя над файловой системой, который обеспечивает хранение сообщений в очереди. Поэтому все интерфейсы публикации и потребления сообщений аналогичны обычному использованию **NATS**. 
+ +Однако, сама логика работы слоя **JetStream** имеет множество возможностей для конфигурации: от политики удаления старых сообщений до ограничения на максимальное число хранимых сообщений. Подробно со всеми возможностями **JetStream** вы можете ознакомиться в официальной [документации](https://docs.nats.io/using-nats/developer/develop_jetstream){.external-link target="_blank"}. + +!!! tip "" + Если вы работали с другими брокерами сообщений, то вам следует знать, что логика работы **JS** ближе к **Kafka**, нежели к **RabbitMQ**: сообщения после подтверждения их обработки не удаляются из очереди, а остаются там до тех пор, пока очередь не наполнится и не начнет удалять старые сообщения (либо в соответсвии с другой логикой, которую вы можете сконфигурировать сами). + + При подключении `consumer`'а (и, особенно, при переподключении) вы должны сами определить, в соотвествии с какой логикой он будет потреблять сообщения: с самого начала, начиная с какого-то сообщения, начиная с какого-то времени, только новые и т.д. Не удивляйтесь, если при восстановлении соединения ваш `consumer` начнет заново обрабатывать все сообщения, полученные ранее - вы просто не определили это правило. + +Также **NATS JetStream** имеет встроенное `key-value`(cхоже с **Redis**) и `object`(схоже с **Minio**) хранилища, которые, помимо своего базового интерфейса *положить/прочитать* имеют возможность подписки на события, что может быть крайне полезно во многих сценариях. + +**Propan** не предоставляет доступ к этому функционалу напрямую, однако он покрывается используемой библиотекой [nats-py]({{ urls.nats_py }}){target="_blank"}. 
Доступ к объекту **JS** вы можете получить из контекста приложения: + +```python linenums="1" hl_lines="2 8" +{!> docs_src/nats/js.py !} +``` \ No newline at end of file diff --git a/docs/docs_src/nats/js.py b/docs/docs_src/nats/js.py new file mode 100644 index 00000000..d7ef9aa4 --- /dev/null +++ b/docs/docs_src/nats/js.py @@ -0,0 +1,23 @@ +from propan import PropanApp, NatsJSBroker +from propan.annotations import NatsJS + +broker = NatsJSBroker() +app = PropanApp(broker) + +@app.after_startup +async def example(js: NatsJS): + # JS Key-Value Storage + storage = await js.create_key_value(bucket="propan_kv") + + await storage.put("hello", b"propan!") + assert (await storage.get("hello")).value == b"propan!" + + # JS Object Storage + storage = await js.create_object_store("propan-obs") + + obj_name = "file.mp4" + with open(obj_name, "rb") as f: + await storage.put(obj_name, f) + + with open(f"copy-{obj_name}", "wb") as f: + await storage.get(obj_name, f) diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 1b1995eb..146a1032 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -159,6 +159,7 @@ nav: - Examples: - Direct: nats/3_examples/1_direct.md - Pattern: nats/3_examples/2_pattern.md + - NatsJS: nats/4_nats-js.md - Integrations: integrations/1_integrations-index.md - FastAPI Plugin: integrations/2_fastapi-plugin.md - Contributing: diff --git a/propan/__about__.py b/propan/__about__.py index f0420473..834801ae 100644 --- a/propan/__about__.py +++ b/propan/__about__.py @@ -2,7 +2,7 @@ from unittest.mock import Mock -__version__ = "0.1.4.6" +__version__ = "0.1.5.0" INSTALL_MESSAGE = ( diff --git a/propan/__init__.py b/propan/__init__.py index d3cd38eb..7cb39c6c 100644 --- a/propan/__init__.py +++ b/propan/__init__.py @@ -11,9 +11,9 @@ RabbitBroker = RabbitRouter = about.INSTALL_RABBIT # type: ignore try: - from propan.brokers.nats import NatsBroker, NatsRouter + from propan.brokers.nats import NatsBroker, NatsJSBroker, NatsRouter except ImportError: - NatsBroker = NatsRouter = 
about.INSTALL_NATS # type: ignore + NatsJSBroker = NatsBroker = NatsRouter = about.INSTALL_NATS # type: ignore try: from propan.brokers.redis import RedisBroker, RedisRouter @@ -52,6 +52,7 @@ "PropanMessage", ## nats "NatsBroker", + "NatsJSBroker", "NatsRouter", ## rabbit "RabbitBroker", diff --git a/propan/annotations.py b/propan/annotations.py index fa19d7c1..0c0de430 100644 --- a/propan/annotations.py +++ b/propan/annotations.py @@ -30,14 +30,22 @@ try: + from nats.aio.client import Client from nats.aio.msg import Msg + from nats.js.client import JetStreamContext from propan.brokers.nats import NatsBroker as NB + from propan.brokers.nats import NatsJSBroker as NJB NatsBroker = Annotated[NB, ContextField("broker")] + NatsJSBroker = Annotated[NJB, ContextField("broker")] NatsMessage = Annotated[Msg, ContextField("message")] + NatsConnection = Annotated[Client, ContextField("broker._connection")] + NatsJS = Annotated[JetStreamContext, ContextField("broker._connection")] except ImportError: - NatsBroker = NatsMessage = about.INSTALL_NATS + NatsBroker = ( + NatsMessage + ) = NatsJSBroker = NatsJS = NatsConnection = about.INSTALL_NATS try: @@ -79,7 +87,7 @@ assert any( ( all((RabbitBroker, RabbitMessage, Channel)), - all((NatsBroker, NatsMessage)), + all((NatsBroker, NatsJSBroker, NatsJS, NatsMessage)), all((RedisBroker, Redis)), all((KafkaBroker, KafkaMessage, Producer)), all((SQSBroker, client, queue_url)), diff --git a/propan/brokers/nats/__init__.py b/propan/brokers/nats/__init__.py index 709139f4..c1eb6a7f 100644 --- a/propan/brokers/nats/__init__.py +++ b/propan/brokers/nats/__init__.py @@ -1,8 +1,13 @@ +from nats.js.api import DeliverPolicy + from propan.brokers.nats.nats_broker import NatsBroker, NatsMessage +from propan.brokers.nats.nats_js_broker import NatsJSBroker from propan.brokers.nats.routing import NatsRouter __all__ = ( "NatsBroker", "NatsMessage", + "DeliverPolicy", + "NatsJSBroker", "NatsRouter", ) diff --git a/propan/brokers/nats/consts.py 
b/propan/brokers/nats/consts.py new file mode 100644 index 00000000..5f3b092e --- /dev/null +++ b/propan/brokers/nats/consts.py @@ -0,0 +1,23 @@ +from nats.js.api import ( + ConsumerConfig, + DeliverPolicy, + DiscardPolicy, + Placement, + RePublish, + RetentionPolicy, + StorageType, + StreamConfig, + StreamSource, +) + +__all__ = ( + "StreamConfig", + "RetentionPolicy", + "DiscardPolicy", + "Placement", + "StorageType", + "StreamSource", + "RePublish", + "ConsumerConfig", + "DeliverPolicy", +) diff --git a/propan/brokers/nats/nats_js_broker.py b/propan/brokers/nats/nats_js_broker.py index 9ac4652b..95a88161 100644 --- a/propan/brokers/nats/nats_js_broker.py +++ b/propan/brokers/nats/nats_js_broker.py @@ -1,58 +1,337 @@ -# TODO: remove mypy ignore at complete -# type: ignore +import asyncio +import logging +from contextlib import suppress from functools import wraps -from typing import Any, Awaitable, Callable, Optional, TypeVar +from secrets import token_hex +from types import TracebackType +from typing import Any, Awaitable, Callable, Dict, List, Optional, Type, TypeVar -import nats -from nats.js.client import JetStreamContext +import anyio +import nats.errors +import nats.js +from nats.aio.client import Client +from nats.aio.msg import Msg +from nats.js.client import ( + DEFAULT_JS_SUB_PENDING_BYTES_LIMIT, + DEFAULT_JS_SUB_PENDING_MSGS_LIMIT, + JetStreamContext, +) -from propan._compat import model_to_dict +from propan.brokers._model.broker_usecase import HandlerCallable, T_HandlerReturn from propan.brokers._model.schemas import PropanMessage -from propan.brokers.nats.nats_broker import NatsBroker -from propan.brokers.nats.schemas import JetStream -from propan.brokers.push_back_watcher import BaseWatcher, WatcherContext -from propan.types import AnyDict +from propan.brokers.exceptions import WRONG_PUBLISH_ARGS +from propan.brokers.nats import consts as api +from propan.brokers.nats.nats_broker import NatsBroker, NatsMessage +from propan.brokers.nats.schemas 
import Handler +from propan.brokers.push_back_watcher import ( + BaseWatcher, + NotPushBackWatcher, + WatcherContext, +) +from propan.types import AnyDict, DecodedMessage, SendableMessage T = TypeVar("T") class NatsJSBroker(NatsBroker): - _js: Optional[JetStream] = None - _connection: Optional[JetStreamContext] = None + _raw_connection: Optional[Client] + _connection: Optional[JetStreamContext] + _stream_config: api.StreamConfig - def __init__(self, *args: Any, jetstream: JetStream, **kwargs: AnyDict): + @property + def js(self) -> JetStreamContext: + assert self._connection, "You should start connection first" + return self._connection + + def __init__( + self, + *args: Any, + # JS + stream: Optional[str] = "propan-stream", + description: Optional[str] = None, + retention: Optional[api.RetentionPolicy] = None, + max_consumers: Optional[int] = None, + max_msgs: Optional[int] = None, + max_bytes: Optional[int] = None, + discard: Optional[api.DiscardPolicy] = api.DiscardPolicy.OLD, + max_age: Optional[float] = None, # in seconds + max_msgs_per_subject: int = -1, + max_msg_size: Optional[int] = -1, + storage: Optional[api.StorageType] = None, + num_replicas: Optional[int] = None, + no_ack: bool = False, + template_owner: Optional[str] = None, + duplicate_window: int = 0, + placement: Optional[api.Placement] = None, + mirror: Optional[api.StreamSource] = None, + sources: Optional[List[api.StreamSource]] = None, + sealed: bool = False, + deny_delete: bool = False, + deny_purge: bool = False, + allow_rollup_hdrs: bool = False, + republish: Optional[api.RePublish] = None, + allow_direct: Optional[bool] = None, + mirror_direct: Optional[bool] = None, + **kwargs: AnyDict, + ) -> None: super().__init__(*args, **kwargs) - self._js = jetstream + self._raw_connection = None + self._stream_config = api.StreamConfig( + name=stream, + description=description, + retention=retention, + max_consumers=max_consumers, + max_msgs=max_msgs, + max_bytes=max_bytes, + discard=discard, + 
max_age=max_age, + max_msgs_per_subject=max_msgs_per_subject, + max_msg_size=max_msg_size, + storage=storage, + num_replicas=num_replicas, + no_ack=no_ack, + template_owner=template_owner, + duplicate_window=duplicate_window, + placement=placement, + mirror=mirror, + sources=sources, + sealed=sealed, + deny_delete=deny_delete, + deny_purge=deny_purge, + allow_rollup_hdrs=allow_rollup_hdrs, + republish=republish, + allow_direct=allow_direct, + mirror_direct=mirror_direct, + ) - async def _connect(self, *args: Any, **kwargs: AnyDict) -> JetStreamContext: - assert self._js + async def _connect( + self, + *, + url: Optional[str] = None, + **kwargs: Any, + ) -> JetStreamContext: + nc = await super()._connect(url=url, **kwargs) + stream = nc.jetstream() + self._raw_connection = nc + return stream - nc = await nats.connect(*args, **kwargs) + async def close( + self, + exc_type: Optional[Type[BaseException]] = None, + exc_val: Optional[BaseException] = None, + exec_tb: Optional[TracebackType] = None, + ) -> None: + await super(NatsBroker, self).close(exc_type, exc_val, exec_tb) + for h in self.handlers: + if h.subscription is not None: + await h.subscription.unsubscribe() + h.subscription = None - stream = await nc.jetstream( - **model_to_dict(self._js, include={"prefix", "domain", "timeout"}) - ) + if self._raw_connection is not None: + await self._raw_connection.drain() + self._raw_connection = None - return stream + async def _start(self): + await super()._start() + try: + await self._connection.add_stream( + config=self._stream_config, + subjects=[h.subject for h in self.handlers], + ) + except nats.js.errors.BadRequestError as e: + if ( + e.description + == "stream name already in use with a different configuration" + ): + self._log(e, logging.WARNING) + await self._connection.update_stream( + config=self._stream_config, + subjects=[h.subject for h in self.handlers], + ) + else: + self._log(e, logging.ERROR) - @staticmethod def _process_message( - func: 
Callable[[PropanMessage], Awaitable[T]], + self, + func: Callable[[NatsMessage], Awaitable[T]], watcher: Optional[BaseWatcher] = None, - ) -> Callable[[PropanMessage], Awaitable[T]]: + ) -> Callable[[NatsMessage], Awaitable[T]]: + if watcher is None: # pragma: no branch + watcher = NotPushBackWatcher() + @wraps(func) - async def wrapper(message: PropanMessage) -> T: - if watcher is None: - return await func(message) - else: - async with WatcherContext( - watcher, - message.message_id, - on_success=message.raw_message.ack, - on_error=message.raw_message.nak, - on_max=message.raw_message.term, - ): - await message.in_progress() - return await func(message) + async def wrapper(message: NatsMessage) -> T: + context = WatcherContext( + watcher, + message, + on_success=message_ack, + on_error=message_nak, + on_max=message_term, + ) + + async with context: + await message.raw_message.in_progress() + + r = await func(message) + + if message.reply_to: + reply_msg, content_type = self._encode_message(r) + + await self._raw_connection.publish( + subject=message.reply_to, + payload=reply_msg, + headers={ + "content-type": content_type or "", + }, + ) + + return r + + return wrapper + + async def _parse_message(self, message: Msg) -> NatsMessage: + headers = message.header or {} + return PropanMessage( + body=message.data, + content_type=headers.get("content-type", ""), + headers=headers, + reply_to=headers.get("reply_to", ""), + raw_message=message, + ) + + def handle( + self, + subject: str, + queue: str = "", + *, + durable: Optional[str] = None, + config: Optional[api.ConsumerConfig] = None, + ordered_consumer: bool = False, + idle_heartbeat: Optional[float] = None, + flow_control: bool = False, + pending_msgs_limit: Optional[int] = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT, + pending_bytes_limit: Optional[int] = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT, + deliver_policy: Optional[api.DeliverPolicy] = None, + headers_only: Optional[bool] = None, + # broker kwargs + description: str = "", 
+ **original_kwargs: AnyDict, + ) -> Callable[ + [HandlerCallable[T_HandlerReturn]], + Callable[[Msg, bool], Awaitable[T_HandlerReturn]], + ]: + super(NatsBroker, self).handle() + self._max_subject_len = max((self._max_subject_len, len(subject))) + self._max_queue_len = max((self._max_queue_len, len(queue))) + + def wrapper(func: HandlerCallable) -> HandlerCallable: + func, dependant = self._wrap_handler( + func, + queue=queue, + subject=subject, + **original_kwargs, + ) + handler = Handler( + callback=func, + subject=subject, + queue=queue, + _description=description, + dependant=dependant, + extra_args={ + "stream": self._stream_config.name, + "durable": durable, + "manual_ack": True, + "ordered_consumer": ordered_consumer, + "idle_heartbeat": idle_heartbeat, + "flow_control": flow_control, + "config": config, + "pending_msgs_limit": pending_msgs_limit, + "pending_bytes_limit": pending_bytes_limit, + "deliver_policy": deliver_policy, + "headers_only": headers_only, + }, + ) + self.handlers.append(handler) + + return func return wrapper + + async def publish( + self, + message: SendableMessage, + subject: str, + *, + headers: Optional[Dict[str, str]] = None, + reply_to: str = "", + callback: bool = False, + callback_timeout: Optional[float] = 30.0, + raise_timeout: bool = False, + ) -> Optional[DecodedMessage]: + if self._connection is None: + raise ValueError("NatsConnection not started yet") + + payload, content_type = self._encode_message(message) + + client = self._raw_connection + + if callback is True: + if reply_to: + raise WRONG_PUBLISH_ARGS + + token = client._nuid.next() + token.extend(token_hex(2).encode()) + reply_to = token.decode() + + if reply_to: + future: asyncio.Future[Msg] = asyncio.Future() + sub = await client.subscribe(reply_to, future=future, max_msgs=1) + await sub.unsubscribe(limit=1) + + if raise_timeout: + scope = anyio.fail_after + else: + scope = anyio.move_on_after + + with suppress(nats.errors.TimeoutError): # py37 compatibility + 
with scope(callback_timeout): + await self._connection.publish( + subject=subject, + payload=payload, + headers={ + **(headers or {}), + "reply_to": reply_to, + "content-type": content_type or "", + }, + timeout=callback_timeout, + stream=self._stream_config.name, + ) + + if reply_to: + msg: Any = None + with scope(callback_timeout): + msg = await future + + if msg: + if msg.headers: # pragma: no branch + if ( + msg.headers.get(nats.js.api.Header.STATUS) + == nats.aio.client.NO_RESPONDERS_STATUS + ): + raise nats.errors.NoRespondersError + return await self._decode_message(await self._parse_message(msg)) + + +async def message_ack(message: NatsMessage) -> None: + with suppress(nats.errors.MsgAlreadyAckdError): + await message.raw_message.ack() + + +async def message_nak(message: NatsMessage) -> None: + with suppress(nats.errors.MsgAlreadyAckdError): + await message.raw_message.nak() + + +async def message_term(message: NatsMessage) -> None: + with suppress(nats.errors.MsgAlreadyAckdError): + await message.raw_message.term() diff --git a/propan/brokers/nats/nats_js_broker.pyi b/propan/brokers/nats/nats_js_broker.pyi new file mode 100644 index 00000000..89d797ef --- /dev/null +++ b/propan/brokers/nats/nats_js_broker.pyi @@ -0,0 +1,225 @@ +import logging +import ssl +from types import TracebackType +from typing import ( + Awaitable, + Callable, + Dict, + List, + Optional, + Sequence, + Type, + TypeVar, + Union, +) + +from fast_depends.dependencies import Depends +from nats.aio.client import ( + DEFAULT_CONNECT_TIMEOUT, + DEFAULT_DRAIN_TIMEOUT, + DEFAULT_INBOX_PREFIX, + DEFAULT_MAX_FLUSHER_QUEUE_SIZE, + DEFAULT_MAX_OUTSTANDING_PINGS, + DEFAULT_MAX_RECONNECT_ATTEMPTS, + DEFAULT_PENDING_SIZE, + DEFAULT_PING_INTERVAL, + DEFAULT_RECONNECT_TIME_WAIT, + Callback, + Client, + Credentials, + ErrorCallback, + JWTCallback, + SignatureCallback, +) +from nats.aio.msg import Msg +from nats.js.client import ( + DEFAULT_JS_SUB_PENDING_BYTES_LIMIT, + DEFAULT_JS_SUB_PENDING_MSGS_LIMIT, 
+ JetStreamContext, +) + +from propan.brokers._model.broker_usecase import ( + AsyncDecoder, + AsyncParser, + HandlerCallable, + T_HandlerReturn, +) +from propan.brokers.nats import consts as api +from propan.brokers.nats.nats_broker import NatsBroker, NatsMessage +from propan.brokers.push_back_watcher import BaseWatcher +from propan.log import access_logger +from propan.types import DecodedMessage, SendableMessage + +T = TypeVar("T") + +class NatsJSBroker(NatsBroker): + _raw_connection: Optional[Client] + _connection: Optional[JetStreamContext] + _stream_config: api.StreamConfig + + @property + def js(self) -> JetStreamContext: + """JetStreamContext object to use it with native `nats-py` features""" + def __init__( + self, + servers: Union[str, List[str]] = ["nats://localhost:4222"], # noqa: B006 + *, + # JS + stream: Optional[str] = "propan-stream", + description: Optional[str] = None, + retention: Optional[api.RetentionPolicy] = None, + max_consumers: Optional[int] = None, + max_msgs: Optional[int] = None, + max_bytes: Optional[int] = None, + discard: Optional[api.DiscardPolicy] = api.DiscardPolicy.OLD, + max_age: Optional[float] = None, # in seconds + max_msgs_per_subject: int = -1, + max_msg_size: Optional[int] = -1, + storage: Optional[api.StorageType] = None, + num_replicas: Optional[int] = None, + no_ack: bool = False, + template_owner: Optional[str] = None, + duplicate_window: int = 0, + placement: Optional[api.Placement] = None, + mirror: Optional[api.StreamSource] = None, + sources: Optional[List[api.StreamSource]] = None, + sealed: bool = False, + deny_delete: bool = False, + deny_purge: bool = False, + allow_rollup_hdrs: bool = False, + republish: Optional[api.RePublish] = None, + allow_direct: Optional[bool] = None, + mirror_direct: Optional[bool] = None, + # connection + error_cb: Optional[ErrorCallback] = None, + disconnected_cb: Optional[Callback] = None, + closed_cb: Optional[Callback] = None, + discovered_server_cb: Optional[Callback] = None, + 
reconnected_cb: Optional[Callback] = None, + name: Optional[str] = None, + pedantic: bool = False, + verbose: bool = False, + allow_reconnect: bool = True, + connect_timeout: int = DEFAULT_CONNECT_TIMEOUT, + reconnect_time_wait: int = DEFAULT_RECONNECT_TIME_WAIT, + max_reconnect_attempts: int = DEFAULT_MAX_RECONNECT_ATTEMPTS, + ping_interval: int = DEFAULT_PING_INTERVAL, + max_outstanding_pings: int = DEFAULT_MAX_OUTSTANDING_PINGS, + dont_randomize: bool = False, + flusher_queue_size: int = DEFAULT_MAX_FLUSHER_QUEUE_SIZE, + no_echo: bool = False, + tls: Optional[ssl.SSLContext] = None, + tls_hostname: Optional[str] = None, + user: Optional[str] = None, + password: Optional[str] = None, + token: Optional[str] = None, + drain_timeout: int = DEFAULT_DRAIN_TIMEOUT, + signature_cb: Optional[SignatureCallback] = None, + user_jwt_cb: Optional[JWTCallback] = None, + user_credentials: Optional[Credentials] = None, + nkeys_seed: Optional[str] = None, + inbox_prefix: Union[str, bytes] = DEFAULT_INBOX_PREFIX, + pending_size: int = DEFAULT_PENDING_SIZE, + flush_timeout: Optional[float] = None, + # broker kwargs + logger: Optional[logging.Logger] = access_logger, + log_level: int = logging.INFO, + log_fmt: Optional[str] = None, + apply_types: bool = True, + dependencies: Sequence[Depends] = (), + decode_message: AsyncDecoder[Msg] = None, + parse_message: AsyncParser[Msg] = None, + protocol: str = "nats", + ) -> None: + """""" + async def _connect( + self, + *, + # connection + servers: Union[str, List[str]] = ["nats://localhost:4222"], # noqa: B006 + error_cb: Optional[ErrorCallback] = None, + disconnected_cb: Optional[Callback] = None, + closed_cb: Optional[Callback] = None, + discovered_server_cb: Optional[Callback] = None, + reconnected_cb: Optional[Callback] = None, + name: Optional[str] = None, + pedantic: bool = False, + verbose: bool = False, + allow_reconnect: bool = True, + connect_timeout: int = DEFAULT_CONNECT_TIMEOUT, + reconnect_time_wait: int = 
DEFAULT_RECONNECT_TIME_WAIT, + max_reconnect_attempts: int = DEFAULT_MAX_RECONNECT_ATTEMPTS, + ping_interval: int = DEFAULT_PING_INTERVAL, + max_outstanding_pings: int = DEFAULT_MAX_OUTSTANDING_PINGS, + dont_randomize: bool = False, + flusher_queue_size: int = DEFAULT_MAX_FLUSHER_QUEUE_SIZE, + no_echo: bool = False, + tls: Optional[ssl.SSLContext] = None, + tls_hostname: Optional[str] = None, + user: Optional[str] = None, + password: Optional[str] = None, + token: Optional[str] = None, + drain_timeout: int = DEFAULT_DRAIN_TIMEOUT, + signature_cb: Optional[SignatureCallback] = None, + user_jwt_cb: Optional[JWTCallback] = None, + user_credentials: Optional[Credentials] = None, + nkeys_seed: Optional[str] = None, + inbox_prefix: Union[str, bytes] = DEFAULT_INBOX_PREFIX, + pending_size: int = DEFAULT_PENDING_SIZE, + flush_timeout: Optional[float] = None, + ) -> JetStreamContext: + """""" + async def close( + self, + exc_type: Optional[Type[BaseException]] = None, + exc_val: Optional[BaseException] = None, + exec_tb: Optional[TracebackType] = None, + ) -> None: + """""" + async def _start(self): + """""" + def _process_message( + self, + func: Callable[[NatsMessage], Awaitable[T]], + watcher: Optional[BaseWatcher] = None, + ) -> Callable[[NatsMessage], Awaitable[T]]: + """""" + async def _parse_message(self, message: Msg) -> NatsMessage: + """""" + def handle( + self, + subject: str, + queue: str = "", + *, + durable: Optional[str] = None, + config: Optional[api.ConsumerConfig] = None, + ordered_consumer: bool = False, + idle_heartbeat: Optional[float] = None, + flow_control: bool = False, + pending_msgs_limit: Optional[int] = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT, + pending_bytes_limit: Optional[int] = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT, + deliver_policy: Optional[api.DeliverPolicy] = None, + headers_only: Optional[bool] = None, + # broker kwargs + retry: Union[bool, int] = False, + dependencies: Sequence[Depends] = (), + decode_message: AsyncDecoder[Msg] = None, + 
parse_message: AsyncParser[Msg] = None, + description: str = "", + ) -> Callable[ + [HandlerCallable[T_HandlerReturn]], + Callable[[Msg, bool], Awaitable[T_HandlerReturn]], + ]: + """""" + async def publish( + self, + message: SendableMessage, + subject: str, + *, + headers: Optional[Dict[str, str]] = None, + reply_to: str = "", + callback: bool = False, + callback_timeout: Optional[float] = 30.0, + raise_timeout: bool = False, + ) -> Optional[DecodedMessage]: + """""" diff --git a/propan/cli/app.py b/propan/cli/app.py index 9cf0e6d9..05a09e1e 100644 --- a/propan/cli/app.py +++ b/propan/cli/app.py @@ -1,8 +1,9 @@ import logging -from typing import Dict, List, Optional +from abc import ABC +from typing import Awaitable, Callable, Dict, List, Optional import anyio -from typing_extensions import Protocol +from typing_extensions import ParamSpec, Protocol, TypeVar from propan.asyncapi.info import AsyncAPIContact, AsyncAPILicense from propan.cli.supervisors.utils import set_exit @@ -21,13 +22,16 @@ async def close(self) -> None: ... 
-class PropanApp: - _on_startup_calling: List[AsyncFunc] - _after_startup_calling: List[AsyncFunc] - _on_shutdown_calling: List[AsyncFunc] - _after_shutdown_calling: List[AsyncFunc] +P_HookParams = ParamSpec("P_HookParams") +T_HookReturn = TypeVar("T_HookReturn") + + +class ABCApp(ABC): + _on_startup_calling: List[AnyCallable] + _after_startup_calling: List[AnyCallable] + _on_shutdown_calling: List[AnyCallable] + _after_shutdown_calling: List[AnyCallable] - _stop_event: Optional[anyio.Event] license: Optional[AsyncAPILicense] contact: Optional[AsyncAPIContact] @@ -51,8 +55,6 @@ def __init__( self._after_startup_calling = [] self._on_shutdown_calling = [] self._after_shutdown_calling = [] - self._stop_stream = None - self._receive_stream = None self._command_line_options: Dict[str, SettingField] = {} self.title = title @@ -60,24 +62,91 @@ def __init__( self.description = description self.license = license self.contact = contact - self._stop_event = None - - set_exit(lambda *_: self.__exit()) def set_broker(self, broker: Runnable) -> None: self.broker = broker - def on_startup(self, func: AnyCallable) -> AnyCallable: - return _set_async_hook(self._on_startup_calling, func) + def on_startup( + self, func: Callable[P_HookParams, T_HookReturn] + ) -> Callable[P_HookParams, T_HookReturn]: + self._on_startup_calling.append(apply_types(func)) + return func + + def on_shutdown( + self, func: Callable[P_HookParams, T_HookReturn] + ) -> Callable[P_HookParams, T_HookReturn]: + self._on_shutdown_calling.append(apply_types(func)) + return func + + def after_startup( + self, func: Callable[P_HookParams, T_HookReturn] + ) -> Callable[P_HookParams, T_HookReturn]: + self._after_startup_calling.append(apply_types(func)) + return func + + def after_shutdown( + self, func: Callable[P_HookParams, T_HookReturn] + ) -> Callable[P_HookParams, T_HookReturn]: + self._after_shutdown_calling.append(apply_types(func)) + return func - def on_shutdown(self, func: AnyCallable) -> AnyCallable: - 
return _set_async_hook(self._on_shutdown_calling, func) + def _log(self, level: int, message: str) -> None: + if self.logger is not None: + self.logger.log(level, message) - def after_startup(self, func: AnyCallable) -> AnyCallable: - return _set_async_hook(self._after_startup_calling, func) - def after_shutdown(self, func: AnyCallable) -> AnyCallable: - return _set_async_hook(self._after_shutdown_calling, func) +class PropanApp(ABCApp): + _on_startup_calling: List[AsyncFunc] + _after_startup_calling: List[AsyncFunc] + _on_shutdown_calling: List[AsyncFunc] + _after_shutdown_calling: List[AsyncFunc] + + _stop_event: Optional[anyio.Event] + + def __init__( + self, + broker: Optional[Runnable] = None, + logger: Optional[logging.Logger] = logger, + # AsyncAPI args, + title: str = "Propan", + version: str = "0.1.0", + description: str = "", + license: Optional[AsyncAPILicense] = None, + contact: Optional[AsyncAPIContact] = None, + ): + super().__init__( + broker=broker, + logger=logger, + title=title, + version=version, + description=description, + license=license, + contact=contact, + ) + + self._stop_event = None + + set_exit(lambda *_: self.__exit()) + + def on_startup( + self, func: Callable[P_HookParams, T_HookReturn] + ) -> Callable[P_HookParams, Awaitable[T_HookReturn]]: + return super().on_startup(to_async(func)) + + def on_shutdown( + self, func: Callable[P_HookParams, Awaitable[T_HookReturn]] + ) -> AsyncFunc: + return super().on_shutdown(to_async(func)) + + def after_startup( + self, func: Callable[P_HookParams, Awaitable[T_HookReturn]] + ) -> AsyncFunc: + return super().after_startup(to_async(func)) + + def after_shutdown( + self, func: Callable[P_HookParams, Awaitable[T_HookReturn]] + ) -> AsyncFunc: + return super().after_shutdown(to_async(func)) async def run(self, log_level: int = logging.INFO) -> None: self._init_async_cycle() @@ -102,10 +171,6 @@ async def _stop(self, log_level: int = logging.INFO) -> None: await self._shutdown() self._log(log_level, 
"Propan app shut down gracefully.") - def _log(self, level: int, message: str) -> None: - if self.logger is not None: - self.logger.log(level, message) - async def _startup(self) -> None: for func in self._on_startup_calling: await func(**self._command_line_options) @@ -132,9 +197,3 @@ async def _shutdown(self) -> None: def __exit(self) -> None: if self._stop_event is not None: # pragma: no branch self._stop_event.set() - - -def _set_async_hook(hooks: List[AsyncFunc], func: AnyCallable) -> AnyCallable: - f: AsyncFunc = apply_types(to_async(func)) - hooks.append(f) - return func diff --git a/propan/cli/startproject/async_app/app.py b/propan/cli/startproject/async_app/app.py index d0a1da5c..5b7cd4c1 100644 --- a/propan/cli/startproject/async_app/app.py +++ b/propan/cli/startproject/async_app/app.py @@ -3,7 +3,7 @@ import typer from propan.cli.startproject.async_app.kafka import create_kafka -from propan.cli.startproject.async_app.nats import create_nats +from propan.cli.startproject.async_app.nats import create_nats, create_nats_js from propan.cli.startproject.async_app.rabbit import create_rabbit from propan.cli.startproject.async_app.redis import create_redis from propan.cli.startproject.async_app.sqs import create_sqs @@ -32,6 +32,13 @@ def nats(appname: str) -> None: typer.echo(f"Create an asyncronous Nats Propan project at: {project}") +@async_app.command(name="nats-js") +def nats_js(appname: str) -> None: + """Create an asyncronous NatsJS Propan project at [APPNAME] directory""" + project = create_nats_js(Path.cwd() / appname) + typer.echo(f"Create an asyncronous NatsJS Propan project at: {project}") + + @async_app.command() def kafka(appname: str) -> None: """Create an asyncronous Kafka Propan project at [APPNAME] directory""" diff --git a/propan/cli/startproject/async_app/nats.py b/propan/cli/startproject/async_app/nats.py index 3eeca314..33699aa7 100644 --- a/propan/cli/startproject/async_app/nats.py +++ b/propan/cli/startproject/async_app/nats.py @@ -11,14 
+11,22 @@ def create_nats(dir: Path) -> Path: - project_dir = _create_project_dir(dir) - app_dir = _create_app_dir(project_dir / "app") + project_dir = _create_project_dir(dir, js=False) + app_dir = _create_app_dir(project_dir / "app", js=False) _create_config_dir(app_dir / "config") _create_apps_dir(app_dir / "apps") return project_dir -def _create_project_dir(dirname: Path) -> Path: +def create_nats_js(dir: Path) -> Path: + project_dir = _create_project_dir(dir, js=True) + app_dir = _create_app_dir(project_dir / "app", js=True) + _create_config_dir(app_dir / "config") + _create_apps_dir(app_dir / "apps") + return project_dir + + +def _create_project_dir(dirname: Path, js: bool) -> Path: project_dir = create_project_dir(dirname, "propan[async-nats]") write_file( @@ -31,6 +39,7 @@ def _create_project_dir(dirname: Path) -> Path: " ports:", " - 4222:4222", " - 8222:8222 # management", + " command: -js" if js else "", "", " app:", " build: .", @@ -45,9 +54,9 @@ def _create_project_dir(dirname: Path) -> Path: return project_dir -def _create_app_dir(app: Path) -> Path: +def _create_app_dir(app: Path, js: bool) -> Path: app_dir = touch_dir(app) - create_app_file(app_dir, "NatsBroker") + create_app_file(app_dir, "NatsJSBroker" if js else "NatsBroker") return app_dir diff --git a/propan/fastapi/__init__.py b/propan/fastapi/__init__.py index 1d655104..b33e179e 100644 --- a/propan/fastapi/__init__.py +++ b/propan/fastapi/__init__.py @@ -16,9 +16,9 @@ KafkaRouter = about.INSTALL_KAFKA # type: ignore try: - from propan.fastapi.nats import NatsRouter + from propan.fastapi.nats import NatsJSRouter, NatsRouter except ImportError: - NatsRouter = about.INSTALL_NATS # type: ignore + NatsRouter = NatsJSRouter = about.INSTALL_NATS # type: ignore try: from propan.fastapi.sqs import SQSRouter @@ -29,4 +29,11 @@ (RabbitRouter, RedisRouter, KafkaRouter, NatsRouter, SQSRouter) ), about.INSTALL_MESSAGE -__all__ = ("RabbitRouter", "RedisRouter", "KafkaRouter", "NatsRouter", "SQSRouter") 
+__all__ = ( + "RabbitRouter", + "RedisRouter", + "KafkaRouter", + "NatsRouter", + "SQSRouter", + "NatsJSRouter", +) diff --git a/propan/fastapi/kafka/router.py b/propan/fastapi/kafka/router.py index 58d062f1..0944543b 100644 --- a/propan/fastapi/kafka/router.py +++ b/propan/fastapi/kafka/router.py @@ -1,6 +1,6 @@ from aiokafka.structs import ConsumerRecord -from propan import KafkaBroker +from propan.brokers.kafka import KafkaBroker from propan.fastapi.core.router import PropanRouter diff --git a/propan/fastapi/nats/__init__.py b/propan/fastapi/nats/__init__.py index 6e4c50cc..47d66d1e 100644 --- a/propan/fastapi/nats/__init__.py +++ b/propan/fastapi/nats/__init__.py @@ -1,3 +1,6 @@ -from propan.fastapi.nats.router import NatsRouter +from propan.fastapi.nats.router import NatsJSRouter, NatsRouter -__all__ = ("NatsRouter",) +__all__ = ( + "NatsRouter", + "NatsJSRouter", +) diff --git a/propan/fastapi/nats/router.py b/propan/fastapi/nats/router.py index 70a195cd..b948e5fb 100644 --- a/propan/fastapi/nats/router.py +++ b/propan/fastapi/nats/router.py @@ -1,8 +1,12 @@ from nats.aio.msg import Msg -from propan import NatsBroker +from propan.brokers.nats import NatsBroker, NatsJSBroker from propan.fastapi.core.router import PropanRouter class NatsRouter(PropanRouter[NatsBroker, Msg]): broker_class = NatsBroker + + +class NatsJSRouter(PropanRouter[NatsJSBroker, Msg]): + broker_class = NatsJSBroker diff --git a/propan/fastapi/nats/router.pyi b/propan/fastapi/nats/router.pyi index 76fc4d3e..08e62497 100644 --- a/propan/fastapi/nats/router.pyi +++ b/propan/fastapi/nats/router.pyi @@ -24,17 +24,22 @@ from nats.aio.client import ( SignatureCallback, ) from nats.aio.msg import Msg +from nats.js.client import ( + DEFAULT_JS_SUB_PENDING_BYTES_LIMIT, + DEFAULT_JS_SUB_PENDING_MSGS_LIMIT, +) from starlette import routing from starlette.responses import JSONResponse, Response from starlette.types import ASGIApp -from propan import NatsBroker +from propan import NatsBroker, 
NatsJSBroker from propan.brokers._model.broker_usecase import ( AsyncDecoder, AsyncParser, HandlerCallable, T_HandlerReturn, ) +from propan.brokers.nats import consts as api from propan.fastapi.core.router import PropanRouter from propan.log import access_logger @@ -130,3 +135,144 @@ class NatsRouter(PropanRouter[NatsBroker]): Callable[[Msg, bool], Awaitable[T_HandlerReturn]], ]: pass + +class NatsJSRouter(PropanRouter[NatsJSBroker]): + def __init__( + self, + servers: Union[str, List[str]] = ["nats://localhost:4222"], # noqa: B006 + *, + # JS + stream: Optional[str] = "propan-stream", + description: Optional[str] = None, + retention: Optional[api.RetentionPolicy] = None, + max_consumers: Optional[int] = None, + max_msgs: Optional[int] = None, + max_bytes: Optional[int] = None, + discard: Optional[api.DiscardPolicy] = api.DiscardPolicy.OLD, + max_age: Optional[float] = None, # in seconds + max_msgs_per_subject: int = -1, + max_msg_size: Optional[int] = -1, + storage: Optional[api.StorageType] = None, + num_replicas: Optional[int] = None, + no_ack: bool = False, + template_owner: Optional[str] = None, + duplicate_window: int = 0, + placement: Optional[api.Placement] = None, + mirror: Optional[api.StreamSource] = None, + sources: Optional[List[api.StreamSource]] = None, + sealed: bool = False, + deny_delete: bool = False, + deny_purge: bool = False, + allow_rollup_hdrs: bool = False, + republish: Optional[api.RePublish] = None, + allow_direct: Optional[bool] = None, + mirror_direct: Optional[bool] = None, + # connection + error_cb: Optional[ErrorCallback] = None, + disconnected_cb: Optional[Callback] = None, + closed_cb: Optional[Callback] = None, + discovered_server_cb: Optional[Callback] = None, + reconnected_cb: Optional[Callback] = None, + name: Optional[str] = None, + pedantic: bool = False, + verbose: bool = False, + allow_reconnect: bool = True, + connect_timeout: int = DEFAULT_CONNECT_TIMEOUT, + reconnect_time_wait: int = DEFAULT_RECONNECT_TIME_WAIT, + 
max_reconnect_attempts: int = DEFAULT_MAX_RECONNECT_ATTEMPTS, + ping_interval: int = DEFAULT_PING_INTERVAL, + max_outstanding_pings: int = DEFAULT_MAX_OUTSTANDING_PINGS, + dont_randomize: bool = False, + flusher_queue_size: int = DEFAULT_MAX_FLUSHER_QUEUE_SIZE, + no_echo: bool = False, + tls: Optional[ssl.SSLContext] = None, + tls_hostname: Optional[str] = None, + user: Optional[str] = None, + password: Optional[str] = None, + token: Optional[str] = None, + drain_timeout: int = DEFAULT_DRAIN_TIMEOUT, + signature_cb: Optional[SignatureCallback] = None, + user_jwt_cb: Optional[JWTCallback] = None, + user_credentials: Optional[Credentials] = None, + nkeys_seed: Optional[str] = None, + inbox_prefix: Union[str, bytes] = DEFAULT_INBOX_PREFIX, + pending_size: int = DEFAULT_PENDING_SIZE, + flush_timeout: Optional[float] = None, + # FastAPI kwargs + prefix: str = "", + tags: Optional[List[Union[str, Enum]]] = None, + dependencies: Optional[Sequence[params.Depends]] = None, + default_response_class: Type[Response] = Default(JSONResponse), + responses: Optional[Dict[Union[int, str], Dict[str, Any]]] = None, + callbacks: Optional[List[routing.BaseRoute]] = None, + routes: Optional[List[routing.BaseRoute]] = None, + redirect_slashes: bool = True, + default: Optional[ASGIApp] = None, + dependency_overrides_provider: Optional[Any] = None, + route_class: Type[APIRoute] = APIRoute, + on_startup: Optional[Sequence[Callable[[], Any]]] = None, + on_shutdown: Optional[Sequence[Callable[[], Any]]] = None, + deprecated: Optional[bool] = None, + include_in_schema: bool = True, + generate_unique_id_function: Callable[[APIRoute], str] = Default( + generate_unique_id + ), + # Broker kwargs + schema_url: str = "/asyncapi", + logger: Optional[logging.Logger] = access_logger, + log_level: int = logging.INFO, + log_fmt: Optional[str] = None, + apply_types: bool = True, + decode_message: AsyncDecoder[Msg] = None, + parse_message: AsyncParser[Msg] = None, + protocol: str = "nats", + ) -> None: + 
pass + def add_api_mq_route( # type: ignore[override] + self, + subject: str, + *, + durable: Optional[str] = None, + config: Optional[api.ConsumerConfig] = None, + ordered_consumer: bool = False, + idle_heartbeat: Optional[float] = None, + flow_control: bool = False, + pending_msgs_limit: Optional[int] = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT, + pending_bytes_limit: Optional[int] = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT, + deliver_policy: Optional[api.DeliverPolicy] = None, + headers_only: Optional[bool] = None, + # fastapi kwargs + endpoint: HandlerCallable[T_HandlerReturn], + # broker kwargs + retry: Union[bool, int] = False, + decode_message: AsyncDecoder[Msg] = None, + parse_message: AsyncParser[Msg] = None, + description: str = "", + dependencies: Optional[Sequence[params.Depends]] = None, + ) -> Callable[[Msg, bool], Awaitable[T_HandlerReturn]]: + pass + def event( # type: ignore[override] + self, + subject: str, + *, + queue: str = "", + durable: Optional[str] = None, + config: Optional[api.ConsumerConfig] = None, + ordered_consumer: bool = False, + idle_heartbeat: Optional[float] = None, + flow_control: bool = False, + pending_msgs_limit: Optional[int] = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT, + pending_bytes_limit: Optional[int] = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT, + deliver_policy: Optional[api.DeliverPolicy] = None, + headers_only: Optional[bool] = None, + # broker kwargs + retry: Union[bool, int] = False, + decode_message: AsyncDecoder[Msg] = None, + parse_message: AsyncParser[Msg] = None, + description: str = "", + dependencies: Optional[Sequence[params.Depends]] = None, + ) -> Callable[ + [HandlerCallable[T_HandlerReturn]], + Callable[[Msg, bool], Awaitable[T_HandlerReturn]], + ]: + pass diff --git a/propan/fastapi/rabbit/router.py b/propan/fastapi/rabbit/router.py index 499cc1cc..80dc32ed 100644 --- a/propan/fastapi/rabbit/router.py +++ b/propan/fastapi/rabbit/router.py @@ -1,6 +1,6 @@ from aio_pika.message import IncomingMessage -from propan import RabbitBroker 
+from propan.brokers.rabbit import RabbitBroker from propan.fastapi.core.router import PropanRouter diff --git a/propan/fastapi/redis/router.py b/propan/fastapi/redis/router.py index ce5a33cf..5ceccbae 100644 --- a/propan/fastapi/redis/router.py +++ b/propan/fastapi/redis/router.py @@ -1,4 +1,4 @@ -from propan import RedisBroker +from propan.brokers.redis import RedisBroker from propan.fastapi.core.router import PropanRouter from propan.types import AnyDict diff --git a/propan/fastapi/sqs/router.py b/propan/fastapi/sqs/router.py index 15316ed0..137e5f69 100644 --- a/propan/fastapi/sqs/router.py +++ b/propan/fastapi/sqs/router.py @@ -1,4 +1,4 @@ -from propan import SQSBroker +from propan.brokers.sqs import SQSBroker from propan.fastapi.core.router import PropanRouter from propan.types import AnyDict diff --git a/propan/test/nats.py b/propan/test/nats.py index d36a4441..7ed2dc44 100644 --- a/propan/test/nats.py +++ b/propan/test/nats.py @@ -1,6 +1,7 @@ import sys from types import MethodType -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Union +from uuid import uuid4 from nats.aio.msg import Msg @@ -19,18 +20,35 @@ ) +class PatchedMessage(Msg): + async def ack(self) -> None: + pass + + async def ack_sync(self, timeout: float = 1) -> None: + pass + + async def nak(self, delay: Union[int, float, None] = None) -> None: + pass + + async def term(self) -> None: + pass + + async def in_progress(self) -> None: + pass + + def build_message( message: SendableMessage, subject: str, *, reply_to: str = "", headers: Optional[Dict[str, Any]] = None, -) -> Msg: +) -> PatchedMessage: msg, content_type = NatsBroker._encode_message(message) - return Msg( + return PatchedMessage( _client=None, # type: ignore subject=subject, - reply=reply_to, + reply=reply_to or str(uuid4()), data=msg, headers={ **(headers or {}), @@ -69,5 +87,6 @@ async def publish( def TestNatsBroker(broker: NatsBroker) -> NatsBroker: broker.connect = AsyncMock() # type: ignore 
broker.start = AsyncMock() # type: ignore + broker._raw_connection = AsyncMock() # type: ignore broker.publish = MethodType(publish, broker) # type: ignore return broker diff --git a/scripts/test-cov.sh b/scripts/test-cov.sh index 6163baaf..5bd68e53 100755 --- a/scripts/test-cov.sh +++ b/scripts/test-cov.sh @@ -1,9 +1,9 @@ bash scripts/test.sh "$@" -coverage run -m pytest -m "run" -k test_run_nats_correct tests/cli/test_run.py -coverage run -m pytest -m "run" -k test_run_rabbit_correct tests/cli/test_run.py -coverage run -m pytest -m "run" -k test_run_kafka_correct tests/cli/test_run.py -coverage run -m pytest -m "run" -k test_run_sqs_correct tests/cli/test_run.py -coverage run -m pytest -m "run" -k test_run_redis_correct tests/cli/test_run.py +coverage run -m pytest -m "run and rabbit" tests/cli/test_run.py +coverage run -m pytest -m "run and kafka" tests/cli/test_run.py +coverage run -m pytest -m "run and sqs" tests/cli/test_run.py +coverage run -m pytest -m "run and redis" tests/cli/test_run.py +coverage run -m pytest -m "run and nats" tests/cli/test_run.py coverage combine coverage report --show-missing diff --git a/tests/brokers/nats_js/__init__.py b/tests/brokers/nats_js/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/brokers/nats_js/conftest.py b/tests/brokers/nats_js/conftest.py new file mode 100644 index 00000000..eebb729b --- /dev/null +++ b/tests/brokers/nats_js/conftest.py @@ -0,0 +1,47 @@ +from dataclasses import dataclass + +import pytest +import pytest_asyncio + +from propan import NatsJSBroker, NatsRouter +from propan.test import TestNatsBroker + + +@dataclass +class Settings: + url = "nats://localhost:4222" + + +@pytest.fixture() +def router(): + return NatsRouter() + + +@pytest.fixture(scope="session") +def settings(): + return Settings() + + +@pytest.mark.nats +@pytest_asyncio.fixture() +async def broker(settings): + broker = NatsJSBroker( + settings.url, + apply_types=False, + ) + yield broker + await broker.close() 
+ + +@pytest.mark.nats +@pytest_asyncio.fixture() +async def full_broker(settings): + broker = NatsJSBroker(settings.url) + yield broker + await broker.close() + + +@pytest.fixture() +def test_broker(): + broker = NatsJSBroker() + yield TestNatsBroker(broker) diff --git a/tests/brokers/nats_js/test_connect.py b/tests/brokers/nats_js/test_connect.py new file mode 100644 index 00000000..40dcfe70 --- /dev/null +++ b/tests/brokers/nats_js/test_connect.py @@ -0,0 +1,23 @@ +import pytest +from nats.js.client import JetStreamContext + +from propan import NatsJSBroker +from tests.brokers.base.connection import BrokerConnectionTestcase + + +@pytest.mark.nats +class TestNatsJSConnect(BrokerConnectionTestcase): + broker = NatsJSBroker + + @pytest.mark.asyncio + async def test_connect_merge_args_and_kwargs_native(self, settings): + broker = self.broker("fake-url") # will be ignored + assert await broker.connect(servers=settings.url) + await broker.close() + + @pytest.mark.asyncio + async def test_js_exists(self, settings): + broker = self.broker(settings.url) + assert await broker.connect() + assert isinstance(broker.js, JetStreamContext) + await broker.close() diff --git a/tests/brokers/nats_js/test_consume.py b/tests/brokers/nats_js/test_consume.py new file mode 100644 index 00000000..ae8f5615 --- /dev/null +++ b/tests/brokers/nats_js/test_consume.py @@ -0,0 +1,8 @@ +import pytest + +from tests.brokers.base.consume import BrokerConsumeTestcase + + +@pytest.mark.nats +class TestNatsJSConsume(BrokerConsumeTestcase): + pass diff --git a/tests/brokers/nats_js/test_publish.py b/tests/brokers/nats_js/test_publish.py new file mode 100644 index 00000000..03f2fa0e --- /dev/null +++ b/tests/brokers/nats_js/test_publish.py @@ -0,0 +1,8 @@ +import pytest + +from tests.brokers.base.publish import BrokerPublishTestcase + + +@pytest.mark.nats +class TestNatsJSPublish(BrokerPublishTestcase): + pass diff --git a/tests/brokers/nats_js/test_router.py b/tests/brokers/nats_js/test_router.py new 
file mode 100644 index 00000000..4357c076 --- /dev/null +++ b/tests/brokers/nats_js/test_router.py @@ -0,0 +1,6 @@ +from propan.test.nats import build_message +from tests.brokers.base.router import RouterTestcase + + +class TestNatsJSRouter(RouterTestcase): + build_message = staticmethod(build_message) diff --git a/tests/brokers/nats_js/test_rpc.py b/tests/brokers/nats_js/test_rpc.py new file mode 100644 index 00000000..333bf815 --- /dev/null +++ b/tests/brokers/nats_js/test_rpc.py @@ -0,0 +1,8 @@ +import pytest + +from tests.brokers.base.rpc import BrokerRPCTestcase, ReplyAndConsumeForbidden + + +@pytest.mark.nats +class TestNatsJSRPC(BrokerRPCTestcase, ReplyAndConsumeForbidden): + pass diff --git a/tests/brokers/nats_js/test_test_client.py b/tests/brokers/nats_js/test_test_client.py new file mode 100644 index 00000000..f1428542 --- /dev/null +++ b/tests/brokers/nats_js/test_test_client.py @@ -0,0 +1,6 @@ +from propan.test.nats import build_message +from tests.brokers.base.testclient import BrokerTestclientTestcase + + +class TestNatsJSTestclient(BrokerTestclientTestcase): + build_message = staticmethod(build_message) diff --git a/tests/cli/conftest.py b/tests/cli/conftest.py index 12307180..f018aa20 100644 --- a/tests/cli/conftest.py +++ b/tests/cli/conftest.py @@ -58,6 +58,13 @@ def nats_async_project(runner: CliRunner) -> Path: yield Path.cwd() / Path(project_name) +@pytest.fixture(scope="session") +def nats_js_async_project(runner: CliRunner) -> Path: + project_name = "nats_js" + runner.invoke(cli, ["create", "async", "nats-js", project_name]) + yield Path.cwd() / Path(project_name) + + @pytest.fixture(scope="session") def kafka_async_project(runner: CliRunner) -> Path: project_name = "kafka" diff --git a/tests/cli/test_creation.py b/tests/cli/test_creation.py index d9700cd2..7a3a2a5c 100644 --- a/tests/cli/test_creation.py +++ b/tests/cli/test_creation.py @@ -1,2 +1,22 @@ -def test_create_propject(rabbit_async_project): +def 
test_create_rabbit_project(rabbit_async_project): assert rabbit_async_project.exists() + + +def test_create_redis_project(redis_async_project): + assert redis_async_project.exists() + + +def test_create_nats_project(nats_async_project): + assert nats_async_project.exists() + + +def test_create_nats_js_project(nats_js_async_project): + assert nats_js_async_project.exists() + + +def test_create_kafka_project(kafka_async_project): + assert kafka_async_project.exists() + + +def test_create_sqs_project(sqs_async_project): + assert sqs_async_project.exists() diff --git a/tests/cli/test_run.py b/tests/cli/test_run.py index 3139f26a..a332c28c 100644 --- a/tests/cli/test_run.py +++ b/tests/cli/test_run.py @@ -100,6 +100,29 @@ async def patched_run(self: PropanApp, *args, **kwargs): mock.assert_called_once() +@pytest.mark.nats +@pytest.mark.run +def test_run_nats_js_correct( + runner: CliRunner, + nats_js_async_project: Path, + monkeypatch, + mock: Mock, +): + app_path = f"{nats_js_async_project.name}.app.serve:app" + + async def patched_run(self: PropanApp, *args, **kwargs): + await self._startup() + await self._shutdown() + mock() + + with monkeypatch.context() as m: + m.setattr(PropanApp, "run", patched_run) + r = runner.invoke(cli, ["run", app_path]) + + assert r.exit_code == 0 + mock.assert_called_once() + + @pytest.mark.sqs @pytest.mark.run def test_run_sqs_correct(