Skip to content

Commit

Permalink
Add connection query API
Browse files Browse the repository at this point in the history
  • Loading branch information
MakeHoney authored and junbeomlee committed Sep 17, 2018
1 parent a77961c commit c32eca0
Show file tree
Hide file tree
Showing 11 changed files with 148 additions and 24 deletions.
124 changes: 124 additions & 0 deletions api_gateway/connection_query_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package api_gateway

import (
"errors"
"sync"

"github.com/it-chain/engine/common/event"
)

var ErrConnectionExists = errors.New("Connection already exists")

type ConnectionQueryApi struct {
mux *sync.Mutex
connectionRepository ConnectionRepository
}

func NewConnectionQueryApi(connRepo ConnectionRepository) *ConnectionQueryApi {
return &ConnectionQueryApi{
mux: &sync.Mutex{},
connectionRepository: connRepo,
}
}

func (q ConnectionQueryApi) GetAllConnectionList() ([]Connection, error) {
return q.connectionRepository.FindAll()
}

func (q ConnectionQueryApi) GetConnectionByID(connID string) (Connection, error) {
return q.connectionRepository.FindByID(connID)
}

type ConnectionRepository struct {
mux *sync.RWMutex
ConnectionTable map[string]Connection
}

func NewConnectionRepository() *ConnectionRepository {
return &ConnectionRepository{
mux: &sync.RWMutex{},
ConnectionTable: make(map[string]Connection),
}
}

func (cr *ConnectionRepository) Save(conn Connection) error {
cr.mux.Lock()
defer cr.mux.Unlock()

_, exist := cr.ConnectionTable[conn.ConnectionID]
if exist {
return ErrConnectionExists
}

cr.ConnectionTable[conn.ConnectionID] = conn

return nil
}

func (cr *ConnectionRepository) Remove(connID string) error {
cr.mux.Lock()
defer cr.mux.Unlock()

delete(cr.ConnectionTable, connID)

return nil
}

func (cr *ConnectionRepository) FindAll() ([]Connection, error) {
cr.mux.Lock()
defer cr.mux.Unlock()

connectionList := []Connection{}

for _, conn := range cr.ConnectionTable {
connectionList = append(connectionList, conn)
}

return connectionList, nil
}

func (cr *ConnectionRepository) FindByID(connID string) (Connection, error) {
cr.mux.Lock()
defer cr.mux.Unlock()

for _, conn := range cr.ConnectionTable {
if connID == conn.ConnectionID {
return conn, nil
}
}

return Connection{}, nil
}

type ConnectionEventListener struct {
connectionRepository ConnectionRepository
}

func NewConnectionEventListener(connRepo ConnectionRepository) ConnectionEventListener {
return ConnectionEventListener{
connectionRepository: connRepo,
}
}

func (cel *ConnectionEventListener) HandleConnectionCreatedEvent(event event.ConnectionCreated) error {
connection := Connection{
ConnectionID: event.ConnectionID,
Address: event.Address,
}

err := cel.connectionRepository.Save(connection)
if err != nil {
return err
}
return nil
}

func (cel *ConnectionEventListener) HandleConnectionClosedEvent(event event.ConnectionClosed) error {
cel.connectionRepository.Remove(event.ConnectionID)
return nil
}

type Connection struct {
ConnectionID string
Address string
}
2 changes: 1 addition & 1 deletion cmd/connection/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func connect(ip string) error {
return
}

logger.Infof(nil, "[Cmd] Connection created - Address: [%s], Id:[%s]", connection.Address, connection.ConnectionId)
logger.Infof(nil, "[Cmd] Connection created - Address: [%s], Id:[%s]", connection.Address, connection.ConnectionID)
})

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/connection/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func list() error {
fmt.Println("Index\t ID\t\t\t\t\t\t Address")
for index, connection := range getConnectionListCommand.ConnectionList {
fmt.Printf("[%d]\t [%s]\t [%s]\n",
index, connection.ConnectionId, connection.Address)
index, connection.ConnectionID, connection.Address)
}
})

