diff --git a/CHANGELOG.md b/CHANGELOG.md index e69de29..6a72acd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -0,0 +1,14 @@ +# 0.0.4 + +## Enhancements + +* Add api calls for hapair monitoredinterfaces (read, update) +* Add helper function to get primary device id from hapair +* Add expandable option for get_depoyable_deployable_devices +* Default paging change from 25 to 100 + +## Bugfixes + +* getbyid operations fails due to incorrect limit param +* api calls for ftd ipv4/ipv6 static routing fails due to incorrect URLs +* update ftd sub interface fails due to missing param diff --git a/fireREST/__init__.py b/fireREST/__init__.py index 05af448..aeafa4d 100644 --- a/fireREST/__init__.py +++ b/fireREST/__init__.py @@ -1,14 +1,15 @@ import json -import requests import logging +import requests import urllib3 from .version import __version__ +from requests.auth import HTTPBasicAuth +from requests.exceptions import ConnectionError from time import sleep from typing import Dict -from requests.auth import HTTPBasicAuth -from urllib3.exceptions import ConnectionError +from uuid import UUID API_AUTH_URL = '/api/fmc_platform/v1/auth/generatetoken' @@ -94,6 +95,7 @@ def __init__(self, hostname: str, username: str, password: str, session=dict(), self.domains = session['domains'] self.headers['X-auth-access-token'] = session['X-auth-access-token'] self.headers['X-auth-refresh-token'] = session['X-auth-refresh-token'] + self.domain_name = domain self.domain = self.get_domain_id_by_name(domain) @staticmethod @@ -109,6 +111,13 @@ def _get_logger(logger: object): return dummy_logger return logger + def _is_getbyid_operation(self, request): + try: + val = UUID(request.split('/')[-1]) # noqa: F841 + return True + except ValueError: + return False + def _url(self, namespace='base', path=str()): ''' Generate URLs on the fly for requests to firepower api @@ -153,11 +162,8 @@ def _login(self): self.domains = json.loads(response.headers.get('DOMAINS', default=None)) self.refresh_counter = 0 self.logger.debug(f'Successfully authenticated to {self.hostname}') - except ConnectionRefusedError: - self.logger.error(f'Could not login to {self.hostname}. Connection refused') - raise - except ConnectionError: - self.logger.error(f'Could not login to {self.hostname}. Max retries exceeded with url: {request}') + except ConnectionError as exc: + self.logger.error(exc, exc_info=True) raise except FireRESTRateLimitException: self.logger.debug(f'Could not login to {self.hostname}. Rate limit exceeded. Backing of for 10 seconds.') @@ -191,15 +197,17 @@ def _refresh(self): self.headers['X-auth-access-token'] = access_token self.headers['X-auth-refresh-token'] = refresh_token - except ConnectionError: - msg = f'Could not connect to {self.hostname}. Max retries exceeded with url: {request}' - self.logger.error(msg) + except ConnectionError as exc: + self.logger.error(exc, exc_info=True) + raise except FireRESTRateLimitException: - self.logger.debug(f'API token refresh to {self.hostname} failed. Rate limit exceeded. Backing of for 10 seconds.') + self.logger.debug( + f'API token refresh to {self.hostname} failed. Rate limit exceeded. Backing of for 10 seconds.') sleep(10) self._login() except FireRESTApiException as exc: self.logger.error(str(exc)) + raise self.logger.debug(f'Successfully refreshed authorization token for {self.hostname}') @@ -229,41 +237,45 @@ def _delete(self, request: str, params=None): return response @RequestDebugDecorator('GET') - def _get_request(self, request: str, params=None, limit=25): + def _get_request(self, request: str, params=None, limit=100): ''' GET Operation for FMC REST API. In case of authentication issues session will be refreshed :param request: URL of request that should be performed :param params: dict of parameters for http request + :param limit: set custom limit for paging. If not set, api will default to 100 :return: requests.Response object ''' if params is None: params = dict() + if not self._is_getbyid_operation(request): + params['limit'] = limit try: response = requests.get(request, headers=self.headers, params=params, verify=self.verify_cert, timeout=self.timeout) if response.status_code == 401: if 'Access token invalid' in str(response.json()): self._refresh() - return self._get_request(request, params) + return self._get_request(request, params, limit) if response.status_code == 429: msg = f'GET operation {request} failed due to FMC rate limiting. Backing off for 10 seconds.' raise FireRESTRateLimitException(msg) except FireRESTRateLimitException: sleep(10) - return self._get_request(request, params) + return self._get_request(request, params, limit) return response - def _get(self, request: str, params=None, limit=25): + def _get(self, request: str, params=None, limit=100): ''' GET Operation that supports paging for FMC REST API. In case of authentication issues session will be refreshed :param request: URL of request that should be performed :param params: dict of parameters for http request + :param limit: set custom limit for paging. If not set, api will default to 100 :return: list of requests.Response objects ''' if params is None: params = dict() responses = list() - response = self._get_request(request, params) + response = self._get_request(request, params, limit) responses.append(response) payload = response.json() if 'paging' in payload.keys(): @@ -271,7 +283,7 @@ def _get(self, request: str, params=None, limit=25): limit = int(payload['paging']['limit']) for i in range(1, pages, 1): params['offset'] = str(int(i) * limit) - response_page = self._get_request(request, params) + response_page = self._get_request(request, params, limit) responses.append(response_page) return responses @@ -370,12 +382,12 @@ def get_device_id_by_name(self, device_name: str): def get_device_hapair_id_by_name(self, device_hapair_name: str): ''' - heloer function to retrieve device ha-pair id by name + helper function to retrieve device ha-pair id by name :param device_hapair_name: name of the ha-pair :return: id if ha-pair is found, None otherwise ''' - request = 'devicehapairs/ftddevicehapairs' - url = self._url(request) + request = '/devicehapairs/ftddevicehapairs' + url = self._url('config', request) response = self._get(url) for item in response: for ha_pair in item.json()['items']: @@ -383,6 +395,17 @@ def get_device_hapair_id_by_name(self, device_hapair_name: str): return ha_pair['id'] return None + def get_device_id_from_hapair(self, device_hapair_id: str): + ''' + helper function to retrieve device id from ha-pair + :param device_hapar_id: id of ha-pair + :return: id if device is found, None otherwise + ''' + request = f'/devicehapairs/ftddevicehapairs/{device_hapair_id}' + url = self._url('config', request) + response = self._get(url) + return response[0].json()['primary']['id'] + def get_nat_policy_id_by_name(self, nat_policy_name: str): ''' helper function to retrieve nat policy id by name @@ -588,6 +611,24 @@ def delete_device_hapair(self, device_hapair_id: str): url = self._url('config', request) return self._delete(url) + def get_device_hapair_monitoredinterfaces(self, device_hapair_id: str, expanded=False): + request = f'/devicehapairs/ftddevicehapairs/{device_hapair_id}/monitoredinterfaces' + params = { + 'expanded': expanded + } + url = self._url('config', request) + return self._get(url, params) + + def get_device_hapair_monitoredinterface(self, device_hapair_id: str, monitoredinterface_id: str): + request = f'/devicehapairs/ftddevicehapairs/{device_hapair_id}/monitoredinterfaces/{monitoredinterface_id}' + url = self._url('config', request) + return self._get(url) + + def update_device_hapair_monitoredinterface(self, device_hapair_id: str, monitoredinterface_id: str, data: Dict): + request = f'/devicehapairs/ftddevicehapairs/{device_hapair_id}/monitoredinterfaces/{monitoredinterface_id}' + url = self._url('config', request) + return self._put(url, data) + def get_ftd_physical_interfaces(self, device_id: str, expanded=False): request = f'/devices/devicerecords/{device_id}/physicalinterfaces' url = self._url('config', request) @@ -680,8 +721,8 @@ def get_ftd_sub_interface(self, device_id: str, interface_id: str): url = self._url('config', request) return self._get(url) - def update_ftd_sub_interface(self, device_id: str, data: Dict): - request = f'/devices/devicerecords/{device_id}/subinterfaces' + def update_ftd_sub_interface(self, device_id: str, interface_id: str, data: Dict): + request = f'/devices/devicerecords/{device_id}/subinterfaces/{interface_id}' url = self._url('config', request) return self._put(url, data) @@ -690,71 +731,74 @@ def delete_ftd_sub_interface(self, device_id: str, interface_id: str): url = self._url('config', request) return self._delete(url) - def create_ftd_ipv4_route(self, device_id: str, data: Dict): - request = f'/devices/devicerecords/{device_id}/ipv4staticroutes' + def create_ftd_ipv4staticroute(self, device_id: str, data: Dict): + request = f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes' url = self._url('config', request) return self._post(url, data) - def get_ftd_ipv4_routes(self, device_id: str, expanded=False): - request = f'/devices/devicerecords/{device_id}/ipv4staticroutes' + def get_ftd_ipv4staticroutes(self, device_id: str, expanded=False): + request = f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes' url = self._url('config', request) params = { 'expanded': expanded } return self._get(url, params) - def get_ftd_ipv4_route(self, device_id: str, route_id: str): - request = f'/devices/devicerecords/{device_id}/ipv4staticroutes/{route_id}' + def get_ftd_ipv4staticroute(self, device_id: str, route_id: str): + request = f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes/{route_id}' url = self._url('config', request) return self._get(url) - def update_ftd_ipv4_route(self, device_id: str, data: Dict): - request = f'/devices/devicerecords/{device_id}/ipv4staticroutes' + def update_ftd_ipv4staticroute(self, device_id: str, route_id: str, data: Dict): + request = f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes/{route_id}' url = self._url('config', request) return self._put(url, data) - def delete_ftd_ipv4_route(self, device_id: str, route_id: str): - request = f'/devices/devicerecords/{device_id}/ipv4staticroutes/{route_id}' + def delete_ftd_ipv4staticroute(self, device_id: str, route_id: str): + request = f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes/{route_id}' url = self._url('config', request) return self._delete(url) - def create_ftd_ipv6_route(self, device_id: str, data: Dict): - request = f'/devices/devicerecords/{device_id}/ipv6staticroutes' + def create_ftd_ipv6staticroute(self, device_id: str, data: Dict): + request = f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes' url = self._url('config', request) return self._post(url, data) - def get_ftd_ipv6_routes(self, device_id: str, expanded=False): - request = f'/devices/devicerecords/{device_id}/ipv6staticroutes' + def get_ftd_ipv6staticroutes(self, device_id: str, expanded=False): + request = f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes' url = self._url('config', request) params = { 'expanded': expanded } return self._get(url, params) - def get_ftd_ipv6_route(self, device_id: str, route_id: str): - request = f'/devices/devicerecords/{device_id}/ipv6staticroutes/{route_id}' + def get_ftd_ipv6staticroute(self, device_id: str, route_id: str): + request = f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes/{route_id}' url = self._url('config', request) return self._get(url) - def update_ftd_ipv6_route(self, device_id: str, data: Dict): - request = f'/devices/devicerecords/{device_id}/ipv6staticroutes' + def update_ftd_ipv6staticroute(self, device_id: str, route_id: str, data: Dict): + request = f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes/{route_id}' url = self._url('config', request) return self._put(url, data) - def delete_ftd_ipv6_route(self, device_id: str, route_id: str): - request = f'/devices/devicerecords/{device_id}/ipv6staticroutes/{route_id}' + def delete_ftd_ipv6staticroute(self, device_id: str, route_id: str): + request = f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes/{route_id}' url = self._url('config', request) return self._delete(url) - def create_deployment(self, data: Dict): + def deploy(self, data: Dict): request = '/deployment/deploymentrequests' url = self._url('config', request) return self._post(url, data) - def get_deployment(self): + def get_deployable_devices(self, expanded=False): request = '/deployment/deployabledevices' url = self._url('config', request) - return self._get(url) + params = { + 'expanded': expanded + } + return self._get(url, params) def create_policy(self, policy_type: str, data: Dict): request = f'/policy/{policy_type}' diff --git a/fireREST/version.py b/fireREST/version.py index ffcc925..156d6f9 100644 --- a/fireREST/version.py +++ b/fireREST/version.py @@ -1 +1 @@ -__version__ = '0.0.3' +__version__ = '0.0.4'