Skip to content

Commit

Permalink
adds config options vlan_group_relation_by_name and vlan_group_relati…
Browse files Browse the repository at this point in the history
…on_by_id #373
  • Loading branch information
bb-Ricardo committed Nov 22, 2024
1 parent 08b244a commit 13685c5
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 73 deletions.
4 changes: 4 additions & 0 deletions module/config/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ def __eq__(self, other):
def __contains__(self, key):
return key in self.__dict__

def __getattr__(self, item):
if item in self:
return getattr(self, item)
return None

class ConfigBase:
"""
Expand Down
1 change: 1 addition & 0 deletions module/netbox/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
NBVRF,
NBVLAN,
NBVLANList,
NBVLANGroup,
NBPrefix,
NBManufacturer,
NBDeviceType,
Expand Down
22 changes: 19 additions & 3 deletions module/netbox/object_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1013,12 +1013,13 @@ def compile_vlans(self, vlans):
Parameters
----------
vlans: list of (dict, NBVLAN)
vlans: list of (dict or NBVLAN)
list of VLANs that should be in the returned list
Returns
-------
NBVLANList: of parsed VLANs
NBVLANList
of parsed VLANs
"""

if vlans is None or NBVLANList not in self.data_model.values():
Expand Down Expand Up @@ -1361,7 +1362,8 @@ def __init__(self, *args, **kwargs):
"site": NBSite,
"description": 200,
"tenant": NBTenant,
"tags": NBTagList
"tags": NBTagList,
"group": NBVLANGroup
}
super().__init__(*args, **kwargs)

Expand Down Expand Up @@ -1402,6 +1404,20 @@ def update(self, data=None, read_from_netbox=False, source=None):

super().update(data=data, read_from_netbox=read_from_netbox, source=source)

class NBVLANGroup(NetBoxObject):
name = "VLANGroup"
api_path = "ipam/vlan-groups"
primary_key = "name"
prune = False

def __init__(self, *args, **kwargs):
self.data_model = {
"name": 100,
"slug": 100,
"description": 200,
"tags": NBTagList
}
super().__init__(*args, **kwargs)

class NBVLANList(NBObjectList):
member_type = NBVLAN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,17 @@
log = get_logger()


class ExcludedVLAN:
"""
initializes and verifies if an VLAN should be excluded from being synced to NetBox
"""
class VLANFilter:

def __init__(self, vlan):
def __init__(self, vlan, filter_type):
self._validation_failed = False

self.site = None
self.filter_type = filter_type

if vlan is None:
if vlan is None or len(f"{vlan}") == 0:
self._validation_failed = True
log.error("submitted VLAN string for VLAN exclusion was 'None'")
log.error(f"submitted VLAN {self.filter_type} string for VLAN was '{"'None'" if vlan is None else "empty" }'")
return

vlan_split = [x.replace('\\', "") for x in re.split(r'(?<!\\)/', vlan)]
Expand All @@ -37,7 +35,7 @@ def __init__(self, vlan):
self._value = vlan_split[1]
else:
self._validation_failed = True
log.error("submitted VLAN string for VLAN exclusion contains name or site including '/'. " +
log.error(f"submitted VLAN {self.filter_type} string for VLAN filter contains name or site including '/'. " +
"A '/' which belongs to the name needs to be escaped like '\\/'.")

def site_matches(self, site_name):
Expand All @@ -49,7 +47,7 @@ def site_matches(self, site_name):
# noinspection PyBroadException
try:
if ([self.site, site_name]).count(None) == 0 and re.search(f"^{self.site}$", site_name):
log.debug2(f"VLAN exclude site name '{site_name}' matches '{self.site}'")
log.debug2(f"VLAN {self.filter_type} site name '{site_name}' matches '{self.site}'")
return True
except Exception:
return False
Expand All @@ -61,11 +59,14 @@ def is_valid(self):
return not self._validation_failed


class ExcludedVLANName(ExcludedVLAN):
class FilterVLANByName(VLANFilter):
"""
initializes and verifies if a VLAN matches by name
"""

def __init__(self, vlan):
def __init__(self, vlan, filter_type):

super().__init__(vlan)
super().__init__(vlan, filter_type)

self.name = None

Expand All @@ -82,20 +83,23 @@ def matches(self, name, site=None):
# string or regex matches
try:
if ([self.name, name]).count(None) == 0 and re.search(f"^{self.name}$", name):
log.debug2(f"VLAN exclude name '{name}' matches '{self.name}'")
log.debug2(f"VLAN {self.filter_type} name '{name}' matches '{self.name}'")
return True
except Exception as e:
log.warning(f"Unable to match exclude VLAN name '{name}' to '{self.name}': {e}")
log.warning(f"Unable to match {self.filter_type} VLAN name '{name}' to '{self.name}': {e}")
return False

return False


class ExcludedVLANID(ExcludedVLAN):
class FilterVLANByID(VLANFilter):
"""
initializes and verifies if a VLAN matches by ID
"""

def __init__(self, vlan):
def __init__(self, vlan, filter_type):

super().__init__(vlan)
super().__init__(vlan, filter_type)

self.range = None

Expand All @@ -104,7 +108,7 @@ def __init__(self, vlan):

try:
if "-" in self._value and int(self._value.split("-")[0]) >= int(self._value.split("-")[1]):
log.error(f"range has to start with the lower id: {self._value}")
log.error(f"VLAN {self.filter_type} range has to start with the lower ID: {self._value}")
self._validation_failed = True
return

Expand All @@ -113,22 +117,21 @@ def __init__(self, vlan):
for i in self._value.split(',')), []
)
except Exception as e:
log.error(f"unable to extract ids from value '{self._value}': {e}")
log.error(f"unable to extract VLAN IDs from value '{self._value}': {e}")
self._validation_failed = True

