-
Notifications
You must be signed in to change notification settings - Fork 49
/
dynamodb.go
139 lines (111 loc) · 3.92 KB
/
dynamodb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package limiters
import (
"context"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/pkg/errors"
)
// DynamoDBTableProperties are supplied to DynamoDB limiter backends.
// This struct informs the backend what the name of the table is and what the names of the key fields are.
//
// A value can be filled in by hand or obtained from LoadDynamoDBTableProperties,
// which derives every field from the table's own description.
type DynamoDBTableProperties struct {
	// TableName is the name of the table.
	TableName string
	// PartitionKeyName is the name of the PartitionKey (HASH key) attribute.
	PartitionKeyName string
	// SortKeyName is the name of the SortKey attribute.
	// LoadDynamoDBTableProperties leaves it empty when the key schema has no sort key.
	SortKeyName string
	// SortKeyUsed indicates if a SortKey is present on the table.
	SortKeyUsed bool
	// TTLFieldName is the name of the attribute configured for TTL.
	// LoadDynamoDBTableProperties only sets it when TTL is reported as enabled.
	TTLFieldName string
}
// LoadDynamoDBTableProperties fetches a table description with the supplied client and returns a DynamoDBTableProperties struct.
//
// It calls DescribeTable and DescribeTimeToLive for tableName and derives the
// key attribute names and the TTL attribute name from the two responses.
// On any failure the zero DynamoDBTableProperties is returned together with
// the error, so callers never observe a half-populated value.
func LoadDynamoDBTableProperties(ctx context.Context, client *dynamodb.Client, tableName string) (DynamoDBTableProperties, error) {
	resp, err := client.DescribeTable(ctx, &dynamodb.DescribeTableInput{
		TableName: &tableName,
	})
	if err != nil {
		return DynamoDBTableProperties{}, errors.Wrap(err, "describe dynamodb table failed")
	}

	ttlResp, err := client.DescribeTimeToLive(ctx, &dynamodb.DescribeTimeToLiveInput{
		TableName: &tableName,
	})
	if err != nil {
		return DynamoDBTableProperties{}, errors.Wrap(err, "describe dynamodb table ttl failed")
	}

	data, err := loadTableData(resp.Table, ttlResp.TimeToLiveDescription)
	if err != nil {
		// Discard whatever loadTableData managed to fill in; the result is
		// meaningless once validation has failed.
		return DynamoDBTableProperties{}, err
	}
	return data, nil
}
// loadTableData builds a DynamoDBTableProperties from a table description and
// its TTL description.
//
// The table name is copied from table, the key names are resolved and
// validated by loadTableKeys, and the TTL attribute is filled in by
// populateTableTTL. An error is returned when the table description (or its
// name) is missing, or when loadTableKeys rejects the key schema.
func loadTableData(table *types.TableDescription, ttl *types.TimeToLiveDescription) (DynamoDBTableProperties, error) {
	// Both fields are pointers in the SDK response; guard them so an
	// unexpected empty response yields an error instead of a panic.
	if table == nil || table.TableName == nil {
		return DynamoDBTableProperties{}, errors.New("invalid dynamodb table: missing table description")
	}

	data := DynamoDBTableProperties{
		TableName: *table.TableName,
	}
	data, err := loadTableKeys(data, table)
	if err != nil {
		return data, errors.Wrap(err, "invalid dynamodb table")
	}
	return populateTableTTL(data, ttl), nil
}
// loadTableKeys copies the key attribute names from the table's key schema
// into data and verifies that each key attribute is declared with type S.
//
// The HASH element of the schema becomes the partition key; any other element
// is recorded as the sort key and marks SortKeyUsed. The updated copy of data
// is returned; on a type mismatch it is returned alongside an error.
func loadTableKeys(data DynamoDBTableProperties, table *types.TableDescription) (DynamoDBTableProperties, error) {
	for _, element := range table.KeySchema {
		if element.KeyType != types.KeyTypeHash {
			data.SortKeyName = *element.AttributeName
			data.SortKeyUsed = true
			continue
		}
		data.PartitionKeyName = *element.AttributeName
	}

	for _, definition := range table.AttributeDefinitions {
		attrName := *definition.AttributeName
		isString := definition.AttributeType == types.ScalarAttributeTypeS

		// Attributes that are not part of the key fall through both cases.
		switch {
		case attrName == data.PartitionKeyName:
			if !isString {
				return data, errors.New("dynamodb partition key must be of type S")
			}
		case data.SortKeyUsed && attrName == data.SortKeyName:
			if !isString {
				return data, errors.New("dynamodb sort key must be of type S")
			}
		}
	}

	return data, nil
}
// populateTableTTL returns a copy of data with TTLFieldName set to the
// table's TTL attribute name.
//
// The field is only set when the TTL description reports the ENABLED status;
// in every other case, including a missing description or attribute name,
// data is returned unchanged.
func populateTableTTL(data DynamoDBTableProperties, ttl *types.TimeToLiveDescription) DynamoDBTableProperties {
	// ttl and ttl.AttributeName are pointers in the SDK response; treat a
	// missing value as "no TTL configured" rather than dereferencing nil.
	if ttl == nil || ttl.TimeToLiveStatus != types.TimeToLiveStatusEnabled || ttl.AttributeName == nil {
		return data
	}
	data.TTLFieldName = *ttl.AttributeName
	return data
}
// dynamoDBputItem runs PutItem with the given client and input.
//
// A ConditionalCheckFailedException from the service is reported as
// ErrRaceCondition; any other failure is wrapped with a descriptive message.
// On success the PutItem output is returned as is.
func dynamoDBputItem(ctx context.Context, client *dynamodb.Client, input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) {
	out, putErr := client.PutItem(ctx, input)
	if putErr == nil {
		return out, nil
	}

	var conditionFailed *types.ConditionalCheckFailedException
	if !errors.As(putErr, &conditionFailed) {
		return nil, errors.Wrap(putErr, "unable to set dynamodb item")
	}
	return nil, ErrRaceCondition
}
// dynamoDBGetItem runs GetItem with the given client and input, forcing a
// consistent read by setting input.ConsistentRead to true (the caller's
// input is modified).
//
// The request runs in its own goroutine so that the function can return
// ctx.Err() as soon as ctx is done, without waiting for the call to finish.
// A failed request is wrapped with a descriptive message.
func dynamoDBGetItem(ctx context.Context, client *dynamodb.Client, input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) {
	input.ConsistentRead = aws.Bool(true)

	type getResult struct {
		out *dynamodb.GetItemOutput
		err error
	}

	// Capacity 1 lets the goroutine deliver its result and exit even when
	// nobody is receiving any more because ctx finished first.
	results := make(chan getResult, 1)
	go func() {
		out, err := client.GetItem(ctx, input)
		results <- getResult{out: out, err: err}
	}()

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case res := <-results:
		if res.err != nil {
			return nil, errors.Wrap(res.err, "unable to retrieve dynamodb item")
		}
		return res.out, nil
	}
}