Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gracefully shutdown VTGate instances #79

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ type Conn struct {
// enableQueryInfo controls whether we parse the INFO field in QUERY_OK packets
// See: ConnParams.EnableQueryInfo
enableQueryInfo bool

// mu protects the fields below
mu sync.Mutex
// this is used to mark the connection to be closed so that the command phase for the connection can be stopped and
// the connection gets closed.
closing bool
}

// splitStatementFunciton is the function that is used to split the statement in case of a multi-statement query.
Expand Down Expand Up @@ -895,6 +901,11 @@ func (c *Conn) handleNextCommand(handler Handler) bool {
return false
}

// before continue to process the packet, check if the connection should be closed or not.
if c.IsMarkedForClose() {
return false
}

switch data[0] {
case ComQuit:
c.recycleReadPacket()
Expand Down Expand Up @@ -1581,3 +1592,21 @@ func (c *Conn) IsUnixSocket() bool {
func (c *Conn) GetRawConn() net.Conn {
return c.conn
}

// MarkForClose marks the connection for close.
func (c *Conn) MarkForClose() {
c.mu.Lock()
defer c.mu.Unlock()
c.closing = true
}

// IsMarkedForClose return true if the connection should be closed.
func (c *Conn) IsMarkedForClose() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.closing
}

func (c *Conn) IsShuttingDown() bool {
return c.listener.isShutdown()
}
3 changes: 2 additions & 1 deletion go/mysql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,8 @@ func (l *Listener) handle(conn net.Conn, connectionID uint32, acceptTime time.Ti

for {
kontinue := c.handleNextCommand(l.handler)
if !kontinue {
// before going for next command check if the connection should be closed or not.
if !kontinue || c.IsMarkedForClose() {
return
}
}
Expand Down
11 changes: 8 additions & 3 deletions go/vt/vtgate/plugin_mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@ func startSpan(ctx context.Context, query, label string) (trace.Span, context.Co
}

func (vh *vtgateHandler) ComQuery(c *mysql.Conn, query string, callback func(*sqltypes.Result) error) error {
session := vh.session(c)
if c.IsShuttingDown() && !session.InTransaction {
c.MarkForClose()
return mysql.NewSQLError(mysql.ERServerShutdown, mysql.SSNetError, "Server shutdown in progress")
}

ctx := context.Background()
var cancel context.CancelFunc
if *mysqlQueryTimeout != 0 {
Expand Down Expand Up @@ -209,7 +215,6 @@ func (vh *vtgateHandler) ComQuery(c *mysql.Conn, query string, callback func(*sq
"VTGate MySQL Connector" /* subcomponent: part of the client */)
ctx = callerid.NewContext(ctx, ef, im)

session := vh.session(c)
if !session.InTransaction {
atomic.AddInt32(&busyConnections, 1)
}
Expand Down Expand Up @@ -536,11 +541,11 @@ func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mys

func shutdownMysqlProtocolAndDrain() {
if mysqlListener != nil {
mysqlListener.Close()
mysqlListener.Shutdown()
mysqlListener = nil
}
if mysqlUnixListener != nil {
mysqlUnixListener.Close()
mysqlUnixListener.Shutdown()
mysqlUnixListener = nil
}
if sigChan != nil {
Expand Down