Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: support tunnel preview table records #12

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions examples/sdk/tunnel/read_table/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"fmt"
"log"
"os"

"github.com/aliyun/aliyun-odps-go-sdk/arrow/array"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
tunnel2 "github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
)

// main previews records of the "all_types_demo" table through the tunnel
// service and prints every column of every record batch to stdout.
//
// Usage: read_table <config.ini>
//
// The INI file supplies the access id/key, endpoint, default project name and
// tunnel endpoint used to build the ODPS client.
func main() {
	// Guard against a missing argument; indexing os.Args[1] directly would
	// panic with an unhelpful "index out of range" message.
	if len(os.Args) < 2 {
		log.Fatalf("usage: read_table <config.ini>")
	}

	conf, err := odps.NewConfigFromIni(os.Args[1])
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	tableTunnel := tunnel2.NewTunnel(odpsIns, conf.TunnelEndpoint)
	ts := odpsIns.Table("all_types_demo")
	// NOTE(review): Load presumably fetches the table metadata that ReadTable
	// needs — confirm against the odps.Table documentation.
	err = ts.Load()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// NOTE(review): the second argument looks like a partition spec (empty
	// here) and the third like a row limit — confirm against ReadTable.
	reader, err := tableTunnel.ReadTable(&ts, "", 10)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	n := 0
	reader.Iterator(func(rec array.Record, err error) {
		// The callback receives an error alongside the record; the record
		// must not be touched when the read failed.
		if err != nil {
			log.Fatalf("%+v", err)
		}

		for c, col := range rec.Columns() {
			fmt.Printf("rec[%d][%d]: %v\n", n, c, col)
		}
		rec.Release()
		n++
	})

	err = reader.Close()
	if err != nil {
		log.Fatalf("%+v", err)
	}
}
2 changes: 2 additions & 0 deletions odps/common/http_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ const (
HttpHeaderOdpsSlotNum = "odps-tunnel-slot-num"
HttpHeaderRoutedServer = "odps-tunnel-routed-server"
HttpHeaderTransferEncoding = "Transfer-Encoding"
HttpHeaderAcceptEncoding = "Accept-Encoding"
HttpHeaderContentEncoding = "Content-Encoding"

XMLContentType = "application/xml"
)
12 changes: 9 additions & 3 deletions odps/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (
"encoding/json"
"encoding/xml"
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps/common"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"github.com/pkg/errors"
"net/url"
"strconv"
"strings"
"time"

"github.com/aliyun/aliyun-odps-go-sdk/odps/common"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"github.com/pkg/errors"
)

// TableType identifies the kind of a table as an integer value. It is the
// type of tableModel.Type and the result of Table.Type.
type TableType int
Expand Down Expand Up @@ -65,6 +66,7 @@ type tableModel struct {
TableLabel string
CryptoAlgo string
Type TableType
SchemaName string `xml:"SchemaName"`
}

