Skip to content

Commit

Permalink
Add Exponential Retry Mechanism with Idempotency Headers
Browse files Browse the repository at this point in the history
  • Loading branch information
mahendraHegde committed Oct 7, 2023
1 parent 1449de3 commit 7a7a7b3
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 3 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-retryablehttp v0.7.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZnpUv3/+BxzFA=
github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.5 h1:s5PTfem8p8EbKQOctVV53k6jCJt3UX4IEJzwh+C324Q=
github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
Expand Down
49 changes: 46 additions & 3 deletions lib/novu.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/google/uuid"
"github.com/hashicorp/go-retryablehttp"
"github.com/pkg/errors"
)

Expand All @@ -17,9 +21,17 @@ const (
NovuVersion = "v1"
)

type RetryConfigType struct {
InitialDelay time.Duration // inital delay
WaitMin time.Duration // Minimum time to wait
WaitMax time.Duration // Maximum time to wait
RetryMax int // Maximum number of retries
}

type Config struct {
BackendURL *url.URL
HttpClient *http.Client
BackendURL *url.URL
HttpClient *http.Client
RetryConfig *RetryConfigType
}

type APIClient struct {
Expand Down Expand Up @@ -47,7 +59,37 @@ func NewAPIClient(apiKey string, cfg *Config) *APIClient {
cfg.BackendURL = buildBackendURL(cfg)

if cfg.HttpClient == nil {
cfg.HttpClient = &http.Client{Timeout: 20 * time.Second}
retyableClient := retryablehttp.NewClient()
if cfg.RetryConfig != nil {
retyableClient.RetryWaitMin = cfg.RetryConfig.WaitMin
retyableClient.RetryWaitMax = cfg.RetryConfig.WaitMax
retyableClient.RetryMax = cfg.RetryConfig.RetryMax
retyableClient.Backoff = func(min, max time.Duration, attemptNum int, resp *http.Response) time.Duration {
if resp != nil {
if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable {
if s, ok := resp.Header["Retry-After"]; ok {
if sleep, err := strconv.ParseInt(s[0], 10, 64); err == nil {
return time.Second * time.Duration(sleep)
}
}
}
}
if attemptNum == 0 {
return cfg.RetryConfig.InitialDelay //wait for InitialDelay on 1st retry
}
mult := math.Pow(2, float64(attemptNum)) * float64(min)
sleep := time.Duration(mult)
//float64(sleep) != mult is to make sure there is no conversion error
//if there is a conversion error, number is huge and we set the sleep to max
if float64(sleep) != mult || sleep > max {
sleep = max
}
return sleep
}
} else {
retyableClient.RetryMax = 0 //by default no retry
}
cfg.HttpClient = retyableClient.StandardClient()
}

c := &APIClient{apiKey: apiKey}
Expand All @@ -70,6 +112,7 @@ func NewAPIClient(apiKey string, cfg *Config) *APIClient {
func (c APIClient) sendRequest(req *http.Request, resp interface{}) (*http.Response, error) {
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("ApiKey %s", c.apiKey))
req.Header.Set("Idempotency-Key", uuid.New().String())

res, err := c.config.HttpClient.Do(req)
if err != nil {
Expand Down
168 changes: 168 additions & 0 deletions lib/novu_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package lib_test

import (
"context"
"encoding/json"
"log"
"net/http"
"net/http/httptest"
"path/filepath"
"strings"
"testing"
"time"

"github.com/novuhq/go-novu/lib"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestError_Retry_With_Custom_Config(t *testing.T) {
var (
subscriberBulkPayload lib.SubscriberBulkPayload
receivedBody lib.SubscriberBulkPayload
expectedRequest lib.SubscriberBulkPayload
)
reqCount := 0
var idempotencyHeader []string
allElementsSame := func(arr []string) bool {
if len(arr) == 0 {
return true // An empty array is considered to have all elements the same.
}
firstElement := arr[0]
for _, element := range arr {
if element != firstElement {
return false
}
}
return true
}
subscriberService := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if err := json.NewDecoder(req.Body).Decode(&receivedBody); err != nil {
log.Printf("error in unmarshalling %+v", err)
w.WriteHeader(http.StatusBadRequest)
return
}

reqCount++

t.Run("Header must contain Idempotency-Key", func(t *testing.T) {
idKey := req.Header.Get("Idempotency-Key")
idempotencyHeader = append(idempotencyHeader, idKey)
assert.NotNil(t, idKey)
})
t.Run("Header must contain ApiKey", func(t *testing.T) {
authKey := req.Header.Get("Authorization")
assert.True(t, strings.Contains(authKey, novuApiKey))
assert.True(t, strings.HasPrefix(authKey, "ApiKey"))
})

t.Run("URL and request method is as expected", func(t *testing.T) {
expectedURL := "/v1/subscribers/bulk"
assert.Equal(t, http.MethodPost, req.Method)
assert.Equal(t, expectedURL, req.RequestURI)
})

t.Run("Request is as expected", func(t *testing.T) {
fileToStruct(filepath.Join("../testdata", "subscriber_bulk.json"), &expectedRequest)
assert.Equal(t, expectedRequest, receivedBody)
})

var resp lib.SubscriberResponse
fileToStruct(filepath.Join("../testdata", "subscriber_bulk_response.json"), &resp)

w.WriteHeader(http.StatusInternalServerError)
bb, _ := json.Marshal(resp)
w.Write(bb)
}))

defer subscriberService.Close()

ctx := context.Background()
fileToStruct(filepath.Join("../testdata", "subscriber_bulk.json"), &subscriberBulkPayload)

c := lib.NewAPIClient(novuApiKey, &lib.Config{BackendURL: lib.MustParseURL(subscriberService.URL), RetryConfig: &lib.RetryConfigType{RetryMax: 5, InitialDelay: 0 * time.Second}})

resp, err := c.SubscriberApi.BulkCreate(ctx, subscriberBulkPayload)
require.NotNil(t, err)
assert.NotNil(t, resp)

//idempotency and retry tests
assert.Equal(t, reqCount, 6)
assert.Equal(t, len(idempotencyHeader), 6)
assert.True(t, allElementsSame(idempotencyHeader))
}
func TestError_Retry_With_Default_Config(t *testing.T) {
var (
subscriberBulkPayload lib.SubscriberBulkPayload
receivedBody lib.SubscriberBulkPayload
expectedRequest lib.SubscriberBulkPayload
)
reqCount := 0
var idempotencyHeader []string
allElementsSame := func(arr []string) bool {
if len(arr) == 0 {
return true // An empty array is considered to have all elements the same.
}
firstElement := arr[0]
for _, element := range arr {
if element != firstElement {
return false
}
}
return true
}
subscriberService := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if err := json.NewDecoder(req.Body).Decode(&receivedBody); err != nil {
log.Printf("error in unmarshalling %+v", err)
w.WriteHeader(http.StatusBadRequest)
return
}

reqCount++

t.Run("Header must contain Idempotency-Key", func(t *testing.T) {
idKey := req.Header.Get("Idempotency-Key")
idempotencyHeader = append(idempotencyHeader, idKey)
assert.NotNil(t, idKey)
})
t.Run("Header must contain ApiKey", func(t *testing.T) {
authKey := req.Header.Get("Authorization")
assert.True(t, strings.Contains(authKey, novuApiKey))
assert.True(t, strings.HasPrefix(authKey, "ApiKey"))
})

t.Run("URL and request method is as expected", func(t *testing.T) {
expectedURL := "/v1/subscribers/bulk"
assert.Equal(t, http.MethodPost, req.Method)
assert.Equal(t, expectedURL, req.RequestURI)
})

t.Run("Request is as expected", func(t *testing.T) {
fileToStruct(filepath.Join("../testdata", "subscriber_bulk.json"), &expectedRequest)
assert.Equal(t, expectedRequest, receivedBody)
})

var resp lib.SubscriberResponse
fileToStruct(filepath.Join("../testdata", "subscriber_bulk_response.json"), &resp)

w.WriteHeader(http.StatusInternalServerError)
bb, _ := json.Marshal(resp)
w.Write(bb)
}))

defer subscriberService.Close()

ctx := context.Background()
fileToStruct(filepath.Join("../testdata", "subscriber_bulk.json"), &subscriberBulkPayload)

c := lib.NewAPIClient(novuApiKey, &lib.Config{BackendURL: lib.MustParseURL(subscriberService.URL)})

resp, err := c.SubscriberApi.BulkCreate(ctx, subscriberBulkPayload)
require.NotNil(t, err)
assert.NotNil(t, resp)

//idempotency and retry tests
assert.Equal(t, reqCount, 1)
assert.True(t, allElementsSame(idempotencyHeader))
assert.Equal(t, len(idempotencyHeader), 1)
}

0 comments on commit 7a7a7b3

Please sign in to comment.