Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add interactive-mode SQLExecutor with SQLRTTask #13

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions examples/sdk/sqa/with_interactive_sql_executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"fmt"
"log"
"os"

"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/sqa"
)

func main() {
conf, err := odps.NewConfigFromIni(os.Args[1])
if err != nil {
log.Fatalf("%+v", err)
}

aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
sql := `select * from all_types_demo_no_parition;`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

例子中的表名用:all_types_demo,需要其他表的话表名DDL

//
paramInfo := sqa.SQLExecutorQueryParam{
OdpsIns: odpsIns,
TaskName: "test_mcqa",
ServiceName: sqa.DEFAULT_SERVICE,
RunningCluster: "",
}
ie := sqa.NewInteractiveSQLExecutor(&paramInfo)
err = ie.Run(sql, nil)
if err != nil {
log.Fatalf("%+v", err)
}
//
records, err := ie.GetResult(0, 10, 10000, true)
if err != nil {
log.Fatalf("%+v", err)
}
//
for _, record := range records {
for i, d := range record {
if d == nil {
fmt.Printf("null")
} else {
fmt.Printf("%s", d.Sql())
}

if i < record.Len()-1 {
fmt.Printf(", ")
} else {
fmt.Println()
}
}
}
err = ie.Close()
if err != nil {
log.Fatalf("%+v", err)
}
}
77 changes: 75 additions & 2 deletions odps/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package odps
import (
"encoding/json"
"encoding/xml"
"github.com/aliyun/aliyun-odps-go-sdk/odps/common"
"github.com/pkg/errors"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"

"github.com/aliyun/aliyun-odps-go-sdk/odps/common"
"github.com/pkg/errors"
)

type InstanceStatus int
Expand Down Expand Up @@ -270,6 +272,10 @@ func (instance *Instance) Id() string {
return instance.id
}

func (instance *Instance) ResourceUrl() string {
return instance.resourceUrl
}

func (instance *Instance) Owner() string {
return instance.owner
}
Expand Down Expand Up @@ -358,6 +364,73 @@ func (instance *Instance) GetResult() ([]TaskResult, error) {
return resModel.Tasks, nil
}

type UpdateInfoResult struct {
Result string `json:"result"`
Status string `json:"status"`
}

// UpdateInfo set information to running instance
func (instance *Instance) UpdateInfo(taskName, infoKey, infoValue string) (UpdateInfoResult, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

和java sdk保持一致,用SetTaskInfo作为方法名

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UpdateInfoResult等返回值统一放到instance_res.go里

// instance set information
queryArgs := make(url.Values, 2)
queryArgs.Set("info", "")
queryArgs.Set("taskname", taskName)
//
instanceTaskInfoModel := struct {
XMLName xml.Name `xml:"Instance"`
Key string `xml:"Key"`
Value string `xml:"Value"`
}{
Key: infoKey,
Value: infoValue,
}
//
var res UpdateInfoResult
client := instance.odpsIns.RestClient()
err := client.DoXmlWithParseRes(
common.HttpMethod.PutMethod,
instance.resourceUrl,
queryArgs,
instanceTaskInfoModel,
func(httpRes *http.Response) error {
err := json.NewDecoder(httpRes.Body).Decode(&res)
if err != nil {
return errors.Wrapf(err, "Parse http response failed, body: %+v", httpRes.Body)
}
return nil
},
)
//
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

代码中很多这种无用的注释: "//", 都去掉

return res, err
}

// GetTaskInfo get the specific info of a task in the instance
func (instance *Instance) GetTaskInfo(taskName, infoKey string) (string, error) {
queryArgs := make(url.Values, 3)
queryArgs.Set("info", "")
queryArgs.Set("taskname", taskName)
queryArgs.Set("key", infoKey)

client := instance.odpsIns.RestClient()
var bodyStr string
err := client.DoXmlWithParseRes(
common.HttpMethod.GetMethod,
instance.resourceUrl,
queryArgs,
nil,
func(httpRes *http.Response) error {
body, err := io.ReadAll(httpRes.Body)
if err != nil {
return errors.Wrapf(err, "Parse http response failed, body: %+v", httpRes.Body)
}
bodyStr = string(body)
return nil
},
)

return bodyStr, err
}

func InstancesStatusFromStr(s string) InstanceStatus {
switch strings.ToLower(s) {
case "running":
Expand Down
Loading