From 760b1e65889aa950cd277cad54e3eed2a51453d0 Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Thu, 24 Oct 2024 11:07:04 -0700 Subject: [PATCH] rhp4: increase max size of transaction set responses --- rhp/v4/encoding.go | 11 +++++++---- rhp/v4/transport.go | 28 +++++++++++++++++++++++++--- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/rhp/v4/encoding.go b/rhp/v4/encoding.go index 71650515..79586659 100644 --- a/rhp/v4/encoding.go +++ b/rhp/v4/encoding.go @@ -104,7 +104,10 @@ func (r *RPCError) maxLen() int { return 1024 } -const reasonableObjectSize = 10 * 1024 +const ( + reasonableObjectSize = 10 * 1024 + reasonableTransactionSetSize = 100 * 1024 +) func sizeof(v types.EncoderTo) int { var buf bytes.Buffer @@ -213,7 +216,7 @@ func (r *RPCFormContractThirdResponse) decodeFrom(d *types.Decoder) { types.DecodeSlice(d, &r.TransactionSet) } func (r *RPCFormContractThirdResponse) maxLen() int { - return reasonableObjectSize + return reasonableTransactionSetSize } func (r *RPCRenewContractParams) encodeTo(e *types.Encoder) { @@ -283,7 +286,7 @@ func (r *RPCRenewContractThirdResponse) decodeFrom(d *types.Decoder) { types.DecodeSlice(d, &r.TransactionSet) } func (r *RPCRenewContractThirdResponse) maxLen() int { - return reasonableObjectSize + return reasonableTransactionSetSize } func (r *RPCRefreshContractParams) encodeTo(e *types.Encoder) { @@ -351,7 +354,7 @@ func (r *RPCRefreshContractThirdResponse) decodeFrom(d *types.Decoder) { types.DecodeSlice(d, &r.TransactionSet) } func (r *RPCRefreshContractThirdResponse) maxLen() int { - return reasonableObjectSize + return reasonableTransactionSetSize } func (r *RPCFreeSectorsRequest) encodeTo(e *types.Encoder) { diff --git a/rhp/v4/transport.go b/rhp/v4/transport.go index d7831788..f24cb285 100644 --- a/rhp/v4/transport.go +++ b/rhp/v4/transport.go @@ -1,7 +1,10 @@ package rhp import ( + "bytes" + "fmt" "io" + "log" "go.sia.tech/core/types" ) @@ -21,6 +24,9 @@ func withDecoder(r io.Reader, maxLen int, fn func(*types.Decoder)) error { // ReadID reads an RPC ID from the stream. func ReadID(r io.Reader) (id types.Specifier, err error) { err = withDecoder(r, 16, id.DecodeFrom) + if err == nil { + log.Println("read request id") + } return } @@ -28,37 +34,53 @@ func ReadID(r io.Reader) (id types.Specifier, err error) { func WriteRequest(w io.Writer, id types.Specifier, o Object) error { return withEncoder(w, func(e *types.Encoder) { id.EncodeTo(e) + fmt.Println("wrote request id") if o == nil { return } + fmt.Printf("writing request %T\n", o) o.encodeTo(e) }) } // ReadRequest reads a request from the stream. func ReadRequest(r io.Reader, o Object) error { - return withDecoder(r, o.maxLen(), o.decodeFrom) + return withDecoder(r, o.maxLen(), func(d *types.Decoder) { + o.decodeFrom(d) + fmt.Printf("read request %T\n", o) + }) } // WriteResponse writes a response to the stream. Note that RPCError implements // Object, and may be used as a response to any RPC. func WriteResponse(w io.Writer, o Object) error { - return withEncoder(w, func(e *types.Encoder) { + buf := bytes.NewBuffer(nil) + w = io.MultiWriter(w, buf) + err := withEncoder(w, func(e *types.Encoder) { _, isErr := o.(*RPCError) e.WriteBool(isErr) o.encodeTo(e) + fmt.Printf("wrote response %T\n", o) }) + fmt.Printf("write response %T: %v\n", o, buf.Bytes()) + return err } // ReadResponse reads a response from the stream into r. func ReadResponse(r io.Reader, o Object) error { - return withDecoder(r, (*RPCError)(nil).maxLen()+o.maxLen(), func(d *types.Decoder) { + buf := bytes.NewBuffer(nil) + r = io.TeeReader(r, buf) + err := withDecoder(r, (*RPCError)(nil).maxLen()+o.maxLen(), func(d *types.Decoder) { if d.ReadBool() { r := new(RPCError) r.decodeFrom(d) d.SetErr(r) + fmt.Printf("read response error\n") return } o.decodeFrom(d) + fmt.Printf("read response %T\n", o) }) + fmt.Printf("read response %T: %v\n", o, buf.Bytes()) + return err }