diff --git a/go/vt/vtorc/discovery/queue.go b/go/vt/vtorc/discovery/queue.go index 4b18303959b..bf279b781f2 100644 --- a/go/vt/vtorc/discovery/queue.go +++ b/go/vt/vtorc/discovery/queue.go @@ -33,12 +33,6 @@ import ( "vitess.io/vitess/go/vt/vtorc/config" ) -// QueueMetric contains the queue's active and queued sizes -type QueueMetric struct { - Active int - Queued int -} - // Queue contains information for managing discovery requests type Queue struct { sync.Mutex @@ -48,67 +42,16 @@ type Queue struct { queue chan string queuedKeys map[string]time.Time consumedKeys map[string]time.Time - metrics []QueueMetric -} - -// DiscoveryQueue contains the discovery queue which can then be accessed via an API call for monitoring. -// Currently this is accessed by ContinuousDiscovery() but also from http api calls. -// I may need to protect this better? -var discoveryQueue map[string](*Queue) -var dcLock sync.Mutex - -func init() { - discoveryQueue = make(map[string](*Queue)) } -// CreateOrReturnQueue allows for creation of a new discovery queue or -// returning a pointer to an existing one given the name. -func CreateOrReturnQueue(name string) *Queue { - dcLock.Lock() - defer dcLock.Unlock() - if q, found := discoveryQueue[name]; found { - return q - } - - q := &Queue{ +// CreateQueue allows for creation of a new discovery queue +func CreateQueue(name string) *Queue { + return &Queue{ name: name, queuedKeys: make(map[string]time.Time), consumedKeys: make(map[string]time.Time), queue: make(chan string, config.DiscoveryQueueCapacity), } - go q.startMonitoring() - - discoveryQueue[name] = q - - return q -} - -// monitoring queue sizes until we are told to stop -func (q *Queue) startMonitoring() { - log.Infof("Queue.startMonitoring(%s)", q.name) - ticker := time.NewTicker(time.Second) // hard-coded at every second - - for { - select { - case <-ticker.C: // do the periodic expiry - q.collectStatistics() - case <-q.done: - return - } - } -} - -// do a check of the entries in the queue, both those active and queued -func (q *Queue) collectStatistics() { - q.Lock() - defer q.Unlock() - - q.metrics = append(q.metrics, QueueMetric{Queued: len(q.queuedKeys), Active: len(q.consumedKeys)}) - - // remove old entries if we get too big - if len(q.metrics) > config.DiscoveryQueueMaxStatisticsSize { - q.metrics = q.metrics[len(q.metrics)-config.DiscoveryQueueMaxStatisticsSize:] - } } // QueueLen returns the length of the queue (channel size + queued size) diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index 39326525ce2..5ac5af50d47 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -108,7 +108,7 @@ func waitForLocksRelease() { // handleDiscoveryRequests iterates the discoveryQueue channel and calls upon // instance discovery per entry. func handleDiscoveryRequests() { - discoveryQueue = discovery.CreateOrReturnQueue("DEFAULT") + discoveryQueue = discovery.CreateQueue("DEFAULT") // create a pool of discovery workers for i := uint(0); i < config.DiscoveryMaxConcurrency; i++ { go func() {