Skip to content

Commit

Permalink
address/assigner: return assigned address
Browse files Browse the repository at this point in the history
  • Loading branch information
huwcbjones committed Oct 24, 2024
1 parent ab70f62 commit cb26e29
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 43 deletions.
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func assignAddress(c context.Context, log *logrus.Entry, client kubernetes.Inter
lock.Unlock(ctx) //nolint:errcheck
log.Debug("lock released")
}()
if err := assigner.Assign(ctx, node.Instance, node.Zone, cfg.Filter, cfg.OrderBy); err != nil {
if _, err := assigner.Assign(ctx, node.Instance, node.Zone, cfg.Filter, cfg.OrderBy); err != nil {
return err //nolint:wrapcheck
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion internal/address/assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var (
)

type Assigner interface {
Assign(ctx context.Context, instanceID, zone string, filter []string, orderBy string) error
Assign(ctx context.Context, instanceID, zone string, filter []string, orderBy string) (string, error)
Unassign(ctx context.Context, instanceID, zone string) error
}

Expand Down
16 changes: 9 additions & 7 deletions internal/address/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,32 +165,33 @@ func (a *awsAssigner) forceCheckAddressAssigned(ctx context.Context, allocationI
return false, nil
}

func (a *awsAssigner) Assign(ctx context.Context, instanceID, _ string, filter []string, orderBy string) error {
func (a *awsAssigner) Assign(ctx context.Context, instanceID, _ string, filter []string, orderBy string) (string, error) {
// get elastic IP attached to the instance
err := a.checkElasticIPAssigned(ctx, instanceID)
if err != nil {
return errors.Wrapf(err, "check if elastic IP is already assigned to instance %s", instanceID)
return "", errors.Wrapf(err, "check if elastic IP is already assigned to instance %s", instanceID)
}

// get available elastic IPs based on filter and orderBy
addresses, err := a.getAvailableElasticIPs(ctx, filter, orderBy)
if err != nil {
return errors.Wrap(err, "failed to get available elastic IPs")
return "", errors.Wrap(err, "failed to get available elastic IPs")
}

// get EC2 instance
instance, err := a.instanceGetter.Get(ctx, instanceID, a.region)
if err != nil {
return errors.Wrapf(err, "failed to get instance %s", instanceID)
return "", errors.Wrapf(err, "failed to get instance %s", instanceID)
}
// get primary network interface ID with public IP address (DeviceIndex == 0)
networkInterfaceID, err := a.getNetworkInterfaceID(instance)
if err != nil {
return errors.Wrapf(err, "failed to get network interface ID for instance %s", instanceID)
return "", errors.Wrapf(err, "failed to get network interface ID for instance %s", instanceID)
}

// try to assign available addresses until succeeds
// due to concurrency, it is possible that another kubeip instance will assign the same address
var assignedAddress string
for i := range addresses {
a.logger.WithFields(logrus.Fields{
"instance": instanceID,
Expand All @@ -208,13 +209,14 @@ func (a *awsAssigner) Assign(ctx context.Context, instanceID, _ string, filter [
"address": *addresses[i].PublicIp,
"allocation_id": *addresses[i].AllocationId,
}).Info("elastic IP assigned to the instance")
assignedAddress = *addresses[i].PublicIp
break // break if address assigned successfully
}
}
if err != nil {
return errors.Wrap(err, "failed to assign elastic IP address")
return "", errors.Wrap(err, "failed to assign elastic IP address")
}
return nil
return assignedAddress, nil
}

func (a *awsAssigner) tryAssignAddress(ctx context.Context, address *types.Address, networkInterfaceID, instanceID string) error {
Expand Down
11 changes: 8 additions & 3 deletions internal/address/aws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ func Test_awsAssigner_Assign(t *testing.T) {
type fields struct {
region string
logger *logrus.Entry
address string
instanceGetterFn func(t *testing.T, args *args) cloud.Ec2InstanceGetter
eipListerFn func(t *testing.T, args *args) cloud.EipLister
eipAssignerFn func(t *testing.T, args *args) cloud.EipAssigner
Expand All @@ -444,8 +445,9 @@ func Test_awsAssigner_Assign(t *testing.T) {
{
name: "assign EIP to instance",
fields: fields{
region: "us-east-1",
logger: logrus.NewEntry(logrus.New()),
region: "us-east-1",
logger: logrus.NewEntry(logrus.New()),
address: "100.0.0.1",
instanceGetterFn: func(t *testing.T, args *args) cloud.Ec2InstanceGetter {
mock := mocks.NewEc2InstanceGetter(t)
mock.EXPECT().Get(args.ctx, args.instanceID, "us-east-1").Return(&types.Instance{
Expand Down Expand Up @@ -584,8 +586,11 @@ func Test_awsAssigner_Assign(t *testing.T) {
eipLister: tt.fields.eipListerFn(t, &tt.args),
eipAssigner: tt.fields.eipAssignerFn(t, &tt.args),
}
if err := a.Assign(tt.args.ctx, tt.args.instanceID, "", tt.args.filter, tt.args.orderBy); (err != nil) != tt.wantErr {
address, err := a.Assign(tt.args.ctx, tt.args.instanceID, "", tt.args.filter, tt.args.orderBy)
if err != nil != tt.wantErr {
t.Errorf("Assign() error = %v, wantErr %v", err, tt.wantErr)
} else if address != tt.fields.address {
t.Fatalf("Assign() = %v, want %v", address, tt.fields.address)
}
})
}
Expand Down
4 changes: 2 additions & 2 deletions internal/address/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import "context"
type azureAssigner struct {
}

func (a *azureAssigner) Assign(_ context.Context, _, _ string, _ []string, _ string) error {
return nil
func (a *azureAssigner) Assign(_ context.Context, _, _ string, _ []string, _ string) (string, error) {
return "", nil
}

func (a *azureAssigner) Unassign(_ context.Context, _, _ string) error {
Expand Down
43 changes: 24 additions & 19 deletions internal/address/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,20 +214,23 @@ func (a *gcpAssigner) CheckAddressAssigned(region, addressName string) (bool, er
return address.Status == inUseStatus, nil
}

func (a *gcpAssigner) Assign(ctx context.Context, instanceID, zone string, filter []string, orderBy string) error {
func (a *gcpAssigner) Assign(ctx context.Context, instanceID, zone string, filter []string, orderBy string) (string, error) {
// check if instance already has a public static IP address assigned
instance, err := a.checkStaticIPAssigned(zone, instanceID)
instance, address, err := a.checkStaticIPAssigned(zone, instanceID)
if err != nil {
return errors.Wrapf(err, "check if static public IP is already assigned to instance %s", instanceID)
if errors.Is(err, ErrStaticIPAlreadyAssigned) {
return address, nil
}
return "", errors.Wrapf(err, "check if static public IP is already assigned to instance %s", instanceID)
}

// get available reserved public IP addresses
addresses, err := a.listAddresses(filter, orderBy, reservedStatus)
if err != nil {
return errors.Wrap(err, "failed to list available addresses")
return "", errors.Wrap(err, "failed to list available addresses")
}
if len(addresses) == 0 {
return errors.Errorf("no available addresses")
return "", errors.Errorf("no available addresses")
}
// log available addresses IPs
ips := make([]string, 0, len(addresses))
Expand All @@ -238,51 +241,53 @@ func (a *gcpAssigner) Assign(ctx context.Context, instanceID, zone string, filte

// delete current ephemeral public IP address
if err = a.DeleteInstanceAddress(ctx, instance, zone); err != nil && !errors.Is(err, ErrNoPublicIPAssigned) {
return errors.Wrap(err, "failed to delete current public IP address")
return "", errors.Wrap(err, "failed to delete current public IP address")
}

// get instance details again to refresh the network interface fingerprint (required for adding a new ipv6 address)
instance, err = a.instanceGetter.Get(a.project, zone, instanceID)
if err != nil {
return errors.Wrapf(err, "failed refresh network interface fingerprint for instance %s", instanceID)
return "", errors.Wrapf(err, "failed refresh network interface fingerprint for instance %s", instanceID)
}

// try to assign all available addresses until one succeeds
// due to concurrency, it is possible that another kubeip instance will assign the same address
var assignedAddress string
for _, address := range addresses {
// check if context is done before trying to assign an address
if ctx.Err() != nil {
return errors.Wrap(ctx.Err(), "context cancelled while assigning addresses")
return "", errors.Wrap(ctx.Err(), "context cancelled while assigning addresses")
}
if err = tryAssignAddress(ctx, a, instance, a.region, zone, address); err != nil {
a.logger.WithError(err).WithField("address", address.Address).Error("failed to assign static public IP address")
continue
}
assignedAddress = address.Address
// break the loop after successfully assigning an address
break
}
if err != nil {
return errors.Wrap(err, "failed to assign static public IP address")
return "", errors.Wrap(err, "failed to assign static public IP address")
}
return nil
return assignedAddress, nil
}

func (a *gcpAssigner) checkStaticIPAssigned(zone, instanceID string) (*compute.Instance, error) {
func (a *gcpAssigner) checkStaticIPAssigned(zone, instanceID string) (*compute.Instance, string, error) {
instance, err := a.instanceGetter.Get(a.project, zone, instanceID)
if err != nil {
return nil, errors.Wrapf(err, "failed to get instance %s", instanceID)
return nil, "", errors.Wrapf(err, "failed to get instance %s", instanceID)
}
assigned, err := a.listAddresses(nil, "", inUseStatus)
if err != nil {
return nil, errors.Wrap(err, "failed to list assigned addresses")
return nil, "", errors.Wrap(err, "failed to list assigned addresses")
}
// create a map of users for quick lookup
users := a.createUserMap(assigned)
// check if the instance's self link is in the list of users
if _, ok := users[instance.SelfLink]; ok {
return nil, ErrStaticIPAlreadyAssigned
if address, ok := users[instance.SelfLink]; ok {
return nil, address, ErrStaticIPAlreadyAssigned
}
return instance, nil
return instance, "", nil
}

func (a *gcpAssigner) listAddresses(filter []string, orderBy, status string) ([]*compute.Address, error) {
Expand Down Expand Up @@ -396,11 +401,11 @@ func tryAssignAddress(ctx context.Context, as internalAssigner, instance *comput
return nil
}

func (a *gcpAssigner) createUserMap(assigned []*compute.Address) map[string]struct{} {
users := make(map[string]struct{})
func (a *gcpAssigner) createUserMap(assigned []*compute.Address) map[string]string {
users := make(map[string]string)
for _, address := range assigned {
for _, user := range address.Users {
users[user] = struct{}{}
users[user] = address.Address
}
}
return users
Expand Down
56 changes: 55 additions & 1 deletion internal/address/gcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ func Test_gcpAssigner_Assign(t *testing.T) {
instanceGetterFn func(t *testing.T) cloud.InstanceGetter
project string
region string
address string
}
type args struct {
ctx context.Context
Expand All @@ -348,6 +349,7 @@ func Test_gcpAssigner_Assign(t *testing.T) {
fields: fields{
project: "test-project",
region: "test-region",
address: "100.0.0.3",
listerFn: func(t *testing.T) cloud.Lister {
mock := mocks.NewLister(t)
mockCall := mocks.NewListCall(t)
Expand Down Expand Up @@ -407,6 +409,55 @@ func Test_gcpAssigner_Assign(t *testing.T) {
orderBy: "test-order-by",
},
},
{
name: "assign when static IP address already allocted",
fields: fields{
project: "test-project",
region: "test-region",
address: "100.0.0.2",
listerFn: func(t *testing.T) cloud.Lister {
mock := mocks.NewLister(t)
mockCall := mocks.NewListCall(t)
mock.EXPECT().List("test-project", "test-region").Return(mockCall)
mockCall.EXPECT().Filter("(status=IN_USE) (addressType=EXTERNAL) (ipVersion!=IPV6)").Return(mockCall).Once()
mockCall.EXPECT().Do().Return(&compute.AddressList{
Items: []*compute.Address{
{Name: "test-address-1", Status: inUseStatus, Address: "100.0.0.1", NetworkTier: defaultNetworkTier, AddressType: "EXTERNAL", Users: []string{"self-link-test-instance-1"}},
{Name: "test-address-2", Status: inUseStatus, Address: "100.0.0.2", NetworkTier: defaultNetworkTier, AddressType: "EXTERNAL", Users: []string{"self-link-test-instance-2"}},
},
}, nil).Once()
return mock
},
instanceGetterFn: func(t *testing.T) cloud.InstanceGetter {
mock := mocks.NewInstanceGetter(t)
mock.EXPECT().Get("test-project", "test-zone", "test-instance-0").Return(&compute.Instance{
Name: "test-instance-0",
Zone: "test-zone",
SelfLink: "self-link-test-instance-2",
NetworkInterfaces: []*compute.NetworkInterface{
{
Name: "test-network-interface",
AccessConfigs: []*compute.AccessConfig{
{Name: "test-access-config", NatIP: "200.0.0.1", Type: defaultAccessConfigType, Kind: accessConfigKind},
},
Fingerprint: "test-fingerprint",
},
},
}, nil)
return mock
},
addressManagerFn: func(t *testing.T) cloud.AddressManager {
return mocks.NewAddressManager(t)
},
},
args: args{
ctx: context.TODO(),
instanceID: "test-instance-0",
zone: "test-zone",
filter: []string{"test-filter-1", "test-filter-2"},
orderBy: "test-order-by",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -419,8 +470,11 @@ func Test_gcpAssigner_Assign(t *testing.T) {
region: tt.fields.region,
logger: logger,
}
if err := a.Assign(tt.args.ctx, tt.args.instanceID, tt.args.zone, tt.args.filter, tt.args.orderBy); (err != nil) != tt.wantErr {
address, err := a.Assign(tt.args.ctx, tt.args.instanceID, tt.args.zone, tt.args.filter, tt.args.orderBy)
if err != nil != tt.wantErr {
t.Errorf("Assign() error = %v, wantErr %v", err, tt.wantErr)
} else if address != tt.fields.address {
t.Fatalf("Assign() = %v, want %v", address, tt.fields.address)
}
})
}
Expand Down
28 changes: 19 additions & 9 deletions mocks/address/Assigner.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit cb26e29

Please sign in to comment.