-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpipeline.go
104 lines (88 loc) · 2.15 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package brpc
import (
"context"
"net"
"time"
"google.golang.org/protobuf/proto"
brpcpb "github.com/tomwei7/brpc-go/genproto/brpc"
policypb "github.com/tomwei7/brpc-go/genproto/brpc/policy"
)
type frame struct {
RPCMeta policypb.RpcMeta
Body []byte
}
func newReqFrame(correlationID int64, reqMeta *policypb.RpcRequestMeta) *frame {
return &frame{
RPCMeta: policypb.RpcMeta{
Request: reqMeta,
CompressType: proto.Int32(int32(brpcpb.CompressType_COMPRESS_TYPE_NONE)),
CorrelationId: proto.Int64(correlationID),
},
}
}
func (f *frame) AttachmentSize() (size int) {
if f.RPCMeta.AttachmentSize != nil {
size = int(*f.RPCMeta.AttachmentSize)
}
return
}
func (f *frame) CompressType() brpcpb.CompressType {
compressType := brpcpb.CompressType_COMPRESS_TYPE_NONE
if f.RPCMeta.CompressType != nil {
compressType = brpcpb.CompressType(*f.RPCMeta.CompressType)
}
return compressType
}
func (f *frame) AppendMessage(message proto.Message) error {
data, err := proto.Marshal(message)
if err != nil {
return err
}
f.Body = append(f.Body, data...)
return nil
}
func (f *frame) AppendAttachment(attachment []byte) {
if len(attachment) == 0 {
return
}
f.RPCMeta.AttachmentSize = proto.Int32(int32(len(attachment)))
f.Body = append(f.Body, attachment...)
}
type pipeline struct {
enc *encodec
dec *decodec
conn net.Conn
}
func newPipeline(conn net.Conn) *pipeline {
enc := newEncodec(conn)
dec := newDecodec(conn)
return &pipeline{enc: enc, dec: dec, conn: conn}
}
func (p *pipeline) SendRequest(ctx context.Context, req *frame) error {
deadline, ok := ctx.Deadline()
if ok {
if time.Now().After(deadline) {
return context.DeadlineExceeded
}
if err := p.conn.SetWriteDeadline(deadline); err != nil {
return err
}
}
return p.enc.Encode(req)
}
func (p *pipeline) ReceiveResponse(ctx context.Context) (*frame, error) {
deadline, ok := ctx.Deadline()
if ok {
if time.Now().After(deadline) {
return nil, context.DeadlineExceeded
}
if err := p.conn.SetReadDeadline(deadline); err != nil {
return nil, err
}
}
f := new(frame)
return f, p.dec.Decode(f)
}
func (p *pipeline) Close() error {
return p.conn.Close()
}