From 4f3b261191ef6cf77ea5d081e618b4d181063090 Mon Sep 17 00:00:00 2001 From: ALI OMAR Ismail <73675505+Ismail731404@users.noreply.github.com> Date: Fri, 15 Nov 2024 14:29:26 +0100 Subject: [PATCH] Support monthly index on timebased rollmode (#206) * Refactor rollmode handling to support timebased montly. * improve precision in findIndices calculation * Update myrtea-sdk to v5.1.2 * Update Rollmode type in model_handlers_test.go * Refactor rollmode field to support nested structure --- go.mod | 2 +- go.sum | 4 +- internals/coordinator/coordinator_test.go | 71 ++++++++++++++++++- internals/coordinator/instance.go | 6 +- internals/coordinator/logicalindex_cron.go | 2 +- .../coordinator/logicalindex_timebased.go | 37 ++++++++-- internals/handlers/model_handlers_test.go | 6 +- 7 files changed, 110 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index 0edd5e6..ca4236f 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/jmoiron/sqlx v1.4.0 github.com/json-iterator/go v1.1.12 github.com/lib/pq v1.10.9 - github.com/myrteametrics/myrtea-sdk/v5 v5.1.1 + github.com/myrteametrics/myrtea-sdk/v5 v5.1.2 github.com/prataprc/goparsec v0.0.0-20211219142520-daac0e635e7e github.com/prometheus/client_golang v1.20.2 github.com/robfig/cron/v3 v3.0.1 diff --git a/go.sum b/go.sum index b956652..e6cd5fb 100644 --- a/go.sum +++ b/go.sum @@ -210,8 +210,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/myrteametrics/myrtea-sdk/v5 v5.1.1 h1:U7PsPEAfIkVneWLybtaB3RhYYATHvtWaXlXnpSxwOb4= -github.com/myrteametrics/myrtea-sdk/v5 v5.1.1/go.mod h1:wJJ0R7p8VPtuqvJyYVl+eoxodkYL4IQ06e3zJnjiEIw= +github.com/myrteametrics/myrtea-sdk/v5 v5.1.2 h1:UWKDIF18n56eVbJJ1LXd5RaZ2E8WC1YPgO+VEj2b1cc= +github.com/myrteametrics/myrtea-sdk/v5 v5.1.2/go.mod h1:wJJ0R7p8VPtuqvJyYVl+eoxodkYL4IQ06e3zJnjiEIw= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= diff --git a/internals/coordinator/coordinator_test.go b/internals/coordinator/coordinator_test.go index 49f0c85..fe62afa 100644 --- a/internals/coordinator/coordinator_test.go +++ b/internals/coordinator/coordinator_test.go @@ -124,7 +124,8 @@ func TestCoordinator(t *testing.T) { ID: 1, Name: "myindex", ElasticsearchOptions: modeler.ElasticsearchOptions{ - Rollmode: "timebased", + Rollmode: modeler.RollmodeSettings{Type: modeler.RollmodeTimeBased, + Timebased: &modeler.TimebasedSettings{Interval: modeler.Daily}}, Rollcron: "0 * * * *", EnablePurge: true, PurgeMaxConcurrentIndices: 30, @@ -150,7 +151,8 @@ func TestPurge(t *testing.T) { elasticsearchsdk.ReplaceGlobals(elasticsearch.Config{Addresses: viper.GetStringSlice("ELASTICSEARCH_URLS")}) logicalIndex, err := NewLogicalIndexTimeBased("myrtea", modeler.Model{Name: "myindex", ElasticsearchOptions: modeler.ElasticsearchOptions{ - Rollmode: "timebased", + Rollmode: modeler.RollmodeSettings{Type: modeler.RollmodeTimeBased, + Timebased: &modeler.TimebasedSettings{Interval: modeler.Daily}}, Rollcron: "0 * * * *", EnablePurge: true, PurgeMaxConcurrentIndices: 30, @@ -162,3 +164,68 @@ func TestPurge(t *testing.T) { logicalIndex.purge() t.Fail() } + +func TestFindIndices(t *testing.T) { + refDate := time.Date(2024, 11, 1, 0, 0, 0, 0, time.UTC) + + tests := []struct { + name string + depthDays int64 + rollMode modeler.RollmodeType + interval modeler.IndexIntervalType + expectedIndices []string + liveIndices []string + }{ + { + name: "Daily interval", + depthDays: 10, + rollMode: modeler.RollmodeTimeBased, + interval: modeler.Daily, + liveIndices: []string{"my_index-2024-10-22", "my_index-2024-10-23", "my_index-2024-10-24", "my_index-2024-10-25", "my_index-2024-10-26", "my_index-2024-10-27", "my_index-2024-10-28", "my_index-2024-10-29", "my_index-2024-10-30", "my_index-2024-10-31", "my_index-2024-11-01"}, + expectedIndices: []string{"my_index-2024-10-22", "my_index-2024-10-23", "my_index-2024-10-24", "my_index-2024-10-25", "my_index-2024-10-26", "my_index-2024-10-27", "my_index-2024-10-28", "my_index-2024-10-29", "my_index-2024-10-30", "my_index-2024-10-31", "my_index-2024-11-01"}, + }, + { + name: "Monthly interval", + depthDays: 60, + rollMode: modeler.RollmodeTimeBased, + interval: modeler.Monthly, + liveIndices: []string{"my_index-2024-09", "my_index-2024-10", "my_index-2024-11"}, + expectedIndices: []string{"my_index-2024-09", "my_index-2024-10", "my_index-2024-11"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logicalIndex := LogicalIndexTimeBased{ + Name: "my_index", + Model: modeler.Model{ + ElasticsearchOptions: modeler.ElasticsearchOptions{ + Rollmode: modeler.RollmodeSettings{ + Type: tt.rollMode, + Timebased: &modeler.TimebasedSettings{ + Interval: tt.interval, + }, + }, + }, + }, + LiveIndices: tt.liveIndices, + } + + indices, err := logicalIndex.FindIndices(refDate, tt.depthDays) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if len(indices) != len(tt.expectedIndices) { + t.Fatalf("Expected %d indices, but got %d", len(tt.expectedIndices), len(indices)) + } + + for i := range indices { + if indices[i] != tt.expectedIndices[i] { + t.Errorf("Mismatch at index %d: expected %s, got %s", i, tt.expectedIndices[i], indices[i]) + } + } + + }) + } +} diff --git a/internals/coordinator/instance.go b/internals/coordinator/instance.go index 9c3174d..7d46702 100644 --- a/internals/coordinator/instance.go +++ b/internals/coordinator/instance.go @@ -40,10 +40,10 @@ func InitInstance(instanceName string, models map[int64]modeler.Model) error { for _, model := range models { var err error var logicalIndex LogicalIndex - switch model.ElasticsearchOptions.Rollmode { - case "cron": + switch model.ElasticsearchOptions.Rollmode.Type { + case modeler.RollmodeCron: logicalIndex, err = NewLogicalIndexCron(instance.Name, model) - case "timebased": + case modeler.RollmodeTimeBased: logicalIndex, err = NewLogicalIndexTimeBased(instance.Name, model) } if err != nil { diff --git a/internals/coordinator/logicalindex_cron.go b/internals/coordinator/logicalindex_cron.go index dbc7531..b7695bd 100644 --- a/internals/coordinator/logicalindex_cron.go +++ b/internals/coordinator/logicalindex_cron.go @@ -36,7 +36,7 @@ func NewLogicalIndexCron(instanceName string, model modeler.Model) (*LogicalInde zap.L().Info("Initialize logicalIndex (LogicalIndexCron)", zap.String("name", logicalIndexName), zap.String("model", model.Name), zap.Any("options", model.ElasticsearchOptions)) - if model.ElasticsearchOptions.Rollmode != "cron" { + if model.ElasticsearchOptions.Rollmode.Type != modeler.RollmodeCron { return nil, errors.New("invalid rollmode for this logicalIndex type") } diff --git a/internals/coordinator/logicalindex_timebased.go b/internals/coordinator/logicalindex_timebased.go index aea1231..f4e6dbf 100644 --- a/internals/coordinator/logicalindex_timebased.go +++ b/internals/coordinator/logicalindex_timebased.go @@ -31,7 +31,7 @@ func NewLogicalIndexTimeBased(instanceName string, model modeler.Model) (*Logica zap.L().Info("Initialize logicalIndex (LogicalIndexTimeBased)", zap.String("name", logicalIndexName), zap.String("model", model.Name), zap.Any("options", model.ElasticsearchOptions)) - if model.ElasticsearchOptions.Rollmode != "timebased" { + if model.ElasticsearchOptions.Rollmode.Type != modeler.RollmodeTimeBased { return nil, errors.New("invalid rollmode for this logicalIndex type") } @@ -59,6 +59,7 @@ func NewLogicalIndexTimeBased(instanceName string, model modeler.Model) (*Logica } logicalIndex.FetchIndices() + zap.L().Info("LogicalIndex initialized", zap.String("logicalIndex", logicalIndex.Name)) c := cron.New() _, err = c.AddFunc("*/30 * * * *", logicalIndex.FetchIndices) @@ -90,8 +91,18 @@ func (logicalIndex *LogicalIndexTimeBased) purge() { return } - tsStart := time.Now().Add(time.Duration(logicalIndex.Model.ElasticsearchOptions.PurgeMaxConcurrentIndices) * -1 * 24 * time.Hour) - indexStart := fmt.Sprintf("%s-%s", logicalIndex.Name, tsStart.Format("2006-01-02")) + var indexStart string + var tsStart time.Time + + if logicalIndex.Model.ElasticsearchOptions.Rollmode.Timebased.Interval == modeler.Daily { + // Daily mode: calculation based on a number of days + tsStart = time.Now().Add(time.Duration(logicalIndex.Model.ElasticsearchOptions.PurgeMaxConcurrentIndices) * -1 * 24 * time.Hour) + indexStart = fmt.Sprintf("%s-%s", logicalIndex.Name, tsStart.Format("2006-01-02")) + } else if logicalIndex.Model.ElasticsearchOptions.Rollmode.Timebased.Interval == modeler.Monthly { + // Monthly mode: calculation based on a number of months + tsStart = time.Now().AddDate(0, -logicalIndex.Model.ElasticsearchOptions.PurgeMaxConcurrentIndices, 0) + indexStart = fmt.Sprintf("%s-%s", logicalIndex.Name, tsStart.Format("2006-01")) + } allIndices := logicalIndex.GetAllIndices() indices := make([]string, 0) @@ -168,9 +179,23 @@ func (logicalIndex *LogicalIndexTimeBased) GetAllIndices() []string { } func (logicalIndex *LogicalIndexTimeBased) FindIndices(t time.Time, depthDays int64) ([]string, error) { - tsStart := t.Add(time.Duration(depthDays) * -1 * 24 * time.Hour) - indexEnd := fmt.Sprintf("%s-%s", logicalIndex.Name, t.Format("2006-01-02")) - indexStart := fmt.Sprintf("%s-%s", logicalIndex.Name, tsStart.Format("2006-01-02")) + var indexStart, indexEnd string + + if logicalIndex.Model.ElasticsearchOptions.Rollmode.Timebased.Interval == modeler.Daily { + tsStart := t.Add(time.Duration(-depthDays) * 24 * time.Hour) + indexEnd = fmt.Sprintf("%s-%s", logicalIndex.Name, t.Format("2006-01-02")) + indexStart = fmt.Sprintf("%s-%s", logicalIndex.Name, tsStart.Format("2006-01-02")) + } else if logicalIndex.Model.ElasticsearchOptions.Rollmode.Timebased.Interval == modeler.Monthly { + // For a monthly interval, we calculate the starting point in months + tempDate := t.AddDate(0, 0, int(-depthDays)) + yearsDiff := t.Year() - tempDate.Year() + monthsDiff := int(t.Month()) - int(tempDate.Month()) + monthsToSubtract := yearsDiff*12 + monthsDiff + + tsStart := t.AddDate(0, -monthsToSubtract, 0) + indexEnd = fmt.Sprintf("%s-%s", logicalIndex.Name, t.Format("2006-01")) + indexStart = fmt.Sprintf("%s-%s", logicalIndex.Name, tsStart.Format("2006-01")) + } indices := make([]string, 0) logicalIndex.mu.RLock() diff --git a/internals/handlers/model_handlers_test.go b/internals/handlers/model_handlers_test.go index 549036d..668fa94 100644 --- a/internals/handlers/model_handlers_test.go +++ b/internals/handlers/model_handlers_test.go @@ -43,7 +43,7 @@ func initModelRepository(t *testing.T) []modeler.Model { Synonyms: []string{"model1"}, Fields: fieldarray, ElasticsearchOptions: modeler.ElasticsearchOptions{ - Rollmode: "cron", + Rollmode: modeler.RollmodeSettings{Type: modeler.RollmodeCron}, Rollcron: "0 0 * * *", EnablePurge: true, PurgeMaxConcurrentIndices: 30, @@ -57,7 +57,7 @@ func initModelRepository(t *testing.T) []modeler.Model { Synonyms: []string{"model2"}, Fields: fieldarray, ElasticsearchOptions: modeler.ElasticsearchOptions{ - Rollmode: "cron", + Rollmode: modeler.RollmodeSettings{Type: modeler.RollmodeCron}, Rollcron: "0 0 * * *", EnablePurge: true, PurgeMaxConcurrentIndices: 30, @@ -123,7 +123,7 @@ func TestGetModel(t *testing.T) { t.Errorf("handler returned wrong status code: got %v want %v", status, http.StatusOK) } - expected := `{"id":1,"name":"test","synonyms":["model1"],"fields":[{"name":"test","type":"string","semantic":false,"synonyms":["other"]},{"name":"test_object","type":"object","keepObjectSeparation":false,"fields":[{"name":"test_subfield","type":"string","semantic":false,"synonyms":["other2"]}]}],"source":"{}","elasticsearchOptions":{"rollmode":"cron","rollcron":"0 0 * * *","enablePurge":true,"purgeMaxConcurrentIndices":30,"patchAliasMaxIndices":2,"advancedSettings":{"number_of_replicas":"2","number_of_shards":"6"}}}` + "\n" + expected := `{"id":1,"name":"test","synonyms":["model1"],"fields":[{"name":"test","type":"string","semantic":false,"synonyms":["other"]},{"name":"test_object","type":"object","keepObjectSeparation":false,"fields":[{"name":"test_subfield","type":"string","semantic":false,"synonyms":["other2"]}]}],"source":"{}","elasticsearchOptions":{"rollmode":{"type":"cron"},"rollcron":"0 0 * * *","enablePurge":true,"purgeMaxConcurrentIndices":30,"patchAliasMaxIndices":2,"advancedSettings":{"number_of_replicas":"2","number_of_shards":"6"}}}` + "\n" if rr.Body.String() != expected { t.Errorf("handler returned unexpected body: got %v want %v", rr.Body.String(), expected) }