From f757364fc73f80ed1285c547ef54105a12d08658 Mon Sep 17 00:00:00 2001 From: "Moellering, Sascha" Date: Sun, 27 May 2018 19:11:44 +0200 Subject: [PATCH] - Added deployment logic in bootstrap verticle - Ported AWS Lambda functions to Golang - Added Service Auto Scaling for Fargate --- services/kinesis-consumer/README.md | 9 - services/kinesis-consumer/cmd/main.go | 92 ++ services/kinesis-consumer/model/structs.go | 30 + services/kinesis-consumer/pom.xml | 90 -- .../services/consumer/tracking.pb.go | 139 ++ .../consumer}/tracking.proto | 0 .../services/persistence/store_items.go | 65 + .../services/persistence/store_items_test.go | 107 ++ .../main/java/com/amazon/KinesisConsumer.java | 57 - .../com/amazon/KinesisConsumerHandler.java | 94 -- .../com/amazon/proto/TrackingEventProtos.java | 1386 ----------------- .../java/com/amazon/vo/TrackingMessage.java | 135 -- .../java/com/amazon/KinesisConsumerTest.java | 88 -- .../src/test/resources/log4j2.xml | 13 - services/redis-updater/README.md | 9 - services/redis-updater/cmd/main.go | 78 + .../model/structs.go} | 20 +- services/redis-updater/pom.xml | 92 -- .../services/persistence/store_data.go | 61 + .../services/persistence/store_data_test.go | 78 + .../main/java/com/amazon/RedisUpdater.java | 96 -- .../java/com/amazon/RedisUpdaterHandler.java | 84 - .../main/java/com/amazon/util/Constants.java | 27 - .../java/com/amazon/vo/TrackingMessage.java | 112 -- .../java/com/amazon/RedisUpdaterTest.java | 146 -- .../src/test/resources/log4j2.xml | 13 - .../reactive-vertx/.gitignore | 31 + 27 files changed, 692 insertions(+), 2460 deletions(-) delete mode 100644 services/kinesis-consumer/README.md create mode 100644 services/kinesis-consumer/cmd/main.go create mode 100644 services/kinesis-consumer/model/structs.go delete mode 100644 services/kinesis-consumer/pom.xml create mode 100644 services/kinesis-consumer/services/consumer/tracking.pb.go rename services/kinesis-consumer/{src/main/proto => services/consumer}/tracking.proto (100%) create mode 100644 services/kinesis-consumer/services/persistence/store_items.go create mode 100644 services/kinesis-consumer/services/persistence/store_items_test.go delete mode 100644 services/kinesis-consumer/src/main/java/com/amazon/KinesisConsumer.java delete mode 100644 services/kinesis-consumer/src/main/java/com/amazon/KinesisConsumerHandler.java delete mode 100644 services/kinesis-consumer/src/main/java/com/amazon/proto/TrackingEventProtos.java delete mode 100644 services/kinesis-consumer/src/main/java/com/amazon/vo/TrackingMessage.java delete mode 100644 services/kinesis-consumer/src/test/java/com/amazon/KinesisConsumerTest.java delete mode 100644 services/kinesis-consumer/src/test/resources/log4j2.xml delete mode 100644 services/redis-updater/README.md create mode 100644 services/redis-updater/cmd/main.go rename services/{kinesis-consumer/src/main/java/com/amazon/Constants.java => redis-updater/model/structs.go} (55%) delete mode 100644 services/redis-updater/pom.xml create mode 100644 services/redis-updater/services/persistence/store_data.go create mode 100644 services/redis-updater/services/persistence/store_data_test.go delete mode 100644 services/redis-updater/src/main/java/com/amazon/RedisUpdater.java delete mode 100644 services/redis-updater/src/main/java/com/amazon/RedisUpdaterHandler.java delete mode 100644 services/redis-updater/src/main/java/com/amazon/util/Constants.java delete mode 100644 services/redis-updater/src/main/java/com/amazon/vo/TrackingMessage.java delete mode 100644 services/redis-updater/src/test/java/com/amazon/RedisUpdaterTest.java delete mode 100644 services/redis-updater/src/test/resources/log4j2.xml create mode 100644 services/tracking-service/reactive-vertx/.gitignore diff --git a/services/kinesis-consumer/README.md b/services/kinesis-consumer/README.md deleted file mode 100644 index db758e9..0000000 --- a/services/kinesis-consumer/README.md +++ /dev/null @@ -1,9 +0,0 @@ -# Kinesis Consumer -This AWS Lambda function consumes data from an Amazon Kinesis Date Stream and persists the data in an Amazon DynamoDB table. - -## General concept -In this Lambda function, the invocation code is separated from the business logic for better testing. The invocation code is triggered with a maximum number of 100 Kinesis events. Protobuf is used to reduce message size and saturate the Kinesis Stream. Unwrapping the data from Protobuf to Java objects is implemented in the invocation layer (`KinesisConsumerHandler`) of the Lambda function. The objects are passed to the business logic (`KinesisConsumer`) and used to persist data in a DynamoDB table. In order to reduce cold startup time, only few additional libraries are used as dependecies. In contract to the `redis-updater`-function, it is not necessary to access resources in private subnets, so it is sufficient to use the default networking environment of Lambda. - -## Configuration - -The configuration of this Lambda-function is really simple: the only dynamic parameter that needs to be passed as an ENV-variale is the name of the DynamoDB-table. This parameter is called `TABLE_NAME`. During the creation of the architecture using Amazon CloudFormation, certain names and tags are created dynamically, so it's not possible to hardcode the name of the table. \ No newline at end of file diff --git a/services/kinesis-consumer/cmd/main.go b/services/kinesis-consumer/cmd/main.go new file mode 100644 index 0000000..35f5b35 --- /dev/null +++ b/services/kinesis-consumer/cmd/main.go @@ -0,0 +1,92 @@ +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ +package main + +import ( + "context" + "fmt" + "os" + + "github.com/aws-samples/reactive-refarch-cloudformation/services/kinesis-consumer/services/persistence" + + model "github.com/aws-samples/reactive-refarch-cloudformation/services/kinesis-consumer/model" + consumer "github.com/aws-samples/reactive-refarch-cloudformation/services/kinesis-consumer/services/consumer" + + "github.com/golang/protobuf/proto" + + "github.com/aws/aws-lambda-go/events" + "github.com/aws/aws-lambda-go/lambda" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/dynamodb" +) + +var region string +var svc *dynamodb.DynamoDB +var tableName string + +func init() { + region = os.Getenv("AWS_REGION") + tableName = os.Getenv("TABLE_NAME") + + fmt.Printf("Using region %s\n", region) + fmt.Printf("Using DynamoDB table %s\n", tableName) + + // Initialize a session in us-west-2 that the SDK will use to load + // credentials from the shared credentials file ~/.a ws/credentials. + sess, err := session.NewSession(&aws.Config{ + Region: aws.String(region)}, + ) + + if err != nil { + fmt.Println("Error creating session:") + fmt.Println(err.Error()) + } + + // Create DynamoDB client + svc = dynamodb.New(sess) + fmt.Printf("DynamoDB client created") +} + +func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error { + for _, record := range kinesisEvent.Records { + kinesisRecord := record.Kinesis + dataBytes := kinesisRecord.Data + + msg := &consumer.TrackingEvent{} + if err := proto.Unmarshal(dataBytes, msg); err != nil { + fmt.Println("Got error unmarshalling event:") + fmt.Println(err.Error()) + } + + event := &model.Message{ + UserAgent: msg.UserAgent, + ProgramID: msg.Programid, + Checksum: msg.Checksum, + CustomerID: msg.CustomerId, + CustomerName: msg.CustomerName, + MessageID: msg.MessageId, + ProgramName: msg.ProgramName} + + persistence.PersistData(*svc, tableName, *event) + } + + return nil +} + +func main() { + lambda.Start(handler) +} diff --git a/services/kinesis-consumer/model/structs.go b/services/kinesis-consumer/model/structs.go new file mode 100644 index 0000000..2d84007 --- /dev/null +++ b/services/kinesis-consumer/model/structs.go @@ -0,0 +1,30 @@ +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ +package model + +// Message is an exported type that +// contains all values for a tracking message +type Message struct { + ID string `json:"id"` + UpdatedAt string `json:"updated_at"` + UserAgent string `json:"user_agent"` + ProgramID string `json:"program_id"` + Checksum string `json:"checksum"` + CustomerID int32 `json:"customer_id"` + CustomerName string `json:"customer_name"` + MessageID string `json:"message_id"` + ProgramName string `json:"program_name"` +} diff --git a/services/kinesis-consumer/pom.xml b/services/kinesis-consumer/pom.xml deleted file mode 100644 index 686454c..0000000 --- a/services/kinesis-consumer/pom.xml +++ /dev/null @@ -1,90 +0,0 @@ - - 4.0.0 - com.amazon - kinesis-consumer - jar - 1.1 - kinesis-consumer - http://maven.apache.org - - - - com.amazonaws - aws-java-sdk-dynamodb - 1.11.311 - - - - com.amazonaws - aws-lambda-java-core - 1.2.0 - - - - com.amazonaws - aws-lambda-java-events - 2.1.0 - - - - com.amazonaws - amazon-kinesis-client - 1.9.0 - - - - com.google.protobuf - protobuf-java - 3.5.1 - - - - org.testng - testng - 6.11 - test - - - - org.apache.logging.log4j - log4j-core - 2.8.2 - test - - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.7.0 - - 1.8 - 1.8 - true - true - - - - org.apache.maven.plugins - maven-shade-plugin - 2.3 - - false - - - - package - - shade - - - - - - - - diff --git a/services/kinesis-consumer/services/consumer/tracking.pb.go b/services/kinesis-consumer/services/consumer/tracking.pb.go new file mode 100644 index 0000000..7ca641c --- /dev/null +++ b/services/kinesis-consumer/services/consumer/tracking.pb.go @@ -0,0 +1,139 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: tracking.proto + +package consumer + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type TrackingEvent struct { + Programid string `protobuf:"bytes,1,opt,name=programid" json:"programid,omitempty"` + Checksum string `protobuf:"bytes,2,opt,name=checksum" json:"checksum,omitempty"` + CustomerId int32 `protobuf:"varint,3,opt,name=customer_id,json=customerId" json:"customer_id,omitempty"` + UserAgent string `protobuf:"bytes,4,opt,name=user_agent,json=userAgent" json:"user_agent,omitempty"` + ProgramName string `protobuf:"bytes,5,opt,name=program_name,json=programName" json:"program_name,omitempty"` + CustomerName string `protobuf:"bytes,6,opt,name=customer_name,json=customerName" json:"customer_name,omitempty"` + IsValid bool `protobuf:"varint,7,opt,name=is_valid,json=isValid" json:"is_valid,omitempty"` + MessageId string `protobuf:"bytes,8,opt,name=message_id,json=messageId" json:"message_id,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *TrackingEvent) Reset() { *m = TrackingEvent{} } +func (m *TrackingEvent) String() string { return proto.CompactTextString(m) } +func (*TrackingEvent) ProtoMessage() {} +func (*TrackingEvent) Descriptor() ([]byte, []int) { + return fileDescriptor_tracking_e51492b0a54f9ea5, []int{0} +} +func (m *TrackingEvent) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_TrackingEvent.Unmarshal(m, b) +} +func (m *TrackingEvent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_TrackingEvent.Marshal(b, m, deterministic) +} +func (dst *TrackingEvent) XXX_Merge(src proto.Message) { + xxx_messageInfo_TrackingEvent.Merge(dst, src) +} +func (m *TrackingEvent) XXX_Size() int { + return xxx_messageInfo_TrackingEvent.Size(m) +} +func (m *TrackingEvent) XXX_DiscardUnknown() { + xxx_messageInfo_TrackingEvent.DiscardUnknown(m) +} + +var xxx_messageInfo_TrackingEvent proto.InternalMessageInfo + +func (m *TrackingEvent) GetProgramid() string { + if m != nil { + return m.Programid + } + return "" +} + +func (m *TrackingEvent) GetChecksum() string { + if m != nil { + return m.Checksum + } + return "" +} + +func (m *TrackingEvent) GetCustomerId() int32 { + if m != nil { + return m.CustomerId + } + return 0 +} + +func (m *TrackingEvent) GetUserAgent() string { + if m != nil { + return m.UserAgent + } + return "" +} + +func (m *TrackingEvent) GetProgramName() string { + if m != nil { + return m.ProgramName + } + return "" +} + +func (m *TrackingEvent) GetCustomerName() string { + if m != nil { + return m.CustomerName + } + return "" +} + +func (m *TrackingEvent) GetIsValid() bool { + if m != nil { + return m.IsValid + } + return false +} + +func (m *TrackingEvent) GetMessageId() string { + if m != nil { + return m.MessageId + } + return "" +} + +func init() { + proto.RegisterType((*TrackingEvent)(nil), "TrackingEvent") +} + +func init() { proto.RegisterFile("tracking.proto", fileDescriptor_tracking_e51492b0a54f9ea5) } + +var fileDescriptor_tracking_e51492b0a54f9ea5 = []byte{ + // 246 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0xd0, 0xb1, 0x4e, 0xc3, 0x30, + 0x10, 0x06, 0x60, 0xa5, 0xd0, 0x36, 0xbd, 0xb6, 0x08, 0x99, 0xc5, 0x20, 0x10, 0x01, 0x06, 0x32, + 0x75, 0xe1, 0x09, 0xa8, 0xc4, 0xd0, 0x05, 0xa1, 0x08, 0xb1, 0x46, 0x87, 0x7d, 0x0a, 0x56, 0xb1, + 0x5d, 0xd9, 0x4e, 0x07, 0x5e, 0x81, 0x97, 0x46, 0x76, 0xd2, 0xa0, 0x8e, 0xfe, 0xee, 0x97, 0xef, + 0xd7, 0xc1, 0x59, 0x70, 0x28, 0xb6, 0xca, 0x34, 0xab, 0x9d, 0xb3, 0xc1, 0xde, 0xff, 0x8e, 0x60, + 0xf9, 0xde, 0xd3, 0xcb, 0x9e, 0x4c, 0x60, 0xd7, 0x30, 0xdb, 0x39, 0xdb, 0x38, 0xd4, 0x4a, 0xf2, + 0xac, 0xc8, 0xca, 0x59, 0xf5, 0x0f, 0xec, 0x0a, 0x72, 0xf1, 0x45, 0x62, 0xeb, 0x5b, 0xcd, 0x47, + 0x69, 0x38, 0xbc, 0xd9, 0x2d, 0xcc, 0x45, 0xeb, 0x83, 0xd5, 0xe4, 0x6a, 0x25, 0xf9, 0x49, 0x91, + 0x95, 0xe3, 0x0a, 0x0e, 0xb4, 0x91, 0xec, 0x06, 0xa0, 0xf5, 0xe4, 0x6a, 0x6c, 0xc8, 0x04, 0x7e, + 0xda, 0xfd, 0x1d, 0xe5, 0x39, 0x02, 0xbb, 0x83, 0x45, 0xbf, 0xa8, 0x36, 0xa8, 0x89, 0x8f, 0x53, + 0x60, 0xde, 0xdb, 0x2b, 0x6a, 0x62, 0x0f, 0xb0, 0x1c, 0x56, 0xa4, 0xcc, 0x24, 0x65, 0x16, 0x07, + 0x4c, 0xa1, 0x4b, 0xc8, 0x95, 0xaf, 0xf7, 0xf8, 0xad, 0x24, 0x9f, 0x16, 0x59, 0x99, 0x57, 0x53, + 0xe5, 0x3f, 0xe2, 0x33, 0x36, 0xd0, 0xe4, 0x3d, 0x36, 0x14, 0x1b, 0xe6, 0x5d, 0x83, 0x5e, 0x36, + 0x72, 0xfd, 0x08, 0xe7, 0xc2, 0xea, 0x15, 0x6a, 0xfc, 0xb1, 0xa6, 0xbb, 0xd0, 0xfa, 0xe2, 0xe8, + 0x3c, 0x6f, 0xd1, 0xfc, 0xe7, 0x24, 0xcd, 0x9e, 0xfe, 0x02, 0x00, 0x00, 0xff, 0xff, 0x0e, 0xb5, + 0x9c, 0xfa, 0x4f, 0x01, 0x00, 0x00, +} diff --git a/services/kinesis-consumer/src/main/proto/tracking.proto b/services/kinesis-consumer/services/consumer/tracking.proto similarity index 100% rename from services/kinesis-consumer/src/main/proto/tracking.proto rename to services/kinesis-consumer/services/consumer/tracking.proto diff --git a/services/kinesis-consumer/services/persistence/store_items.go b/services/kinesis-consumer/services/persistence/store_items.go new file mode 100644 index 0000000..3cfd932 --- /dev/null +++ b/services/kinesis-consumer/services/persistence/store_items.go @@ -0,0 +1,65 @@ +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ +package persistence + +import ( + "fmt" + "time" + + "github.com/aws-samples/reactive-refarch-cloudformation/services/kinesis-consumer/model" + "github.com/aws/aws-sdk-go/aws" + + "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" + "github.com/nu7hatch/gouuid" +) + +// PersistData is a function to persist data in DynamoDB +func PersistData(ddb dynamodb.DynamoDB, tablename string, msg model.Message) string { + uuid, err := uuid.NewV4() + + if err != nil { + fmt.Printf("Failed creating UUID %s", err) + } else { + fmt.Printf("Created UUID %s", uuid) + } + + timestamp := time.Now().Format(time.StampMicro) + + msg.ID = uuid.String() + msg.UpdatedAt = timestamp + + av, err := dynamodbattribute.MarshalMap(msg) + + if err != nil { + fmt.Println("Got error marshalling map:") + fmt.Println(err.Error()) + } + + input := &dynamodb.PutItemInput{ + Item: av, + TableName: aws.String(tablename), + } + + _, err = ddb.PutItem(input) + + if err != nil { + fmt.Println("Got error calling PutItem:") + fmt.Println(err.Error()) + } + + return msg.ID +} diff --git a/services/kinesis-consumer/services/persistence/store_items_test.go b/services/kinesis-consumer/services/persistence/store_items_test.go new file mode 100644 index 0000000..72fa3d6 --- /dev/null +++ b/services/kinesis-consumer/services/persistence/store_items_test.go @@ -0,0 +1,107 @@ +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ +package persistence + +import ( + "fmt" + "testing" + + "github.com/aws-samples/reactive-refarch-cloudformation/services/kinesis-consumer/model" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/stretchr/testify/assert" + + "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" +) + +func TestPersistData(t *testing.T) { + event := prepareData() + + region := "us-east-1" + tableName := "reactive-ProgramTable-us-east-1" + + fmt.Printf("Using region %s\n", region) + fmt.Printf("Using DynamoDB table %s\n", tableName) + + // Initialize a session in us-west-2 that the SDK will use to load + // credentials from the shared credentials file ~/.a ws/credentials. + sess, err := session.NewSession(&aws.Config{ + Region: aws.String(region)}, + ) + + if err != nil { + fmt.Println("Error creating session:") + fmt.Println(err.Error()) + t.Fail() + } + + // Create DynamoDB client + svc := dynamodb.New(sess) + fmt.Printf("DynamoDB client created") + + uuid := PersistData(*svc, tableName, event) + + input := &dynamodb.GetItemInput{ + TableName: aws.String(tableName), + Key: map[string]*dynamodb.AttributeValue{ + "id": { + S: aws.String(uuid), + }, + "customer_id": { + N: aws.String("1234"), + }, + }, + } + + itemOutput, err := svc.GetItem(input) + + if err != nil { + fmt.Println("Error getting data") + fmt.Println(err.Error()) + t.Fail() + } + + item := model.Message{} + err = dynamodbattribute.UnmarshalMap(itemOutput.Item, &item) + + if err != nil { + fmt.Println("Error getting data") + fmt.Println(err.Error()) + t.Fail() + } + + assert.Equal(t, item.ProgramID, event.ProgramID, "ProgramId not the same value") + assert.Equal(t, item.Checksum, event.Checksum, "Checksum not the same value") + assert.Equal(t, item.CustomerID, event.CustomerID, "CustomerId not the same value") + assert.Equal(t, item.MessageID, event.MessageID, "MessageId not the same value") + assert.Equal(t, item.ProgramName, event.ProgramName, "ProgramName not the same value") + assert.Equal(t, item.UserAgent, event.UserAgent, "UserAgent not the same value") +} + +func prepareData() model.Message { + event := model.Message{ + MessageID: "messageId", + UserAgent: "myUserAgent", + ProgramID: "12345", + Checksum: "check123", + CustomerID: 1234, + CustomerName: "myCustomer", + ProgramName: "myProgram", + } + + return event +} diff --git a/services/kinesis-consumer/src/main/java/com/amazon/KinesisConsumer.java b/services/kinesis-consumer/src/main/java/com/amazon/KinesisConsumer.java deleted file mode 100644 index 233cf3b..0000000 --- a/services/kinesis-consumer/src/main/java/com/amazon/KinesisConsumer.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - * - */ - -package com.amazon; - -import com.amazon.vo.TrackingMessage; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; -import com.amazonaws.services.dynamodbv2.document.Item; -import com.amazonaws.services.dynamodbv2.document.PutItemOutcome; -import com.amazonaws.services.dynamodbv2.document.Table; -import com.amazonaws.services.dynamodbv2.document.spec.PutItemSpec; -import com.amazonaws.services.dynamodbv2.model.ReturnValue; - -import java.sql.Timestamp; -import java.util.UUID; - -public class KinesisConsumer { - - public String updateDynamoDb(final DynamoDB dynamoDB, final String tableName, final TrackingMessage trackingMessage) { - // Write data in DynamoDB - String id = UUID.randomUUID().toString(); - - String updatedAt = new Timestamp(System.currentTimeMillis()).toString(); - - Item item = new Item().withPrimaryKey("id", id) - .withString("updated_at", updatedAt) - .withString("checksum", trackingMessage.getChecksum()) - .withString("customer_name", trackingMessage.getCustomerName()) - .withString("message_id", trackingMessage.getMessageId()) - .withString("program_id", trackingMessage.getProgramId()) - .withString("program_name", trackingMessage.getProgramName()) - .withString("user_agent", trackingMessage.getUserAgent()) - .withInt("customer_id", trackingMessage.getCustomerId()); - - Table table = dynamoDB.getTable(tableName); - - PutItemSpec spec = new PutItemSpec(); - spec.withItem(item).withReturnValues(ReturnValue.ALL_OLD); - - table.putItem(spec); - return id; - } - -} diff --git a/services/kinesis-consumer/src/main/java/com/amazon/KinesisConsumerHandler.java b/services/kinesis-consumer/src/main/java/com/amazon/KinesisConsumerHandler.java deleted file mode 100644 index 22d4278..0000000 --- a/services/kinesis-consumer/src/main/java/com/amazon/KinesisConsumerHandler.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - * - */ - -package com.amazon; - -import com.amazon.proto.TrackingEventProtos; -import com.amazon.vo.TrackingMessage; -import com.amazonaws.regions.Region; -import com.amazonaws.regions.Regions; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; -import com.amazonaws.services.lambda.runtime.Context; -import com.amazonaws.services.lambda.runtime.LambdaLogger; -import com.amazonaws.services.lambda.runtime.RequestHandler; -import com.amazonaws.services.lambda.runtime.events.KinesisEvent; -import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord; -import com.google.protobuf.InvalidProtocolBufferException; - -import java.nio.ByteBuffer; - -public class KinesisConsumerHandler implements RequestHandler { - - private DynamoDB dynamoDB; - - @Override - public Void handleRequest(KinesisEvent kinesisEvent, Context context) { - - LambdaLogger logger = context.getLogger(); - - if (null == dynamoDB) { - logger.log("Creating DynamodbClient ... "); - Region region = Region.getRegion(Regions.fromName(System.getenv("AWS_REGION"))); - - logger.log("Current Region: " + region.getName()); - AmazonDynamoDB client = createDynamodbClient(region); - dynamoDB = new DynamoDB(client); - } else { - logger.log("Reusing DynamodbClient"); - } - - KinesisConsumer consumer = new KinesisConsumer(); - String tableName = System.getenv(Constants.TABLE_NAME); - logger.log("Received " + kinesisEvent.getRecords().size() + " raw Event Records."); - - for (KinesisEventRecord eventRecord : kinesisEvent.getRecords()) { - // Unwrap protobuf - try { - ByteBuffer buffer = eventRecord.getKinesis().getData(); - TrackingEventProtos.TrackingEvent.Builder trackingEventBuilder = TrackingEventProtos.TrackingEvent.newBuilder(); - trackingEventBuilder.mergeFrom(buffer.array()); - TrackingEventProtos.TrackingEvent trackingEvent = trackingEventBuilder.build(); - - TrackingMessage trackingMessage = new TrackingMessage(); - - trackingMessage.setCustomerId(trackingEvent.getCustomerId()); - trackingMessage.setMessageId(trackingEvent.getMessageId()); - trackingMessage.setChecksum(trackingEvent.getChecksum()); - trackingMessage.setValid(trackingEvent.getIsValid()); - trackingMessage.setCustomerName(trackingEvent.getCustomerName()); - trackingMessage.setProgramId(trackingEvent.getProgramid()); - trackingMessage.setProgramName(trackingEvent.getProgramName()); - trackingMessage.setUserAgent(trackingEvent.getUserAgent()); - - consumer.updateDynamoDb(dynamoDB, tableName, trackingMessage); - } - - catch (InvalidProtocolBufferException exc) { - logger.log(exc.getMessage()); - } - } - - return null; - } - - private AmazonDynamoDB createDynamodbClient(final Region region) { - return AmazonDynamoDBClientBuilder.standard() - .withRegion(region.getName()) - .build(); - } -} diff --git a/services/kinesis-consumer/src/main/java/com/amazon/proto/TrackingEventProtos.java b/services/kinesis-consumer/src/main/java/com/amazon/proto/TrackingEventProtos.java deleted file mode 100644 index 9aedc16..0000000 --- a/services/kinesis-consumer/src/main/java/com/amazon/proto/TrackingEventProtos.java +++ /dev/null @@ -1,1386 +0,0 @@ -// Generated by the protocol buffer compiler. DO NOT EDIT! -// source: tracking.proto - -package com.amazon.proto; - -public final class TrackingEventProtos { - private TrackingEventProtos() {} - public static void registerAllExtensions( - com.google.protobuf.ExtensionRegistryLite registry) { - } - - public static void registerAllExtensions( - com.google.protobuf.ExtensionRegistry registry) { - registerAllExtensions( - (com.google.protobuf.ExtensionRegistryLite) registry); - } - public interface TrackingEventOrBuilder extends - // @@protoc_insertion_point(interface_extends:TrackingEvent) - com.google.protobuf.MessageOrBuilder { - - /** - * string programid = 1; - */ - java.lang.String getProgramid(); - /** - * string programid = 1; - */ - com.google.protobuf.ByteString - getProgramidBytes(); - - /** - * string checksum = 2; - */ - java.lang.String getChecksum(); - /** - * string checksum = 2; - */ - com.google.protobuf.ByteString - getChecksumBytes(); - - /** - * int32 customer_id = 3; - */ - int getCustomerId(); - - /** - * string user_agent = 4; - */ - java.lang.String getUserAgent(); - /** - * string user_agent = 4; - */ - com.google.protobuf.ByteString - getUserAgentBytes(); - - /** - * string program_name = 5; - */ - java.lang.String getProgramName(); - /** - * string program_name = 5; - */ - com.google.protobuf.ByteString - getProgramNameBytes(); - - /** - * string customer_name = 6; - */ - java.lang.String getCustomerName(); - /** - * string customer_name = 6; - */ - com.google.protobuf.ByteString - getCustomerNameBytes(); - - /** - * bool is_valid = 7; - */ - boolean getIsValid(); - - /** - * string message_id = 8; - */ - java.lang.String getMessageId(); - /** - * string message_id = 8; - */ - com.google.protobuf.ByteString - getMessageIdBytes(); - } - /** - * Protobuf type {@code TrackingEvent} - */ - public static final class TrackingEvent extends - com.google.protobuf.GeneratedMessageV3 implements - // @@protoc_insertion_point(message_implements:TrackingEvent) - TrackingEventOrBuilder { - // Use TrackingEvent.newBuilder() to construct. - private TrackingEvent(com.google.protobuf.GeneratedMessageV3.Builder builder) { - super(builder); - } - private TrackingEvent() { - programid_ = ""; - checksum_ = ""; - customerId_ = 0; - userAgent_ = ""; - programName_ = ""; - customerName_ = ""; - isValid_ = false; - messageId_ = ""; - } - - @java.lang.Override - public final com.google.protobuf.UnknownFieldSet - getUnknownFields() { - return com.google.protobuf.UnknownFieldSet.getDefaultInstance(); - } - private TrackingEvent( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - this(); - int mutable_bitField0_ = 0; - try { - boolean done = false; - while (!done) { - int tag = input.readTag(); - switch (tag) { - case 0: - done = true; - break; - default: { - if (!input.skipField(tag)) { - done = true; - } - break; - } - case 10: { - java.lang.String s = input.readStringRequireUtf8(); - - programid_ = s; - break; - } - case 18: { - java.lang.String s = input.readStringRequireUtf8(); - - checksum_ = s; - break; - } - case 24: { - - customerId_ = input.readInt32(); - break; - } - case 34: { - java.lang.String s = input.readStringRequireUtf8(); - - userAgent_ = s; - break; - } - case 42: { - java.lang.String s = input.readStringRequireUtf8(); - - programName_ = s; - break; - } - case 50: { - java.lang.String s = input.readStringRequireUtf8(); - - customerName_ = s; - break; - } - case 56: { - - isValid_ = input.readBool(); - break; - } - case 66: { - java.lang.String s = input.readStringRequireUtf8(); - - messageId_ = s; - break; - } - } - } - } catch (com.google.protobuf.InvalidProtocolBufferException e) { - throw e.setUnfinishedMessage(this); - } catch (java.io.IOException e) { - throw new com.google.protobuf.InvalidProtocolBufferException( - e).setUnfinishedMessage(this); - } finally { - makeExtensionsImmutable(); - } - } - public static final com.google.protobuf.Descriptors.Descriptor - getDescriptor() { - return com.amazon.proto.TrackingEventProtos.internal_static_TrackingEvent_descriptor; - } - - protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable - internalGetFieldAccessorTable() { - return com.amazon.proto.TrackingEventProtos.internal_static_TrackingEvent_fieldAccessorTable - .ensureFieldAccessorsInitialized( - com.amazon.proto.TrackingEventProtos.TrackingEvent.class, com.amazon.proto.TrackingEventProtos.TrackingEvent.Builder.class); - } - - public static final int PROGRAMID_FIELD_NUMBER = 1; - private volatile java.lang.Object programid_; - /** - * string programid = 1; - */ - public java.lang.String getProgramid() { - java.lang.Object ref = programid_; - if (ref instanceof java.lang.String) { - return (java.lang.String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - programid_ = s; - return s; - } - } - /** - * string programid = 1; - */ - public com.google.protobuf.ByteString - getProgramidBytes() { - java.lang.Object ref = programid_; - if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - programid_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - - public static final int CHECKSUM_FIELD_NUMBER = 2; - private volatile java.lang.Object checksum_; - /** - * string checksum = 2; - */ - public java.lang.String getChecksum() { - java.lang.Object ref = checksum_; - if (ref instanceof java.lang.String) { - return (java.lang.String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - checksum_ = s; - return s; - } - } - /** - * string checksum = 2; - */ - public com.google.protobuf.ByteString - getChecksumBytes() { - java.lang.Object ref = checksum_; - if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - checksum_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - - public static final int CUSTOMER_ID_FIELD_NUMBER = 3; - private int customerId_; - /** - * int32 customer_id = 3; - */ - public int getCustomerId() { - return customerId_; - } - - public static final int USER_AGENT_FIELD_NUMBER = 4; - private volatile java.lang.Object userAgent_; - /** - * string user_agent = 4; - */ - public java.lang.String getUserAgent() { - java.lang.Object ref = userAgent_; - if (ref instanceof java.lang.String) { - return (java.lang.String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - userAgent_ = s; - return s; - } - } - /** - * string user_agent = 4; - */ - public com.google.protobuf.ByteString - getUserAgentBytes() { - java.lang.Object ref = userAgent_; - if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - userAgent_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - - public static final int PROGRAM_NAME_FIELD_NUMBER = 5; - private volatile java.lang.Object programName_; - /** - * string program_name = 5; - */ - public java.lang.String getProgramName() { - java.lang.Object ref = programName_; - if (ref instanceof java.lang.String) { - return (java.lang.String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - programName_ = s; - return s; - } - } - /** - * string program_name = 5; - */ - public com.google.protobuf.ByteString - getProgramNameBytes() { - java.lang.Object ref = programName_; - if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - programName_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - - public static final int CUSTOMER_NAME_FIELD_NUMBER = 6; - private volatile java.lang.Object customerName_; - /** - * string customer_name = 6; - */ - public java.lang.String getCustomerName() { - java.lang.Object ref = customerName_; - if (ref instanceof java.lang.String) { - return (java.lang.String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - customerName_ = s; - return s; - } - } - /** - * string customer_name = 6; - */ - public com.google.protobuf.ByteString - getCustomerNameBytes() { - java.lang.Object ref = customerName_; - if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - customerName_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - - public static final int IS_VALID_FIELD_NUMBER = 7; - private boolean isValid_; - /** - * bool is_valid = 7; - */ - public boolean getIsValid() { - return isValid_; - } - - public static final int MESSAGE_ID_FIELD_NUMBER = 8; - private volatile java.lang.Object messageId_; - /** - * string message_id = 8; - */ - public java.lang.String getMessageId() { - java.lang.Object ref = messageId_; - if (ref instanceof java.lang.String) { - return (java.lang.String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - messageId_ = s; - return s; - } - } - /** - * string message_id = 8; - */ - public com.google.protobuf.ByteString - getMessageIdBytes() { - java.lang.Object ref = messageId_; - if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - messageId_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - - private byte memoizedIsInitialized = -1; - public final boolean isInitialized() { - byte isInitialized = memoizedIsInitialized; - if (isInitialized == 1) return true; - if (isInitialized == 0) return false; - - memoizedIsInitialized = 1; - return true; - } - - public void writeTo(com.google.protobuf.CodedOutputStream output) - throws java.io.IOException { - if (!getProgramidBytes().isEmpty()) { - com.google.protobuf.GeneratedMessageV3.writeString(output, 1, programid_); - } - if (!getChecksumBytes().isEmpty()) { - com.google.protobuf.GeneratedMessageV3.writeString(output, 2, checksum_); - } - if (customerId_ != 0) { - output.writeInt32(3, customerId_); - } - if (!getUserAgentBytes().isEmpty()) { - com.google.protobuf.GeneratedMessageV3.writeString(output, 4, userAgent_); - } - if (!getProgramNameBytes().isEmpty()) { - com.google.protobuf.GeneratedMessageV3.writeString(output, 5, programName_); - } - if (!getCustomerNameBytes().isEmpty()) { - com.google.protobuf.GeneratedMessageV3.writeString(output, 6, customerName_); - } - if (isValid_ != false) { - output.writeBool(7, isValid_); - } - if (!getMessageIdBytes().isEmpty()) { - com.google.protobuf.GeneratedMessageV3.writeString(output, 8, messageId_); - } - } - - public int getSerializedSize() { - int size = memoizedSize; - if (size != -1) return size; - - size = 0; - if (!getProgramidBytes().isEmpty()) { - size += com.google.protobuf.GeneratedMessageV3.computeStringSize(1, programid_); - } - if (!getChecksumBytes().isEmpty()) { - size += com.google.protobuf.GeneratedMessageV3.computeStringSize(2, checksum_); - } - if (customerId_ != 0) { - size += com.google.protobuf.CodedOutputStream - .computeInt32Size(3, customerId_); - } - if (!getUserAgentBytes().isEmpty()) { - size += com.google.protobuf.GeneratedMessageV3.computeStringSize(4, userAgent_); - } - if (!getProgramNameBytes().isEmpty()) { - size += com.google.protobuf.GeneratedMessageV3.computeStringSize(5, programName_); - } - if (!getCustomerNameBytes().isEmpty()) { - size += com.google.protobuf.GeneratedMessageV3.computeStringSize(6, customerName_); - } - if (isValid_ != false) { - size += com.google.protobuf.CodedOutputStream - .computeBoolSize(7, isValid_); - } - if (!getMessageIdBytes().isEmpty()) { - size += com.google.protobuf.GeneratedMessageV3.computeStringSize(8, messageId_); - } - memoizedSize = size; - return size; - } - - private static final long serialVersionUID = 0L; - @java.lang.Override - public boolean equals(final java.lang.Object obj) { - if (obj == this) { - return true; - } - if (!(obj instanceof com.amazon.proto.TrackingEventProtos.TrackingEvent)) { - return super.equals(obj); - } - com.amazon.proto.TrackingEventProtos.TrackingEvent other = (com.amazon.proto.TrackingEventProtos.TrackingEvent) obj; - - boolean result = true; - result = result && getProgramid() - .equals(other.getProgramid()); - result = result && getChecksum() - .equals(other.getChecksum()); - result = result && (getCustomerId() - == other.getCustomerId()); - result = result && getUserAgent() - .equals(other.getUserAgent()); - result = result && getProgramName() - .equals(other.getProgramName()); - result = result && getCustomerName() - .equals(other.getCustomerName()); - result = result && (getIsValid() - == other.getIsValid()); - result = result && getMessageId() - .equals(other.getMessageId()); - return result; - } - - @java.lang.Override - public int hashCode() { - if (memoizedHashCode != 0) { - return memoizedHashCode; - } - int hash = 41; - hash = (19 * hash) + getDescriptor().hashCode(); - hash = (37 * hash) + PROGRAMID_FIELD_NUMBER; - hash = (53 * hash) + getProgramid().hashCode(); - hash = (37 * hash) + CHECKSUM_FIELD_NUMBER; - hash = (53 * hash) + getChecksum().hashCode(); - hash = (37 * hash) + CUSTOMER_ID_FIELD_NUMBER; - hash = (53 * hash) + getCustomerId(); - hash = (37 * hash) + USER_AGENT_FIELD_NUMBER; - hash = (53 * hash) + getUserAgent().hashCode(); - hash = (37 * hash) + PROGRAM_NAME_FIELD_NUMBER; - hash = (53 * hash) + getProgramName().hashCode(); - hash = (37 * hash) + CUSTOMER_NAME_FIELD_NUMBER; - hash = (53 * hash) + getCustomerName().hashCode(); - hash = (37 * hash) + IS_VALID_FIELD_NUMBER; - hash = (53 * hash) + com.google.protobuf.Internal.hashBoolean( - getIsValid()); - hash = (37 * hash) + MESSAGE_ID_FIELD_NUMBER; - hash = (53 * hash) + getMessageId().hashCode(); - hash = (29 * hash) + unknownFields.hashCode(); - memoizedHashCode = hash; - return hash; - } - - public static com.amazon.proto.TrackingEventProtos.TrackingEvent parseFrom( - java.nio.ByteBuffer data) - throws com.google.protobuf.InvalidProtocolBufferException { - return PARSER.parseFrom(data); - } - public static com.amazon.proto.TrackingEventProtos.TrackingEvent parseFrom( - java.nio.ByteBuffer data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return PARSER.parseFrom(data, extensionRegistry); - } - public static com.amazon.proto.TrackingEventProtos.TrackingEvent parseFrom( - com.google.protobuf.ByteString data) - throws com.google.protobuf.InvalidProtocolBufferException { - return PARSER.parseFrom(data); - } - public static com.amazon.proto.TrackingEventProtos.TrackingEvent parseFrom( - com.google.protobuf.ByteString data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return PARSER.parseFrom(data, extensionRegistry); - } - public static com.amazon.proto.TrackingEventProtos.TrackingEvent parseFrom(byte[] data) - throws com.google.protobuf.InvalidProtocolBufferException { - return PARSER.parseFrom(data); - } - public static com.amazon.proto.TrackingEventProtos.TrackingEvent parseFrom( - byte[] data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return PARSER.parseFrom(data, extensionRegistry); - } - public static com.amazon.proto.TrackingEventProtos.TrackingEvent parseFrom(java.io.InputStream input) - throws java.io.IOException { - return com.google.protobuf.GeneratedMessageV3 - .parseWithIOException(PARSER, input); - } - public static com.amazon.proto.TrackingEventProtos.TrackingEvent parseFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return com.google.protobuf.GeneratedMessageV3 - .parseWithIOException(PARSER, input, extensionRegistry); - } - public static com.amazon.proto.TrackingEventProtos.TrackingEvent parseDelimitedFrom(java.io.InputStream input) - throws java.io.IOException { - return com.google.protobuf.GeneratedMessageV3 - .parseDelimitedWithIOException(PARSER, input); - } - public static com.amazon.proto.TrackingEventProtos.TrackingEvent parseDelimitedFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return com.google.protobuf.GeneratedMessageV3 - .parseDelimitedWithIOException(PARSER, input, extensionRegistry); - } - public static com.amazon.proto.TrackingEventProtos.TrackingEvent parseFrom( - com.google.protobuf.CodedInputStream input) - throws java.io.IOException { - return com.google.protobuf.GeneratedMessageV3 - .parseWithIOException(PARSER, input); - } - public static com.amazon.proto.TrackingEventProtos.TrackingEvent parseFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return com.google.protobuf.GeneratedMessageV3 - .parseWithIOException(PARSER, input, extensionRegistry); - } - - public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder() { - return DEFAULT_INSTANCE.toBuilder(); - } - public static Builder newBuilder(com.amazon.proto.TrackingEventProtos.TrackingEvent prototype) { - return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); - } - public Builder toBuilder() { - return this == DEFAULT_INSTANCE - ? new Builder() : new Builder().mergeFrom(this); - } - - @java.lang.Override - protected Builder newBuilderForType( - com.google.protobuf.GeneratedMessageV3.BuilderParent parent) { - Builder builder = new Builder(parent); - return builder; - } - /** - * Protobuf type {@code TrackingEvent} - */ - public static final class Builder extends - com.google.protobuf.GeneratedMessageV3.Builder implements - // @@protoc_insertion_point(builder_implements:TrackingEvent) - com.amazon.proto.TrackingEventProtos.TrackingEventOrBuilder { - public static final com.google.protobuf.Descriptors.Descriptor - getDescriptor() { - return com.amazon.proto.TrackingEventProtos.internal_static_TrackingEvent_descriptor; - } - - protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable - internalGetFieldAccessorTable() { - return com.amazon.proto.TrackingEventProtos.internal_static_TrackingEvent_fieldAccessorTable - .ensureFieldAccessorsInitialized( - com.amazon.proto.TrackingEventProtos.TrackingEvent.class, com.amazon.proto.TrackingEventProtos.TrackingEvent.Builder.class); - } - - // Construct using com.amazon.proto.TrackingEventProtos.TrackingEvent.newBuilder() - private Builder() { - maybeForceBuilderInitialization(); - } - - private Builder( - com.google.protobuf.GeneratedMessageV3.BuilderParent parent) { - super(parent); - maybeForceBuilderInitialization(); - } - private void maybeForceBuilderInitialization() { - if (com.google.protobuf.GeneratedMessageV3 - .alwaysUseFieldBuilders) { - } - } - public Builder clear() { - super.clear(); - programid_ = ""; - - checksum_ = ""; - - customerId_ = 0; - - userAgent_ = ""; - - programName_ = ""; - - customerName_ = ""; - - isValid_ = false; - - messageId_ = ""; - - return this; - } - - public com.google.protobuf.Descriptors.Descriptor - getDescriptorForType() { - return com.amazon.proto.TrackingEventProtos.internal_static_TrackingEvent_descriptor; - } - - public com.amazon.proto.TrackingEventProtos.TrackingEvent getDefaultInstanceForType() { - return com.amazon.proto.TrackingEventProtos.TrackingEvent.getDefaultInstance(); - } - - public com.amazon.proto.TrackingEventProtos.TrackingEvent build() { - com.amazon.proto.TrackingEventProtos.TrackingEvent result = buildPartial(); - if (!result.isInitialized()) { - throw newUninitializedMessageException(result); - } - return result; - } - - public com.amazon.proto.TrackingEventProtos.TrackingEvent buildPartial() { - com.amazon.proto.TrackingEventProtos.TrackingEvent result = new com.amazon.proto.TrackingEventProtos.TrackingEvent(this); - result.programid_ = programid_; - result.checksum_ = checksum_; - result.customerId_ = customerId_; - result.userAgent_ = userAgent_; - result.programName_ = programName_; - result.customerName_ = customerName_; - result.isValid_ = isValid_; - result.messageId_ = messageId_; - onBuilt(); - return result; - } - - public Builder clone() { - return (Builder) super.clone(); - } - public Builder setField( - com.google.protobuf.Descriptors.FieldDescriptor field, - Object value) { - return (Builder) super.setField(field, value); - } - public Builder clearField( - com.google.protobuf.Descriptors.FieldDescriptor field) { - return (Builder) super.clearField(field); - } - public Builder clearOneof( - com.google.protobuf.Descriptors.OneofDescriptor oneof) { - return (Builder) super.clearOneof(oneof); - } - public Builder setRepeatedField( - com.google.protobuf.Descriptors.FieldDescriptor field, - int index, Object value) { - return (Builder) super.setRepeatedField(field, index, value); - } - public Builder addRepeatedField( - com.google.protobuf.Descriptors.FieldDescriptor field, - Object value) { - return (Builder) super.addRepeatedField(field, value); - } - public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof com.amazon.proto.TrackingEventProtos.TrackingEvent) { - return mergeFrom((com.amazon.proto.TrackingEventProtos.TrackingEvent)other); - } else { - super.mergeFrom(other); - return this; - } - } - - public Builder mergeFrom(com.amazon.proto.TrackingEventProtos.TrackingEvent other) { - if (other == com.amazon.proto.TrackingEventProtos.TrackingEvent.getDefaultInstance()) return this; - if (!other.getProgramid().isEmpty()) { - programid_ = other.programid_; - onChanged(); - } - if (!other.getChecksum().isEmpty()) { - checksum_ = other.checksum_; - onChanged(); - } - if (other.getCustomerId() != 0) { - setCustomerId(other.getCustomerId()); - } - if (!other.getUserAgent().isEmpty()) { - userAgent_ = other.userAgent_; - onChanged(); - } - if (!other.getProgramName().isEmpty()) { - programName_ = other.programName_; - onChanged(); - } - if (!other.getCustomerName().isEmpty()) { - customerName_ = other.customerName_; - onChanged(); - } - if (other.getIsValid() != false) { - setIsValid(other.getIsValid()); - } - if (!other.getMessageId().isEmpty()) { - messageId_ = other.messageId_; - onChanged(); - } - onChanged(); - return this; - } - - public final boolean isInitialized() { - return true; - } - - public Builder mergeFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - com.amazon.proto.TrackingEventProtos.TrackingEvent parsedMessage = null; - try { - parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); - } catch (com.google.protobuf.InvalidProtocolBufferException e) { - parsedMessage = (com.amazon.proto.TrackingEventProtos.TrackingEvent) e.getUnfinishedMessage(); - throw e.unwrapIOException(); - } finally { - if (parsedMessage != null) { - mergeFrom(parsedMessage); - } - } - return this; - } - - private java.lang.Object programid_ = ""; - /** - * string programid = 1; - */ - public java.lang.String getProgramid() { - java.lang.Object ref = programid_; - if (!(ref instanceof java.lang.String)) { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - programid_ = s; - return s; - } else { - return (java.lang.String) ref; - } - } - /** - * string programid = 1; - */ - public com.google.protobuf.ByteString - getProgramidBytes() { - java.lang.Object ref = programid_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - programid_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - /** - * string programid = 1; - */ - public Builder setProgramid( - java.lang.String value) { - if (value == null) { - throw new NullPointerException(); - } - - programid_ = value; - onChanged(); - return this; - } - /** - * string programid = 1; - */ - public Builder clearProgramid() { - - programid_ = getDefaultInstance().getProgramid(); - onChanged(); - return this; - } - /** - * string programid = 1; - */ - public Builder setProgramidBytes( - com.google.protobuf.ByteString value) { - if (value == null) { - throw new NullPointerException(); - } - checkByteStringIsUtf8(value); - - programid_ = value; - onChanged(); - return this; - } - - private java.lang.Object checksum_ = ""; - /** - * string checksum = 2; - */ - public java.lang.String getChecksum() { - java.lang.Object ref = checksum_; - if (!(ref instanceof java.lang.String)) { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - checksum_ = s; - return s; - } else { - return (java.lang.String) ref; - } - } - /** - * string checksum = 2; - */ - public com.google.protobuf.ByteString - getChecksumBytes() { - java.lang.Object ref = checksum_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - checksum_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - /** - * string checksum = 2; - */ - public Builder setChecksum( - java.lang.String value) { - if (value == null) { - throw new NullPointerException(); - } - - checksum_ = value; - onChanged(); - return this; - } - /** - * string checksum = 2; - */ - public Builder clearChecksum() { - - checksum_ = getDefaultInstance().getChecksum(); - onChanged(); - return this; - } - /** - * string checksum = 2; - */ - public Builder setChecksumBytes( - com.google.protobuf.ByteString value) { - if (value == null) { - throw new NullPointerException(); - } - checkByteStringIsUtf8(value); - - checksum_ = value; - onChanged(); - return this; - } - - private int customerId_ ; - /** - * int32 customer_id = 3; - */ - public int getCustomerId() { - return customerId_; - } - /** - * int32 customer_id = 3; - */ - public Builder setCustomerId(int value) { - - customerId_ = value; - onChanged(); - return this; - } - /** - * int32 customer_id = 3; - */ - public Builder clearCustomerId() { - - customerId_ = 0; - onChanged(); - return this; - } - - private java.lang.Object userAgent_ = ""; - /** - * string user_agent = 4; - */ - public java.lang.String getUserAgent() { - java.lang.Object ref = userAgent_; - if (!(ref instanceof java.lang.String)) { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - userAgent_ = s; - return s; - } else { - return (java.lang.String) ref; - } - } - /** - * string user_agent = 4; - */ - public com.google.protobuf.ByteString - getUserAgentBytes() { - java.lang.Object ref = userAgent_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - userAgent_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - /** - * string user_agent = 4; - */ - public Builder setUserAgent( - java.lang.String value) { - if (value == null) { - throw new NullPointerException(); - } - - userAgent_ = value; - onChanged(); - return this; - } - /** - * string user_agent = 4; - */ - public Builder clearUserAgent() { - - userAgent_ = getDefaultInstance().getUserAgent(); - onChanged(); - return this; - } - /** - * string user_agent = 4; - */ - public Builder setUserAgentBytes( - com.google.protobuf.ByteString value) { - if (value == null) { - throw new NullPointerException(); - } - checkByteStringIsUtf8(value); - - userAgent_ = value; - onChanged(); - return this; - } - - private java.lang.Object programName_ = ""; - /** - * string program_name = 5; - */ - public java.lang.String getProgramName() { - java.lang.Object ref = programName_; - if (!(ref instanceof java.lang.String)) { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - programName_ = s; - return s; - } else { - return (java.lang.String) ref; - } - } - /** - * string program_name = 5; - */ - public com.google.protobuf.ByteString - getProgramNameBytes() { - java.lang.Object ref = programName_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - programName_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - /** - * string program_name = 5; - */ - public Builder setProgramName( - java.lang.String value) { - if (value == null) { - throw new NullPointerException(); - } - - programName_ = value; - onChanged(); - return this; - } - /** - * string program_name = 5; - */ - public Builder clearProgramName() { - - programName_ = getDefaultInstance().getProgramName(); - onChanged(); - return this; - } - /** - * string program_name = 5; - */ - public Builder setProgramNameBytes( - com.google.protobuf.ByteString value) { - if (value == null) { - throw new NullPointerException(); - } - checkByteStringIsUtf8(value); - - programName_ = value; - onChanged(); - return this; - } - - private java.lang.Object customerName_ = ""; - /** - * string customer_name = 6; - */ - public java.lang.String getCustomerName() { - java.lang.Object ref = customerName_; - if (!(ref instanceof java.lang.String)) { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - customerName_ = s; - return s; - } else { - return (java.lang.String) ref; - } - } - /** - * string customer_name = 6; - */ - public com.google.protobuf.ByteString - getCustomerNameBytes() { - java.lang.Object ref = customerName_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - customerName_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - /** - * string customer_name = 6; - */ - public Builder setCustomerName( - java.lang.String value) { - if (value == null) { - throw new NullPointerException(); - } - - customerName_ = value; - onChanged(); - return this; - } - /** - * string customer_name = 6; - */ - public Builder clearCustomerName() { - - customerName_ = getDefaultInstance().getCustomerName(); - onChanged(); - return this; - } - /** - * string customer_name = 6; - */ - public Builder setCustomerNameBytes( - com.google.protobuf.ByteString value) { - if (value == null) { - throw new NullPointerException(); - } - checkByteStringIsUtf8(value); - - customerName_ = value; - onChanged(); - return this; - } - - private boolean isValid_ ; - /** - * bool is_valid = 7; - */ - public boolean getIsValid() { - return isValid_; - } - /** - * bool is_valid = 7; - */ - public Builder setIsValid(boolean value) { - - isValid_ = value; - onChanged(); - return this; - } - /** - * bool is_valid = 7; - */ - public Builder clearIsValid() { - - isValid_ = false; - onChanged(); - return this; - } - - private java.lang.Object messageId_ = ""; - /** - * string message_id = 8; - */ - public java.lang.String getMessageId() { - java.lang.Object ref = messageId_; - if (!(ref instanceof java.lang.String)) { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - messageId_ = s; - return s; - } else { - return (java.lang.String) ref; - } - } - /** - * string message_id = 8; - */ - public com.google.protobuf.ByteString - getMessageIdBytes() { - java.lang.Object ref = messageId_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - messageId_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - /** - * string message_id = 8; - */ - public Builder setMessageId( - java.lang.String value) { - if (value == null) { - throw new NullPointerException(); - } - - messageId_ = value; - onChanged(); - return this; - } - /** - * string message_id = 8; - */ - public Builder clearMessageId() { - - messageId_ = getDefaultInstance().getMessageId(); - onChanged(); - return this; - } - /** - * string message_id = 8; - */ - public Builder setMessageIdBytes( - com.google.protobuf.ByteString value) { - if (value == null) { - throw new NullPointerException(); - } - checkByteStringIsUtf8(value); - - messageId_ = value; - onChanged(); - return this; - } - public final Builder setUnknownFields( - final com.google.protobuf.UnknownFieldSet unknownFields) { - return this; - } - - public final Builder mergeUnknownFields( - final com.google.protobuf.UnknownFieldSet unknownFields) { - return this; - } - - - // @@protoc_insertion_point(builder_scope:TrackingEvent) - } - - // @@protoc_insertion_point(class_scope:TrackingEvent) - private static final com.amazon.proto.TrackingEventProtos.TrackingEvent DEFAULT_INSTANCE; - static { - DEFAULT_INSTANCE = new com.amazon.proto.TrackingEventProtos.TrackingEvent(); - } - - public static com.amazon.proto.TrackingEventProtos.TrackingEvent getDefaultInstance() { - return DEFAULT_INSTANCE; - } - - private static final com.google.protobuf.Parser - PARSER = new com.google.protobuf.AbstractParser() { - public TrackingEvent parsePartialFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return new TrackingEvent(input, extensionRegistry); - } - }; - - public static com.google.protobuf.Parser parser() { - return PARSER; - } - - @java.lang.Override - public com.google.protobuf.Parser getParserForType() { - return PARSER; - } - - public com.amazon.proto.TrackingEventProtos.TrackingEvent getDefaultInstanceForType() { - return DEFAULT_INSTANCE; - } - - } - - private static final com.google.protobuf.Descriptors.Descriptor - internal_static_TrackingEvent_descriptor; - private static final - com.google.protobuf.GeneratedMessageV3.FieldAccessorTable - internal_static_TrackingEvent_fieldAccessorTable; - - public static com.google.protobuf.Descriptors.FileDescriptor - getDescriptor() { - return descriptor; - } - private static com.google.protobuf.Descriptors.FileDescriptor - descriptor; - static { - java.lang.String[] descriptorData = { - "\n\016tracking.proto\"\260\001\n\rTrackingEvent\022\021\n\tpr" + - "ogramid\030\001 \001(\t\022\020\n\010checksum\030\002 \001(\t\022\023\n\013custo" + - "mer_id\030\003 \001(\005\022\022\n\nuser_agent\030\004 \001(\t\022\024\n\014prog" + - "ram_name\030\005 \001(\t\022\025\n\rcustomer_name\030\006 \001(\t\022\020\n" + - "\010is_valid\030\007 \001(\010\022\022\n\nmessage_id\030\010 \001(\tB\'\n\020c" + - "om.amazon.protoB\023TrackingEventProtosb\006pr" + - "oto3" - }; - com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = - new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() { - public com.google.protobuf.ExtensionRegistry assignDescriptors( - com.google.protobuf.Descriptors.FileDescriptor root) { - descriptor = root; - return null; - } - }; - com.google.protobuf.Descriptors.FileDescriptor - .internalBuildGeneratedFileFrom(descriptorData, - new com.google.protobuf.Descriptors.FileDescriptor[] { - }, assigner); - internal_static_TrackingEvent_descriptor = - getDescriptor().getMessageTypes().get(0); - internal_static_TrackingEvent_fieldAccessorTable = new - com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( - internal_static_TrackingEvent_descriptor, - new java.lang.String[] { "Programid", "Checksum", "CustomerId", "UserAgent", "ProgramName", "CustomerName", "IsValid", "MessageId", }); - } - - // @@protoc_insertion_point(outer_class_scope) -} diff --git a/services/kinesis-consumer/src/main/java/com/amazon/vo/TrackingMessage.java b/services/kinesis-consumer/src/main/java/com/amazon/vo/TrackingMessage.java deleted file mode 100644 index 9f7fa09..0000000 --- a/services/kinesis-consumer/src/main/java/com/amazon/vo/TrackingMessage.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - * - */ - -package com.amazon.vo; - -public class TrackingMessage { - private String userAgent; - private String programId; - private String programName; - private String checksum; - private Integer customerId; - private String customerName; - private String messageId; - private boolean isValid; - - public TrackingMessage() {} - - public TrackingMessage(String userAgent, String programId, String checksum, Integer customerId, - String customerName, boolean isValid, String messageId, String programName) { - this.userAgent = userAgent; - this.programId = programId; - this.checksum = checksum; - this.customerId = customerId; - this.customerName = customerName; - this.isValid = isValid; - this.messageId = messageId; - this.programName = programName; - } - - public String getUserAgent() { - return userAgent; - } - - public void setUserAgent(String userAgent) { - this.userAgent = userAgent; - } - - public String getProgramId() { - return programId; - } - - public void setProgramId(String programId) { - this.programId = programId; - } - - public String getChecksum() { - return checksum; - } - - public void setChecksum(String checksum) { - this.checksum = checksum; - } - - public Integer getCustomerId() { - return customerId; - } - - public void setCustomerId(Integer customerId) { - this.customerId = customerId; - } - - public String getCustomerName() { - return customerName; - } - - public void setCustomerName(String customerName) { - this.customerName = customerName; - } - - public boolean isValid() { - return isValid; - } - - public void setValid(boolean valid) { - isValid = valid; - } - - public String getMessageId() { - return messageId; - } - - public void setMessageId(String messageId) { - this.messageId = messageId; - } - - public String getProgramName() { - return programName; - } - - public void setProgramName(String programName) { - this.programName = programName; - } - - @Override - public String toString() { - return "TrackingMessage{" + - "userAgent='" + userAgent + '\'' + - ", programId='" + programId + '\'' + - ", programName='" + programName + '\'' + - ", checksum='" + checksum + '\'' + - ", customerId=" + customerId + - ", customerName='" + customerName + '\'' + - ", messageId='" + messageId + '\'' + - ", isValid=" + isValid + - '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TrackingMessage that = (TrackingMessage) o; - - return getProgramId() != null ? getProgramId().equals(that.getProgramId()) : that.getProgramId() == null; - } - - @Override - public int hashCode() { - return getProgramId() != null ? getProgramId().hashCode() : 0; - } -} diff --git a/services/kinesis-consumer/src/test/java/com/amazon/KinesisConsumerTest.java b/services/kinesis-consumer/src/test/java/com/amazon/KinesisConsumerTest.java deleted file mode 100644 index 3a24f05..0000000 --- a/services/kinesis-consumer/src/test/java/com/amazon/KinesisConsumerTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - * - */ - -package com.amazon; - -import com.amazon.vo.TrackingMessage; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.regions.Regions; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; -import com.amazonaws.services.dynamodbv2.document.*; -import com.amazonaws.services.dynamodbv2.document.spec.DeleteItemSpec; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -public class KinesisConsumerTest { - - private static final Logger LOGGER = LogManager.getLogger(KinesisConsumerTest.class); - private AmazonDynamoDB client; - - @BeforeClass - public static void before() throws Exception { - } - - @AfterClass - public static void after() throws Exception { - - } - - private static TrackingMessage prepareData() { - TrackingMessage msg = new TrackingMessage(); - msg.setMessageId("messageId"); - msg.setUserAgent("myUserAgent"); - msg.setProgramId("12345"); - msg.setProgramName("myProgram"); - msg.setCustomerName("myCustomer"); - msg.setCustomerId(1234); - msg.setChecksum("check123"); - msg.setValid(true); - - return msg; - } - - @Test - public void writeToDynamoDb() { - - client = AmazonDynamoDBClientBuilder.standard().withRegion(Regions.EU_WEST_1) - .withCredentials(new DefaultAWSCredentialsProviderChain()) - .build(); - DynamoDB dynamoDB = new DynamoDB(client); - - // Write data to local DynamoDB - TrackingMessage trackingMessage = prepareData(); - KinesisConsumer kinesisConsumer = new KinesisConsumer(); - String id = kinesisConsumer.updateDynamoDb(dynamoDB, "test_table", trackingMessage); - - LOGGER.info("id " + id + " stored"); - Table table = dynamoDB.getTable("test_table"); - - Item item = table.getItem("id", id); - - LOGGER.info(item); - Assert.assertNotNull(item); - Assert.assertEquals(item.getString("program_id"), trackingMessage.getProgramId()); - - DeleteItemSpec deleteItemSpec = new DeleteItemSpec() - .withPrimaryKey(new PrimaryKey("id", id)); - DeleteItemOutcome deleteItemOutcome = table.deleteItem(deleteItemSpec); - LOGGER.info("DeleteItemOutcome: " + deleteItemOutcome); - } -} diff --git a/services/kinesis-consumer/src/test/resources/log4j2.xml b/services/kinesis-consumer/src/test/resources/log4j2.xml deleted file mode 100644 index 32c8f6b..0000000 --- a/services/kinesis-consumer/src/test/resources/log4j2.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git a/services/redis-updater/README.md b/services/redis-updater/README.md deleted file mode 100644 index 479a013..0000000 --- a/services/redis-updater/README.md +++ /dev/null @@ -1,9 +0,0 @@ -# Redis Updater -This AWS Lambda function consumes data from an Amazon Kinesis Data Stream, persists the data in Redis, and notifies subscribers of data changes. - -## General concept -In this Lambda function, the invocation code is separated from the business logic for better testing. The invocation code is triggered with a maximum of 100 Kinesis events. JSON is used as data format to send data using Kinesis. Unwrapping and conversion of the data from JSON to a POJO is implemented in the invocation layer (`RedisUpdaterHandler`) of the Lambda function. The objects are passed to the business logic (`RedisUpdater`) and used to persist data in Redis and to notify subscribers of data changes. In order to reduce cold startup time, only a few additional libraries are used a dependencies. For this Lambda-function, it is necessary to access the Amazon ElastiCache cluster in private subnets inside a VPC. In this case, it is necessary to provide additional VPC-specific configuration information that includes VPC subnet IDs and security group IDs. AWS Lambda uses this information to set up elastic network interfaces (ENIs) that enables the function to connect securely to other resources within the private VPC. - -## Configuration - -The configuration of this Lambda-function is really simple: the only dynamic parameter that needs to be passed as an ENV-variale is the endpoint of the ElastiCache-cluster. This parameter is called `REDIS_HOST`. During the creation of the architecture using Amazon CloudFormation, certain names and tags are created dynamically, so it's not possible to hardcode the endpoint. \ No newline at end of file diff --git a/services/redis-updater/cmd/main.go b/services/redis-updater/cmd/main.go new file mode 100644 index 0000000..6bef4a7 --- /dev/null +++ b/services/redis-updater/cmd/main.go @@ -0,0 +1,78 @@ +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ +package main + +import ( + "context" + "fmt" + "os" + + "encoding/json" + + model "github.com/aws-samples/reactive-refarch-cloudformation/services/redis-updater/model" + "github.com/aws-samples/reactive-refarch-cloudformation/services/redis-updater/services/persistence" + + "github.com/go-redis/redis" + + "github.com/aws/aws-lambda-go/events" + "github.com/aws/aws-lambda-go/lambda" +) + +var region string +var redisHost string +var redisPort string +var redisChannel string +var redisClient *redis.Client + +func init() { + region = os.Getenv("AWS_REGION") + redisHost = os.Getenv("REDIS_HOST") + redisPort = os.Getenv("REDIS_PORT") + redisChannel = os.Getenv("REDIS_CHANNEL") + + fmt.Printf("Using region %s\n", region) + fmt.Printf("Using Redis host %s\n", redisHost) + fmt.Printf("Using Redis port %s\n", redisPort) + + redisClient = redis.NewClient(&redis.Options{ + Addr: redisHost + ":" + redisPort, + DB: 0, // use default DB + }) +} + +func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error { + for _, record := range kinesisEvent.Records { + kinesisRecord := record.Kinesis + dataBytes := kinesisRecord.Data + dataText := string(dataBytes) + + event := model.Message{} + + if err := json.Unmarshal([]byte(dataText), &event); err != nil { + fmt.Println("Error unmarshalling data:") + fmt.Println(err.Error()) + } + + persistence.PersistData(event, *redisClient) + persistence.NotifySubscribers(event, *redisClient, redisChannel) + } + + return nil +} + +func main() { + lambda.Start(handler) +} diff --git a/services/kinesis-consumer/src/main/java/com/amazon/Constants.java b/services/redis-updater/model/structs.go similarity index 55% rename from services/kinesis-consumer/src/main/java/com/amazon/Constants.java rename to services/redis-updater/model/structs.go index 9b8e088..8aa7b55 100644 --- a/services/kinesis-consumer/src/main/java/com/amazon/Constants.java +++ b/services/redis-updater/model/structs.go @@ -1,5 +1,5 @@ /* - * Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -13,13 +13,15 @@ * permissions and limitations under the License. * */ +package model -package com.amazon; - -public final class Constants { - - private Constants() { - } - - public static final String TABLE_NAME = "TABLE_NAME"; +// Message is an exported type that +// contains all values for a tracking message +type Message struct { + ProgramID string `json:"programId"` + Checksum string `json:"checksum"` + CustomerID int32 `json:"customerId"` + CustomerName string `json:"customerName"` + ProgramName string `json:"programName"` + IsValid bool `json:"valid"` } diff --git a/services/redis-updater/pom.xml b/services/redis-updater/pom.xml deleted file mode 100644 index 6eb83dd..0000000 --- a/services/redis-updater/pom.xml +++ /dev/null @@ -1,92 +0,0 @@ - - 4.0.0 - com.amazon - redis-updater - jar - 1.1 - redis-updater - http://maven.apache.org - - - 1.8 - 1.8 - - - - - - com.amazonaws - aws-lambda-java-core - 1.2.0 - - - - com.amazonaws - aws-lambda-java-events - 2.1.0 - - - - com.amazonaws - amazon-kinesis-client - 1.9.0 - - - - redis.clients - jedis - 2.9.0 - jar - compile - - - - org.testng - testng - 6.11 - test - - - - org.apache.logging.log4j - log4j-core - 2.8.2 - test - - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.7.0 - - 1.8 - 1.8 - true - true - - - - org.apache.maven.plugins - maven-shade-plugin - 2.3 - - false - - - - package - - shade - - - - - - - - diff --git a/services/redis-updater/services/persistence/store_data.go b/services/redis-updater/services/persistence/store_data.go new file mode 100644 index 0000000..89ef3b9 --- /dev/null +++ b/services/redis-updater/services/persistence/store_data.go @@ -0,0 +1,61 @@ +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ +package persistence + +import ( + "encoding/json" + "fmt" + + "github.com/aws-samples/reactive-refarch-cloudformation/services/redis-updater/model" + "github.com/fatih/structs" + "github.com/go-redis/redis" +) + +// PersistData stores data in Redis +func PersistData(msg model.Message, client redis.Client) { + + values := structs.Map(msg) + status := client.HMSet(msg.ProgramID, values) + if status.Err() != nil { + fmt.Println("Error writing data:") + fmt.Println(status.Err().Error()) + } + + msgBytes, err := json.Marshal(msg) + + if err != nil { + fmt.Println("Error marshalling data:") + fmt.Println(err.Error()) + } + + msgString := string(msgBytes) + fmt.Println("Persisting data: " + msgString) +} + +// NotifySubscribers notifies all subscribers of data changes +func NotifySubscribers(msg model.Message, client redis.Client, channel string) { + + msgBytes, err := json.Marshal(msg) + + if err != nil { + fmt.Println("Error marshalling data:") + fmt.Println(err.Error()) + } + + msgString := string(msgBytes) + client.Publish(channel, msgString) + fmt.Println("Publishing to subscribers: " + msgString) +} diff --git a/services/redis-updater/services/persistence/store_data_test.go b/services/redis-updater/services/persistence/store_data_test.go new file mode 100644 index 0000000..abc2423 --- /dev/null +++ b/services/redis-updater/services/persistence/store_data_test.go @@ -0,0 +1,78 @@ +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ +package persistence + +import ( + "fmt" + "strconv" + "testing" + + "github.com/aws-samples/reactive-refarch-cloudformation/services/redis-updater/model" + "github.com/go-redis/redis" + "github.com/stretchr/testify/assert" +) + +func TestPersistData(t *testing.T) { + + redisHost := "localhost" + redisPort := "6379" + + redisClient := redis.NewClient(&redis.Options{ + Addr: redisHost + ":" + redisPort, + DB: 0, // use default DB + }) + + event := prepareData() + + PersistData(event, *redisClient) + + fmt.Println("Using key " + event.ProgramID) + retVal := redisClient.HGetAll(event.ProgramID).Val() + + custID, err := strconv.ParseInt(retVal["CustomerID"], 10, 32) + cID := int32(custID) + + assert.Equal(t, retVal["ProgramID"], event.ProgramID, "ProgramId not the same value") + assert.Equal(t, retVal["Checksum"], event.Checksum, "Checksum not the same value") + assert.Equal(t, cID, event.CustomerID, "CustomerID not the same value") + assert.Equal(t, retVal["CustomerName"], event.CustomerName, "CustomerName not the same value") + assert.Equal(t, retVal["ProgramName"], event.ProgramName, "ProgramName not the same value") + + pubSub := redisClient.Subscribe("channelTest") + defer pubSub.Close() + + NotifySubscribers(event, *redisClient, "channelTest") + + msg, err := pubSub.Receive() + + if err != nil { + panic(err) + } + + fmt.Println(msg) +} + +func prepareData() model.Message { + event := model.Message{ + ProgramID: "programId", + Checksum: "checksum", + CustomerID: 1234, + CustomerName: "customerName", + ProgramName: "programName", + IsValid: true} + + return event +} diff --git a/services/redis-updater/src/main/java/com/amazon/RedisUpdater.java b/services/redis-updater/src/main/java/com/amazon/RedisUpdater.java deleted file mode 100644 index f8c8c3f..0000000 --- a/services/redis-updater/src/main/java/com/amazon/RedisUpdater.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - * - */ - -package com.amazon; - -import com.amazon.util.Constants; -import com.amazon.vo.TrackingMessage; -import com.amazonaws.services.lambda.runtime.LambdaLogger; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import redis.clients.jedis.Jedis; - -import java.io.IOException; -import java.util.Map; - -public class RedisUpdater { - - private static final ObjectMapper MAPPER = new ObjectMapper(); - - public void updateRedisData(final TrackingMessage trackingMessage, final Jedis jedis, final LambdaLogger logger) { - try { - - ObjectMapper mapper = new ObjectMapper(); - String jsonString = mapper.writeValueAsString(trackingMessage); - - this.log("Updating Redis with object " + jsonString, logger); - - Map map = marshal(jsonString); - String statusCode = jedis.hmset(trackingMessage.getProgramId(), map); - - this.log("Status: " + statusCode, logger); - } - catch (Exception exc) { - if (null == logger) - exc.printStackTrace(); - else - logger.log(exc.getMessage()); - } - } - - private void log(String message, LambdaLogger logger) { - if (null == logger) - System.out.println(message); - else - logger.log(message); - } - - public void updateRedisData(final TrackingMessage trackingMessage, final Jedis jedis) { - this.updateRedisData(trackingMessage, jedis, null); - } - - public void notifySubscribers(final TrackingMessage trackingMessage, final Jedis jedis) { - this.notifySubscribers(trackingMessage, jedis, null); - } - - public void notifySubscribers(final TrackingMessage trackingMessage, final Jedis jedis, final LambdaLogger logger) { - try { - - ObjectMapper mapper = new ObjectMapper(); - String jsonString = mapper.writeValueAsString(trackingMessage); - - this.log("Sending data " + jsonString + " to " + Constants.REDIS_PUBSUB_CHANNEL, logger); - jedis.publish(Constants.REDIS_PUBSUB_CHANNEL, jsonString); - } - - catch (final IOException e) { - log(e.getMessage(), logger); - } - } - - private Map marshal(final String jsonString) { - - try { - Map map = MAPPER.readValue(jsonString, new TypeReference>() { - }); - return map; - } catch (final IOException e) { - e.printStackTrace(); - } - - return null; - } -} diff --git a/services/redis-updater/src/main/java/com/amazon/RedisUpdaterHandler.java b/services/redis-updater/src/main/java/com/amazon/RedisUpdaterHandler.java deleted file mode 100644 index 0d9af7c..0000000 --- a/services/redis-updater/src/main/java/com/amazon/RedisUpdaterHandler.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - * - */ - -package com.amazon; - -import com.amazon.vo.TrackingMessage; -import com.amazonaws.services.lambda.runtime.Context; -import com.amazonaws.services.lambda.runtime.LambdaLogger; -import com.amazonaws.services.lambda.runtime.RequestHandler; -import com.amazonaws.services.lambda.runtime.events.KinesisEvent; -import com.fasterxml.jackson.databind.ObjectMapper; -import redis.clients.jedis.Jedis; - -import java.nio.ByteBuffer; -import java.nio.charset.Charset; - -import static com.amazon.util.Constants.REDIS_HOST; -import static com.amazon.util.Constants.REDIS_PORT; - -public class RedisUpdaterHandler implements RequestHandler { - - private Jedis jedis; - private Charset charset = Charset.forName("UTF-8"); - private ObjectMapper mapper = new ObjectMapper(); - - @Override - public Void handleRequest(KinesisEvent kinesisEvent, Context context) { - - RedisUpdater redisUpdater = new RedisUpdater(); - LambdaLogger logger = context.getLogger(); - logger.log("Received " + kinesisEvent.getRecords().size() + " raw Event Records."); - - for (KinesisEvent.KinesisEventRecord eventRecord : kinesisEvent.getRecords()) { - // Update Redis - if (null == jedis || !jedis.isConnected()) { - logger.log("Connection to Redis is closed, trying to reconnect ... "); - - if (System.getenv(REDIS_HOST) == null) { - logger.log("Not Redis host specified"); - return null; - } - String redisHost = System.getenv(REDIS_HOST); - int redisPort = System.getenv(REDIS_PORT) == null ? 6379 : Integer.parseInt(System.getenv(REDIS_PORT)); - - logger.log("Connection to " + redisHost); - jedis = new Jedis(redisHost, redisPort); - } - - ByteBuffer kinesisData = eventRecord.getKinesis().getData(); - String textData = charset.decode(kinesisData).toString(); - - try { - TrackingMessage trackingMessage = mapper.readValue(textData, TrackingMessage.class); - - redisUpdater.updateRedisData(trackingMessage, jedis, logger); - redisUpdater.notifySubscribers(trackingMessage, jedis, logger); - } - - catch (Exception exc) { - if (null == logger) - exc.printStackTrace(); - else - logger.log(exc.getMessage()); - } - } - - return null; - } - - -} diff --git a/services/redis-updater/src/main/java/com/amazon/util/Constants.java b/services/redis-updater/src/main/java/com/amazon/util/Constants.java deleted file mode 100644 index c3d97c2..0000000 --- a/services/redis-updater/src/main/java/com/amazon/util/Constants.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - * - */ - -package com.amazon.util; - -public final class Constants { - - private Constants() { - } - - public final static String REDIS_HOST = "REDIS_HOST"; - public final static String REDIS_PORT = "REDIS_PORT"; - public final static String REDIS_PUBSUB_CHANNEL = "channel1"; -} diff --git a/services/redis-updater/src/main/java/com/amazon/vo/TrackingMessage.java b/services/redis-updater/src/main/java/com/amazon/vo/TrackingMessage.java deleted file mode 100644 index 0fb56e0..0000000 --- a/services/redis-updater/src/main/java/com/amazon/vo/TrackingMessage.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - * - */ - -package com.amazon.vo; - -public class TrackingMessage { - private String programId; - private String programName; - private String checksum; - private Integer customerId; - private String customerName; - private boolean isValid; - - public TrackingMessage() {} - - public TrackingMessage(String programId, String checksum, Integer customerId, - String customerName, boolean isValid, String programName) { - this.programId = programId; - this.checksum = checksum; - this.customerId = customerId; - this.customerName = customerName; - this.isValid = isValid; - this.programName = programName; - } - - public String getProgramId() { - return programId; - } - - public void setProgramId(String programId) { - this.programId = programId; - } - - public String getChecksum() { - return checksum; - } - - public void setChecksum(String checksum) { - this.checksum = checksum; - } - - public Integer getCustomerId() { - return customerId; - } - - public void setCustomerId(Integer customerId) { - this.customerId = customerId; - } - - public String getCustomerName() { - return customerName; - } - - public void setCustomerName(String customerName) { - this.customerName = customerName; - } - - public boolean isValid() { - return isValid; - } - - public void setValid(boolean valid) { - isValid = valid; - } - - public String getProgramName() { - return programName; - } - - public void setProgramName(String programName) { - this.programName = programName; - } - - @Override - public String toString() { - return "TrackingMessage{" + - ", programId='" + programId + '\'' + - ", programName='" + programName + '\'' + - ", checksum='" + checksum + '\'' + - ", customerId=" + customerId + - ", customerName='" + customerName + '\'' + - '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TrackingMessage that = (TrackingMessage) o; - - return getProgramId() != null ? getProgramId().equals(that.getProgramId()) : that.getProgramId() == null; - } - - @Override - public int hashCode() { - return getProgramId() != null ? getProgramId().hashCode() : 0; - } -} diff --git a/services/redis-updater/src/test/java/com/amazon/RedisUpdaterTest.java b/services/redis-updater/src/test/java/com/amazon/RedisUpdaterTest.java deleted file mode 100644 index 44538d4..0000000 --- a/services/redis-updater/src/test/java/com/amazon/RedisUpdaterTest.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - * - */ - -package com.amazon; - -import com.amazon.vo.TrackingMessage; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPubSub; - -import java.util.Map; - -import static com.amazon.util.Constants.REDIS_HOST; -import static com.amazon.util.Constants.REDIS_PORT; - -public class RedisUpdaterTest { - - private static final Logger LOGGER = LogManager.getLogger(RedisUpdaterTest.class); - private static Jedis jedis, subscriberJedis; - - @BeforeClass - public static void before() throws Exception { - String redisHost = System.getenv(REDIS_HOST) == null ? "localhost" : System.getenv(REDIS_HOST); - int redisPort = System.getenv(REDIS_PORT) == null ? 6379 : Integer.getInteger(System.getenv(REDIS_PORT)); - - jedis = new Jedis(redisHost, redisPort); - jedis.connect(); - - subscriberJedis = new Jedis(redisHost, redisPort); - - new Thread(() -> { - try { - subscriberJedis.subscribe(setupSubscriber(), "com.amazon.reactive"); - } catch (Exception e) { - e.printStackTrace(); - } - }).start(); - - - LOGGER.info("Using Redis Host " + redisHost); - LOGGER.info("Using Redis Port " + redisPort); - } - - private static JedisPubSub setupSubscriber() { - final JedisPubSub jedisPubSub = new JedisPubSub() { - @Override - public void onUnsubscribe(String channel, int subscribedChannels) { - System.out.println("onUnsubscribe"); - } - - @Override - public void onSubscribe(String channel, int subscribedChannels) { - System.out.println("onSubscribe"); - } - - @Override - public void onPUnsubscribe(String pattern, int subscribedChannels) { - } - - @Override - public void onPSubscribe(String pattern, int subscribedChannels) { - } - - @Override - public void onPMessage(String pattern, String channel, String message) { - } - - @Override - public void onMessage(String channel, String message) { - System.out.println("Message received"); - System.out.println("Message: " + message); - } - }; - - return jedisPubSub; - } - - @AfterClass - public static void after() throws Exception { - // Delete data from Redis - - Thread.sleep(1000); - deleteDataFromRedis(prepareData()); - - if (null != jedis) - jedis.close(); - } - - private static TrackingMessage prepareData() { - TrackingMessage msg = new TrackingMessage(); - msg.setProgramId("12345"); - msg.setProgramName("program1"); - msg.setCustomerName("myCustomer"); - msg.setCustomerId(1234); - msg.setChecksum("check123"); - msg.setValid(true); - - return msg; - } - - private static void deleteDataFromRedis(final TrackingMessage trackingMessage) { - jedis.del(trackingMessage.getProgramId()); - } - - @Test - public void writeToRedisTest() { - try { - TrackingMessage testMessage = prepareData(); - - RedisUpdater redisUpdater = new RedisUpdater(); - redisUpdater.updateRedisData(testMessage, jedis); - redisUpdater.notifySubscribers(testMessage, jedis); - - Map resultMap = jedis.hgetAll(testMessage.getProgramId()); - - ObjectMapper mapper = new ObjectMapper(); - String json = mapper.writeValueAsString(resultMap); - TrackingMessage resultMessage = mapper.readValue(json, TrackingMessage.class); - Assert.assertNotNull(resultMessage); - Assert.assertEquals(resultMessage, testMessage); - } - - catch (Exception exc) { - exc.printStackTrace(); - } - } -} diff --git a/services/redis-updater/src/test/resources/log4j2.xml b/services/redis-updater/src/test/resources/log4j2.xml deleted file mode 100644 index 35a6a3c..0000000 --- a/services/redis-updater/src/test/resources/log4j2.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git a/services/tracking-service/reactive-vertx/.gitignore b/services/tracking-service/reactive-vertx/.gitignore new file mode 100644 index 0000000..ccec225 --- /dev/null +++ b/services/tracking-service/reactive-vertx/.gitignore @@ -0,0 +1,31 @@ +# Created by .ignore support plugin (hsz.mobi) +### Java template +# Compiled class file +*.class +target/* +*.iml +.idea/* + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +*.war +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +.idea/.name +.idea/encodings.xml +.idea/misc.xml +.idea/workspace.xml +target/