Skip to content

Commit

Permalink
Merge pull request #198 Codec support for writer
Browse files Browse the repository at this point in the history
  • Loading branch information
rekby authored Mar 7, 2023
2 parents 95ab809 + ccc6eec commit b31efcb
Show file tree
Hide file tree
Showing 8 changed files with 497 additions and 49 deletions.
4 changes: 3 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ async def topic_path(driver, topic_consumer, database) -> str:
@pytest.fixture()
@pytest.mark.asyncio()
async def topic_with_messages(driver, topic_path):
writer = driver.topic_client.writer(topic_path, producer_id="fixture-producer-id")
writer = driver.topic_client.writer(
topic_path, producer_id="fixture-producer-id", codec=ydb.TopicCodec.RAW
)
await writer.write_with_ack(
[
ydb.TopicWriterMessage(data="123".encode()),
Expand Down
28 changes: 28 additions & 0 deletions tests/topics/test_topic_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ async def test_write_multi_message_with_ack(
assert batch.messages[0].seqno == 2
assert batch.messages[0].data == "456".encode()

@pytest.mark.parametrize(
"codec",
[
ydb.TopicCodec.RAW,
ydb.TopicCodec.GZIP,
None,
],
)
async def test_write_encoded(self, driver: ydb.Driver, topic_path: str, codec):
async with driver.topic_client.writer(topic_path, codec=codec) as writer:
writer.write("a" * 1000)
writer.write("b" * 1000)
writer.write("c" * 1000)


class TestTopicWriterSync:
def test_send_message(self, driver_sync: ydb.Driver, topic_path):
Expand Down Expand Up @@ -163,3 +177,17 @@ def test_write_multi_message_with_ack(
assert batch.messages[0].offset == 1
assert batch.messages[0].seqno == 2
assert batch.messages[0].data == "456".encode()

@pytest.mark.parametrize(
"codec",
[
ydb.TopicCodec.RAW,
ydb.TopicCodec.GZIP,
None,
],
)
def test_write_encoded(self, driver_sync: ydb.Driver, topic_path: str, codec):
with driver_sync.topic_client.writer(topic_path, codec=codec) as writer:
writer.write("a" * 1000)
writer.write("b" * 1000)
writer.write("c" * 1000)
11 changes: 9 additions & 2 deletions ydb/_grpc/grpcwrapper/ydb_topic_public_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,18 @@ class CreateTopicRequestParams:


class PublicCodec(int):
"""
Codec value may contain any int number.
Values below is only well-known predefined values,
but protocol support custom codecs.
"""

UNSPECIFIED = 0
RAW = 1
GZIP = 2
LZOP = 3
ZSTD = 4
LZOP = 3 # Has not supported codec in standard library
ZSTD = 4 # Has not supported codec in standard library


class PublicMeteringMode(IntEnum):
Expand Down
26 changes: 17 additions & 9 deletions ydb/_topic_writer/topic_writer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import concurrent.futures
import datetime
import enum
import uuid
Expand All @@ -10,9 +11,9 @@
import ydb.aio
from .._grpc.grpcwrapper.ydb_topic import Codec, StreamWriteMessage
from .._grpc.grpcwrapper.common_utils import IToProto
from .._grpc.grpcwrapper.ydb_topic_public_types import PublicCodec


MessageType = typing.Union["PublicMessage", "PublicMessage.SimpleMessageSourceType"]
Message = typing.Union["PublicMessage", "PublicMessage.SimpleMessageSourceType"]


@dataclass
Expand All @@ -29,8 +30,14 @@ class PublicWriterSettings:
partition_id: Optional[int] = None
auto_seqno: bool = True
auto_created_at: bool = True
codec: Optional[PublicCodec] = None # default mean auto-select
encoder_executor: Optional[
concurrent.futures.Executor
] = None # default shared client executor pool
encoders: Optional[
typing.Mapping[PublicCodec, typing.Callable[[bytes], bytes]]
] = None
# get_last_seqno: bool = False
# encoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None
# serializer: Union[Callable[[Any], bytes], None] = None
# send_buffer_count: Optional[int] = 10000
# send_buffer_bytes: Optional[int] = 100 * 1024 * 1024
Expand Down Expand Up @@ -85,8 +92,9 @@ class SendMode(Enum):

@dataclass
class PublicWriterInitInfo:
__slots__ = "last_seqno"
__slots__ = ("last_seqno", "supported_codecs")
last_seqno: Optional[int]
supported_codecs: List[PublicCodec]


class PublicMessage:
Expand All @@ -108,24 +116,24 @@ def __init__(
self.data = data

@staticmethod
def _create_message(
data: Union["PublicMessage", "PublicMessage.SimpleMessageSourceType"]
) -> "PublicMessage":
def _create_message(data: Message) -> "PublicMessage":
if isinstance(data, PublicMessage):
return data
return PublicMessage(data=data)


class InternalMessage(StreamWriteMessage.WriteRequest.MessageData, IToProto):
codec: PublicCodec

def __init__(self, mess: PublicMessage):
StreamWriteMessage.WriteRequest.MessageData.__init__(
self,
super().__init__(
seq_no=mess.seqno,
created_at=mess.created_at,
data=mess.data,
uncompressed_size=len(mess.data),
partitioning=None,
)
self.codec = PublicCodec.RAW

def get_bytes(self) -> bytes:
if self.data is None:
Expand Down
Loading

0 comments on commit b31efcb

Please sign in to comment.