Skip to content

Commit

Permalink
Support STARTS_WITH in visibility queries
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigozhou committed Nov 9, 2023
1 parent b6dbe2f commit 52d4557
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 64 deletions.
22 changes: 12 additions & 10 deletions common/persistence/visibility/store/elasticsearch/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@ import (
)

var allowedComparisonOperators = map[string]struct{}{
sqlparser.EqualStr: {},
sqlparser.NotEqualStr: {},
sqlparser.GreaterThanStr: {},
sqlparser.GreaterEqualStr: {},
sqlparser.LessThanStr: {},
sqlparser.LessEqualStr: {},
sqlparser.LikeStr: {},
sqlparser.NotLikeStr: {},
sqlparser.InStr: {},
sqlparser.NotInStr: {},
sqlparser.EqualStr: {},
sqlparser.NotEqualStr: {},
sqlparser.GreaterThanStr: {},
sqlparser.GreaterEqualStr: {},
sqlparser.LessThanStr: {},
sqlparser.LessEqualStr: {},
sqlparser.LikeStr: {},
sqlparser.NotLikeStr: {},
sqlparser.InStr: {},
sqlparser.NotInStr: {},
sqlparser.StartsWithStr: {},
sqlparser.NotStartsWithStr: {},
}

func newQueryConverter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ var supportedWhereCases = map[string]string{
"create_time BETWEEN '2015-01-01 00:00:00' and '2016-02-02 00:00:00'": `{"bool":{"filter":{"range":{"create_time":{"from":"2015-01-01 00:00:00","include_lower":true,"include_upper":true,"to":"2016-02-02 00:00:00"}}}}}`,
"create_time nOt between '2015-01-01 00:00:00' and '2016-02-02 00:00:00'": `{"bool":{"must_not":{"range":{"create_time":{"from":"2015-01-01 00:00:00","include_lower":true,"include_upper":true,"to":"2016-02-02 00:00:00"}}}}}`,
"create_time between '2015-01-01T00:00:00+0800' and '2017-01-01T00:00:00+0800' and process_id = 0 and status >= 1 and content = '三个男人' and phone = '15810324322'": `{"bool":{"filter":[{"range":{"create_time":{"from":"2015-01-01T00:00:00+0800","include_lower":true,"include_upper":true,"to":"2017-01-01T00:00:00+0800"}}},{"match":{"process_id":{"query":0}}},{"range":{"status":{"from":1,"include_lower":true,"include_upper":true,"to":null}}},{"match":{"content":{"query":"三个男人"}}},{"match":{"phone":{"query":"15810324322"}}}]}}`,
"value starts_with 'prefix'": `{"bool":{"filter":{"prefix":{"value":"prefix"}}}}`,
"value not starts_with 'prefix'": `{"bool":{"must_not":{"prefix":{"value":"prefix"}}}}`,
}

