Skip to content

Commit

Permalink
Merge pull request #1347 from halspang/release-1.4
Browse files Browse the repository at this point in the history
Cherry-pick required changes for 1.4.4
  • Loading branch information
artursouza authored Nov 29, 2021
2 parents ff9f357 + e64edcc commit 2b9cd80
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 93 deletions.
130 changes: 91 additions & 39 deletions bindings/azure/cosmosdb/cosmosdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/a8m/documentdb"
"github.com/cenkalti/backoff/v4"

"github.com/dapr/components-contrib/authentication/azure"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
Expand All @@ -34,7 +37,9 @@ type cosmosDBCredentials struct {
PartitionKey string `json:"partitionKey"`
}

// NewCosmosDB returns a new CosmosDB instance
const statusTooManyRequests = "429" // RFC 6585, 4

// NewCosmosDB returns a new CosmosDB instance.
func NewCosmosDB(logger logger.Logger) *CosmosDB {
return &CosmosDB{logger: logger}
}
Expand Down Expand Up @@ -66,36 +71,63 @@ func (c *CosmosDB) Init(metadata bindings.Metadata) error {
}
config = documentdb.NewConfigWithServicePrincipal(spt)
}
client := documentdb.New(m.URL, config)
// disable the identification hydrator (which autogenerates IDs if missing from the request)
// so we aren't forced to use a struct by the upstream SDK
// this allows us to provide the most flexibility in the request document sent to this binding
config.IdentificationHydrator = nil
config.WithAppIdentifier("dapr-" + logger.DaprVersion)

// Retries initializing the client if a TooManyRequests error is encountered
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = 2 * time.Second
bo.MaxElapsedTime = 5 * time.Minute
err = backoff.RetryNotify(func() (err error) {
client := documentdb.New(m.URL, config)

dbs, err := client.QueryDatabases(&documentdb.Query{
Query: "SELECT * FROM ROOT r WHERE r.id=@id",
Parameters: []documentdb.Parameter{
{Name: "@id", Value: m.Database},
},
})
if err != nil {
if isTooManyRequestsError(err) {
return err
}

return backoff.Permanent(err)
} else if len(dbs) == 0 {
return backoff.Permanent(fmt.Errorf("database %s for CosmosDB binding not found", m.Database))
}

dbs, err := client.QueryDatabases(&documentdb.Query{
Query: "SELECT * FROM ROOT r WHERE r.id=@id",
Parameters: []documentdb.Parameter{
{Name: "@id", Value: m.Database},
},
})
if err != nil {
return err
} else if len(dbs) == 0 {
return fmt.Errorf("database %s for CosmosDB state store not found", m.Database)
}
c.db = &dbs[0]
colls, err := client.QueryCollections(c.db.Self, &documentdb.Query{
Query: "SELECT * FROM ROOT r WHERE r.id=@id",
Parameters: []documentdb.Parameter{
{Name: "@id", Value: m.Collection},
},
})
if err != nil {
if isTooManyRequestsError(err) {
return err
}

return backoff.Permanent(err)
} else if len(colls) == 0 {
return backoff.Permanent(fmt.Errorf("collection %s for CosmosDB binding not found", m.Collection))
}

c.db = &dbs[0]
colls, err := client.QueryCollections(c.db.Self, &documentdb.Query{
Query: "SELECT * FROM ROOT r WHERE r.id=@id",
Parameters: []documentdb.Parameter{
{Name: "@id", Value: m.Collection},
},
c.collection = &colls[0]
c.client = client

return nil
}, bo, func(err error, d time.Duration) {
c.logger.Warnf("CosmosDB binding initialization failed: %v; retrying in %s", err, d)
})
if err != nil {
return err
} else if len(colls) == 0 {
return fmt.Errorf("collection %s for CosmosDB state store not found", m.Collection)
}

c.collection = &colls[0]
c.client = client

return nil
}

Expand All @@ -120,29 +152,35 @@ func (c *CosmosDB) Operations() []bindings.OperationKind {
}

func (c *CosmosDB) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
var obj interface{}
err := json.Unmarshal(req.Data, &obj)
if err != nil {
return nil, err
}
//nolint:exhaustive
switch req.Operation {
case bindings.CreateOperation:
var obj interface{}
err := json.Unmarshal(req.Data, &obj)
if err != nil {
return nil, err
}

val, err := c.getPartitionKeyValue(c.partitionKey, obj)
if err != nil {
return nil, err
}
val, err := c.getPartitionKeyValue(c.partitionKey, obj)
if err != nil {
return nil, err
}

_, err = c.client.CreateDocument(c.collection.Self, obj, documentdb.PartitionKey(val))
if err != nil {
return nil, err
}
_, err = c.client.CreateDocument(c.collection.Self, obj, documentdb.PartitionKey(val))
if err != nil {
return nil, err
}

return nil, nil
return nil, nil
default:
return nil, fmt.Errorf("operation kind %s not supported", req.Operation)
}
}

