Use dual indexer and separate consumers for OpenSearch migration #6559

Open · wants to merge 2 commits into base: master
3 changes: 2 additions & 1 deletion config/dynamicconfig/development_es.yaml
@@ -2,6 +2,8 @@ frontend.enableClientVersionCheck:
- value: true
system.advancedVisibilityWritingMode:
- value: "on"
system.advancedVisibilityMigrationWritingMode:
- value: "source"
system.enableReadVisibilityFromES:
- value: true
frontend.validSearchAttributes:
@@ -43,4 +45,3 @@ system.minRetentionDays:
history.EnableConsistentQueryByDomain:
- value: true
constraints: {}
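The new system.advancedVisibilityMigrationWritingMode knob is introduced with the value "source", i.e. during normal operation only the existing ES cluster is written. Below is a minimal sketch of how such a writing-mode switch could drive write-target selection; the mode names other than "source" and the helper itself are assumptions for illustration, not code from this PR.

// Hypothetical sketch of consuming a migration writing mode. Only "source"
// appears in this PR's config; "dual" and "destination" are assumed extra
// modes, and pickWriteTargets is not a real helper in the codebase.
func pickWriteTargets(mode string) []string {
	switch mode {
	case "source":
		return []string{"es"} // write only to the existing ES cluster
	case "dual":
		return []string{"es", "os"} // double-write while migrating
	case "destination":
		return []string{"os"} // cut over to OpenSearch only
	default:
		return nil
	}
}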

95 changes: 5 additions & 90 deletions service/worker/indexer/esProcessor.go
@@ -48,7 +48,7 @@ const (
type (
// ESProcessorImpl implements ESProcessor, it's an agent of GenericBulkProcessor
ESProcessorImpl struct {
bulkProcessor []bulk.GenericBulkProcessor
bulkProcessor bulk.GenericBulkProcessor
mapToKafkaMsg collection.ConcurrentTxMap // used to map ES request to kafka message
config *Config
logger log.Logger
@@ -92,67 +92,18 @@ func newESProcessor(
return nil, err
}

p.bulkProcessor = []bulk.GenericBulkProcessor{processor}
p.mapToKafkaMsg = collection.NewShardedConcurrentTxMap(1024, p.hashFn)
return p, nil
}

// newESDualProcessor creates new ESDualProcessor which handles double writes to dual visibility clients
func newESDualProcessor(
name string,
config *Config,
esclient es.GenericClient,
osclient es.GenericClient,
logger log.Logger,
metricsClient metrics.Client,
) (*ESProcessorImpl, error) {
p := &ESProcessorImpl{
config: config,
logger: logger.WithTags(tag.ComponentIndexerESProcessor),
scope: metricsClient.Scope(metrics.ESProcessorScope),
msgEncoder: defaultEncoder,
}

params := &bulk.BulkProcessorParameters{
Name: name,
NumOfWorkers: config.ESProcessorNumOfWorkers(),
BulkActions: config.ESProcessorBulkActions(),
BulkSize: config.ESProcessorBulkSize(),
FlushInterval: config.ESProcessorFlushInterval(),
Backoff: bulk.NewExponentialBackoff(esProcessorInitialRetryInterval, esProcessorMaxRetryInterval),
BeforeFunc: p.bulkBeforeAction,
AfterFunc: p.bulkAfterAction,
}
esprocessor, err := esclient.RunBulkProcessor(context.Background(), params)
if err != nil {
return nil, err
}

// for the secondary processor, we use a shadow bulk-after func which only logs errors and does not ack/nack messages
// the primary processor will be the source of truth
shadowParams := *params
shadowParams.AfterFunc = p.shadowBulkAfterAction
osprocessor, err := osclient.RunBulkProcessor(context.Background(), &shadowParams)
if err != nil {
return nil, err
}

p.bulkProcessor = []bulk.GenericBulkProcessor{esprocessor, osprocessor}
p.bulkProcessor = processor
p.mapToKafkaMsg = collection.NewShardedConcurrentTxMap(1024, p.hashFn)
return p, nil
}

func (p *ESProcessorImpl) Start() {
// current implementation (v6 and v7) allows to invoke Start() multiple times
for _, processor := range p.bulkProcessor {
processor.Start(context.Background())
}
p.bulkProcessor.Start(context.Background())

}
func (p *ESProcessorImpl) Stop() {
for _, processor := range p.bulkProcessor {
processor.Stop() //nolint:errcheck
}
p.bulkProcessor.Stop() //nolint:errcheck
p.mapToKafkaMsg = nil
}

@@ -167,9 +118,7 @@ func (p *ESProcessorImpl) Add(request *bulk.GenericBulkableAddRequest, key strin
if isDup {
return
}
for _, processor := range p.bulkProcessor {
processor.Add(request)
}
p.bulkProcessor.Add(request)
}

// bulkBeforeAction is triggered before bulk bulkProcessor commit
@@ -254,40 +203,6 @@ func (p *ESProcessorImpl) bulkAfterAction(id int64, requests []bulk.GenericBulka
}
}

// shadowBulkAfterAction is triggered after bulk bulkProcessor commit
func (p *ESProcessorImpl) shadowBulkAfterAction(id int64, requests []bulk.GenericBulkableRequest, response *bulk.GenericBulkResponse, err *bulk.GenericError) {
if err != nil {
// This happens after configured retry, which means something bad happens on cluster or index
// When cluster back to live, bulkProcessor will re-commit those failure requests
p.logger.Error("Error commit bulk request in secondary processor.", tag.Error(err.Details))

for _, request := range requests {
p.logger.Error("ES request failed in secondary processor",
tag.ESResponseStatus(err.Status),
tag.ESRequest(request.String()))
}
return
}
responseItems := response.Items
for i := 0; i < len(requests) && i < len(responseItems); i++ {
key := p.retrieveKafkaKey(requests[i])
if key == "" {
continue
}
responseItem := responseItems[i]
// It is possible for err to be nil while the responses in response.Items might still contain errors or unsuccessful statuses for individual requests.
// This is because the err variable refers to the overall bulk request operation, but each individual request in the bulk operation has its own status code.
for _, resp := range responseItem {
if !isResponseSuccess(resp.Status) {
wid, rid, domainID := p.getMsgWithInfo(key)
p.logger.Error("ES request failed in secondary processor",
tag.ESResponseStatus(resp.Status), tag.ESResponseError(getErrorMsgFromESResp(resp)), tag.WorkflowID(wid), tag.WorkflowRunID(rid),
tag.WorkflowDomainID(domainID))
}
}
}
}

func (p *ESProcessorImpl) ackKafkaMsg(key string) {
p.ackKafkaMsgHelper(key, false)
}
14 changes: 1 addition & 13 deletions service/worker/indexer/esProcessor_test.go
@@ -86,7 +86,7 @@ func (s *esProcessorSuite) SetupTest() {
msgEncoder: defaultEncoder,
}
p.mapToKafkaMsg = collection.NewShardedConcurrentTxMap(1024, p.hashFn)
p.bulkProcessor = []bulk.GenericBulkProcessor{s.mockBulkProcessor}
p.bulkProcessor = s.mockBulkProcessor

s.esProcessor = p

@@ -526,18 +526,12 @@ func (s *esProcessorSuite) TestBulkAfterAction_Nack_Shadow_WithError() {
mapVal := newKafkaMessageWithMetrics(mockKafkaMsg, &testStopWatch)
s.esProcessor.mapToKafkaMsg.Put(testKey, mapVal)

// Add mocked secondary processor
secondaryProcessor := &mocks2.GenericBulkProcessor{}
s.esProcessor.bulkProcessor = append(s.esProcessor.bulkProcessor, secondaryProcessor)

// Mock Kafka message Nack and Value
mockKafkaMsg.On("Nack").Return(nil).Once()
mockKafkaMsg.On("Value").Return(payload).Once()
s.mockScope.On("IncCounter", mock.AnythingOfType("int")).Return()
// Execute bulkAfterAction for primary processor with error
s.esProcessor.bulkAfterAction(0, requests, response, mockErr)
// Mocking secondary processor to test shadowBulkAfterAction with error
s.esProcessor.shadowBulkAfterAction(0, requests, response, mockErr)
}

func (s *esProcessorSuite) TestBulkAfterAction_Shadow_Fail_WithoutError() {
@@ -572,16 +566,10 @@ func (s *esProcessorSuite) TestBulkAfterAction_Shadow_Fail_WithoutError() {
mapVal := newKafkaMessageWithMetrics(mockKafkaMsg, &testStopWatch)
s.esProcessor.mapToKafkaMsg.Put(testKey, mapVal)

// Add mocked secondary processor
secondaryProcessor := &mocks2.GenericBulkProcessor{}
s.esProcessor.bulkProcessor = append(s.esProcessor.bulkProcessor, secondaryProcessor)

// Mock Kafka message Nack and Value
mockKafkaMsg.On("Nack").Return(nil).Once()
mockKafkaMsg.On("Value").Return(payload).Once()
s.mockScope.On("IncCounter", mock.AnythingOfType("int")).Return()
// Execute bulkAfterAction for primary processor with error
s.esProcessor.bulkAfterAction(0, requests, response, nil)
// Mocking secondary processor to test shadowBulkAfterAction with error
s.esProcessor.shadowBulkAfterAction(0, requests, response, nil)
}
5 changes: 5 additions & 0 deletions service/worker/indexer/indexer.go
@@ -74,6 +74,11 @@ type (
shutdownCh chan struct{}
}

DualIndexer struct {
SourceIndexer *Indexer
DestIndexer *Indexer
}

// Config contains all configs for indexer
Config struct {
IndexerConcurrency dynamicconfig.IntPropertyFn
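The new DualIndexer is a plain pair of independent Indexers, each with its own processor and Kafka consumer. As a sketch of the convenience the reviewer suggests later in this review (see the service.go comment below), lifecycle methods could delegate to both; these methods are not part of this diff.

// Hypothetical, not included in this PR: DualIndexer could own the
// lifecycle of both indexers so callers need not start/stop each one.
func (d *DualIndexer) Start() error {
	if err := d.SourceIndexer.Start(); err != nil {
		return err
	}
	if err := d.DestIndexer.Start(); err != nil {
		d.SourceIndexer.Stop()
		return err
	}
	return nil
}

func (d *DualIndexer) Stop() {
	d.SourceIndexer.Stop()
	d.DestIndexer.Stop()
}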
3 changes: 2 additions & 1 deletion service/worker/indexer/indexer_test.go
@@ -61,8 +61,9 @@ func TestNewDualIndexer(t *testing.T) {

mockMessagingClient := messaging.NewMockClient(ctrl)
mockMessagingClient.EXPECT().NewConsumer("visibility", "test-bulkProcessor-consumer").Return(nil, nil).Times(1)
mockMessagingClient.EXPECT().NewConsumer("visibility", "test-bulkProcessor-os-consumer").Return(nil, nil).Times(1)

indexer := NewMigrationIndexer(config, mockMessagingClient, mockESClient, mockESClient, processorName, testlogger.New(t), metrics.NewNoopMetricsClient())
indexer := NewMigrationDualIndexer(config, mockMessagingClient, mockESClient, mockESClient, processorName, processorName, testlogger.New(t), metrics.NewNoopMetricsClient())
assert.NotNil(t, indexer)
}

47 changes: 37 additions & 10 deletions service/worker/indexer/migration_indexer.go
@@ -29,36 +29,63 @@ import (
"github.com/uber/cadence/common/metrics"
)

// NewMigrationIndexer create a new Indexer that can index to both ES and OS
func NewMigrationIndexer(
config *Config,
// NewMigrationDualIndexer creates a new DualIndexer that is used during visibility migration
// When migrating from ES to OS, this indexer indexes to both ES and OS
func NewMigrationDualIndexer(config *Config,
client messaging.Client,
primaryClient es.GenericClient,
secondaryClient es.GenericClient,
visibilityName string,
primaryVisibilityName string,
secondaryVisibilityName string,
logger log.Logger,
metricsClient metrics.Client,
) *Indexer {
metricsClient metrics.Client) *DualIndexer {

logger = logger.WithTags(tag.ComponentIndexer)

visibilityProcessor, err := newESDualProcessor(processorName, config, primaryClient, secondaryClient, logger, metricsClient)
visibilityProcessor, err := newESProcessor(processorName, config, primaryClient, logger, metricsClient)
if err != nil {
logger.Fatal("Index ES processor state changed", tag.LifeCycleStartFailed, tag.Error(err))
}

consumer, err := client.NewConsumer(common.VisibilityAppName, getConsumerName(visibilityName))
consumer, err := client.NewConsumer(common.VisibilityAppName, getConsumerName(primaryVisibilityName))
if err != nil {
logger.Fatal("Index consumer state changed", tag.LifeCycleStartFailed, tag.Error(err))
}

return &Indexer{
sourceIndexer := &Indexer{
config: config,
esIndexName: visibilityName,
esIndexName: primaryVisibilityName,
consumer: consumer,
logger: logger.WithTags(tag.ComponentIndexerProcessor),
scope: metricsClient.Scope(metrics.IndexProcessorScope),
shutdownCh: make(chan struct{}),
visibilityProcessor: visibilityProcessor,
msgEncoder: defaultEncoder,
}

secondaryVisibilityProcessor, err := newESProcessor(processorName, config, secondaryClient, logger, metricsClient)
if err != nil {
logger.Fatal("Secondary Index ES processor state changed", tag.LifeCycleStartFailed, tag.Error(err))
}

secondaryConsumer, err := client.NewConsumer(common.VisibilityAppName, getConsumerName(secondaryVisibilityName+"-os"))
Member:
It would be better to define consumer names in the config; secondaryVisibilityName+"-os" looks like a bit of hidden behavior.

Member (Author):
Yeah, I will update it. I am also not sure whether OS should always use the name pattern *-os-consumer, since that can help when the migration is done and this extra consumer for OS becomes the primary. Or we may need to rename the consumer group to *-consumer to match the existing code.

if err != nil {
logger.Fatal("Secondary Index consumer state changed", tag.LifeCycleStartFailed, tag.Error(err))
}

destIndexer := &Indexer{
config: config,
esIndexName: secondaryVisibilityName,
consumer: secondaryConsumer,
logger: logger.WithTags(tag.ComponentIndexerProcessor),
scope: metricsClient.Scope(metrics.IndexProcessorScope),
shutdownCh: make(chan struct{}),
visibilityProcessor: secondaryVisibilityProcessor,
msgEncoder: defaultEncoder,
}

return &DualIndexer{
SourceIndexer: sourceIndexer,
DestIndexer: destIndexer,
}
}
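For reference, the consumer-group naming this produces, per the expectations in indexer_test.go above ("test-bulkProcessor-consumer" and "test-bulkProcessor-os-consumer"): assuming getConsumerName simply appends "-consumer" — its body is not shown in this diff, so that is an assumption — the two consumers come out as follows. As the review thread above notes, defining the secondary name in config would avoid the hidden "-os" suffix.

// Assumed helper, inferred from the test expectations; the real
// getConsumerName body is not part of this diff.
func getConsumerName(topic string) string {
	return topic + "-consumer"
}

// primary:   getConsumerName(primaryVisibilityName)            // e.g. "test-bulkProcessor-consumer"
// secondary: getConsumerName(secondaryVisibilityName + "-os")  // e.g. "test-bulkProcessor-os-consumer"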
18 changes: 12 additions & 6 deletions service/worker/service.go
@@ -221,7 +221,7 @@ func (s *Service) Start() {
s.startFixerWorkflowWorker()
if s.config.IndexerCfg != nil {
if shouldStartMigrationIndexer(s.params) {
s.startMigrationIndexer()
s.startMigrationDualIndexer()
} else {
s.startIndexer()
}
@@ -394,19 +394,25 @@ func (s *Service) startIndexer() {
}
}

func (s *Service) startMigrationIndexer() {
visibilityIndexer := indexer.NewMigrationIndexer(
func (s *Service) startMigrationDualIndexer() {
visibilityDualIndexer := indexer.NewMigrationDualIndexer(
s.config.IndexerCfg,
s.GetMessagingClient(),
s.params.ESClient,
s.params.OSClient,
s.params.ESConfig.Indices[common.VisibilityAppName],
s.params.OSConfig.Indices[common.VisibilityAppName],
s.GetLogger(),
s.GetMetricsClient(),
)
if err := visibilityIndexer.Start(); err != nil {
visibilityIndexer.Stop()
s.GetLogger().Fatal("fail to start indexer", tag.Error(err))
if err := visibilityDualIndexer.SourceIndexer.Start(); err != nil {
visibilityDualIndexer.SourceIndexer.Stop()
s.GetLogger().Fatal("fail to start source indexer", tag.Error(err))
}

if err := visibilityDualIndexer.DestIndexer.Start(); err != nil {
visibilityDualIndexer.DestIndexer.Stop()
s.GetLogger().Fatal("fail to start dest indexer", tag.Error(err))
Comment on lines +408 to +415
Member:
The service doesn't need to know about these source/dest indexers. The interface could stay the same from the service's perspective, and the migration dual indexer can start/stop them.

}
}
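If the reviewer's suggestion above were adopted and DualIndexer exposed its own Start/Stop (as sketched after the indexer.go changes), the service-side block could collapse to something like the following; hypothetical, not part of this PR.

// Hypothetical call site if DualIndexer managed both indexers itself.
if err := visibilityDualIndexer.Start(); err != nil {
	visibilityDualIndexer.Stop()
	s.GetLogger().Fatal("fail to start migration dual indexer", tag.Error(err))
}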
