Skip to content

Commit

Permalink
v0.1.5 (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
3 people authored Oct 8, 2020
1 parent b09388b commit 81e568e
Show file tree
Hide file tree
Showing 6 changed files with 289 additions and 79 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
# 0.1.5

## Bugfixes

Fixed issue with id_by_name helper functions caused by incorrect cache impl (#28)
Fixed missing interface_id param for interface PUT operations (#30)

## Enhancements

Added additional unit tests for id_by_name operations
Merged and enhanced hitcount implementation by @arnydo (#29)

# 0.1.4

## Bugfixes
Expand Down
184 changes: 111 additions & 73 deletions fireREST/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-

import json
import logging
Expand Down Expand Up @@ -84,6 +84,20 @@ def _url(self, namespace='base', path=''):
raise exc.InvalidNamespaceError(f'Invalid namespace "{namespace}" provided. Options: {options.keys()}')
return options[namespace]

def _filter(self, items=None):
'''
Get filter string from list of key, value pairs
: param items: list of key value pairs used to build filter string
: return: valid filter string
'''
if items:
filter_str = ''
for k, v in items.items():
if v:
filter_str += f'{k}:{v};'
return filter_str.rstrip(';')
return ''

@utils.handle_errors
def _request(self, method: str, url: str, params=None, auth=None, data=None):
if params:
Expand Down Expand Up @@ -210,7 +224,6 @@ def _sanitize(self, method: str, payload: Dict):
sanitized_payload.pop('id', None)
return sanitized_payload

# @utils.cache_result
@utils.validate_object_type
@utils.minimum_version_required('6.1.0')
def get_object_id_by_name(self, object_type: str, object_name: str):
Expand All @@ -228,7 +241,6 @@ def get_object_id_by_name(self, object_type: str, object_name: str):
return obj['id']
return None

@utils.cache_result
@utils.minimum_version_required('6.1.0')
def get_device_id_by_name(self, device_name: str):
'''
Expand All @@ -244,52 +256,48 @@ def get_device_id_by_name(self, device_name: str):
return device['id']
return None

@utils.cache_result
@utils.minimum_version_required('6.2.3')
def get_device_hapair_id_by_name(self, device_hapair_name: str):
def get_devicehapair_id_by_name(self, hapair_name: str):
'''
helper function to retrieve device ha - pair id by name
: param device_hapair_name: name of the ha - pair
: param hapair_name: name of the ha - pair
: return: id if ha - pair is found, None otherwise
'''
request = '/devicehapairs/ftddevicehapairs'
url = self._url('config', request)
ha_pairs = self._get(url)
for ha_pair in ha_pairs:
if ha_pair['name'] == device_hapair_name:
if ha_pair['name'] == hapair_name:
return ha_pair['id']
return None

@utils.cache_result
@utils.minimum_version_required('6.2.3')
def get_device_id_from_hapair(self, device_hapair_id: str):
def get_device_id_from_hapair(self, hapair_id: str):
'''
helper function to retrieve device id from ha - pair
: param device_hapar_id: id of ha - pair
: param hapair_id: id of ha - pair
: return: id if device is found, None otherwise
'''
request = f'/devicehapairs/ftddevicehapairs/{device_hapair_id}'
request = f'/devicehapairs/ftddevicehapairs/{hapair_id}'
url = self._url('config', request)
ha_pair = self._get(url)
return ha_pair['primary']['id']

@utils.cache_result
@utils.minimum_version_required('6.2.3')
def get_nat_policy_id_by_name(self, nat_policy_name: str):
def get_natpolicy_id_by_name(self, policy_name: str):
'''
helper function to retrieve nat policy id by name
: param nat_policy_name: name of nat policy
: param policy_name: name of nat policy
: return: policy id if nat policy is found, None otherwise
'''
request = '/policy/ftdnatpolicies'
url = self._url('config', request)
nat_policies = self._get(url)
for nat_policy in nat_policies:
if nat_policy['name'] == nat_policy_name:
return nat_policy['id']
natpolicies = self._get(url)
for natpolicy in natpolicies:
if natpolicy['name'] == policy_name:
return natpolicy['id']
return None

@utils.cache_result
@utils.minimum_version_required('6.1.0')
def get_accesspolicy_id_by_name(self, policy_name: str):
'''
Expand All @@ -299,22 +307,34 @@ def get_accesspolicy_id_by_name(self, policy_name: str):
'''
request = '/policy/accesspolicies'
url = self._url('config', request)
accesspolicies = self._get(url)
for accesspolicy in accesspolicies:
if accesspolicy['name'] == policy_name:
return accesspolicy['id']
policies = self._get(url)
for policy in policies:
if policy['name'] == policy_name:
return policy['id']
return None

def get_prefilterpolicy_id_by_name(self, policy_name: str):
'''
helper function to retrieve prefilter policy id by name
: param policy_name: name of the prefilter policy
: return: prefilter policy id if policy name is found, None otherwise
'''
request = '/policy/prefilterpolicies'
url = self._url('config', request)
prefilterpolicies = self._get(url)
for policy in prefilterpolicies:
if policy['name'] == policy_name:
return policy['id']
return None

@utils.cache_result
@utils.minimum_version_required('6.1.0')
def get_accessrule_id_by_name(self, policy_name: str, rule_name: str):
def get_accessrule_id_by_name(self, policy_id: str, rule_name: str):
'''
helper function to retrieve access control policy rule id by name
: param policy_name: name of the access control policy that will be queried
: param rule_name: name of the access control policy rule
: return: accesspolicy rule id if access control policy rule is found, None otherwise
: param policy_id: id of the accesspolicy that will be queried
: param rule_name: name of the accessrule
: return: accesspolicy rule id if accessrule is found, None otherwise
'''
policy_id = self.get_accesspolicy_id_by_name(policy_name)
request = f'/policy/accesspolicies/{policy_id}/accessrules'
url = self._url('config', request)
accessrules = self._get(url)
Expand All @@ -323,32 +343,30 @@ def get_accessrule_id_by_name(self, policy_name: str, rule_name: str):
return accessrule['id']
return None

@utils.cache_result
@utils.minimum_version_required('6.1.0')
def get_syslog_alert_id_by_name(self, syslog_alert_name: str):
def get_syslogalert_id_by_name(self, alert_name: str):
'''
helper function to retrieve syslog alert object id by name
: param syslog_alert_name: name of syslog alert object
: param alert_name: name of syslog alert object
: return: syslogalert id if syslog alert is found, None otherwise
'''
syslogalerts = self.get_syslogalerts()
for syslog_alert in syslogalerts:
if syslog_alert['name'] == syslog_alert_name:
return syslog_alert['id']
for syslogalert in syslogalerts:
if syslogalert['name'] == alert_name:
return syslogalert['id']
return None

@utils.cache_result
@utils.minimum_version_required('6.1.0')
def get_snmp_alert_id_by_name(self, snmp_alert_name: str):
def get_snmpalert_id_by_name(self, alert_name: str):
'''
helper function to retrieve snmp alert object id by name
: param snmp_alert_name: name of snmp alert object
: param alert_name: name of snmp alert object
: return: snmpalert id if snmp alert is found, None otherwise
'''
snmp_alerts = self.get_snmpalerts()
for snmp_alert in snmp_alerts:
if snmp_alert['name'] == snmp_alert_name:
return snmp_alert['id']
snmpalerts = self.get_snmpalerts()
for snmpalert in snmpalerts:
if snmpalert['name'] == alert_name:
return snmpalert['id']
return None

def get_domain_id_by_name(self, domain_name: str):
Expand Down Expand Up @@ -475,46 +493,46 @@ def delete_device(self, device_id: str):
return self._delete(url)

@utils.minimum_version_required('6.2.3')
def get_device_hapairs(self):
def get_devicehapairs(self):
url = self._url('config', '/devicehapairs/ftddevicehapairs')
return self._get(url)

@utils.minimum_version_required('6.2.3')
def create_device_hapair(self, data: Dict):
def create_devicehapair(self, data: Dict):
url = self._url('config', '/devicehapairs/ftddevicehapairs')
return self._get(url, data)

@utils.minimum_version_required('6.2.3')
def get_device_hapair(self, device_hapair_id: str):
url = self._url('config', f'/devicehapairs/ftddevicehapairs/{device_hapair_id}')
def get_devicehapair(self, devicehapair_id: str):
url = self._url('config', f'/devicehapairs/ftddevicehapairs/{devicehapair_id}')
return self._get(url)

@utils.minimum_version_required('6.2.3')
def update_device_hapair(self, data: Dict, device_hapair_id: str):
url = self._url('config', f'/devicehapairs/ftddevicehapairs/{device_hapair_id}')
def update_devicehapair(self, data: Dict, devicehapair_id: str):
url = self._url('config', f'/devicehapairs/ftddevicehapairs/{devicehapair_id}')
return self._update(url, data)

@utils.minimum_version_required('6.2.3')
def delete_device_hapair(self, device_hapair_id: str):
url = self._url('config', f'/devicehapairs/ftddevicehapairs/{device_hapair_id}')
def delete_devicehapair(self, devicehapair_id: str):
url = self._url('config', f'/devicehapairs/ftddevicehapairs/{devicehapair_id}')
return self._delete(url)

@utils.minimum_version_required('6.3.0')
def get_device_hapair_monitoredinterfaces(self, device_hapair_id: str):
url = self._url('config', f'/devicehapairs/ftddevicehapairs/{device_hapair_id}/monitoredinterfaces',)
def get_devicehapair_monitoredinterfaces(self, devicehapair_id: str):
url = self._url('config', f'/devicehapairs/ftddevicehapairs/{devicehapair_id}/monitoredinterfaces')
return self._get(url)

@utils.minimum_version_required('6.3.0')
def get_device_hapair_monitoredinterface(self, device_hapair_id: str, monitoredinterface_id: str):
def get_devicehapair_monitoredinterface(self, devicehapair_id: str, monitoredinterface_id: str):
url = self._url(
'config', f'/devicehapairs/ftddevicehapairs/{device_hapair_id}/monitoredinterfaces/{monitoredinterface_id}',
'config', f'/devicehapairs/ftddevicehapairs/{devicehapair_id}/monitoredinterfaces/{monitoredinterface_id}',
)
return self._get(url)

@utils.minimum_version_required('6.3.0')
def update_device_hapair_monitoredinterface(self, device_hapair_id: str, monitoredinterface_id: str, data: Dict):
def update_devicehapair_monitoredinterface(self, devicehapair_id: str, monitoredinterface_id: str, data: Dict):
url = self._url(
'config', f'/devicehapairs/ftddevicehapairs/{device_hapair_id}/monitoredinterfaces/{monitoredinterface_id}',
'config', f'/devicehapairs/ftddevicehapairs/{devicehapair_id}/monitoredinterfaces/{monitoredinterface_id}',
)
return self._update(url, data)

Expand All @@ -525,12 +543,12 @@ def get_ftd_physical_interfaces(self, device_id: str):

@utils.minimum_version_required('6.1.0')
def get_ftd_physical_interface(self, device_id: str, interface_id: str):
url = self._url('config', f'/devices/devicerecords/{device_id}/physicalinterfaces/{interface_id}',)
url = self._url('config', f'/devices/devicerecords/{device_id}/physicalinterfaces/{interface_id}')
return self._get(url)

@utils.minimum_version_required('6.1.0')
def update_ftd_physical_interface(self, device_id: str, data: Dict):
url = self._url('config', f'/devices/devicerecords/{device_id}/physicalinterfaces')
def update_ftd_physical_interface(self, device_id: str, interface_id: str, data: Dict):
url = self._url('config', f'/devices/devicerecords/{device_id}/physicalinterfaces/{interface_id}')
return self._update(url, data)

@utils.minimum_version_required('6.1.0')
Expand All @@ -545,17 +563,17 @@ def get_ftd_redundant_interfaces(self, device_id: str):

@utils.minimum_version_required('6.1.0')
def get_ftd_redundant_interface(self, device_id: str, interface_id: str):
url = self._url('config', f'/devices/devicerecords/{device_id}/redundantinterfaces/{interface_id}',)
url = self._url('config', f'/devices/devicerecords/{device_id}/redundantinterfaces/{interface_id}')
return self._get(url)

@utils.minimum_version_required('6.1.0')
def update_ftd_redundant_interface(self, device_id: str, data: Dict):
url = self._url('config', f'/devices/devicerecords/{device_id}/redundantinterfaces')
def update_ftd_redundant_interface(self, device_id: str, interface_id: str, data: Dict):
url = self._url('config', f'/devices/devicerecords/{device_id}/redundantinterfaces/{interface_id}')
return self._update(url, data)

@utils.minimum_version_required('6.1.0')
def delete_ftd_redundant_interface(self, device_id: str, interface_id: str):
url = self._url('config', f'/devices/devicerecords/{device_id}/redundantinterfaces/{interface_id}',)
url = self._url('config', f'/devices/devicerecords/{device_id}/redundantinterfaces/{interface_id}')
return self._delete(url)

@utils.minimum_version_required('6.1.0')
Expand All @@ -570,17 +588,17 @@ def get_ftd_portchannel_interfaces(self, device_id: str):

@utils.minimum_version_required('6.1.0')
def get_ftd_portchannel_interface(self, device_id: str, interface_id: str):
url = self._url('config', f'/devices/devicerecords/{device_id}/etherchannelinterfaces/{interface_id}',)
url = self._url('config', f'/devices/devicerecords/{device_id}/etherchannelinterfaces/{interface_id}')
return self._get(url)

@utils.minimum_version_required('6.1.0')
def update_ftd_portchannel_interface(self, device_id: str, data: Dict):
url = self._url('config', f'/devices/devicerecords/{device_id}/etherchannelinterfaces')
def update_ftd_portchannel_interface(self, device_id: str, interface_id: str, data: Dict):
url = self._url('config', f'/devices/devicerecords/{device_id}/etherchannelinterfaces/{interface_id}')
return self._update(url, data)

@utils.minimum_version_required('6.1.0')
def delete_ftd_portchannel_interface(self, device_id: str, interface_id: str):
url = self._url('config', f'/devices/devicerecords/{device_id}/etherchannelinterfaces/{interface_id}',)
url = self._url('config', f'/devices/devicerecords/{device_id}/etherchannelinterfaces/{interface_id}')
return self._delete(url)

@utils.minimum_version_required('6.1.0')
Expand Down Expand Up @@ -620,17 +638,17 @@ def get_ftd_ipv4staticroutes(self, device_id: str):

@utils.minimum_version_required('6.3.0')
def get_ftd_ipv4staticroute(self, device_id: str, route_id: str):
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes/{route_id}',)
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes/{route_id}')
return self._get(url)

@utils.minimum_version_required('6.3.0')
def update_ftd_ipv4staticroute(self, device_id: str, route_id: str, data: Dict):
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes/{route_id}',)
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes/{route_id}')
return self._update(url, data)

@utils.minimum_version_required('6.3.0')
def delete_ftd_ipv4staticroute(self, device_id: str, route_id: str):
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes/{route_id}',)
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes/{route_id}')
return self._delete(url)

@utils.minimum_version_required('6.3.0')
Expand All @@ -645,17 +663,17 @@ def get_ftd_ipv6staticroutes(self, device_id: str):

@utils.minimum_version_required('6.3.0')
def get_ftd_ipv6staticroute(self, device_id: str, route_id: str):
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes/{route_id}',)
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes/{route_id}')
return self._get(url)

@utils.minimum_version_required('6.3.0')
def update_ftd_ipv6staticroute(self, device_id: str, route_id: str, data: Dict):
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes/{route_id}',)
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes/{route_id}')
return self._update(url, data)

@utils.minimum_version_required('6.3.0')
def delete_ftd_ipv6staticroute(self, device_id: str, route_id: str):
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes/{route_id}',)
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes/{route_id}')
return self._delete(url)

@utils.minimum_version_required('6.1.0')
Expand Down Expand Up @@ -886,3 +904,23 @@ def get_policy_assignment(self, policy_id: str):
def update_policy_assignment(self, policy_id: str, data: Dict):
url = self._url('config', f'/assignment/policyassignments/{policy_id}')
return self._update(url, data)

@utils.minimum_version_required('6.4.0')
def get_hitcounts(self, policy_id: str, device_id: str, rule_ids=None, fetch_zero_hitcount=True):
params = {
'filter': self._filter({'deviceId': device_id, 'ids': rule_ids, 'fetchZeroHitCount': fetch_zero_hitcount})
}
url = self._url('config', f'/policy/accesspolicies/{policy_id}/operational/hitcounts')
return self._get(url, params)

@utils.minimum_version_required('6.4.0')
def update_hitcounts(self, policy_id: str, device_id: str, rule_ids=None):
params = {'filter': self._filter({'deviceId': device_id, 'ids': rule_ids})}
url = self._url('config', f'/policy/accesspolicies/{policy_id}/operational/hitcounts')
return self._update(url, params)

@utils.minimum_version_required('6.4.0')
def delete_hitcounts(self, policy_id: str, device_id: str, rule_ids=None):
params = {'filter': self._filter({'deviceId': device_id, 'ids': rule_ids})}
url = self._url('config', f'/policy/accesspolicies/{policy_id}/operational/hitcounts')
return self._update(url, params)
Loading

0 comments on commit 81e568e

Please sign in to comment.