Skip to content

Commit

Permalink
Add support of publish/consume with job attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk committed Jul 12, 2024
1 parent 6694ab6 commit 06effc1
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 7 deletions.
48 changes: 43 additions & 5 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ type Job struct {
TTL int64 `json:"ttl"`
ElapsedMS int64 `json:"elapsed_ms"`
RemainTries int64 `json:"remain_tries"`
Attributes map[string]string
}

type JobRequest struct {
Queue string `json:"queue"`
ID string `json:"job_id"`
Data []byte `json:"data"`
TTL uint32 `json:"ttl"`
Tries uint16 `json:"tries"`
Delay uint32 `json:"delay"`
Attributes map[string]string `json:"attributes"`
}

type LmstfyClient struct {
Expand Down Expand Up @@ -111,27 +122,48 @@ func (c *LmstfyClient) getReq(method, relativePath string, query url.Values, bod
return
}

// Deprecated: Use PublishJob instead
//
// Publish a new job to the queue.
// - ttlSecond is the time-to-live of the job. If it's zero, job won't expire; if it's positive, the value is the TTL.
// - tries is the maximum times the job can be fetched.
// - delaySecond is the duration before the job is released for consuming. When it's zero, no delay is applied.
func (c *LmstfyClient) Publish(queue string, data []byte, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error) {
return c.publish(queue, "", data, ttlSecond, tries, delaySecond)
return c.publish(queue, "", data, nil, ttlSecond, tries, delaySecond)
}

func (c *LmstfyClient) PublishJob(job *JobRequest) (jobID string, e error) {
return c.publish(job.Queue, "", job.Data, job.Attributes, job.TTL, job.Tries, job.Delay)
}

// Deprecated: Use RePublishJob instead
//
// RePublish delete(ack) the job of the queue and publish the job again.
// - ttlSecond is the time-to-live of the job. If it's zero, job won't expire; if it's positive, the value is the TTL.
// - tries is the maximum times the job can be fetched.
// - delaySecond is the duration before the job is released for consuming. When it's zero, no delay is applied.
func (c *LmstfyClient) RePublish(job *Job, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error) {
return c.publish(job.Queue, job.ID, job.Data, ttlSecond, tries, delaySecond)
return c.publish(job.Queue, job.ID, job.Data, nil, ttlSecond, tries, delaySecond)
}

func (c *LmstfyClient) RePublishJob(job *JobRequest) (jobID string, e error) {
return c.publish(job.Queue, jobID, job.Data, job.Attributes, job.TTL, job.Tries, job.Delay)
}

func (c *LmstfyClient) publish(queue, ackJobID string, data []byte, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error) {
func (c *LmstfyClient) publish(
queue,
ackJobID string,
data []byte,
attributes map[string]string,
ttlSecond uint32,
tries uint16,
delaySecond uint32,
) (jobID string, e error) {
query := url.Values{}
query.Add("ttl", strconv.FormatUint(uint64(ttlSecond), 10))
query.Add("tries", strconv.FormatUint(uint64(tries), 10))
query.Add("delay", strconv.FormatUint(uint64(delaySecond), 10))

retryCount := 0
relativePath := queue
if ackJobID != "" {
Expand All @@ -145,6 +177,12 @@ RETRY:
Reason: err.Error(),
}
}
if len(attributes) > 0 {
req.Header.Add("Enable-Job-Version", "YES")
for k, v := range attributes {
req.Header.Add(fmt.Sprintf("Job-Attr-%s", strings.ToTitle(k)), v)
}
}

resp, err := c.httpCli.Do(req)
if err != nil {
Expand Down Expand Up @@ -364,7 +402,7 @@ func (c *LmstfyClient) consume(queue string, ttrSecond, timeoutSecond uint32, fr
RequestID: resp.Header.Get("X-Request-ID"),
}
}
respBytes, err := ioutil.ReadAll(resp.Body)
respBytes, err := io.ReadAll(resp.Body)
if err != nil {
return nil, &APIError{
Type: ResponseErr,
Expand Down Expand Up @@ -502,7 +540,7 @@ func (c *LmstfyClient) batchConsume(queues []string, count, ttrSecond, timeoutSe

// Consume from multiple queues with priority.
// The order of the queues in the params implies the priority. eg.
// ConsumeFromQueues(120, 5, "queue-a", "queue-b", "queue-c")
// ConsumeFromQueues(120, 5, "queue-a", "queue-b", "queue-c")
// if all the queues have jobs to be fetched, the job in `queue-a` will be return.
func (c *LmstfyClient) ConsumeFromQueues(ttrSecond, timeoutSecond uint32, queues ...string) (job *Job, e error) {
return c.consumeFromQueues(ttrSecond, timeoutSecond, false, queues...)
Expand Down
25 changes: 25 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/json"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestParseSchemeFromURL(t *testing.T) {
Expand Down Expand Up @@ -121,6 +123,29 @@ func TestLmstfyClient_Consume(t *testing.T) {
}
}

func TestLmstfyClient_PublishWithAttributes(t *testing.T) {
cli := NewLmstfyClient(Host, Port, Namespace, Token)
jobID, _ := cli.PublishJob(&JobRequest{
Queue: "test-publish-attributes",
Data: []byte("hello"),
TTL: 10,
Tries: 10,
Delay: 0,
Attributes: map[string]string{
"hello": "world",
"foo": "bar",
},
})
job, err := cli.Consume("test-publish-attributes", 10, 3)
require.NoError(t, err)
require.NotNil(t, job)
require.Equal(t, jobID, job.ID)
require.Equal(t, "hello", string(job.Data))
require.Len(t, job.Attributes, 2)
require.Equal(t, "world", job.Attributes["hello"])
require.Equal(t, "bar", job.Attributes["foo"])
}

func TestLmstfyClient_BatchConsume(t *testing.T) {
cli := NewLmstfyClient(Host, Port, Namespace, Token)
jobMap := map[string]bool{}
Expand Down
4 changes: 2 additions & 2 deletions client/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"io"
"net/http"
"os"
"testing"
Expand Down Expand Up @@ -53,7 +53,7 @@ func setup(CONF *config.Config) {
if resp.StatusCode != http.StatusCreated {
panic("Failed to create testing token")
}
respBytes, err := ioutil.ReadAll(resp.Body)
respBytes, err := io.ReadAll(resp.Body)
if err != nil {
panic("Failed to create testing token")
}
Expand Down

0 comments on commit 06effc1

Please sign in to comment.