diff --git a/internals/coordinator/coordinator_test.go b/internals/coordinator/coordinator_test.go index 49f0c85..0a3d36f 100644 --- a/internals/coordinator/coordinator_test.go +++ b/internals/coordinator/coordinator_test.go @@ -124,7 +124,7 @@ func TestCoordinator(t *testing.T) { ID: 1, Name: "myindex", ElasticsearchOptions: modeler.ElasticsearchOptions{ - Rollmode: "timebased", + Rollmode: modeler.RollmodeSettings{Type: modeler.RollmodeTimeBased}, Rollcron: "0 * * * *", EnablePurge: true, PurgeMaxConcurrentIndices: 30, @@ -150,7 +150,7 @@ func TestPurge(t *testing.T) { elasticsearchsdk.ReplaceGlobals(elasticsearch.Config{Addresses: viper.GetStringSlice("ELASTICSEARCH_URLS")}) logicalIndex, err := NewLogicalIndexTimeBased("myrtea", modeler.Model{Name: "myindex", ElasticsearchOptions: modeler.ElasticsearchOptions{ - Rollmode: "timebased", + Rollmode: modeler.RollmodeSettings{Type: modeler.RollmodeTimeBased}, Rollcron: "0 * * * *", EnablePurge: true, PurgeMaxConcurrentIndices: 30, diff --git a/internals/coordinator/instance.go b/internals/coordinator/instance.go index 9c3174d..7d46702 100644 --- a/internals/coordinator/instance.go +++ b/internals/coordinator/instance.go @@ -40,10 +40,10 @@ func InitInstance(instanceName string, models map[int64]modeler.Model) error { for _, model := range models { var err error var logicalIndex LogicalIndex - switch model.ElasticsearchOptions.Rollmode { - case "cron": + switch model.ElasticsearchOptions.Rollmode.Type { + case modeler.RollmodeCron: logicalIndex, err = NewLogicalIndexCron(instance.Name, model) - case "timebased": + case modeler.RollmodeTimeBased: logicalIndex, err = NewLogicalIndexTimeBased(instance.Name, model) } if err != nil { diff --git a/internals/coordinator/logicalindex_cron.go b/internals/coordinator/logicalindex_cron.go index dbc7531..b7695bd 100644 --- a/internals/coordinator/logicalindex_cron.go +++ b/internals/coordinator/logicalindex_cron.go @@ -36,7 +36,7 @@ func NewLogicalIndexCron(instanceName string, model modeler.Model) (*LogicalInde zap.L().Info("Initialize logicalIndex (LogicalIndexCron)", zap.String("name", logicalIndexName), zap.String("model", model.Name), zap.Any("options", model.ElasticsearchOptions)) - if model.ElasticsearchOptions.Rollmode != "cron" { + if model.ElasticsearchOptions.Rollmode.Type != modeler.RollmodeCron { return nil, errors.New("invalid rollmode for this logicalIndex type") } diff --git a/internals/coordinator/logicalindex_timebased.go b/internals/coordinator/logicalindex_timebased.go index aea1231..bee6ea3 100644 --- a/internals/coordinator/logicalindex_timebased.go +++ b/internals/coordinator/logicalindex_timebased.go @@ -31,7 +31,7 @@ func NewLogicalIndexTimeBased(instanceName string, model modeler.Model) (*Logica zap.L().Info("Initialize logicalIndex (LogicalIndexTimeBased)", zap.String("name", logicalIndexName), zap.String("model", model.Name), zap.Any("options", model.ElasticsearchOptions)) - if model.ElasticsearchOptions.Rollmode != "timebased" { + if model.ElasticsearchOptions.Rollmode.Type != modeler.RollmodeTimeBased { return nil, errors.New("invalid rollmode for this logicalIndex type") } @@ -59,6 +59,7 @@ func NewLogicalIndexTimeBased(instanceName string, model modeler.Model) (*Logica } logicalIndex.FetchIndices() + zap.L().Info("LogicalIndex initialized", zap.String("logicalIndex", logicalIndex.Name)) c := cron.New() _, err = c.AddFunc("*/30 * * * *", logicalIndex.FetchIndices) @@ -90,8 +91,18 @@ func (logicalIndex *LogicalIndexTimeBased) purge() { return } - tsStart := time.Now().Add(time.Duration(logicalIndex.Model.ElasticsearchOptions.PurgeMaxConcurrentIndices) * -1 * 24 * time.Hour) - indexStart := fmt.Sprintf("%s-%s", logicalIndex.Name, tsStart.Format("2006-01-02")) + var indexStart string + var tsStart time.Time + + if logicalIndex.Model.ElasticsearchOptions.Rollmode.Timebased.Interval == modeler.Daily { + // Daily mode: calculation based on a number of days + tsStart = time.Now().Add(time.Duration(logicalIndex.Model.ElasticsearchOptions.PurgeMaxConcurrentIndices) * -1 * 24 * time.Hour) + indexStart = fmt.Sprintf("%s-%s", logicalIndex.Name, tsStart.Format("2006-01-02")) + } else if logicalIndex.Model.ElasticsearchOptions.Rollmode.Timebased.Interval == modeler.Monthly { + // Monthly mode: calculation based on a number of months + tsStart = time.Now().AddDate(0, -logicalIndex.Model.ElasticsearchOptions.PurgeMaxConcurrentIndices, 0) + indexStart = fmt.Sprintf("%s-%s", logicalIndex.Name, tsStart.Format("2006-01")) + } allIndices := logicalIndex.GetAllIndices() indices := make([]string, 0) @@ -168,9 +179,18 @@ func (logicalIndex *LogicalIndexTimeBased) GetAllIndices() []string { } func (logicalIndex *LogicalIndexTimeBased) FindIndices(t time.Time, depthDays int64) ([]string, error) { - tsStart := t.Add(time.Duration(depthDays) * -1 * 24 * time.Hour) - indexEnd := fmt.Sprintf("%s-%s", logicalIndex.Name, t.Format("2006-01-02")) - indexStart := fmt.Sprintf("%s-%s", logicalIndex.Name, tsStart.Format("2006-01-02")) + var indexStart, indexEnd string + + if logicalIndex.Model.ElasticsearchOptions.Rollmode.Timebased.Interval == modeler.Daily { + tsStart := t.Add(time.Duration(-depthDays) * 24 * time.Hour) + indexEnd = fmt.Sprintf("%s-%s", logicalIndex.Name, t.Format("2006-01-02")) + indexStart = fmt.Sprintf("%s-%s", logicalIndex.Name, tsStart.Format("2006-01-02")) + } else if logicalIndex.Model.ElasticsearchOptions.Rollmode.Timebased.Interval == modeler.Monthly { + // For a monthly interval, we calculate the starting point in months + tsStart := t.AddDate(0, int(-depthDays/30), 0) + indexEnd = fmt.Sprintf("%s-%s", logicalIndex.Name, t.Format("2006-01")) + indexStart = fmt.Sprintf("%s-%s", logicalIndex.Name, tsStart.Format("2006-01")) + } indices := make([]string, 0) logicalIndex.mu.RLock()