From 8a3e8c6b9aa10597cc69e41e4952d08ac58452e9 Mon Sep 17 00:00:00 2001 From: childe Date: Wed, 4 Sep 2024 00:13:00 +0800 Subject: [PATCH] fix: ignore ErrUnexpectedEOF in the last truncated record --- fetch_response.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/fetch_response.go b/fetch_response.go index 1bdcdaa..5f835fc 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -47,8 +47,8 @@ func (streamDecoder *fetchResponseStreamDecoder) readAll() (length int, err erro streamDecoder.offset += length }() - buf := make([]byte, streamDecoder.totalLength-streamDecoder.offset) - return io.ReadFull(streamDecoder.buffers, buf) + written, err := io.Copy(io.Discard, streamDecoder.buffers) + return int(written), err } var errShortRead = errors.New("short read") @@ -182,7 +182,10 @@ func (streamDecoder *fetchResponseStreamDecoder) decodeRecordsMagic2(topicName s bytesBeforeRecordsLength := 44 // (magic, records count] bytesBeforeRecords, n, err := streamDecoder.read(bytesBeforeRecordsLength) if err != nil { - return offset, err + if err == io.ErrUnexpectedEOF { + return n, nil + } + return n, err } offset += n @@ -284,6 +287,9 @@ func (streamDecoder *fetchResponseStreamDecoder) decodeMessageSet(topicName stri // payload before magic header17, n, e := streamDecoder.read(17) if e != nil { + if e == io.ErrUnexpectedEOF { + return nil + } return e } if n < 17 {