func NewTable(odpsIns *Odps, projectName string, tableName string) Table {
Expand Down Expand Up @@ -141,6 +143,10 @@ func (t *Table) ProjectName() string {
return t.model.ProjectName
}

// SchemaName reports the schema name recorded in the table's model, i.e. the
// value of the model's SchemaName field (mapped to the XML element
// "SchemaName").
func (t *Table) SchemaName() string {
	name := t.model.SchemaName
	return name
}

// Type reports the table kind recorded in the table's model, i.e. the value
// of the model's Type field.
func (t *Table) Type() TableType {
	kind := t.model.Type
	return kind
}
Expand Down
129 changes: 90 additions & 39 deletions odps/tableschema/arrow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,60 @@ import (
"github.com/pkg/errors"
)

// typeConvertConfig collects the settings that influence how TypeToArrowType
// maps an odps type to an arrow type. It is built by newTypeConvertConfig
// from a list of TypeConvertOption values.
type typeConvertConfig struct {
// IsExtensionTimeStamp selects the representation of odps TIMESTAMP:
// when true, TypeToArrowType returns a struct with nullable fields
// "sec" (int64) and "nano" (int32); when false it returns timestamp(nano).
IsExtensionTimeStamp bool
}

// TypeConvertOption is a functional option that mutates a typeConvertConfig.
// TypeToArrowType accepts any number of them and applies them in order.
type TypeConvertOption func(cfg *typeConvertConfig)

// newTypeConvertConfig starts from a zero-valued typeConvertConfig and lets
// every supplied option modify it, in the order the options were passed.
// With no options the returned config has all settings disabled.
func newTypeConvertConfig(opts ...TypeConvertOption) *typeConvertConfig {
	config := new(typeConvertConfig)
	for i := range opts {
		opts[i](config)
	}
	return config
}

// withExtensionTimeStamp produces the option that switches on
// IsExtensionTimeStamp in the config it is applied to.
func withExtensionTimeStamp() TypeConvertOption {
	enable := func(cfg *typeConvertConfig) {
		cfg.IsExtensionTimeStamp = true
	}
	return enable
}

// TypeConvertConfig is the exported entry point for building
// TypeConvertOption values to pass to TypeToArrowType.
var TypeConvertConfig = struct {
// WithExtendedTimeStamp returns the option that makes TypeToArrowType map
// odps TIMESTAMP to a struct of "sec" (int64) and "nano" (int32) instead of
// timestamp(nano).
WithExtendedTimeStamp func() TypeConvertOption
}{
WithExtendedTimeStamp: withExtensionTimeStamp,
}

// TypeToArrowType convert odps field type to arrow field type
//* Storage Type | Arrow Type
//* ----------------------+---------------------
//* boolean | boolean
//* tinyint | int8
//* smallint | int16
//* int | int32
//* bigint | int64
//* float | float32
//* double | float64
//* char | utf8
//* varchar | utf8
//* string | utf8
//* binary | binary
//* date | date32
//* datetime | timestamp(nano)
//* timestamp | timestamp(nano) [note: precision selection is still under development]
//* interval_day_time | day_time_interval
//* interval_year_month | month_interval
//* decimal | decimal
//* struct | struct
//* array | list
//* map | map
func TypeToArrowType(odpsType datatype.DataType) (arrow.DataType, error) {
// * Storage Type | Arrow Type
// * ----------------------+---------------------
// * boolean | boolean
// * tinyint | int8
// * smallint | int16
// * int | int32
// * bigint | int64
// * float | float32
// * double | float64
// * char | utf8
// * varchar | utf8
// * string | utf8
// * binary | binary
// * date | date32
// * datetime | timestamp(milli)
// * timestamp | timestamp(nano) [note: precision selection is still under development]
// * interval_day_time | day_time_interval
// * interval_year_month | month_interval
// * decimal | decimal
// * struct | struct
// * array | list
// * map | map
func TypeToArrowType(odpsType datatype.DataType, opt ...TypeConvertOption) (arrow.DataType, error) {
cfg := newTypeConvertConfig(opt...)

switch odpsType.ID() {
case datatype.BOOLEAN:
return arrow.FixedWidthTypes.Boolean, nil
Expand All @@ -68,10 +98,30 @@ func TypeToArrowType(odpsType datatype.DataType) (arrow.DataType, error) {
case datatype.DATE:
return arrow.FixedWidthTypes.Date32, nil
case datatype.DATETIME:
return arrow.FixedWidthTypes.Timestamp_ns, nil
//return &arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: "UTC"}, nil
return &arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: "UTC"}, nil
case datatype.TIMESTAMP:
return arrow.FixedWidthTypes.Timestamp_ns, nil
if cfg.IsExtensionTimeStamp {
arrowFields := make([]arrow.Field, 2)
//
secField := arrow.Field{
Name: "sec",
Type: arrow.PrimitiveTypes.Int64,
Nullable: true,
}
arrowFields[0] = secField
//
nanoField := arrow.Field{
Name: "nano",
Type: arrow.PrimitiveTypes.Int32,
Nullable: true,
}
arrowFields[1] = nanoField
//
arrowStruct := arrow.StructOf(arrowFields...)
return arrowStruct, nil
} else {
return arrow.FixedWidthTypes.Timestamp_ns, nil
}
//return &arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: "UTC"}, nil
case datatype.IntervalDayTime:
return arrow.FixedWidthTypes.DayTimeInterval, nil
Expand All @@ -93,8 +143,9 @@ func TypeToArrowType(odpsType datatype.DataType) (arrow.DataType, error) {
}

arrowFields[i] = arrow.Field{
Name: field.Name,
Type: arrowType,
Name: field.Name,
Type: arrowType,
Nullable: true,
}
}
return arrow.StructOf(arrowFields...), nil
Expand All @@ -106,17 +157,17 @@ func TypeToArrowType(odpsType datatype.DataType) (arrow.DataType, error) {
}

return arrow.ListOf(itemType), nil
//case datatype.MAP:
// mapType, _ := odpsType.(datatype.MapType)
// keyType, err := TypeToArrowType(mapType.KeyType)
// if err != nil {
// return arrow.Null, err
// }
// valueType, err := TypeToArrowType(mapType.ValueType)
// if err != nil {
// return arrow.Null, err
// }
// return arrow.MapOf(keyType, valueType), nil
case datatype.MAP:
mapType, _ := odpsType.(datatype.MapType)
keyType, err := TypeToArrowType(mapType.KeyType)
if err != nil {
return arrow.Null, err
}
valueType, err := TypeToArrowType(mapType.ValueType)
if err != nil {
return arrow.Null, err
}
return arrow.MapOf(keyType, valueType), nil
}

return arrow.Null, errors.Errorf("unknown odps data type: %s", odpsType.Name())
Expand Down
54 changes: 51 additions & 3 deletions odps/tableschema/table_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package tableschema
import (
"bytes"
"fmt"
"strings"
"text/template"

"github.com/aliyun/aliyun-odps-go-sdk/arrow"
"github.com/aliyun/aliyun-odps-go-sdk/odps/common"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/pkg/errors"
"strings"
"text/template"
)

type TableSchema struct {
Expand Down Expand Up @@ -85,6 +87,11 @@ type SchemaBuilder struct {
lifecycle int
}

// ToArrowSchemaOption tunes the arrow schema produced by
// TableSchema.ToArrowSchema. ToArrowSchema only consults the option when
// exactly one value is passed.
type ToArrowSchemaOption struct {
// WithPartitionColumns appends the table's partition columns as extra
// fields after the regular columns.
WithPartitionColumns bool
// WithExtensionTimeStamp converts TIMESTAMP columns with the extension
// timestamp representation and tags those fields with "odps_timestamp"
// extension metadata.
WithExtensionTimeStamp bool
}

// NewSchemaBuilder returns a zero-valued SchemaBuilder.
func NewSchemaBuilder() SchemaBuilder {
	var builder SchemaBuilder
	return builder
}
Expand Down Expand Up @@ -286,14 +293,55 @@ func (schema *TableSchema) ToExternalSQLString(
return builder.String(), nil
}

func (schema *TableSchema) ToArrowSchema() *arrow.Schema {
func (schema *TableSchema) ToArrowSchema(opt ...ToArrowSchemaOption) *arrow.Schema {
fields := make([]arrow.Field, len(schema.Columns))
//
usingPartitionColumns := false
usingExtensionTimeStamp := false
if len(opt) == 1 {
if opt[0].WithPartitionColumns {
usingPartitionColumns = true
}
if opt[0].WithExtensionTimeStamp {
usingExtensionTimeStamp = true
}
}
//
for i, column := range schema.Columns {
arrowType, _ := TypeToArrowType(column.Type)
metadata := arrow.NewMetadata(nil, nil)
if column.Type.ID() == datatype.TIMESTAMP && usingExtensionTimeStamp {
arrowType, _ = TypeToArrowType(column.Type, withExtensionTimeStamp())
metadata = arrow.NewMetadata(
[]string{"ARROW:extension:metadata", "ARROW:extension:name"},
[]string{"odps_timestamp", "odps_timestamp"},
)
}
fields[i] = arrow.Field{
Name: column.Name,
Type: arrowType,
Nullable: column.IsNullable,
Metadata: metadata,
}
}

if schema.PartitionColumns != nil && usingPartitionColumns {
for _, column := range schema.PartitionColumns {
arrowType, _ := TypeToArrowType(column.Type)
metadata := arrow.NewMetadata(nil, nil)
if column.Type.ID() == datatype.TIMESTAMP && usingExtensionTimeStamp {
arrowType, _ = TypeToArrowType(column.Type, withExtensionTimeStamp())
metadata = arrow.NewMetadata(
[]string{"ARROW:extension:metadata", "ARROW:extension:name"},
[]string{"odps_timestamp", "odps_timestamp"},
)
}
fields = append(fields, arrow.Field{
Name: column.Name,
Type: arrowType,
Nullable: true, // todo: should be false, need tunnel backend fix this bug
Metadata: metadata,
})
}
}

Expand Down
Loading