diff --git a/dstore/host.go b/dstore/host.go index 7648842..58a3397 100644 --- a/dstore/host.go +++ b/dstore/host.go @@ -108,82 +108,53 @@ func (host *Host) releaseConn(conn net.Conn) { } } -func (host *Host) execute(req *mc.Request) (resp *mc.Response, delta time.Duration, err error) { - now := time.Now() +func (host *Host) executeWithTimeout(req *mc.Request, timeout time.Duration) (resp *mc.Response, err error) { conn, err := host.getConn() if err != nil { return } + conn.SetDeadline(time.Now().Add(timeout)) + + var reason string + + defer func() { + if err != nil { + logger.Errorf("error occurred on %s, reason: %s, err: %s", host.Addr, reason, err.Error()) + if resp != nil { + resp.CleanBuffer() + } + conn.Close() + } else { + host.releaseConn(conn) + } + }() err = req.Write(conn) if err != nil { - logger.Infof("%s write request failed: %v", host.Addr, err) - conn.Close() + reason = "write request failed" return } resp = new(mc.Response) if req.NoReply { - host.releaseConn(conn) resp.Status = "STORED" - delta = time.Since(now) return } reader := bufio.NewReader(conn) if err = resp.Read(reader); err != nil { - logger.Infof("%s read response failed: %v", host.Addr, err) - conn.Close() - return nil, 0, err + reason = "read response failed" + return nil, err } if err = req.Check(resp); err != nil { - logger.Infof("%s unexpected response %s %v %v", - host.Addr, req, resp, err) - conn.Close() - return nil, 0, err + reason = fmt.Sprintf("unexpected response %v %v", + req, resp) + return nil, err } - - host.releaseConn(conn) - delta = time.Since(now) return } -func (host *Host) executeWithTimeout(req *mc.Request, timeout time.Duration) (resp *mc.Response, err error) { - var tmpErr error - var tmpResp *mc.Response - done := make(chan bool) - isTimeout := make(chan bool, 1) - - go func() { - var delta time.Duration - tmpResp, delta, tmpErr = host.execute(req) - select { - case done <- true: - case <-isTimeout: - logger.Infof("request %v to host %s return after timeout, use %d ms", - req, host.Addr, delta/1e6) - if tmpResp != nil { - tmpResp.CleanBuffer() - } - } - }() - - timer := time.NewTimer(timeout) - defer timer.Stop() - - select { - case <-done: - resp = tmpResp - err = tmpErr - case <-timer.C: - isTimeout <- true - err = fmt.Errorf("request %v timeout", req) - logger.Infof("request %v to host %s timeout", req, host.Addr) - } - return resp, err -} - func (host *Host) Len() int { return 0 } @@ -210,7 +181,7 @@ func (host *Host) Get(key string) (*mc.Item, error) { func (host *Host) GetMulti(keys []string) (map[string]*mc.Item, error) { req := &mc.Request{Cmd: "get", Keys: keys} - resp, _, err := host.execute(req) + resp, err := host.executeWithTimeout(req, time.Duration(proxyConf.ReadTimeoutMs)*time.Millisecond) if err != nil { return nil, err } @@ -221,7 +192,7 @@ func (host *Host) Append(key string, value []byte) (bool, error) { flag := 0 item := newItem(flag, value) req := &mc.Request{Cmd: "append", Keys: []string{key}, Item: item} - resp, _, err := host.execute(req) + resp, err := host.executeWithTimeout(req, time.Duration(proxyConf.ReadTimeoutMs)*time.Millisecond) item.Free() if err == nil { return resp.Status == "STORED", nil @@ -234,7 +205,7 @@ func (host *Host) Incr(key string, value int) (int, error) { flag := 0 item := newItem(flag, []byte(strconv.Itoa(value))) req := &mc.Request{Cmd: "incr", Keys: []string{key}, Item: item} - resp, _, err := host.execute(req) + resp, err := host.executeWithTimeout(req, time.Duration(proxyConf.ReadTimeoutMs)*time.Millisecond) item.Free() if err != nil { return 0, err @@ -244,7 +215,7 @@ func (host *Host) Incr(key string, value int) (int, error) { func (host *Host) Delete(key string) (bool, error) { req := &mc.Request{Cmd: "delete", Keys: []string{key}} - resp, _, err := host.execute(req) + resp, err := host.executeWithTimeout(req, time.Duration(proxyConf.ReadTimeoutMs)*time.Millisecond) if err == nil { return resp.Status == "DELETED", nil } else {