def matches(self, vlan_id, site=None):

if self.site_matches(site) is False:
log.debug2(f"VLAN {self.filter_type} site name '{site_name}' matches '{self.site}'")
return False

try:
if int(vlan_id) in self.range:
log.debug2(f"VLAN exclude id '{vlan_id}' matches '{self._value}'")
log.debug2(f"VLAN {self.filter_type} ID '{vlan_id}' matches '{self._value}'")
return True
except Exception as e:
log.warning(f"Unable to match exclude VLAN id '{vlan_id}' to '{self._value}': {e}")
log.warning(f"Unable to match {self.filter_type} VLAN ID '{vlan_id}' to '{self._value}': {e}")
return False

return False


104 changes: 75 additions & 29 deletions module/sources/common/source_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from module.netbox import *
from module.common.logging import get_logger
from module.common.misc import grab
from module.sources.common.excluded_vlan import ExcludedVLANName, ExcludedVLANID
from module.sources.common.handle_vlan import FilterVLANByName, FilterVLANByID

log = get_logger()

Expand Down Expand Up @@ -247,27 +247,26 @@ def add_update_interface(self, interface_object, device_object, interface_data,
Parameters
----------
interface_object: NBVMInterface, NBInterface, None
interface_object: NBVMInterface | NBInterface | None
object handle of the current interface (if existent, otherwise None)
device_object: NBVM, NBDevice
device_object: NBVM | NBDevice
device object handle this interface belongs to
interface_data: dict
dictionary with interface attributes to add to this interface
interface_ips: list
a list of ip addresses which are assigned to this interface
vmware_object: (vim.HostSystem, vim.VirtualMachine)
vmware_object: vim.HostSystem | vim.VirtualMachine
object to add to list of objects to reevaluate
Returns
-------
objects: tuple((NBVMInterface, NBInterface), list)
objects:
tuple of NBVMInterface | NBInterface and list
tuple with interface object that was added/updated and a list of ip address objects which were
added to this interface
"""

ip_tenant_inheritance_order = None
if "ip_tenant_inheritance_order" in self.settings:
ip_tenant_inheritance_order = self.settings.ip_tenant_inheritance_order
ip_tenant_inheritance_order = self.settings.ip_tenant_inheritance_order

if not isinstance(interface_data, dict):
log.error(f"Attribute 'interface_data' must be a dict() got {type(interface_data)}.")
Expand Down Expand Up @@ -572,7 +571,7 @@ def add_update_interface(self, interface_object, device_object, interface_data,
f"untagged interface VLAN.")

if matching_untagged_vlan is not None:
vlan_interface_data["untagged_vlan"] = matching_untagged_vlan
vlan_interface_data["untagged_vlan"] = self.add_vlan_group(matching_untagged_vlan, site_name)
if grab(interface_object, "data.mode") is None:
vlan_interface_data["mode"] = "access"

Expand All @@ -591,7 +590,7 @@ def add_update_interface(self, interface_object, device_object, interface_data,
matching_tagged_vlan = None

if matching_tagged_vlan is not None:
compiled_tagged_vlans.append(matching_tagged_vlan)
compiled_tagged_vlans.append(self.add_vlan_group(matching_tagged_vlan, site_name))

if len(compiled_tagged_vlans) > 0:
vlan_interface_data["tagged_vlans"] = compiled_tagged_vlans
Expand Down Expand Up @@ -633,12 +632,70 @@ def patch_data(object_to_patch, data, overwrite=False):

return data_to_update

def add_vlan_group(self, vlan_data, vlan_site) -> NBVLAN | dict:
"""
This function will try to find a matching VLAN group according to the settings.
Name matching will take precedence over ID matching. First match wins.
If nothing matches the input data from 'vlan_data' will be returned
Parameters
----------
vlan_data: dict | NBVLAN
A dict or NBVLAN object
vlan_site: str | None
name of site for the VLAN
Returns
-------
NBVLAN | dict: the input vlan_data enriched with VLAN group if a match was found
"""

# get VLAN details
if isinstance(vlan_data, NBVLAN):
vlan_name = grab(vlan_data, "data.name")
vlan_id = grab(vlan_data, "data.vid")
elif isinstance(vlan_data, dict):
vlan_name = vlan_data.get("name")
vlan_id = vlan_data.get("vid")
else:
return vlan_data

# check existing Devices for matches
log.debug2(f"Trying to find a matching VLAN Group based on the VLAN name '{vlan_name}' and VLAN ID '{vlan_id}'")

vlan_group = None
for vlan_filter, vlan_group_name in self.settings.vlan_group_relation_by_name or list():
if vlan_filter.matches(vlan_name, vlan_site):
vlan_group = self.inventory.get_by_data(NBVLANGroup, data={"name": vlan_group_name})
break

if vlan_group is None:
for vlan_filter, vlan_group_name in self.settings.vlan_group_relation_by_id or list():
if vlan_filter.matches(vlan_id, vlan_site):
vlan_group = self.inventory.get_by_data(NBVLANGroup, data={"name": vlan_group_name})
break

if vlan_group is not None:
log.debug2(f"Found matching VLAN group '{vlan_group.get_display_name()}'")
if isinstance(vlan_data, NBVLAN):
vlan_data.update(data={"group": vlan_group})
elif isinstance(vlan_data, dict):
vlan_data["group"] = vlan_group
else:
log.debug2("No matching VLAN group found")

print(vlan_data)

return vlan_data

def get_vlan_object_if_exists(self, vlan_data=None, vlan_site=None):
"""
This function will try to find a matching VLAN object based on 'vlan_data'
Will return matching objects in following order:
* exact match: VLAN id and site match
* global match: VLAN id matches but the VLAN has no site assigned
* exact match: VLAN ID and site match
* global match: VLAN ID matches but the VLAN has no site assigned
If nothing matches the input data from 'vlan_data' will be returned
Parameters
Expand All @@ -664,10 +721,10 @@ def get_vlan_object_if_exists(self, vlan_data=None, vlan_site=None):
raise ValueError("Value of 'vlan_data' needs to be a dict.")

# check existing Devices for matches
log.debug2(f"Trying to find a {NBVLAN.name} based on the VLAN id '{vlan_data.get('vid')}'")
log.debug2(f"Trying to find a {NBVLAN.name} based on the VLAN ID '{vlan_data.get('vid')}'")

if vlan_data.get("vid") is None:
log.debug("No VLAN id set in vlan_data while trying to find matching VLAN.")
log.debug("No VLAN ID set in vlan_data while trying to find matching VLAN.")
return vlan_data

if vlan_site is None:
Expand Down Expand Up @@ -703,7 +760,7 @@ def get_vlan_object_if_exists(self, vlan_data=None, vlan_site=None):
vlan_object_without_site.get_display_name(including_second_key=True)))

