Skip to content

Commit

Permalink
Refactor matching and split the files into subpackages
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed May 9, 2024
1 parent 2fef3c3 commit 4dda545
Show file tree
Hide file tree
Showing 37 changed files with 1,763 additions and 1,304 deletions.
1 change: 1 addition & 0 deletions cmd/server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ require (
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/emirpasic/gods v0.0.0-20190624094223-e689965507ab // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/fatih/structtag v1.2.0 // indirect
github.com/go-zookeeper/zk v1.0.3 // indirect
Expand Down
1 change: 1 addition & 0 deletions cmd/server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/emirpasic/gods v0.0.0-20190624094223-e689965507ab h1:eTc1vwMHNg4WtS95PtYi3FFCKwlPjtN/Lw9IALTRtd8=
github.com/emirpasic/gods v0.0.0-20190624094223-e689965507ab/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down
1 change: 1 addition & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ ignore:
- "service/history/workflow/errors.go"
- "service/history/service.go"
- "service/matching/service.go"
- "service/matching/tasklist/testing.go"
- "service/worker/service.go"
- "testflags/**"
- "tools/common/schema/test/**"
Expand Down
26 changes: 13 additions & 13 deletions host/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/matching"
"github.com/uber/cadence/service/matching/tasklist"
)

func (s *IntegrationSuite) TestActivityHeartBeatWorkflow_Success() {
Expand Down Expand Up @@ -142,10 +142,10 @@ func (s *IntegrationSuite) TestActivityHeartBeatWorkflow_Success() {
}

_, err := poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil || err == matching.ErrNoTasks)
s.True(err == nil || err == tasklist.ErrNoTasks)

err = poller.PollAndProcessActivityTask(false)
s.True(err == nil || err == matching.ErrNoTasks)
s.True(err == nil || err == tasklist.ErrNoTasks)

s.Logger.Info("Waiting for workflow to complete", tag.WorkflowRunID(we.RunID))

Expand Down Expand Up @@ -516,7 +516,7 @@ func (s *IntegrationSuite) TestActivityRetry() {
s.True(err == nil, err)

err = poller.PollAndProcessActivityTask(false)
s.True(err == nil || err == matching.ErrNoTasks, err)
s.True(err == nil || err == tasklist.ErrNoTasks, err)

descResp, err := describeWorkflowExecution()
s.Nil(err)
Expand All @@ -530,7 +530,7 @@ func (s *IntegrationSuite) TestActivityRetry() {
}

err = poller2.PollAndProcessActivityTask(false)
s.True(err == nil || err == matching.ErrNoTasks, err)
s.True(err == nil || err == tasklist.ErrNoTasks, err)

descResp, err = describeWorkflowExecution()
s.Nil(err)
Expand Down Expand Up @@ -659,7 +659,7 @@ func (s *IntegrationSuite) TestActivityHeartBeatWorkflow_Timeout() {
}

_, err := poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil || err == matching.ErrNoTasks)
s.True(err == nil || err == tasklist.ErrNoTasks)

err = poller.PollAndProcessActivityTask(false)
s.Error(err)
Expand Down Expand Up @@ -894,7 +894,7 @@ func (s *IntegrationSuite) TestActivityTimeouts() {
}

_, err := poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil || err == matching.ErrNoTasks)
s.True(err == nil || err == tasklist.ErrNoTasks)

for i := 0; i < 3; i++ {
go func() {
Expand Down Expand Up @@ -1083,7 +1083,7 @@ func (s *IntegrationSuite) TestActivityHeartbeatTimeouts() {
}

_, err := poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil || err == matching.ErrNoTasks)
s.True(err == nil || err == tasklist.ErrNoTasks)

for i := 0; i < activityCount; i++ {
go func() {
Expand Down Expand Up @@ -1224,7 +1224,7 @@ func (s *IntegrationSuite) TestActivityCancellation() {
}

_, err := poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil || err == matching.ErrNoTasks, err)
s.True(err == nil || err == tasklist.ErrNoTasks, err)

cancelCh := make(chan struct{})

Expand All @@ -1233,12 +1233,12 @@ func (s *IntegrationSuite) TestActivityCancellation() {
scheduleActivity = false
requestCancellation = true
_, err := poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil || err == matching.ErrNoTasks, err)
s.True(err == nil || err == tasklist.ErrNoTasks, err)
cancelCh <- struct{}{}
}()

err = poller.PollAndProcessActivityTask(false)
s.True(err == nil || err == matching.ErrNoTasks, err)
s.True(err == nil || err == tasklist.ErrNoTasks, err)

<-cancelCh
s.Logger.Info("Waiting for workflow to complete", tag.WorkflowRunID(we.RunID))
Expand Down Expand Up @@ -1340,7 +1340,7 @@ func (s *IntegrationSuite) TestActivityCancellationNotStarted() {
}

_, err := poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil || err == matching.ErrNoTasks)
s.True(err == nil || err == tasklist.ErrNoTasks)

