Skip to content

Commit

Permalink
Merge pull request #289 from vgarvardt/chore/enque-batch-real
Browse files Browse the repository at this point in the history
chore: batch enqueue really does batch insert instead of inserts in a loop
  • Loading branch information
vgarvardt authored Jul 12, 2024
2 parents ff4301f + 9e074b2 commit 7b607aa
Showing 1 changed file with 54 additions and 53 deletions.
107 changes: 54 additions & 53 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"strings"
"time"

"github.com/oklog/ulid/v2"
Expand Down Expand Up @@ -64,12 +65,12 @@ func NewClient(pool adapter.ConnPool, options ...ClientOption) (*Client, error)

// Enqueue adds a job to the queue.
func (c *Client) Enqueue(ctx context.Context, j *Job) error {
return c.execEnqueue(ctx, j, c.pool)
return c.execEnqueue(ctx, []*Job{j}, c.pool)
}

// EnqueueWithID adds a job to the queue with a specific id
func (c *Client) EnqueueWithID(ctx context.Context, j *Job, ulid ulid.ULID) error {
return c.execEnqueueWithID(ctx, j, c.pool, ulid)
func (c *Client) EnqueueWithID(ctx context.Context, j *Job, jobID ulid.ULID) error {
return c.execEnqueueWithID(ctx, []*Job{j}, c.pool, []ulid.ULID{jobID})
}

// EnqueueTx adds a job to the queue within the scope of the transaction.
Expand All @@ -79,7 +80,7 @@ func (c *Client) EnqueueWithID(ctx context.Context, j *Job, ulid ulid.ULID) erro
// It is the caller's responsibility to Commit or Rollback the transaction after
// this function is called.
func (c *Client) EnqueueTx(ctx context.Context, j *Job, tx adapter.Tx) error {
return c.execEnqueue(ctx, j, tx)
return c.execEnqueue(ctx, []*Job{j}, tx)
}

// EnqueueBatch adds a batch of jobs. Operation is atomic, so either all jobs are added, or none.
Expand All @@ -89,21 +90,7 @@ func (c *Client) EnqueueBatch(ctx context.Context, jobs []*Job) error {
return nil
}

tx, err := c.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("could not begin transaction: %w", err)
}

for i, j := range jobs {
if err := c.execEnqueue(ctx, j, tx); err != nil {
if rbErr := tx.Rollback(ctx); rbErr != nil {
c.logger.Error("Could not properly rollback transaction", adapter.Err(err))
}
return fmt.Errorf("could not enqueue job from the batch [idx %d]: %w", i, err)
}
}

return tx.Commit(ctx)
return c.execEnqueue(ctx, jobs, c.pool)
}

// EnqueueBatchTx adds a batch of jobs within the scope of the transaction.
Expand All @@ -117,59 +104,73 @@ func (c *Client) EnqueueBatchTx(ctx context.Context, jobs []*Job, tx adapter.Tx)
return nil
}

for i, j := range jobs {
if err := c.execEnqueue(ctx, j, tx); err != nil {
return fmt.Errorf("could not enqueue job from the batch [idx %d]: %w", i, err)
}
}

return nil
return c.execEnqueue(ctx, jobs, tx)
}

func (c *Client) execEnqueueWithID(ctx context.Context, j *Job, q adapter.Queryable, jobID ulid.ULID) (err error) {
if j.Type == "" {
return ErrMissingType
var errSlicesMustMatch = errors.New("jobs and jobIDs slices must have the same non-zero length, pls report this a bug")

func (c *Client) execEnqueueWithID(ctx context.Context, jobs []*Job, q adapter.Queryable, jobIDs []ulid.ULID) (err error) {
if len(jobs) != len(jobIDs) || len(jobs) == 0 || len(jobIDs) == 0 {
return errSlicesMustMatch
}

j.CreatedAt = time.Now().UTC()
var (
args []any
values []string
)
for i, j := range jobs {
if j.Type == "" {
return ErrMissingType
}

runAt := j.RunAt
if runAt.IsZero() {
j.RunAt = j.CreatedAt
}
j.CreatedAt = time.Now().UTC()

j.ID = jobID
idAsString := jobID.String()
runAt := j.RunAt
if runAt.IsZero() {
j.RunAt = j.CreatedAt
}

j.ID = jobIDs[i]
idAsString := jobIDs[i].String()

if j.Args == nil {
j.Args = []byte{}
if j.Args == nil {
j.Args = []byte{}
}

values = append(values, fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", i*8+1, i*8+2, i*8+3, i*8+4, i*8+5, i*8+6, i*8+7, i*8+8))
args = append(args, idAsString, j.Queue, j.Priority, j.RunAt, j.Type, j.Args, j.CreatedAt, j.CreatedAt)
}

_, err = q.Exec(ctx, `INSERT INTO gue_jobs
(job_id, queue, priority, run_at, job_type, args, created_at, updated_at)
VALUES
($1, $2, $3, $4, $5, $6, $7, $7)
`, idAsString, j.Queue, j.Priority, j.RunAt, j.Type, j.Args, j.CreatedAt)

c.logger.Debug(
"Tried to enqueue a job",
adapter.Err(err),
adapter.F("queue", j.Queue),
adapter.F("id", idAsString),
)
`+strings.Join(values, ", "), args...)

for _, j := range jobs {
c.logger.Debug(
"Tried to enqueue a job",
adapter.Err(err),
adapter.F("queue", j.Queue),
adapter.F("id", j.ID.String()),
)

c.mEnqueue.Add(ctx, 1, metric.WithAttributes(attrJobType.String(j.Type), attrSuccess.Bool(err == nil)))
c.mEnqueue.Add(ctx, 1, metric.WithAttributes(attrJobType.String(j.Type), attrSuccess.Bool(err == nil)))
}

return err
}

func (c *Client) execEnqueue(ctx context.Context, j *Job, q adapter.Queryable) error {
jobID, err := ulid.New(ulid.Now(), c.entropy)
if err != nil {
return fmt.Errorf("could not generate new Job ULID ID: %w", err)
func (c *Client) execEnqueue(ctx context.Context, jobs []*Job, q adapter.Queryable) error {
jobIDs := make([]ulid.ULID, 0, len(jobs))
for range jobs {
jobID, err := ulid.New(ulid.Now(), c.entropy)
if err != nil {
return fmt.Errorf("could not generate new Job ULID ID: %w", err)
}
jobIDs = append(jobIDs, jobID)
}

return c.execEnqueueWithID(ctx, j, q, jobID)
return c.execEnqueueWithID(ctx, jobs, q, jobIDs)
}

// LockJob attempts to retrieve a Job from the database in the specified queue.
Expand Down

0 comments on commit 7b607aa

Please sign in to comment.