diff --git a/fetch_response.go b/fetch_response.go index 1bdcdaa..5f835fc 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -47,8 +47,8 @@ func (streamDecoder *fetchResponseStreamDecoder) readAll() (length int, err erro streamDecoder.offset += length }() - buf := make([]byte, streamDecoder.totalLength-streamDecoder.offset) - return io.ReadFull(streamDecoder.buffers, buf) + written, err := io.Copy(io.Discard, streamDecoder.buffers) + return int(written), err } var errShortRead = errors.New("short read") @@ -182,7 +182,10 @@ func (streamDecoder *fetchResponseStreamDecoder) decodeRecordsMagic2(topicName s bytesBeforeRecordsLength := 44 // (magic, records count] bytesBeforeRecords, n, err := streamDecoder.read(bytesBeforeRecordsLength) if err != nil { - return offset, err + if err == io.ErrUnexpectedEOF { + return n, nil + } + return n, err } offset += n @@ -284,6 +287,9 @@ func (streamDecoder *fetchResponseStreamDecoder) decodeMessageSet(topicName stri // payload before magic header17, n, e := streamDecoder.read(17) if e != nil { + if e == io.ErrUnexpectedEOF { + return nil + } return e } if n < 17 {