var supportedWhereOrderCases = map[string]struct {
Expand Down
28 changes: 20 additions & 8 deletions common/persistence/visibility/store/query/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,24 +479,36 @@ func (c *comparisonExprConverter) Convert(expr sqlparser.Expr) (elastic.Query, e

var query elastic.Query
switch comparisonExpr.Operator {
case ">=":
case sqlparser.GreaterEqualStr:
query = elastic.NewRangeQuery(colName).Gte(colValues[0])
case "<=":
case sqlparser.LessEqualStr:
query = elastic.NewRangeQuery(colName).Lte(colValues[0])
case ">":
case sqlparser.GreaterThanStr:
query = elastic.NewRangeQuery(colName).Gt(colValues[0])
case "<":
case sqlparser.LessThanStr:
query = elastic.NewRangeQuery(colName).Lt(colValues[0])
case "=", "like": // The only difference is that "%" is removed for LIKE queries.
case sqlparser.EqualStr, sqlparser.LikeStr: // The only difference is that "%" is removed for LIKE queries.
// Not elastic.NewTermQuery to support partial word match for String custom search attributes.
query = elastic.NewMatchQuery(colName, colValues[0])
case "!=", "not like":
case sqlparser.NotEqualStr, sqlparser.NotLikeStr:
// Not elastic.NewTermQuery to support partial word match for String custom search attributes.
query = elastic.NewBoolQuery().MustNot(elastic.NewMatchQuery(colName, colValues[0]))
case "in":
case sqlparser.InStr:
query = elastic.NewTermsQuery(colName, colValues...)
case "not in":
case sqlparser.NotInStr:
query = elastic.NewBoolQuery().MustNot(elastic.NewTermsQuery(colName, colValues...))
case sqlparser.StartsWithStr:
v, ok := colValues[0].(string)
if !ok {
return nil, NewConverterError("right-hand side of '%v' must be a string", comparisonExpr.Operator)
}
query = elastic.NewPrefixQuery(colName, v)
case sqlparser.NotStartsWithStr:
v, ok := colValues[0].(string)
if !ok {
return nil, NewConverterError("right-hand side of '%v' must be a string", comparisonExpr.Operator)
}
query = elastic.NewBoolQuery().MustNot(elastic.NewPrefixQuery(colName, v))
}

return query, nil
Expand Down
36 changes: 36 additions & 0 deletions common/persistence/visibility/store/sql/query_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ var (
sqlparser.GreaterEqualStr,
sqlparser.InStr,
sqlparser.NotInStr,
sqlparser.StartsWithStr,
sqlparser.NotStartsWithStr,
}

supportedKeyworkListOperators = []string{
Expand Down Expand Up @@ -372,6 +374,7 @@ func (c *QueryConverter) convertComparisonExpr(exprRef *sqlparser.Expr) error {
if err != nil {
return err
}

switch saColNameExpr.valueType {
case enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST:
newExpr, err := c.convertKeywordListComparisonExpr(expr)
Expand All @@ -386,6 +389,27 @@ func (c *QueryConverter) convertComparisonExpr(exprRef *sqlparser.Expr) error {
}
*exprRef = newExpr
}

switch expr.Operator {
case sqlparser.StartsWithStr, sqlparser.NotStartsWithStr:
valueExpr, ok := expr.Right.(*unsafeSQLString)
if !ok {
return query.NewConverterError(
"%s: right-hand side of '%s' must be a literal string (got: %v)",
query.InvalidExpressionErrMessage,
expr.Operator,
sqlparser.String(expr.Right),
)
}
if expr.Operator == sqlparser.StartsWithStr {
expr.Operator = sqlparser.LikeStr
} else {
expr.Operator = sqlparser.NotLikeStr
}
expr.Escape = newUnsafeSQLString(`\`)
valueExpr.Val = escapeLikeValueForPrefixSearch(valueExpr.Val)
}

return nil
}

Expand Down Expand Up @@ -650,6 +674,18 @@ func (c *QueryConverter) convertIsExpr(exprRef *sqlparser.Expr) error {
return nil
}

func escapeLikeValueForPrefixSearch(in string) string {
sb := strings.Builder{}
for _, c := range in {
if c == '%' || c == '_' {
sb.WriteByte('\\')
}
sb.WriteRune(c)
}
sb.WriteByte('%')
return sb.String()
}

func isSupportedOperator(supportedOperators []string, operator string) bool {
for _, op := range supportedOperators {
if operator == op {
Expand Down
32 changes: 32 additions & 0 deletions common/persistence/visibility/store/sql/query_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,38 @@ func (s *queryConverterSuite) TestConvertComparisonExpr() {
output: "Keyword01 not in ('foo', 'bar')",
err: nil,
},
{
name: "starts_with expression",
input: "AliasForKeyword01 starts_with 'foo_bar%'",
output: `Keyword01 like 'foo\_bar\%%' escape '\'`,
err: nil,
},
{
name: "not starts_with expression",
input: "AliasForKeyword01 not starts_with 'foo_bar%'",
output: `Keyword01 not like 'foo\_bar\%%' escape '\'`,
err: nil,
},
{
name: "starts_with expression error",
input: "AliasForKeyword01 starts_with 123",
output: "",
err: query.NewConverterError(
"%s: right-hand side of '%s' must be a literal string (got: 123)",
query.InvalidExpressionErrMessage,
sqlparser.StartsWithStr,
),
},
{
name: "not starts_with expression error",
input: "AliasForKeyword01 not starts_with 123",
output: "",
err: query.NewConverterError(
"%s: right-hand side of '%s' must be a literal string (got: 123)",
query.InvalidExpressionErrMessage,
sqlparser.NotStartsWithStr,
),
},
{
name: "like expression",
input: "AliasForKeyword01 like 'foo%'",
Expand Down
120 changes: 74 additions & 46 deletions tests/advanced_visibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,59 +540,87 @@ func (s *advancedVisibilitySuite) TestListWorkflow_KeywordQuery() {
s.NoError(err)
s.Len(resp.GetExecutions(), 0)

if s.isElasticsearchEnabled {
// LIKE is supported on Elasticsearch only.
// Prefix search
listRequest = &workflowservice.ListWorkflowExecutionsRequest{
Namespace: s.namespace,
PageSize: defaultPageSize,
Query: `CustomKeywordField STARTS_WITH "justice"`,
}
resp, err = s.engine.ListWorkflowExecutions(NewContext(), listRequest)
s.NoError(err)
s.Len(resp.GetExecutions(), 1)
}

// LIKE exact match on Keyword (supported)
listRequest = &workflowservice.ListWorkflowExecutionsRequest{
Namespace: s.namespace,
PageSize: defaultPageSize,
Query: `CustomKeywordField LIKE "%justice for all%"`,
}
resp, err = s.engine.ListWorkflowExecutions(NewContext(), listRequest)
s.NoError(err)
s.Len(resp.GetExecutions(), 1)
func (s *advancedVisibilitySuite) TestListWorkflow_LikeQuery() {
if !s.isElasticsearchEnabled {
s.T().Skip("Like is only supported with Elasticsearch")
}

// LIKE %word% on Keyword (not supported)
listRequest = &workflowservice.ListWorkflowExecutionsRequest{
Namespace: s.namespace,
PageSize: defaultPageSize,
Query: `CustomKeywordField LIKE "%justice%"`,
}
resp, err = s.engine.ListWorkflowExecutions(NewContext(), listRequest)
s.NoError(err)
s.Len(resp.GetExecutions(), 0)
id := "es-functional-list-workflow-keyword-query-like-test"
wt := "es-functional-list-workflow-keyword-query-like-test-type"
tl := "es-functional-list-workflow-keyword-query-like-test-taskqueue"
request := s.createStartWorkflowExecutionRequest(id, wt, tl)

// LIKE %chars% on Keyword (not supported)
listRequest = &workflowservice.ListWorkflowExecutionsRequest{
Namespace: s.namespace,
PageSize: defaultPageSize,
Query: `CustomKeywordField LIKE "%ice%"`,
}
resp, err = s.engine.ListWorkflowExecutions(NewContext(), listRequest)
s.NoError(err)
s.Len(resp.GetExecutions(), 0)
searchAttr := &commonpb.SearchAttributes{
IndexedFields: map[string]*commonpb.Payload{
"CustomKeywordField": payload.EncodeString("justice for all"),
},
}
request.SearchAttributes = searchAttr
_, err := s.engine.StartWorkflowExecution(NewContext(), request)
s.NoError(err)

// LIKE NOT %chars% on Keyword (not supported)
listRequest = &workflowservice.ListWorkflowExecutionsRequest{
Namespace: s.namespace,
PageSize: defaultPageSize,
Query: `CustomKeywordField NOT LIKE "%ice%"`,
}
resp, err = s.engine.ListWorkflowExecutions(NewContext(), listRequest)
time.Sleep(waitForESToSettle)

// LIKE exact match on Keyword (supported)
listRequest := &workflowservice.ListWorkflowExecutionsRequest{
Namespace: s.namespace,
PageSize: defaultPageSize,
Query: `CustomKeywordField LIKE "%justice for all%"`,
}
resp, err := s.engine.ListWorkflowExecutions(NewContext(), listRequest)
s.NoError(err)
s.Len(resp.GetExecutions(), 1)

// LIKE %word% on Keyword (not supported)
listRequest = &workflowservice.ListWorkflowExecutionsRequest{
Namespace: s.namespace,
PageSize: defaultPageSize,
Query: `CustomKeywordField LIKE "%justice%"`,
}
resp, err = s.engine.ListWorkflowExecutions(NewContext(), listRequest)
s.NoError(err)
s.Len(resp.GetExecutions(), 0)

// LIKE %chars% on Keyword (not supported)
listRequest = &workflowservice.ListWorkflowExecutionsRequest{
Namespace: s.namespace,
PageSize: defaultPageSize,
Query: `CustomKeywordField LIKE "%ice%"`,
}
resp, err = s.engine.ListWorkflowExecutions(NewContext(), listRequest)
s.NoError(err)
s.Len(resp.GetExecutions(), 0)

// LIKE NOT %chars% on Keyword (not supported)
listRequest = &workflowservice.ListWorkflowExecutionsRequest{
Namespace: s.namespace,
PageSize: defaultPageSize,
Query: `CustomKeywordField NOT LIKE "%ice%"`,
}
resp, err = s.engine.ListWorkflowExecutions(NewContext(), listRequest)
s.NoError(err)
executionCount := 0
for _, execution := range resp.GetExecutions() {
saPayload := execution.SearchAttributes.GetIndexedFields()["CustomKeywordField"]
var saValue string
err = payload.Decode(saPayload, &saValue)
s.NoError(err)
executionCount := 0
for _, execution := range resp.GetExecutions() {
saPayload := execution.SearchAttributes.GetIndexedFields()["CustomKeywordField"]
var saValue string
err = payload.Decode(saPayload, &saValue)
s.NoError(err)
if strings.Contains(saValue, "ice") {
executionCount++ // execution will be found because NOT LIKE is not supported.
}
if strings.Contains(saValue, "ice") {
executionCount++ // execution will be found because NOT LIKE is not supported.
}
s.Equal(executionCount, 1)
}
s.Equal(executionCount, 1)
}

func (s *advancedVisibilitySuite) TestListWorkflow_StringQuery() {
Expand Down

0 comments on commit 52d4557

Please sign in to comment.