diff --git a/server/const.go b/server/const.go
index 5bd9bda..1074029 100644
--- a/server/const.go
+++ b/server/const.go
@@ -11,4 +11,5 @@ const (
 	zipFileName        = "tsdb_dump.tar.gz"
 	MaxRequestSize     = 5 * 1024 * 1024 // 5MB
 	localRetentionDays = 3 * 24 * time.Hour
+	maxPingTrials      = 10
 )
diff --git a/server/plugin.go b/server/plugin.go
index aadf903..02b9822 100644
--- a/server/plugin.go
+++ b/server/plugin.go
@@ -17,13 +17,13 @@ import (
 	"github.com/prometheus/prometheus/scrape"
 	"github.com/prometheus/prometheus/tsdb"
 
-	root "github.com/mattermost/mattermost-plugin-metrics"
-
 	mmModel "github.com/mattermost/mattermost/server/public/model"
 	"github.com/mattermost/mattermost/server/public/plugin"
 	"github.com/mattermost/mattermost/server/public/pluginapi"
 	"github.com/mattermost/mattermost/server/public/pluginapi/cluster"
 	"github.com/mattermost/mattermost/server/v8/platform/shared/filestore"
+
+	root "github.com/mattermost/mattermost-plugin-metrics"
 )
 
 // Plugin implements the interface expected by the Mattermost server to communicate between the server and plugin processes.
@@ -95,36 +95,61 @@ func (p *Plugin) OnActivate() error {
 	p.scheduler.SetCallback(p.JobCallback)
 	p.scheduler.Start()
 
-	// we are using a mutually exclusive lock to run a single instance of this plugin
-	// we don't really need to collect metrics twice: although TSDB will take care
-	// of overlapped blocks, it will increase the disk writes to the remote or local
-	// disk.
-	if p.isHA() {
-		var err2 error
-		p.singletonLock, err2 = cluster.NewMutex(p.API, root.Manifest.Id)
-		if err2 != nil {
-			return err2
+	p.waitGroup.Add(1)
+	go func() {
+		defer p.waitGroup.Done()
+
+		// we are using a mutually exclusive lock to run a single instance of this plugin
+		// we don't really need to collect metrics twice: although TSDB will take care
+		// of overlapped blocks, it will increase the disk writes to the remote or local
+		// disk.
+		if p.isHA() {
+			// we will continuously try to acquire the lock here rather than trying once
+			// and moving on. This is a safeguard in case the scraper node exits unexpectedly.
+		loop:
+			for {
+				select {
+				case <-p.closeChan:
+					return
+				default:
+					var err2 error
+					p.singletonLock, err2 = cluster.NewMutex(p.API, root.Manifest.Id)
+					if err2 != nil {
+						p.API.LogError("Could not create mutex", "err", err2.Error())
+						return
+					}
+
+					ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+					defer cancel()
+					err = p.singletonLock.LockWithContext(ctx)
+					if err != nil && errors.Is(err, context.DeadlineExceeded) {
+						p.API.LogDebug("Another instance of the plugin is running in another node with scraping mode. Skipping this one.")
+						continue
+					} else if err != nil {
+						p.API.LogError("Could not acquire lock", "err", err.Error())
+						return
+					}
+					p.singletonLockAcquired = true
+					break loop
+				}
+			}
 		}
-		// the constant '20' is determined by healthcheck of the plugin which is 30 seconds.
-		ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
-		defer cancel()
-		err = p.singletonLock.LockWithContext(ctx)
-		if err != nil && errors.Is(err, context.DeadlineExceeded) {
-			p.API.LogDebug("Another instance of the plugin is running in another node with scraping mode. Skipping this one.")
-			return nil
-		} else if err != nil {
-			return err
+		if err := p.RunScraper(); err != nil {
+			p.API.LogError("Error during scraping", "err", err.Error())
 		}
-		p.singletonLockAcquired = true
-	}
+	}()
 
-	p.closeChan = make(chan bool)
-	p.waitGroup = sync.WaitGroup{}
+	return nil
+}
+
+func (p *Plugin) RunScraper() error {
+	var err error
 
 	// initiate local tsdb
 	p.tsdbLock.Lock()
 	defer p.tsdbLock.Unlock()
+
 	p.db, err = tsdb.Open(*p.configuration.DBPath, p.logger, nil, &tsdb.Options{
 		RetentionDuration:          int64(localRetentionDays / time.Millisecond),
 		AllowOverlappingCompaction: *p.configuration.AllowOverlappingCompaction,
@@ -185,6 +210,7 @@ func (p *Plugin) OnActivate() error {
 		defer db.Close()
 
 		var currentList []*mmModel.ClusterDiscovery
+		var numTrials int
 
 		for {
 			select {
@@ -192,7 +218,11 @@ func (p *Plugin) OnActivate() error {
 				list, err := pingClusterDiscoveryTable(db, *p.API.GetConfig().ClusterSettings.ClusterName)
 				if err != nil {
 					p.API.LogError("Could not ping the cluster discovery table", "error", err.Error())
-					return
+					numTrials++
+					if numTrials > maxPingTrials {
+						return
+					}
+					continue
 				}
 
 				if !topologyChanged(currentList, list) {
@@ -224,14 +254,16 @@ func (p *Plugin) OnActivate() error {
 	go func() {
 		defer p.waitGroup.Done()
 		<-p.closeChan
-		p.API.LogInfo("Stopping scrape manager...")
 		manager.Stop()
+		p.API.LogDebug("Scrape manager stopped")
 	}()
 
 	p.waitGroup.Add(1)
 	go func() {
 		defer p.waitGroup.Done()
+		// syncFileStore is blocking
 		p.syncFileStore()
+		p.API.LogDebug("Filestore sync job stopped")
 	}()
 
 	return nil
@@ -251,12 +283,13 @@ func (p *Plugin) OnDeactivate() error {
 	close(p.closeChan)
 	p.waitGroup.Wait()
-	p.API.LogInfo("Scrape manager stopped")
-
 	if p.db != nil {
-		return p.db.Close()
+		if err := p.db.Close(); err != nil {
+			return fmt.Errorf("error while closing tsdb: %w", err)
+		}
 	}
 
+	p.API.LogInfo("Plugin exited gracefully")
 	return nil
 }
diff --git a/server/sync.go b/server/sync.go
index 441b87a..cb026d1 100644
--- a/server/sync.go
+++ b/server/sync.go
@@ -56,7 +56,6 @@ loop:
 				continue
 			}
 		case <-p.closeChan:
-			p.API.LogDebug("Filestore sync job stopped")
 			return
 		}
 	}