Skip to content

Commit

Permalink
Merge pull request #9 from AccelByte/SOCP-520-add-groupid-in-consumer
Browse files Browse the repository at this point in the history
SOCP-520 implement subscribe with groupid, add validation and test
  • Loading branch information
anggorodewanto authored Sep 25, 2019
2 parents 9740941 + 13d2095 commit eb875e3
Show file tree
Hide file tree
Showing 15 changed files with 929 additions and 296 deletions.
118 changes: 72 additions & 46 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ go get -u github.com/AccelByte/eventstream-go-sdk
eventstream "github.com/AccelByte/eventstream-go-sdk"
```

To create a new event stream client, use this function:

```go
client, err := eventstream.NewClient(prefix, stream, brokers, config)
```
``NewClient`` requires 4 parameters :
* prefix : Topic prefix from client (string)
* stream : Stream name. e.g. kafka, stdout, none (string)
* brokers : List of kafka broker (array of string)
* config : Custom broker configuration from client.
This is optional and only uses the first arguments. (variadic *BrokerConfig)


## Supported Stream
Currently event stream are supported by these stream:
Expand All @@ -26,85 +38,99 @@ Publish and subscribe an event to / from Kafka stream.

currently compatible with golang version from 1.12+ and Kafka versions from 0.10.1.0 to 2.1.0.

To create a new Kafka stream client, use this function:
```go
client, err := NewKafkaClient(brokers []string, prefix string, config ...*KafkaConfig)
```
``NewKafkaClient`` requires 3 parameters :
* brokers : List of kafka broker (array of string)
* prefix : Topic prefix from client (string)
* config : Custom kafka configuration from client.
This is optional and only uses the first arguments. (variadic KafkaConfig)
To create a kafka stream client, just pass the stream parameter with `kafka`.

#### Custom Configuration
SDK support with custom configuration for kafka stream, that is :

* DialTimeout : Timeout duration during connecting to kafka broker (time.Duration)
* ReadTimeout : Timeout duration during consume topic from kafka broker (time.Duration)
* WriteTimeout : Timeout duration during publish event to kafka broker (time.Duration)


#### Publish
Publish or sent an event into kafka stream. Client able to publish an event into single or multiple topic.
Publish support with exponential backoff retry. (max 3x)
### Stdout Stream
This stream is for testing purpose. This will print the event in stdout. It should not be used in production since this
will print unnecessary log.

To create a stdout stream client, just pass the stream parameter with `stdout`.

### Blackhole
This is used when client don't want the service to send event data to anywhere.

To create a blackhole client, just pass the stream parameter with `none`.

## Publish Subscribe Event

### Publish
Publish or sent an event into stream. Client able to publish an event into single or multiple topic.
Publish to `kafka` stream support with exponential backoff retry. (max 3x)

To publish an event, use this function:
```go
client.Publish(
err := client.Publish(
NewPublish().
Topic(TopicName).
EventName(EventName).
Namespace(Namespace).
ClientID(ClientID).
UserID(UserID).
SessionID(SessionID).
TraceID(TraceID).
Context(Context).
Version(Version).
Payload(Payload))
```

#### Subscribe
To subscribe an event from specific topic, client should be register a callback function that executed once event received.
A callback function has specific topic and event name.
#### Parameter
* Topic : List of topic / channel. (variadic string - alphaNumeric(256) - Required)
* EventName : Event name. (string - alphaNumeric(256) - Required)
* Namespace : Event namespace. (string - alphaNumeric(256) - Required)
* ClientID : Publisher client ID. (string - UUID v4 without Hyphens)
* UserID : Publisher user ID. (string - UUID v4 without Hyphens)
* SessionID : Publisher session ID. (string - UUID v4 without Hyphens)
* TraceID : Trace ID. (string - UUID v4 without Hyphens)
* Context : Golang context. (context - default: context.background)
* Version : Version of schema. (string - default: `0.1.0`)
* Payload : Additional attribute. (map[string]interface{})

### Subscribe
To subscribe an event from specific topic in stream, client should be register a callback function that executed once event received.
A callback aimed towards specific topic and event name.

To subscribe an event, use this function:
```go
client.Register(
err := client.Register(
NewSubscribe().
Topic(topicName).
EventName(mockEvent.EventName).
GroupID(groupID).
Context(Context).
Callback(func(event *Event, err error) {}))
```

Note: Callback function should be ``func(event *Event, err error){}``. ``event`` is object that store event message
and ``err`` is an error that happen when consume the message.

#### Custom Configuration
SDK support with custom configuration for kafka stream, that is :

* DialTimeout : Timeout duration during connecting to kafka broker (time.Duration)
* ReadTimeout : Timeout duration during consume topic from kafka broker (time.Duration)
* WriteTimeout : Timeout duration during publish event to kafka broker (time.Duration)
#### Parameter
* Topic : Subscribed topic. (string - alphaNumeric(256) - Required)
* EventName : Event name. (string - alphaNumeric(256) - Required)
* Namespace : Event namespace. (string - alphaNumeric(256) - Required)
* GroupID : Message broker group / queue ID. (string - alphaNumeric(256) - default: `*`)
* Context : Golang context. (context - default: context.background)
* Callback : Callback function when receive event. (func(event *Event, err error){} - required)

### Stdout Stream
This stream is for testing purpose. This will print the event in stdout. It should not be used in production since this
will print unnecessary log.

To create a client for stdout, use this function:
```go
client, err := NewStdoutClient(prefix string)
```

### Blackhole
This is used when client don't want the service to send event data to anywhere.

To create a client for stdout, use this function:
```go
client, err := NewBlackholeClient()
```
Callback function passing 2 parameters:
* ``event`` is object that store event message.
* ``err`` is an error that happen when consume the message.

## Event Message
Event message is a set of event information that would be publish or consume by client.

Event message format :
* id : Event ID with UUID format (string)
* id : Event ID (string - UUID v4 without Hyphens)
* name : Event name (string)
* namespace : Event namespace (string)
* traceId : Trace ID (string)
* clientId : Publisher client ID (string)
* userId : Publisher user ID (string)
* traceId : Trace ID (string - UUID v4 without Hyphens)
* clientId : Publisher client ID (string - UUID v4 without Hyphens)
* userId : Publisher user ID (string - UUID v4 without Hyphens)
* sessionId : Publisher session ID (string - UUID v4 without Hyphens)
* timestamp : Event time (time.Time)
* version : Event schema version (string)
* payload : Set of data / object that given by producer. Each data have own key for specific purpose. (map[string]interface{})
12 changes: 7 additions & 5 deletions blackhole.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ package eventstream
// BlackholeClient satisfies the publisher for mocking
type BlackholeClient struct{}

// NewBlackholeClient creates new telemetry client
func NewBlackholeClient() (*BlackholeClient, error) {
return &BlackholeClient{}, nil
// newBlackholeClient creates new telemetry client
func newBlackholeClient() *BlackholeClient {
return &BlackholeClient{}
}

func (client *BlackholeClient) Publish(publishBuilder *PublishBuilder) {
func (client *BlackholeClient) Publish(publishBuilder *PublishBuilder) error {
// do nothing
return nil
}

func (client *BlackholeClient) Register(subscribeBuilder *SubscribeBuilder) {
func (client *BlackholeClient) Register(subscribeBuilder *SubscribeBuilder) error {
// do nothing
return nil
}
7 changes: 3 additions & 4 deletions blackhole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@ import (
)

func TestNewBlackholeClient(t *testing.T) {
client, err := NewBlackholeClient()
client := newBlackholeClient()
expectedClient := &BlackholeClient{}
assert.Equal(t, expectedClient, client, "client should be equal")
assert.Nil(t, err, "error should be nil")
}

func TestBlackholePublish(t *testing.T) {
client := &BlackholeClient{}
client.Publish(nil)
_ = client.Publish(nil)
}

func TestBlackholeSubscribe(t *testing.T) {
client := &BlackholeClient{}
client.Register(nil)
_ = client.Register(nil)
}
24 changes: 12 additions & 12 deletions docker-compose-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,24 @@
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.2.1
image: wurstmeister/zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
expose:
- "2181"

kafka:
image: confluentinc/cp-kafka:5.2.1
image: wurstmeister/kafka
ports:
- "9092:9092"
expose:
- "9092"
depends_on:
- zookeeper
links:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_ADVERTISED_HOST_NAME: PLAINTEXT://localhost:9092
- KAFKA_BROKER_ID=1
- KAFKA_ADVERTISED_PORT=9092
- KAFKA_ADVERTISED_HOST_NAME=localhost
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
48 changes: 36 additions & 12 deletions eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package eventstream
import (
"context"
"errors"
"time"
)

const (
Expand All @@ -30,6 +31,7 @@ const (
const (
separator = "."
defaultVersion = "0.1.0"
defaultGroupID = "*"
)

// Event defines the structure of event
Expand All @@ -40,11 +42,19 @@ type Event struct {
ClientID string `json:"clientId"`
TraceID string `json:"traceId"`
UserID string `json:"userId"`
SessionID string `json:"sessionId"`
Timestamp string `json:"timestamp"`
Version string `json:"version"`
Payload map[string]interface{} `json:"payload"`
}

// BrokerConfig is custom configuration for message broker
type BrokerConfig struct {
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
}

// PublishBuilder defines the structure of message which is sent through message broker
type PublishBuilder struct {
topic []string
Expand All @@ -53,6 +63,7 @@ type PublishBuilder struct {
clientID string
traceID string
userID string
sessionID string
version string
payload map[string]interface{}
ctx context.Context
Expand Down Expand Up @@ -84,12 +95,6 @@ func (p *PublishBuilder) Namespace(namespace string) *PublishBuilder {
return p
}

// UserID set userID of publisher event
func (p *PublishBuilder) UserID(userID string) *PublishBuilder {
p.userID = userID
return p
}

// ClientID set clientID of publisher event
func (p *PublishBuilder) ClientID(clientID string) *PublishBuilder {
p.clientID = clientID
Expand All @@ -102,6 +107,18 @@ func (p *PublishBuilder) TraceID(traceID string) *PublishBuilder {
return p
}

// SessionID set sessionID of publisher event
func (p *PublishBuilder) SessionID(sessionID string) *PublishBuilder {
p.sessionID = sessionID
return p
}

// UserID set userID of publisher event
func (p *PublishBuilder) UserID(userID string) *PublishBuilder {
p.userID = userID
return p
}

// Version set event schema version
func (p *PublishBuilder) Version(version string) *PublishBuilder {
p.version = version
Expand All @@ -124,6 +141,7 @@ func (p *PublishBuilder) Context(ctx context.Context) *PublishBuilder {
// SubscribeBuilder defines the structure of message which is sent through message broker
type SubscribeBuilder struct {
topic string
groupID string
callback func(event *Event, err error)
eventName string
ctx context.Context
Expand All @@ -142,6 +160,12 @@ func (s *SubscribeBuilder) Topic(topic string) *SubscribeBuilder {
return s
}

// GroupID set subscriber groupID or queue group name
func (s *SubscribeBuilder) GroupID(groupID string) *SubscribeBuilder {
s.groupID = groupID
return s
}

// EventName set event name that will be subscribe
func (s *SubscribeBuilder) EventName(eventName string) *SubscribeBuilder {
s.eventName = eventName
Expand All @@ -161,21 +185,21 @@ func (s *SubscribeBuilder) Context(ctx context.Context) *SubscribeBuilder {
return s
}

func NewClient(prefix, stream string, brokers []string) (Client, error) {
func NewClient(prefix, stream string, brokers []string, config ...*BrokerConfig) (Client, error) {
switch stream {
case eventStreamNull:
return NewBlackholeClient()
return newBlackholeClient(), nil
case eventStreamStdout:
return NewStdoutClient(prefix)
return newStdoutClient(prefix), nil
case eventStreamKafka:
return NewKafkaClient(brokers, prefix)
return newKafkaClient(brokers, prefix, config...), nil
default:
return nil, errors.New("unsupported stream")
}
}

// Client is an interface for event stream functionality
type Client interface {
Publish(publishBuilder *PublishBuilder)
Register(subscribeBuilder *SubscribeBuilder)
Publish(publishBuilder *PublishBuilder) error
Register(subscribeBuilder *SubscribeBuilder) error
}
Loading

0 comments on commit eb875e3

Please sign in to comment.