From c32eca0e80a3564518eee9dafe06ba88cdbe9180 Mon Sep 17 00:00:00 2001 From: makehoney Date: Mon, 17 Sep 2018 20:23:39 +0900 Subject: [PATCH] Add connection query API --- api_gateway/connection_query_api.go | 124 ++++++++++++++++++++++++ cmd/connection/dial.go | 2 +- cmd/connection/list.go | 2 +- cmd/on/grpc_gatewayfx/grpc_gatewayfx.go | 2 +- common/event/event.go | 2 +- grpc_gateway/api/connection_api.go | 16 +-- grpc_gateway/connection.go | 2 +- grpc_gateway/infra/grpc_service.go | 2 +- grpc_gateway/infra/grpc_service_test.go | 14 +-- p2p/infra/adapter/event_handler.go | 4 +- p2p/infra/adapter/event_handler_test.go | 2 +- 11 files changed, 148 insertions(+), 24 deletions(-) create mode 100644 api_gateway/connection_query_api.go diff --git a/api_gateway/connection_query_api.go b/api_gateway/connection_query_api.go new file mode 100644 index 000000000..2d0121596 --- /dev/null +++ b/api_gateway/connection_query_api.go @@ -0,0 +1,124 @@ +package api_gateway + +import ( + "errors" + "sync" + + "github.com/it-chain/engine/common/event" +) + +var ErrConnectionExists = errors.New("Connection already exists") + +type ConnectionQueryApi struct { + mux *sync.Mutex + connectionRepository ConnectionRepository +} + +func NewConnectionQueryApi(connRepo ConnectionRepository) *ConnectionQueryApi { + return &ConnectionQueryApi{ + mux: &sync.Mutex{}, + connectionRepository: connRepo, + } +} + +func (q ConnectionQueryApi) GetAllConnectionList() ([]Connection, error) { + return q.connectionRepository.FindAll() +} + +func (q ConnectionQueryApi) GetConnectionByID(connID string) (Connection, error) { + return q.connectionRepository.FindByID(connID) +} + +type ConnectionRepository struct { + mux *sync.RWMutex + ConnectionTable map[string]Connection +} + +func NewConnectionRepository() *ConnectionRepository { + return &ConnectionRepository{ + mux: &sync.RWMutex{}, + ConnectionTable: make(map[string]Connection), + } +} + +func (cr *ConnectionRepository) Save(conn Connection) error { + cr.mux.Lock() + defer cr.mux.Unlock() + + _, exist := cr.ConnectionTable[conn.ConnectionID] + if exist { + return ErrConnectionExists + } + + cr.ConnectionTable[conn.ConnectionID] = conn + + return nil +} + +func (cr *ConnectionRepository) Remove(connID string) error { + cr.mux.Lock() + defer cr.mux.Unlock() + + delete(cr.ConnectionTable, connID) + + return nil +} + +func (cr *ConnectionRepository) FindAll() ([]Connection, error) { + cr.mux.Lock() + defer cr.mux.Unlock() + + connectionList := []Connection{} + + for _, conn := range cr.ConnectionTable { + connectionList = append(connectionList, conn) + } + + return connectionList, nil +} + +func (cr *ConnectionRepository) FindByID(connID string) (Connection, error) { + cr.mux.Lock() + defer cr.mux.Unlock() + + for _, conn := range cr.ConnectionTable { + if connID == conn.ConnectionID { + return conn, nil + } + } + + return Connection{}, nil +} + +type ConnectionEventListener struct { + connectionRepository ConnectionRepository +} + +func NewConnectionEventListener(connRepo ConnectionRepository) ConnectionEventListener { + return ConnectionEventListener{ + connectionRepository: connRepo, + } +} + +func (cel *ConnectionEventListener) HandleConnectionCreatedEvent(event event.ConnectionCreated) error { + connection := Connection{ + ConnectionID: event.ConnectionID, + Address: event.Address, + } + + err := cel.connectionRepository.Save(connection) + if err != nil { + return err + } + return nil +} + +func (cel *ConnectionEventListener) HandleConnectionClosedEvent(event event.ConnectionClosed) error { + cel.connectionRepository.Remove(event.ConnectionID) + return nil +} + +type Connection struct { + ConnectionID string + Address string +} diff --git a/cmd/connection/dial.go b/cmd/connection/dial.go index cb0b271ae..d3ea326f5 100644 --- a/cmd/connection/dial.go +++ b/cmd/connection/dial.go @@ -54,7 +54,7 @@ func connect(ip string) error { return } - logger.Infof(nil, "[Cmd] Connection created - Address: [%s], Id:[%s]", connection.Address, connection.ConnectionId) + logger.Infof(nil, "[Cmd] Connection created - Address: [%s], Id:[%s]", connection.Address, connection.ConnectionID) }) if err != nil { diff --git a/cmd/connection/list.go b/cmd/connection/list.go index 0ec4be01f..c1fe9ca3e 100644 --- a/cmd/connection/list.go +++ b/cmd/connection/list.go @@ -53,7 +53,7 @@ func list() error { fmt.Println("Index\t ID\t\t\t\t\t\t Address") for index, connection := range getConnectionListCommand.ConnectionList { fmt.Printf("[%d]\t [%s]\t [%s]\n", - index, connection.ConnectionId, connection.Address) + index, connection.ConnectionID, connection.Address) } }) diff --git a/cmd/on/grpc_gatewayfx/grpc_gatewayfx.go b/cmd/on/grpc_gatewayfx/grpc_gatewayfx.go index 17dfc653a..33cdfb603 100644 --- a/cmd/on/grpc_gatewayfx/grpc_gatewayfx.go +++ b/cmd/on/grpc_gatewayfx/grpc_gatewayfx.go @@ -78,7 +78,7 @@ func InitgRPCServer(lifecycle fx.Lifecycle, config *conf.Configuration, hostServ OnStop: func(context context.Context) error { connections, _ := hostService.GetAllConnections() for _, connection := range connections { - hostService.CloseConnection(connection.ConnectionId) + hostService.CloseConnection(connection.ConnectionID) } hostService.Stop() return nil diff --git a/common/event/event.go b/common/event/event.go index fcdc7efd3..848214bc4 100644 --- a/common/event/event.go +++ b/common/event/event.go @@ -168,5 +168,5 @@ type ConnectionCreated struct { // connection close type ConnectionClosed struct { - ConnectionId string + ConnectionID string } diff --git a/grpc_gateway/api/connection_api.go b/grpc_gateway/api/connection_api.go index a7e320db0..7ce597a2e 100644 --- a/grpc_gateway/api/connection_api.go +++ b/grpc_gateway/api/connection_api.go @@ -50,7 +50,7 @@ func (c ConnectionApi) CreateConnection(address string) (grpc_gateway.Connection return connection, err } - logger.Infof(nil, "[gRPC-Gateway] Connection created - Address [%s], ConnectionId [%s]", connection.Address, connection.ConnectionId) + logger.Infof(nil, "[gRPC-Gateway] Connection created - Address [%s], ConnectionID [%s]", connection.Address, connection.ConnectionID) return connection, nil } @@ -58,12 +58,12 @@ func (c ConnectionApi) CreateConnection(address string) (grpc_gateway.Connection func createConnectionCreatedEvent(connection grpc_gateway.Connection) event.ConnectionCreated { return event.ConnectionCreated{ Address: connection.Address, - ConnectionID: connection.ConnectionId, + ConnectionID: connection.ConnectionID, } } func (c ConnectionApi) CloseConnection(connectionID string) error { - logger.Infof(nil, "[gRPC-Gateway] Close connection - ConnectionId [%s]", connectionID) + logger.Infof(nil, "[gRPC-Gateway] Close connection - ConnectionID [%s]", connectionID) c.grpcService.CloseConnection(connectionID) @@ -72,23 +72,23 @@ func (c ConnectionApi) CloseConnection(connectionID string) error { func createConnectionClosedEvent(connectionID string) event.ConnectionClosed { return event.ConnectionClosed{ - ConnectionId: connectionID, + ConnectionID: connectionID, } } func (c ConnectionApi) OnConnection(connection grpc_gateway.Connection) { - logger.Infof(nil, "[gRPC-Gateway] Connection created - Address [%s], ConnectionId [%s]", connection.Address, connection.ConnectionId) + logger.Infof(nil, "[gRPC-Gateway] Connection created - Address [%s], ConnectionID [%s]", connection.Address, connection.ConnectionID) if err := c.eventService.Publish("connection.created", createConnectionCreatedEvent(connection)); err != nil { - logger.Infof(nil, "[gRPC-Gateway] Fail to publish connection createdEvent - ConnectionId: [%s]", connection.ConnectionId) + logger.Infof(nil, "[gRPC-Gateway] Fail to publish connection createdEvent - ConnectionID: [%s]", connection.ConnectionID) } } func (c ConnectionApi) OnDisconnection(connection grpc_gateway.Connection) { - logger.Infof(nil, "[gRPC-Gateway] Connection closed - ConnectionId [%s]", connection.ConnectionId) + logger.Infof(nil, "[gRPC-Gateway] Connection closed - ConnectionID [%s]", connection.ConnectionID) if err := c.eventService.Publish("connection.closed", connection); err != nil { - logger.Infof(nil, "[gRPC-Gateway] Fail to publish connection createdEvent - ConnectionId: [%s]", connection.ConnectionId) + logger.Infof(nil, "[gRPC-Gateway] Fail to publish connection createdEvent - ConnectionID: [%s]", connection.ConnectionID) } } diff --git a/grpc_gateway/connection.go b/grpc_gateway/connection.go index bf027800e..f90e126f6 100644 --- a/grpc_gateway/connection.go +++ b/grpc_gateway/connection.go @@ -17,6 +17,6 @@ package grpc_gateway type Connection struct { - ConnectionId string + ConnectionID string Address string } diff --git a/grpc_gateway/infra/grpc_service.go b/grpc_gateway/infra/grpc_service.go index e6215a5dc..a26e449ac 100644 --- a/grpc_gateway/infra/grpc_service.go +++ b/grpc_gateway/infra/grpc_service.go @@ -91,7 +91,7 @@ func (g *GrpcHostService) Dial(address string) (grpc_gateway.Connection, error) func toGatewayConnectionModel(connection bifrost.Connection) grpc_gateway.Connection { return grpc_gateway.Connection{ - ConnectionId: connection.GetID(), + ConnectionID: connection.GetID(), Address: connection.GetIP(), } } diff --git a/grpc_gateway/infra/grpc_service_test.go b/grpc_gateway/infra/grpc_service_test.go index 72ca758bb..44d41b646 100644 --- a/grpc_gateway/infra/grpc_service_test.go +++ b/grpc_gateway/infra/grpc_service_test.go @@ -391,7 +391,7 @@ func TestGrpcHostService_SendMessages(t *testing.T) { handler := &MockHandler{} handler.OnConnectionFunc = func(connection grpc_gateway.Connection) { - connID = connection.ConnectionId + connID = connection.ConnectionID } handler.OnDisconnectionFunc = func(connection grpc_gateway.Connection) { @@ -409,7 +409,7 @@ func TestGrpcHostService_SendMessages(t *testing.T) { publishedData = test.input.Message - clientHostService.SendMessages(test.input.Message, test.input.Protocol, conn.ConnectionId) + clientHostService.SendMessages(test.input.Message, test.input.Protocol, conn.ConnectionID) } } @@ -452,12 +452,12 @@ func TestGrpcHostService_Close(t *testing.T) { handler := &MockHandler{} handler.OnConnectionFunc = func(connection grpc_gateway.Connection) { fmt.Println(connection) - connID = connection.ConnectionId + connID = connection.ConnectionID } handler.OnDisconnectionFunc = func(connection grpc_gateway.Connection) { fmt.Println("connection is closing", connection) - assert.Equal(t, connID, connection.ConnectionId) + assert.Equal(t, connID, connection.ConnectionID) } serverHostService.SetHandler(handler) @@ -468,7 +468,7 @@ func TestGrpcHostService_Close(t *testing.T) { conn, err := clientHostService.Dial(test.input) assert.NoError(t, err) - clientHostService.CloseConnection(conn.ConnectionId) + clientHostService.CloseConnection(conn.ConnectionID) } } @@ -503,7 +503,7 @@ func TestGrpcHostService_GetAllConnections(t *testing.T) { var connID string handler := &MockHandler{} handler.OnConnectionFunc = func(connection grpc_gateway.Connection) { - connID = connection.ConnectionId + connID = connection.ConnectionID } handler.OnDisconnectionFunc = func(connection grpc_gateway.Connection) { @@ -518,5 +518,5 @@ func TestGrpcHostService_GetAllConnections(t *testing.T) { connections, err := serverHostService.GetAllConnections() assert.NoError(t, err) - assert.Equal(t, connections[0].ConnectionId, connID) + assert.Equal(t, connections[0].ConnectionID, connID) } diff --git a/p2p/infra/adapter/event_handler.go b/p2p/infra/adapter/event_handler.go index d93e7afb6..e5afb24e8 100644 --- a/p2p/infra/adapter/event_handler.go +++ b/p2p/infra/adapter/event_handler.go @@ -71,11 +71,11 @@ func (eh *EventHandler) HandleConnCreatedEvent(event event.ConnectionCreated) er //todo deleted peer if disconnected peer is leader func (eh *EventHandler) HandleConnDisconnectedEvent(event event.ConnectionClosed) error { - if event.ConnectionId == "" { + if event.ConnectionID == "" { return ErrEmptyPeerId } - err := eh.peerApi.Remove(p2p.PeerId{Id: event.ConnectionId}) + err := eh.peerApi.Remove(p2p.PeerId{Id: event.ConnectionID}) if err != nil { log.Println(err) diff --git a/p2p/infra/adapter/event_handler_test.go b/p2p/infra/adapter/event_handler_test.go index caae2b083..1ec305239 100644 --- a/p2p/infra/adapter/event_handler_test.go +++ b/p2p/infra/adapter/event_handler_test.go @@ -119,7 +119,7 @@ func TestEventHandler_HandleConnDisconnectedEvent(t *testing.T) { t.Logf("running test case %s", testName) e := event.ConnectionClosed{ - ConnectionId: test.input.id, + ConnectionID: test.input.id, } err := eventHandler.HandleConnDisconnectedEvent(e)