// Send signal so that worker can send an activity cancel
signalName := "my signal"
Expand Down Expand Up @@ -1368,5 +1368,5 @@ func (s *IntegrationSuite) TestActivityCancellationNotStarted() {
scheduleActivity = false
requestCancellation = false
_, err = poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil || err == matching.ErrNoTasks)
s.True(err == nil || err == tasklist.ErrNoTasks)
}
6 changes: 3 additions & 3 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/history/engine/engineimpl"
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/matching"
"github.com/uber/cadence/service/matching/tasklist"
)

func TestIntegrationSuite(t *testing.T) {
Expand Down Expand Up @@ -924,11 +924,11 @@ func (s *IntegrationSuite) TestDecisionAndActivityTimeoutsWorkflow() {
history := historyResponse.History
common.PrettyPrintHistory(history, s.Logger)
}
s.True(err == nil || err == matching.ErrNoTasks, "%v", err)
s.True(err == nil || err == tasklist.ErrNoTasks, "%v", err)
if !dropDecisionTask {
s.Logger.Info("Calling Activity Task: %d", tag.Counter(i))
err = poller.PollAndProcessActivityTask(i%4 == 0)
s.True(err == nil || err == matching.ErrNoTasks)
s.True(err == nil || err == tasklist.ErrNoTasks)
}
}

Expand Down
8 changes: 4 additions & 4 deletions host/taskpoller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/matching"
"github.com/uber/cadence/service/matching/tasklist"
)

type (
Expand Down Expand Up @@ -289,7 +289,7 @@ Loop:
return false, newTask, err
}

return false, nil, matching.ErrNoTasks
return false, nil, tasklist.ErrNoTasks
}

// HandlePartialDecision for decision task
Expand Down Expand Up @@ -411,7 +411,7 @@ func (p *TaskPoller) PollAndProcessActivityTask(dropTask bool) error {
return taskErr
}

return matching.ErrNoTasks
return tasklist.ErrNoTasks
}

// PollAndProcessActivityTaskWithID is similar to PollAndProcessActivityTask but using RespondActivityTask...ByID
Expand Down Expand Up @@ -490,7 +490,7 @@ func (p *TaskPoller) PollAndProcessActivityTaskWithID(dropTask bool) error {
return taskErr
}

return matching.ErrNoTasks
return tasklist.ErrNoTasks
}

func createContext() (context.Context, context.CancelFunc) {
Expand Down
8 changes: 4 additions & 4 deletions host/workflowsidinternalratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
"github.com/uber/cadence/common/persistence"
pt "github.com/uber/cadence/common/persistence/persistence-tests"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/matching"
"github.com/uber/cadence/service/matching/tasklist"
)

func TestWorkflowIDInternalRateLimitIntegrationSuite(t *testing.T) {
Expand Down Expand Up @@ -197,17 +197,17 @@ func (s *WorkflowIDInternalRateLimitIntegrationSuite) TestWorkflowIDSpecificInte

for i := int32(0); i < activityCount; i++ {
_, err = poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil || err == matching.ErrNoTasks)
s.True(err == nil || err == tasklist.ErrNoTasks)

err = poller.PollAndProcessActivityTask(false)
s.True(err == nil || err == matching.ErrNoTasks)
s.True(err == nil || err == tasklist.ErrNoTasks)
}

s.Logger.Info("Waiting for workflow to complete", tag.WorkflowRunID(we.RunID))

s.False(workflowComplete)
_, err = poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil || err == matching.ErrNoTasks)
s.True(err == nil || err == tasklist.ErrNoTasks)
s.True(workflowComplete)

historyResponse, err := s.engine.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
Expand Down
90 changes: 12 additions & 78 deletions service/matching/config.go → service/matching/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package matching
package config

import (
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/dynamicconfig"
)

Expand Down Expand Up @@ -76,17 +74,20 @@ type (
AllIsolationGroups []string
// hostname info
HostName string
// rate limiter configuration
TaskDispatchRPS float64
TaskDispatchRPSTTL time.Duration
}

