Skip to content

Commit

Permalink
Fix: pop schema_ from publishers dto
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniil Dumchenko committed Jan 20, 2025
1 parent e0515c2 commit 4815fab
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 45 deletions.
9 changes: 4 additions & 5 deletions faststream/_internal/publisher/specified.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from inspect import Parameter, unwrap
from typing import TYPE_CHECKING, Any, Callable, Union
from typing import TYPE_CHECKING, Any, Callable, Optional, Union

from fast_depends.core import build_call_model
from fast_depends.pydantic._compat import create_model, get_config_base

from faststream._internal.publisher.schemas import SpecificationPublisherOptions
from faststream._internal.types import (
MsgType,
P_HandlerParams,
Expand All @@ -30,13 +29,13 @@ class SpecificationPublisher(EndpointSpecification[MsgType, PublisherSpec]):
def __init__(
self,
*args: Any,
specification_options: SpecificationPublisherOptions,
schema_: Optional[Any],
**kwargs: Any,
) -> None:
self.calls: list[AnyCallable] = []
self.schema_ = specification_options.schema_
self.schema_ = schema_
# Call next base class parent init
super().__init__(*args, specification_options=specification_options, **kwargs)
super().__init__(*args, **kwargs)

def __call__(
self,
Expand Down
23 changes: 9 additions & 14 deletions faststream/confluent/publisher/factory.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,12 @@
from collections.abc import Sequence
from typing import (
TYPE_CHECKING,
Any,
Literal,
Optional,
Union,
overload,
cast
)
from typing import TYPE_CHECKING, Any, Literal, Optional, Union, cast, overload

from faststream._internal.publisher.schemas import (
PublisherUsecaseOptions,
SpecificationPublisherOptions,
)
from faststream.confluent.schemas.publisher import ConfluentPublisherBaseOptions
from faststream.exceptions import SetupError
from faststream.specification.schema.base import SpecificationOptions

from .specified import SpecificationBatchPublisher, SpecificationDefaultPublisher

Expand Down Expand Up @@ -128,11 +120,10 @@ def create_publisher(
reply_to=reply_to,
internal_options=internal_options,
)
specification_options = SpecificationPublisherOptions(
specification_options = SpecificationOptions(
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
schema_=schema_,
)

if batch:
Expand All @@ -141,9 +132,13 @@ def create_publisher(
raise SetupError(msg)

return SpecificationBatchPublisher(
specification_options=specification_options, base_options=base_options
specification_options=specification_options,
base_options=base_options,
schema_=schema_
)

return SpecificationDefaultPublisher(
specification_options=specification_options, base_options=base_options
specification_options=specification_options,
base_options=base_options,
schema_=schema_
)
9 changes: 1 addition & 8 deletions faststream/confluent/subscriber/factory.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
import warnings
from collections.abc import Iterable, Sequence
from typing import (
TYPE_CHECKING,
Literal,
Optional,
Union,
overload,
cast
)
from typing import TYPE_CHECKING, Literal, Optional, Union, cast, overload

from faststream._internal.constants import EMPTY
from faststream._internal.subscriber.schemas import SubscriberUsecaseOptions
Expand Down
13 changes: 8 additions & 5 deletions faststream/kafka/publisher/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@

from faststream._internal.publisher.schemas import (
PublisherUsecaseOptions,
SpecificationPublisherOptions,
)
from faststream.exceptions import SetupError
from faststream.kafka.schemas.publishers import KafkaPublisherBaseOptions
from faststream.specification.schema.base import SpecificationOptions

from .specified import SpecificationBatchPublisher, SpecificationDefaultPublisher

Expand Down Expand Up @@ -121,11 +121,10 @@ def create_publisher(
reply_to=reply_to,
internal_options=internal_options,
)
specification_options = SpecificationPublisherOptions(
specification_options = SpecificationOptions(
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
schema_=schema_,
)

if batch:
Expand All @@ -134,8 +133,12 @@ def create_publisher(
raise SetupError(msg)

return SpecificationBatchPublisher(
base_options=base_options, specification_options=specification_options
base_options=base_options,
specification_options=specification_options,
schema_=schema_
)
return SpecificationDefaultPublisher(
base_options=base_options, specification_options=specification_options
base_options=base_options,
specification_options=specification_options,
schema_=schema_
)
2 changes: 1 addition & 1 deletion faststream/kafka/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import anyio
from aiokafka import ConsumerRecord, TopicPartition
from aiokafka.errors import ConsumerStoppedError, KafkaError
from faststream.middlewares.acknowledgement.conf import AckPolicy
from typing_extensions import override

from faststream._internal.subscriber.mixins import ConcurrentMixin, TasksMixin
Expand All @@ -21,6 +20,7 @@
from faststream.kafka.parser import AioKafkaBatchParser, AioKafkaParser
from faststream.kafka.publisher.fake import KafkaFakePublisher
from faststream.kafka.schemas.subscribers import KafkaSubscriberBaseOptions
from faststream.middlewares.acknowledgement.conf import AckPolicy

if TYPE_CHECKING:
from aiokafka import AIOKafkaConsumer
Expand Down
6 changes: 3 additions & 3 deletions faststream/nats/publisher/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

from faststream._internal.publisher.schemas import (
PublisherUsecaseOptions,
SpecificationPublisherOptions,
)
from faststream.nats.schemas.publishers import NatsPublisherBaseOptions
from faststream.specification.schema.base import SpecificationOptions

from .specified import SpecificationPublisher

Expand Down Expand Up @@ -45,8 +45,7 @@ def create_publisher(
internal_options=internal_options,
)

specification_options = SpecificationPublisherOptions(
schema_=schema_,
specification_options = SpecificationOptions(
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
Expand All @@ -55,4 +54,5 @@ def create_publisher(
return SpecificationPublisher(
base_options=base_options,
specification_options=specification_options,
schema_=schema_
)
8 changes: 4 additions & 4 deletions faststream/rabbit/publisher/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

from faststream._internal.publisher.schemas import (
PublisherUsecaseOptions,
SpecificationPublisherOptions,
)
from faststream.rabbit.schemas.base import RabbitBaseOptions
from faststream.rabbit.schemas.publishers import RabbitPublisherBaseOptions
from faststream.specification.schema.base import SpecificationOptions

from .specified import SpecificationPublisher

Expand Down Expand Up @@ -50,9 +50,8 @@ def create_publisher(
queue=queue,
exchange=exchange,
)
# AsyncAPI options
specification_options = SpecificationPublisherOptions(
schema_=schema_,

specification_options = SpecificationOptions(
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
Expand All @@ -61,4 +60,5 @@ def create_publisher(
base_options=base_options,
rabbit_mq_base_options=rabbit_mq_base_options,
specification_options=specification_options,
schema_=schema_,
)
8 changes: 6 additions & 2 deletions faststream/rabbit/publisher/specified.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from faststream._internal.publisher.schemas import SpecificationPublisherOptions
from typing import Any, Optional

from faststream._internal.publisher.specified import (
SpecificationPublisher as SpecificationPublisherMixin,
)
Expand All @@ -10,6 +11,7 @@
from faststream.rabbit.utils import is_routing_exchange
from faststream.specification.asyncapi.utils import resolve_payloads
from faststream.specification.schema import Message, Operation, PublisherSpec
from faststream.specification.schema.base import SpecificationOptions
from faststream.specification.schema.bindings import (
ChannelBinding,
OperationBinding,
Expand All @@ -31,10 +33,12 @@ def __init__(
*,
base_options: RabbitPublisherBaseOptions,
rabbit_mq_base_options: RabbitBaseOptions,
specification_options: SpecificationPublisherOptions,
specification_options: SpecificationOptions,
schema_: Optional["Any"],
) -> None:
super().__init__(
specification_options=specification_options,
schema_=schema_,
# propagate to RMQSpecificationMixin
rabbit_mq_options=rabbit_mq_base_options,
)
Expand Down
9 changes: 6 additions & 3 deletions faststream/redis/publisher/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@

from faststream._internal.publisher.schemas import (
PublisherUsecaseOptions,
SpecificationPublisherOptions,
)
from faststream.exceptions import SetupError
from faststream.redis.schemas import INCORRECT_SETUP_MSG, ListSub, PubSub, StreamSub
from faststream.redis.schemas.proto import validate_options
from faststream.redis.schemas.publishers import RedisPublisherBaseOptions
from faststream.specification.schema.base import SpecificationOptions

from .specified import (
SpecificationChannelPublisher,
Expand Down Expand Up @@ -58,8 +58,7 @@ def create_publisher(
reply_to=reply_to, headers=headers, internal_options=internal_options
)

specification_options = SpecificationPublisherOptions(
schema_=schema_,
specification_options = SpecificationOptions(
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
Expand All @@ -70,13 +69,15 @@ def create_publisher(
channel=channel,
base_options=base_options,
specification_options=specification_options,
schema_=schema_,
)

if (stream := StreamSub.validate(stream)) is not None:
return SpecificationStreamPublisher(
stream=stream,
base_options=base_options,
specification_options=specification_options,
schema_=schema_,
)

if (list := ListSub.validate(list)) is not None:
Expand All @@ -85,11 +86,13 @@ def create_publisher(
list=list,
base_options=base_options,
specification_options=specification_options,
schema_=schema_,
)
return SpecificationListPublisher(
list=list,
base_options=base_options,
specification_options=specification_options,
schema_=schema_,
)

raise SetupError(INCORRECT_SETUP_MSG)

0 comments on commit 4815fab

Please sign in to comment.