Skip to content

Commit

Permalink
[dynamodb] Initial stab at a few APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
dzbarsky committed Jul 10, 2023
1 parent 121d6b0 commit cc2850e
Show file tree
Hide file tree
Showing 6 changed files with 355 additions and 5 deletions.
4 changes: 4 additions & 0 deletions awserrors/awserrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ func ResourceNotFoundException(message string) *Error {
func ResourceInUseException(message string) *Error {
return generate400Exception("ResourceInUseException", message)
}

func XXX_TODO(message string) *Error {
return generate400Exception("XXX_TODO", message)
}
14 changes: 9 additions & 5 deletions http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import (
)

const (
jsonContentType = "application/x-amz-json-1.1"
cborContentType = "application/x-amz-cbor-1.1"
jsonContentType10 = "application/x-amz-json-1.0"
jsonContentType11 = "application/x-amz-json-1.1"
cborContentType = "application/x-amz-cbor-1.1"
)

func strictUnmarshal(r io.Reader, contentType string, target any) error {
Expand All @@ -28,7 +29,7 @@ func strictUnmarshal(r io.Reader, contentType string, target any) error {
}

switch contentType {
case jsonContentType:
case jsonContentType10, jsonContentType11:
decoder := json.NewDecoder(bytes.NewBuffer(data))
decoder.DisallowUnknownFields()
err := decoder.Decode(target)
Expand Down Expand Up @@ -69,9 +70,12 @@ func writeResponse(w http.ResponseWriter, output any, awserr *awserrors.Error, c
return
}

marshalFunc := cbor.Marshal
if contentType == jsonContentType {
var marshalFunc func(v any) ([]byte, error)
switch contentType {
case jsonContentType10, jsonContentType11:
marshalFunc = json.Marshal
case cborContentType:
marshalFunc = cbor.Marshal
}

data, err := marshalFunc(output)
Expand Down
9 changes: 9 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"aws-in-a-box/arn"
"aws-in-a-box/server"
"aws-in-a-box/services/dynamodb"
"aws-in-a-box/services/kinesis"
"aws-in-a-box/services/kms"
)
Expand All @@ -26,6 +27,8 @@ func main() {

enableKMS := flag.Bool("enableKMS", true, "Enable Kinesis service")

enableDynamoDB := flag.Bool("experimental_enableDynamoDB", true, "Enable DynamoDB service")

flag.Parse()

methodRegistry := make(map[string]http.HandlerFunc)
Expand Down Expand Up @@ -54,6 +57,12 @@ func main() {
log.Println("Enabled KMS")
}

if *enableDynamoDB {
d := dynamodb.New(arnGenerator)
d.RegisterHTTPHandlers(methodRegistry)
log.Println("Enabled DynamoDB (EXPERIMENTAL!!!)")
}

srv := server.New(methodRegistry)
srv.Addr = *addr

Expand Down
221 changes: 221 additions & 0 deletions services/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
package dynamodb

import (
"fmt"
"reflect"
"sync"

"aws-in-a-box/arn"
"aws-in-a-box/awserrors"
)

type Table struct {
Name string
ARN string
BillingMode string
AttributeDefinitions []APIAttributeDefinition
KeySchema []APIKeySchemaElement

PrimaryKeyAttributeName string
ItemsByPrimaryKey map[string][]APIItem
ItemCount int
}

func (t *Table) toAPI() APITableDescription {
return APITableDescription{
AttributeDefinitions: t.AttributeDefinitions,
ItemCount: t.ItemCount,
KeySchema: t.KeySchema,
// TODO: delayed creation
TableARN: t.ARN,
TableStatus: "ACTIVE",
}
}

type DynamoDB struct {
arnGenerator arn.Generator

mu sync.Mutex
tablesByName map[string]*Table
}

func New(generator arn.Generator) *DynamoDB {
d := &DynamoDB{
arnGenerator: generator,
tablesByName: make(map[string]*Table),
}
return d
}

// https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_CreateTable.html
func (d *DynamoDB) CreateTable(input CreateTableInput) (*CreateTableOutput, *awserrors.Error) {
d.mu.Lock()
defer d.mu.Unlock()

if _, ok := d.tablesByName[input.TableName]; ok {
return nil, awserrors.ResourceInUseException("Table already exists")
}

primaryKeyAttributeName := ""
for _, keySchemaElement := range input.KeySchema {
if keySchemaElement.KeyType == "HASH" {
primaryKeyAttributeName = keySchemaElement.AttributeName
break
}
}
if primaryKeyAttributeName == "" {
return nil, awserrors.InvalidArgumentException("KeySchema must have a HASH key")
}

t := &Table{
Name: input.TableName,
ARN: d.arnGenerator.Generate("dynamodb", "table", input.TableName),
BillingMode: input.BillingMode,
AttributeDefinitions: input.AttributeDefinitions,
KeySchema: input.KeySchema,
PrimaryKeyAttributeName: primaryKeyAttributeName,
ItemsByPrimaryKey: make(map[string][]APIItem),
}
d.tablesByName[input.TableName] = t

fmt.Println("CreateTable", input)
return &CreateTableOutput{
TableDescription: t.toAPI(),
}, nil
}

// https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_CreateTable.html
func (d *DynamoDB) DescribeTable(input DescribeTableInput) (*DescribeTableOutput, *awserrors.Error) {
d.mu.Lock()
defer d.mu.Unlock()

t, ok := d.tablesByName[input.TableName]
if !ok {
return nil, awserrors.InvalidArgumentException("Table does not exist")
}

return &DescribeTableOutput{
Table: t.toAPI(),
}, nil
}

// https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Scan.html
func (d *DynamoDB) Scan(input ScanInput) (*ScanOutput, *awserrors.Error) {
d.mu.Lock()
defer d.mu.Unlock()

/*data, _ := json.MarshalIndent(input, "", " ")
fmt.Println("Scan", string(data))*/

t, ok := d.tablesByName[input.TableName]
if !ok {
return nil, awserrors.InvalidArgumentException("Table does not exist")
}

var allItems []APIItem
for _, items := range t.ItemsByPrimaryKey {
allItems = append(allItems, items...)
}

return &ScanOutput{
Count: len(allItems),
Items: allItems,
}, nil
}

// https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html
func (d *DynamoDB) PutItem(input PutItemInput) (*PutItemOutput, *awserrors.Error) {
d.mu.Lock()
defer d.mu.Unlock()

/*data, _ := json.MarshalIndent(input, "", " ")
fmt.Println("PutItem", string(data))*/

t, ok := d.tablesByName[input.TableName]
if !ok {
return nil, awserrors.InvalidArgumentException("Table does not exist")
}
key := input.Item[t.PrimaryKeyAttributeName].S
if key == "" {
return nil, awserrors.InvalidArgumentException("PrimaryKey must be provided (and string)")
}
t.ItemsByPrimaryKey[key] = append(t.ItemsByPrimaryKey[key], input.Item)
t.ItemCount += 1

return &PutItemOutput{}, nil
}

// https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateItem.html
func (d *DynamoDB) UpdateItem(input UpdateItemInput) (*UpdateItemOutput, *awserrors.Error) {
d.mu.Lock()
defer d.mu.Unlock()

/*data, _ := json.MarshalIndent(input, "", " ")
fmt.Println("UpdateItem", string(data))*/

t, ok := d.tablesByName[input.TableName]
if !ok {
return nil, awserrors.InvalidArgumentException("Table does not exist")
}

// TODO: composite keys
key := input.Key[t.PrimaryKeyAttributeName].S
if key == "" {
return nil, awserrors.InvalidArgumentException("PrimaryKey must be provided (and string)")
}
items := t.ItemsByPrimaryKey[key]

itemCountIncrease := 0
var existingItem map[string]APIAttributeValue
if len(items) == 0 {
existingItem = make(map[string]APIAttributeValue)
itemCountIncrease = 1
} else if len(items) == 1 {
existingItem = items[0]
t.ItemsByPrimaryKey[key] = items[:0]
} else {
return nil, awserrors.XXX_TODO("Multiple items with same primary key")
}

// Check preconditions
for attribute, expectation := range input.Expected {
attr, exists := existingItem[attribute]
if expectation.Exists != nil {
if *expectation.Exists != exists {
return nil, awserrors.XXX_TODO("Attribute exists mismatch")
}
}
switch expectation.ComparisonOperator {
case "":
case "EQ":
if !reflect.DeepEqual(attr, expectation.Value) {
return nil, awserrors.XXX_TODO("Attribute EQ mismatch")
}
case "NEQ":
if reflect.DeepEqual(attr, expectation.Value) {
return nil, awserrors.XXX_TODO("Attribute NEQ mismatch")
}
default:
return nil, awserrors.InvalidArgumentException("Invalid expectation comparison operator: " + expectation.ComparisonOperator)
}
}

// Perform update
for attribute, update := range input.AttributeUpdates {
switch update.Action {
case "PUT":
existingItem[attribute] = update.Value
case "DELETE":
delete(existingItem, attribute)
case "ADD":
// TODO
// fallthrough
default:
return nil, awserrors.InvalidArgumentException("Invalid update action: " + update.Action)
}
}

t.ItemCount += itemCountIncrease
t.ItemsByPrimaryKey[key] = []APIItem{existingItem}
return &UpdateItemOutput{}, nil
}
13 changes: 13 additions & 0 deletions services/dynamodb/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package dynamodb

import "aws-in-a-box/http"

const service = "DynamoDB_20120810"

func (d *DynamoDB) RegisterHTTPHandlers(methodRegistry http.Registry) {
http.Register(methodRegistry, service, "CreateTable", d.CreateTable)
http.Register(methodRegistry, service, "DescribeTable", d.DescribeTable)
http.Register(methodRegistry, service, "PutItem", d.PutItem)
http.Register(methodRegistry, service, "Scan", d.Scan)
http.Register(methodRegistry, service, "UpdateItem", d.UpdateItem)
}
Loading

0 comments on commit cc2850e

Please sign in to comment.