Skip to content

Commit

Permalink
[filebeat][azure-blob-storage] - Simplified state checkpoint calculat…
Browse files Browse the repository at this point in the history
…ion (#40936) (#41073)
  • Loading branch information
mergify[bot] authored Oct 8, 2024
1 parent 903b62e commit 7cf1e16
Showing 1 changed file with 14 additions and 34 deletions.
48 changes: 14 additions & 34 deletions x-pack/filebeat/input/azureblobstorage/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package azureblobstorage
import (
"context"
"fmt"
"slices"
"sort"
"sync"

azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
Expand Down Expand Up @@ -190,41 +192,19 @@ func (s *scheduler) fetchBlobPager(batchSize int32) *azruntime.Pager[azblob.List
// moveToLastSeenJob drops every job that is already covered by the current
// state checkpoint and returns the remaining jobs.
//
// A job is kept when its timestamp is after the checkpoint's LatestEntryTime
// or when its name sorts lexicographically after the checkpoint's BlobName.
// Jobs are expected to arrive in lexicographical order, so the latest position
// can be found on the basis of either the job name or the timestamp.
//
// When some kept jobs have a newer timestamp but a lesser lexicographic name,
// and others have a greater lexicographic name than the checkpoint blob, the
// jobs with the newer timestamp are moved to the front. Within each of the two
// groups the incoming order is preserved.
//
// The filtering happens in place, so the backing array of jobs is modified.
func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job {
	cp := s.state.checkpoint()
	isNewer := func(j *job) bool {
		return j.timestamp().After(cp.LatestEntryTime)
	}

	// Discard jobs that are neither newer than the checkpoint timestamp nor
	// lexicographically past the checkpoint blob name.
	jobs = slices.DeleteFunc(jobs, func(j *job) bool {
		return !isNewer(j) && j.name() <= cp.BlobName
	})

	// Stable partition around the checkpoint timestamp. The comparator must
	// look at both elements: "newer before not-newer" is a strict weak
	// ordering, whereas a predicate on the first element alone is not and
	// would let the sort reorder jobs that belong to the same group.
	sort.SliceStable(jobs, func(i, k int) bool {
		return isNewer(jobs[i]) && !isNewer(jobs[k])
	})
	return jobs
}

func (s *scheduler) isFileSelected(name string) bool {
Expand Down

0 comments on commit 7cf1e16

Please sign in to comment.