diff --git a/backend/internal/lobby/client.go b/backend/internal/lobby/client.go index daaa696..aaa2d2b 100644 --- a/backend/internal/lobby/client.go +++ b/backend/internal/lobby/client.go @@ -17,14 +17,16 @@ import ( ) const ( - writeTimeout = 5 * time.Second + writeTimeout = 5 * time.Second + SEND_FAILURE_LIMIT = 25 ) type clientImpl struct { - clientId string - user sql.User - addr net.Addr - conn *websocket.Conn + clientId string + user sql.User + addr net.Addr + conn *websocket.Conn + sendFailure int logger *zap.SugaredLogger @@ -37,12 +39,13 @@ type clientImpl struct { // and allows server to communicate with him func NewClient(user sql.User, conn *websocket.Conn, serv Server, logger *zap.Logger) Client { return &clientImpl{ - clientId: uuid.NewString(), - user: user, - addr: conn.RemoteAddr(), - conn: conn, - logger: logger.Sugar(), - server: serv, + clientId: uuid.NewString(), + user: user, + addr: conn.RemoteAddr(), + conn: conn, + logger: logger.Sugar(), + server: serv, + sendFailure: 0, send: make(chan *lobby_messages.LobbyMessage), } @@ -68,19 +71,36 @@ func (c *clientImpl) GetRemoteAddr() net.Addr { // Close closes the client func (c *clientImpl) Close() { - //close(c.send) + defer func() { + if r := recover(); r != nil { + c.logger.Debugw("recovered after a panic", "err", r) + } + }() + c.conn.WriteMessage(websocket.CloseMessage, []byte{}) c.conn.Close() - // TODO: Server needs to be aware that we disconnected as well + close(c.send) +} + +// SendClientCloseToServer initiates the client close procedure +// by sending close message to server +func (c *clientImpl) SendClientCloseToServer() { + c.logger.Debugw("sending lobby.disconnect message", "client", c) + events.Publish("lobby.disconnect", c) } // Send sends lobby_messages func (c *clientImpl) Send(msg *lobby_messages.LobbyMessage) { - c.logger.Debugw("sending message to c.send", "msg", msg) + c.logger.Debugw("sending message to c.send", "clientId", c.clientId, "userId", c.user.ID) select { case c.send <- msg: - c.logger.Debugw("sent message to c.send", "msg", msg) + c.logger.Debugw("sent message to c.send", "clientId", c.clientId, "userId", c.user.ID) + c.sendFailure = 0 case <-time.After(5 * time.Second): - c.logger.Warnw("timeout sending message to c.send", "msg", msg) + c.logger.Warnw("timeout sending message to c.send", "clientId", c.clientId, "userId", c.user.ID) + c.sendFailure++ + if c.sendFailure >= SEND_FAILURE_LIMIT { + c.SendClientCloseToServer() + } } } @@ -99,7 +119,7 @@ func (c *clientImpl) ReadPump() { authenticated := false - defer c.Close() + defer c.SendClientCloseToServer() for { _, msg, err := c.conn.ReadMessage() if err != nil { @@ -178,10 +198,6 @@ func (c *clientImpl) ReadPump() { break } } - - c.logger.Debugw("sending server.disconnect message", "client", c) - events.Publish("lobby.disconnect", c) - //events.Publish("server.disconnect", c.user.ID) } // SendPump sends lobby_messages to client and checks if there is an error and returns it @@ -189,7 +205,13 @@ func (c *clientImpl) SendPump() { c.logger.Debugw("started send pump for client", "id", c.user.ID, "remoteAddr", c.addr) - for message := range c.send { + for { + message, open := <-c.send + if !open { + c.logger.Infow("gracefully closing SendPump", "id", c.user.ID, "clientId", c.clientId, "remoteAddr", c.addr) + break + } + c.logger.Debugw("sending message", "msg", message, "client", c.clientId) // So we don't wait for too long before we send @@ -227,5 +249,4 @@ func (c *clientImpl) SendPump() { } c.logger.Debugw("exiting client send pump", "id", c.user.ID, "remoteAddr", c.addr) - c.conn.WriteMessage(websocket.CloseMessage, []byte{}) } diff --git a/backend/internal/lobby/types.go b/backend/internal/lobby/types.go index cd56d98..a13ba75 100644 --- a/backend/internal/lobby/types.go +++ b/backend/internal/lobby/types.go @@ -18,6 +18,7 @@ type Server interface { AddNewFriend(userId string, friendEmail string) IncomingFriendRequestAcceptDeny(userId string, friendRequestId string, accept bool) RemoveFriend(userId string, relationshipId string) + Disconnect(client Client) } // Client contains all the methods we need for recognising and working with the Client diff --git a/backend/internal/ws/client.go b/backend/internal/ws/client.go index 06ad16f..b2dc95e 100644 --- a/backend/internal/ws/client.go +++ b/backend/internal/ws/client.go @@ -17,16 +17,18 @@ import ( ) const ( - writeTimeout = 5 * time.Second + writeTimeout = 5 * time.Second + SEND_FAILURE_LIMIT = 25 ) type clientImpl struct { - clientId string - user sql.User - addr net.Addr - position int32 - conn *websocket.Conn - game string + clientId string + user sql.User + addr net.Addr + position int32 + conn *websocket.Conn + game string + sendFailure int logger *zap.SugaredLogger @@ -73,21 +75,39 @@ func (c *clientImpl) GetRemoteAddr() net.Addr { return c.addr } -// Close closes the client +// Close closes the client. +// Should be called only from the server func (c *clientImpl) Close() { - //close(c.send) + defer func() { + if r := recover(); r != nil { + c.logger.Debugw("recovered after a panic", "err", r) + } + }() + c.conn.WriteMessage(websocket.CloseMessage, []byte{}) c.conn.Close() - // TODO: Server needs to be aware that we disconnected as well + close(c.send) +} + +// SendClientCloseToServer initiates the client close procedure +// by sending close message to server +func (c *clientImpl) SendClientCloseToServer() { + c.logger.Debugw("sending server.disconnect message", "client", c) + events.Publish("server.disconnect", c) } // Send sends messages func (c *clientImpl) Send(msg *messages.Message) { - c.logger.Debugw("sending message to c.send", "msg", msg) + c.logger.Debugw("sending message to c.send", "clientId", c.clientId, "userId", c.user.ID) select { case c.send <- msg: - c.logger.Debugw("sent message to c.send", "msg", msg) + c.logger.Debugw("sent message to c.send", "clientId", c.clientId, "userId", c.user.ID) + c.sendFailure = 0 case <-time.After(5 * time.Second): - c.logger.Warnw("timeout sending message to c.send", "msg", msg) + c.logger.Warnw("timeout sending message to c.send", "clientId", c.clientId, "userId", c.user.ID) + c.sendFailure++ + if c.sendFailure >= SEND_FAILURE_LIMIT { + c.SendClientCloseToServer() + } } } @@ -104,7 +124,7 @@ func (c *clientImpl) ReadPump() { c.logger.Debugw("started read pump for client", "id", c.user.ID, "remoteAddr", c.addr) - defer c.Close() + defer c.SendClientCloseToServer() for { _, msg, err := c.conn.ReadMessage() if err != nil { @@ -253,10 +273,6 @@ func (c *clientImpl) ReadPump() { break } } - - c.logger.Debugw("sending server.disconnect message", "client", c) - events.Publish("server.disconnect", c) - //events.Publish("server.disconnect", c.user.ID) } // SendPump sends messages to client and checks if there is an error and returns it @@ -264,32 +280,38 @@ func (c *clientImpl) SendPump() { c.logger.Debugw("started send pump for client", "id", c.user.ID, "remoteAddr", c.addr) - for message := range c.send { + for { + message, open := <-c.send + if !open { + c.logger.Debugw("gracefully closing SendPump", "id", c.user.ID, "clientId", c.clientId, "remoteAddr", c.addr) + break + } + c.logger.Debugw("sending message", "msg", message, "client", c.clientId) // So we don't wait for too long before we send err := c.conn.SetWriteDeadline(time.Now().Add(writeTimeout)) if err != nil { c.logger.Errorw("error while setting write deadline", "err", err) - return + break } writer, err := c.conn.NextWriter(websocket.BinaryMessage) if err != nil { c.logger.Warnw("error while getting NextWriter for client", "id", c.user.ID, "remoteAddr", c.addr, zap.Error(err)) - return + break } rawMessage, err := proto.Marshal(message) if err != nil { c.logger.Errorw("error while marshalling protobuf message", "err", err) - return + break } _, err = writer.Write(rawMessage) if err != nil { c.logger.Errorw("error while writing to the writer", "err", err) - return + break } // We need to close the writer so that our message // gets flushed to the client @@ -300,5 +322,4 @@ func (c *clientImpl) SendPump() { } c.logger.Debugw("exiting client send pump", "id", c.user.ID, "remoteAddr", c.addr) - c.conn.WriteMessage(websocket.CloseMessage, []byte{}) }