else:
log.debug2("No matching existing VLAN found for this VLAN id.")
log.debug2("No matching existing VLAN found for this VLAN ID.")

return return_data

Expand All @@ -724,25 +781,14 @@ def add_vlan_object_to_netbox(self, vlan_data, site_name=None):
"""

# get config data
disable_vlan_sync = False
vlan_sync_exclude_by_name: List[ExcludedVLANName] = list()
vlan_sync_exclude_by_id: List[ExcludedVLANID] = list()
if "disable_vlan_sync" in self.settings:
disable_vlan_sync = self.settings.disable_vlan_sync
if "vlan_sync_exclude_by_name" in self.settings:
vlan_sync_exclude_by_name = self.settings.vlan_sync_exclude_by_name
if "vlan_sync_exclude_by_id" in self.settings:
vlan_sync_exclude_by_id = self.settings.vlan_sync_exclude_by_id

# VLAN is already an existing NetBox VLAN, then it can be reused
if isinstance(vlan_data, NetBoxObject):
return True

if vlan_data is None:
return False

if disable_vlan_sync is True:
if self.settings.disable_vlan_sync is True:
return False

# get VLAN details
Expand All @@ -757,11 +803,11 @@ def add_vlan_object_to_netbox(self, vlan_data, site_name=None):
log.warning(f"Skipping sync of invalid VLAN '{vlan_name}' ID: '{vlan_id}'")
return False

for excluded_vlan in vlan_sync_exclude_by_name or list():
for excluded_vlan in self.settings.vlan_sync_exclude_by_name or list():
if excluded_vlan.matches(vlan_name, site_name):
return False

for excluded_vlan in vlan_sync_exclude_by_id or list():
for excluded_vlan in self.settings.vlan_sync_exclude_by_id or list():
if excluded_vlan.matches(vlan_id, site_name):
return False

Expand Down
Loading

0 comments on commit 13685c5

Please sign in to comment.