Skip to content

Commit

Permalink
set conn deadline
Browse files Browse the repository at this point in the history
(cherry picked from commit ae00132)
  • Loading branch information
ZHAO committed Dec 23, 2019
1 parent 9c57e62 commit c76743b
Showing 1 changed file with 26 additions and 55 deletions.
81 changes: 26 additions & 55 deletions dstore/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,82 +108,53 @@ func (host *Host) releaseConn(conn net.Conn) {
}
}

func (host *Host) execute(req *mc.Request) (resp *mc.Response, delta time.Duration, err error) {
now := time.Now()
func (host *Host) executeWithTimeout(req *mc.Request, timeout time.Duration) (resp *mc.Response, err error) {
conn, err := host.getConn()
if err != nil {
return
}
conn.SetDeadline(time.Now().Add(timeout))

var reason string

defer func() {
if err != nil {
logger.Errorf("error occurred on %s, reason: %s, err: %s", host.Addr, reason, err.Error())
if resp != nil {
resp.CleanBuffer()
}
conn.Close()
} else {
host.releaseConn(conn)
}
}()

err = req.Write(conn)
if err != nil {
logger.Infof("%s write request failed: %v", host.Addr, err)
conn.Close()
reason = "write request failed"
return
}

resp = new(mc.Response)
if req.NoReply {
host.releaseConn(conn)
resp.Status = "STORED"
delta = time.Since(now)
return
}

reader := bufio.NewReader(conn)
if err = resp.Read(reader); err != nil {
logger.Infof("%s read response failed: %v", host.Addr, err)
conn.Close()
return nil, 0, err
reason = "read response failed"
return nil, err
}

if err = req.Check(resp); err != nil {
logger.Infof("%s unexpected response %s %v %v",
host.Addr, req, resp, err)
conn.Close()
return nil, 0, err
reason = fmt.Sprintf("unexpected response %v %v",
req, resp)
return nil, err
}

host.releaseConn(conn)
delta = time.Since(now)
return
}

func (host *Host) executeWithTimeout(req *mc.Request, timeout time.Duration) (resp *mc.Response, err error) {
var tmpErr error
var tmpResp *mc.Response
done := make(chan bool)
isTimeout := make(chan bool, 1)

go func() {
var delta time.Duration
tmpResp, delta, tmpErr = host.execute(req)
select {
case done <- true:
case <-isTimeout:
logger.Infof("request %v to host %s return after timeout, use %d ms",
req, host.Addr, delta/1e6)
if tmpResp != nil {
tmpResp.CleanBuffer()
}
}
}()

timer := time.NewTimer(timeout)
defer timer.Stop()

select {
case <-done:
resp = tmpResp
err = tmpErr
case <-timer.C:
isTimeout <- true
err = fmt.Errorf("request %v timeout", req)
logger.Infof("request %v to host %s timeout", req, host.Addr)
}
return resp, err
}

func (host *Host) Len() int {
return 0
}
Expand All @@ -210,7 +181,7 @@ func (host *Host) Get(key string) (*mc.Item, error) {

func (host *Host) GetMulti(keys []string) (map[string]*mc.Item, error) {
req := &mc.Request{Cmd: "get", Keys: keys}
resp, _, err := host.execute(req)
resp, err := host.executeWithTimeout(req, time.Duration(proxyConf.ReadTimeoutMs)*time.Millisecond)
if err != nil {
return nil, err
}
Expand All @@ -221,7 +192,7 @@ func (host *Host) Append(key string, value []byte) (bool, error) {
flag := 0
item := newItem(flag, value)
req := &mc.Request{Cmd: "append", Keys: []string{key}, Item: item}
resp, _, err := host.execute(req)
resp, err := host.executeWithTimeout(req, time.Duration(proxyConf.ReadTimeoutMs)*time.Millisecond)
item.Free()
if err == nil {
return resp.Status == "STORED", nil
Expand All @@ -234,7 +205,7 @@ func (host *Host) Incr(key string, value int) (int, error) {
flag := 0
item := newItem(flag, []byte(strconv.Itoa(value)))
req := &mc.Request{Cmd: "incr", Keys: []string{key}, Item: item}
resp, _, err := host.execute(req)
resp, err := host.executeWithTimeout(req, time.Duration(proxyConf.ReadTimeoutMs)*time.Millisecond)
item.Free()
if err != nil {
return 0, err
Expand All @@ -244,7 +215,7 @@ func (host *Host) Incr(key string, value int) (int, error) {

func (host *Host) Delete(key string) (bool, error) {
req := &mc.Request{Cmd: "delete", Keys: []string{key}}
resp, _, err := host.execute(req)
resp, err := host.executeWithTimeout(req, time.Duration(proxyConf.ReadTimeoutMs)*time.Millisecond)
if err == nil {
return resp.Status == "DELETED", nil
} else {
Expand Down

0 comments on commit c76743b

Please sign in to comment.