diff --git a/config/dynamicconfig/development_es.yaml b/config/dynamicconfig/development_es.yaml
index 3131b24c915..310d7f47a79 100644
--- a/config/dynamicconfig/development_es.yaml
+++ b/config/dynamicconfig/development_es.yaml
@@ -2,6 +2,8 @@ frontend.enableClientVersionCheck:
   - value: true
 system.advancedVisibilityWritingMode:
   - value: "on"
+system.advancedVisibilityMigrationWritingMode:
+  - value: "source"
 system.enableReadVisibilityFromES:
   - value: true
 frontend.validSearchAttributes:
@@ -43,4 +45,3 @@ system.minRetentionDays:
 history.EnableConsistentQueryByDomain:
   - value: true
     constraints: {}
-
diff --git a/service/worker/indexer/esProcessor.go b/service/worker/indexer/esProcessor.go
index ea59e5b1257..bbd937c6384 100644
--- a/service/worker/indexer/esProcessor.go
+++ b/service/worker/indexer/esProcessor.go
@@ -48,7 +48,7 @@ const (
 type (
     // ESProcessorImpl implements ESProcessor, it's an agent of GenericBulkProcessor
     ESProcessorImpl struct {
-        bulkProcessor []bulk.GenericBulkProcessor
+        bulkProcessor bulk.GenericBulkProcessor
         mapToKafkaMsg collection.ConcurrentTxMap // used to map ES request to kafka message
         config        *Config
         logger        log.Logger
@@ -92,67 +92,18 @@ func newESProcessor(
         return nil, err
     }
 
-    p.bulkProcessor = []bulk.GenericBulkProcessor{processor}
-    p.mapToKafkaMsg = collection.NewShardedConcurrentTxMap(1024, p.hashFn)
-    return p, nil
-}
-
-// newESDualProcessor creates new ESDualProcessor which handles double writes to dual visibility clients
-func newESDualProcessor(
-    name string,
-    config *Config,
-    esclient es.GenericClient,
-    osclient es.GenericClient,
-    logger log.Logger,
-    metricsClient metrics.Client,
-) (*ESProcessorImpl, error) {
-    p := &ESProcessorImpl{
-        config:     config,
-        logger:     logger.WithTags(tag.ComponentIndexerESProcessor),
-        scope:      metricsClient.Scope(metrics.ESProcessorScope),
-        msgEncoder: defaultEncoder,
-    }
-
-    params := &bulk.BulkProcessorParameters{
-        Name:          name,
-        NumOfWorkers:  config.ESProcessorNumOfWorkers(),
-        BulkActions:   config.ESProcessorBulkActions(),
-        BulkSize:      config.ESProcessorBulkSize(),
-        FlushInterval: config.ESProcessorFlushInterval(),
-        Backoff:       bulk.NewExponentialBackoff(esProcessorInitialRetryInterval, esProcessorMaxRetryInterval),
-        BeforeFunc:    p.bulkBeforeAction,
-        AfterFunc:     p.bulkAfterAction,
-    }
-    esprocessor, err := esclient.RunBulkProcessor(context.Background(), params)
-    if err != nil {
-        return nil, err
-    }
-
-    // for the sceondary processor, we use shadow bulk after func which only logs errors and not ack/nack messages
-    // the primary processor will be the source of truth
-    shadowParams := *params
-    shadowParams.AfterFunc = p.shadowBulkAfterAction
-    osprocessor, err := osclient.RunBulkProcessor(context.Background(), &shadowParams)
-    if err != nil {
-        return nil, err
-    }
-
-    p.bulkProcessor = []bulk.GenericBulkProcessor{esprocessor, osprocessor}
+    p.bulkProcessor = processor
     p.mapToKafkaMsg = collection.NewShardedConcurrentTxMap(1024, p.hashFn)
     return p, nil
 }
 
 func (p *ESProcessorImpl) Start() {
     // current implementation (v6 and v7) allows to invoke Start() multiple times
-    for _, processor := range p.bulkProcessor {
-        processor.Start(context.Background())
-    }
+    p.bulkProcessor.Start(context.Background())
 }
 
 func (p *ESProcessorImpl) Stop() {
-    for _, processor := range p.bulkProcessor {
-        processor.Stop() //nolint:errcheck
-    }
+    p.bulkProcessor.Stop() //nolint:errcheck
     p.mapToKafkaMsg = nil
 }
 
@@ -167,9 +118,7 @@ func (p *ESProcessorImpl) Add(request *bulk.GenericBulkableAddRequest, key strin
     if isDup {
         return
     }
-    for _, processor := range p.bulkProcessor {
-        processor.Add(request)
-    }
+    p.bulkProcessor.Add(request)
 }
 
 // bulkBeforeAction is triggered before bulk bulkProcessor commit
@@ -254,40 +203,6 @@ func (p *ESProcessorImpl) bulkAfterAction(id int64, requests []bulk.GenericBulka
     }
 }
 
-// shadowBulkAfterAction is triggered after bulk bulkProcessor commit
-func (p *ESProcessorImpl) shadowBulkAfterAction(id int64, requests []bulk.GenericBulkableRequest, response *bulk.GenericBulkResponse, err *bulk.GenericError) {
-    if err != nil {
-        // This happens after configured retry, which means something bad happens on cluster or index
-        // When cluster back to live, bulkProcessor will re-commit those failure requests
-        p.logger.Error("Error commit bulk request in secondary processor.", tag.Error(err.Details))
-
-        for _, request := range requests {
-            p.logger.Error("ES request failed in secondary processor",
-                tag.ESResponseStatus(err.Status),
-                tag.ESRequest(request.String()))
-        }
-        return
-    }
-    responseItems := response.Items
-    for i := 0; i < len(requests) && i < len(responseItems); i++ {
-        key := p.retrieveKafkaKey(requests[i])
-        if key == "" {
-            continue
-        }
-        responseItem := responseItems[i]
-        // It is possible for err to be nil while the responses in response.Items might still contain errors or unsuccessful statuses for individual requests.
-        // This is because the err variable refers to the overall bulk request operation, but each individual request in the bulk operation has its own status code.
-        for _, resp := range responseItem {
-            if !isResponseSuccess(resp.Status) {
-                wid, rid, domainID := p.getMsgWithInfo(key)
-                p.logger.Error("ES request failed in secondary processor",
-                    tag.ESResponseStatus(resp.Status), tag.ESResponseError(getErrorMsgFromESResp(resp)), tag.WorkflowID(wid), tag.WorkflowRunID(rid),
-                    tag.WorkflowDomainID(domainID))
-            }
-        }
-    }
-}
-
 func (p *ESProcessorImpl) ackKafkaMsg(key string) {
     p.ackKafkaMsgHelper(key, false)
 }
diff --git a/service/worker/indexer/esProcessor_test.go b/service/worker/indexer/esProcessor_test.go
index 6476eb5f6d4..866a42746f0 100644
--- a/service/worker/indexer/esProcessor_test.go
+++ b/service/worker/indexer/esProcessor_test.go
@@ -86,7 +86,7 @@ func (s *esProcessorSuite) SetupTest() {
         msgEncoder: defaultEncoder,
     }
     p.mapToKafkaMsg = collection.NewShardedConcurrentTxMap(1024, p.hashFn)
-    p.bulkProcessor = []bulk.GenericBulkProcessor{s.mockBulkProcessor}
+    p.bulkProcessor = s.mockBulkProcessor
 
     s.esProcessor = p
 
@@ -526,18 +526,12 @@ func (s *esProcessorSuite) TestBulkAfterAction_Nack_Shadow_WithError() {
     mapVal := newKafkaMessageWithMetrics(mockKafkaMsg, &testStopWatch)
     s.esProcessor.mapToKafkaMsg.Put(testKey, mapVal)
 
-    // Add mocked secondary processor
-    secondaryProcessor := &mocks2.GenericBulkProcessor{}
-    s.esProcessor.bulkProcessor = append(s.esProcessor.bulkProcessor, secondaryProcessor)
-
     // Mock Kafka message Nack and Value
     mockKafkaMsg.On("Nack").Return(nil).Once()
     mockKafkaMsg.On("Value").Return(payload).Once()
     s.mockScope.On("IncCounter", mock.AnythingOfType("int")).Return()
     // Execute bulkAfterAction for primary processor with error
     s.esProcessor.bulkAfterAction(0, requests, response, mockErr)
-    // Mocking secondary processor to test shadowBulkAfterAction with error
-    s.esProcessor.shadowBulkAfterAction(0, requests, response, mockErr)
 }
 
 func (s *esProcessorSuite) TestBulkAfterAction_Shadow_Fail_WithoutError() {
@@ -572,16 +566,10 @@ func (s *esProcessorSuite) TestBulkAfterAction_Shadow_Fail_WithoutError() {
     mapVal := newKafkaMessageWithMetrics(mockKafkaMsg, &testStopWatch)
     s.esProcessor.mapToKafkaMsg.Put(testKey, mapVal)
 
-    // Add mocked secondary processor
-    secondaryProcessor := &mocks2.GenericBulkProcessor{}
-    s.esProcessor.bulkProcessor = append(s.esProcessor.bulkProcessor, secondaryProcessor)
-
     // Mock Kafka message Nack and Value
     mockKafkaMsg.On("Nack").Return(nil).Once()
     mockKafkaMsg.On("Value").Return(payload).Once()
     s.mockScope.On("IncCounter", mock.AnythingOfType("int")).Return()
     // Execute bulkAfterAction for primary processor with error
     s.esProcessor.bulkAfterAction(0, requests, response, nil)
-    // Mocking secondary processor to test shadowBulkAfterAction with error
-    s.esProcessor.shadowBulkAfterAction(0, requests, response, nil)
 }
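Note on the esProcessor changes above: ESProcessorImpl now owns exactly one bulk.GenericBulkProcessor, so Start, Add, and Stop delegate directly instead of looping over a slice, and the shadow after-commit callback is gone because the secondary cluster now gets its own full pipeline (see migration_indexer.go below). The sketch below restates that lifecycle using only calls that appear verbatim in the diff; the helper function itself is hypothetical and assumes it sits in the same package, so the existing bulk and context imports apply.

// Hypothetical helper, for illustration only: the three calls ESProcessorImpl
// now makes against its single bulk processor (previously each call was wrapped
// in a loop over []bulk.GenericBulkProcessor).
func runBulkLifecycle(processor bulk.GenericBulkProcessor, request *bulk.GenericBulkableAddRequest) {
    processor.Start(context.Background()) // start the single underlying bulk processor
    processor.Add(request)                // one write path; no fan-out to a shadow processor
    processor.Stop()                      // return value ignored here, as in ESProcessorImpl.Stop
}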
diff --git a/service/worker/indexer/indexer.go b/service/worker/indexer/indexer.go
index d645911c8e9..5fb3411ace0 100644
--- a/service/worker/indexer/indexer.go
+++ b/service/worker/indexer/indexer.go
@@ -74,6 +74,11 @@ type (
         shutdownCh          chan struct{}
     }
 
+    DualIndexer struct {
+        SourceIndexer *Indexer
+        DestIndexer   *Indexer
+    }
+
     // Config contains all configs for indexer
     Config struct {
         IndexerConcurrency       dynamicconfig.IntPropertyFn
diff --git a/service/worker/indexer/indexer_test.go b/service/worker/indexer/indexer_test.go
index 5b715040b5b..ee0783d5a0e 100644
--- a/service/worker/indexer/indexer_test.go
+++ b/service/worker/indexer/indexer_test.go
@@ -61,8 +61,9 @@ func TestNewDualIndexer(t *testing.T) {
     mockMessagingClient := messaging.NewMockClient(ctrl)
     mockMessagingClient.EXPECT().NewConsumer("visibility", "test-bulkProcessor-consumer").Return(nil, nil).Times(1)
+    mockMessagingClient.EXPECT().NewConsumer("visibility", "test-bulkProcessor-os-consumer").Return(nil, nil).Times(1)
 
-    indexer := NewMigrationIndexer(config, mockMessagingClient, mockESClient, mockESClient, processorName, testlogger.New(t), metrics.NewNoopMetricsClient())
+    indexer := NewMigrationDualIndexer(config, mockMessagingClient, mockESClient, mockESClient, processorName, processorName, testlogger.New(t), metrics.NewNoopMetricsClient())
     assert.NotNil(t, indexer)
 }
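The updated test pins down the naming of the second consumer group: the secondary index name gets an "-os" suffix before the usual consumer suffix, which is why the mock now also expects "test-bulkProcessor-os-consumer". A small sketch of that derivation, assuming getConsumerName(name) simply appends "-consumer" (the diff only shows its call sites, so treat that as an assumption):

// Hypothetical illustration of the two consumer-group names expected by
// TestNewDualIndexer, assuming getConsumerName(name) == name + "-consumer".
func migrationConsumerNames(primaryIndex, secondaryIndex string) (string, string) {
    primary := getConsumerName(primaryIndex)             // "test-bulkProcessor-consumer"
    secondary := getConsumerName(secondaryIndex + "-os") // "test-bulkProcessor-os-consumer"
    return primary, secondary
}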
diff --git a/service/worker/indexer/migration_indexer.go b/service/worker/indexer/migration_indexer.go
index f38cfa2ba0a..0ca840e9e35 100644
--- a/service/worker/indexer/migration_indexer.go
+++ b/service/worker/indexer/migration_indexer.go
@@ -29,31 +29,32 @@ import (
     "github.com/uber/cadence/common/metrics"
 )
 
-// NewMigrationIndexer create a new Indexer that can index to both ES and OS
-func NewMigrationIndexer(
-    config *Config,
+// NewMigrationDualIndexer creates a new DualIndexer that is used during visibility migration
+// When migrating from ES to OS, it indexes into both ES and OS
+func NewMigrationDualIndexer(config *Config,
     client messaging.Client,
     primaryClient es.GenericClient,
     secondaryClient es.GenericClient,
-    visibilityName string,
+    primaryVisibilityName string,
+    secondaryVisibilityName string,
     logger log.Logger,
-    metricsClient metrics.Client,
-) *Indexer {
+    metricsClient metrics.Client) *DualIndexer {
+
     logger = logger.WithTags(tag.ComponentIndexer)
 
-    visibilityProcessor, err := newESDualProcessor(processorName, config, primaryClient, secondaryClient, logger, metricsClient)
+    visibilityProcessor, err := newESProcessor(processorName, config, primaryClient, logger, metricsClient)
     if err != nil {
         logger.Fatal("Index ES processor state changed", tag.LifeCycleStartFailed, tag.Error(err))
     }
 
-    consumer, err := client.NewConsumer(common.VisibilityAppName, getConsumerName(visibilityName))
+    consumer, err := client.NewConsumer(common.VisibilityAppName, getConsumerName(primaryVisibilityName))
     if err != nil {
         logger.Fatal("Index consumer state changed", tag.LifeCycleStartFailed, tag.Error(err))
     }
 
-    return &Indexer{
+    sourceIndexer := &Indexer{
         config:              config,
-        esIndexName:         visibilityName,
+        esIndexName:         primaryVisibilityName,
         consumer:            consumer,
         logger:              logger.WithTags(tag.ComponentIndexerProcessor),
         scope:               metricsClient.Scope(metrics.IndexProcessorScope),
@@ -61,4 +62,30 @@ func NewMigrationIndexer(
         visibilityProcessor: visibilityProcessor,
         msgEncoder:          defaultEncoder,
     }
+
+    secondaryVisibilityProcessor, err := newESProcessor(processorName, config, secondaryClient, logger, metricsClient)
+    if err != nil {
+        logger.Fatal("Secondary Index ES processor state changed", tag.LifeCycleStartFailed, tag.Error(err))
+    }
+
+    secondaryConsumer, err := client.NewConsumer(common.VisibilityAppName, getConsumerName(secondaryVisibilityName+"-os"))
+    if err != nil {
+        logger.Fatal("Secondary Index consumer state changed", tag.LifeCycleStartFailed, tag.Error(err))
+    }
+
+    destIndexer := &Indexer{
+        config:              config,
+        esIndexName:         secondaryVisibilityName,
+        consumer:            secondaryConsumer,
+        logger:              logger.WithTags(tag.ComponentIndexerProcessor),
+        scope:               metricsClient.Scope(metrics.IndexProcessorScope),
+        shutdownCh:          make(chan struct{}),
+        visibilityProcessor: secondaryVisibilityProcessor,
+        msgEncoder:          defaultEncoder,
+    }
+
+    return &DualIndexer{
+        SourceIndexer: sourceIndexer,
+        DestIndexer:   destIndexer,
+    }
 }
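For orientation, the constructor above now wires two complete pipelines, each with its own Kafka consumer, bulk processor, and shutdown channel. A condensed, hypothetical call-site sketch follows (the real wiring is in service.go below; the wrapper and its parameter names are placeholders, not part of this change):

// Hypothetical wrapper showing the argument order of NewMigrationDualIndexer;
// see startMigrationDualIndexer in service.go below for the real call site.
func buildDualIndexer(
    cfg *Config,
    msgClient messaging.Client,
    esClient es.GenericClient, // source (primary) visibility cluster
    osClient es.GenericClient, // destination (secondary) visibility cluster
    esIndex string, // primary visibility index name
    osIndex string, // secondary visibility index name
    logger log.Logger,
    metricsClient metrics.Client,
) *DualIndexer {
    return NewMigrationDualIndexer(cfg, msgClient, esClient, osClient, esIndex, osIndex, logger, metricsClient)
}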
diff --git a/service/worker/service.go b/service/worker/service.go
index 210c882f282..ae613f12a68 100644
--- a/service/worker/service.go
+++ b/service/worker/service.go
@@ -221,7 +221,7 @@ func (s *Service) Start() {
     s.startFixerWorkflowWorker()
     if s.config.IndexerCfg != nil {
         if shouldStartMigrationIndexer(s.params) {
-            s.startMigrationIndexer()
+            s.startMigrationDualIndexer()
         } else {
             s.startIndexer()
         }
@@ -394,19 +394,25 @@ func (s *Service) startIndexer() {
     }
 }
 
-func (s *Service) startMigrationIndexer() {
-    visibilityIndexer := indexer.NewMigrationIndexer(
+func (s *Service) startMigrationDualIndexer() {
+    visibilityDualIndexer := indexer.NewMigrationDualIndexer(
         s.config.IndexerCfg,
         s.GetMessagingClient(),
         s.params.ESClient,
         s.params.OSClient,
         s.params.ESConfig.Indices[common.VisibilityAppName],
+        s.params.OSConfig.Indices[common.VisibilityAppName],
         s.GetLogger(),
         s.GetMetricsClient(),
     )
-    if err := visibilityIndexer.Start(); err != nil {
-        visibilityIndexer.Stop()
-        s.GetLogger().Fatal("fail to start indexer", tag.Error(err))
+    if err := visibilityDualIndexer.SourceIndexer.Start(); err != nil {
+        visibilityDualIndexer.SourceIndexer.Stop()
+        s.GetLogger().Fatal("fail to start source indexer", tag.Error(err))
+    }
+
+    if err := visibilityDualIndexer.DestIndexer.Start(); err != nil {
+        visibilityDualIndexer.DestIndexer.Stop()
+        s.GetLogger().Fatal("fail to start dest indexer", tag.Error(err))
     }
 }
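One design note on startMigrationDualIndexer above: the two pipelines are started independently and the service fails fast if either cannot start, but this change does not add a combined stop path. If a joint lifecycle were wanted later, thin helpers on DualIndexer would be enough; the methods below are a hypothetical sketch, not part of this diff:

// Hypothetical convenience methods on DualIndexer (not part of this change):
// start both pipelines, rolling back the source pipeline if the destination
// fails to start, and stop both together on shutdown.
func (d *DualIndexer) Start() error {
    if err := d.SourceIndexer.Start(); err != nil {
        return err
    }
    if err := d.DestIndexer.Start(); err != nil {
        d.SourceIndexer.Stop()
        return err
    }
    return nil
}

func (d *DualIndexer) Stop() {
    d.SourceIndexer.Stop()
    d.DestIndexer.Stop()
}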