From 47e4179a2d970f939df344b47d14ca0e1e95d899 Mon Sep 17 00:00:00 2001 From: Anil Vishnoi Date: Tue, 23 Apr 2024 02:02:05 -0700 Subject: [PATCH 1/2] Each worker node process the jobs sequentially, so worker doesn't need to run job popping and processing in separate go routine. Signed-off-by: Anil Vishnoi --- worker/cmd/generate.go | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/worker/cmd/generate.go b/worker/cmd/generate.go index 89a4565e..62820ab2 100644 --- a/worker/cmd/generate.go +++ b/worker/cmd/generate.go @@ -153,21 +153,19 @@ var generateCmd = &cobra.Command{ svc := s3.NewFromConfig(cfg) sigChan := make(chan os.Signal, 1) - jobChan := make(chan string) stopChan := make(chan struct{}) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) var wg sync.WaitGroup wg.Add(1) - go func(jobChan chan<- string, stopChan <-chan struct{}) { + go func(stopChan <-chan struct{}) { defer wg.Done() timer := time.NewTicker(1 * time.Second) for { select { case <-stopChan: sugar.Info("Shutting down job listener") - close(jobChan) return case <-timer.C: conn := pool.Get() @@ -179,10 +177,16 @@ var generateCmd = &cobra.Command{ sugar.Errorf("Could not pop from redis queue: %v", err) continue } - jobChan <- job + NewJobProcessor(ctx, pool, svc, sugar, job, + PreCheckEndpointURL, + SdgEndpointURL, + TlsClientCertPath, + TlsClientKeyPath, + TlsServerCaCertPath, + MaxSeed).processJob() } } - }(jobChan, stopChan) + }(stopChan) wg.Add(1) go func(ch <-chan os.Signal) { @@ -192,15 +196,6 @@ var generateCmd = &cobra.Command{ close(stopChan) }(sigChan) - wg.Add(1) - go func(ch <-chan string) { - defer wg.Done() - for job := range ch { - jp := NewJobProcessor(ctx, pool, svc, sugar, job, PreCheckEndpointURL, SdgEndpointURL, TlsClientCertPath, TlsClientKeyPath, TlsServerCaCertPath, MaxSeed) - jp.processJob() - } - }(jobChan) - wg.Wait() }, } @@ -406,6 +401,8 @@ func (w *Worker) processJob() { // If in test mode, immediately post to the results queue if TestMode { + //sleep to simulate processing time + time.Sleep(10 * time.Second) w.postJobResults("https://example.com", jobType) sugar.Info("Job done (test mode)") return From fbc98fc65cec22f7237d4a86b704e47b4b3ccf81 Mon Sep 17 00:00:00 2001 From: Anil Vishnoi Date: Sat, 27 Apr 2024 01:46:35 -0700 Subject: [PATCH 2/2] Add NotifyContext and pass it to http request to abort the request of the worker is killed, so worker won't be stuck on http request Signed-off-by: Anil Vishnoi --- worker/cmd/generate.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/worker/cmd/generate.go b/worker/cmd/generate.go index 62820ab2..b6f132d8 100644 --- a/worker/cmd/generate.go +++ b/worker/cmd/generate.go @@ -132,7 +132,9 @@ var generateCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { logger := initLogger(Debug) sugar := logger.Sugar() - ctx := cmd.Context() + + ctx, cancel := signal.NotifyContext(cmd.Context(), syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT) + defer cancel() sugar.Info("Starting generate worker") @@ -916,7 +918,7 @@ func (w *Worker) datagenSvc(taxonomyFiles []string, outputDir string, numSamples requestURL = strings.Replace(requestURL, "skill", "knowledge", -1) } - request, err := http.NewRequest("POST", requestURL, bytes.NewBuffer(jsonData)) + request, err := http.NewRequestWithContext(w.ctx, "POST", requestURL, bytes.NewBuffer(jsonData)) if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) }