Skip to content

Commit

Permalink
Merge pull request #7 from it-chain/feature/avengers
Browse files Browse the repository at this point in the history
implement mock event service
  • Loading branch information
frontalnh authored Sep 16, 2018
2 parents 35496cf + 5ed1ef4 commit 7190631
Show file tree
Hide file tree
Showing 10 changed files with 509 additions and 233 deletions.
364 changes: 238 additions & 126 deletions .idea/workspace.xml

Large diffs are not rendered by default.

53 changes: 53 additions & 0 deletions mock/event_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2018 It-chain
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package mock

import (
"errors"
"reflect"
)

var ErrEventType = errors.New("Error type of event is not struct")

type EventService struct {
ProcessId string
PublishFunc func(processId string, topic string, event interface{}) error
}

func NewEventService(processId string, publishFunc func(processId string, topic string, event interface{}) error) *EventService {
return &EventService{
ProcessId: processId,
PublishFunc: publishFunc,
}
}

func (s *EventService) Publish(topic string, event interface{}) error {

if !eventIsStruct(event) {
return ErrEventType
}

if reflect.ValueOf(event).Elem().Type().Name() == "DeliverGrpc" {
return s.PublishFunc(s.ProcessId, topic, event)
}

return nil
}

func eventIsStruct(event interface{}) bool {
return reflect.TypeOf(event).Kind() == reflect.Struct
}
1 change: 0 additions & 1 deletion mock/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,3 @@
*/

package mock

62 changes: 54 additions & 8 deletions mock/network_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,24 @@ import (

"github.com/it-chain/engine/common/command"
"github.com/it-chain/engine/common/logger"
"reflect"
)

//network manager builds environment for communication between multiple nodes in network

type NetworkManager struct {
mutex sync.Mutex
ChannelMap map[string]map[string]chan command.ReceiveGrpc // channel for receive deliverGrpc command

ProcessMap map[string]*Process
}

