Skip to content

Commit

Permalink
[indexer] Allow manually managed elastic index (#1602)
Browse files Browse the repository at this point in the history
Summary: Opensearch doesn't use the same index lifecycle management as
elasticsearch. This means the current indexer will fail to create a
managed index on clusters using opensearch. This PR allows users to
bypass our index management logic, and manually deploy a managed index
to their elastic cluster. This should allow people using opensearch to
still deploy Pixie cloud albeit with more burden on the user.

Type of change: /kind cleanup

Test Plan: Tested by skaffolding public cloud to my cluster with
`PL_MD_MANUAL_INDEX_MANAGEMENT: true`, saw that the indexer reused the
index already created and didn't attempt to create a new index lifecycle
policy.

Signed-off-by: James Bartlett <[email protected]>
  • Loading branch information
JamesMBartlett authored Jul 10, 2023
1 parent 3b2afb4 commit 0e72c87
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 13 deletions.
1 change: 1 addition & 0 deletions k8s/cloud/base/indexer_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ data:
PL_MD_INDEX_REPLICAS: "4"
PL_MD_INDEX_MAX_AGE: "3d"
PL_MD_INDEX_DELETE_AFTER: "3d"
PL_MD_MANUAL_INDEX_MANAGEMENT: "false"
2 changes: 1 addition & 1 deletion src/cloud/autocomplete/suggester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestMain(m *testing.M) {
elasticClient = es

// Set up elastic indexes.
err = md.InitializeMapping(es, indexName, 1, "30d", "30d")
err = md.InitializeMapping(es, indexName, 1, "30d", "30d", false)
if err != nil {
cleanup()
log.Fatal(err)
Expand Down
3 changes: 2 additions & 1 deletion src/cloud/indexer/indexer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func init() {
pflag.String("md_index_max_age", "", "The amount of time before rolling over the elastic index as a string, eg '30d'")
pflag.String("md_index_delete_after", "", "The amount of time after rollover to delete old elastic indices, as a string, eg '30d'")
pflag.Int("md_index_replicas", 4, "The number of replicas to setup for the metadata index.")
pflag.Bool("md_manual_index_management", false, "Skip creation of managed elastic indices. Requires manually deploying an elastic index with md_index_name")
}

func newVZMgrClient() (vzmgrpb.VZMgrServiceClient, error) {
Expand Down Expand Up @@ -132,7 +133,7 @@ func main() {
log.Fatal("Must specify a delete after time for the rolled over elastic indices.")
}

err = md.InitializeMapping(es, indexName, replicas, maxAge, deleteAfter)
err = md.InitializeMapping(es, indexName, replicas, maxAge, deleteAfter, viper.GetBool("md_manual_index_management"))
if err != nil {
log.WithError(err).Fatal("Could not initialize elastic mapping")
}
Expand Down
34 changes: 25 additions & 9 deletions src/cloud/indexer/md/mapping.o.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,17 +207,33 @@ const IndexMapping = `
`

// InitializeMapping creates the index in elastic.
func InitializeMapping(es *elastic.Client, indexName string, replicas int, maxAge string, deleteAfter string) error {
err := esutils.NewManagedIndex(es, indexName).
IndexFromJSONString(IndexMapping).
MaxIndexAge(maxAge).
TimeBeforeDelete(deleteAfter).
Migrate(context.Background())
if err != nil {
return err
func InitializeMapping(es *elastic.Client, indexName string, replicas int, maxAge string, deleteAfter string, manualIndex bool) error {
ctx := context.Background()
if manualIndex {
exists, err := es.IndexExists(indexName).Do(ctx)
if err != nil {
return err
}
if !exists {
return fmt.Errorf("elastic index %s does not exist, but manual index management specified", indexName)
}
// Update the index mappings if necessary.
err = esutils.NewIndex(es).Name(indexName).FromJSONString(IndexMapping).Migrate(ctx)
if err != nil {
return err
}
} else {
err := esutils.NewManagedIndex(es, indexName).
IndexFromJSONString(IndexMapping).
MaxIndexAge(maxAge).
TimeBeforeDelete(deleteAfter).
Migrate(ctx)
if err != nil {
return err
}
}

replicaSetting := fmt.Sprintf("{\"index\": {\"number_of_replicas\": %d}}", replicas)
_, err = es.IndexPutSettings(indexName).BodyString(replicaSetting).Do(context.Background())
_, err := es.IndexPutSettings(indexName).BodyString(replicaSetting).Do(context.Background())
return err
}
2 changes: 1 addition & 1 deletion src/cloud/indexer/md/md_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestMain(m *testing.M) {
vzID = uuid.Must(uuid.NewV4())
orgID = uuid.Must(uuid.NewV4())

err = md.InitializeMapping(es, indexName, 1, "30d", "30d")
err = md.InitializeMapping(es, indexName, 1, "30d", "30d", false)
if err != nil {
cleanup()
log.WithError(err).Fatal("Could not initialize indexes in elastic")
Expand Down
15 changes: 14 additions & 1 deletion src/cloud/shared/esutils/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ package esutils
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"strings"

"github.com/olivere/elastic/v7"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -246,7 +248,18 @@ func (i *Index) updateSettings(ctx context.Context) (bool, error) {
WithField("cause", err.(*elastic.Error).Details.CausedBy).Error("failed to get index settings")
return false, err
}
currentSettings := settingsResp[i.indexName].Settings
var indexResp *elastic.IndicesGetSettingsResponse
for indexName, resp := range settingsResp {
// If `i.indexName` is an alias, then the response can be the full index name instead of the alias name.
if strings.HasPrefix(indexName, i.indexName) {
indexResp = resp
break
}
}
if indexResp == nil {
return false, errors.New("could not get index settings")
}
currentSettings := indexResp.Settings
diff := updates(i.index.Settings, currentSettings)
if diff == nil {
return false, nil
Expand Down

0 comments on commit 0e72c87

Please sign in to comment.