From 488c445ed746fcaa6543ab52acb4a77ef3d39dce Mon Sep 17 00:00:00 2001 From: ShourieG <105607378+ShourieG@users.noreply.github.com> Date: Thu, 3 Oct 2024 14:51:04 +0530 Subject: [PATCH] [filebeat][gcs] - Simplified state checkpoint calculation (#40937) * Simplified state checkpoint calculation. The existing state checkpoint calculation logic was convoluted and depended on unreliable flag logic; it has been simplified so that any errors that occur become simpler to debug. --- CHANGELOG-developer.next.asciidoc | 1 + x-pack/filebeat/input/gcs/scheduler.go | 46 ++++++++------------------ 2 files changed, 14 insertions(+), 33 deletions(-) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 685d641ad0c..92d93f88b98 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -205,6 +205,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Add a configuration option for TCP/UDP network type. {issue}40407[40407] {pull}40623[40623] - Added debug logging to parquet reader in x-pack/libbeat/reader. {pull}40651[40651] - Added filebeat debug histograms for s3 object size and events per processed s3 object. {pull}40775[40775] +- Simplified GCS input state checkpoint calculation logic. {issue}40878[40878] {pull}40937[40937] - Simplified Azure Blob Storage input state checkpoint calculation logic. 
{issue}40674[40674] {pull}40936[40936] ==== Deprecated diff --git a/x-pack/filebeat/input/gcs/scheduler.go b/x-pack/filebeat/input/gcs/scheduler.go index 686ad1394dd..45e7585c14e 100644 --- a/x-pack/filebeat/input/gcs/scheduler.go +++ b/x-pack/filebeat/input/gcs/scheduler.go @@ -7,6 +7,8 @@ package gcs import ( "context" "fmt" + "slices" + "sort" "strings" "sync" @@ -181,41 +183,19 @@ func (s *scheduler) fetchObjectPager(ctx context.Context, pageSize int) *iterato // moveToLastSeenJob, moves to the latest job position past the last seen job // Jobs are stored in lexicographical order always, hence the latest position can be found either on the basis of job name or timestamp func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job { - var latestJobs []*job - jobsToReturn := make([]*job, 0) - counter := 0 - flag := false - ignore := false - - for _, job := range jobs { - switch { - case job.Timestamp().After(s.state.checkpoint().LatestEntryTime): - latestJobs = append(latestJobs, job) - case job.Name() == s.state.checkpoint().ObjectName: - flag = true - case job.Name() > s.state.checkpoint().ObjectName: - flag = true - counter-- - case job.Name() <= s.state.checkpoint().ObjectName && (!ignore): - ignore = true - } - counter++ - } - - if flag && (counter < len(jobs)-1) { - jobsToReturn = jobs[counter+1:] - } else if !flag && !ignore { - jobsToReturn = jobs - } + cp := s.state.checkpoint() + jobs = slices.DeleteFunc(jobs, func(j *job) bool { + return !(j.Timestamp().After(cp.LatestEntryTime) || j.Name() > cp.ObjectName) + }) - // in a senario where there are some jobs which have a later time stamp + // In a scenario where there are some jobs which have a greater timestamp // but lesser lexicographic order and some jobs have greater lexicographic order - // than the current checkpoint object name, then we append the latest jobs - if len(jobsToReturn) != len(jobs) && len(latestJobs) > 0 { - jobsToReturn = append(latestJobs, jobsToReturn...) 
- } - - return jobsToReturn + // than the current checkpoint object name, we then sort around the pivot checkpoint + // timestamp. + sort.SliceStable(jobs, func(i, _ int) bool { + return jobs[i].Timestamp().After(cp.LatestEntryTime) + }) + return jobs } func (s *scheduler) addFailedJobs(ctx context.Context, jobs []*job) []*job {