Add support for JSON type
Signed-off-by: Stefano Scafiti <[email protected]>
ostafen committed Jun 4, 2024
1 parent a741ec9 commit bbb1608
Showing 28 changed files with 1,853 additions and 487 deletions.
85 changes: 69 additions & 16 deletions embedded/sql/catalog.go
@@ -20,6 +20,7 @@ import (
     "bytes"
     "context"
     "encoding/binary"
+    "encoding/json"
     "errors"
     "fmt"
     "io"
@@ -409,8 +410,8 @@ func (catlg *Catalog) newTable(name string, colsSpec map[uint32]*ColSpec, maxCol
             continue
         }
 
-        if cs.colName == revCol {
-            return nil, fmt.Errorf("%w(%s)", ErrReservedWord, revCol)
+        if isReservedCol(cs.colName) {
+            return nil, fmt.Errorf("%w(%s)", ErrReservedWord, cs.colName)
         }
 
         _, colExists := table.colsByName[cs.colName]
@@ -533,8 +534,8 @@ func (t *Table) newIndex(unique bool, colIDs []uint32) (index *Index, err error)
 }
 
 func (t *Table) newColumn(spec *ColSpec) (*Column, error) {
-    if spec.colName == revCol {
-        return nil, fmt.Errorf("%w(%s)", ErrReservedWord, revCol)
+    if isReservedCol(spec.colName) {
+        return nil, fmt.Errorf("%w(%s)", ErrReservedWord, spec.colName)
     }
 
     if spec.autoIncrement {
@@ -597,8 +598,8 @@ func (ctlg *Catalog) renameTable(oldName, newName string) (*Table, error) {
 }
 
 func (t *Table) renameColumn(oldName, newName string) (*Column, error) {
-    if newName == revCol {
-        return nil, fmt.Errorf("%w(%s)", ErrReservedWord, revCol)
+    if isReservedCol(newName) {
+        return nil, fmt.Errorf("%w(%s)", ErrReservedWord, newName)
     }
 
     if oldName == newName {
@@ -1006,16 +1007,17 @@ func unmapColSpec(prefix, mkey []byte) (dbID, tableID, colID uint32, colType SQL
 }
 
 func asType(t string) (SQLValueType, error) {
-    if t == IntegerType ||
-        t == Float64Type ||
-        t == BooleanType ||
-        t == VarcharType ||
-        t == UUIDType ||
-        t == BLOBType ||
-        t == TimestampType {
+    switch t {
+    case IntegerType,
+        Float64Type,
+        BooleanType,
+        VarcharType,
+        UUIDType,
+        BLOBType,
+        TimestampType,
+        JSONType:
         return t, nil
     }
-
     return t, ErrCorruptedData
 }
 
@@ -1304,12 +1306,37 @@ func EncodeRawValueAsKey(val interface{}, colType SQLValueType, maxLen int) ([]b
     return nil, 0, ErrInvalidValue
 }
 
+func getEncodeRawValue(val TypedValue, colType SQLValueType) (interface{}, error) {
+    if colType != JSONType || val.Type() == JSONType {
+        return val.RawValue(), nil
+    }
+
+    if val.Type() != VarcharType {
+        return nil, fmt.Errorf("%w: invalid json value", ErrInvalidValue)
+    }
+    s, _ := val.RawValue().(string)
+
+    raw := json.RawMessage(s)
+    if !json.Valid(raw) {
+        return nil, fmt.Errorf("%w: invalid json value", ErrInvalidValue)
+    }
+    return raw, nil
+}
+
 func EncodeValue(val TypedValue, colType SQLValueType, maxLen int) ([]byte, error) {
-    return EncodeRawValue(val.RawValue(), colType, maxLen, false)
+    v, err := getEncodeRawValue(val, colType)
+    if err != nil {
+        return nil, err
+    }
+    return EncodeRawValue(v, colType, maxLen, false)
 }
 
 func EncodeNullableValue(val TypedValue, colType SQLValueType, maxLen int) ([]byte, error) {
-    return EncodeRawValue(val.RawValue(), colType, maxLen, true)
+    v, err := getEncodeRawValue(val, colType)
+    if err != nil {
+        return nil, err
+    }
+    return EncodeRawValue(v, colType, maxLen, true)
 }
 
 // EncodeRawValue encode a value in a byte format. This is the internal binary representation of a value. Can be decoded with DecodeValue.
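Editor's note: for JSON columns, getEncodeRawValue is where the new behavior lives. A value that is already of JSON type passes through untouched, while a VARCHAR value is interpreted as a JSON document, checked with json.Valid, and handed to the encoder as a json.RawMessage. Below is a minimal standalone sketch of that normalization step using only the standard library; normalizeJSON is a hypothetical helper for illustration and is not part of the package.

package main

import (
    "encoding/json"
    "errors"
    "fmt"
)

var errInvalidJSON = errors.New("invalid json value")

// normalizeJSON mirrors the idea behind getEncodeRawValue for string inputs:
// keep the text as raw bytes, but only after json.Valid confirms it parses.
func normalizeJSON(s string) (json.RawMessage, error) {
    raw := json.RawMessage(s)
    if !json.Valid(raw) {
        return nil, errInvalidJSON
    }
    return raw, nil
}

func main() {
    if raw, err := normalizeJSON(`{"name": "John", "age": 30}`); err == nil {
        fmt.Println(string(raw)) // kept verbatim, no re-marshaling
    }

    if _, err := normalizeJSON(`{"name": }`); err != nil {
        fmt.Println("rejected:", err) // malformed documents fail before encoding
    }
}

Keeping the text as a json.RawMessage means the bytes passed to the encoder are exactly what the client supplied; no re-marshaling or key reordering happens on the write path.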
@@ -1402,6 +1429,22 @@ func EncodeRawValue(val interface{}, colType SQLValueType, maxLen int, nullable
 
             return encv[:], nil
         }
+    case JSONType:
+        rawJson, ok := val.(json.RawMessage)
+        if !ok {
+            data, err := json.Marshal(val)
+            if err != nil {
+                return nil, err
+            }
+            rawJson = data
+        }
+
+        // len(v) + v
+        encv := make([]byte, EncLenLen+len(rawJson))
+        binary.BigEndian.PutUint32(encv[:], uint32(len(rawJson)))
+        copy(encv[EncLenLen:], rawJson)
+
+        return encv[:], nil
 case UUIDType:
     {
         uuidVal, ok := convVal.(uuid.UUID)
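Editor's note: the new JSONType case in EncodeRawValue stores the document as a length-prefixed blob: a big-endian uint32 header of EncLenLen bytes (which the PutUint32 call implies is 4), followed by the raw JSON bytes. A self-contained sketch of that layout, with encLenLen assumed to be 4 for illustration:

package main

import (
    "encoding/binary"
    "fmt"
)

// encLenLen stands in for the package's EncLenLen constant; the
// binary.BigEndian.PutUint32 call in the diff implies a 4-byte length header.
const encLenLen = 4

// encodeJSONValue reproduces the JSONType layout: a big-endian uint32 holding
// len(raw), followed by the raw JSON bytes themselves.
func encodeJSONValue(raw []byte) []byte {
    encv := make([]byte, encLenLen+len(raw))
    binary.BigEndian.PutUint32(encv, uint32(len(raw)))
    copy(encv[encLenLen:], raw)
    return encv
}

func main() {
    enc := encodeJSONValue([]byte(`{"a":1}`))
    fmt.Printf("% x\n", enc[:encLenLen]) // 00 00 00 07 -> payload length
    fmt.Printf("%s\n", enc[encLenLen:])  // {"a":1}
}

Values that are not already a json.RawMessage are first serialized with json.Marshal, so the same layout applies regardless of how the value reached the encoder.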
@@ -1519,6 +1562,16 @@ func decodeValue(b []byte, colType SQLValueType, nullable bool) (TypedValue, int
 
             return &Blob{val: v}, voff, nil
         }
+    case JSONType:
+        {
+            v := b[voff : voff+vlen]
+            voff += vlen
+
+            var val interface{}
+            err = json.Unmarshal(v, &val)
+
+            return &JSON{val: val}, voff, err
+        }
 case UUIDType:
     {
         if vlen != 16 {
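Editor's note: on the read path, the JSON branch of decodeValue slices vlen bytes out of the buffer and unmarshals them into an interface{} wrapped by the new JSON typed value. A standalone sketch of what that untyped unmarshal yields for common document shapes, shown with encoding/json's default mapping and independently of the package's JSON struct:

package main

import (
    "encoding/json"
    "fmt"
)

func main() {
    payload := []byte(`{"active": true, "age": 30, "tags": ["a", "b"]}`)

    // Same call the decoder makes: an untyped unmarshal into interface{}.
    var val interface{}
    if err := json.Unmarshal(payload, &val); err != nil {
        panic(err)
    }

    // encoding/json's default mapping: objects -> map[string]interface{},
    // arrays -> []interface{}, numbers -> float64, booleans -> bool.
    doc := val.(map[string]interface{})
    fmt.Printf("%T %v\n", doc["age"], doc["age"])       // float64 30
    fmt.Printf("%T %v\n", doc["tags"], doc["tags"])     // []interface {} [a b]
    fmt.Printf("%T %v\n", doc["active"], doc["active"]) // bool true
}

Numbers decode to float64 by default, so any later typed access to decoded JSON fields has to account for that.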
7 changes: 5 additions & 2 deletions embedded/sql/engine.go
@@ -55,6 +55,7 @@ var ErrNewColumnMustBeNullable = errors.New("new column must be nullable")
 var ErrIndexAlreadyExists = errors.New("index already exists")
 var ErrMaxNumberOfColumnsInIndexExceeded = errors.New("number of columns in multi-column index exceeded")
 var ErrIndexNotFound = errors.New("index not found")
+var ErrCannotIndexJson = errors.New("cannot index column of type json")
 var ErrInvalidNumberOfValues = errors.New("invalid number of values provided")
 var ErrInvalidValue = errors.New("invalid value provided")
 var ErrInferredMultipleTypes = errors.New("inferred multiple types")
@@ -89,6 +90,7 @@ var ErrAlreadyClosed = store.ErrAlreadyClosed
 var ErrAmbiguousSelector = errors.New("ambiguous selector")
 var ErrUnsupportedCast = fmt.Errorf("%w: unsupported cast", ErrInvalidValue)
 var ErrColumnMismatchInUnionStmt = errors.New("column mismatch in union statement")
+var ErrInvalidTxMetadata = errors.New("invalid transaction metadata")
 
 var MaxKeyLen = 512
 
@@ -105,8 +107,8 @@ type Engine struct {
     sortBufferSize                int
     autocommit                    bool
     lazyIndexConstraintValidation bool
-
-    multidbHandler MultiDBHandler
+    parseTxMetadata               func([]byte) (map[string]interface{}, error)
+    multidbHandler                MultiDBHandler
 }
 
 type MultiDBHandler interface {
@@ -146,6 +148,7 @@ func NewEngine(st *store.ImmuStore, opts *Options) (*Engine, error) {
         sortBufferSize:                opts.sortBufferSize,
         autocommit:                    opts.autocommit,
         lazyIndexConstraintValidation: opts.lazyIndexConstraintValidation,
+        parseTxMetadata:               opts.parseTxMetadata,
         multidbHandler:                opts.multidbHandler,
     }
 