Skip to content

Commit

Permalink
Merge pull request #298 from vishnoianil/fix-chan
Browse files Browse the repository at this point in the history
Fix worker to pick the next job when it can process it.
  • Loading branch information
mergify[bot] authored Apr 30, 2024
2 parents 2f0e30d + fbc98fc commit d7efe01
Showing 1 changed file with 15 additions and 16 deletions.
31 changes: 15 additions & 16 deletions worker/cmd/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ var generateCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
logger := initLogger(Debug)
sugar := logger.Sugar()
ctx := cmd.Context()

ctx, cancel := signal.NotifyContext(cmd.Context(), syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT)
defer cancel()

sugar.Info("Starting generate worker")

Expand All @@ -160,21 +162,19 @@ var generateCmd = &cobra.Command{
svc := s3.NewFromConfig(cfg)

sigChan := make(chan os.Signal, 1)
jobChan := make(chan string)
stopChan := make(chan struct{})

signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

var wg sync.WaitGroup
wg.Add(1)
go func(jobChan chan<- string, stopChan <-chan struct{}) {
go func(stopChan <-chan struct{}) {
defer wg.Done()
timer := time.NewTicker(1 * time.Second)
for {
select {
case <-stopChan:
sugar.Info("Shutting down job listener")
close(jobChan)
return
case <-timer.C:
conn := pool.Get()
Expand All @@ -186,10 +186,16 @@ var generateCmd = &cobra.Command{
sugar.Errorf("Could not pop from redis queue: %v", err)
continue
}
jobChan <- job
NewJobProcessor(ctx, pool, svc, sugar, job,
PreCheckEndpointURL,
SdgEndpointURL,
TlsClientCertPath,
TlsClientKeyPath,
TlsServerCaCertPath,
MaxSeed).processJob()
}
}
}(jobChan, stopChan)
}(stopChan)

wg.Add(1)
go func(ch <-chan os.Signal) {
Expand All @@ -199,15 +205,6 @@ var generateCmd = &cobra.Command{
close(stopChan)
}(sigChan)

wg.Add(1)
go func(ch <-chan string) {
defer wg.Done()
for job := range ch {
jp := NewJobProcessor(ctx, pool, svc, sugar, job, PreCheckEndpointURL, SdgEndpointURL, TlsClientCertPath, TlsClientKeyPath, TlsServerCaCertPath, MaxSeed)
jp.processJob()
}
}(jobChan)

wg.Wait()
},
}
Expand Down Expand Up @@ -419,6 +416,8 @@ func (w *Worker) processJob() {

// If in test mode, immediately post to the results queue
if TestMode {
//sleep to simulate processing time
time.Sleep(10 * time.Second)
w.postJobResults("https://example.com", jobType)
sugar.Info("Job done (test mode)")
return
Expand Down Expand Up @@ -943,7 +942,7 @@ func (w *Worker) datagenSvc(taxonomyFiles []string, outputDir string, numSamples
requestURL = strings.Replace(requestURL, "skill", "knowledge", -1)
}

request, err := http.NewRequest("POST", requestURL, bytes.NewBuffer(jsonData))
request, err := http.NewRequestWithContext(w.ctx, "POST", requestURL, bytes.NewBuffer(jsonData))
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
Expand Down

0 comments on commit d7efe01

Please sign in to comment.