server: never give up on trying #25

Closed
wants to merge 1 commit
1 change: 1 addition & 0 deletions server/const.go
@@ -11,4 +11,5 @@ const (
zipFileName = "tsdb_dump.tar.gz"
MaxRequestSize = 5 * 1024 * 1024 // 5MB
localRetentionDays = 3 * 24 * time.Hour
maxPingTrials = 10
)
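
In isolation, `maxPingTrials` bounds how many failed pings of the cluster discovery table are tolerated before the scraper goroutine exits, rather than bailing out on the first error. A minimal sketch of that bounded-retry shape, with a hypothetical `ping` standing in for `pingClusterDiscoveryTable`:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const maxPingTrials = 10

// ping is a hypothetical stand-in for pingClusterDiscoveryTable.
func ping() error { return errors.New("cluster discovery table unreachable") }

func main() {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	var numTrials int
	for range ticker.C {
		if err := ping(); err != nil {
			numTrials++
			if numTrials > maxPingTrials {
				fmt.Println("giving up after", numTrials, "failed pings")
				return
			}
			continue // tolerate the failure and wait for the next tick
		}
		// ... handle the discovery list ...
	}
}
```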
91 changes: 62 additions & 29 deletions server/plugin.go
@@ -17,13 +17,13 @@ import (
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/tsdb"

root "github.com/mattermost/mattermost-plugin-metrics"

mmModel "github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/plugin"
"github.com/mattermost/mattermost/server/public/pluginapi"
"github.com/mattermost/mattermost/server/public/pluginapi/cluster"
"github.com/mattermost/mattermost/server/v8/platform/shared/filestore"

root "github.com/mattermost/mattermost-plugin-metrics"
)

// Plugin implements the interface expected by the Mattermost server to communicate between the server and plugin processes.
@@ -95,36 +95,61 @@ func (p *Plugin) OnActivate() error {
p.scheduler.SetCallback(p.JobCallback)
p.scheduler.Start()

// we are using a mutually exclusive lock to run a single instance of this plugin
// we don't really need to collect metrics twice: although TSDB will take care
// of overlapped blocks, it will increase the disk writes to the remote or local
// disk.
if p.isHA() {
var err2 error
p.singletonLock, err2 = cluster.NewMutex(p.API, root.Manifest.Id)
if err2 != nil {
return err2
p.waitGroup.Add(1)
go func() {
defer p.waitGroup.Done()

// we are using a mutually exclusive lock to run a single instance of this plugin
// we don't really need to collect metrics twice: although TSDB will take care
// of overlapped blocks, it will increase the disk writes to the remote or local
// disk.
if p.isHA() {
// we will continuously try to acquire the lock here rather than trying
// once and moving on. This is a safeguard against scraper node exits unexpectedly.
loop:
for {
select {
case <-p.closeChan:
return
default:
var err2 error
p.singletonLock, err2 = cluster.NewMutex(p.API, root.Manifest.Id)
if err2 != nil {
p.API.LogError("Could not create mutex", "err", err.Error())
return
}

ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
err = p.singletonLock.LockWithContext(ctx)
cancel()
if err != nil && errors.Is(err, context.DeadlineExceeded) {
p.API.LogDebug("Another instance of the plugin is running in another node with scraping mode. Skipping this one.")
continue
} else if err != nil {
p.API.LogError("Could not acquire lock", "err", err.Error())
return
}
p.singletonLockAcquired = true
break loop
}
}
}

// the constant '20' is determined by healthcheck of the plugin which is 30 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
err = p.singletonLock.LockWithContext(ctx)
if err != nil && errors.Is(err, context.DeadlineExceeded) {
p.API.LogDebug("Another instance of the plugin is running in another node with scraping mode. Skipping this one.")
return nil
} else if err != nil {
return err
if err := p.RunScraper(); err != nil {
p.API.LogError("Error during scraping", "err", err.Error())
}
p.singletonLockAcquired = true
}
}()

p.closeChan = make(chan bool)
p.waitGroup = sync.WaitGroup{}
return nil
}

func (p *Plugin) RunScraper() error {
var err error

// initiate local tsdb
p.tsdbLock.Lock()
defer p.tsdbLock.Unlock()

p.db, err = tsdb.Open(*p.configuration.DBPath, p.logger, nil, &tsdb.Options{
RetentionDuration: int64(localRetentionDays / time.Millisecond),
AllowOverlappingCompaction: *p.configuration.AllowOverlappingCompaction,
@@ -185,14 +210,19 @@ func (p *Plugin) OnActivate() error {
defer db.Close()

var currentList []*mmModel.ClusterDiscovery
var numTrials int

for {
select {
case <-ticker.C:
list, err := pingClusterDiscoveryTable(db, *p.API.GetConfig().ClusterSettings.ClusterName)
if err != nil {
p.API.LogError("Could not ping the cluster discovery table", "error", err.Error())
return
numTrials++
if numTrials > maxPingTrials {
return
}
continue
}

if !topologyChanged(currentList, list) {
@@ -224,14 +254,16 @@ func (p *Plugin) OnActivate() error {
go func() {
defer p.waitGroup.Done()
<-p.closeChan
p.API.LogInfo("Stopping scrape manager...")
manager.Stop()
p.API.LogDebug("Scrape manager stopped")
}()

p.waitGroup.Add(1)
go func() {
defer p.waitGroup.Done()
// syncFileStore is blocking
p.syncFileStore()
p.API.LogDebug("Filestore sync job stopped")
}()

return nil
@@ -251,12 +283,13 @@ func (p *Plugin) OnDeactivate() error {
close(p.closeChan)
p.waitGroup.Wait()

p.API.LogInfo("Scrape manager stopped")

if p.db != nil {
return p.db.Close()
if err := p.db.Close(); err != nil {
return fmt.Errorf("error while closing tsdb: %w", err)
}
}

p.API.LogInfo("Plugin exited gracefully")
return nil
}

1 change: 0 additions & 1 deletion server/sync.go
@@ -56,7 +56,6 @@ loop:
continue
}
case <-p.closeChan:
p.API.LogDebug("Filestore sync job stopped")
return
}
}
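
The sync.go change only moves the "Filestore sync job stopped" debug log out of the worker and into the caller in plugin.go; the shutdown signal itself is unchanged. For context, a minimal sketch of the closeChan/WaitGroup pattern the plugin uses to stop its background goroutines, where the worker body is a hypothetical placeholder:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	closeChan := make(chan bool)
	var waitGroup sync.WaitGroup

	// Hypothetical stand-in for the filestore sync worker.
	waitGroup.Add(1)
	go func() {
		defer waitGroup.Done()
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// ... perform one sync pass ...
			case <-closeChan:
				return
			}
		}
	}()

	time.Sleep(200 * time.Millisecond)

	// OnDeactivate: close the channel to signal every worker, then wait.
	close(closeChan)
	waitGroup.Wait()
	fmt.Println("all workers stopped")
}
```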