Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: add support of a batch insert request #411

Merged
merged 4 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:

jobs:
luacheck:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
if: |
github.event_name == 'push' ||
github.event_name == 'pull_request' &&
Expand All @@ -32,7 +32,7 @@ jobs:
run: ./.rocks/bin/luacheck .

golangci-lint:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
if: |
github.event_name == 'push' ||
github.event_name == 'pull_request' &&
Expand All @@ -57,7 +57,7 @@ jobs:
args: --out-${NO_FUTURE}format colored-line-number --config=.golangci.yaml

codespell:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
if: |
github.event_name == 'push' ||
github.event_name == 'pull_request' &&
Expand Down
2 changes: 1 addition & 1 deletion .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ issues:
exclude-rules:
- linters:
- lll
source: "\t?// *(see )?https://"
source: "^\\s*//\\s*(\\S+\\s){0,3}https?://\\S+$"
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
## [Unreleased]

### Added
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
connection and ctx is not canceled;
also added logs for error case of `ConnectionPool.tryConnect()` calls in
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
connection and ctx is not canceled;
also added logs for error case of `ConnectionPool.tryConnect()` calls in
`ConnectionPool.controller()` and `ConnectionPool.reconnect()`
- Methods that are implemented but not included in the pooler interface (#395).
- Implemented stringer methods for pool.Role (#405).
- Support the IPROTO_INSERT_ARROW request (#399).

### Changed

Expand Down
56 changes: 56 additions & 0 deletions arrow/arrow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package arrow

import (
"fmt"
"reflect"

"github.com/vmihailenco/msgpack/v5"
)

// Arrow MessagePack extension type.
const arrowExtId = 8

// Arrow struct wraps a raw arrow data buffer.
type Arrow struct {
data []byte
}

// MakeArrow returns a new arrow.Arrow object that contains
// wrapped a raw arrow data buffer.
func MakeArrow(arrow []byte) (Arrow, error) {
return Arrow{arrow}, nil
}

// Raw returns a []byte that contains Arrow raw data.
func (a Arrow) Raw() []byte {
return a.data
}

func arrowDecoder(d *msgpack.Decoder, v reflect.Value, extLen int) error {
arrow := Arrow{
data: make([]byte, extLen),
}
n, err := d.Buffered().Read(arrow.data)
if err != nil {
return fmt.Errorf("arrowDecoder: can't read bytes on Arrow decode: %w", err)
}
if n < extLen || n != len(arrow.data) {
return fmt.Errorf("arrowDecoder: unexpected end of stream after %d Arrow bytes", n)
}

v.Set(reflect.ValueOf(arrow))
return nil
}

func arrowEncoder(e *msgpack.Encoder, v reflect.Value) ([]byte, error) {
arr, ok := v.Interface().(Arrow)
if !ok {
return []byte{}, fmt.Errorf("arrowEncoder: not an Arrow type")
}
return arr.data, nil
}

func init() {
msgpack.RegisterExtDecoder(arrowExtId, Arrow{}, arrowDecoder)
msgpack.RegisterExtEncoder(arrowExtId, Arrow{}, arrowEncoder)
}
oleg-jukovec marked this conversation as resolved.
Show resolved Hide resolved
100 changes: 100 additions & 0 deletions arrow/arrow_test.go
oleg-jukovec marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package arrow_test

import (
"bytes"
"encoding/hex"
"testing"

"github.com/stretchr/testify/require"
"github.com/tarantool/go-tarantool/v2/arrow"
"github.com/vmihailenco/msgpack/v5"
)

var longArrow, _ = hex.DecodeString("ffffffff70000000040000009effffff0400010004000000" +
"b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" +
"000004000000f0ffffff4000000001000000610000000600080004000c0010000400" +
"080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" +
"0000040000008affffff0400030010000000080000000000000000000000acffffff" +
"01000000000000003400000008000000000000000200000000000000000000000000" +
"00000000000000000000000000000800000000000000000000000100000001000000" +
"0000000000000000000000000a00140004000c0010000c0014000400060008000c00" +
"00000000000000000000")

var tests = []struct {
name string
arr []byte
enc []byte
}{
{
"abc",
[]byte{'a', 'b', 'c'},
[]byte{0xc7, 0x3, 0x8, 'a', 'b', 'c'},
},
{
"empty",
[]byte{},
[]byte{0xc7, 0x0, 0x8},
},
{
"one",
[]byte{1},
[]byte{0xd4, 0x8, 0x1},
},
{
"long",
longArrow,
[]byte{
0xc8, 0x1, 0x10, 0x8, 0xff, 0xff, 0xff, 0xff, 0x70, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0,
0x0, 0x9e, 0xff, 0xff, 0xff, 0x4, 0x0, 0x1, 0x0, 0x4, 0x0, 0x0, 0x0, 0xb6, 0xff, 0xff,
0xff, 0xc, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0,
0x4, 0x0, 0x0, 0x0, 0xda, 0xff, 0xff, 0xff, 0x14, 0x0, 0x0, 0x0, 0x2, 0x2, 0x0, 0x0,
0x4, 0x0, 0x0, 0x0, 0xf0, 0xff, 0xff, 0xff, 0x40, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0,
0x61, 0x0, 0x0, 0x0, 0x6, 0x0, 0x8, 0x0, 0x4, 0x0, 0xc, 0x0, 0x10, 0x0, 0x4, 0x0, 0x8,
0x0, 0x9, 0x0, 0xc, 0x0, 0xc, 0x0, 0xc, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x8, 0x0,
0xa, 0x0, 0xc, 0x0, 0x4, 0x0, 0x6, 0x0, 0x8, 0x0, 0xff, 0xff, 0xff, 0xff, 0x88, 0x0,
0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x8a, 0xff, 0xff, 0xff, 0x4, 0x0, 0x3, 0x0, 0x10, 0x0,
0x0, 0x0, 0x8, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xac, 0xff, 0xff,
0xff, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x34, 0x0, 0x0, 0x0, 0x8, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x8, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xa, 0x0, 0x14, 0x0,
0x4, 0x0, 0xc, 0x0, 0x10, 0x0, 0xc, 0x0, 0x14, 0x0, 0x4, 0x0, 0x6, 0x0, 0x8, 0x0, 0xc,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
},
},
}

func TestEncodeArrow(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
buf := bytes.NewBuffer([]byte{})
enc := msgpack.NewEncoder(buf)

arr, err := arrow.MakeArrow(tt.arr)
require.NoError(t, err)

err = enc.Encode(arr)
require.NoError(t, err)

require.Equal(t, tt.enc, buf.Bytes())
})

}
}

