diff --git a/worker/cmd/generate.go b/worker/cmd/generate.go index ed40f048..1270e401 100644 --- a/worker/cmd/generate.go +++ b/worker/cmd/generate.go @@ -139,7 +139,9 @@ var generateCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { logger := initLogger(Debug) sugar := logger.Sugar() - ctx := cmd.Context() + + ctx, cancel := signal.NotifyContext(cmd.Context(), syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT) + defer cancel() sugar.Info("Starting generate worker") @@ -160,21 +162,19 @@ var generateCmd = &cobra.Command{ svc := s3.NewFromConfig(cfg) sigChan := make(chan os.Signal, 1) - jobChan := make(chan string) stopChan := make(chan struct{}) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) var wg sync.WaitGroup wg.Add(1) - go func(jobChan chan<- string, stopChan <-chan struct{}) { + go func(stopChan <-chan struct{}) { defer wg.Done() timer := time.NewTicker(1 * time.Second) for { select { case <-stopChan: sugar.Info("Shutting down job listener") - close(jobChan) return case <-timer.C: conn := pool.Get() @@ -186,10 +186,16 @@ var generateCmd = &cobra.Command{ sugar.Errorf("Could not pop from redis queue: %v", err) continue } - jobChan <- job + NewJobProcessor(ctx, pool, svc, sugar, job, + PreCheckEndpointURL, + SdgEndpointURL, + TlsClientCertPath, + TlsClientKeyPath, + TlsServerCaCertPath, + MaxSeed).processJob() } } - }(jobChan, stopChan) + }(stopChan) wg.Add(1) go func(ch <-chan os.Signal) { @@ -199,15 +205,6 @@ var generateCmd = &cobra.Command{ close(stopChan) }(sigChan) - wg.Add(1) - go func(ch <-chan string) { - defer wg.Done() - for job := range ch { - jp := NewJobProcessor(ctx, pool, svc, sugar, job, PreCheckEndpointURL, SdgEndpointURL, TlsClientCertPath, TlsClientKeyPath, TlsServerCaCertPath, MaxSeed) - jp.processJob() - } - }(jobChan) - wg.Wait() }, } @@ -419,6 +416,8 @@ func (w *Worker) processJob() { // If in test mode, immediately post to the results queue if TestMode { + //sleep to simulate processing time + time.Sleep(10 * time.Second) w.postJobResults("https://example.com", jobType) sugar.Info("Job done (test mode)") return @@ -943,7 +942,7 @@ func (w *Worker) datagenSvc(taxonomyFiles []string, outputDir string, numSamples requestURL = strings.Replace(requestURL, "skill", "knowledge", -1) } - request, err := http.NewRequest("POST", requestURL, bytes.NewBuffer(jsonData)) + request, err := http.NewRequestWithContext(w.ctx, "POST", requestURL, bytes.NewBuffer(jsonData)) if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) }