From 35d61ce2c4f42185722a4a7a7f9dede433efd100 Mon Sep 17 00:00:00 2001 From: childe Date: Thu, 17 Oct 2024 15:07:52 +0800 Subject: [PATCH 1/4] fix: set appended topics value to request --- fetch_request.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fetch_request.go b/fetch_request.go index ab056e3..7dd2afd 100644 --- a/fetch_request.go +++ b/fetch_request.go @@ -58,7 +58,7 @@ func (fetchRequest *FetchRequest) addPartition(topic string, partitionID int32, } if value, ok := fetchRequest.Topics[topic]; ok { - value = append(value, partitionBlock) + fetchRequest.Topics[topic] = append(value, partitionBlock) } else { fetchRequest.Topics[topic] = []*PartitionBlock{partitionBlock} } From 6ec7a53a996a08f7562beb67a35317e686e06127 Mon Sep 17 00:00:00 2001 From: childe Date: Thu, 17 Oct 2024 15:08:35 +0800 Subject: [PATCH 2/4] add fetch version 7 --- request.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/request.go b/request.go index 9db3f2e..fe93392 100644 --- a/request.go +++ b/request.go @@ -40,7 +40,7 @@ const ( // It must be sorted from high to low var availableVersions map[uint16][]uint16 = map[uint16][]uint16{ API_MetadataRequest: {7, 1}, - API_FetchRequest: {10, 0}, + API_FetchRequest: {10, 7, 0}, API_OffsetRequest: {1, 0}, API_CreatePartitions: {2, 0}, API_SaslHandshake: {1, 0}, From f047fbbf5b4ed86e99737aaa08f6be4a9aa2eb88 Mon Sep 17 00:00:00 2001 From: childe Date: Thu, 17 Oct 2024 15:11:27 +0800 Subject: [PATCH 3/4] add version 7 decode --- fetch_request.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/fetch_request.go b/fetch_request.go index 7dd2afd..20d07d8 100644 --- a/fetch_request.go +++ b/fetch_request.go @@ -94,17 +94,17 @@ func (fetchRequest *FetchRequest) Encode(version uint16) []byte { binary.BigEndian.PutUint32(payload[offset:], uint32(fetchRequest.MinBytes)) offset += 4 - if version >= 10 { + if version >= 7 { binary.BigEndian.PutUint32(payload[offset:], uint32(fetchRequest.MaxBytes)) offset += 4 payload[offset] = byte(fetchRequest.ISOLationLevel) offset++ } - if version >= 10 { + if version >= 7 { binary.BigEndian.PutUint32(payload[offset:], uint32(fetchRequest.SessionID)) offset += 4 } - if version >= 10 { + if version >= 7 { binary.BigEndian.PutUint32(payload[offset:], uint32(fetchRequest.SessionEpoch)) offset += 4 } @@ -127,7 +127,7 @@ func (fetchRequest *FetchRequest) Encode(version uint16) []byte { } binary.BigEndian.PutUint64(payload[offset:], uint64(partitionBlock.FetchOffset)) offset += 8 - if version >= 10 { + if version >= 7 { binary.BigEndian.PutUint64(payload[offset:], uint64(partitionBlock.LogStartOffset)) offset += 8 } @@ -136,7 +136,7 @@ func (fetchRequest *FetchRequest) Encode(version uint16) []byte { } } - if version >= 10 { + if version >= 7 { binary.BigEndian.PutUint32(payload[offset:], uint32(len(fetchRequest.ForgottenTopicsDatas))) offset += 4 for topicName, partitions := range fetchRequest.ForgottenTopicsDatas { From 586bd4b0461c95363bba3353a7c7ba3ba907ac32 Mon Sep 17 00:00:00 2001 From: childe Date: Thu, 17 Oct 2024 17:14:52 +0800 Subject: [PATCH 4/4] add version 7 --- fetch_response.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fetch_response.go b/fetch_response.go index a3081d5..de36243 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -344,7 +344,7 @@ func (streamDecoder *fetchResponseStreamDecoder) decodePartitionResponse(topicNa switch version { case 0: bytesBeforeRecordsLength = 18 - case 10: + case 7, 10: bytesBeforeRecordsLength = 38 } buffer, n, err = streamDecoder.read(bytesBeforeRecordsLength) @@ -424,7 +424,7 @@ func (streamDecoder *fetchResponseStreamDecoder) decodeHeader(version uint16) er case 0: headerLength = 8 countOffset = 4 - case 10: + case 7, 10: headerLength = 18 countOffset = 14 }