func (c *CosmosDB) getPartitionKeyValue(key string, obj interface{}) (interface{}, error) {
val, err := c.lookup(obj.(map[string]interface{}), strings.Split(key, "."))
if err != nil {
return nil, fmt.Errorf("missing partitionKey field %s from request body - %s", c.partitionKey, err)
return nil, fmt.Errorf("missing partitionKey field %s from request body - %w", c.partitionKey, err)
}

if val == "" {
Expand Down Expand Up @@ -177,3 +215,17 @@ func (c *CosmosDB) lookup(m map[string]interface{}, ks []string) (val interface{

return c.lookup(m, ks[1:])
}

func isTooManyRequestsError(err error) bool {
if err == nil {
return false
}

if requestError, ok := err.(*documentdb.RequestError); ok {
if requestError.Code == statusTooManyRequests {
return true
}
}

return false
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/Shopify/sarama v1.23.1
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 // indirect
github.com/a8m/documentdb v1.3.0
github.com/a8m/documentdb v1.3.1-0.20211026005403-13c3593b3c3a
github.com/aerospike/aerospike-client-go v4.5.0+incompatible
github.com/agrea/ptr v0.0.0-20180711073057-77a518d99b7b
github.com/ajg/form v1.5.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrU
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 h1:5sXbqlSomvdjlRbWyNqkPsJ3Fg+tQZCbgeX1VGljbQY=
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/a8m/documentdb v1.3.0 h1:xzZQ6Ts02QesHeQdRr6doF7xfXYSsq9SUIlCqfJjbv4=
github.com/a8m/documentdb v1.3.0/go.mod h1:4Z0mpi7fkyqjxUdGiNMO3vagyiUoiwLncaIX6AsW5z0=
github.com/a8m/documentdb v1.3.1-0.20211026005403-13c3593b3c3a h1:CRGa9OOZNd184xTDcaVJd6N0KumviIGO6purxihBZ14=
github.com/a8m/documentdb v1.3.1-0.20211026005403-13c3593b3c3a/go.mod h1:4Z0mpi7fkyqjxUdGiNMO3vagyiUoiwLncaIX6AsW5z0=
github.com/aerospike/aerospike-client-go v4.5.0+incompatible h1:6ALev/Ge4jW5avSLoqgvPYTh+FLeeDD9xDhzoMCNgOo=
github.com/aerospike/aerospike-client-go v4.5.0+incompatible/go.mod h1:zj8LBEnWBDOVEIJt8LvaRvDG5ARAoa5dBeHaB472NRc=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
Expand Down
145 changes: 97 additions & 48 deletions state/azure/cosmosdb/cosmosdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/a8m/documentdb"
"github.com/agrea/ptr"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
jsoniter "github.com/json-iterator/go"

Expand Down Expand Up @@ -62,10 +64,11 @@ type storedProcedureDefinition struct {
}

const (
storedProcedureName = "__dapr__"
metadataPartitionKey = "partitionKey"
unknownPartitionKey = "__UNKNOWN__"
metadataTTLKey = "ttlInSeconds"
storedProcedureName = "__dapr__"
metadataPartitionKey = "partitionKey"
unknownPartitionKey = "__UNKNOWN__"
metadataTTLKey = "ttlInSeconds"
statusTooManyRequests = "429" // RFC 6585, 4
)

// NewCosmosDBStateStore returns a new CosmosDB state store
Expand Down Expand Up @@ -129,62 +132,94 @@ func (c *StateStore) Init(meta state.Metadata) error {
}
config = documentdb.NewConfigWithServicePrincipal(spt)
}
client := documentdb.New(m.URL, config)
config.WithAppIdentifier("dapr-" + logger.DaprVersion)

dbs, err := client.QueryDatabases(&documentdb.Query{
Query: "SELECT * FROM ROOT r WHERE r.id=@id",
Parameters: []documentdb.Parameter{
{Name: "@id", Value: m.Database},
},
})
if err != nil {
return err
} else if len(dbs) == 0 {
return fmt.Errorf("database %s for CosmosDB state store not found", m.Database)
}
// Retries initializing the client if a TooManyRequests error is encountered
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = 2 * time.Second
bo.MaxElapsedTime = 5 * time.Minute
err = backoff.RetryNotify(func() (err error) {
client := documentdb.New(m.URL, config)

c.db = &dbs[0]
colls, err := client.QueryCollections(c.db.Self, &documentdb.Query{
Query: "SELECT * FROM ROOT r WHERE r.id=@id",
Parameters: []documentdb.Parameter{
{Name: "@id", Value: m.Collection},
},
})
if err != nil {
return err
} else if len(colls) == 0 {
return fmt.Errorf("collection %s for CosmosDB state store not found. This must be created before Dapr uses it", m.Collection)
}
dbs, err := client.QueryDatabases(&documentdb.Query{
Query: "SELECT * FROM ROOT r WHERE r.id=@id",
Parameters: []documentdb.Parameter{
{Name: "@id", Value: m.Database},
},
})

c.metadata = m
c.collection = &colls[0]
c.client = client
c.contentType = m.ContentType
if err != nil {
if isTooManyRequestsError(err) {
return err
}

sps, err := c.client.ReadStoredProcedures(c.collection.Self)
if err != nil {
return err
}
return backoff.Permanent(err)
} else if len(dbs) == 0 {
return backoff.Permanent(fmt.Errorf("database %s for CosmosDB state store not found", m.Database))
}

// get a link to the sp
for i := range sps {
if sps[i].Id == storedProcedureName {
c.sp = &sps[i]
c.db = &dbs[0]
colls, err := client.QueryCollections(c.db.Self, &documentdb.Query{
Query: "SELECT * FROM ROOT r WHERE r.id=@id",
Parameters: []documentdb.Parameter{
{Name: "@id", Value: m.Collection},
},
})
if err != nil {
if isTooManyRequestsError(err) {
return err
}

break
return backoff.Permanent(err)
} else if len(colls) == 0 {
return backoff.Permanent(fmt.Errorf("collection %s for CosmosDB state store not found. This must be created before Dapr uses it", m.Collection))
}
}

if c.sp == nil {
// register the stored procedure
createspBody := storedProcedureDefinition{ID: storedProcedureName, Body: spDefinition}
c.sp, err = c.client.CreateStoredProcedure(c.collection.Self, createspBody)
c.metadata = m
c.collection = &colls[0]
c.client = client
c.contentType = m.ContentType

sps, err := c.client.ReadStoredProcedures(c.collection.Self)
if err != nil {
// if it already exists that is success
if !strings.HasPrefix(err.Error(), "Conflict") {
if isTooManyRequestsError(err) {
return err
}

return backoff.Permanent(err)
}

// get a link to the sp
for i := range sps {
if sps[i].Id == storedProcedureName {
c.sp = &sps[i]

break
}
}

// nolint:nestif
if c.sp == nil {
// register the stored procedure
createspBody := storedProcedureDefinition{ID: storedProcedureName, Body: spDefinition}
c.sp, err = c.client.CreateStoredProcedure(c.collection.Self, createspBody)
if err != nil {
if isTooManyRequestsError(err) {
return err
}
// if it already exists that is success
if !strings.HasPrefix(err.Error(), "Conflict") {
return backoff.Permanent(err)
}
}
}

return nil
}, bo, func(err error, d time.Duration) {
c.logger.Warnf("CosmosDB state store initialization failed: %v; retrying in %s", err, d)
})
if err != nil {
return err
}

c.logger.Debug("cosmos Init done")
Expand Down Expand Up @@ -463,3 +498,17 @@ func parseTTL(requestMetadata map[string]string) (*int, error) {

return nil, nil
}

func isTooManyRequestsError(err error) bool {
if err == nil {
return false
}

if requestError, ok := err.(*documentdb.RequestError); ok {
if requestError.Code == statusTooManyRequests {
return true
}
}

return false
}
Loading

0 comments on commit 2b9cd80

Please sign in to comment.