Skip to content

Commit

Permalink
spec refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Bohdan Siryk authored and Bohdan Siryk committed Jan 24, 2024
1 parent ff9b923 commit f608712
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 37 deletions.
123 changes: 101 additions & 22 deletions apis/clusters/v1beta1/opensearch_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,16 @@ type OpenSearchSpec struct {
Cluster `json:",inline"`
DataCentres []*OpenSearchDataCentre `json:"dataCentres,omitempty"`
DataNodes []*OpenSearchDataNodes `json:"dataNodes,omitempty"`
Dashboards []*OpenSearchDashboards `json:"opensearchDashboards,omitempty"`
ClusterManagerNodes []*ClusterManagerNodes `json:"clusterManagerNodes,omitempty"`
ICUPlugin bool `json:"icuPlugin,omitempty"`
AsynchronousSearchPlugin bool `json:"asynchronousSearchPlugin,omitempty"`
KNNPlugin bool `json:"knnPlugin,omitempty"`
Dashboards []*OpenSearchDashboards `json:"opensearchDashboards,omitempty"`
ReportingPlugin bool `json:"reportingPlugin,omitempty"`
SQLPlugin bool `json:"sqlPlugin,omitempty"`
NotificationsPlugin bool `json:"notificationsPlugin,omitempty"`
AnomalyDetectionPlugin bool `json:"anomalyDetectionPlugin,omitempty"`
LoadBalancer bool `json:"loadBalancer,omitempty"`
ClusterManagerNodes []*ClusterManagerNodes `json:"clusterManagerNodes,omitempty"`
IndexManagementPlugin bool `json:"indexManagementPlugin,omitempty"`
AlertingPlugin bool `json:"alertingPlugin,omitempty"`
BundledUseOnly bool `json:"bundledUseOnly,omitempty"`
Expand All @@ -60,6 +60,90 @@ type OpenSearchSpec struct {
IngestNodes []*OpenSearchIngestNodes `json:"ingestNodes,omitempty"`
}

func (s *OpenSearchSpec) FromModel(model *models.OpenSearchCluster) {
s.Cluster.FromModel(&model.GenericClusterFields)

s.Version = model.OpenSearchVersion
s.ICUPlugin = model.ICUPlugin
s.AsynchronousSearchPlugin = model.AsynchronousSearchPlugin
s.KNNPlugin = model.KNNPlugin
s.ReportingPlugin = model.ReportingPlugin
s.SQLPlugin = model.SQLPlugin
s.NotificationsPlugin = model.NotificationsPlugin
s.AnomalyDetectionPlugin = model.AnomalyDetectionPlugin
s.LoadBalancer = model.LoadBalancer
s.IndexManagementPlugin = model.IndexManagementPlugin
s.AlertingPlugin = model.AlertingPlugin
s.BundledUseOnly = model.BundledUseOnly

s.dcsFromModel(model.DataCentres)
s.dataNodesFromModel(model.DataNodes)
s.dashboardsFromModel(model.OpenSearchDashboards)
s.ingestNodesFromModel(model.IngestNodes)
s.clusterManagerNodesFromModel(model.ClusterManagerNodes)
}

func (s *OpenSearchSpec) dcsFromModel(dcModels []*models.OpenSearchDataCentre) {
s.DataCentres = make([]*OpenSearchDataCentre, 0, len(dcModels))
for _, model := range dcModels {
dc := &OpenSearchDataCentre{}
dc.FromModel(model)
s.DataCentres = append(s.DataCentres, dc)
}
}

func (dc *OpenSearchDataCentre) FromModel(model *models.OpenSearchDataCentre) {
dc.PrivateLink = model.PrivateLink
dc.Name = model.Name
dc.Region = model.Region
dc.CloudProvider = model.CloudProvider
dc.ProviderAccountName = model.ProviderAccountName
dc.Network = model.Network
dc.Tags = tagsFromInstAPI(model.Tags)
dc.CloudProviderSettings = cloudProviderSettingsFromInstAPI(&model.GenericDataCentreFields)
}

func (s *OpenSearchSpec) dataNodesFromModel(dataNodeModels []*models.OpenSearchDataNodes) {
s.DataNodes = make([]*OpenSearchDataNodes, 0, len(dataNodeModels))
for _, model := range dataNodeModels {
s.DataNodes = append(s.DataNodes, &OpenSearchDataNodes{
NodeSize: model.NodeSize,
NodesNumber: model.NodeCount,
})
}
}

func (s *OpenSearchSpec) dashboardsFromModel(dashboardsModels []*models.OpenSearchDashboards) {
s.Dashboards = make([]*OpenSearchDashboards, 0, len(dashboardsModels))
for _, model := range dashboardsModels {
s.Dashboards = append(s.Dashboards, &OpenSearchDashboards{
NodeSize: model.NodeSize,
OIDCProvider: model.OIDCProvider,
Version: model.Version,
})
}
}

func (s *OpenSearchSpec) ingestNodesFromModel(ingestNodeModels []*models.OpenSearchIngestNodes) {
s.IngestNodes = make([]*OpenSearchIngestNodes, 0, len(ingestNodeModels))
for _, model := range ingestNodeModels {
s.IngestNodes = append(s.IngestNodes, &OpenSearchIngestNodes{
NodeSize: model.NodeSize,
NodeCount: model.NodeCount,
})
}
}

func (s *OpenSearchSpec) clusterManagerNodesFromModel(clusterManagerNodeModels []*models.ClusterManagerNodes) {
s.ClusterManagerNodes = make([]*ClusterManagerNodes, 0, len(clusterManagerNodeModels))
for _, model := range clusterManagerNodeModels {
s.ClusterManagerNodes = append(s.ClusterManagerNodes, &ClusterManagerNodes{
NodeSize: model.NodeSize,
DedicatedManager: model.DedicatedManager,
})
}
}

type OpenSearchDataCentre struct {
PrivateLink bool `json:"privateLink,omitempty"`
Name string `json:"name,omitempty"`
Expand All @@ -69,9 +153,7 @@ type OpenSearchDataCentre struct {
CloudProviderSettings []*CloudProviderSettings `json:"cloudProviderSettings,omitempty"`
Network string `json:"network"`
Tags map[string]string `json:"tags,omitempty"`

// ReplicationFactor is a number of racks to use when allocating data nodes.
ReplicationFactor int `json:"replicationFactor"`
NumberOfRacks int `json:"numberOfRacks,omitempty"`
}

type OpenSearchDataNodes struct {
Expand Down Expand Up @@ -144,7 +226,7 @@ func (oss *OpenSearchSpec) dcsToInstAPI() (iDCs []*models.OpenSearchDataCentre)
ProviderAccountName: dc.ProviderAccountName,
},
PrivateLink: dc.PrivateLink,
NumberOfRacks: dc.ReplicationFactor,
NumberOfRacks: dc.NumberOfRacks,
})
}

