Skip to content

Commit

Permalink
Extend inet attributes and improve ip address type validation
Browse files Browse the repository at this point in the history
* Allow to enforce IPv4 or IPv6 for inet attributes
* Allow duplicate ip addresses in distinct inet attributes
  • Loading branch information
TuxPowered42 authored Feb 23, 2024
1 parent d169d5c commit 2c95e5c
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 10 deletions.
18 changes: 18 additions & 0 deletions serveradmin/serverdb/migrations/0017_inet_family_choice.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2.23 on 2024-02-16 14:28

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('serverdb', '0016_optional_servertype_for_relation'),
]

operations = [
migrations.AddField(
model_name='attribute',
name='inet_address_family',
field=models.CharField(blank=True, choices=[(None, 'none or any'), ('IPV4', 'IPv4'), ('IPV6', 'IPv6')], max_length=5),
),
]
51 changes: 46 additions & 5 deletions serveradmin/serverdb/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@

IP_ADDR_TYPES = [
('null', 'null: intern_ip must be empty, no inet attributes'),
('host', 'host: intern_ip and inet must be an ip address and unique across all objects'),
('host', 'host: intern_ip and inet must be an ip address and unique across all objects per attribute'),
('loadbalancer', 'loadbalancer: intern_ip and inet must be an ip address'),
('network', 'network: intern_ip and inet must be an ip network, not overlapping with same servertype'),
]
Expand Down Expand Up @@ -96,26 +96,47 @@ def is_ip_address(ip_interface: Union[IPv4Interface, IPv6Interface]) -> None:


def is_unique_ip(ip_interface: Union[IPv4Interface, IPv6Interface],
object_id: int) -> None:
object_id: int,
attribute_id: str = None) -> None:
"""Validate if IPv4/IPv6 address is unique
Raises a ValidationError if intern_ip or any other attribute of type inet
with this ip_address already exists.
:param ip_interface:
:param object_id:
:param attribute_id:
:return:
"""

# We avoid querying the duplicate hosts here and giving the user
# detailed information because checking with exists is cheaper than
# querying the server and this is a validation and should be fast.

# Always exclude the current object_id from the query because we allow
# duplication of data between the legacy (intern_ip, primary_ip6) and
# the modern (ipv4, ipv6) attributes.

# When operating on real attributes (not on intern_ip) find duplicates
# only withing the same attribute id. That means different hosts can have
# the same IP address as long as it is in different attributes.

# TODO: Make "aid" mandatory when intern_ip is gone.
if attribute_id:
object_attribute_condition = Q(server_id=object_id) | ~Q(attribute_id=attribute_id)
else:
object_attribute_condition = Q(server_id=object_id)

has_duplicates = (
# TODO: Remove intern_ip.
Server.objects.filter(intern_ip=ip_interface).exclude(
Q(servertype__ip_addr_type='network') |
Q(server_id=object_id)
).exists() or
ServerInetAttribute.objects.filter(value=ip_interface).exclude(
server__servertype__ip_addr_type='network').exists())
Q(server__servertype__ip_addr_type='network') | object_attribute_condition
).exists()
)
if has_duplicates:
raise ValidationError(
'An object with {0} already exists'.format(str(ip_interface)))
Expand Down Expand Up @@ -172,6 +193,8 @@ def network_overlaps(ip_interface: Union[IPv4Interface, IPv6Interface],
).exclude(
server_id=object_id
).exists() or
# TODO: We should filter for attribute id here as well to have
# consistent bebaviour with ip_addr_type: host and is_unique.
ServerInetAttribute.objects.filter(
server__servertype=servertype_id,
value__net_overlaps=ip_interface
Expand Down Expand Up @@ -208,6 +231,11 @@ def __str__(self):


class Attribute(models.Model):
class InetAddressFamilyChoice(models.TextChoices):
IPV4 = 'IPV4', _('IPv4')
IPV6 = 'IPV6', _('IPv6')
__empty__ = _("none or any")

special = None

def __init__(self, *args, **kwargs):
Expand All @@ -232,6 +260,7 @@ def __init__(self, *args, **kwargs):
max_length=32, null=False, blank=False, default='other'
)
help_link = models.CharField(max_length=255, blank=True, null=True)
inet_address_family = models.CharField(choices=InetAddressFamilyChoice.choices, max_length=5, blank=True)
readonly = models.BooleanField(null=False, default=False)
target_servertype = models.ForeignKey(
Servertype, on_delete=models.CASCADE,
Expand Down Expand Up @@ -672,8 +701,20 @@ class Meta:
def clean(self):
super(ServerAttribute, self).clean()

if type(self.value) not in [IPv4Interface, IPv6Interface]:
if self.attribute.inet_address_family == Attribute.InetAddressFamilyChoice.IPV4:
allowed_types = (IPv4Interface,)
elif self.attribute.inet_address_family == Attribute.InetAddressFamilyChoice.IPV6:
allowed_types = (IPv6Interface,)
else:
allowed_types = (IPv4Interface, IPv6Interface)

if type(self.value) not in allowed_types:
self.value = inet_to_python(self.value)
if type(self.value) not in allowed_types:
raise ValidationError(
f'IP address {self.value} is not '
f'of type {self.attribute.get_inet_address_family_display()}!'
)

# Get the ip_addr_type of the servertype
ip_addr_type = self.server.servertype.ip_addr_type
Expand All @@ -687,7 +728,7 @@ def clean(self):
params={'attribute_id': self.attribute_id})
elif ip_addr_type == 'host':
is_ip_address(self.value)
is_unique_ip(self.value, self.server.server_id)
is_unique_ip(self.value, self.server.server_id, self.attribute_id)
elif ip_addr_type == 'loadbalancer':
is_ip_address(self.value)
elif ip_addr_type == 'network':
Expand Down
9 changes: 4 additions & 5 deletions serveradmin/serverdb/tests/test_ip_addr_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,10 @@ def test_server_with_duplicate_inet_different_attrs(self):
server['ip_config'] = '10.0.0.2/32'
server.commit(user=User.objects.first())

duplicate = self._get_server('host')
duplicate['intern_ip'] = '10.0.0.3/32'
duplicate['ip_config_new'] = '10.0.0.2/32'
with self.assertRaises(ValidationError):
duplicate.commit(user=User.objects.first())
other_attribute = self._get_server('host')
other_attribute['intern_ip'] = '10.0.0.3/32'
other_attribute['ip_config_new'] = '10.0.0.2/32'
self.assertIsNone(other_attribute.commit(user=User.objects.first()))

def test_server_with_duplicate_inet_for_loadbalancer(self):
server = self._get_server('loadbalancer')
Expand Down

0 comments on commit 2c95e5c

Please sign in to comment.