From d4b5d04d45b12e7f77461468e01f7830e9269e94 Mon Sep 17 00:00:00 2001 From: Dmitriy Gertsog Date: Wed, 2 Oct 2024 17:45:02 +0300 Subject: [PATCH] api: add support of a batch insert request Add support the IPROTO_INSERT_ARROW request and message pack type MP_ARROW. Closes #399 --- CHANGELOG.md | 7 +- arrow/arrow.go | 56 ++++++++++++ arrow/arrow_test.go | 100 ++++++++++++++++++++++ arrow/example_test.go | 61 +++++++++++++ arrow/request.go | 93 ++++++++++++++++++++ arrow/request_test.go | 146 ++++++++++++++++++++++++++++++++ arrow/tarantool_test.go | 120 ++++++++++++++++++++++++++ arrow/testdata/config-memcs.lua | 31 +++++++ arrow/testdata/config-memtx.lua | 35 ++++++++ request.go | 23 ++--- request_test.go | 83 +++++++++++++++++- response.go | 6 ++ response_test.go | 55 ++++++++++++ watch.go | 6 +- 14 files changed, 802 insertions(+), 20 deletions(-) create mode 100644 arrow/arrow.go create mode 100644 arrow/arrow_test.go create mode 100644 arrow/example_test.go create mode 100644 arrow/request.go create mode 100644 arrow/request_test.go create mode 100644 arrow/tarantool_test.go create mode 100644 arrow/testdata/config-memcs.lua create mode 100644 arrow/testdata/config-memtx.lua create mode 100644 response_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index a2f9983e..b0eb0f9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,12 +9,13 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. ## [Unreleased] ### Added -- Add err log to `ConnectionPool.Add()` in case, when unable to establish - connection and ctx is not canceled; - also added logs for error case of `ConnectionPool.tryConnect()` calls in +- Add err log to `ConnectionPool.Add()` in case, when unable to establish + connection and ctx is not canceled; + also added logs for error case of `ConnectionPool.tryConnect()` calls in `ConnectionPool.controller()` and `ConnectionPool.reconnect()` - Methods that are implemented but not included in the pooler interface (#395). - Implemented stringer methods for pool.Role (#405). +- Support the IPROTO_INSERT_ARROW request (#399). ### Changed diff --git a/arrow/arrow.go b/arrow/arrow.go new file mode 100644 index 00000000..aaeaccca --- /dev/null +++ b/arrow/arrow.go @@ -0,0 +1,56 @@ +package arrow + +import ( + "fmt" + "reflect" + + "github.com/vmihailenco/msgpack/v5" +) + +// Arrow MessagePack extension type. +const arrowExtId = 8 + +// Arrow struct wraps a raw arrow data buffer. +type Arrow struct { + data []byte +} + +// MakeArrow returns a new arrow.Arrow object that contains +// wrapped a raw arrow data buffer. +func MakeArrow(arrow []byte) (Arrow, error) { + return Arrow{arrow}, nil +} + +// Raw returns a []byte that contains Arrow raw data. +func (a Arrow) Raw() []byte { + return a.data +} + +func arrowDecoder(d *msgpack.Decoder, v reflect.Value, extLen int) error { + arrow := Arrow{ + data: make([]byte, extLen), + } + n, err := d.Buffered().Read(arrow.data) + if err != nil { + return fmt.Errorf("arrowDecoder: can't read bytes on Arrow decode: %w", err) + } + if n < extLen || n != len(arrow.data) { + return fmt.Errorf("arrowDecoder: unexpected end of stream after %d Arrow bytes", n) + } + + v.Set(reflect.ValueOf(arrow)) + return nil +} + +func arrowEncoder(e *msgpack.Encoder, v reflect.Value) ([]byte, error) { + arr, ok := v.Interface().(Arrow) + if !ok { + return []byte{}, fmt.Errorf("arrowEncoder: not an Arrow type") + } + return arr.data, nil +} + +func init() { + msgpack.RegisterExtDecoder(arrowExtId, Arrow{}, arrowDecoder) + msgpack.RegisterExtEncoder(arrowExtId, Arrow{}, arrowEncoder) +} diff --git a/arrow/arrow_test.go b/arrow/arrow_test.go new file mode 100644 index 00000000..5e8b440c --- /dev/null +++ b/arrow/arrow_test.go @@ -0,0 +1,100 @@ +package arrow_test + +import ( + "bytes" + "encoding/hex" + "testing" + + "github.com/stretchr/testify/require" + "github.com/tarantool/go-tarantool/v2/arrow" + "github.com/vmihailenco/msgpack/v5" +) + +var longArrow, _ = hex.DecodeString("ffffffff70000000040000009effffff0400010004000000" + + "b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" + + "000004000000f0ffffff4000000001000000610000000600080004000c0010000400" + + "080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" + + "0000040000008affffff0400030010000000080000000000000000000000acffffff" + + "01000000000000003400000008000000000000000200000000000000000000000000" + + "00000000000000000000000000000800000000000000000000000100000001000000" + + "0000000000000000000000000a00140004000c0010000c0014000400060008000c00" + + "00000000000000000000") + +var tests = []struct { + name string + arr []byte + enc []byte +}{ + { + "abc", + []byte{'a', 'b', 'c'}, + []byte{0xc7, 0x3, 0x8, 'a', 'b', 'c'}, + }, + { + "empty", + []byte{}, + []byte{0xc7, 0x0, 0x8}, + }, + { + "one", + []byte{1}, + []byte{0xd4, 0x8, 0x1}, + }, + { + "long", + longArrow, + []byte{ + 0xc8, 0x1, 0x10, 0x8, 0xff, 0xff, 0xff, 0xff, 0x70, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, + 0x0, 0x9e, 0xff, 0xff, 0xff, 0x4, 0x0, 0x1, 0x0, 0x4, 0x0, 0x0, 0x0, 0xb6, 0xff, 0xff, + 0xff, 0xc, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, + 0x4, 0x0, 0x0, 0x0, 0xda, 0xff, 0xff, 0xff, 0x14, 0x0, 0x0, 0x0, 0x2, 0x2, 0x0, 0x0, + 0x4, 0x0, 0x0, 0x0, 0xf0, 0xff, 0xff, 0xff, 0x40, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, + 0x61, 0x0, 0x0, 0x0, 0x6, 0x0, 0x8, 0x0, 0x4, 0x0, 0xc, 0x0, 0x10, 0x0, 0x4, 0x0, 0x8, + 0x0, 0x9, 0x0, 0xc, 0x0, 0xc, 0x0, 0xc, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x8, 0x0, + 0xa, 0x0, 0xc, 0x0, 0x4, 0x0, 0x6, 0x0, 0x8, 0x0, 0xff, 0xff, 0xff, 0xff, 0x88, 0x0, + 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x8a, 0xff, 0xff, 0xff, 0x4, 0x0, 0x3, 0x0, 0x10, 0x0, + 0x0, 0x0, 0x8, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xac, 0xff, 0xff, + 0xff, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x34, 0x0, 0x0, 0x0, 0x8, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x8, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xa, 0x0, 0x14, 0x0, + 0x4, 0x0, 0xc, 0x0, 0x10, 0x0, 0xc, 0x0, 0x14, 0x0, 0x4, 0x0, 0x6, 0x0, 0x8, 0x0, 0xc, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + }, + }, +} + +func TestEncodeArrow(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + buf := bytes.NewBuffer([]byte{}) + enc := msgpack.NewEncoder(buf) + + arr, err := arrow.MakeArrow(tt.arr) + require.NoError(t, err) + + err = enc.Encode(arr) + require.NoError(t, err) + + require.Equal(t, tt.enc, buf.Bytes()) + }) + + } +} + +func TestDecodeArrow(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + buf := bytes.NewBuffer(tt.enc) + dec := msgpack.NewDecoder(buf) + + var arr arrow.Arrow + err := dec.Decode(&arr) + require.NoError(t, err) + + require.Equal(t, tt.arr, arr.Raw()) + }) + } +} diff --git a/arrow/example_test.go b/arrow/example_test.go new file mode 100644 index 00000000..e85d195c --- /dev/null +++ b/arrow/example_test.go @@ -0,0 +1,61 @@ +// Run Tarantool Enterprise Edition instance before example execution: +// +// Terminal 1: +// $ cd arrow +// $ TEST_TNT_WORK_DIR=$(mktemp -d -t 'tarantool.XXX') tarantool testdata/config-memcs.lua +// +// Terminal 2: +// $ go test -v example_test.go +package arrow_test + +import ( + "context" + "encoding/hex" + "fmt" + "log" + "time" + + "github.com/tarantool/go-tarantool/v2" + "github.com/tarantool/go-tarantool/v2/arrow" +) + +var arrowBinData, _ = hex.DecodeString("ffffffff70000000040000009effffff0400010004000000" + + "b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" + + "000004000000f0ffffff4000000001000000610000000600080004000c0010000400" + + "080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" + + "0000040000008affffff0400030010000000080000000000000000000000acffffff" + + "01000000000000003400000008000000000000000200000000000000000000000000" + + "00000000000000000000000000000800000000000000000000000100000001000000" + + "0000000000000000000000000a00140004000c0010000c0014000400060008000c00" + + "00000000000000000000") + +func Example() { + dialer := tarantool.NetDialer{ + Address: "127.0.0.1:3013", + User: "test", + Password: "test", + } + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + client, err := tarantool.Connect(ctx, dialer, tarantool.Opts{}) + cancel() + if err != nil { + log.Fatalf("Failed to connect: %s", err) + } + + arr, err := arrow.MakeArrow(arrowBinData) + if err != nil { + log.Fatalf("Failed prepare Arrow data: %s", err) + } + + req := arrow.NewInsertRequest("testArrow", arr) + + resp, err := client.Do(req).Get() + if err != nil { + log.Fatalf("Failed insert Arrow: %s", err) + } + if len(resp) > 0 { + log.Fatalf("Unexpected response") + } else { + fmt.Printf("Batch arrow inserted") + } +} diff --git a/arrow/request.go b/arrow/request.go new file mode 100644 index 00000000..2b6a9e29 --- /dev/null +++ b/arrow/request.go @@ -0,0 +1,93 @@ +package arrow + +import ( + "context" + "io" + + "github.com/tarantool/go-iproto" + "github.com/tarantool/go-tarantool/v2" + "github.com/vmihailenco/msgpack/v5" +) + +// INSERT Arrow request. +// +// FIXME: replace with iproto.IPROTO_INSERT_ARROW when iproto will released. +// https://github.com/tarantool/go-tarantool/issues/412 +const iprotoInsertArrowType = iproto.Type(17) + +// The data in Arrow format. +// +// FIXME: replace with iproto.IPROTO_ARROW when iproto will released. +// https://github.com/tarantool/go-tarantool/issues/412 +const iprotoArrowKey = iproto.Key(0x36) + +// InsertRequest helps you to create an insert request object for execution +// by a Connection. +type InsertRequest struct { + arrow Arrow + space interface{} + ctx context.Context +} + +// NewInsertRequest returns a new InsertRequest. +func NewInsertRequest(space interface{}, arrow Arrow) *InsertRequest { + return &InsertRequest{ + space: space, + arrow: arrow, + } +} + +// Type returns a IPROTO_INSERT_ARROW type for the request. +func (r *InsertRequest) Type() iproto.Type { + return iprotoInsertArrowType +} + +// Async returns false to the request return a response. +func (r *InsertRequest) Async() bool { + return false +} + +// Ctx returns a context of the request. +func (r *InsertRequest) Ctx() context.Context { + return r.ctx +} + +// Context sets a passed context to the request. +// +// Pay attention that when using context with request objects, +// the timeout option for Connection does not affect the lifetime +// of the request. For those purposes use context.WithTimeout() as +// the root context. +func (r *InsertRequest) Context(ctx context.Context) *InsertRequest { + r.ctx = ctx + return r +} + +// Arrow sets the arrow for insertion the insert arrow request. +// Note: default value is nil. +func (r *InsertRequest) Arrow(arrow Arrow) *InsertRequest { + r.arrow = arrow + return r +} + +// Body fills an msgpack.Encoder with the insert arrow request body. +func (r *InsertRequest) Body(res tarantool.SchemaResolver, enc *msgpack.Encoder) error { + if err := enc.EncodeMapLen(2); err != nil { + return err + } + if err := tarantool.EncodeSpace(res, enc, r.space); err != nil { + return err + } + if err := enc.EncodeUint(uint64(iprotoArrowKey)); err != nil { + return err + } + return enc.Encode(r.arrow) +} + +// Response creates a response for the InsertRequest. +func (r *InsertRequest) Response( + header tarantool.Header, + body io.Reader, +) (tarantool.Response, error) { + return tarantool.DecodeBaseResponse(header, body) +} diff --git a/arrow/request_test.go b/arrow/request_test.go new file mode 100644 index 00000000..5cddaefb --- /dev/null +++ b/arrow/request_test.go @@ -0,0 +1,146 @@ +package arrow_test + +import ( + "bytes" + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "github.com/tarantool/go-iproto" + "github.com/tarantool/go-tarantool/v2" + "github.com/tarantool/go-tarantool/v2/arrow" + "github.com/vmihailenco/msgpack/v5" +) + +// INSERT Arrow request. +// +// FIXME: replace with iproto.IPROTO_INSERT_ARROW when iproto will released. +// https://github.com/tarantool/go-tarantool/issues/412 +const iprotoInsertArrowType = iproto.Type(17) + +const validSpace uint32 = 1 // Any valid value != default. + +func TestInsertRequestType(t *testing.T) { + request := arrow.NewInsertRequest(validSpace, arrow.Arrow{}) + require.Equal(t, iprotoInsertArrowType, request.Type()) +} + +func TestInsertRequestAsync(t *testing.T) { + request := arrow.NewInsertRequest(validSpace, arrow.Arrow{}) + require.Equal(t, false, request.Async()) +} + +func TestInsertRequestCtx_default(t *testing.T) { + request := arrow.NewInsertRequest(validSpace, arrow.Arrow{}) + require.Equal(t, nil, request.Ctx()) +} + +func TestInsertRequestCtx_setter(t *testing.T) { + ctx := context.Background() + request := arrow.NewInsertRequest(validSpace, arrow.Arrow{}).Context(ctx) + require.Equal(t, ctx, request.Ctx()) +} + +func TestResponseDecode(t *testing.T) { + header := tarantool.Header{} + buf := bytes.NewBuffer([]byte{}) + enc := msgpack.NewEncoder(buf) + + enc.EncodeMapLen(1) + enc.EncodeUint8(uint8(iproto.IPROTO_DATA)) + enc.Encode([]interface{}{'v', '2'}) + + request := arrow.NewInsertRequest(validSpace, arrow.Arrow{}) + resp, err := request.Response(header, bytes.NewBuffer(buf.Bytes())) + require.NoError(t, err) + require.Equal(t, header, resp.Header()) + + decodedInterface, err := resp.Decode() + require.NoError(t, err) + require.Equal(t, []interface{}{'v', '2'}, decodedInterface) +} + +func TestResponseDecodeTyped(t *testing.T) { + header := tarantool.Header{} + buf := bytes.NewBuffer([]byte{}) + enc := msgpack.NewEncoder(buf) + + enc.EncodeMapLen(1) + enc.EncodeUint8(uint8(iproto.IPROTO_DATA)) + enc.EncodeBytes([]byte{'v', '2'}) + + request := arrow.NewInsertRequest(validSpace, arrow.Arrow{}) + resp, err := request.Response(header, bytes.NewBuffer(buf.Bytes())) + require.NoError(t, err) + require.Equal(t, header, resp.Header()) + + var decoded []byte + err = resp.DecodeTyped(&decoded) + require.NoError(t, err) + require.Equal(t, []byte{'v', '2'}, decoded) +} + +type stubSchemeResolver struct { + space interface{} +} + +func (r stubSchemeResolver) ResolveSpace(s interface{}) (uint32, error) { + if id, ok := r.space.(uint32); ok { + return id, nil + } + if _, ok := r.space.(string); ok { + return 0, nil + } + return 0, fmt.Errorf("stub error message: %v", r.space) +} + +func (stubSchemeResolver) ResolveIndex(i interface{}, spaceNo uint32) (uint32, error) { + return 0, nil +} + +func (r stubSchemeResolver) NamesUseSupported() bool { + _, ok := r.space.(string) + return ok +} + +func TestInsertRequestDefaultValues(t *testing.T) { + buf := bytes.NewBuffer([]byte{}) + enc := msgpack.NewEncoder(buf) + + resolver := stubSchemeResolver{validSpace} + req := arrow.NewInsertRequest(resolver.space, arrow.Arrow{}) + err := req.Body(&resolver, enc) + require.NoError(t, err) + + require.Equal(t, []byte{0x82, 0x10, 0x1, 0x36, 0xc7, 0x0, 0x8}, buf.Bytes()) +} + +func TestInsertRequestSpaceByName(t *testing.T) { + buf := bytes.NewBuffer([]byte{}) + enc := msgpack.NewEncoder(buf) + + resolver := stubSchemeResolver{"valid"} + req := arrow.NewInsertRequest(resolver.space, arrow.Arrow{}) + err := req.Body(&resolver, enc) + require.NoError(t, err) + + require.Equal(t, + []byte{0x82, 0x5e, 0xa5, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x36, 0xc7, 0x0, 0x8}, + buf.Bytes()) +} + +func TestInsertRequestSetters(t *testing.T) { + buf := bytes.NewBuffer([]byte{}) + enc := msgpack.NewEncoder(buf) + + arr, err := arrow.MakeArrow([]byte{'a', 'b', 'c'}) + require.NoError(t, err) + + resolver := stubSchemeResolver{validSpace} + req := arrow.NewInsertRequest(resolver.space, arr) + err = req.Body(&resolver, enc) + require.NoError(t, err) + + require.Equal(t, []byte{0x82, 0x10, 0x1, 0x36, 0xc7, 0x3, 0x8, 'a', 'b', 'c'}, buf.Bytes()) +} diff --git a/arrow/tarantool_test.go b/arrow/tarantool_test.go new file mode 100644 index 00000000..2ff29fc8 --- /dev/null +++ b/arrow/tarantool_test.go @@ -0,0 +1,120 @@ +package arrow_test + +import ( + "encoding/hex" + "log" + "os" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tarantool/go-tarantool/v2" + "github.com/tarantool/go-tarantool/v2/arrow" + "github.com/tarantool/go-tarantool/v2/test_helpers" +) + +var isArrowSupported = false + +var server = "127.0.0.1:3013" +var dialer = tarantool.NetDialer{ + Address: server, + User: "test", + Password: "test", +} +var space = "testArrow" + +var opts = tarantool.Opts{ + Timeout: 5 * time.Second, +} + +// TestInsert uses Arrow sequence from Tarantool's test. +// See: https://github.com/tarantool/tarantool/blob/d628b71bc537a75b69c253f45ec790462cf1a5cd/test/box-luatest/gh_10508_iproto_insert_arrow_test.lua#L56 +func TestInsert_invalid(t *testing.T) { + arrows := []struct { + arrow string + expected string + }{ + { + "", + "Failed to decode Arrow IPC data", + }, + { + "00", + "Failed to decode Arrow IPC data", + }, + { + "ffffffff70000000040000009effffff0400010004000000" + + "b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" + + "000004000000f0ffffff4000000001000000610000000600080004000c0010000400" + + "080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" + + "0000040000008affffff0400030010000000080000000000000000000000acffffff" + + "01000000000000003400000008000000000000000200000000000000000000000000" + + "00000000000000000000000000000800000000000000000000000100000001000000" + + "0000000000000000000000000a00140004000c0010000c0014000400060008000c00" + + "00000000000000000000", + "memtx does not support arrow format", + }, + } + + conn := test_helpers.ConnectWithValidation(t, dialer, opts) + defer conn.Close() + + for i, a := range arrows { + t.Run(strconv.Itoa(i), func(t *testing.T) { + data, err := hex.DecodeString(a.arrow) + require.NoError(t, err) + + arr, err := arrow.MakeArrow(data) + if err != nil { + require.ErrorContains(t, err, a.expected) + return + } + req := arrow.NewInsertRequest(space, arr) + + _, err = conn.Do(req).Get() + require.ErrorContains(t, err, a.expected) + }) + } + +} + +// runTestMain is a body of TestMain function +// (see https://pkg.go.dev/testing#hdr-Main). +// Using defer + os.Exit is not works so TestMain body +// is a separate function, see +// https://stackoverflow.com/questions/27629380/how-to-exit-a-go-program-honoring-deferred-calls +func runTestMain(m *testing.M) int { + isLess, err := test_helpers.IsTarantoolVersionLess(3, 3, 0) + if err != nil { + log.Fatalf("Failed to extract Tarantool version: %s", err) + } + isArrowSupported = !isLess + + if !isArrowSupported { + log.Println("Skipping insert Arrow tests...") + return 0 + } + + instance, err := test_helpers.StartTarantool(test_helpers.StartOpts{ + Dialer: dialer, + InitScript: "testdata/config-memtx.lua", + Listen: server, + WaitStart: 100 * time.Millisecond, + ConnectRetry: 10, + RetryTimeout: 500 * time.Millisecond, + }) + defer test_helpers.StopTarantoolWithCleanup(instance) + + if err != nil { + log.Printf("Failed to prepare test Tarantool: %s", err) + return 1 + } + + return m.Run() +} + +func TestMain(m *testing.M) { + code := runTestMain(m) + os.Exit(code) +} diff --git a/arrow/testdata/config-memcs.lua b/arrow/testdata/config-memcs.lua new file mode 100644 index 00000000..ae1cc430 --- /dev/null +++ b/arrow/testdata/config-memcs.lua @@ -0,0 +1,31 @@ +-- Do not set listen for now so connector won't be +-- able to send requests until everything is configured. +box.cfg { + work_dir = os.getenv("TEST_TNT_WORK_DIR") +} + +box.schema.user.create('test', { + password = 'test', + if_not_exists = true +}) +box.schema.user.grant('test', 'execute', 'universe', nil, { + if_not_exists = true +}) + +local s = box.schema.space.create('testArrow', { + engine = 'memcs', + field_count = 1, + format = {{'a', 'uint64'}}, + if_not_exists = true +}) +s:create_index('primary') +s:truncate() + +box.schema.user.grant('test', 'read,write', 'space', 'testArrow', { + if_not_exists = true +}) + +-- Set listen only when every other thing is configured. +box.cfg { + listen = 3013 +} diff --git a/arrow/testdata/config-memtx.lua b/arrow/testdata/config-memtx.lua new file mode 100644 index 00000000..92c0af09 --- /dev/null +++ b/arrow/testdata/config-memtx.lua @@ -0,0 +1,35 @@ +-- Do not set listen for now so connector won't be +-- able to send requests until everything is configured. +box.cfg { + work_dir = os.getenv("TEST_TNT_WORK_DIR") +} + +box.schema.user.create('test', { + password = 'test', + if_not_exists = true +}) +box.schema.user.grant('test', 'execute', 'universe', nil, { + if_not_exists = true +}) + +local s = box.schema.space.create('testArrow', { + if_not_exists = true +}) +s:create_index('primary', { + type = 'tree', + parts = {{ + field = 1, + type = 'integer' + }}, + if_not_exists = true +}) +s:truncate() + +box.schema.user.grant('test', 'read,write', 'space', 'testArrow', { + if_not_exists = true +}) + +-- Set listen only when every other thing is configured. +box.cfg { + listen = os.getenv("TEST_TNT_LISTEN") +} diff --git a/request.go b/request.go index 8dbe250b..21ed0eba 100644 --- a/request.go +++ b/request.go @@ -855,11 +855,7 @@ func (req *baseRequest) Ctx() context.Context { // Response creates a response for the baseRequest. func (req *baseRequest) Response(header Header, body io.Reader) (Response, error) { - resp, err := createBaseResponse(header, body) - if err != nil { - return nil, err - } - return &resp, nil + return DecodeBaseResponse(header, body) } type spaceRequest struct { @@ -871,6 +867,17 @@ func (req *spaceRequest) setSpace(space interface{}) { req.space = space } +func EncodeSpace(res SchemaResolver, enc *msgpack.Encoder, space interface{}) error { + spaceEnc, err := newSpaceEncoder(res, space) + if err != nil { + return err + } + if err := spaceEnc.Encode(enc); err != nil { + return err + } + return nil +} + type spaceIndexRequest struct { spaceRequest index interface{} @@ -954,11 +961,7 @@ func (req authRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { // Response creates a response for the authRequest. func (req authRequest) Response(header Header, body io.Reader) (Response, error) { - resp, err := createBaseResponse(header, body) - if err != nil { - return nil, err - } - return &resp, nil + return DecodeBaseResponse(header, body) } // PingRequest helps you to create an execute request object for execution diff --git a/request_test.go b/request_test.go index 84ba23ef..8ced455c 100644 --- a/request_test.go +++ b/request_test.go @@ -9,10 +9,10 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/tarantool/go-iproto" - "github.com/vmihailenco/msgpack/v5" - . "github.com/tarantool/go-tarantool/v2" + "github.com/vmihailenco/msgpack/v5" ) const invalidSpaceMsg = "invalid space" @@ -1063,3 +1063,82 @@ func TestResponseDecodeTyped(t *testing.T) { assert.Equal(t, []byte{'v', '2'}, decoded) } } + +type stubSchemeResolver struct { + space interface{} +} + +func (r stubSchemeResolver) ResolveSpace(s interface{}) (uint32, error) { + if id, ok := r.space.(uint32); ok { + return id, nil + } + if _, ok := r.space.(string); ok { + return 0, nil + } + return 0, fmt.Errorf("stub error message: %v", r.space) +} + +func (stubSchemeResolver) ResolveIndex(i interface{}, spaceNo uint32) (uint32, error) { + return 0, nil +} + +func (r stubSchemeResolver) NamesUseSupported() bool { + _, ok := r.space.(string) + return ok +} + +func TestEncodeSpace(t *testing.T) { + tests := []struct { + name string + res stubSchemeResolver + err string + out []byte + }{ + { + name: "string space", + res: stubSchemeResolver{"test"}, + out: []byte{0x5E, 0xA4, 0x74, 0x65, 0x73, 0x74}, + }, + { + name: "empty string", + res: stubSchemeResolver{""}, + out: []byte{0x5E, 0xA0}, + }, + { + name: "numeric 524", + res: stubSchemeResolver{uint32(524)}, + out: []byte{0x10, 0xCD, 0x02, 0x0C}, + }, + { + name: "numeric zero", + res: stubSchemeResolver{uint32(0)}, + out: []byte{0x10, 0x00}, + }, + { + name: "numeric max value", + res: stubSchemeResolver{^uint32(0)}, + out: []byte{0x10, 0xCE, 0xFF, 0xFF, 0xFF, 0xFF}, + }, + { + name: "resolve error", + res: stubSchemeResolver{false}, + err: "stub error message", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + enc := msgpack.NewEncoder(&buf) + + err := EncodeSpace(tt.res, enc, tt.res.space) + if tt.err != "" { + require.ErrorContains(t, err, tt.err) + return + } else { + require.NoError(t, err) + } + + require.Equal(t, tt.out, buf.Bytes()) + }) + } +} diff --git a/response.go b/response.go index 2c287f8b..90a02a1c 100644 --- a/response.go +++ b/response.go @@ -45,6 +45,12 @@ func createBaseResponse(header Header, body io.Reader) (baseResponse, error) { return baseResponse{header: header, buf: smallBuf{b: data}}, nil } +// DecodeBaseResponse parse response header and body. +func DecodeBaseResponse(header Header, body io.Reader) (Response, error) { + resp, err := createBaseResponse(header, body) + return &resp, err +} + // SelectResponse is used for the select requests. // It might contain a position descriptor of the last selected tuple. // diff --git a/response_test.go b/response_test.go new file mode 100644 index 00000000..1edbf018 --- /dev/null +++ b/response_test.go @@ -0,0 +1,55 @@ +package tarantool_test + +import ( + "bytes" + "io" + "testing" + + "github.com/stretchr/testify/require" + "github.com/tarantool/go-iproto" + "github.com/tarantool/go-tarantool/v2" + "github.com/vmihailenco/msgpack/v5" +) + +func encodeResponseData(t *testing.T, data interface{}) io.Reader { + t.Helper() + + buf := bytes.NewBuffer([]byte{}) + enc := msgpack.NewEncoder(buf) + + enc.EncodeMapLen(1) + enc.EncodeUint8(uint8(iproto.IPROTO_DATA)) + enc.Encode([]interface{}{data}) + return buf + +} + +func TestDecodeBaseResponse(t *testing.T) { + tests := []struct { + name string + header tarantool.Header + body interface{} + }{ + { + "test1", + tarantool.Header{}, + nil, + }, + { + "test2", + tarantool.Header{RequestId: 123}, + []byte{'v', '2'}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res, err := tarantool.DecodeBaseResponse(tt.header, encodeResponseData(t, tt.body)) + require.NoError(t, err) + require.Equal(t, tt.header, res.Header()) + + got, err := res.Decode() + require.NoError(t, err) + require.Equal(t, []interface{}{tt.body}, got) + }) + } +} diff --git a/watch.go b/watch.go index c147b039..9f127313 100644 --- a/watch.go +++ b/watch.go @@ -58,11 +58,7 @@ func (req *BroadcastRequest) Async() bool { // Response creates a response for a BroadcastRequest. func (req *BroadcastRequest) Response(header Header, body io.Reader) (Response, error) { - resp, err := createBaseResponse(header, body) - if err != nil { - return nil, err - } - return &resp, nil + return DecodeBaseResponse(header, body) } // watchRequest subscribes to the updates of a specified key defined on the