Skip to content

Commit

Permalink
Check CRC when forwarding RPC packets
Browse files Browse the repository at this point in the history
Dump last 16 packets on error
  • Loading branch information
alpinskiy committed Jan 13, 2025
1 parent 3a413d4 commit 535ebd9
Show file tree
Hide file tree
Showing 8 changed files with 335 additions and 90 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci-go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ jobs:
with:
go-version: 1.22.x
cache: true
# -y (--assume-yes): the CI job has no TTY to answer apt's confirmation prompt,
# which appears when libpcap-dev pulls in additional dependency packages.
- run: apt-get install -y libpcap-dev
- uses: golangci/golangci-lint-action@v6
with:
version: v1.60
Expand Down
17 changes: 15 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@
],
},
{
"name": "proxy_rpc_test",
"name": "proxy rpc test",
"type": "go",
"request": "launch",
"mode": "auto",
Expand All @@ -162,14 +162,27 @@
},
},
{
"name": "TestForwardPacket",
"name": "test forward packet",
"type": "go",
"request": "launch",
"mode": "test",
"program": "${workspaceFolder}/internal/vkgo/rpc/statshouse_test.go",
"args": [
"-test.run=TestForwardPacket",
],
},
{
"name": "test play PCAP",
"type": "go",
"request": "launch",
"mode": "test",
"program": "${workspaceFolder}/internal/vkgo/rpc/statshouse_test.go",
"args": [
"-test.run=TestPlayPcap",
],
"env": {
"STATSHOUSE_TEST_PLAY_PCAP_FILE_PATH":"",
},
}
]
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/google/gopacket v1.1.19
github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2 // indirect
github.com/hashicorp/consul/api v1.12.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
Expand Down
66 changes: 34 additions & 32 deletions internal/aggregator/ingress_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,57 +395,56 @@ func (p *proxyConn) run() {
defer p.group.Done()
defer p.clientConn.Close()
// handshake client
_, _, err := p.clientConn.HandshakeServer(p.serverKeys, p.serverOpts.TrustedSubnetGroups, false, p.startTime, rpc.DefaultPacketTimeout)
if err != nil {
p.logClientError("handshake", err)
return
}
// initialize "__rpc_request_size" tags
_, _, err := p.clientConn.HandshakeServer(p.serverKeys, p.serverOpts.TrustedSubnetGroups, true, p.startTime, rpc.DefaultPacketTimeout)
cryptoKeyID := p.clientConn.KeyID()
p.clientCryptoKeyID = int32(binary.BigEndian.Uint32(cryptoKeyID[:4]))
p.clientProtocolVersion = int32(p.clientConn.ProtocolVersion())
if err != nil {
p.logClientError("handshake", err, rpc.PacketHeaderCircularBuffer{})
return
}
// read first request to get shardReplica
var req proxyRequest
var firstReq proxyRequest
for {
req, err = p.readRequest()
firstReq, err = p.readRequest()
if err != nil {
return
}
if p.ctx.Err() != nil {
return // server shutdown
}
if req.tip == rpcInvokeReqHeaderTLTag {
if firstReq.tip == rpcInvokeReqHeaderTLTag {
break
}
p.rareLog("Client skip #%d looking for invoke request, addr %v\n", req.tip, p.clientConn.RemoteAddr())
log.Printf("Client skip #%d looking for invoke request, addr %v\n", firstReq.tip, p.clientConn.RemoteAddr())
}
shardReplica := req.shardReplica(p)
shardReplica := firstReq.shardReplica(p)
upstreamAddr := p.agent.GetConfigResult.Addresses[shardReplica]
p.rareLog("Connect shard replica %d, addr %v < %v\n", shardReplica, p.clientConn.LocalAddr(), p.clientConn.RemoteAddr())
defer p.rareLog("Disconnect shard replica %d, addr %v < %v\n", shardReplica, p.clientConn.LocalAddr(), p.clientConn.RemoteAddr())
// connect upstream
upstreamConn, err := net.DialTimeout("tcp", upstreamAddr, rpc.DefaultPacketTimeout)
if err != nil {
log.Printf("error connect, upstream addr %s: %v\n", upstreamAddr, err)
_ = req.WriteReponseAndFlush(p.clientConn, err)
_ = firstReq.WriteReponseAndFlush(p.clientConn, err)
return
}
defer upstreamConn.Close()
p.upstreamConn = rpc.NewPacketConn(upstreamConn, rpc.DefaultClientConnReadBufSize, rpc.DefaultClientConnWriteBufSize)
err = p.upstreamConn.HandshakeClient(p.clientOpts.CryptoKey, p.clientOpts.TrustedSubnetGroups, false, p.uniqueStartTime.Dec(), 0, rpc.DefaultPacketTimeout, rpc.LatestProtocolVersion)
if err != nil {
p.logUpstreamError("handshake", err)
_ = req.WriteReponseAndFlush(p.clientConn, err)
p.logUpstreamError("handshake", err, rpc.PacketHeaderCircularBuffer{})
_ = firstReq.WriteReponseAndFlush(p.clientConn, err)
return
}
// process first request
res := req.process(p)
if res.Error() != nil {
firstReqRes := firstReq.process(p)
if firstReqRes.Error() != nil {
return
}
// serve
var ctx = p.ctx
var gracefulShutdown bool
gracefulShutdown := firstReqRes.ClientWantsFin
for { // two iterations at most, the latter is graceful shutdown
var respLoop sync.WaitGroup
var respLoopRes rpc.ForwardPacketsResult
Expand All @@ -456,12 +455,15 @@ func (p *proxyConn) run() {
}()
reqLoopRes := p.requestLoop(ctx)
respLoop.Wait()
if gracefulShutdown || res.ClientWantsFin || reqLoopRes.ClientWantsFin || respLoopRes.ServerWantsFin || reqLoopRes.Error() != nil || respLoopRes.Error() != nil {
if !gracefulShutdown {
gracefulShutdown = reqLoopRes.ClientWantsFin || respLoopRes.ServerWantsFin
}
if gracefulShutdown || reqLoopRes.Error() != nil || respLoopRes.Error() != nil {
return // either graceful shutdown already attempted or error occurred
}
gracefulShutdown = true
if err = p.clientConn.WritePacket(rpcServerWantsFinTLTag, nil, rpc.DefaultPacketTimeout); err != nil {
p.logClientError("write fin", err)
p.logClientError("write fin", err, rpc.PacketHeaderCircularBuffer{})
return
}
// no timeout for connection graceful shutdown (has server level shutdown timeout)
Expand Down Expand Up @@ -494,10 +496,10 @@ func (p *proxyConn) responseLoop(ctx context.Context) rpc.ForwardPacketsResult {
res := rpc.ForwardPackets(ctx, p.clientConn, p.upstreamConn)
if err := res.Error(); err != nil {
if res.ReadErr != nil {
p.logUpstreamError("read", res.ReadErr)
p.logUpstreamError("read", res.ReadErr, res.PacketHeaderCircularBuffer)
}
if res.WriteErr != nil {
p.logClientError("write", res.WriteErr)
p.logClientError("write", res.WriteErr, res.PacketHeaderCircularBuffer)
}
p.clientConn.ShutdownWrite()
}
Expand All @@ -507,7 +509,7 @@ func (p *proxyConn) responseLoop(ctx context.Context) rpc.ForwardPacketsResult {

func (p *proxyConn) readRequest() (req proxyRequest, err error) {
if req.tip, req.Request, err = p.clientConn.ReadPacket(p.reqBuf[:0], rpc.DefaultPacketTimeout); err != nil {
p.logClientError("read", err)
p.logClientError("read", err, rpc.PacketHeaderCircularBuffer{})
return proxyRequest{}, err
}
req.size = len(req.Request)
Expand All @@ -519,7 +521,7 @@ func (p *proxyConn) readRequest() (req proxyRequest, err error) {
p.reqBuf = req.Request // buffer reuse
}
if err = req.ParseInvokeReq(&p.serverOpts); err != nil {
p.logClientError("parse", err)
p.logClientError("parse", err, rpc.PacketHeaderCircularBuffer{})
return proxyRequest{}, err
}
requestTag := req.RequestTag()
Expand All @@ -537,11 +539,11 @@ func (p *proxyConn) readRequest() (req proxyRequest, err error) {
// pass
return req, nil
default:
p.logClientError("not supported request", err)
p.logClientError("not supported request", err, rpc.PacketHeaderCircularBuffer{})
return proxyRequest{}, rpc.ErrNoHandler
}
default:
p.logClientError("not supported packet", err)
p.logClientError("not supported packet", err, rpc.PacketHeaderCircularBuffer{})
return proxyRequest{}, rpc.ErrNoHandler
}
}
Expand All @@ -563,26 +565,26 @@ func (p *proxyConn) reportRequestSize(req *proxyRequest) {
p.agent.AddValueCounter(&key, float64(req.size), 1, format.BuiltinMetricMetaRPCRequests)
}

func (p *proxyConn) logClientError(tag string, err error) {
func (p *proxyConn) logClientError(tag string, err error, lastPackets rpc.PacketHeaderCircularBuffer) {
if err == nil || err == io.EOF {
return
}
var addr string
if p.clientConn != nil {
addr = p.clientConn.RemoteAddr()
}
log.Printf("error %s, client addr %s, key 0x%X: %v\n", tag, addr, p.clientCryptoKeyID, err)
log.Printf("error %s, client addr %s, version %d, key 0x%X: %v, %s\n", tag, addr, p.clientProtocolVersion, p.clientCryptoKeyID, err, lastPackets.String())
}

func (p *proxyConn) logUpstreamError(tag string, err error) {
func (p *proxyConn) logUpstreamError(tag string, err error, lastPackets rpc.PacketHeaderCircularBuffer) {
if err == nil || err == io.EOF {
return
}
var addr string
if p.upstreamConn != nil {
addr = p.upstreamConn.RemoteAddr()
}
log.Printf("error %s, upstream addr %s: %v\n", tag, addr, err)
log.Printf("error %s, upstream addr %s: %v, %s\n", tag, addr, err, lastPackets.String())
}

func (req *proxyRequest) process(p *proxyConn) (res rpc.ForwardPacketsResult) {
Expand All @@ -595,13 +597,13 @@ func (req *proxyRequest) process(p *proxyConn) (res rpc.ForwardPacketsResult) {
if _, err = args.ReadBoxed(req.Request); err == nil {
if args.Cluster != p.cluster {
err = fmt.Errorf("statshouse misconfiguration! cluster requested %q does not match actual cluster connected %q", args.Cluster, p.cluster)
p.logClientError("GetConfig2", err)
p.logClientError("GetConfig2", err, rpc.PacketHeaderCircularBuffer{})
} else {
req.Response, _ = args.WriteResult(req.Response[:0], p.config)
}
}
if err = req.WriteReponseAndFlush(p.clientConn, err); err != nil {
p.logClientError("write", err)
p.logClientError("write", err, rpc.PacketHeaderCircularBuffer{})
// not an error ("requestLoop" exits on request read-write errors only)
}
default:
Expand All @@ -621,7 +623,7 @@ func (req *proxyRequest) process(p *proxyConn) (res rpc.ForwardPacketsResult) {

func (req *proxyRequest) forwardAndFlush(p *proxyConn) error {
if err := req.ForwardAndFlush(p.upstreamConn, req.tip, rpc.DefaultPacketTimeout); err != nil {
p.logUpstreamError("write", err)
p.logUpstreamError("write", err, rpc.PacketHeaderCircularBuffer{})
return err
}
if cap(p.reqBuf) < cap(req.Request) {
Expand Down
12 changes: 6 additions & 6 deletions internal/vkgo/rpc/pingpong.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ func (pc *PacketConn) onPong(body []byte) error {
}
pc.pingMu.Lock()
defer pc.pingMu.Unlock()
if !pc.pingSent {
return fmt.Errorf("received unexpected pong %d without sending ping", pingID)
}
if pingID != pc.currentPingID {
return fmt.Errorf("received unexpected pong %d for ping %d", pingID, pc.currentPingID)
}
// if !pc.pingSent {
// return fmt.Errorf("received unexpected pong %d without sending ping", pingID)
// }
// if pingID != pc.currentPingID {
// return fmt.Errorf("received unexpected pong %d for ping %d", pingID, pc.currentPingID)
// }
pc.pingSent = false
return nil
}
Loading

0 comments on commit 535ebd9

Please sign in to comment.