From 6718229667e464400a478cb22cb1db6b88f713af Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Mon, 22 May 2023 19:16:47 +0300 Subject: [PATCH 01/11] add desrialize message type --- ydb/_grpc/grpcwrapper/ydb_topic.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index f20b80a9..24dac266 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -592,12 +592,22 @@ class PartitionSessionStatusRequest: partition_session_id: int @dataclass - class PartitionSessionStatusResponse: + class PartitionSessionStatusResponse(IFromProto): partition_session_id: int partition_offsets: "OffsetsRange" committed_offset: int write_time_high_watermark: float + @staticmethod + def from_proto(msg: ydb_topic_pb2.StreamReadMessage.PartitionSessionStatusResponse) -> "StreamReadMessage.PartitionSessionStatusResponse": + return StreamReadMessage.PartitionSessionStatusResponse( + partition_session_id=msg.partition_session_id, + partition_offsets=OffsetsRange.from_proto(msg.partition_offsets), + committed_offset=msg.committed_offset, + write_time_high_watermark=msg.write_time_high_watermark, + ) + + @dataclass class StartPartitionSessionRequest(IFromProto): partition_session: "StreamReadMessage.PartitionSession" @@ -694,7 +704,7 @@ def from_proto( return StreamReadMessage.FromServer( server_status=server_status, server_message=StreamReadMessage.StartPartitionSessionRequest.from_proto( - msg.start_partition_session_request + msg.start_partition_session_request, ), ) elif mess_type == "update_token_response": @@ -702,6 +712,13 @@ def from_proto( server_status=server_status, server_message=UpdateTokenResponse.from_proto(msg.update_token_response), ) + elif mess_type == "partition_session_status_response": + return StreamReadMessage.FromServer( + server_status=server_status, + server_message=StreamReadMessage.PartitionSessionStatusResponse.from_proto( + msg.partition_session_status_response + ) + ) # todo replace exception to log raise NotImplementedError() From c62147d55b1258dbb20805307c409ad5e0001498 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 24 May 2023 16:54:43 +0300 Subject: [PATCH 02/11] parse stop partition message handle unexpected message for topic reader --- ydb/_grpc/grpcwrapper/ydb_topic.py | 23 +++++- ydb/_topic_reader/topic_reader.py | 2 + ydb/_topic_reader/topic_reader_asyncio.py | 82 ++++++++++++------- .../topic_reader_asyncio_test.py | 24 ++++++ ydb/issues.py | 5 ++ 5 files changed, 103 insertions(+), 33 deletions(-) diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index 24dac266..4b9c35c9 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import datetime import enum import typing @@ -642,11 +644,20 @@ def to_proto( return res @dataclass - class StopPartitionSessionRequest: + class StopPartitionSessionRequest(IFromProto): partition_session_id: int graceful: bool committed_offset: int + @staticmethod + def from_proto(msg: ydb_topic_pb2.StreamReadMessage.StopPartitionSessionRequest) -> StreamReadMessage.StopPartitionSessionRequest: + return StreamReadMessage.StopPartitionSessionRequest( + partition_session_id=msg.partition_session_id, + graceful=msg.graceful, + committed_offset=msg.committed_offset, + ) + + @dataclass class StopPartitionSessionResponse: partition_session_id: int @@ -707,6 +718,13 @@ def from_proto( msg.start_partition_session_request, ), ) + elif mess_type == "stop_partition_session_request": + return StreamReadMessage.FromServer( + server_status=server_status, + server_message=StreamReadMessage.StopPartitionSessionRequest.from_proto( + msg.stop_partition_session_request + ) + ) elif mess_type == "update_token_response": return StreamReadMessage.FromServer( server_status=server_status, @@ -720,9 +738,6 @@ def from_proto( ) ) - # todo replace exception to log - raise NotImplementedError() - ReaderMessagesFromClientToServer = Union[ StreamReadMessage.InitRequest, diff --git a/ydb/_topic_reader/topic_reader.py b/ydb/_topic_reader/topic_reader.py index 17fb2885..ec2d3afb 100644 --- a/ydb/_topic_reader/topic_reader.py +++ b/ydb/_topic_reader/topic_reader.py @@ -1,6 +1,7 @@ import concurrent.futures import enum import datetime +import logging from dataclasses import dataclass from typing import ( Union, @@ -52,6 +53,7 @@ class PublicReaderSettings: # decoder_executor, must be set for handle non raw messages decoder_executor: Optional[concurrent.futures.Executor] = None update_token_interval: Union[int, float] = 3600 + logger: Optional[logging.Logger] = None def __post_init__(self): # check possible create init message diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 539d6831..68813bac 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -2,6 +2,7 @@ import asyncio import concurrent.futures +import copy import gzip import typing from asyncio import Task @@ -26,7 +27,9 @@ Codec, ) from .._errors import check_retriable_error +import logging +_module_logger = logging.getLogger(__name__) class TopicReaderError(YdbError): pass @@ -62,6 +65,7 @@ class PublicAsyncIOReader: _closed: bool _reconnector: ReaderReconnector _parent: typing.Any # need for prevent close parent client by GC + _logger: logging.Logger def __init__( self, @@ -70,8 +74,14 @@ def __init__( *, _parent=None, ): + if settings.logger: + self._logger = settings.logger + else: + self._logger = _module_logger + self._loop = asyncio.get_running_loop() self._closed = False + self._logger = settings.logger self._reconnector = ReaderReconnector(driver, settings) self._parent = _parent @@ -139,14 +149,19 @@ class ReaderReconnector: _settings: topic_reader.PublicReaderSettings _driver: Driver _background_tasks: Set[Task] + _logger: logging.Logger _state_changed: asyncio.Event _stream_reader: Optional["ReaderStream"] _first_error: asyncio.Future[YdbError] def __init__(self, driver: Driver, settings: topic_reader.PublicReaderSettings): - self._id = self._static_reader_reconnector_counter.inc_and_get() + if settings.logger: + self._logger = settings.logger + else: + self._logger = _module_logger + self._id = self._static_reader_reconnector_counter.inc_and_get() self._settings = settings self._driver = driver self._background_tasks = set() @@ -234,6 +249,7 @@ class ReaderStream: _buffer_size_bytes: int # use for init request, then for debug purposes only _decode_executor: concurrent.futures.Executor _decoders: Dict[int, typing.Callable[[bytes], bytes]] # dict[codec_code] func(encoded_bytes)->decoded_bytes + _logger: logging.Logger if typing.TYPE_CHECKING: _batches_to_decode: asyncio.Queue[datatypes.PublicBatch] @@ -255,6 +271,11 @@ def __init__( settings: topic_reader.PublicReaderSettings, get_token_function: Optional[Callable[[], str]] = None, ): + if settings.logger: + self._logger = settings.logger + else: + self._logger = _module_logger + self._loop = asyncio.get_running_loop() self._id = ReaderStream._static_id_counter.inc_and_get() self._reader_reconnector_id = reader_reconnector_id @@ -395,34 +416,37 @@ async def _read_messages_loop(self): ) ) while True: - message = await self._stream.receive() # type: StreamReadMessage.FromServer - _process_response(message.server_status) - - if isinstance(message.server_message, StreamReadMessage.ReadResponse): - self._on_read_response(message.server_message) - - elif isinstance(message.server_message, StreamReadMessage.CommitOffsetResponse): - self._on_commit_response(message.server_message) - - elif isinstance( - message.server_message, - StreamReadMessage.StartPartitionSessionRequest, - ): - self._on_start_partition_session(message.server_message) - - elif isinstance( - message.server_message, - StreamReadMessage.StopPartitionSessionRequest, - ): - self._on_partition_session_stop(message.server_message) - - elif isinstance(message.server_message, UpdateTokenResponse): - self._update_token_event.set() - - else: - raise NotImplementedError( - "Unexpected type of StreamReadMessage.FromServer message: %s" % message.server_message - ) + try: + message = await self._stream.receive() # type: StreamReadMessage.FromServer + _process_response(message.server_status) + + if isinstance(message.server_message, StreamReadMessage.ReadResponse): + self._on_read_response(message.server_message) + + elif isinstance(message.server_message, StreamReadMessage.CommitOffsetResponse): + self._on_commit_response(message.server_message) + + elif isinstance( + message.server_message, + StreamReadMessage.StartPartitionSessionRequest, + ): + self._on_start_partition_session(message.server_message) + + elif isinstance( + message.server_message, + StreamReadMessage.StopPartitionSessionRequest, + ): + self._on_partition_session_stop(message.server_message) + + elif isinstance(message.server_message, UpdateTokenResponse): + self._update_token_event.set() + + else: + raise issues.UnexpectedGrpcMessage( + "Unexpected message in _read_messages_loop: %s" % type(message.server_message) + ) + except issues.UnexpectedGrpcMessage: + self._logger.exception("unexpected message in stream reader") self._state_changed.set() except Exception as e: diff --git a/ydb/_topic_reader/topic_reader_asyncio_test.py b/ydb/_topic_reader/topic_reader_asyncio_test.py index a8e59dfc..04cbc258 100644 --- a/ydb/_topic_reader/topic_reader_asyncio_test.py +++ b/ydb/_topic_reader/topic_reader_asyncio_test.py @@ -1127,6 +1127,30 @@ async def test_update_token(self, stream): await reader.close() + async def test_read_unknown_message(self, stream, stream_reader): + class TestMessage: + pass + + logged = asyncio.Event() + + def set_logged(*args, **kwargs): + logged.set() + + stream_reader._logger = mock.Mock() + stream_reader._logger.exception = mock.Mock(side_effect=set_logged) + + # noinspection PyTypeChecker + stream.from_server.put_nowait(StreamReadMessage.FromServer( + server_status=ServerStatus( + status=issues.StatusCode.SUCCESS, + issues=[], + ), + server_message=TestMessage(), + )) + await wait_for_fast(logged.wait()) + + stream_reader._logger.exception.assert_called_once() + @pytest.mark.asyncio class TestReaderReconnector: diff --git a/ydb/issues.py b/ydb/issues.py index f15c475c..a489d4e0 100644 --- a/ydb/issues.py +++ b/ydb/issues.py @@ -156,6 +156,11 @@ class SessionPoolEmpty(Error, queue.Empty): status = StatusCode.SESSION_POOL_EMPTY +class UnexpectedGrpcMessage(Error): + def __init__(self, message: str): + super().__init__(message) + + def _format_issues(issues): if not issues: return "" From 430854acd2ee5f7dde9f783dde8faa57b28fd65b Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 24 May 2023 18:09:07 +0300 Subject: [PATCH 03/11] add test for issue --- tests/conftest.py | 20 +++++++++++ tests/topics/test_topic_reader.py | 43 +++++++++++++++++++++++ ydb/_topic_reader/topic_reader_asyncio.py | 2 +- 3 files changed, 64 insertions(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index c2ff8f39..b1de747d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -176,6 +176,26 @@ async def topic2_path(driver, topic_consumer, database) -> str: return topic_path +@pytest.fixture() +@pytest.mark.asyncio() +async def topic_with_two_partitions_path(driver, topic_consumer, database) -> str: + topic_path = database + "/test-topic-two-partitions" + + try: + await driver.topic_client.drop_topic(topic_path) + except issues.SchemeError: + pass + + await driver.topic_client.create_topic( + path=topic_path, + consumers=[topic_consumer], + min_active_partitions=2, + partition_count_limit=2, + ) + + return topic_path + + @pytest.fixture() @pytest.mark.asyncio() async def topic_with_messages(driver, topic_consumer, database): diff --git a/tests/topics/test_topic_reader.py b/tests/topics/test_topic_reader.py index 3f85662b..78542065 100644 --- a/tests/topics/test_topic_reader.py +++ b/tests/topics/test_topic_reader.py @@ -1,3 +1,5 @@ +import asyncio + import pytest import ydb @@ -161,3 +163,44 @@ def decode(b: bytes): with driver_sync.topic_client.reader(topic_path, topic_consumer, decoders={codec: decode}) as reader: batch = reader.receive_batch() assert batch.messages[0].data.decode() == "123" + + +@pytest.mark.asyncio +class TestBugFixesAsync: + @pytest.mark.skip("LOGBROKER-8319") + async def test_issue_297_bad_handle_stop_partition(self, driver, topic_consumer, topic_with_two_partitions_path: str): + + async def wait(fut): + return await asyncio.wait_for(fut, timeout=10) + + topic = topic_with_two_partitions_path # type: str + + async with driver.topic_client.writer(topic, partition_id=0) as writer: + await writer.write_with_ack("01") + + async with driver.topic_client.writer(topic, partition_id=1) as writer: + await writer.write_with_ack("1") + + # Start first reader and receive messages from both partitions + reader0 = driver.topic_client.reader(topic, consumer=topic_consumer) + await wait(reader0.receive_message()) + await wait(reader0.receive_message()) + + # Start second reader for same topic, same consumer, partition 1 + reader1 = driver.topic_client.reader(ydb.TopicReaderSelector( + path=topic, + partitions=1, + ), consumer=topic_consumer) + + await asyncio.sleep(0.1) + + # receive uncommited message from partition 1 + msg = await wait(reader1.receive_message()) + assert msg.data.decode() == "1" + + # write message to partition 0 - for reader 0 + # async with driver.topic_client.writer(topic, partition_id=0) as writer: + # await writer.write_with_ack("02") + + msg = await wait(reader0.receive_message()) + assert msg.data.decode() == "02" diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 539d6831..7d73f88e 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -427,7 +427,7 @@ async def _read_messages_loop(self): self._state_changed.set() except Exception as e: self._set_first_error(e) - raise + return async def _update_token_loop(self): while True: From 438bde3e62bd4e682ee767dd196354ef5d9bfb58 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 24 May 2023 18:17:15 +0300 Subject: [PATCH 04/11] linter --- tests/topics/test_topic_reader.py | 16 ++++++++++------ ydb/_grpc/grpcwrapper/ydb_topic.py | 14 ++++++++------ ydb/_topic_reader/topic_reader_asyncio.py | 1 + ydb/_topic_reader/topic_reader_asyncio_test.py | 16 +++++++++------- 4 files changed, 28 insertions(+), 19 deletions(-) diff --git a/tests/topics/test_topic_reader.py b/tests/topics/test_topic_reader.py index 78542065..161f8d08 100644 --- a/tests/topics/test_topic_reader.py +++ b/tests/topics/test_topic_reader.py @@ -168,8 +168,9 @@ def decode(b: bytes): @pytest.mark.asyncio class TestBugFixesAsync: @pytest.mark.skip("LOGBROKER-8319") - async def test_issue_297_bad_handle_stop_partition(self, driver, topic_consumer, topic_with_two_partitions_path: str): - + async def test_issue_297_bad_handle_stop_partition( + self, driver, topic_consumer, topic_with_two_partitions_path: str + ): async def wait(fut): return await asyncio.wait_for(fut, timeout=10) @@ -187,10 +188,13 @@ async def wait(fut): await wait(reader0.receive_message()) # Start second reader for same topic, same consumer, partition 1 - reader1 = driver.topic_client.reader(ydb.TopicReaderSelector( - path=topic, - partitions=1, - ), consumer=topic_consumer) + reader1 = driver.topic_client.reader( + ydb.TopicReaderSelector( + path=topic, + partitions=1, + ), + consumer=topic_consumer, + ) await asyncio.sleep(0.1) diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index 4b9c35c9..909a0bb7 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -601,7 +601,9 @@ class PartitionSessionStatusResponse(IFromProto): write_time_high_watermark: float @staticmethod - def from_proto(msg: ydb_topic_pb2.StreamReadMessage.PartitionSessionStatusResponse) -> "StreamReadMessage.PartitionSessionStatusResponse": + def from_proto( + msg: ydb_topic_pb2.StreamReadMessage.PartitionSessionStatusResponse, + ) -> "StreamReadMessage.PartitionSessionStatusResponse": return StreamReadMessage.PartitionSessionStatusResponse( partition_session_id=msg.partition_session_id, partition_offsets=OffsetsRange.from_proto(msg.partition_offsets), @@ -609,7 +611,6 @@ def from_proto(msg: ydb_topic_pb2.StreamReadMessage.PartitionSessionStatusRespon write_time_high_watermark=msg.write_time_high_watermark, ) - @dataclass class StartPartitionSessionRequest(IFromProto): partition_session: "StreamReadMessage.PartitionSession" @@ -650,14 +651,15 @@ class StopPartitionSessionRequest(IFromProto): committed_offset: int @staticmethod - def from_proto(msg: ydb_topic_pb2.StreamReadMessage.StopPartitionSessionRequest) -> StreamReadMessage.StopPartitionSessionRequest: + def from_proto( + msg: ydb_topic_pb2.StreamReadMessage.StopPartitionSessionRequest, + ) -> StreamReadMessage.StopPartitionSessionRequest: return StreamReadMessage.StopPartitionSessionRequest( partition_session_id=msg.partition_session_id, graceful=msg.graceful, committed_offset=msg.committed_offset, ) - @dataclass class StopPartitionSessionResponse: partition_session_id: int @@ -723,7 +725,7 @@ def from_proto( server_status=server_status, server_message=StreamReadMessage.StopPartitionSessionRequest.from_proto( msg.stop_partition_session_request - ) + ), ) elif mess_type == "update_token_response": return StreamReadMessage.FromServer( @@ -735,7 +737,7 @@ def from_proto( server_status=server_status, server_message=StreamReadMessage.PartitionSessionStatusResponse.from_proto( msg.partition_session_status_response - ) + ), ) diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index dbf73537..0729db00 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -31,6 +31,7 @@ _module_logger = logging.getLogger(__name__) + class TopicReaderError(YdbError): pass diff --git a/ydb/_topic_reader/topic_reader_asyncio_test.py b/ydb/_topic_reader/topic_reader_asyncio_test.py index 04cbc258..78a3d7b6 100644 --- a/ydb/_topic_reader/topic_reader_asyncio_test.py +++ b/ydb/_topic_reader/topic_reader_asyncio_test.py @@ -1140,13 +1140,15 @@ def set_logged(*args, **kwargs): stream_reader._logger.exception = mock.Mock(side_effect=set_logged) # noinspection PyTypeChecker - stream.from_server.put_nowait(StreamReadMessage.FromServer( - server_status=ServerStatus( - status=issues.StatusCode.SUCCESS, - issues=[], - ), - server_message=TestMessage(), - )) + stream.from_server.put_nowait( + StreamReadMessage.FromServer( + server_status=ServerStatus( + status=issues.StatusCode.SUCCESS, + issues=[], + ), + server_message=TestMessage(), + ) + ) await wait_for_fast(logged.wait()) stream_reader._logger.exception.assert_called_once() From 7e0c4212bd47203d0207f89b48bb643d09dd9d1a Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 24 May 2023 18:26:06 +0300 Subject: [PATCH 05/11] workaround specific partition bug LOGBROKER-8319 --- tests/topics/test_topic_reader.py | 37 ++++++++++++++----------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/tests/topics/test_topic_reader.py b/tests/topics/test_topic_reader.py index 161f8d08..7b1947c8 100644 --- a/tests/topics/test_topic_reader.py +++ b/tests/topics/test_topic_reader.py @@ -167,9 +167,8 @@ def decode(b: bytes): @pytest.mark.asyncio class TestBugFixesAsync: - @pytest.mark.skip("LOGBROKER-8319") async def test_issue_297_bad_handle_stop_partition( - self, driver, topic_consumer, topic_with_two_partitions_path: str + self, driver, topic_consumer, topic_with_two_partitions_path: str ): async def wait(fut): return await asyncio.wait_for(fut, timeout=10) @@ -177,10 +176,10 @@ async def wait(fut): topic = topic_with_two_partitions_path # type: str async with driver.topic_client.writer(topic, partition_id=0) as writer: - await writer.write_with_ack("01") + await writer.write_with_ack("00") async with driver.topic_client.writer(topic, partition_id=1) as writer: - await writer.write_with_ack("1") + await writer.write_with_ack("01") # Start first reader and receive messages from both partitions reader0 = driver.topic_client.reader(topic, consumer=topic_consumer) @@ -188,23 +187,21 @@ async def wait(fut): await wait(reader0.receive_message()) # Start second reader for same topic, same consumer, partition 1 - reader1 = driver.topic_client.reader( - ydb.TopicReaderSelector( - path=topic, - partitions=1, - ), - consumer=topic_consumer, - ) + reader1 = driver.topic_client.reader(topic, consumer=topic_consumer) - await asyncio.sleep(0.1) + # receive uncommited message + await reader1.receive_message() + + # write one message for every partition + async with driver.topic_client.writer(topic, partition_id=0) as writer: + await writer.write_with_ack("10") + async with driver.topic_client.writer(topic, partition_id=0) as writer: + await writer.write_with_ack("11") - # receive uncommited message from partition 1 - msg = await wait(reader1.receive_message()) - assert msg.data.decode() == "1" + msg0 = await wait(reader0.receive_message()) + msg1 = await wait(reader1.receive_message()) - # write message to partition 0 - for reader 0 - # async with driver.topic_client.writer(topic, partition_id=0) as writer: - # await writer.write_with_ack("02") + datas = [msg0.data.decode(), msg1.data.decode] + datas.sort() - msg = await wait(reader0.receive_message()) - assert msg.data.decode() == "02" + assert datas == ["10", "11"] From 7f94fe7df8cb4227fbdf669e90a3a87d0934a5ba Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 24 May 2023 18:39:19 +0300 Subject: [PATCH 06/11] pause in test --- tests/topics/test_topic_reader.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/topics/test_topic_reader.py b/tests/topics/test_topic_reader.py index 7b1947c8..67d8eb9c 100644 --- a/tests/topics/test_topic_reader.py +++ b/tests/topics/test_topic_reader.py @@ -189,6 +189,16 @@ async def wait(fut): # Start second reader for same topic, same consumer, partition 1 reader1 = driver.topic_client.reader(topic, consumer=topic_consumer) + await asyncio.sleep(1) + + async with driver.topic_client.writer(topic, partition_id=0) as writer: + await writer.write_with_ack("--") + async with driver.topic_client.writer(topic, partition_id=1) as writer: + await writer.write_with_ack("--") + + await reader0.receive_message() + await reader0.receive_message() + # receive uncommited message await reader1.receive_message() From 8df806b4dc72ddfe9d20a1aec271628610cac5f1 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 24 May 2023 18:42:01 +0300 Subject: [PATCH 07/11] raise exception while parse incoming message --- ydb/_grpc/grpcwrapper/ydb_topic.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index 909a0bb7..009f9a03 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -10,6 +10,7 @@ from . import ydb_topic_public_types from ... import scheme +from ... import issues # Workaround for good IDE and universal for runtime if typing.TYPE_CHECKING: @@ -739,6 +740,10 @@ def from_proto( msg.partition_session_status_response ), ) + else: + raise issues.UnexpectedGrpcMessage( + "Unexpected message while parse ReaderMessagesFromServerToClient: '%s'" % mess_type + ) ReaderMessagesFromClientToServer = Union[ From 640d9279c026058a63042a3a728e02ad8b1a68e4 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 25 May 2023 12:18:13 +0300 Subject: [PATCH 08/11] fix linter --- tests/topics/test_topic_reader.py | 16 +++------------- ydb/_grpc/grpcwrapper/ydb_topic.py | 18 ++++++++++++++++-- ydb/_topic_reader/topic_reader_asyncio.py | 1 - ydb/topic.py | 4 ++-- 4 files changed, 21 insertions(+), 18 deletions(-) diff --git a/tests/topics/test_topic_reader.py b/tests/topics/test_topic_reader.py index 67d8eb9c..46bc0faa 100644 --- a/tests/topics/test_topic_reader.py +++ b/tests/topics/test_topic_reader.py @@ -168,7 +168,7 @@ def decode(b: bytes): @pytest.mark.asyncio class TestBugFixesAsync: async def test_issue_297_bad_handle_stop_partition( - self, driver, topic_consumer, topic_with_two_partitions_path: str + self, driver, topic_consumer, topic_with_two_partitions_path: str ): async def wait(fut): return await asyncio.wait_for(fut, timeout=10) @@ -189,29 +189,19 @@ async def wait(fut): # Start second reader for same topic, same consumer, partition 1 reader1 = driver.topic_client.reader(topic, consumer=topic_consumer) - await asyncio.sleep(1) - - async with driver.topic_client.writer(topic, partition_id=0) as writer: - await writer.write_with_ack("--") - async with driver.topic_client.writer(topic, partition_id=1) as writer: - await writer.write_with_ack("--") - - await reader0.receive_message() - await reader0.receive_message() - # receive uncommited message await reader1.receive_message() # write one message for every partition async with driver.topic_client.writer(topic, partition_id=0) as writer: await writer.write_with_ack("10") - async with driver.topic_client.writer(topic, partition_id=0) as writer: + async with driver.topic_client.writer(topic, partition_id=1) as writer: await writer.write_with_ack("11") msg0 = await wait(reader0.receive_message()) msg1 = await wait(reader1.receive_message()) - datas = [msg0.data.decode(), msg1.data.decode] + datas = [msg0.data.decode(), msg1.data.decode()] datas.sort() assert datas == ["10", "11"] diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index 009f9a03..5b5e294a 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -591,9 +591,14 @@ def from_proto( ) @dataclass - class PartitionSessionStatusRequest: + class PartitionSessionStatusRequest(IToProto): partition_session_id: int + def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.PartitionSessionStatusRequest: + return ydb_topic_pb2.StreamReadMessage.PartitionSessionStatusRequest( + partition_session_id=self.partition_session_id + ) + @dataclass class PartitionSessionStatusResponse(IFromProto): partition_session_id: int @@ -662,9 +667,14 @@ def from_proto( ) @dataclass - class StopPartitionSessionResponse: + class StopPartitionSessionResponse(IToProto): partition_session_id: int + def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.StopPartitionSessionResponse: + return ydb_topic_pb2.StreamReadMessage.StopPartitionSessionResponse( + partition_session_id=self.partition_session_id, + ) + @dataclass class FromClient(IToProto): client_message: "ReaderMessagesFromClientToServer" @@ -684,6 +694,10 @@ def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.FromClient: res.update_token_request.CopyFrom(self.client_message.to_proto()) elif isinstance(self.client_message, StreamReadMessage.StartPartitionSessionResponse): res.start_partition_session_response.CopyFrom(self.client_message.to_proto()) + elif isinstance(self.client_message, StreamReadMessage.StopPartitionSessionResponse): + res.stop_partition_session_response.CopyFrom(self.client_message.to_proto()) + elif isinstance(self.client_message, StreamReadMessage.PartitionSessionStatusRequest): + res.start_partition_session_response.CopyFrom(self.client_message.to_proto()) else: raise NotImplementedError("Unknown message type: %s" % type(self.client_message)) return res diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 0729db00..4a775506 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -2,7 +2,6 @@ import asyncio import concurrent.futures -import copy import gzip import typing from asyncio import Task diff --git a/ydb/topic.py b/ydb/topic.py index abf93903..190f5329 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -168,7 +168,7 @@ def reader( if not decoder_executor: decoder_executor = self._executor - args = locals() + args = locals().copy() del args["self"] settings = TopicReaderSettings(**args) @@ -188,7 +188,7 @@ def writer( encoders: Optional[Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]] = None, encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool ) -> TopicWriterAsyncIO: - args = locals() + args = locals().copy() del args["self"] settings = TopicWriterSettings(**args) From 067e09d7b1ef2faf24d41fe591d11137e2edd6cd Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 25 May 2023 16:33:10 +0300 Subject: [PATCH 09/11] remove personal logger instance --- ydb/_topic_reader/topic_reader_asyncio.py | 25 +++---------------- .../topic_reader_asyncio_test.py | 18 ++++++------- 2 files changed, 10 insertions(+), 33 deletions(-) diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 4a775506..ebe7bd6b 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -28,7 +28,7 @@ from .._errors import check_retriable_error import logging -_module_logger = logging.getLogger(__name__) +logger = logging.getLogger(__name__) class TopicReaderError(YdbError): @@ -65,7 +65,6 @@ class PublicAsyncIOReader: _closed: bool _reconnector: ReaderReconnector _parent: typing.Any # need for prevent close parent client by GC - _logger: logging.Logger def __init__( self, @@ -74,14 +73,8 @@ def __init__( *, _parent=None, ): - if settings.logger: - self._logger = settings.logger - else: - self._logger = _module_logger - self._loop = asyncio.get_running_loop() self._closed = False - self._logger = settings.logger self._reconnector = ReaderReconnector(driver, settings) self._parent = _parent @@ -149,18 +142,12 @@ class ReaderReconnector: _settings: topic_reader.PublicReaderSettings _driver: Driver _background_tasks: Set[Task] - _logger: logging.Logger _state_changed: asyncio.Event _stream_reader: Optional["ReaderStream"] _first_error: asyncio.Future[YdbError] def __init__(self, driver: Driver, settings: topic_reader.PublicReaderSettings): - if settings.logger: - self._logger = settings.logger - else: - self._logger = _module_logger - self._id = self._static_reader_reconnector_counter.inc_and_get() self._settings = settings self._driver = driver @@ -249,7 +236,6 @@ class ReaderStream: _buffer_size_bytes: int # use for init request, then for debug purposes only _decode_executor: concurrent.futures.Executor _decoders: Dict[int, typing.Callable[[bytes], bytes]] # dict[codec_code] func(encoded_bytes)->decoded_bytes - _logger: logging.Logger if typing.TYPE_CHECKING: _batches_to_decode: asyncio.Queue[datatypes.PublicBatch] @@ -271,11 +257,6 @@ def __init__( settings: topic_reader.PublicReaderSettings, get_token_function: Optional[Callable[[], str]] = None, ): - if settings.logger: - self._logger = settings.logger - else: - self._logger = _module_logger - self._loop = asyncio.get_running_loop() self._id = ReaderStream._static_id_counter.inc_and_get() self._reader_reconnector_id = reader_reconnector_id @@ -445,8 +426,8 @@ async def _read_messages_loop(self): raise issues.UnexpectedGrpcMessage( "Unexpected message in _read_messages_loop: %s" % type(message.server_message) ) - except issues.UnexpectedGrpcMessage: - self._logger.exception("unexpected message in stream reader") + except issues.UnexpectedGrpcMessage as e: + logger.exception("unexpected message in stream reader: %s" % e) self._state_changed.set() except Exception as e: diff --git a/ydb/_topic_reader/topic_reader_asyncio_test.py b/ydb/_topic_reader/topic_reader_asyncio_test.py index 78a3d7b6..2972aab4 100644 --- a/ydb/_topic_reader/topic_reader_asyncio_test.py +++ b/ydb/_topic_reader/topic_reader_asyncio_test.py @@ -1127,18 +1127,10 @@ async def test_update_token(self, stream): await reader.close() - async def test_read_unknown_message(self, stream, stream_reader): + async def test_read_unknown_message(self, stream, stream_reader, caplog): class TestMessage: pass - logged = asyncio.Event() - - def set_logged(*args, **kwargs): - logged.set() - - stream_reader._logger = mock.Mock() - stream_reader._logger.exception = mock.Mock(side_effect=set_logged) - # noinspection PyTypeChecker stream.from_server.put_nowait( StreamReadMessage.FromServer( @@ -1149,10 +1141,14 @@ def set_logged(*args, **kwargs): server_message=TestMessage(), ) ) - await wait_for_fast(logged.wait()) - stream_reader._logger.exception.assert_called_once() + def logged(): + for rec in caplog.records: + if TestMessage.__name__ in rec.message: + return True + return False + await wait_condition(logged) @pytest.mark.asyncio class TestReaderReconnector: From a9eebd9953de92bdc6e57b0f08b86c35ce5483a0 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 25 May 2023 16:34:23 +0300 Subject: [PATCH 10/11] fix linter --- ydb/_topic_reader/topic_reader_asyncio_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/_topic_reader/topic_reader_asyncio_test.py b/ydb/_topic_reader/topic_reader_asyncio_test.py index 2972aab4..c1019b02 100644 --- a/ydb/_topic_reader/topic_reader_asyncio_test.py +++ b/ydb/_topic_reader/topic_reader_asyncio_test.py @@ -1150,6 +1150,7 @@ def logged(): await wait_condition(logged) + @pytest.mark.asyncio class TestReaderReconnector: async def test_reconnect_on_repeatable_error(self, monkeypatch): From 0a9d7fa73ef6518adc1d99ffa25ead2c3b0ec478 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 25 May 2023 16:44:13 +0300 Subject: [PATCH 11/11] remove logger from settings --- ydb/_topic_reader/topic_reader.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/ydb/_topic_reader/topic_reader.py b/ydb/_topic_reader/topic_reader.py index ec2d3afb..17fb2885 100644 --- a/ydb/_topic_reader/topic_reader.py +++ b/ydb/_topic_reader/topic_reader.py @@ -1,7 +1,6 @@ import concurrent.futures import enum import datetime -import logging from dataclasses import dataclass from typing import ( Union, @@ -53,7 +52,6 @@ class PublicReaderSettings: # decoder_executor, must be set for handle non raw messages decoder_executor: Optional[concurrent.futures.Executor] = None update_token_interval: Union[int, float] = 3600 - logger: Optional[logging.Logger] = None def __post_init__(self): # check possible create init message