Skip to content

Commit

Permalink
fix doc for fake input && linter && race on get offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryRomanov committed Oct 4, 2024
1 parent c0b71e3 commit 083240e
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 3 deletions.
2 changes: 1 addition & 1 deletion plugin/input/fake/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ It provides an API to test pipelines and other plugins.
> No config params
### API description
``In(sourceID pipeline.SourceID, sourceName string, offset int64, bytes []byte)``
``In(sourceID pipeline.SourceID, sourceName string, offset pipeline.Offsets, bytes []byte)``

It sends a test event into the pipeline.

Expand Down
5 changes: 3 additions & 2 deletions plugin/input/file/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi
sourceName := job.filename
skipLine := job.shouldSkip.Load()
lastOffset := job.curOffset
offsets := job.offsets
if job.symlink != "" {
sourceName = job.symlink
}
Expand Down Expand Up @@ -125,7 +126,7 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi
}
}

job.lastEventSeq = controller.In(sourceID, sourceName, Offset{lastOffset + scanned, job.offsets}, inBuf, isVirgin, metadataInfo)
job.lastEventSeq = controller.In(sourceID, sourceName, Offset{lastOffset + scanned, offsets}, inBuf, isVirgin, metadataInfo)
}
// restore the line buffer
accumBuf = accumBuf[:0]
Expand Down Expand Up @@ -205,7 +206,7 @@ type Offset struct {
}

func (o Offset) Current() int64 {
return int64(o.current)
return o.current
}

func (o Offset) ByStream(stream string) int64 {
Expand Down

0 comments on commit 083240e

Please sign in to comment.