Implemented IPPool and IPReservation creation flow and static IP assignment
RostislavPorohnya committed Nov 22, 2023
1 parent 9d81f9f commit 2a0bb5b
Showing 8 changed files with 236 additions and 11 deletions.
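In short: when creating on-premises cluster resources, the operator now ensures a dedicated Calico IPPool and a matching IPReservation exist, picks a free address from the reserved /24 for the SSH gateway and for every node VM, and pins it through the cni.projectcalico.org/ipAddrs annotation so the VMs get static IPs.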
26 changes: 26 additions & 0 deletions config/rbac/role.yaml
@@ -862,3 +862,29 @@ rules:
   - patch
   - update
   - watch
+- apiGroups:
+  - projectcalico.org
+  resources:
+  - ippools
+  verbs:
+  - create
+  - delete
+  - deletecollection
+  - get
+  - list
+  - patch
+  - update
+  - watch
+- apiGroups:
+  - projectcalico.org
+  resources:
+  - ipreservations
+  verbs:
+  - create
+  - delete
+  - deletecollection
+  - get
+  - list
+  - patch
+  - update
+  - watch
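These generated rules mirror the new kubebuilder RBAC markers in controllers/clusters/cassandra_controller.go below, letting the operator's service account manage Calico ippools and ipreservations.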
2 changes: 1 addition & 1 deletion config/samples/onpremises/clusters_v1beta1_cassandra.yaml
@@ -3,7 +3,7 @@ kind: Cassandra
 metadata:
   name: cassandra-on-prem-cluster
 spec:
-  name: "danylo-on-prem-cassandra"
+  name: "rostyslp-on-prem-cassandra"
   version: "4.0.10"
   privateNetworkCluster: false
   onPremisesSpec:
2 changes: 2 additions & 0 deletions controllers/clusters/cassandra_controller.go
@@ -67,6 +67,8 @@ type CassandraReconciler struct {
 //+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete
 //+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
 //+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete;deletecollection
+//+kubebuilder:rbac:groups=projectcalico.org,resources=ipreservations,verbs=get;list;watch;create;update;patch;delete;deletecollection
+//+kubebuilder:rbac:groups=projectcalico.org,resources=ippools,verbs=get;list;watch;create;update;patch;delete;deletecollection
 
 // Reconcile is part of the main kubernetes reconciliation loop which aims to
 // move the current state of the cluster closer to the desired state.
209 changes: 199 additions & 10 deletions controllers/clusters/on_premises.go
@@ -19,8 +19,10 @@ package clusters
 import (
     "context"
     "fmt"
+    "net"
     "strings"
 
+    apicalico "github.com/projectcalico/api/pkg/apis/projectcalico/v3"
     k8scorev1 "k8s.io/api/core/v1"
     k8serrors "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/api/resource"
@@ -74,26 +76,31 @@ func newOnPremisesBootstrap(
 }
 
 func handleCreateOnPremisesClusterResources(ctx context.Context, b *onPremisesBootstrap) error {
-    if len(b.ClusterStatus.DataCentres) < 1 {
-        return fmt.Errorf("datacenter ID is empty")
+    if len(b.ClusterStatus.DataCentres) == 0 {
+        return fmt.Errorf("datacenter ID in status is empty")
     }
 
+    reservedCIDR, err := reconcileIPReservations(ctx, b.K8sClient)
+    if err != nil {
+        return err
+    }
+
     if b.PrivateNetworkCluster {
-        err := reconcileSSHGatewayResources(ctx, b)
+        err := reconcileSSHGatewayResources(ctx, b, reservedCIDR)
         if err != nil {
             return err
         }
     }
 
-    err := reconcileNodesResources(ctx, b)
+    err = reconcileNodesResources(ctx, b, reservedCIDR)
     if err != nil {
         return err
     }
 
     return nil
 }
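(The reserved CIDR is resolved once per reconcile and threaded into both calls above, so the SSH gateway and every node VM draw their static IPs from the same reserved /24.)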

-func reconcileSSHGatewayResources(ctx context.Context, b *onPremisesBootstrap) error {
+func reconcileSSHGatewayResources(ctx context.Context, b *onPremisesBootstrap, reservedCIDR string) error {
     gatewayDVSize, err := resource.ParseQuantity(b.OnPremisesSpec.OSDiskSize)
     if err != nil {
         return err
@@ -122,6 +129,11 @@ func reconcileSSHGatewayResources(ctx context.Context, b *onPremisesBootstrap, reservedCIDR string) error {
 
     gatewayName := fmt.Sprintf("%s-%s", models.GatewayVMPrefix, strings.ToLower(b.K8sObject.GetName()))
 
+    ip, err := getAvailableIP(ctx, reservedCIDR, b.K8sClient)
+    if err != nil {
+        return err
+    }
+
     gatewayVM := &virtcorev1.VirtualMachine{}
     err = b.K8sClient.Get(ctx, types.NamespacedName{
         Namespace: b.K8sObject.GetNamespace(),
@@ -138,6 +150,7 @@ func reconcileSSHGatewayResources(ctx context.Context, b *onPremisesBootstrap, reservedCIDR string) error {
             b.ClusterStatus.DataCentres[0].ID,
             models.GatewayRack,
             gatewayDV.Name,
+            ip,
             gatewayCPU,
             gatewayMemory)
         if err != nil {
@@ -175,7 +188,7 @@ func reconcileSSHGatewayResources(ctx context.Context, b *onPremisesBootstrap, reservedCIDR string) error {
     return nil
 }
 
-func reconcileNodesResources(ctx context.Context, b *onPremisesBootstrap) error {
+func reconcileNodesResources(ctx context.Context, b *onPremisesBootstrap, reservedCIDR string) error {
     for i, node := range b.ClusterStatus.DataCentres[0].Nodes {
         nodeOSDiskSize, err := resource.ParseQuantity(b.OnPremisesSpec.OSDiskSize)
         if err != nil {
@@ -223,6 +236,11 @@ func reconcileNodesResources(ctx context.Context, b *onPremisesBootstrap, reservedCIDR string) error {
 
         nodeName := fmt.Sprintf("%s-%d-%s", models.NodeVMPrefix, i, strings.ToLower(b.K8sObject.GetName()))
 
+        ip, err := getAvailableIP(ctx, reservedCIDR, b.K8sClient)
+        if err != nil {
+            return err
+        }
+
         nodeVM := &virtcorev1.VirtualMachine{}
         err = b.K8sClient.Get(ctx, types.NamespacedName{
             Namespace: b.K8sObject.GetNamespace(),
@@ -239,6 +257,7 @@ func reconcileNodesResources(ctx context.Context, b *onPremisesBootstrap, reservedCIDR string) error {
                 node.ID,
                 node.Rack,
                 nodeOSDV.Name,
+                ip,
                 nodeCPU,
                 nodeMemory,
                 nodeDataDV.Name,
@@ -301,6 +320,168 @@ func reconcileNodesResources(ctx context.Context, b *onPremisesBootstrap, reservedCIDR string) error {
     return nil
 }
 
+// reconcileIPReservations returns the CIDR reserved for the operator's VMs,
+// creating the operator-owned Calico IPPool and IPReservation on first use.
+func reconcileIPReservations(ctx context.Context, k8sclient client.Client) (string, error) {
+    ipReservationsList := &apicalico.IPReservationList{}
+    err := k8sclient.List(ctx, ipReservationsList, &client.ListOptions{
+        LabelSelector: labels.SelectorFromSet(map[string]string{
+            models.ControlledByLabel: models.OperatorLabel,
+        }),
+    })
+    if err != nil {
+        return "", err
+    }
+
+    // Reuse the CIDR of an already created reservation, if any.
+    if len(ipReservationsList.Items) != 0 {
+        return ipReservationsList.Items[0].Spec.ReservedCIDRs[0], nil
+    }
+
+    icIPPoolList := &apicalico.IPPoolList{}
+    err = k8sclient.List(ctx, icIPPoolList, &client.ListOptions{
+        LabelSelector: labels.SelectorFromSet(map[string]string{
+            models.ControlledByLabel: models.OperatorLabel,
+        }),
+    })
+    if err != nil {
+        return "", err
+    }
+
+    // No operator-owned pool yet: pick a /24 that does not overlap any
+    // existing pool in the cluster and create one.
+    if len(icIPPoolList.Items) == 0 {
+        clusterIPPoolList := &apicalico.IPPoolList{}
+        err = k8sclient.List(ctx, clusterIPPoolList)
+        if err != nil {
+            return "", err
+        }
+
+        usedCIDRs := []string{}
+        for _, pool := range clusterIPPoolList.Items {
+            usedCIDRs = append(usedCIDRs, pool.Spec.CIDR)
+        }
+
+        newIPPool := apicalico.NewIPPool()
+        newIPPool.ObjectMeta.Name = models.IPPoolName
+        newIPPool.ObjectMeta.Labels = map[string]string{
+            models.ControlledByLabel: models.OperatorLabel,
+        }
+
+        availCIDR, err := getAvailablePrivateCIDR(usedCIDRs)
+        if err != nil {
+            return "", err
+        }
+        newIPPool.Spec.CIDR = availCIDR
+
+        err = k8sclient.Create(ctx, newIPPool)
+        if err != nil {
+            return "", err
+        }
+
+        icIPPoolList.Items = append(icIPPoolList.Items, *newIPPool)
+    }
+
+    // Reserve the pool's whole CIDR so its addresses stay out of normal IPAM.
+    newReservation := &apicalico.IPReservation{
+        TypeMeta: metav1.TypeMeta{
+            Kind:       apicalico.KindIPReservation,
+            APIVersion: apicalico.VersionCurrent,
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name: models.IPReservationName,
+            Labels: map[string]string{
+                models.ControlledByLabel: models.OperatorLabel,
+            },
+        },
+        Spec: apicalico.IPReservationSpec{
+            ReservedCIDRs: []string{icIPPoolList.Items[0].Spec.CIDR},
+        },
+    }
+
+    err = k8sclient.Create(ctx, newReservation)
+    if err != nil {
+        return "", err
+    }
+
+    return newReservation.Spec.ReservedCIDRs[0], nil
+}
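The lookup chain above is keyed entirely on the operator's ControlledByLabel selector: an existing labelled IPReservation short-circuits the flow, otherwise a labelled IPPool is found or created from a free 10.x.0.0/24, and a fresh IPReservation is created covering that pool's whole CIDR so Calico IPAM keeps those addresses out of ordinary workload allocation.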

+// getAvailablePrivateCIDR calculates an available /24 CIDR from the private
+// 10.0.0.0/8 range, skipping candidates that overlap any of the given
+// unavailable CIDRs.
+func getAvailablePrivateCIDR(unavailCIDRs []string) (string, error) {
+    for i := 0; i < 255; i++ {
+        checkNet := net.IPNet{
+            IP:   net.ParseIP(fmt.Sprintf("10.%d.0.0", i)),
+            Mask: net.CIDRMask(24, 32),
+        }
+
+        cidrAvail := true
+        for _, unavailCIDR := range unavailCIDRs {
+            ip, unavailNet, err := net.ParseCIDR(unavailCIDR)
+            if err != nil {
+                return "", err
+            }
+
+            // Overlap in either direction: the used network's base address
+            // lies inside the candidate /24, or the candidate's base address
+            // lies inside the used network (e.g. a wider /16 or /8 pool).
+            if checkNet.Contains(ip) || unavailNet.Contains(checkNet.IP) {
+                cidrAvail = false
+                break
+            }
+        }
+
+        if cidrAvail {
+            return checkNet.String(), nil
+        }
+    }
+
+    return "", models.ErrNoAvailableCIDRsForIPPool
+}
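A minimal sketch of the selection behaviour (a hypothetical in-package test, not part of the commit): 10.0.0.0/24 blocks the first candidate outright, the wider 10.1.0.0/16 overlaps the second, so the first free /24 is 10.2.0.0/24.

package clusters

import "testing"

func TestGetAvailablePrivateCIDR(t *testing.T) {
    // 10.0.0.0/24 blocks candidate 10.0.0.0/24; 10.1.0.0/16 covers 10.1.0.0/24.
    cidr, err := getAvailablePrivateCIDR([]string{"10.0.0.0/24", "10.1.0.0/16"})
    if err != nil {
        t.Fatal(err)
    }
    if cidr != "10.2.0.0/24" {
        t.Fatalf("want 10.2.0.0/24, got %s", cidr)
    }
}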

+// getAvailableIP picks a free address inside the reserved CIDR by listing
+// pods and VirtualMachines and skipping addresses already pinned via the
+// Calico ipAddrs annotation. Assumes a /24 mask.
+func getAvailableIP(ctx context.Context, cidr string, k8sclient client.Client) (string, error) {
+    ip, _, err := net.ParseCIDR(cidr)
+    if err != nil {
+        return "", err
+    }
+
+    podList := &k8scorev1.PodList{}
+    err = k8sclient.List(ctx, podList)
+    if err != nil {
+        return "", err
+    }
+
+    vmList := &virtcorev1.VirtualMachineList{}
+    err = k8sclient.List(ctx, vmList)
+    if err != nil {
+        return "", err
+    }
+
+    ipStrings := strings.Split(ip.String(), ".")
+    for i := 1; i < 255; i++ {
+        checkIP := fmt.Sprintf("%s.%s.%s.%d",
+            ipStrings[0],
+            ipStrings[1],
+            ipStrings[2],
+            i)
+
+        ipAvail := true
+        ipAnnot := fmt.Sprintf("[\"%s\"]", checkIP)
+        for _, pod := range podList.Items {
+            if pod.Annotations[models.IPAddrsAnnotation] == ipAnnot {
+                ipAvail = false
+                break
+            }
+        }
+
+        if !ipAvail {
+            continue
+        }
+
+        for _, vm := range vmList.Items {
+            if vm.Annotations[models.IPAddrsAnnotation] == ipAnnot {
+                ipAvail = false
+                break
+            }
+        }
+
+        if ipAvail {
+            return checkIP, nil
+        }
+    }
+
+    return "", models.ErrNoAvailableIPsInReservation
+}
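The availability check is an exact string match on the annotation value, so the format matters: a JSON-style array holding exactly one address, identical to what newVM writes below. A small sketch of that value (hypothetical helper, not in the commit):

package clusters

import "fmt"

// renderIPAddrsAnnotation builds the annotation value the way getAvailableIP
// and newVM both format it: a one-element JSON array, e.g. ["10.2.0.1"].
func renderIPAddrsAnnotation(ip string) string {
    return fmt.Sprintf("[\"%s\"]", ip)
}

Because the match is exact, a pod or VM annotated with several addresses would never collide here; the operator only ever pins a single address per VM, so the shortcut holds.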

 func createDV(
     ctx context.Context,
     b *onPremisesBootstrap,
@@ -385,7 +566,8 @@ func newVM(
     vmName,
     nodeID,
     nodeRack,
-    OSDiskDVName string,
+    OSDiskDVName,
+    ip string,
     cpu,
     memory resource.Quantity,
     storageDVNames ...string,
@@ -413,22 +595,29 @@ func newVM(
         labelSet[models.NodeLabel] = models.WorkerNode
     }
 
+    // Static IP annotation value, set on both the VM object and its
+    // instance template below.
+    ipAnnot := fmt.Sprintf("[\"%s\"]", ip)
     vm := &virtcorev1.VirtualMachine{
         TypeMeta: metav1.TypeMeta{
             Kind:       models.VirtualMachineKind,
             APIVersion: models.KubevirtV1APIVersion,
         },
         ObjectMeta: metav1.ObjectMeta{
-            Name:      vmName,
-            Namespace: b.K8sObject.GetNamespace(),
-            Labels:    labelSet,
+            Name:      vmName,
+            Namespace: b.K8sObject.GetNamespace(),
+            Labels:    labelSet,
+            Annotations: map[string]string{
+                models.IPAddrsAnnotation: ipAnnot,
+            },
             Finalizers: []string{models.DeletionFinalizer},
         },
         Spec: virtcorev1.VirtualMachineSpec{
             RunStrategy: &runStrategy,
             Template: &virtcorev1.VirtualMachineInstanceTemplateSpec{
                 ObjectMeta: metav1.ObjectMeta{
                     Labels: labelSet,
+                    Annotations: map[string]string{
+                        models.IPAddrsAnnotation: ipAnnot,
+                    },
                 },
                 Spec: virtcorev1.VirtualMachineInstanceSpec{
                     Hostname: vmName,
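A note on the mechanism, as far as the Calico and KubeVirt docs go: cni.projectcalico.org/ipAddrs is Calico's annotation for requesting a specific pod IP, and Calico reads it from the pod itself. Setting it on the VirtualMachineInstance template metadata is what should carry it through to the virt-launcher pod KubeVirt creates; the copy on the VirtualMachine object is the one getAvailableIP's VM scan reads, which also covers VMs whose pods are not running yet.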
2 changes: 2 additions & 0 deletions main.go
@@ -18,6 +18,7 @@ package main
 
 import (
     "flag"
+    apicalico "github.com/projectcalico/api/pkg/apis/projectcalico/v3"
     "os"
     "time"
 
@@ -57,6 +58,7 @@ func init() {
     utilruntime.Must(kafkamanagementv1beta1.AddToScheme(scheme))
     utilruntime.Must(cdiv1beta1.AddToScheme(scheme))
     utilruntime.Must(virtcorev1.AddToScheme(scheme))
+    utilruntime.Must(apicalico.AddToScheme(scheme))
     //+kubebuilder:scaffold:scheme
 }

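Registering the Calico v3 types with the manager's scheme is what lets the typed List and Create calls on IPPool and IPReservation in on_premises.go resolve to the projectcalico.org/v3 API group; without this AddToScheme call the controller-runtime client cannot map those Go types to a REST endpoint.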
2 changes: 2 additions & 0 deletions pkg/models/errors.go
@@ -68,4 +68,6 @@ var (
     ErrExposeServiceEndpointsNotCreatedYet        = errors.New("expose service endpoints is not created yet")
     ErrOnlySingleConcurrentResizeAvailable        = errors.New("only single concurrent resize is allowed")
     ErrBundledUseOnlyResourceUpdateIsNotSupported = errors.New("updating of bundled use resource is not supported")
+    ErrNoAvailableCIDRsForIPPool                  = errors.New("no available CIDRs for IPPool")
+    ErrNoAvailableIPsInReservation                = errors.New("no available IPs in IPReservation")
 )
3 changes: 3 additions & 0 deletions pkg/models/on_premises.go
@@ -15,6 +15,7 @@ const (
     NodeIDLabel                = "nodeID"
     NodeRackLabel              = "nodeRack"
     NodeLabel                  = "node"
+    IPAddrsAnnotation          = "cni.projectcalico.org/ipAddrs"
     NodeOSDVPrefix             = "node-os-data-volume-pvc"
     NodeDVPrefix               = "node-data-volume-pvc"
     NodeVMPrefix               = "node-vm"
@@ -26,6 +27,8 @@
     GatewayRack                = "ssh-gateway-rack"
     IgnitionScriptSecretPrefix = "ignition-script-secret"
     DataDisk                   = "data-disk"
+    IPReservationName          = "instaclustr-operator-ip-reservation"
+    IPPoolName                 = "instaclustr-operator-ip-pool"
 
     Boot    = "boot"
     Storage = "storage"