Skip to content

Commit

Permalink
Merge pull request #10 from kaisero/develop
Browse files Browse the repository at this point in the history
Version 0.0.4 Release
  • Loading branch information
kaisero authored Nov 15, 2019
2 parents 96c5f7a + db9a285 commit 22016e1
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 47 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# 0.0.4

## Enhancements

* Add api calls for hapair monitoredinterfaces (read, update)
* Add helper function to get primary device id from hapair
* Add expandable option for get_depoyable_deployable_devices
* Default paging change from 25 to 100

## Bugfixes

* getbyid operations fails due to incorrect limit param
* api calls for ftd ipv4/ipv6 static routing fails due to incorrect URLs
* update ftd sub interface fails due to missing param
136 changes: 90 additions & 46 deletions fireREST/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import json
import requests
import logging
import requests
import urllib3

from .version import __version__

from requests.auth import HTTPBasicAuth
from requests.exceptions import ConnectionError
from time import sleep
from typing import Dict
from requests.auth import HTTPBasicAuth
from urllib3.exceptions import ConnectionError
from uuid import UUID


API_AUTH_URL = '/api/fmc_platform/v1/auth/generatetoken'
Expand Down Expand Up @@ -94,6 +95,7 @@ def __init__(self, hostname: str, username: str, password: str, session=dict(),
self.domains = session['domains']
self.headers['X-auth-access-token'] = session['X-auth-access-token']
self.headers['X-auth-refresh-token'] = session['X-auth-refresh-token']
self.domain_name = domain
self.domain = self.get_domain_id_by_name(domain)

@staticmethod
Expand All @@ -109,6 +111,13 @@ def _get_logger(logger: object):
return dummy_logger
return logger

def _is_getbyid_operation(self, request):
try:
val = UUID(request.split('/')[-1]) # noqa: F841
return True
except ValueError:
return False

def _url(self, namespace='base', path=str()):
'''
Generate URLs on the fly for requests to firepower api
Expand Down Expand Up @@ -153,11 +162,8 @@ def _login(self):
self.domains = json.loads(response.headers.get('DOMAINS', default=None))
self.refresh_counter = 0
self.logger.debug(f'Successfully authenticated to {self.hostname}')
except ConnectionRefusedError:
self.logger.error(f'Could not login to {self.hostname}. Connection refused')
raise
except ConnectionError:
self.logger.error(f'Could not login to {self.hostname}. Max retries exceeded with url: {request}')
except ConnectionError as exc:
self.logger.error(exc, exc_info=True)
raise
except FireRESTRateLimitException:
self.logger.debug(f'Could not login to {self.hostname}. Rate limit exceeded. Backing of for 10 seconds.')
Expand Down Expand Up @@ -191,15 +197,17 @@ def _refresh(self):

self.headers['X-auth-access-token'] = access_token
self.headers['X-auth-refresh-token'] = refresh_token
except ConnectionError:
msg = f'Could not connect to {self.hostname}. Max retries exceeded with url: {request}'
self.logger.error(msg)
except ConnectionError as exc:
self.logger.error(exc, exc_info=True)
raise
except FireRESTRateLimitException:
self.logger.debug(f'API token refresh to {self.hostname} failed. Rate limit exceeded. Backing of for 10 seconds.')
self.logger.debug(
f'API token refresh to {self.hostname} failed. Rate limit exceeded. Backing of for 10 seconds.')
sleep(10)
self._login()
except FireRESTApiException as exc:
self.logger.error(str(exc))
raise

self.logger.debug(f'Successfully refreshed authorization token for {self.hostname}')

Expand Down Expand Up @@ -229,49 +237,53 @@ def _delete(self, request: str, params=None):
return response

@RequestDebugDecorator('GET')
def _get_request(self, request: str, params=None, limit=25):
def _get_request(self, request: str, params=None, limit=100):
'''
GET Operation for FMC REST API. In case of authentication issues session will be refreshed
:param request: URL of request that should be performed
:param params: dict of parameters for http request
:param limit: set custom limit for paging. If not set, api will default to 100
:return: requests.Response object
'''
if params is None:
params = dict()
if not self._is_getbyid_operation(request):
params['limit'] = limit
try:
response = requests.get(request, headers=self.headers, params=params, verify=self.verify_cert,
timeout=self.timeout)
if response.status_code == 401:
if 'Access token invalid' in str(response.json()):
self._refresh()
return self._get_request(request, params)
return self._get_request(request, params, limit)
if response.status_code == 429:
msg = f'GET operation {request} failed due to FMC rate limiting. Backing off for 10 seconds.'
raise FireRESTRateLimitException(msg)
except FireRESTRateLimitException:
sleep(10)
return self._get_request(request, params)
return self._get_request(request, params, limit)
return response

def _get(self, request: str, params=None, limit=25):
def _get(self, request: str, params=None, limit=100):
'''
GET Operation that supports paging for FMC REST API. In case of authentication issues session will be refreshed
:param request: URL of request that should be performed
:param params: dict of parameters for http request
:param limit: set custom limit for paging. If not set, api will default to 100
:return: list of requests.Response objects
'''
if params is None:
params = dict()
responses = list()
response = self._get_request(request, params)
response = self._get_request(request, params, limit)
responses.append(response)
payload = response.json()
if 'paging' in payload.keys():
pages = int(payload['paging']['pages'])
limit = int(payload['paging']['limit'])
for i in range(1, pages, 1):
params['offset'] = str(int(i) * limit)
response_page = self._get_request(request, params)
response_page = self._get_request(request, params, limit)
responses.append(response_page)
return responses

Expand Down Expand Up @@ -370,19 +382,30 @@ def get_device_id_by_name(self, device_name: str):

def get_device_hapair_id_by_name(self, device_hapair_name: str):
'''
heloer function to retrieve device ha-pair id by name
helper function to retrieve device ha-pair id by name
:param device_hapair_name: name of the ha-pair
:return: id if ha-pair is found, None otherwise
'''
request = 'devicehapairs/ftddevicehapairs'
url = self._url(request)
request = '/devicehapairs/ftddevicehapairs'
url = self._url('config', request)
response = self._get(url)
for item in response:
for ha_pair in item.json()['items']:
if ha_pair['name'] == device_hapair_name:
return ha_pair['id']
return None

def get_device_id_from_hapair(self, device_hapair_id: str):
'''
helper function to retrieve device id from ha-pair
:param device_hapar_id: id of ha-pair
:return: id if device is found, None otherwise
'''
request = f'/devicehapairs/ftddevicehapairs/{device_hapair_id}'
url = self._url('config', request)
response = self._get(url)
return response[0].json()['primary']['id']

def get_nat_policy_id_by_name(self, nat_policy_name: str):
'''
helper function to retrieve nat policy id by name
Expand Down Expand Up @@ -588,6 +611,24 @@ def delete_device_hapair(self, device_hapair_id: str):
url = self._url('config', request)
return self._delete(url)

def get_device_hapair_monitoredinterfaces(self, device_hapair_id: str, expanded=False):
request = f'/devicehapairs/ftddevicehapairs/{device_hapair_id}/monitoredinterfaces'
params = {
'expanded': expanded
}
url = self._url('config', request)
return self._get(url, params)

def get_device_hapair_monitoredinterface(self, device_hapair_id: str, monitoredinterface_id: str):
request = f'/devicehapairs/ftddevicehapairs/{device_hapair_id}/monitoredinterfaces/{monitoredinterface_id}'
url = self._url('config', request)
return self._get(url)

def update_device_hapair_monitoredinterface(self, device_hapair_id: str, monitoredinterface_id: str, data: Dict):
request = f'/devicehapairs/ftddevicehapairs/{device_hapair_id}/monitoredinterfaces/{monitoredinterface_id}'
url = self._url('config', request)
return self._put(url, data)

def get_ftd_physical_interfaces(self, device_id: str, expanded=False):
request = f'/devices/devicerecords/{device_id}/physicalinterfaces'
url = self._url('config', request)
Expand Down Expand Up @@ -680,8 +721,8 @@ def get_ftd_sub_interface(self, device_id: str, interface_id: str):
url = self._url('config', request)
return self._get(url)

def update_ftd_sub_interface(self, device_id: str, data: Dict):
request = f'/devices/devicerecords/{device_id}/subinterfaces'
def update_ftd_sub_interface(self, device_id: str, interface_id: str, data: Dict):
request = f'/devices/devicerecords/{device_id}/subinterfaces/{interface_id}'
url = self._url('config', request)
return self._put(url, data)

Expand All @@ -690,71 +731,74 @@ def delete_ftd_sub_interface(self, device_id: str, interface_id: str):
url = self._url('config', request)
return self._delete(url)

def create_ftd_ipv4_route(self, device_id: str, data: Dict):
request = f'/devices/devicerecords/{device_id}/ipv4staticroutes'
def create_ftd_ipv4staticroute(self, device_id: str, data: Dict):
request = f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes'
url = self._url('config', request)
return self._post(url, data)

def get_ftd_ipv4_routes(self, device_id: str, expanded=False):
request = f'/devices/devicerecords/{device_id}/ipv4staticroutes'
def get_ftd_ipv4staticroutes(self, device_id: str, expanded=False):
request = f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes'
url = self._url('config', request)
params = {
'expanded': expanded
}
return self._get(url, params)

def get_ftd_ipv4_route(self, device_id: str, route_id: str):
request = f'/devices/devicerecords/{device_id}/ipv4staticroutes/{route_id}'
def get_ftd_ipv4staticroute(self, device_id: str, route_id: str):
request = f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes/{route_id}'
url = self._url('config', request)
return self._get(url)

def update_ftd_ipv4_route(self, device_id: str, data: Dict):
request = f'/devices/devicerecords/{device_id}/ipv4staticroutes'
def update_ftd_ipv4staticroute(self, device_id: str, route_id: str, data: Dict):
request = f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes/{route_id}'
url = self._url('config', request)
return self._put(url, data)

def delete_ftd_ipv4_route(self, device_id: str, route_id: str):
request = f'/devices/devicerecords/{device_id}/ipv4staticroutes/{route_id}'
def delete_ftd_ipv4staticroute(self, device_id: str, route_id: str):
request = f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes/{route_id}'
url = self._url('config', request)
return self._delete(url)

def create_ftd_ipv6_route(self, device_id: str, data: Dict):
request = f'/devices/devicerecords/{device_id}/ipv6staticroutes'
def create_ftd_ipv6staticroute(self, device_id: str, data: Dict):
request = f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes'
url = self._url('config', request)
return self._post(url, data)

def get_ftd_ipv6_routes(self, device_id: str, expanded=False):
request = f'/devices/devicerecords/{device_id}/ipv6staticroutes'
def get_ftd_ipv6staticroutes(self, device_id: str, expanded=False):
request = f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes'
url = self._url('config', request)
params = {
'expanded': expanded
}
return self._get(url, params)

def get_ftd_ipv6_route(self, device_id: str, route_id: str):
request = f'/devices/devicerecords/{device_id}/ipv6staticroutes/{route_id}'
def get_ftd_ipv6staticroute(self, device_id: str, route_id: str):
request = f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes/{route_id}'
url = self._url('config', request)
return self._get(url)

def update_ftd_ipv6_route(self, device_id: str, data: Dict):
request = f'/devices/devicerecords/{device_id}/ipv6staticroutes'
def update_ftd_ipv6staticroute(self, device_id: str, route_id: str, data: Dict):
request = f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes/{route_id}'
url = self._url('config', request)
return self._put(url, data)

def delete_ftd_ipv6_route(self, device_id: str, route_id: str):
request = f'/devices/devicerecords/{device_id}/ipv6staticroutes/{route_id}'
def delete_ftd_ipv6staticroute(self, device_id: str, route_id: str):
request = f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes/{route_id}'
url = self._url('config', request)
return self._delete(url)

def create_deployment(self, data: Dict):
def deploy(self, data: Dict):
request = '/deployment/deploymentrequests'
url = self._url('config', request)
return self._post(url, data)

def get_deployment(self):
def get_deployable_devices(self, expanded=False):
request = '/deployment/deployabledevices'
url = self._url('config', request)
return self._get(url)
params = {
'expanded': expanded
}
return self._get(url, params)

def create_policy(self, policy_type: str, data: Dict):
request = f'/policy/{policy_type}'
Expand Down
2 changes: 1 addition & 1 deletion fireREST/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.0.3'
__version__ = '0.0.4'

0 comments on commit 22016e1

Please sign in to comment.