Expand Down Expand Up @@ -251,6 +333,11 @@ func (oss *OpenSearch) FromInstAPI(iData []byte) (*OpenSearch, error) {
return o, nil
}

func (oss *OpenSearch) FromModel(model *models.OpenSearchCluster) {
oss.Spec.FromModel(model)
oss.Status.FromModel(model)
}

func (oss *OpenSearchSpec) FromInstAPI(iOpenSearch *models.OpenSearchCluster) OpenSearchSpec {
return OpenSearchSpec{
Cluster: Cluster{
Expand Down Expand Up @@ -307,11 +394,11 @@ func (oss *OpenSearchSpec) DCsFromInstAPI(iDCs []*models.OpenSearchDataCentre) (
Region: iDC.Region,
CloudProvider: iDC.CloudProvider,
ProviderAccountName: iDC.ProviderAccountName,
CloudProviderSettings: cloudProviderSettingsFromInstAPI(iDC),
CloudProviderSettings: cloudProviderSettingsFromInstAPI(&iDC.GenericDataCentreFields),
Network: iDC.Network,
Tags: tagsFromInstAPI(iDC.Tags),
PrivateLink: iDC.PrivateLink,
ReplicationFactor: iDC.NumberOfRacks,
NumberOfRacks: iDC.NumberOfRacks,
})
}

Expand All @@ -326,11 +413,7 @@ func tagsFromInstAPI(iTags []*models.Tag) map[string]string {
return newTags
}

func cloudProviderSettingsFromInstAPI(iDC *models.OpenSearchDataCentre) (settings []*CloudProviderSettings) {
if isCloudProviderSettingsProvided(iDC.GenericDataCentreFields) {
return nil
}

func cloudProviderSettingsFromInstAPI(iDC *models.GenericDataCentreFields) (settings []*CloudProviderSettings) {
switch iDC.CloudProvider {
case models.AWSVPC:
for _, awsSetting := range iDC.AWSSettings {
Expand Down Expand Up @@ -435,7 +518,7 @@ func (oss *OpenSearchSpec) areDCsEqual(b []*OpenSearchDataCentre) bool {
a[i].Network != b[i].Network &&
areTagsEqual(a[i].Tags, b[i].Tags) &&
a[i].PrivateLink != b[i].PrivateLink ||
a[i].ReplicationFactor != b[i].ReplicationFactor {
a[i].NumberOfRacks != b[i].NumberOfRacks {
return false
}
}
Expand Down Expand Up @@ -538,14 +621,12 @@ func (oss *OpenSearchStatus) FromModel(model *models.OpenSearchCluster) {
oss.IngestNodesLoadBalancerConnectionURL = model.IngestNodesLoadBalancerConnectionURL
oss.LoadBalancerConnectionURL = model.LoadBalancerConnectionURL

dcs := make([]OpenSearchDataCentreStatus, 0, len(model.DataCentres))
oss.DataCentres = make([]OpenSearchDataCentreStatus, 0, len(model.DataCentres))
for _, dc := range model.DataCentres {
d := OpenSearchDataCentreStatus{}
d.fromModel(dc)
dcs = append(dcs, d)
oss.DataCentres = append(oss.DataCentres, d)
}

oss.DataCentres = dcs
}

type OpenSearchDataCentreStatus struct {
Expand All @@ -563,14 +644,12 @@ func (s *OpenSearchDataCentreStatus) fromModel(model *models.OpenSearchDataCentr
s.PrivateLinkEndpointServiceName = model.PrivateLinkEndpointServiceName
s.Status = model.Status

nodes := make([]OpenSearchNodeStatus, 0, len(model.Nodes))
s.Nodes = make([]OpenSearchNodeStatus, 0, len(model.Nodes))
for _, node := range model.Nodes {
n := OpenSearchNodeStatus{}
n.FromModel(&node)
nodes = append(nodes, n)
s.Nodes = append(s.Nodes, n)
}

s.Nodes = nodes
}

type OpenSearchNodeStatus struct {
Expand Down
8 changes: 4 additions & 4 deletions apis/clusters/v1beta1/opensearch_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (osv *openSearchValidator) ValidateCreate(ctx context.Context, obj runtime.
return err
}

err = validateReplicationFactor(models.OpenSearchReplicationFactors, dc.ReplicationFactor)
err = validateReplicationFactor(models.OpenSearchReplicationFactors, dc.NumberOfRacks)
if err != nil {
return err
}
Expand Down Expand Up @@ -318,7 +318,7 @@ func (oss *OpenSearchDataCentre) newImmutableFields() *immutableOpenSearchDCFiel
},
specificOpenSearchDC{
PrivateLink: oss.PrivateLink,
ReplicationFactor: oss.ReplicationFactor,
ReplicationFactor: oss.NumberOfRacks,
},
}
}
Expand Down Expand Up @@ -396,8 +396,8 @@ func (oss *OpenSearchSpec) validateImmutableDataCentresUpdate(oldDCs []*OpenSear

func (dc *OpenSearchDataCentre) validateDataNode(nodes []*OpenSearchDataNodes) error {
for _, node := range nodes {
if node.NodesNumber%dc.ReplicationFactor != 0 {
return fmt.Errorf("number of data nodes must be a multiple of replication factor: %v", dc.ReplicationFactor)
if node.NodesNumber%dc.NumberOfRacks != 0 {
return fmt.Errorf("number of data nodes must be a multiple of replication factor: %v", dc.NumberOfRacks)
}
}

Expand Down
8 changes: 8 additions & 0 deletions apis/clusters/v1beta1/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ type Cluster struct {
Description string `json:"description,omitempty"`
}

func (c *Cluster) FromModel(model *models.GenericClusterFields) {
c.Name = model.Name
c.PCICompliance = model.PCIComplianceMode
c.PrivateNetworkCluster = model.PrivateNetworkCluster
c.SLATier = model.SLATier
c.Description = model.Description
}

type ClusterStatus struct {
ID string `json:"id,omitempty"`
State string `json:"state,omitempty"`
Expand Down
7 changes: 2 additions & 5 deletions config/crd/bases/clusters.instaclustr.com_opensearches.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,12 @@ spec:
type: string
network:
type: string
numberOfRacks:
type: integer
privateLink:
type: boolean
region:
type: string
replicationFactor:
description: ReplicationFactor is a number of racks to use when
allocating data nodes.
type: integer
tags:
additionalProperties:
type: string
Expand All @@ -106,7 +104,6 @@ spec:
- cloudProvider
- network
- region
- replicationFactor
type: object
type: array
dataNodes:
Expand Down
2 changes: 1 addition & 1 deletion controllers/clusters/cadence_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,7 @@ func (r *CadenceReconciler) newOpenSearchSpec(c *v1beta1.Cadence, oldestOpenSear
CloudProvider: cloudProvider,
ProviderAccountName: providerAccountName,
Network: osNetwork,
ReplicationFactor: osReplicationFactor,
NumberOfRacks: osReplicationFactor,
},
}
spec := v1beta1.OpenSearchSpec{
Expand Down
2 changes: 1 addition & 1 deletion controllers/clusters/datatest/opensearch_v1beta1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ spec:
- cloudProvider: AWS_VPC
name: AWS_VPC_US_EAST_1
network: 10.0.0.0/16
replicationFactor: 3
numberOfRacks: 3
privateLink: false
region: US_EAST_1
# dataNodes:
Expand Down
9 changes: 5 additions & 4 deletions controllers/clusters/opensearch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,14 +644,15 @@ func (r *OpenSearchReconciler) newWatchStatusJob(o *v1beta1.OpenSearch) schedule
return err
}

iO, err := o.FromInstAPI(iData)
opensearchModel := &models.OpenSearchCluster{}
err = json.Unmarshal(iData, opensearchModel)
if err != nil {
l.Error(err, "Cannot convert OpenSearch cluster from the Instaclustr API",
"cluster ID", o.Status.ID)

return err
}

iO := v1beta1.OpenSearch{}
iO.FromModel(opensearchModel)

if !iO.Status.Equals(&o.Status) {
l.Info("Updating OpenSearch cluster status", "old", o.Status, "new", iO.Status)

Expand Down

0 comments on commit f608712

Please sign in to comment.