Skip to content

Commit

Permalink
Refactor rollmode handling to support timebased montly.
Browse files Browse the repository at this point in the history
  • Loading branch information
Ismail731404 committed Nov 12, 2024
1 parent 70dba8a commit 3398c69
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 12 deletions.
4 changes: 2 additions & 2 deletions internals/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestCoordinator(t *testing.T) {
ID: 1,
Name: "myindex",
ElasticsearchOptions: modeler.ElasticsearchOptions{
Rollmode: "timebased",
Rollmode: modeler.RollmodeSettings{Type: modeler.RollmodeTimeBased},
Rollcron: "0 * * * *",
EnablePurge: true,
PurgeMaxConcurrentIndices: 30,
Expand All @@ -150,7 +150,7 @@ func TestPurge(t *testing.T) {
elasticsearchsdk.ReplaceGlobals(elasticsearch.Config{Addresses: viper.GetStringSlice("ELASTICSEARCH_URLS")})

logicalIndex, err := NewLogicalIndexTimeBased("myrtea", modeler.Model{Name: "myindex", ElasticsearchOptions: modeler.ElasticsearchOptions{
Rollmode: "timebased",
Rollmode: modeler.RollmodeSettings{Type: modeler.RollmodeTimeBased},
Rollcron: "0 * * * *",
EnablePurge: true,
PurgeMaxConcurrentIndices: 30,
Expand Down
6 changes: 3 additions & 3 deletions internals/coordinator/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ func InitInstance(instanceName string, models map[int64]modeler.Model) error {
for _, model := range models {
var err error
var logicalIndex LogicalIndex
switch model.ElasticsearchOptions.Rollmode {
case "cron":
switch model.ElasticsearchOptions.Rollmode.Type {

Check failure on line 43 in internals/coordinator/instance.go

View workflow job for this annotation

GitHub Actions / build

model.ElasticsearchOptions.Rollmode.Type undefined (type string has no field or method Type)
case modeler.RollmodeCron:

Check failure on line 44 in internals/coordinator/instance.go

View workflow job for this annotation

GitHub Actions / build

undefined: modeler.RollmodeCron
logicalIndex, err = NewLogicalIndexCron(instance.Name, model)
case "timebased":
case modeler.RollmodeTimeBased:

Check failure on line 46 in internals/coordinator/instance.go

View workflow job for this annotation

GitHub Actions / build

undefined: modeler.RollmodeTimeBased
logicalIndex, err = NewLogicalIndexTimeBased(instance.Name, model)
}
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internals/coordinator/logicalindex_cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func NewLogicalIndexCron(instanceName string, model modeler.Model) (*LogicalInde

zap.L().Info("Initialize logicalIndex (LogicalIndexCron)", zap.String("name", logicalIndexName), zap.String("model", model.Name), zap.Any("options", model.ElasticsearchOptions))

if model.ElasticsearchOptions.Rollmode != "cron" {
if model.ElasticsearchOptions.Rollmode.Type != modeler.RollmodeCron {

Check failure on line 39 in internals/coordinator/logicalindex_cron.go

View workflow job for this annotation

GitHub Actions / build

model.ElasticsearchOptions.Rollmode.Type undefined (type string has no field or method Type)

Check failure on line 39 in internals/coordinator/logicalindex_cron.go

View workflow job for this annotation

GitHub Actions / build

undefined: modeler.RollmodeCron
return nil, errors.New("invalid rollmode for this logicalIndex type")
}

Expand Down
32 changes: 26 additions & 6 deletions internals/coordinator/logicalindex_timebased.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewLogicalIndexTimeBased(instanceName string, model modeler.Model) (*Logica

zap.L().Info("Initialize logicalIndex (LogicalIndexTimeBased)", zap.String("name", logicalIndexName), zap.String("model", model.Name), zap.Any("options", model.ElasticsearchOptions))

if model.ElasticsearchOptions.Rollmode != "timebased" {
if model.ElasticsearchOptions.Rollmode.Type != modeler.RollmodeTimeBased {

Check failure on line 34 in internals/coordinator/logicalindex_timebased.go

View workflow job for this annotation

GitHub Actions / build

model.ElasticsearchOptions.Rollmode.Type undefined (type string has no field or method Type)

Check failure on line 34 in internals/coordinator/logicalindex_timebased.go

View workflow job for this annotation

GitHub Actions / build

undefined: modeler.RollmodeTimeBased
return nil, errors.New("invalid rollmode for this logicalIndex type")
}

Expand Down Expand Up @@ -59,6 +59,7 @@ func NewLogicalIndexTimeBased(instanceName string, model modeler.Model) (*Logica
}

logicalIndex.FetchIndices()
zap.L().Info("LogicalIndex initialized", zap.String("logicalIndex", logicalIndex.Name))

c := cron.New()
_, err = c.AddFunc("*/30 * * * *", logicalIndex.FetchIndices)
Expand Down Expand Up @@ -90,8 +91,18 @@ func (logicalIndex *LogicalIndexTimeBased) purge() {
return
}

tsStart := time.Now().Add(time.Duration(logicalIndex.Model.ElasticsearchOptions.PurgeMaxConcurrentIndices) * -1 * 24 * time.Hour)
indexStart := fmt.Sprintf("%s-%s", logicalIndex.Name, tsStart.Format("2006-01-02"))
var indexStart string
var tsStart time.Time

if logicalIndex.Model.ElasticsearchOptions.Rollmode.Timebased.Interval == modeler.Daily {

Check failure on line 97 in internals/coordinator/logicalindex_timebased.go

View workflow job for this annotation

GitHub Actions / build

logicalIndex.Model.ElasticsearchOptions.Rollmode.Timebased undefined (type string has no field or method Timebased)

Check failure on line 97 in internals/coordinator/logicalindex_timebased.go

View workflow job for this annotation

GitHub Actions / build

undefined: modeler.Daily
// Daily mode: calculation based on a number of days
tsStart = time.Now().Add(time.Duration(logicalIndex.Model.ElasticsearchOptions.PurgeMaxConcurrentIndices) * -1 * 24 * time.Hour)
indexStart = fmt.Sprintf("%s-%s", logicalIndex.Name, tsStart.Format("2006-01-02"))
} else if logicalIndex.Model.ElasticsearchOptions.Rollmode.Timebased.Interval == modeler.Monthly {

Check failure on line 101 in internals/coordinator/logicalindex_timebased.go

View workflow job for this annotation

GitHub Actions / build

logicalIndex.Model.ElasticsearchOptions.Rollmode.Timebased undefined (type string has no field or method Timebased)
// Monthly mode: calculation based on a number of months
tsStart = time.Now().AddDate(0, -logicalIndex.Model.ElasticsearchOptions.PurgeMaxConcurrentIndices, 0)
indexStart = fmt.Sprintf("%s-%s", logicalIndex.Name, tsStart.Format("2006-01"))
}

allIndices := logicalIndex.GetAllIndices()
indices := make([]string, 0)
Expand Down Expand Up @@ -168,9 +179,18 @@ func (logicalIndex *LogicalIndexTimeBased) GetAllIndices() []string {
}

func (logicalIndex *LogicalIndexTimeBased) FindIndices(t time.Time, depthDays int64) ([]string, error) {
tsStart := t.Add(time.Duration(depthDays) * -1 * 24 * time.Hour)
indexEnd := fmt.Sprintf("%s-%s", logicalIndex.Name, t.Format("2006-01-02"))
indexStart := fmt.Sprintf("%s-%s", logicalIndex.Name, tsStart.Format("2006-01-02"))
var indexStart, indexEnd string

if logicalIndex.Model.ElasticsearchOptions.Rollmode.Timebased.Interval == modeler.Daily {
tsStart := t.Add(time.Duration(-depthDays) * 24 * time.Hour)
indexEnd = fmt.Sprintf("%s-%s", logicalIndex.Name, t.Format("2006-01-02"))
indexStart = fmt.Sprintf("%s-%s", logicalIndex.Name, tsStart.Format("2006-01-02"))
} else if logicalIndex.Model.ElasticsearchOptions.Rollmode.Timebased.Interval == modeler.Monthly {
// For a monthly interval, we calculate the starting point in months
tsStart := t.AddDate(0, int(-depthDays/30), 0)
indexEnd = fmt.Sprintf("%s-%s", logicalIndex.Name, t.Format("2006-01"))
indexStart = fmt.Sprintf("%s-%s", logicalIndex.Name, tsStart.Format("2006-01"))
}

indices := make([]string, 0)
logicalIndex.mu.RLock()
Expand Down

0 comments on commit 3398c69

Please sign in to comment.