func NewNetworkManager() NetworkManager {
func NewNetworkManager() *NetworkManager {

return NetworkManager{
return &NetworkManager{
ChannelMap: make(map[string]map[string]chan command.ReceiveGrpc),
ProcessMap: make(map[string]*Process),
mutex: sync.Mutex{},
}
}

Expand All @@ -46,7 +51,7 @@ func (n *NetworkManager) Push(processId string, queue string, c command.ReceiveG
defer n.mutex.Unlock()

logger.Infof(nil, "push to channel: %s", processId)
n.ChannelMap[processId][queue]<-c
n.ChannelMap[processId][queue] <- c

return nil
}
Expand Down Expand Up @@ -75,6 +80,7 @@ func (n *NetworkManager) GrpcCall(processId string, queue string, params interfa
return nil
}

// Will Be DEPRECATED!
//GrpcConsume would be injected to rpc server
//processId => receiver
func (n *NetworkManager) GrpcConsume(processId string, queue string, handler func(command command.ReceiveGrpc) error) error {
Expand All @@ -87,11 +93,11 @@ func (n *NetworkManager) GrpcConsume(processId string, queue string, handler fun
for end {
select {
case message := <-n.ChannelMap[processId][queue]:
logger.Infof(nil,"receive message from : %s message: %v",processId, message)
logger.Infof(nil, "receive message from : %s message: %v", processId, message)
handler(message)

case <-time.After(4 * time.Second):
logger.Info(nil,"failed to consume, timed out!")
logger.Info(nil, "failed to consume, timed out!")
end = false
}
}
Expand All @@ -100,11 +106,51 @@ func (n *NetworkManager) GrpcConsume(processId string, queue string, handler fun
return nil
}

func (n *NetworkManager) AddProcess(process Process) {
// test success
func (n *NetworkManager) Publish(from string, topic string, event interface{}) error {
n.mutex.Lock()
defer n.mutex.Unlock()

n.ChannelMap[process.Id] = make(map[string]chan command.ReceiveGrpc)
if reflect.ValueOf(event).Type().Name() == "DeliverGrpc"{
for _, id := range event.(command.DeliverGrpc).RecipientList {
logger.Infof(nil, "publish to: %s", id)

go func(id string) {
n.ProcessMap[id].GrpcCommandReceiver <- command.ReceiveGrpc{
MessageId: event.(command.DeliverGrpc).MessageId,
Body: event.(command.DeliverGrpc).Body,
ConnectionID: from,
Protocol: event.(command.DeliverGrpc).Protocol,
}
}(id)

}
}

return nil
}

func (n *NetworkManager) Start() {
for id, process := range n.ProcessMap {
go func(id string, process *Process) {
logger.Infof(nil, "process %s is running", process.Id)
logger.Infof(nil, "channel %s is %o", process.Id, process.GrpcCommandReceiver)
select {
case message := <-process.GrpcCommandReceiver:
for _, handler := range process.GrpcCommandHandlers {
handler(message)
}

case <-time.After(4 * time.Second):
logger.Info(nil, "failed to consume, timed out!")
}
}(id, process)
}
}

func (n *NetworkManager) AddProcess(process *Process) {
n.mutex.Lock()
defer n.mutex.Unlock()

n.ChannelMap[process.Id]["message.receive"] = process.GrpcCommandReceiver
n.ProcessMap[process.Id] = process
}
100 changes: 94 additions & 6 deletions mock/network_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package mock_test
import (
"testing"

"github.com/it-chain/engine/common/command"
"github.com/magiconair/properties/assert"
"time"

"sync"

"github.com/it-chain/avengers/mock"
"github.com/it-chain/engine/common/command"
"github.com/magiconair/properties/assert"
)

func TestNewNetworkManager(t *testing.T) {
Expand Down Expand Up @@ -57,7 +60,7 @@ func TestNetworkManager_GrpcCall(t *testing.T) {
RecipientList: test.input.RecipientList,
Protocol: test.input.Protocol,
}
networkManager.GrpcCall("1","message.deliver", *deliverGrpc, func() {})
networkManager.GrpcCall("1", "message.deliver", *deliverGrpc, func() {})

t.Logf("end of test calling")

Expand Down Expand Up @@ -87,8 +90,8 @@ func TestNetworkManager_GrpcConsume(t *testing.T) {
ProcessId string
handler func(c command.ReceiveGrpc) error
}{RecipientList: []string{"1", "2", "3"},
ProcessId: "1",
handler: func(c command.ReceiveGrpc) error {callbackIndex=2; t.Logf("handler!"); return nil }}},
ProcessId: "1",
handler: func(c command.ReceiveGrpc) error { callbackIndex = 2; t.Logf("handler!"); return nil }}},
}

for testName, test := range tests {
Expand All @@ -104,7 +107,7 @@ func TestNetworkManager_GrpcConsume(t *testing.T) {

networkManager.GrpcConsume(test.input.ProcessId, "message.receive", test.input.handler)

networkManager.GrpcCall("1","message.deliver", *deliverGrpc, func() {
networkManager.GrpcCall("1", "message.deliver", *deliverGrpc, func() {
callbackIndex++
})

Expand All @@ -115,3 +118,88 @@ func TestNetworkManager_GrpcConsume(t *testing.T) {
time.Sleep(5 * time.Second)
assert.Equal(t, callbackIndex, 2)
}

func TestNetworkManager_Publish(t *testing.T) {
mem := ClosureMemory()
wg := sync.WaitGroup{}
wg.Add(2)
networkManager := SetNetworkManager(mem)

event := command.DeliverGrpc{
MessageId: "1",
RecipientList: []string{"2", "3"},
}

go func() {
m2 := <-networkManager.ProcessMap["2"].GrpcCommandReceiver
assert.Equal(t, m2.MessageId, "1")
wg.Done()
}()

go func() {
m3 := <-networkManager.ProcessMap["3"].GrpcCommandReceiver
assert.Equal(t, m3.MessageId, "1")
wg.Done()
}()

networkManager.Publish("1", "deliver.message", event)

wg.Wait()
}

func TestNetworkManager_Start(t *testing.T) {
mem := ClosureMemory()
net := SetNetworkManager(mem)

net.Start()

command := command.DeliverGrpc{
MessageId: "123",
RecipientList: []string{"2", "3"},
}

net.Publish("1", "message.deliver", command)

time.Sleep(2 * time.Second)

assert.Equal(t, mem(), 3)
}

func SetNetworkManager(closerMemory func() int) *mock.NetworkManager {
networkManager := mock.NewNetworkManager()

process1 := mock.NewProcess("1")
handler1 := func(command command.ReceiveGrpc) error {
return nil
}
process1.RegisterHandler(handler1)

process2 := mock.NewProcess("2")
handler2 := func(command command.ReceiveGrpc) error {
closerMemory()
return nil
}
process2.RegisterHandler(handler2)

process3 := mock.NewProcess("3")
handler3 := func(command command.ReceiveGrpc) error {
closerMemory()
return nil
}
process3.RegisterHandler(handler3)

networkManager.AddProcess(process1)
networkManager.AddProcess(process2)
networkManager.AddProcess(process3)

return networkManager
}

func ClosureMemory() func() int {
i := 0

return func() int {
i++
return i
}
}
38 changes: 22 additions & 16 deletions mock/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,44 @@ package mock

import (
"reflect"
"github.com/it-chain/engine/common/command"
"sync"

"github.com/it-chain/engine/common/command"
)

type Process struct {
mutex sync.Mutex
Id string
Id string

GrpcCommandHandlers []func(command command.ReceiveGrpc) error
GrpcCommandReceiver chan command.ReceiveGrpc //should be register to network's channel map
Services map[string]interface{} // register service or api for testing which has injected mock client
}

func NewProcess() Process {
return Process{}
}

func (p *Process) Init(id string) {
p.Services = make(map[string]interface{})
p.GrpcCommandReceiver = make(chan command.ReceiveGrpc)
p.Id = id
func NewProcess(processId string) *Process {
return &Process{
Id: processId,
Services: make(map[string]interface{}),
GrpcCommandReceiver: make(chan command.ReceiveGrpc),
GrpcCommandHandlers: make([]func(command command.ReceiveGrpc) error, 0),
}
}

// 테스트 과정에서 사용하기 위한 다양한 서비스 혹은 api 들을 등록한다.
// 이를 통해 특정 서비스 혹은 api를 사용함에 있어 어느 프로세스에 종속된 것인지를 직관적으로 알 수 있도록 한다.
// register all kinds of services or apis to use in test process
// so we can intuitively know processes we are using are belongs to the process
func (p *Process) Register(service interface{}) {
p.mutex.Lock()
defer p.mutex.Unlock()

p.Services[reflect.ValueOf(service).Elem().Type().Name()] = service
}

//func (p *Process) RegisterHandler(handler func(command command.ReceiveGrpc) error) error {
//
// p.GrpcCommandHandlers = append(p.GrpcCommandHandlers, handler)
//
// return nil
//}
// register handlers to be started when network manager start to work
func (p *Process) RegisterHandler(handler func(command command.ReceiveGrpc) error) error {

p.GrpcCommandHandlers = append(p.GrpcCommandHandlers, handler)

return nil
}
Loading

0 comments on commit 7190631

Please sign in to comment.