Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unused code in discovery queue creation #17515

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 3 additions & 60 deletions go/vt/vtorc/discovery/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ import (
"vitess.io/vitess/go/vt/vtorc/config"
)

// QueueMetric contains the queue's active and queued sizes
type QueueMetric struct {
Active int
Queued int
}

// Queue contains information for managing discovery requests
type Queue struct {
sync.Mutex
Expand All @@ -48,67 +42,16 @@ type Queue struct {
queue chan string
queuedKeys map[string]time.Time
consumedKeys map[string]time.Time
metrics []QueueMetric
}

// DiscoveryQueue contains the discovery queue which can then be accessed via an API call for monitoring.
// Currently this is accessed by ContinuousDiscovery() but also from http api calls.
// I may need to protect this better?
var discoveryQueue map[string](*Queue)
var dcLock sync.Mutex

func init() {
discoveryQueue = make(map[string](*Queue))
}

// CreateOrReturnQueue allows for creation of a new discovery queue or
// returning a pointer to an existing one given the name.
func CreateOrReturnQueue(name string) *Queue {
dcLock.Lock()
defer dcLock.Unlock()
if q, found := discoveryQueue[name]; found {
return q
}

q := &Queue{
// CreateQueue allows for creation of a new discovery queue
func CreateQueue(name string) *Queue {
return &Queue{
name: name,
queuedKeys: make(map[string]time.Time),
consumedKeys: make(map[string]time.Time),
queue: make(chan string, config.DiscoveryQueueCapacity),
}
go q.startMonitoring()

discoveryQueue[name] = q

return q
}

// monitoring queue sizes until we are told to stop
func (q *Queue) startMonitoring() {
log.Infof("Queue.startMonitoring(%s)", q.name)
ticker := time.NewTicker(time.Second) // hard-coded at every second

for {
select {
case <-ticker.C: // do the periodic expiry
q.collectStatistics()
case <-q.done:
return
}
}
}

// do a check of the entries in the queue, both those active and queued
func (q *Queue) collectStatistics() {
q.Lock()
defer q.Unlock()

q.metrics = append(q.metrics, QueueMetric{Queued: len(q.queuedKeys), Active: len(q.consumedKeys)})

// remove old entries if we get too big
if len(q.metrics) > config.DiscoveryQueueMaxStatisticsSize {
q.metrics = q.metrics[len(q.metrics)-config.DiscoveryQueueMaxStatisticsSize:]
}
}

// QueueLen returns the length of the queue (channel size + queued size)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtorc/logic/vtorc.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func waitForLocksRelease() {
// handleDiscoveryRequests iterates the discoveryQueue channel and calls upon
// instance discovery per entry.
func handleDiscoveryRequests() {
discoveryQueue = discovery.CreateOrReturnQueue("DEFAULT")
discoveryQueue = discovery.CreateQueue("DEFAULT")
// create a pool of discovery workers
for i := uint(0); i < config.DiscoveryMaxConcurrency; i++ {
go func() {
Expand Down
Loading