diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc
index abe0164fe70..92d93f88b98 100644
--- a/CHANGELOG-developer.next.asciidoc
+++ b/CHANGELOG-developer.next.asciidoc
@@ -205,6 +205,8 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
 - Add a configuration option for TCP/UDP network type. {issue}40407[40407] {pull}40623[40623]
 - Added debug logging to parquet reader in x-pack/libbeat/reader. {pull}40651[40651]
 - Added filebeat debug histograms for s3 object size and events per processed s3 object. {pull}40775[40775]
+- Simplified GCS input state checkpoint calculation logic. {issue}40878[40878] {pull}40937[40937]
+- Simplified Azure Blob Storage input state checkpoint calculation logic. {issue}40674[40674] {pull}40936[40936]
 
 ==== Deprecated
 
diff --git a/x-pack/filebeat/input/gcs/scheduler.go b/x-pack/filebeat/input/gcs/scheduler.go
index 686ad1394dd..45e7585c14e 100644
--- a/x-pack/filebeat/input/gcs/scheduler.go
+++ b/x-pack/filebeat/input/gcs/scheduler.go
@@ -7,6 +7,8 @@ package gcs
 import (
 	"context"
 	"fmt"
+	"slices"
+	"sort"
 	"strings"
 	"sync"
 
@@ -181,41 +183,19 @@ func (s *scheduler) fetchObjectPager(ctx context.Context, pageSize int) *iterato
 // moveToLastSeenJob, moves to the latest job position past the last seen job
 // Jobs are stored in lexicographical order always, hence the latest position can be found either on the basis of job name or timestamp
 func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job {
-	var latestJobs []*job
-	jobsToReturn := make([]*job, 0)
-	counter := 0
-	flag := false
-	ignore := false
-
-	for _, job := range jobs {
-		switch {
-		case job.Timestamp().After(s.state.checkpoint().LatestEntryTime):
-			latestJobs = append(latestJobs, job)
-		case job.Name() == s.state.checkpoint().ObjectName:
-			flag = true
-		case job.Name() > s.state.checkpoint().ObjectName:
-			flag = true
-			counter--
-		case job.Name() <= s.state.checkpoint().ObjectName && (!ignore):
-			ignore = true
-		}
-		counter++
-	}
-
-	if flag && (counter < len(jobs)-1) {
-		jobsToReturn = jobs[counter+1:]
-	} else if !flag && !ignore {
-		jobsToReturn = jobs
-	}
+	cp := s.state.checkpoint()
+	jobs = slices.DeleteFunc(jobs, func(j *job) bool {
+		return !(j.Timestamp().After(cp.LatestEntryTime) || j.Name() > cp.ObjectName)
+	})
 
-	// in a senario where there are some jobs which have a later time stamp
+	// In a scenario where there are some jobs which have a greater timestamp
 	// but lesser lexicographic order and some jobs have greater lexicographic order
-	// than the current checkpoint object name, then we append the latest jobs
-	if len(jobsToReturn) != len(jobs) && len(latestJobs) > 0 {
-		jobsToReturn = append(latestJobs, jobsToReturn...)
-	}
-
-	return jobsToReturn
+	// than the current checkpoint object name, we then sort around the pivot checkpoint
+	// timestamp.
+	sort.SliceStable(jobs, func(i, _ int) bool {
+		return jobs[i].Timestamp().After(cp.LatestEntryTime)
+	})
+	return jobs
 }
 
 func (s *scheduler) addFailedJobs(ctx context.Context, jobs []*job) []*job {