Expand Down
2 changes: 1 addition & 1 deletion cmd/on/grpc_gatewayfx/grpc_gatewayfx.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func InitgRPCServer(lifecycle fx.Lifecycle, config *conf.Configuration, hostServ
OnStop: func(context context.Context) error {
connections, _ := hostService.GetAllConnections()
for _, connection := range connections {
hostService.CloseConnection(connection.ConnectionId)
hostService.CloseConnection(connection.ConnectionID)
}
hostService.Stop()
return nil
Expand Down
2 changes: 1 addition & 1 deletion common/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,5 @@ type ConnectionCreated struct {

// connection close
type ConnectionClosed struct {
ConnectionId string
ConnectionID string
}
16 changes: 8 additions & 8 deletions grpc_gateway/api/connection_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,20 @@ func (c ConnectionApi) CreateConnection(address string) (grpc_gateway.Connection
return connection, err
}

logger.Infof(nil, "[gRPC-Gateway] Connection created - Address [%s], ConnectionId [%s]", connection.Address, connection.ConnectionId)
logger.Infof(nil, "[gRPC-Gateway] Connection created - Address [%s], ConnectionID [%s]", connection.Address, connection.ConnectionID)

return connection, nil
}

func createConnectionCreatedEvent(connection grpc_gateway.Connection) event.ConnectionCreated {
return event.ConnectionCreated{
Address: connection.Address,
ConnectionID: connection.ConnectionId,
ConnectionID: connection.ConnectionID,
}
}

func (c ConnectionApi) CloseConnection(connectionID string) error {
logger.Infof(nil, "[gRPC-Gateway] Close connection - ConnectionId [%s]", connectionID)
logger.Infof(nil, "[gRPC-Gateway] Close connection - ConnectionID [%s]", connectionID)

c.grpcService.CloseConnection(connectionID)

Expand All @@ -72,23 +72,23 @@ func (c ConnectionApi) CloseConnection(connectionID string) error {

func createConnectionClosedEvent(connectionID string) event.ConnectionClosed {
return event.ConnectionClosed{
ConnectionId: connectionID,
ConnectionID: connectionID,
}
}

func (c ConnectionApi) OnConnection(connection grpc_gateway.Connection) {
logger.Infof(nil, "[gRPC-Gateway] Connection created - Address [%s], ConnectionId [%s]", connection.Address, connection.ConnectionId)
logger.Infof(nil, "[gRPC-Gateway] Connection created - Address [%s], ConnectionID [%s]", connection.Address, connection.ConnectionID)

if err := c.eventService.Publish("connection.created", createConnectionCreatedEvent(connection)); err != nil {
logger.Infof(nil, "[gRPC-Gateway] Fail to publish connection createdEvent - ConnectionId: [%s]", connection.ConnectionId)
logger.Infof(nil, "[gRPC-Gateway] Fail to publish connection createdEvent - ConnectionID: [%s]", connection.ConnectionID)
}
}

func (c ConnectionApi) OnDisconnection(connection grpc_gateway.Connection) {
logger.Infof(nil, "[gRPC-Gateway] Connection closed - ConnectionId [%s]", connection.ConnectionId)
logger.Infof(nil, "[gRPC-Gateway] Connection closed - ConnectionID [%s]", connection.ConnectionID)

if err := c.eventService.Publish("connection.closed", connection); err != nil {
logger.Infof(nil, "[gRPC-Gateway] Fail to publish connection createdEvent - ConnectionId: [%s]", connection.ConnectionId)
logger.Infof(nil, "[gRPC-Gateway] Fail to publish connection createdEvent - ConnectionID: [%s]", connection.ConnectionID)
}
}

Expand Down
2 changes: 1 addition & 1 deletion grpc_gateway/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@
package grpc_gateway

type Connection struct {
ConnectionId string
ConnectionID string
Address string
}
2 changes: 1 addition & 1 deletion grpc_gateway/infra/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (g *GrpcHostService) Dial(address string) (grpc_gateway.Connection, error)

func toGatewayConnectionModel(connection bifrost.Connection) grpc_gateway.Connection {
return grpc_gateway.Connection{
ConnectionId: connection.GetID(),
ConnectionID: connection.GetID(),
Address: connection.GetIP(),
}
}
Expand Down
14 changes: 7 additions & 7 deletions grpc_gateway/infra/grpc_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func TestGrpcHostService_SendMessages(t *testing.T) {

handler := &MockHandler{}
handler.OnConnectionFunc = func(connection grpc_gateway.Connection) {
connID = connection.ConnectionId
connID = connection.ConnectionID
}

handler.OnDisconnectionFunc = func(connection grpc_gateway.Connection) {
Expand All @@ -409,7 +409,7 @@ func TestGrpcHostService_SendMessages(t *testing.T) {

publishedData = test.input.Message

clientHostService.SendMessages(test.input.Message, test.input.Protocol, conn.ConnectionId)
clientHostService.SendMessages(test.input.Message, test.input.Protocol, conn.ConnectionID)
}
}

Expand Down Expand Up @@ -452,12 +452,12 @@ func TestGrpcHostService_Close(t *testing.T) {
handler := &MockHandler{}
handler.OnConnectionFunc = func(connection grpc_gateway.Connection) {
fmt.Println(connection)
connID = connection.ConnectionId
connID = connection.ConnectionID
}

handler.OnDisconnectionFunc = func(connection grpc_gateway.Connection) {
fmt.Println("connection is closing", connection)
assert.Equal(t, connID, connection.ConnectionId)
assert.Equal(t, connID, connection.ConnectionID)
}

serverHostService.SetHandler(handler)
Expand All @@ -468,7 +468,7 @@ func TestGrpcHostService_Close(t *testing.T) {

conn, err := clientHostService.Dial(test.input)
assert.NoError(t, err)
clientHostService.CloseConnection(conn.ConnectionId)
clientHostService.CloseConnection(conn.ConnectionID)
}
}

Expand Down Expand Up @@ -503,7 +503,7 @@ func TestGrpcHostService_GetAllConnections(t *testing.T) {
var connID string
handler := &MockHandler{}
handler.OnConnectionFunc = func(connection grpc_gateway.Connection) {
connID = connection.ConnectionId
connID = connection.ConnectionID
}

handler.OnDisconnectionFunc = func(connection grpc_gateway.Connection) {
Expand All @@ -518,5 +518,5 @@ func TestGrpcHostService_GetAllConnections(t *testing.T) {

connections, err := serverHostService.GetAllConnections()
assert.NoError(t, err)
assert.Equal(t, connections[0].ConnectionId, connID)
assert.Equal(t, connections[0].ConnectionID, connID)
}
4 changes: 2 additions & 2 deletions p2p/infra/adapter/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ func (eh *EventHandler) HandleConnCreatedEvent(event event.ConnectionCreated) er
//todo deleted peer if disconnected peer is leader
func (eh *EventHandler) HandleConnDisconnectedEvent(event event.ConnectionClosed) error {

if event.ConnectionId == "" {
if event.ConnectionID == "" {
return ErrEmptyPeerId
}

err := eh.peerApi.Remove(p2p.PeerId{Id: event.ConnectionId})
err := eh.peerApi.Remove(p2p.PeerId{Id: event.ConnectionID})

if err != nil {
log.Println(err)
Expand Down
2 changes: 1 addition & 1 deletion p2p/infra/adapter/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestEventHandler_HandleConnDisconnectedEvent(t *testing.T) {
t.Logf("running test case %s", testName)

e := event.ConnectionClosed{
ConnectionId: test.input.id,
ConnectionID: test.input.id,
}

err := eventHandler.HandleConnDisconnectedEvent(e)
Expand Down

0 comments on commit c32eca0

Please sign in to comment.