Skip to content

Commit

Permalink
feat: support sql lookup template sql (#3398)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <[email protected]>
  • Loading branch information
Yisaer authored Nov 26, 2024
1 parent 6674454 commit e795d19
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 5 deletions.
15 changes: 14 additions & 1 deletion extensions/impl/sql/lookupSource.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,20 @@ func (s *SqlLookupSource) Lookup(ctx api.StreamContext, fields []string, keys []
return nil, err
}
}
query := s.gen.buildQuery(fields, keys, values)
var query string
if s.conf.TemplateSqlQueryCfg == nil {
query = s.gen.buildQuery(fields, keys, values)
} else {
mapValue := make(map[string]any)
for index, key := range keys {
mapValue[key] = values[index]
}
sqlQuery, err := ctx.ParseTemplate(s.conf.TemplateSqlQueryCfg.TemplateSql, mapValue)
if err != nil {
return nil, err
}
query = sqlQuery
}
ctx.GetLogger().Debugf("Query is %s", query)
rows, err := s.conn.GetDB().Query(query)
failpoint.Inject("dbErr", func() {
Expand Down
17 changes: 17 additions & 0 deletions extensions/impl/sql/lookupSource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,23 @@ func TestSQLLookupSource(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []map[string]any{{"a": int64(1), "b": int64(1)}}, got)
ls.Close(ctx)

props = map[string]interface{}{
"dburl": fmt.Sprintf("mysql://root:@%v:%v/test", address, port),
"datasource": "t",
"templateSqlQueryCfg": map[string]interface{}{
"templateSql": "select * from t where b = {{.bid}}",
},
}
ls = &SqlLookupSource{}
require.NoError(t, ls.Provision(ctx, props))
require.NoError(t, ls.Connect(ctx, func(status string, message string) {
// do nothing
}))
got, err = ls.Lookup(ctx, []string{"a", "b"}, []string{"bid"}, []any{1})
require.NoError(t, err)
require.Equal(t, []map[string]any{{"a": int64(1), "b": int64(1)}}, got)
ls.Close(ctx)
}

func TestSQLLookupSourceProvisionErr(t *testing.T) {
Expand Down
9 changes: 5 additions & 4 deletions extensions/impl/sql/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ func (s *SQLSourceConnector) Ping(ctx api.StreamContext, m map[string]any) error
}

type SQLConf struct {
Interval cast.DurationConf `json:"interval"`
DBUrl string `json:"dburl"`
URL string `json:"url,omitempty"`
Datasource string `json:"datasource"`
Interval cast.DurationConf `json:"interval"`
DBUrl string `json:"dburl"`
URL string `json:"url,omitempty"`
Datasource string `json:"datasource"`
TemplateSqlQueryCfg *sqlgen.TemplateSqlQueryCfg `json:"templateSqlQueryCfg"`
}

func init() {
Expand Down

0 comments on commit e795d19

Please sign in to comment.