Skip to content

Commit

Permalink
partition eof error in the reader
Browse files Browse the repository at this point in the history
  • Loading branch information
glossd committed Jan 26, 2024
1 parent 1b64d17 commit eb9d9ac
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 38 deletions.
5 changes: 5 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ const (
InconsistentClusterID Error = 104
TransactionalIDNotFound Error = 105
FetchSessionTopicIDError Error = 106
PartitionEoF Error = 107
)

// Error satisfies the error interface.
Expand Down Expand Up @@ -377,6 +378,8 @@ func (e Error) Title() string {
return "Transactional ID Not Found"
case FetchSessionTopicIDError:
return "Fetch Session Topic ID Error"
case PartitionEoF:
return "Partition End of File"
}
return ""
}
Expand Down Expand Up @@ -586,6 +589,8 @@ func (e Error) Description() string {
return "The transactionalId could not be found"
case FetchSessionTopicIDError:
return "The fetch session encountered inconsistent topic ID usage"
case PartitionEoF:
return "Consumer reached the end of partition"
}
return ""
}
Expand Down
91 changes: 53 additions & 38 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,10 @@ type ReaderConfig struct {
// This flag is being added to retain backwards-compatibility, so it will be
// removed in a future version of kafka-go.
OffsetOutOfRangeError bool

// EnablePartitionEoF, when true, makes the reader report a PartitionEoF
// error once it has read up to the end of a partition.
EnablePartitionEoF bool
}

// Validate method validates ReaderConfig properties.
Expand Down Expand Up @@ -1194,23 +1198,24 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
defer join.Done()

(&reader{
dialer: r.config.Dialer,
logger: r.config.Logger,
errorLogger: r.config.ErrorLogger,
brokers: r.config.Brokers,
topic: key.topic,
partition: int(key.partition),
minBytes: r.config.MinBytes,
maxBytes: r.config.MaxBytes,
maxWait: r.config.MaxWait,
readBatchTimeout: r.config.ReadBatchTimeout,
backoffDelayMin: r.config.ReadBackoffMin,
backoffDelayMax: r.config.ReadBackoffMax,
version: r.version,
msgs: r.msgs,
stats: r.stats,
isolationLevel: r.config.IsolationLevel,
maxAttempts: r.config.MaxAttempts,
dialer: r.config.Dialer,
logger: r.config.Logger,
errorLogger: r.config.ErrorLogger,
brokers: r.config.Brokers,
topic: key.topic,
partition: int(key.partition),
minBytes: r.config.MinBytes,
maxBytes: r.config.MaxBytes,
maxWait: r.config.MaxWait,
readBatchTimeout: r.config.ReadBatchTimeout,
backoffDelayMin: r.config.ReadBackoffMin,
backoffDelayMax: r.config.ReadBackoffMax,
version: r.version,
msgs: r.msgs,
stats: r.stats,
isolationLevel: r.config.IsolationLevel,
maxAttempts: r.config.MaxAttempts,
enablePartitionEoF: r.config.EnablePartitionEoF,

// backwards-compatibility flags
offsetOutOfRangeError: r.config.OffsetOutOfRangeError,
Expand All @@ -1223,23 +1228,24 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
// used as a way to asynchronously fetch messages while the main program reads
// them using the high level reader API.
type reader struct {
dialer *Dialer
logger Logger
errorLogger Logger
brokers []string
topic string
partition int
minBytes int
maxBytes int
maxWait time.Duration
readBatchTimeout time.Duration
backoffDelayMin time.Duration
backoffDelayMax time.Duration
version int64
msgs chan<- readerMessage
stats *readerStats
isolationLevel IsolationLevel
maxAttempts int
dialer *Dialer
logger Logger
errorLogger Logger
brokers []string
topic string
partition int
minBytes int
maxBytes int
maxWait time.Duration
readBatchTimeout time.Duration
backoffDelayMin time.Duration
backoffDelayMax time.Duration
version int64
msgs chan<- readerMessage
stats *readerStats
isolationLevel IsolationLevel
maxAttempts int
enablePartitionEoF bool

offsetOutOfRangeError bool
}
Expand Down Expand Up @@ -1376,6 +1382,10 @@ func (r *reader) run(ctx context.Context, offset int64) {
})
r.stats.timeouts.observe(1)
continue
case errors.Is(err, PartitionEoF):
errcount = 0
r.sendError(ctx, err)
continue

case errors.Is(err, OffsetOutOfRange):
first, last, err := r.readOffsets(conn)
Expand Down Expand Up @@ -1506,6 +1516,7 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err
var err error
var size int64
var bytes int64
var newOffset = offset

for {
conn.SetReadDeadline(time.Now().Add(r.readBatchTimeout))
Expand All @@ -1524,9 +1535,9 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err
break
}

offset = msg.Offset + 1
r.stats.offset.observe(offset)
r.stats.lag.observe(highWaterMark - offset)
newOffset = msg.Offset + 1
r.stats.offset.observe(newOffset)
r.stats.lag.observe(highWaterMark - newOffset)

size++
bytes += n
Expand All @@ -1538,7 +1549,11 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err
r.stats.readTime.observeDuration(t2.Sub(t1))
r.stats.fetchSize.observe(size)
r.stats.fetchBytes.observe(bytes)
return offset, err

if r.enablePartitionEoF && newOffset == highWaterMark && newOffset > offset {
return offset, PartitionEoF
}
return newOffset, err
}

func (r *reader) readOffsets(conn *Conn) (first, last int64, err error) {
Expand Down
25 changes: 25 additions & 0 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func TestReader(t *testing.T) {
scenario: "topic being recreated will return an error",
function: testReaderTopicRecreated,
},
{
scenario: "reading with enabled partition eof config",
function: testReaderPartitionEoF,
},
}

for _, test := range tests {
Expand Down Expand Up @@ -1982,3 +1986,24 @@ func testReaderTopicRecreated(t *testing.T, ctx context.Context, r *Reader) {
_, err = r.ReadMessage(ctx)
require.ErrorIs(t, err, OffsetOutOfRange)
}

// testReaderPartitionEoF checks that a reader with EnablePartitionEoF set
// reports PartitionEoF from ReadMessage after the expected messages have been
// read, and reports it again once a further message has been prepared and
// read.
func testReaderPartitionEoF(t *testing.T, ctx context.Context, r *Reader) {
	// Opt in to end-of-partition errors for this reader.
	r.config.EnablePartitionEoF = true

	const N = 10
	prepareReader(t, ctx, r, makeTestSequence(N)...)

	// The first N reads are expected to succeed.
	for i := 0; i < N; i++ {
		_, err := r.ReadMessage(ctx)
		require.NoError(t, err)
	}

	// The read that follows must surface the end-of-partition error.
	// require.ErrorIs is used (as in the other reader tests) so that a failure
	// prints the error that was actually returned.
	_, err := r.ReadMessage(ctx)
	require.ErrorIs(t, err, PartitionEoF)

	// Prepare one more message: it must be readable, and the read after it
	// must report the end of the partition again.
	prepareReader(t, ctx, r, makeTestSequence(1)...)
	_, err = r.ReadMessage(ctx)
	require.NoError(t, err)
	_, err = r.ReadMessage(ctx)
	require.ErrorIs(t, err, PartitionEoF)
}

0 comments on commit eb9d9ac

Please sign in to comment.