From f9f46b8d0c38c280b5beea93b9f8ae19fc71476b Mon Sep 17 00:00:00 2001 From: Barnaby Keene <1636971+Southclaws@users.noreply.github.com> Date: Fri, 22 Nov 2024 21:11:21 +0000 Subject: [PATCH] fix http get call and error handling in job queue --- app/services/asset/analyse_job/download.go | 13 ++++++++++++- app/services/asset/analyse_job/job.go | 8 ++++---- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/app/services/asset/analyse_job/download.go b/app/services/asset/analyse_job/download.go index 02a1841f..622ef048 100644 --- a/app/services/asset/analyse_job/download.go +++ b/app/services/asset/analyse_job/download.go @@ -3,6 +3,7 @@ package analyse_job import ( "context" "net/http" + "time" "github.com/Southclaws/fault" "github.com/Southclaws/fault/fctx" @@ -16,11 +17,21 @@ import ( ) func (c *analyseConsumer) downloadAsset(ctx context.Context, src string, fillrule opt.Optional[asset.ContentFillCommand]) error { - resp, err := http.Get(src) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, src, nil) if err != nil { return fault.Wrap(err, fctx.With(ctx)) } + client := &http.Client{ + Timeout: 30 * time.Second, + } + + resp, err := client.Do(req) + if err != nil { + return fault.Wrap(err, fctx.With(ctx)) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { ctx = fctx.WithMeta(ctx, "status", resp.Status) return fault.Wrap(fault.New("failed to get asset"), fctx.With(ctx)) diff --git a/app/services/asset/analyse_job/job.go b/app/services/asset/analyse_job/job.go index 06d64f82..1748548c 100644 --- a/app/services/asset/analyse_job/job.go +++ b/app/services/asset/analyse_job/job.go @@ -23,7 +23,7 @@ func runAnalyseConsumer( lc.Append(fx.StartHook(func(_ context.Context) error { analyseChan, err := analyseQueue.Subscribe(ctx) if err != nil { - panic(err) + return err } go func() { @@ -31,7 +31,7 @@ func runAnalyseConsumer( nctx := session.GetSessionFromMessage(ctx, msg) if err := consumer.analyseAsset(nctx, msg.Payload.AssetID, msg.Payload.ContentFillRule); err != nil { - l.Error("failed to index node", zap.Error(err)) + l.Error("failed to analyse asset", zap.Error(err)) } msg.Ack() @@ -40,7 +40,7 @@ func runAnalyseConsumer( downloadChan, err := downloadQueue.Subscribe(ctx) if err != nil { - panic(err) + return err } go func() { @@ -48,7 +48,7 @@ func runAnalyseConsumer( nctx := session.GetSessionFromMessage(ctx, msg) if err := consumer.downloadAsset(nctx, msg.Payload.URL, msg.Payload.ContentFillRule); err != nil { - l.Error("failed to index node", zap.Error(err)) + l.Error("failed to download asset", zap.Error(err)) } msg.Ack()