func TestDecodeArrow(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

buf := bytes.NewBuffer(tt.enc)
dec := msgpack.NewDecoder(buf)

var arr arrow.Arrow
err := dec.Decode(&arr)
require.NoError(t, err)

require.Equal(t, tt.arr, arr.Raw())
})
}
}
oleg-jukovec marked this conversation as resolved.
Show resolved Hide resolved
61 changes: 61 additions & 0 deletions arrow/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Run Tarantool Enterprise Edition instance before example execution:
//
// Terminal 1:
// $ cd arrow
// $ TEST_TNT_WORK_DIR=$(mktemp -d -t 'tarantool.XXX') tarantool testdata/config-memcs.lua
//
// Terminal 2:
// $ go test -v example_test.go
package arrow_test

import (
"context"
"encoding/hex"
"fmt"
"log"
"time"

"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/arrow"
)

var arrowBinData, _ = hex.DecodeString("ffffffff70000000040000009effffff0400010004000000" +
"b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" +
"000004000000f0ffffff4000000001000000610000000600080004000c0010000400" +
"080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" +
"0000040000008affffff0400030010000000080000000000000000000000acffffff" +
"01000000000000003400000008000000000000000200000000000000000000000000" +
"00000000000000000000000000000800000000000000000000000100000001000000" +
"0000000000000000000000000a00140004000c0010000c0014000400060008000c00" +
"00000000000000000000")

func Example() {
dialer := tarantool.NetDialer{
Address: "127.0.0.1:3013",
User: "test",
Password: "test",
}
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
client, err := tarantool.Connect(ctx, dialer, tarantool.Opts{})
cancel()
if err != nil {
log.Fatalf("Failed to connect: %s", err)
}

arr, err := arrow.MakeArrow(arrowBinData)
if err != nil {
log.Fatalf("Failed prepare Arrow data: %s", err)
}

req := arrow.NewInsertRequest("testArrow", arr)

resp, err := client.Do(req).Get()
if err != nil {
log.Fatalf("Failed insert Arrow: %s", err)
}
if len(resp) > 0 {
log.Fatalf("Unexpected response")
} else {
fmt.Printf("Batch arrow inserted")
}
}
93 changes: 93 additions & 0 deletions arrow/request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package arrow

import (
"context"
"io"

"github.com/tarantool/go-iproto"
"github.com/tarantool/go-tarantool/v2"
"github.com/vmihailenco/msgpack/v5"
)

// INSERT Arrow request.
//
// FIXME: replace with iproto.IPROTO_INSERT_ARROW when iproto will released.
// https://github.com/tarantool/go-tarantool/issues/412
const iprotoInsertArrowType = iproto.Type(17)

// The data in Arrow format.
//
// FIXME: replace with iproto.IPROTO_ARROW when iproto will released.
// https://github.com/tarantool/go-tarantool/issues/412
const iprotoArrowKey = iproto.Key(0x36)

// InsertRequest helps you to create an insert request object for execution
// by a Connection.
type InsertRequest struct {
arrow Arrow
space interface{}
ctx context.Context
}

// NewInsertRequest returns a new InsertRequest.
func NewInsertRequest(space interface{}, arrow Arrow) *InsertRequest {
return &InsertRequest{
space: space,
arrow: arrow,
}
}

// Type returns a IPROTO_INSERT_ARROW type for the request.
func (r *InsertRequest) Type() iproto.Type {
return iprotoInsertArrowType
}

// Async returns false to the request return a response.
func (r *InsertRequest) Async() bool {
return false
}

// Ctx returns a context of the request.
func (r *InsertRequest) Ctx() context.Context {
return r.ctx
}

// Context sets a passed context to the request.
//
// Pay attention that when using context with request objects,
// the timeout option for Connection does not affect the lifetime
// of the request. For those purposes use context.WithTimeout() as
// the root context.
func (r *InsertRequest) Context(ctx context.Context) *InsertRequest {
r.ctx = ctx
return r
}

// Arrow sets the arrow for insertion the insert arrow request.
// Note: default value is nil.
func (r *InsertRequest) Arrow(arrow Arrow) *InsertRequest {
r.arrow = arrow
return r
}

// Body fills an msgpack.Encoder with the insert arrow request body.
func (r *InsertRequest) Body(res tarantool.SchemaResolver, enc *msgpack.Encoder) error {
if err := enc.EncodeMapLen(2); err != nil {
return err
}
if err := tarantool.EncodeSpace(res, enc, r.space); err != nil {
return err
}
if err := enc.EncodeUint(uint64(iprotoArrowKey)); err != nil {
return err
}
return enc.Encode(r.arrow)
}

// Response creates a response for the InsertRequest.
func (r *InsertRequest) Response(
header tarantool.Header,
body io.Reader,
) (tarantool.Response, error) {
return tarantool.DecodeBaseResponse(header, body)
}
Loading
Loading