diff --git a/plugin/input/kafka/kafka.go b/plugin/input/kafka/kafka.go index e51c319e..708122cc 100644 --- a/plugin/input/kafka/kafka.go +++ b/plugin/input/kafka/kafka.go @@ -322,27 +322,6 @@ func (p *Plugin) Commit(event *pipeline.Event) { p.client.MarkCommitOffsets(offsets) } -func (p *Plugin) ConsumeClaim(fetches kgo.Fetches) { - fetches.EachRecord(func(message *kgo.Record) { - sourceID := assembleSourceID( - p.idByTopic[message.Topic], - message.Partition, - ) - - offset := assembleOffset(message) - var metadataInfo metadata.MetaData - var err error - if len(p.config.Meta) > 0 { - metadataInfo, err = p.metaTemplater.Render(newMetaInformation(message)) - if err != nil { - p.logger.Errorf("can't render meta data: %s", err.Error()) - } - } - - _ = p.controller.In(sourceID, "kafka", offset, message.Value, true, metadataInfo) - }) -} - func assembleSourceID(index int, partition int32) pipeline.SourceID { return pipeline.SourceID(index<<16 + int(partition)) }