forwarderConfig struct {
ForwarderConfig struct {
ForwarderMaxOutstandingPolls func() int
ForwarderMaxOutstandingTasks func() int
ForwarderMaxRatePerSecond func() int
ForwarderMaxChildrenPerNode func() int
}

taskListConfig struct {
forwarderConfig
TaskListConfig struct {
ForwarderConfig
EnableSyncMatch func() bool
// Time to hold a poll request before returning an empty response if there are no tasks
LongPollExpirationInterval func() time.Duration
Expand All @@ -109,6 +110,9 @@ type (
AllIsolationGroups []string
// hostname
HostName string
// rate limiter configuration
TaskDispatchRPS float64
TaskDispatchRPSTTL time.Duration
}
)

Expand Down Expand Up @@ -147,6 +151,8 @@ func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config {
AllIsolationGroups: mapIGs(dc.GetListProperty(dynamicconfig.AllIsolationGroups)()),
AsyncTaskDispatchTimeout: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.AsyncTaskDispatchTimeout),
HostName: hostName,
TaskDispatchRPS: 100000.0,
TaskDispatchRPSTTL: time.Minute,
}
}

Expand All @@ -160,75 +166,3 @@ func mapIGs(in []interface{}) []string {
}
return allIsolationGroups
}

func newTaskListConfig(id *taskListID, config *Config, domainCache cache.DomainCache) (*taskListConfig, error) {
domainName, err := domainCache.GetDomainName(id.domainID)
if err != nil {
return nil, err
}

taskListName := id.name
taskType := id.taskType
return &taskListConfig{
RangeSize: config.RangeSize,
AllIsolationGroups: config.AllIsolationGroups,
EnableTasklistIsolation: func() bool {
return config.EnableTasklistIsolation(domainName)
},
ActivityTaskSyncMatchWaitTime: config.ActivityTaskSyncMatchWaitTime,
GetTasksBatchSize: func() int {
return config.GetTasksBatchSize(domainName, taskListName, taskType)
},
UpdateAckInterval: func() time.Duration {
return config.UpdateAckInterval(domainName, taskListName, taskType)
},
IdleTasklistCheckInterval: func() time.Duration {
return config.IdleTasklistCheckInterval(domainName, taskListName, taskType)
},
MaxTasklistIdleTime: func() time.Duration {
return config.MaxTasklistIdleTime(domainName, taskListName, taskType)
},
MinTaskThrottlingBurstSize: func() int {
return config.MinTaskThrottlingBurstSize(domainName, taskListName, taskType)
},
EnableSyncMatch: func() bool {
return config.EnableSyncMatch(domainName, taskListName, taskType)
},
LongPollExpirationInterval: func() time.Duration {
return config.LongPollExpirationInterval(domainName, taskListName, taskType)
},
MaxTaskDeleteBatchSize: func() int {
return config.MaxTaskDeleteBatchSize(domainName, taskListName, taskType)
},
OutstandingTaskAppendsThreshold: func() int {
return config.OutstandingTaskAppendsThreshold(domainName, taskListName, taskType)
},
MaxTaskBatchSize: func() int {
return config.MaxTaskBatchSize(domainName, taskListName, taskType)
},
NumWritePartitions: func() int {
return common.MaxInt(1, config.NumTasklistWritePartitions(domainName, taskListName, taskType))
},
NumReadPartitions: func() int {
return common.MaxInt(1, config.NumTasklistReadPartitions(domainName, taskListName, taskType))
},
AsyncTaskDispatchTimeout: func() time.Duration {
return config.AsyncTaskDispatchTimeout(domainName, taskListName, taskType)
},
forwarderConfig: forwarderConfig{
ForwarderMaxOutstandingPolls: func() int {
return config.ForwarderMaxOutstandingPolls(domainName, taskListName, taskType)
},
ForwarderMaxOutstandingTasks: func() int {
return config.ForwarderMaxOutstandingTasks(domainName, taskListName, taskType)
},
ForwarderMaxRatePerSecond: func() int {
return config.ForwarderMaxRatePerSecond(domainName, taskListName, taskType)
},
ForwarderMaxChildrenPerNode: func() int {
return common.MaxInt(1, config.ForwarderMaxChildrenPerNode(domainName, taskListName, taskType))
},
},
HostName: config.HostName,
}, nil
}
Loading

0 comments on commit 4dda545

Please sign in to comment.