diff --git a/brokers/redis/broker.go b/brokers/redis/broker.go index 2bf3c69..6603293 100644 --- a/brokers/redis/broker.go +++ b/brokers/redis/broker.go @@ -29,18 +29,19 @@ type Options struct { } type Broker struct { - log *slog.Logger - conn redis.UniversalClient - pollPeriod time.Duration + log *slog.Logger + opts Options + + conn redis.UniversalClient } func New(o Options, lo *slog.Logger) *Broker { - pollPeriod := o.PollPeriod if o.PollPeriod == 0 { - pollPeriod = DefaultPollPeriod + o.PollPeriod = DefaultPollPeriod } return &Broker{ - log: lo, + opts: o, + log: lo, conn: redis.NewUniversalClient(&redis.UniversalOptions{ Addrs: o.Addrs, DB: o.DB, @@ -51,7 +52,6 @@ func New(o Options, lo *slog.Logger) *Broker { MinIdleConns: o.MinIdleConns, IdleTimeout: o.IdleTimeout, }), - pollPeriod: pollPeriod, } } @@ -87,7 +87,7 @@ func (b *Broker) Consume(ctx context.Context, work chan []byte, queue string) { return default: b.log.Debug("receiving from consumer..") - res, err := b.conn.BLPop(ctx, b.pollPeriod, queue).Result() + res, err := b.conn.BLPop(ctx, b.opts.PollPeriod, queue).Result() if err != nil && err.Error() != "redis: nil" { b.log.Error("error consuming from redis queue", "error", err) } else if errors.Is(err, redis.Nil) { @@ -105,7 +105,7 @@ func (b *Broker) Consume(ctx context.Context, work chan []byte, queue string) { } func (b *Broker) consumeScheduled(ctx context.Context, queue string) { - poll := time.NewTicker(b.pollPeriod) + poll := time.NewTicker(b.opts.PollPeriod) for { select { diff --git a/brokers/redis/broker_piped.go b/brokers/redis/broker_piped.go new file mode 100644 index 0000000..fc0d5b4 --- /dev/null +++ b/brokers/redis/broker_piped.go @@ -0,0 +1,176 @@ +package redis + +import ( + "context" + "errors" + "fmt" + "log/slog" + "strconv" + "time" + + "github.com/go-redis/redis/v8" +) + +const ( + DefaultPipePeriod = 200 * time.Millisecond +) + +type PipeBroker struct { + log *slog.Logger + opts PipedOptions + + conn redis.UniversalClient + pipe redis.Pipeliner +} + +type PipedOptions struct { + Addrs []string + Password string + DB int + DialTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + IdleTimeout time.Duration + MinIdleConns int + + PollPeriod time.Duration + PipePeriod time.Duration +} + +func NewPiped(o PipedOptions, lo *slog.Logger) *PipeBroker { + if o.PollPeriod == 0 { + o.PollPeriod = DefaultPollPeriod + } + if o.PipePeriod == 0 { + o.PipePeriod = DefaultPipePeriod + } + + conn := redis.NewUniversalClient(&redis.UniversalOptions{ + Addrs: o.Addrs, + DB: o.DB, + Password: o.Password, + DialTimeout: o.DialTimeout, + ReadTimeout: o.ReadTimeout, + WriteTimeout: o.WriteTimeout, + MinIdleConns: o.MinIdleConns, + IdleTimeout: o.IdleTimeout, + }) + + p := &PipeBroker{ + log: lo, + conn: conn, + pipe: conn.Pipeline(), + opts: o, + } + + go p.pushPipe(context.TODO()) + + return p +} + +func (r *PipeBroker) pushPipe(ctx context.Context) { + tk := time.NewTicker(r.opts.PipePeriod) + for { + select { + case <-ctx.Done(): + return + case <-tk.C: + r.log.Debug("submitting redis pipe") + if r.pipe.Len() == 0 { + continue + } + if _, err := r.pipe.Exec(ctx); err != nil { + r.log.Error("error executing redis pipe: %v", err) + } + } + } +} + +func (r *PipeBroker) GetPending(ctx context.Context, queue string) ([]string, error) { + rs, err := r.conn.LRange(ctx, queue, 0, -1).Result() + if err == redis.Nil { + return []string{}, nil + } else if err != nil { + return []string{}, err + } + + return rs, nil +} + +func (b *PipeBroker) Enqueue(ctx context.Context, msg []byte, queue string) error { + return b.pipe.LPush(ctx, queue, msg).Err() +} + +func (b *PipeBroker) EnqueueScheduled(ctx context.Context, msg []byte, queue string, ts time.Time) error { + return b.pipe.ZAdd(ctx, fmt.Sprintf(sortedSetKey, queue), &redis.Z{ + Score: float64(ts.UnixNano()), + Member: msg, + }).Err() +} + +func (b *PipeBroker) Consume(ctx context.Context, work chan []byte, queue string) { + go b.consumeScheduled(ctx, queue) + + for { + select { + case <-ctx.Done(): + b.log.Debug("shutting down consumer..") + return + default: + b.log.Debug("receiving from consumer..") + res, err := b.conn.BLPop(ctx, b.opts.PollPeriod, queue).Result() + if err != nil && err.Error() != "redis: nil" { + b.log.Error("error consuming from redis queue", "error", err) + } else if errors.Is(err, redis.Nil) { + b.log.Debug("no tasks to consume..", "queue", queue) + } else { + msg, err := blpopResult(res) + if err != nil { + b.log.Error("error parsing response from redis", "error", err) + return + } + work <- []byte(msg) + } + } + } +} + +func (b *PipeBroker) consumeScheduled(ctx context.Context, queue string) { + poll := time.NewTicker(b.opts.PollPeriod) + + for { + select { + case <-ctx.Done(): + b.log.Debug("shutting down scheduled consumer..") + return + case <-poll.C: + b.conn.Watch(ctx, func(tx *redis.Tx) error { + // Fetch the tasks with score less than current time. These tasks have been scheduled + // to be queued. + tasks, err := tx.ZRevRangeByScore(ctx, fmt.Sprintf(sortedSetKey, queue), &redis.ZRangeBy{ + Min: "0", + Max: strconv.FormatInt(time.Now().UnixNano(), 10), + Offset: 0, + Count: 1, + }).Result() + if err != nil { + return err + } + + for _, task := range tasks { + if err := b.Enqueue(ctx, []byte(task), queue); err != nil { + return err + } + } + + // Remove the tasks + if err := tx.ZRem(ctx, fmt.Sprintf(sortedSetKey, queue), tasks).Err(); err != nil { + return err + } + + return nil + }) + } + + } +} diff --git a/results/redis/results_piped.go b/results/redis/results_piped.go new file mode 100644 index 0000000..00446e7 --- /dev/null +++ b/results/redis/results_piped.go @@ -0,0 +1,202 @@ +package redis + +import ( + "context" + "log/slog" + "strconv" + "time" + + "github.com/go-redis/redis/v8" +) + +const DefaultPipePeriod = 200 * time.Millisecond + +type PipedResults struct { + lo *slog.Logger + opt PipedOptions + + conn redis.UniversalClient + pipe redis.Pipeliner +} + +type PipedOptions struct { + Addrs []string + Password string + DB int + DialTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + IdleTimeout time.Duration + Expiry time.Duration + MetaExpiry time.Duration + MinIdleConns int + + PipePeriod time.Duration +} + +func NewPiped(o PipedOptions, lo *slog.Logger) *PipedResults { + if o.PipePeriod == 0 { + o.PipePeriod = DefaultPipePeriod + } + + conn := redis.NewUniversalClient(&redis.UniversalOptions{ + Addrs: o.Addrs, + DB: o.DB, + Password: o.Password, + DialTimeout: o.DialTimeout, + ReadTimeout: o.ReadTimeout, + WriteTimeout: o.WriteTimeout, + MinIdleConns: o.MinIdleConns, + IdleTimeout: o.IdleTimeout, + }) + + p := &PipedResults{ + lo: lo, + conn: conn, + pipe: conn.Pipeline(), + opt: o, + } + + // TODO: pass ctx here somehow + if o.MetaExpiry != 0 { + go p.expireMeta(context.TODO(), o.MetaExpiry) + } + + go p.execPipe(context.TODO()) + + return p +} + +func (r *PipedResults) execPipe(ctx context.Context) { + tk := time.NewTicker(r.opt.PipePeriod) + for { + select { + case <-ctx.Done(): + r.lo.Info("context closed, draining redis pipe", "length", r.pipe.Len()) + if _, err := r.pipe.Exec(ctx); err != nil { + r.lo.Error("error executing redis pipe: %v", err) + } + return + case <-tk.C: + plen := r.pipe.Len() + if plen == 0 { + continue + } + + r.lo.Info("submitting redis pipe", "length", plen) + if _, err := r.pipe.Exec(ctx); err != nil { + r.lo.Error("error executing redis pipe: %v", err) + } + } + } +} + +func (r *PipedResults) DeleteJob(ctx context.Context, id string) error { + r.lo.Debug("deleting job") + if err := r.conn.ZRem(ctx, resultPrefix+success, 1, id).Err(); err != nil { + return err + } + + if err := r.conn.ZRem(ctx, resultPrefix+failed, 1, id).Err(); err != nil { + return err + } + + if err := r.conn.Del(ctx, resultPrefix+id).Err(); err != nil { + return err + } + + return nil +} + +func (r *PipedResults) GetSuccess(ctx context.Context) ([]string, error) { + // Fetch the failed tasks with score less than current time + r.lo.Debug("getting successful jobs") + rs, err := r.conn.ZRevRangeByScore(ctx, resultPrefix+success, &redis.ZRangeBy{ + Min: "0", + Max: strconv.FormatInt(time.Now().UnixNano(), 10), + }).Result() + if err != nil { + return nil, err + } + + return rs, nil +} + +func (r *PipedResults) GetFailed(ctx context.Context) ([]string, error) { + // Fetch the failed tasks with score less than current time + r.lo.Debug("getting failed jobs") + rs, err := r.conn.ZRevRangeByScore(ctx, resultPrefix+failed, &redis.ZRangeBy{ + Min: "0", + Max: strconv.FormatInt(time.Now().UnixNano(), 10), + }).Result() + if err != nil { + return nil, err + } + + return rs, nil +} + +func (r *PipedResults) SetSuccess(ctx context.Context, id string) error { + r.lo.Debug("setting job as successful", "id", id) + return r.pipe.ZAdd(ctx, resultPrefix+success, &redis.Z{ + Score: float64(time.Now().UnixNano()), + Member: id, + }).Err() +} + +func (r *PipedResults) SetFailed(ctx context.Context, id string) error { + r.lo.Debug("setting job as failed", "id", id) + return r.pipe.ZAdd(ctx, resultPrefix+failed, &redis.Z{ + Score: float64(time.Now().UnixNano()), + Member: id, + }).Err() +} + +func (r *PipedResults) Set(ctx context.Context, id string, b []byte) error { + r.lo.Debug("setting result for job", "id", id) + return r.pipe.Set(ctx, resultPrefix+id, b, r.opt.Expiry).Err() +} + +func (r *PipedResults) Get(ctx context.Context, id string) ([]byte, error) { + r.lo.Debug("getting result for job", "id", id) + rs, err := r.conn.Get(ctx, resultPrefix+id).Bytes() + if err != nil { + return nil, err + } + + return rs, nil +} + +// TODO: accpet a ctx here and shutdown gracefully +func (r *PipedResults) expireMeta(ctx context.Context, ttl time.Duration) { + r.lo.Info("starting results meta purger", "ttl", ttl) + + var ( + tk = time.NewTicker(ttl) + ) + + for { + select { + case <-ctx.Done(): + r.lo.Info("shutting down meta purger", "ttl", ttl) + return + case <-tk.C: + now := time.Now().UnixNano() - int64(ttl) + score := strconv.FormatInt(now, 10) + + r.lo.Debug("purging failed results metadata", "score", score) + if err := r.pipe.ZRemRangeByScore(context.Background(), resultPrefix+failed, "0", score).Err(); err != nil { + r.lo.Error("could not expire success/failed metadata", "err", err) + } + + r.lo.Debug("purging success results metadata", "score", score) + if err := r.pipe.ZRemRangeByScore(context.Background(), resultPrefix+success, "0", score).Err(); err != nil { + r.lo.Error("could not expire success/failed metadata", "err", err) + } + } + } +} + +func (r *PipedResults) NilError() error { + return redis.Nil +} diff --git a/server.go b/server.go index bc9077f..6fd640b 100644 --- a/server.go +++ b/server.go @@ -57,8 +57,8 @@ type TaskOpts struct { Queue string SuccessCB func(JobCtx) ProcessingCB func(JobCtx) - RetryingCB func(JobCtx) - FailedCB func(JobCtx) + RetryingCB func(JobCtx, error) + FailedCB func(JobCtx, error) } // RegisterTask maps a new task against the tasks map on the server. @@ -294,7 +294,7 @@ func (s *Server) process(ctx context.Context, w chan []byte) { if err := s.execJob(ctx, msg, task); err != nil { s.spanError(span, err) - s.log.Error("could not execute job. err", "error", err) + s.log.Error("could not execute job", "error", err) } } } @@ -357,12 +357,12 @@ func (s *Server) execJob(ctx context.Context, msg JobMessage, task Task) error { // Try queueing the job again. if msg.MaxRetry != msg.Retried { if task.opts.RetryingCB != nil { - task.opts.RetryingCB(taskCtx) + task.opts.RetryingCB(taskCtx, err) } return s.retryJob(ctx, msg) } else { if task.opts.FailedCB != nil { - task.opts.FailedCB(taskCtx) + task.opts.FailedCB(taskCtx, err) } // If we hit max retries, set the task status as failed. return s.statusFailed(ctx, msg) @@ -381,13 +381,13 @@ func (s *Server) execJob(ctx context.Context, msg JobMessage, task Task) error { meta := DefaultMeta(nj.Opts) meta.PrevJobResult, err = s.GetResult(ctx, msg.ID) if err != nil { - return err + return fmt.Errorf("could not get result for id (%s) : %w", msg.ID, err) } // Set the ID of the next job in the chain msg.OnSuccessID, err = s.enqueueWithMeta(ctx, nj, meta) if err != nil { - return err + return fmt.Errorf("could not enqueue job id (%s) : %w", msg.ID, err) } }