Skip to content

Commit

Permalink
Each worker node process the jobs sequentially,
Browse files Browse the repository at this point in the history
so worker doesn't need to run job popping and
processing in separate go routine.

Signed-off-by: Anil Vishnoi <[email protected]>
  • Loading branch information
vishnoianil committed Apr 27, 2024
1 parent 753273d commit 47e4179
Showing 1 changed file with 11 additions and 14 deletions.
25 changes: 11 additions & 14 deletions worker/cmd/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,21 +153,19 @@ var generateCmd = &cobra.Command{
svc := s3.NewFromConfig(cfg)

sigChan := make(chan os.Signal, 1)
jobChan := make(chan string)
stopChan := make(chan struct{})

signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

var wg sync.WaitGroup
wg.Add(1)
go func(jobChan chan<- string, stopChan <-chan struct{}) {
go func(stopChan <-chan struct{}) {
defer wg.Done()
timer := time.NewTicker(1 * time.Second)
for {
select {
case <-stopChan:
sugar.Info("Shutting down job listener")
close(jobChan)
return
case <-timer.C:
conn := pool.Get()
Expand All @@ -179,10 +177,16 @@ var generateCmd = &cobra.Command{
sugar.Errorf("Could not pop from redis queue: %v", err)
continue
}
jobChan <- job
NewJobProcessor(ctx, pool, svc, sugar, job,
PreCheckEndpointURL,
SdgEndpointURL,
TlsClientCertPath,
TlsClientKeyPath,
TlsServerCaCertPath,
MaxSeed).processJob()
}
}
}(jobChan, stopChan)
}(stopChan)

wg.Add(1)
go func(ch <-chan os.Signal) {
Expand All @@ -192,15 +196,6 @@ var generateCmd = &cobra.Command{
close(stopChan)
}(sigChan)

wg.Add(1)
go func(ch <-chan string) {
defer wg.Done()
for job := range ch {
jp := NewJobProcessor(ctx, pool, svc, sugar, job, PreCheckEndpointURL, SdgEndpointURL, TlsClientCertPath, TlsClientKeyPath, TlsServerCaCertPath, MaxSeed)
jp.processJob()
}
}(jobChan)

wg.Wait()
},
}
Expand Down Expand Up @@ -406,6 +401,8 @@ func (w *Worker) processJob() {

// If in test mode, immediately post to the results queue
if TestMode {
//sleep to simulate processing time
time.Sleep(10 * time.Second)
w.postJobResults("https://example.com", jobType)
sugar.Info("Job done (test mode)")
return
Expand Down

0 comments on commit 47e4179

Please sign in to comment.