Skip to content

Commit

Permalink
Fixed nil query for server for unknown request + improved raw marshal…
Browse files Browse the repository at this point in the history
…ing of result
  • Loading branch information
Speshal71 committed Dec 26, 2024
1 parent bd0ec14 commit 6a8a6b4
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 27 deletions.
1 change: 1 addition & 0 deletions binpacket.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (pp *BinaryPacket) Reset() {
pp.packet.SchemaID = 0
pp.packet.requestID = 0
pp.packet.Result = nil
pp.packet.opts = PacketOpts{}
pp.body = pp.body[:0]
}

Expand Down
17 changes: 15 additions & 2 deletions packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"github.com/tinylib/msgp/msgp"
)

type PacketOpts struct {
asQuery bool
}

type Packet struct {
Cmd uint
LSN uint64
Expand All @@ -16,6 +20,15 @@ type Packet struct {
Timestamp time.Time
Request Query
Result *Result

opts PacketOpts
}

// AsQuery forces packet to be unmarshaled as query even if it's not supported.
func (pack *Packet) AsQuery() *Packet {
pack.opts.asQuery = true

return pack
}

func (pack *Packet) String() string {
Expand Down Expand Up @@ -114,7 +127,7 @@ func (pack *Packet) UnmarshalBinaryBody(data []byte) (buf []byte, err error) {
return unpackr(pack.Cmd^ErrorFlag, data)
}

if q := NewQuery(pack.Cmd); q != nil {
if q := NewQuery(pack.Cmd); IsKnownQuery(q) || pack.opts.asQuery {
return unpackq(q, data)
}
return unpackr(OKCommand, data)
Expand All @@ -128,7 +141,7 @@ func (pack *Packet) UnmarshalBinary(data []byte) error {

// UnmarshalMsg implements msgp.Unmarshaler
func (pack *Packet) UnmarshalMsg(data []byte) (buf []byte, err error) {
*pack = Packet{}
*pack = Packet{opts: pack.opts}

buf = data

Expand Down
2 changes: 1 addition & 1 deletion query.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ func NewQuery(cmd uint) Query {
case EvalCommand:
return &Eval{}
default:
return nil
return NewUnknownQuery(cmd)
}
}
47 changes: 23 additions & 24 deletions result.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package tarantool

import (
"fmt"

"github.com/tinylib/msgp/msgp"
)

type Result struct {
RawBytes []byte

ErrorCode uint
Error error
Data [][]interface{}
DataBytes []byte

marshaller msgp.Marshaler
}
Expand All @@ -29,6 +31,16 @@ func (r *Result) MarshalMsg(b []byte) (o []byte, err error) {
return r.marshaller.MarshalMsg(b)
}

// WithBytesMarshaller changes the marshaller for result serialization.
//
// Current implementation of unmarshaller may change structure of the result (e.g. in call17)
// if it's not array of tuples in which case it's forcefully wrapped. It also skips
// unknown keys. Therefore serialized sequence of bytes produced by the default marshaller
// is different from the incoming.
//
// Bytes marshaller on the other hand returns exactly the same array
// the result was successfully unmarshalled from (preserving all the keys of the body
// including unknown ones). But it won't reflect any manual changes of unmarshalled data.
func (r *Result) WithBytesMarshaller() *Result {
r.marshaller = bytesResultMarshaller{Result: r}
return r
Expand Down Expand Up @@ -64,22 +76,7 @@ type bytesResultMarshaller struct {
}

func (r bytesResultMarshaller) MarshalMsg(b []byte) (o []byte, err error) {
o = b
if r.Error != nil {
o = msgp.AppendMapHeader(o, 1)
o = msgp.AppendUint(o, KeyError)
o = msgp.AppendString(o, r.Error.Error())
} else {
o = msgp.AppendMapHeader(o, 1)
o = msgp.AppendUint(o, KeyData)
if len(r.DataBytes) != 0 {
o = append(o, r.DataBytes...)
} else {
o = msgp.AppendArrayHeader(o, 0)
}
}

return o, nil
return append(b, r.RawBytes...), nil
}

// UnmarshalMsg implements msgp.Unmarshaler
Expand All @@ -95,6 +92,15 @@ func (r *Result) UnmarshalMsg(data []byte) (buf []byte, err error) {
if len(buf) == 0 && r.ErrorCode == OKCommand {
return buf, nil
}

defer func() {
if err == nil {
rawPacketLength := len(data) - len(buf)
r.RawBytes = make([]byte, rawPacketLength)
copy(r.RawBytes, data[:rawPacketLength])
}
}()

l, buf, err = msgp.ReadMapHeaderBytes(buf)

if err != nil {
Expand All @@ -112,8 +118,6 @@ func (r *Result) UnmarshalMsg(data []byte) (buf []byte, err error) {
case KeyData:
var i, j uint32

bufData := buf

if dl, buf, err = msgp.ReadArrayHeaderBytes(buf); err != nil {
return
}
Expand Down Expand Up @@ -141,11 +145,6 @@ func (r *Result) UnmarshalMsg(data []byte) (buf []byte, err error) {
}
}

bufRead := len(bufData) - len(buf)
bufData = bufData[:bufRead]
r.DataBytes = make([]byte, len(bufData))
copy(r.DataBytes, bufData)

case KeyError:
errorMessage, buf, err = msgp.ReadStringBytes(buf)
if err != nil {
Expand Down
47 changes: 47 additions & 0 deletions result_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package tarantool

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestResultMarshaling(t *testing.T) {
var result Result

// The result of a call17 to:
// function a()
// return "a"
// end
tntBodyBytes := []byte{
0x81, // MP_MAP
0x30, // key IPROTO_DATA
0xdd, 0x0, 0x0, 0x0, 0x1, // MP_ARRAY
0xa1, 0x61, // string value "a"
}

expectedDefaultMarshalerBytes := []byte{
0x81, // MP_MAP
0x30, // key IPROTO_DATA
0x91, // MP_ARRAY
0x91, // MP_ARRAY
0xa1, 0x61, // string value "a"
}

buf, err := result.UnmarshalMsg(tntBodyBytes)
assert.NoError(t, err, "error unmarshaling result")
assert.Empty(t, buf, "unmarshaling residual buffer is not empty")

defaultMarshalerRes, err := result.MarshalMsg(nil)
assert.NoError(t, err, "error marshaling by default marshaller")
assert.Equal(
t,
expectedDefaultMarshalerBytes,
defaultMarshalerRes,
)

result.WithBytesMarshaller()
bytesMarshalerRes, err := result.MarshalMsg(nil)
assert.NoError(t, err, "error marshaling by bytes marshaller")
assert.Equal(t, tntBodyBytes, bytesMarshalerRes)
}
8 changes: 8 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@ type IprotoServer struct {
schemaID uint64
wg sync.WaitGroup
getPingStatus func(*IprotoServer) uint

// asQueryServer forces incoming requests to be parsed as queries
asQueryServer bool
}

type IprotoServerOptions struct {
Perf PerfCount
GetPingStatus func(*IprotoServer) uint
AsQueryServer bool
}

func NewIprotoServer(uuid string, handler QueryHandler, onShutdown OnShutdownCallback) *IprotoServer {
Expand All @@ -58,6 +62,7 @@ func (s *IprotoServer) WithOptions(opts *IprotoServerOptions) *IprotoServer {
opts = &IprotoServerOptions{}
}
s.perf = opts.Perf
s.asQueryServer = opts.AsQueryServer
s.getPingStatus = opts.GetPingStatus
if s.getPingStatus == nil {
s.getPingStatus = func(*IprotoServer) uint { return 0 }
Expand Down Expand Up @@ -216,6 +221,9 @@ READER_LOOP:
wg.Add(1)
go func(pp *BinaryPacket) {
packet := &pp.packet
if s.asQueryServer {
packet.AsQuery()
}
defer wg.Done()

err := packet.UnmarshalBinary(pp.body)
Expand Down
36 changes: 36 additions & 0 deletions unknown_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package tarantool

type UnknownQuery struct {
cmd uint
data []byte
}

var _ Query = (*UnknownQuery)(nil)

func NewUnknownQuery(cmd uint) *UnknownQuery {
return &UnknownQuery{cmd: cmd}
}

func (q *UnknownQuery) GetCommandID() uint {
return q.cmd
}

func (q *UnknownQuery) MarshalMsg(b []byte) ([]byte, error) {
return append(b, q.data...), nil
}

// UnmarshalMsg saves all of the data into the query.
// So make sure it doesn't contain part of another packet.
func (q *UnknownQuery) UnmarshalMsg(data []byte) (buf []byte, err error) {
q.data = make([]byte, len(data))
copy(q.data, data)

return nil, nil
}

// IsKnownQuery returns true if passed query is known and supported.
func IsKnownQuery(q Query) bool {
_, unknown := q.(*UnknownQuery)

return q != nil && !unknown
}

0 comments on commit 6a8a6b4

Please sign in to comment.