diff --git a/dftimewolf/cli/dftimewolf_recipes.py b/dftimewolf/cli/dftimewolf_recipes.py index f44d7c8e9..675821fd3 100644 --- a/dftimewolf/cli/dftimewolf_recipes.py +++ b/dftimewolf/cli/dftimewolf_recipes.py @@ -14,11 +14,15 @@ import uuid from typing import TYPE_CHECKING, List, Optional, Dict, Any, cast + +from dftimewolf.lib.validators import manager as validators_manager from dftimewolf.cli.curses_display_manager import CursesDisplayManager from dftimewolf.cli.curses_display_manager import CDMStringIOWrapper +# The following import makes sure validators are registered. +from dftimewolf.lib import validators # pylint: disable=unused-import + # pylint: disable=wrong-import-position -from dftimewolf.lib import args_validator from dftimewolf.lib import logging_utils from dftimewolf.lib import telemetry from dftimewolf import config @@ -326,7 +330,7 @@ def ValidateArguments(self) -> None: if argument_mandatory or argument_set: argument_value = self.state.command_line_options[switch] try: - valid_value = args_validator.ValidatorsManager.Validate( + valid_value = validators_manager.ValidatorsManager.Validate( str(argument_value), arg) self.state.command_line_options[switch] = valid_value except errors.RecipeArgsValidationFailure as exception: diff --git a/dftimewolf/lib/args_validator.py b/dftimewolf/lib/args_validator.py index 529f8da92..572375945 100644 --- a/dftimewolf/lib/args_validator.py +++ b/dftimewolf/lib/args_validator.py @@ -1,15 +1,8 @@ """Validators for recipe arguments.""" import abc -import ipaddress -import re -from typing import Any, Dict, List, Union, Type, Sequence, Optional - -import datetime -from urllib.parse import urlparse - -from dftimewolf.lib import errors, resources +from dftimewolf.lib import resources class AbstractValidator(abc.ABC): @@ -95,602 +88,3 @@ def ValidateSingle(self, errors.RecipeArgsValidationFailure: If an invalid argument is found. errors.RecipeArgsValidatorError: An error in validation. """ - - -class AWSRegionValidator(AbstractValidator): - """Validates a correct AWS region.""" - - # Source: - # curl -s https://ip-ranges.amazonaws.com/ip-ranges.json | \ - # jq -r '.prefixes[] | select(.service == "EC2") | .region' ip-ranges.json \ - # | sort | uniq - # Fetched 2023-01-15 - # TODO - Fetch at runtime? - _regions = { - 'af-south-1', 'ap-east-1', 'ap-northeast-1', 'ap-northeast-2', - 'ap-northeast-3', 'ap-south-1', 'ap-south-2', 'ap-southeast-1', - 'ap-southeast-2', 'ap-southeast-3', 'ap-southeast-4', 'ap-southeast-6', - 'ca-central-1', 'ca-west-1', 'cn-north-1', 'cn-northwest-1', - 'eu-central-1', - 'eu-central-2', 'eu-north-1', 'eu-south-1', 'eu-south-2', 'eu-west-1', - 'eu-west-2', 'eu-west-3', 'il-central-1', 'me-central-1', 'me-south-1', - 'sa-east-1', 'us-east-1', 'us-east-2', 'us-gov-east-1', 'us-gov-west-1', - 'us-west-1', 'us-west-2' - } - NAME = 'aws_region' - - def Validate(self, - argument_value: str, - recipe_argument: resources.RecipeArgument) -> str: - """Validate operand is a valid AWS region. - - Args: - argument_value: The argument value to validate. - recipe_argument: The definition of the argument. - - Returns: - A valid AWS region name. - - Raises: - RecipeArgsValidationFailure: if the argument value is not a valid AWS - region. - """ - if argument_value not in self._regions: - raise (errors.RecipeArgsValidationFailure( - recipe_argument.switch, - argument_value, - self.NAME, - 'Invalid AWS Region name')) - - return argument_value - - -class AzureRegionValidator(AbstractValidator): - """Validates an Azure region.""" - - # Source: az account list-locations | jq -r '.[].name' | sort - # Fetched 2023-02-07 - # TODO - Fetch at runtime? - _regions = { - 'asia', 'asiapacific', 'australia', 'australiacentral', - 'australiacentral2', 'australiaeast', 'australiasoutheast', 'brazil', - 'brazilsouth', 'brazilsoutheast', 'canada', 'canadacentral', 'canadaeast', - 'centralindia', 'centralus', 'centraluseuap', 'centralusstage', - 'eastasia', 'eastasiastage', 'eastus', 'eastus2', 'eastus2euap', - 'eastus2stage', 'eastusstage', 'eastusstg', 'europe', 'france', - 'francecentral', 'francesouth', 'germany', 'germanynorth', - 'germanywestcentral', 'global', 'india', 'japan', 'japaneast', - 'japanwest', 'jioindiacentral', 'jioindiawest', 'korea', 'koreacentral', - 'koreasouth', 'northcentralus', 'northcentralusstage', 'northeurope', - 'norway', 'norwayeast', 'norwaywest', 'qatarcentral', 'singapore', - 'southafrica', 'southafricanorth', 'southafricawest', 'southcentralus', - 'southcentralusstage', 'southcentralusstg', 'southeastasia', - 'southeastasiastage', 'southindia', 'swedencentral', 'switzerland', - 'switzerlandnorth', 'switzerlandwest', 'uae', 'uaecentral', 'uaenorth', - 'uk', 'uksouth', 'ukwest', 'unitedstates', 'unitedstateseuap', - 'westcentralus', 'westeurope', 'westindia', 'westus', 'westus2', - 'westus2stage', 'westus3', 'westusstage' - } - NAME = 'azure_region' - - def Validate(self, - argument_value: str, - recipe_argument: resources.RecipeArgument) -> str: - """Validate that argument is a valid Azure region. - - Args: - argument_value: The argument value to validate. - recipe_argument: The definition of the argument. - - Returns: - A valid Azure region name. - - Raises: - RecipeArgsValidationFailure: If the argument value is not a valid Azure - region. - """ - if argument_value not in self._regions: - raise (errors.RecipeArgsValidationFailure( - recipe_argument.switch, - argument_value, - self.NAME, - 'Invalid Azure Region name')) - - return argument_value - - -class GCPZoneValidator(AbstractValidator): - """Validates a GCP zone.""" - - # Source: https://cloud.google.com/compute/docs/regions-zones/ - # Fetched 2023-01-13 - # TODO - Fetch at runtime? - _zones = { - 'asia-east1-a', 'asia-east1-b', 'asia-east1-c', 'asia-east2-a', - 'asia-east2-b', 'asia-east2-c', 'asia-northeast1-a', 'asia-northeast1-b', - 'asia-northeast1-c', 'asia-northeast2-a', 'asia-northeast2-b', - 'asia-northeast2-c', 'asia-northeast3-a', 'asia-northeast3-b', - 'asia-northeast3-c', 'asia-south1-a', 'asia-south1-b', 'asia-south1-c', - 'asia-south2-a', 'asia-south2-b', 'asia-south2-c', 'asia-southeast1-a', - 'asia-southeast1-b', 'asia-southeast1-c', 'asia-southeast2-a', - 'asia-southeast2-b', 'asia-southeast2-c', 'australia-southeast1-a', - 'australia-southeast1-b', 'australia-southeast1-c', - 'australia-southeast2-a', 'australia-southeast2-b', - 'australia-southeast2-c', 'europe-central2-a', 'europe-central2-b', - 'europe-central2-c', 'europe-north1-a', 'europe-north1-b', - 'europe-north1-c', 'europe-southwest1-a', 'europe-southwest1-b', - 'europe-southwest1-c', 'europe-west1-b', 'europe-west1-c', - 'europe-west1-d', 'europe-west2-a', 'europe-west2-b', 'europe-west2-c', - 'europe-west3-a', 'europe-west3-b', 'europe-west3-c', 'europe-west4-a', - 'europe-west4-b', 'europe-west4-c', 'europe-west6-a', 'europe-west6-b', - 'europe-west6-c', 'europe-west8-a', 'europe-west8-b', 'europe-west8-c', - 'europe-west9-a', 'europe-west9-b', 'europe-west9-c', 'me-west1-a', - 'me-west1-b', 'me-west1-c', 'northamerica-northeast1-a', - 'northamerica-northeast1-b', 'northamerica-northeast1-c', - 'northamerica-northeast2-a', 'northamerica-northeast2-b', - 'northamerica-northeast2-c', 'southamerica-east1-a', - 'southamerica-east1-b', - 'southamerica-east1-c', 'southamerica-west1-a', 'southamerica-west1-b', - 'southamerica-west1-c', 'us-central1-a', 'us-central1-b', 'us-central1-c', - 'us-central1-f', 'us-east1-b', 'us-east1-c', 'us-east1-d', 'us-east4-a', - 'us-east4-b', 'us-east4-c', 'us-east5-a', 'us-east5-b', 'us-east5-c', - 'us-south1-a', 'us-south1-b', 'us-south1-c', 'us-west1-a', 'us-west1-b', - 'us-west1-c', 'us-west2-a', 'us-west2-b', 'us-west2-c', 'us-west3-a', - 'us-west3-b', 'us-west3-c', 'us-west4-a', 'us-west4-b', 'us-west4-c', - 'global' - } - NAME = 'gcp_zone' - - def Validate(self, - argument_value: str, - recipe_argument: resources.RecipeArgument) -> Any: - """Validate that operand is a valid GCP zone. - - Args: - argument_value: The argument value to validate. - recipe_argument: The definition of the argument. - - Returns: - A valid GCP zone name. - - Raises: - RecipeArgsValidationFailure: If the argument is not a valid GCP zone. - """ - if argument_value not in self._zones: - raise errors.RecipeArgsValidationFailure( - recipe_argument.switch, - argument_value, - self.NAME, - 'Invalid GCP Zone name') - - return argument_value - - -class RegexValidator(CommaSeparatedValidator): - """Validates a string according to a regular expression.""" - - NAME = 'regex' - - def ValidateSingle(self, - argument_value: str, - recipe_argument: resources.RecipeArgument) -> str: - """Validate a string according to a regular expression. - - Args: - argument_value: The argument value to validate. - recipe_argument: The definition of the argument. - - Returns: - str: a valid string. - - Raises: - errors.RecipeArgsValidatorError: If no regex is found to use. - errors.RecipeArgsValidationFailure: If the argument value does not match - the regex. - """ - expression = recipe_argument.validation_params.get('regex') - if expression is None: - raise errors.RecipeArgsValidatorError( - 'Missing validator parameter: regex') - - regex = re.compile(expression) - - if not regex.match(argument_value): - raise errors.RecipeArgsValidationFailure( - recipe_argument.switch, - argument_value, - self.NAME, - f'does not match regex /{expression}/') - - return argument_value - - -class SubnetValidator(CommaSeparatedValidator): - """Validates a subnet.""" - - NAME = 'subnet' - - def ValidateSingle(self, - argument_value: str, - recipe_argument: resources.RecipeArgument) -> str: - """Validate that the argument_value is a valid subnet string. - - Args: - argument_value: The argument value to validate. - recipe_argument: The definition of the argument. - - Returns: - A valid subnet string - - Raises: - errors.RecipeArgsValidationFailure: If the argument is not a valid subnet. - """ - try: - ipaddress.ip_network(argument_value) - except ValueError: - raise errors.RecipeArgsValidationFailure( - recipe_argument.switch, - argument_value, - self.NAME, - 'Not a valid subnet') - - return argument_value - - -class DatetimeValidator(AbstractValidator): - """Validates a datetime string. - - Requires a format string that defines what the datetime should look like. - - Optionally, it can confirm order of multiple dates as well. A recipe can - specify the following datetime validator: - - { - "format": "datetime", - "format_string": "%Y-%m-%dT%H:%M:%SZ", - "before": "dateX", # optional - "after": "dateY" # optional - } - - The argument will then be checked that it is before the date in 'before', and - after the date in 'after'. Caveat: if a value in before or after is also a - parameter, e.g. with a recipe containing: - - "args": { - [ - "start_date", - "Start date", - null, - { - "format": "datetime", - "format_string": "%Y-%m-%dT%H:%M:%SZ", - "before": "@end_date" - } - ], [ - "end_date", - "End date", - null, - { - "format": "datetime", - "format_string": "%Y-%m-%dT%H:%M:%SZ", - "after": "@start_date" - } - ], - ... - then "format_string" must be the same for both args. - """ - - NAME = 'datetime' - - def Validate(self, argument_value: str, - recipe_argument: resources.RecipeArgument) -> str: - """Validate that operand is a valid GCP zone. - - Args: - argument_value: The argument value to validate. - recipe_argument: The definition of the argument. - - Returns: - A valid datetime string. - - Raises: - errors.RecipeArgsValidatorError: An error in validation. - errors.RecipeArgsValidationFailure: If the argument is not a valid - datetime. - """ - validation_parameters = recipe_argument.validation_params - if 'format_string' not in validation_parameters: - raise errors.RecipeArgsValidatorError( - 'Missing validator parameter: format_string') - - try: - dt = datetime.datetime.strptime( - argument_value, validation_parameters['format_string']) - except ValueError: # Date parsing failure - raise errors.RecipeArgsValidationFailure( - recipe_argument.switch, - argument_value, - self.NAME, - f'does not match format {validation_parameters["format_string"]}') - - try: - if 'before' in validation_parameters and validation_parameters['before']: - if not self._ValidateOrder( - dt, validation_parameters['before'], - validation_parameters["format_string"]): - raise errors.RecipeArgsValidationFailure( - recipe_argument.switch, - argument_value, - self.NAME, - (f'{validation_parameters["before"]} is before {dt} but it ' - 'should be the other way around')) - - if 'after' in validation_parameters and validation_parameters['after']: - if not self._ValidateOrder( - validation_parameters['after'], dt, - validation_parameters["format_string"]): - raise errors.RecipeArgsValidationFailure( - recipe_argument.switch, - argument_value, - self.NAME, - (f'{dt} is before {validation_parameters["after"]} but it ' - 'should be the other way around')) - except ValueError as exception: - raise errors.RecipeArgsValidatorError( - f'Error in order comparison: {str(exception)}') - return argument_value - - def _ValidateOrder(self, - first: Union[str, datetime.datetime], - second: Union[str, datetime.datetime], - format_string: str) -> bool: - """Validates date ordering. - - Args: - first: The intended earlier date. - second: The intended later date. - format_string: A format string defining str -> datetime conversion. - - Returns: - True if the ordering is correct, false otherwise. - - Raises: - ValueError: If string -> datetime conversion fails. - """ - if isinstance(first, str): - first = datetime.datetime.strptime(first, format_string) - if isinstance(second, str): - second = datetime.datetime.strptime(second, format_string) - - return first < second - - -class HostnameValidator(CommaSeparatedValidator): - """Validator for hostnames. - - Can validate flat hostnames and FQDNs. Optionally, can have `fqdn_only` - specified to require FQDNs and reject flat hostnames.""" - - NAME = 'hostname' - FQDN_ONLY_FLAG = 'fqdn_only' - HOSTNAME_REGEX = r'^[-_a-z0-9]{3,64}$' # Flat names, like 'localhost' - FQDN_REGEX = ( - r'(?=^.{4,253}$)(^((?!-)[a-zA-Z0-9-]{1,63}(? str: - """Validate a hostname. - - Args: - argument_value: The hostname to validate. - recipe_argument: The definition of the argument. - - Returns: - A valid hostname. - - Raises: - errors.RecipeArgsValidationFailure: If the argument is not a valid hostname. - """ - regexes = [self.FQDN_REGEX] - if not recipe_argument.validation_params.get(self.FQDN_ONLY_FLAG, False): - regexes.append(self.HOSTNAME_REGEX) - - for regex in regexes: - if re.match(regex, argument_value): - return argument_value - - raise errors.RecipeArgsValidationFailure( - recipe_argument.switch, - argument_value, - self.NAME, - 'Not a valid hostname') - - -class GRRHostValidator(HostnameValidator): - """Validates a GRR host identifier. - - GRR can accept FQDNs, or GRR client IDs, which take the form of - C.1facf5562db006ad. - """ - - NAME = 'grr_host' - GRR_REGEX = r'^C\.[0-9a-f]{16}$' - - def ValidateSingle(self, argument_value: str, - recipe_argument: resources.RecipeArgument) -> Any: - """Validates a GRR host ID. - - Args: - argument_value: The ID to validate. - recipe_argument: Unused for this validator. - - Returns: - A valid GRR host identifier. - - Raises: - errors.RecipeArgsValidationFailure: If the argument value is not a GRR - host ID. - """ - regexes = [self.GRR_REGEX, self.FQDN_REGEX, self.HOSTNAME_REGEX] - - for regex in regexes: - if re.match(regex, argument_value): - return argument_value - - raise errors.RecipeArgsValidationFailure( - recipe_argument.switch, - argument_value, - self.NAME, - 'Not a GRR host identifier') - - -class URLValidator(CommaSeparatedValidator): - """Validates a URL.""" - - NAME = "url" - - def ValidateSingle(self, - argument_value: str, - recipe_argument: resources.RecipeArgument) -> str: - """Validates a URL. - - Args: - argument_value: The URL to validate. - recipe_argument: The definition of the argument. - - - Returns: - A valid URL string. - - Raises: - errors.RecipeArgsValidationFailure: If the argument is not a valid URL. - """ - url = urlparse(argument_value) - if not all([url.scheme, url.netloc]): - raise errors.RecipeArgsValidationFailure( - recipe_argument.switch, - argument_value, - self.NAME, - 'Not a valid URL') - - return argument_value - - -class ValidatorsManager: - """Class that handles validating arguments.""" - - _validator_classes = {} # type: Dict[str, Type['AbstractValidator']] - - @classmethod - def ListValidators(cls) -> List[str]: - """Returns a list of all registered validators. - - Returns: - A list of all registered validators. - """ - return list(cls._validator_classes.keys()) - - - @classmethod - def RegisterValidator(cls, - validator_class: Type['AbstractValidator']) -> None: - """Register a validator class for usage. - - Args: - validator_class: Class to register. - - Raises: - KeyError: if there's already a validator class set for the corresponding - class name. - """ - class_name = validator_class.NAME - if class_name in cls._validator_classes: - raise KeyError( - 'Validator class already set for: {0:s}.'.format(class_name)) - - cls._validator_classes[class_name] = validator_class - - @classmethod - def DeregisterValidator(cls, - validator_class: Type['AbstractValidator']) -> None: - """Deregister a validator class. - - Args: - validator_class: Class to deregister. - - Raises: - KeyError: if validator class is not set for the corresponding class name. - """ - class_name = validator_class.NAME - if class_name not in cls._validator_classes: - raise KeyError('Module class not set for: {0:s}.'.format(class_name)) - - del cls._validator_classes[class_name] - - @classmethod - def RegisterValidators( - cls, validator_classes: Sequence[Type['AbstractValidator']]) -> None: - """Registers validator classes. - - The module classes are identified based on their class name. - - Args: - validator_classes (Sequence[type]): classes to register. - Raises: - KeyError: if module class is already set for the corresponding class name. - """ - for module_class in validator_classes: - cls.RegisterValidator(module_class) - - @classmethod - def GetValidatorByName(cls, name: str) -> Optional[Type['AbstractValidator']]: - """Retrieves a specific validator by its name. - - Args: - name (str): name of the module. - - Returns: - type: the module class, which is a subclass of BaseModule, or None if - no corresponding module was found. - """ - return cls._validator_classes.get(name, None) - - @classmethod - def Validate(cls, - argument_value: str, - recipe_argument: resources.RecipeArgument) -> str: - """Validate an argument value. - - Args: - argument_value: The argument value to validate. - recipe_argument: The definition of the argument. - - Returns: - The validated argument value. If the recipe argument doesn't specify a - validator, the argument value is returned unchanged. - - Raises: - errors.RecipeArgsValidationFailure: If the argument is not valid. - errors.RecipeArgsValidatorError: Raised on validator config errors. - """ - validator_name = str(recipe_argument.validation_params.get("format", "")) - if not validator_name: - return argument_value - - if validator_name not in cls._validator_classes: - raise errors.RecipeArgsValidatorError( - f'{validator_name} is not a registered validator') - - validator_class = cls._validator_classes[validator_name] - validator = validator_class() - - return validator.Validate(argument_value, recipe_argument) - - -ValidatorsManager.RegisterValidators( - [AWSRegionValidator, AzureRegionValidator, DatetimeValidator, - HostnameValidator, GCPZoneValidator, GRRHostValidator, RegexValidator, - SubnetValidator, URLValidator]) diff --git a/dftimewolf/lib/validators/__init__.py b/dftimewolf/lib/validators/__init__.py new file mode 100644 index 000000000..184602691 --- /dev/null +++ b/dftimewolf/lib/validators/__init__.py @@ -0,0 +1,11 @@ +# -*- coding: utf-8 -*- +"""This file imports Python modules that register validators.""" +from dftimewolf.lib.validators import aws_region +from dftimewolf.lib.validators import azure_region +from dftimewolf.lib.validators import datetime_validator +from dftimewolf.lib.validators import gcp_zone +from dftimewolf.lib.validators import grr_host +from dftimewolf.lib.validators import hostname +from dftimewolf.lib.validators import regex +from dftimewolf.lib.validators import subnet +from dftimewolf.lib.validators import url diff --git a/dftimewolf/lib/validators/aws_region.py b/dftimewolf/lib/validators/aws_region.py new file mode 100644 index 000000000..a008fea66 --- /dev/null +++ b/dftimewolf/lib/validators/aws_region.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +"""Validator for AWS region names.""" +from dftimewolf.lib import errors, resources, args_validator +from dftimewolf.lib.validators import manager as validators_manager + +# Source: +# curl -s https://ip-ranges.amazonaws.com/ip-ranges.json | \ +# jq -r '.prefixes[] | select(.service == "EC2") | .region' ip-ranges.json \ +# | sort | uniq +# Fetched 2023-01-15 +# TODO - Fetch at runtime? +REGIONS = frozenset({ + 'af-south-1', 'ap-east-1', 'ap-northeast-1', 'ap-northeast-2', + 'ap-northeast-3', 'ap-south-1', 'ap-south-2', 'ap-southeast-1', + 'ap-southeast-2', 'ap-southeast-3', 'ap-southeast-4', 'ap-southeast-6', + 'ca-central-1', 'ca-west-1', 'cn-north-1', 'cn-northwest-1', + 'eu-central-1', + 'eu-central-2', 'eu-north-1', 'eu-south-1', 'eu-south-2', 'eu-west-1', + 'eu-west-2', 'eu-west-3', 'il-central-1', 'me-central-1', 'me-south-1', + 'sa-east-1', 'us-east-1', 'us-east-2', 'us-gov-east-1', 'us-gov-west-1', + 'us-west-1', 'us-west-2'}) + + +class AWSRegionValidator(args_validator.AbstractValidator): + """Validates a correct AWS region.""" + + NAME = 'aws_region' + + def Validate(self, + argument_value: str, + recipe_argument: resources.RecipeArgument) -> str: + """Validate operand is a valid AWS region. + + Args: + argument_value: The argument value to validate. + recipe_argument: The definition of the argument. + + Returns: + A valid AWS region name. + + Raises: + RecipeArgsValidationFailure: if the argument value is not a valid AWS + region. + """ + if argument_value not in REGIONS: + raise (errors.RecipeArgsValidationFailure( + recipe_argument.switch, + argument_value, + self.NAME, + 'Invalid AWS Region name')) + + return argument_value + +validators_manager.ValidatorsManager.RegisterValidator(AWSRegionValidator) diff --git a/dftimewolf/lib/validators/azure_region.py b/dftimewolf/lib/validators/azure_region.py new file mode 100644 index 000000000..710962650 --- /dev/null +++ b/dftimewolf/lib/validators/azure_region.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +"""Validator for Azure region names.""" +from dftimewolf.lib import errors, resources, args_validator +from dftimewolf.lib.validators import manager as validators_manager + +# Source: az account list-locations | jq -r '.[].name' | sort +# Fetched 2023-02-07 +# TODO - Fetch at runtime? +REGIONS = frozenset({ + 'asia', 'asiapacific', 'australia', 'australiacentral', + 'australiacentral2', 'australiaeast', 'australiasoutheast', 'brazil', + 'brazilsouth', 'brazilsoutheast', 'canada', 'canadacentral', 'canadaeast', + 'centralindia', 'centralus', 'centraluseuap', 'centralusstage', + 'eastasia', 'eastasiastage', 'eastus', 'eastus2', 'eastus2euap', + 'eastus2stage', 'eastusstage', 'eastusstg', 'europe', 'france', + 'francecentral', 'francesouth', 'germany', 'germanynorth', + 'germanywestcentral', 'global', 'india', 'japan', 'japaneast', + 'japanwest', 'jioindiacentral', 'jioindiawest', 'korea', 'koreacentral', + 'koreasouth', 'northcentralus', 'northcentralusstage', 'northeurope', + 'norway', 'norwayeast', 'norwaywest', 'qatarcentral', 'singapore', + 'southafrica', 'southafricanorth', 'southafricawest', 'southcentralus', + 'southcentralusstage', 'southcentralusstg', 'southeastasia', + 'southeastasiastage', 'southindia', 'swedencentral', 'switzerland', + 'switzerlandnorth', 'switzerlandwest', 'uae', 'uaecentral', 'uaenorth', + 'uk', 'uksouth', 'ukwest', 'unitedstates', 'unitedstateseuap', + 'westcentralus', 'westeurope', 'westindia', 'westus', 'westus2', + 'westus2stage', 'westus3', 'westusstage'}) + + +class AzureRegionValidator(args_validator.AbstractValidator): + """Validates an Azure region.""" + + NAME = 'azure_region' + + def Validate(self, + argument_value: str, + recipe_argument: resources.RecipeArgument) -> str: + """Validate that argument is a valid Azure region. + + Args: + argument_value: The argument value to validate. + recipe_argument: The definition of the argument. + + Returns: + A valid Azure region name. + + Raises: + RecipeArgsValidationFailure: If the argument value is not a valid Azure + region. + """ + if argument_value not in REGIONS: + raise (errors.RecipeArgsValidationFailure( + recipe_argument.switch, + argument_value, + self.NAME, + 'Invalid Azure Region name')) + + return argument_value + +validators_manager.ValidatorsManager.RegisterValidator(AzureRegionValidator) diff --git a/dftimewolf/lib/validators/datetime_validator.py b/dftimewolf/lib/validators/datetime_validator.py new file mode 100644 index 000000000..8cad199c6 --- /dev/null +++ b/dftimewolf/lib/validators/datetime_validator.py @@ -0,0 +1,138 @@ +# -*- coding: utf-8 -*- +"""Validator for dates and times""" +import datetime +from typing import Union + +from dftimewolf.lib import errors, resources, args_validator +from dftimewolf.lib.validators import manager as validators_manager + + + +class DatetimeValidator(args_validator.AbstractValidator): + """Validates a datetime string. + + Requires a format string that defines what the datetime should look like. + + Optionally, it can confirm order of multiple dates as well. A recipe can + specify the following datetime validator: + + { + "format": "datetime", + "format_string": "%Y-%m-%dT%H:%M:%SZ", + "before": "dateX", # optional + "after": "dateY" # optional + } + + The argument will then be checked that it is before the date in 'before', and + after the date in 'after'. Caveat: if a value in before or after is also a + parameter, e.g. with a recipe containing: + + "args": { + [ + "start_date", + "Start date", + null, + { + "format": "datetime", + "format_string": "%Y-%m-%dT%H:%M:%SZ", + "before": "@end_date" + } + ], [ + "end_date", + "End date", + null, + { + "format": "datetime", + "format_string": "%Y-%m-%dT%H:%M:%SZ", + "after": "@start_date" + } + ], + ... + then "format_string" must be the same for both args. + """ + + NAME = 'datetime' + + def Validate(self, argument_value: str, + recipe_argument: resources.RecipeArgument) -> str: + """Validate that operand is a valid GCP zone. + + Args: + argument_value: The argument value to validate. + recipe_argument: The definition of the argument. + + Returns: + A valid datetime string. + + Raises: + errors.RecipeArgsValidatorError: An error in validation. + errors.RecipeArgsValidationFailure: If the argument is not a valid + datetime. + """ + validation_parameters = recipe_argument.validation_params + if 'format_string' not in validation_parameters: + raise errors.RecipeArgsValidatorError( + 'Missing validator parameter: format_string') + + try: + dt = datetime.datetime.strptime( + argument_value, validation_parameters['format_string']) + except ValueError: # Date parsing failure + raise errors.RecipeArgsValidationFailure( + recipe_argument.switch, + argument_value, + self.NAME, + f'does not match format {validation_parameters["format_string"]}') + + try: + if 'before' in validation_parameters and validation_parameters['before']: + if not self._ValidateOrder( + dt, validation_parameters['before'], + validation_parameters["format_string"]): + raise errors.RecipeArgsValidationFailure( + recipe_argument.switch, + argument_value, + self.NAME, + (f'{validation_parameters["before"]} is before {dt} but it ' + 'should be the other way around')) + + if 'after' in validation_parameters and validation_parameters['after']: + if not self._ValidateOrder( + validation_parameters['after'], dt, + validation_parameters["format_string"]): + raise errors.RecipeArgsValidationFailure( + recipe_argument.switch, + argument_value, + self.NAME, + (f'{dt} is before {validation_parameters["after"]} but it ' + 'should be the other way around')) + except ValueError as exception: + raise errors.RecipeArgsValidatorError( + f'Error in order comparison: {str(exception)}') + return argument_value + + def _ValidateOrder(self, + first: Union[str, datetime.datetime], + second: Union[str, datetime.datetime], + format_string: str) -> bool: + """Validates date ordering. + + Args: + first: The intended earlier date. + second: The intended later date. + format_string: A format string defining str -> datetime conversion. + + Returns: + True if the ordering is correct, false otherwise. + + Raises: + ValueError: If string -> datetime conversion fails. + """ + if isinstance(first, str): + first = datetime.datetime.strptime(first, format_string) + if isinstance(second, str): + second = datetime.datetime.strptime(second, format_string) + + return first < second + +validators_manager.ValidatorsManager.RegisterValidator(DatetimeValidator) diff --git a/dftimewolf/lib/validators/gcp_zone.py b/dftimewolf/lib/validators/gcp_zone.py new file mode 100644 index 000000000..ff41ac375 --- /dev/null +++ b/dftimewolf/lib/validators/gcp_zone.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- +"""Validator for GCP zone names.""" +from typing import Any + +from dftimewolf.lib import errors, resources +from dftimewolf.lib.args_validator import AbstractValidator +from dftimewolf.lib.validators import manager as validators_manager + +# Source: https://cloud.google.com/compute/docs/regions-zones/ +# Fetched 2023-01-13 +# TODO - Fetch at runtime? +ZONES = frozenset({ + 'asia-east1-a', 'asia-east1-b', 'asia-east1-c', 'asia-east2-a', + 'asia-east2-b', 'asia-east2-c', 'asia-northeast1-a', 'asia-northeast1-b', + 'asia-northeast1-c', 'asia-northeast2-a', 'asia-northeast2-b', + 'asia-northeast2-c', 'asia-northeast3-a', 'asia-northeast3-b', + 'asia-northeast3-c', 'asia-south1-a', 'asia-south1-b', 'asia-south1-c', + 'asia-south2-a', 'asia-south2-b', 'asia-south2-c', 'asia-southeast1-a', + 'asia-southeast1-b', 'asia-southeast1-c', 'asia-southeast2-a', + 'asia-southeast2-b', 'asia-southeast2-c', 'australia-southeast1-a', + 'australia-southeast1-b', 'australia-southeast1-c', + 'australia-southeast2-a', 'australia-southeast2-b', + 'australia-southeast2-c', 'europe-central2-a', 'europe-central2-b', + 'europe-central2-c', 'europe-north1-a', 'europe-north1-b', + 'europe-north1-c', 'europe-southwest1-a', 'europe-southwest1-b', + 'europe-southwest1-c', 'europe-west1-b', 'europe-west1-c', + 'europe-west1-d', 'europe-west2-a', 'europe-west2-b', 'europe-west2-c', + 'europe-west3-a', 'europe-west3-b', 'europe-west3-c', 'europe-west4-a', + 'europe-west4-b', 'europe-west4-c', 'europe-west6-a', 'europe-west6-b', + 'europe-west6-c', 'europe-west8-a', 'europe-west8-b', 'europe-west8-c', + 'europe-west9-a', 'europe-west9-b', 'europe-west9-c', 'me-west1-a', + 'me-west1-b', 'me-west1-c', 'northamerica-northeast1-a', + 'northamerica-northeast1-b', 'northamerica-northeast1-c', + 'northamerica-northeast2-a', 'northamerica-northeast2-b', + 'northamerica-northeast2-c', 'southamerica-east1-a', + 'southamerica-east1-b', + 'southamerica-east1-c', 'southamerica-west1-a', 'southamerica-west1-b', + 'southamerica-west1-c', 'us-central1-a', 'us-central1-b', 'us-central1-c', + 'us-central1-f', 'us-east1-b', 'us-east1-c', 'us-east1-d', 'us-east4-a', + 'us-east4-b', 'us-east4-c', 'us-east5-a', 'us-east5-b', 'us-east5-c', + 'us-south1-a', 'us-south1-b', 'us-south1-c', 'us-west1-a', 'us-west1-b', + 'us-west1-c', 'us-west2-a', 'us-west2-b', 'us-west2-c', 'us-west3-a', + 'us-west3-b', 'us-west3-c', 'us-west4-a', 'us-west4-b', 'us-west4-c', + 'global'}) + +class GCPZoneValidator(AbstractValidator): + """Validates a GCP zone.""" + + NAME = 'gcp_zone' + + def Validate(self, + argument_value: str, + recipe_argument: resources.RecipeArgument) -> Any: + """Validate that argument value is a valid GCP zone. + + Args: + argument_value: The argument value to validate. + recipe_argument: The definition of the argument. + + Returns: + A valid GCP zone name. + + Raises: + RecipeArgsValidationFailure: If the argument is not a valid GCP zone. + """ + if argument_value not in ZONES: + raise errors.RecipeArgsValidationFailure( + recipe_argument.switch, + argument_value, + self.NAME, + 'Invalid GCP Zone name') + + return argument_value +validators_manager.ValidatorsManager.RegisterValidator(GCPZoneValidator) diff --git a/dftimewolf/lib/validators/grr_host.py b/dftimewolf/lib/validators/grr_host.py new file mode 100644 index 000000000..db5fe37f1 --- /dev/null +++ b/dftimewolf/lib/validators/grr_host.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +"""Validator for GRR host identifiers.""" +import re +from typing import Any + +from dftimewolf.lib import errors, resources +from dftimewolf.lib.validators import hostname +from dftimewolf.lib.validators import manager as validators_manager + + +class GRRHostValidator(hostname.HostnameValidator): + """Validates a GRR host identifier. + + GRR can accept FQDNs, or GRR client IDs, which take the form of + C.1facf5562db006ad. + """ + + NAME = 'grr_host' + GRR_REGEX = r'^C\.[0-9a-f]{16}$' + + def ValidateSingle(self, argument_value: str, + recipe_argument: resources.RecipeArgument) -> Any: + """Validates a GRR host ID. + + Args: + argument_value: The ID to validate. + recipe_argument: Unused for this validator. + + Returns: + A valid GRR host identifier. + + Raises: + errors.RecipeArgsValidationFailure: If the argument value is not a GRR + host ID. + """ + regexes = [self.GRR_REGEX, self.FQDN_REGEX, self.HOSTNAME_REGEX] + + for regex in regexes: + if re.match(regex, argument_value): + return argument_value + + raise errors.RecipeArgsValidationFailure( + recipe_argument.switch, + argument_value, + self.NAME, + 'Not a GRR host identifier') + +validators_manager.ValidatorsManager.RegisterValidator(GRRHostValidator) diff --git a/dftimewolf/lib/validators/hostname.py b/dftimewolf/lib/validators/hostname.py new file mode 100644 index 000000000..37d1f1108 --- /dev/null +++ b/dftimewolf/lib/validators/hostname.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +"""Validator for hostnames.""" +import re + +from dftimewolf.lib import errors, resources, args_validator +from dftimewolf.lib.validators import manager as validators_manager + + +class HostnameValidator(args_validator.CommaSeparatedValidator): + """Validator for hostnames. + + Can validate flat hostnames and FQDNs. Optionally, can have `fqdn_only` + specified to require FQDNs and reject flat hostnames.""" + + NAME = 'hostname' + FQDN_ONLY_FLAG = 'fqdn_only' + HOSTNAME_REGEX = r'^[-_a-z0-9]{3,64}$' # Flat names, like 'localhost' + FQDN_REGEX = ( + r'(?=^.{4,253}$)(^((?!-)[a-zA-Z0-9-]{1,63}(? str: + """Validate a hostname. + + Args: + argument_value: The hostname to validate. + recipe_argument: The definition of the argument. + + Returns: + A valid hostname. + + Raises: + errors.RecipeArgsValidationFailure: If the argument is not a valid hostname. + """ + regexes = [self.FQDN_REGEX] + if not recipe_argument.validation_params.get(self.FQDN_ONLY_FLAG, False): + regexes.append(self.HOSTNAME_REGEX) + + for regex in regexes: + if re.match(regex, argument_value): + return argument_value + + raise errors.RecipeArgsValidationFailure( + recipe_argument.switch, + argument_value, + self.NAME, + 'Not a valid hostname') + +validators_manager.ValidatorsManager.RegisterValidator(HostnameValidator) diff --git a/dftimewolf/lib/validators/manager.py b/dftimewolf/lib/validators/manager.py new file mode 100644 index 000000000..d71abe643 --- /dev/null +++ b/dftimewolf/lib/validators/manager.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- +"""Manager class for validators.""" +from typing import Dict, List, Optional, Sequence, Type + +from dftimewolf.lib import errors, resources, args_validator + + +class ValidatorsManager: + """Class that handles validating arguments.""" + + _validator_classes = {} # type: Dict[str, Type['args_validator.AbstractValidator']] # pylint: disable=line-too-long + + @classmethod + def ListValidators(cls) -> List[str]: + """Returns a list of all registered validators. + + Returns: + A list of all registered validators. + """ + return list(cls._validator_classes.keys()) + + + @classmethod + def RegisterValidator( + cls, validator_class: Type['args_validator.AbstractValidator'], + override: bool = False) -> None: + """Register a validator class for usage. + + Args: + validator_class: Class to register. + override: Override any existing validator class with the same name. + + Raises: + KeyError: if there's already a validator class set for the corresponding + class name and override is not set. + """ + class_name = validator_class.NAME + + if class_name in cls._validator_classes and not override: + raise KeyError( + 'Validator class already set for: {0:s}.'.format(class_name)) + + cls._validator_classes[class_name] = validator_class + + @classmethod + def DeregisterValidator( + cls, validator_class: Type['args_validator.AbstractValidator']) -> None: + """Deregister a validator class. + + Args: + validator_class: Class to deregister. + + Raises: + KeyError: if validator class is not set for the corresponding class name. + """ + class_name = validator_class.NAME + if class_name not in cls._validator_classes: + raise KeyError('Module class not set for: {0:s}.'.format(class_name)) + + del cls._validator_classes[class_name] + + @classmethod + def RegisterValidators( + cls, + validator_classes: Sequence[Type['args_validator.AbstractValidator']]) -> None: #pylint: disable=line-too-long + """Registers validator classes. + + The module classes are identified based on their class name. + + Args: + validator_classes (Sequence[type]): classes to register. + Raises: + KeyError: if module class is already set for the corresponding class name. + """ + for module_class in validator_classes: + cls.RegisterValidator(module_class) + + @classmethod + def GetValidatorByName( + cls, name: str) -> Optional[Type['args_validator.AbstractValidator']]: + """Retrieves a specific validator by its name. + + Args: + name (str): name of the module. + + Returns: + type: the module class, which is a subclass of BaseModule, or None if + no corresponding module was found. + """ + return cls._validator_classes.get(name, None) + + @classmethod + def Validate(cls, + argument_value: str, + recipe_argument: resources.RecipeArgument) -> str: + """Validate an argument value. + + Args: + argument_value: The argument value to validate. + recipe_argument: The definition of the argument. + + Returns: + The validated argument value. If the recipe argument doesn't specify a + validator, the argument value is returned unchanged. + + Raises: + errors.RecipeArgsValidationFailure: If the argument is not valid. + errors.RecipeArgsValidatorError: Raised on validator config errors. + """ + validator_name = str(recipe_argument.validation_params.get("format", "")) + if not validator_name: + return argument_value + + if validator_name not in cls._validator_classes: + raise errors.RecipeArgsValidatorError( + f'{validator_name} is not a registered validator') + + validator_class = cls._validator_classes[validator_name] + validator = validator_class() + + return validator.Validate(argument_value, recipe_argument) diff --git a/dftimewolf/lib/validators/regex.py b/dftimewolf/lib/validators/regex.py new file mode 100644 index 000000000..7a2dd1382 --- /dev/null +++ b/dftimewolf/lib/validators/regex.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +"""Validator for regular expression matches.""" +import re + +from dftimewolf.lib import errors, resources, args_validator +from dftimewolf.lib.validators import manager as validators_manager + + +class RegexValidator(args_validator.CommaSeparatedValidator): + """Validates a string according to a regular expression.""" + + NAME = 'regex' + + def ValidateSingle(self, + argument_value: str, + recipe_argument: resources.RecipeArgument) -> str: + """Validate a string according to a regular expression. + + Args: + argument_value: The argument value to validate. + recipe_argument: The definition of the argument. + + Returns: + str: a valid string. + + Raises: + errors.RecipeArgsValidatorError: If no regex is found to use. + errors.RecipeArgsValidationFailure: If the argument value does not match + the regex. + """ + expression = recipe_argument.validation_params.get('regex') + if expression is None: + raise errors.RecipeArgsValidatorError( + 'Missing validator parameter: regex') + + regex = re.compile(expression) + + if not regex.match(argument_value): + raise errors.RecipeArgsValidationFailure( + recipe_argument.switch, + argument_value, + self.NAME, + f'does not match regex /{expression}/') + + return argument_value + +validators_manager.ValidatorsManager.RegisterValidator(RegexValidator) diff --git a/dftimewolf/lib/validators/subnet.py b/dftimewolf/lib/validators/subnet.py new file mode 100644 index 000000000..079486e5b --- /dev/null +++ b/dftimewolf/lib/validators/subnet.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +"""Validator for subnets.""" +import ipaddress + +from dftimewolf.lib import errors, resources, args_validator +from dftimewolf.lib.validators import manager as validators_manager + + +class SubnetValidator(args_validator.CommaSeparatedValidator): + """Validates a subnet.""" + + NAME = 'subnet' + + def ValidateSingle(self, + argument_value: str, + recipe_argument: resources.RecipeArgument) -> str: + """Validate that the argument_value is a valid subnet string. + + Args: + argument_value: The argument value to validate. + recipe_argument: The definition of the argument. + + Returns: + A valid subnet string + + Raises: + errors.RecipeArgsValidationFailure: If the argument is not a valid subnet. + """ + try: + ipaddress.ip_network(argument_value) + except ValueError: + raise errors.RecipeArgsValidationFailure( + recipe_argument.switch, + argument_value, + self.NAME, + 'Not a valid subnet') + + return argument_value + +validators_manager.ValidatorsManager.RegisterValidator(SubnetValidator) diff --git a/dftimewolf/lib/validators/url.py b/dftimewolf/lib/validators/url.py new file mode 100644 index 000000000..612426a81 --- /dev/null +++ b/dftimewolf/lib/validators/url.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +"""Validator for URLs.""" +from urllib.parse import urlparse + +from dftimewolf.lib import errors, resources, args_validator +from dftimewolf.lib.validators import manager as validators_manager + + +class URLValidator(args_validator.CommaSeparatedValidator): + """Validates a URL.""" + + NAME = "url" + + def ValidateSingle(self, + argument_value: str, + recipe_argument: resources.RecipeArgument) -> str: + """Validates a URL. + + Args: + argument_value: The URL to validate. + recipe_argument: The definition of the argument. + + + Returns: + A valid URL string. + + Raises: + errors.RecipeArgsValidationFailure: If the argument is not a valid URL. + """ + url = urlparse(argument_value) + if not all([url.scheme, url.netloc]): + raise errors.RecipeArgsValidationFailure( + recipe_argument.switch, + argument_value, + self.NAME, + 'Not a valid URL') + + return argument_value + +validators_manager.ValidatorsManager.RegisterValidator(URLValidator) diff --git a/tests/cli/main_tool.py b/tests/cli/main_tool.py index 3041b98af..2976127e1 100644 --- a/tests/cli/main_tool.py +++ b/tests/cli/main_tool.py @@ -8,7 +8,12 @@ from dftimewolf.cli import dftimewolf_recipes from dftimewolf.lib import state as dftw_state -from dftimewolf.lib import resources, errors, args_validator +from dftimewolf.lib import resources, errors +from dftimewolf.lib.validators import manager as validators_manager + +# The following import makes sure validators are registered. +from dftimewolf.lib import validators # pylint: disable=unused-import + from dftimewolf import config # This test recipe requires two args: Anything for arg1, and the word 'Second' @@ -99,7 +104,7 @@ def testRecipeValidators(self): if arg.validation_params: self.assertIn( arg.validation_params['format'], - args_validator.ValidatorsManager.ListValidators(), + validators_manager.ValidatorsManager.ListValidators(), f'Error in {recipe.name}:{arg.switch} - ' f'Invalid validator {arg.validation_params["format"]}.') diff --git a/tests/lib/args_validator.py b/tests/lib/args_validator.py index 35aab7716..518e1ff98 100644 --- a/tests/lib/args_validator.py +++ b/tests/lib/args_validator.py @@ -83,496 +83,6 @@ def FailingValidateSingle(argument_value, _): with self.assertRaises(errors.RecipeArgsValidationFailure): validator.Validate('one,two,three', argument_definition) -# pylint: enable=abstract-class-instantiated -# pytype: enable=not-instantiable - - -class AWSRegionValidatorTest(unittest.TestCase): - """Tests AWSRegionValidator.""" - - def setUp(self): - """Setup.""" - self.validator = args_validator.AWSRegionValidator() - self.recipe_argument = resources.RecipeArgument() - self.recipe_argument.switch = 'testawsregion' - - def testInit(self): - """Tests initialisation.""" - self.assertEqual(self.validator.NAME, 'aws_region') - - def testValidateSuccess(self): - """Test that correct values do not throw an exception.""" - regions = ['ap-southeast-2', 'us-east-1', 'me-central-1'] - - for region in regions: - val = self.validator.Validate(region, self.recipe_argument) - self.assertEqual(val, region) - - def testValidateFailure(self): - """Tests invalid values correctly throw an exception.""" - regions = ['invalid', '123456'] - - for r in regions: - with self.assertRaisesRegex( - errors.RecipeArgsValidationFailure, - 'Invalid AWS Region name'): - self.validator.Validate(r, self.recipe_argument) - - -class AzureRegionValidatorTest(unittest.TestCase): - """Tests AzureRegionValidator.""" - - def setUp(self): - """Setup.""" - self.validator = args_validator.AzureRegionValidator() - self.recipe_argument = resources.RecipeArgument() - self.recipe_argument.switch = 'testazureregion' - - def testInit(self): - """Tests initialisation.""" - self.assertEqual(self.validator.NAME, 'azure_region') - - def testValidateSuccess(self): - """Test that correct values do not throw an exception.""" - regions = ['eastasia', 'norwaywest', 'westindia'] - - for region in regions: - val = self.validator.Validate(region, self.recipe_argument) - self.assertEqual(val, region) - - def testValidateFailure(self): - """Tests invalid values correctly throw an exception.""" - regions = ['invalid', '123456'] - - for region in regions: - with self.assertRaisesRegex( - errors.RecipeArgsValidationFailure, - 'Invalid Azure Region name'): - self.validator.Validate(region, self.recipe_argument) - - -class GCPZoneValidatorTest(unittest.TestCase): - """Tests GCPZoneValidator.""" - - def setUp(self): - """Setup.""" - self.validator = args_validator.GCPZoneValidator() - self.recipe_argument = resources.RecipeArgument() - self.recipe_argument.switch = 'testgcpzone' - - def testInit(self): - """Tests initialisation.""" - self.assertEqual(self.validator.NAME, 'gcp_zone') - - def testValidateSuccess(self): - """Test that correct values do not throw an exception.""" - zones = ['asia-east1-a', 'europe-west2-a', 'us-central1-f'] - - for zone in zones: - val = self.validator.Validate(zone, self.recipe_argument) - self.assertEqual(val, zone) - - def testValidateFailure(self): - """Tests invalid values correctly throw an exception.""" - zones = ['nope', '123456'] - - for zone in zones: - with self.assertRaisesRegex( - errors.RecipeArgsValidationFailure, 'Invalid GCP Zone name'): - self.validator.Validate(zone, self.recipe_argument) - - -class RegexValidatorTest(unittest.TestCase): - """Tests RegexValidator.""" - - def setUp(self): - """Setup.""" - self.validator = args_validator.RegexValidator() - self.recipe_argument = resources.RecipeArgument() - self.recipe_argument.switch = 'testregex' - self.recipe_argument.validation_params = {'comma_separated': True} - - def testInit(self): - """Tests initialisation.""" - self.assertEqual(self.validator.NAME, 'regex') - - def testValidateSuccess(self): - """Test that correct values do not throw an exception.""" - values = ['abcdef', 'bcdefg', 'abcdef,bcdefg'] - self.recipe_argument.validation_params['regex'] = '.?bcdef.?' - for value in values: - valid_value = self.validator.Validate(value, self.recipe_argument) - self.assertEqual(valid_value, value) - - def testValidateFailure(self): - """Test Regex test failure.""" - self.recipe_argument.validation_params['regex'] = '.?bcdef.?' - - with self.assertRaisesRegex( - errors.RecipeArgsValidationFailure, - "does not match regex /.\?bcdef.\?"): # pylint: disable=anomalous-backslash-in-string - self.validator.Validate('tuvwxy', self.recipe_argument) - - def testRequiredParam(self): - """Tests an error is thrown is the regex param is missing.""" - self.recipe_argument.validation_params['regex'] = None - with self.assertRaisesRegex( - errors.RecipeArgsValidatorError, - 'Missing validator parameter: regex'): - self.validator.Validate('tuvwxy', self.recipe_argument) - - -class SubnetValidatorTest(unittest.TestCase): - """Tests SubnetValidator.""" - - def setUp(self): - """Setup.""" - self.validator = args_validator.SubnetValidator() - self.recipe_argument = resources.RecipeArgument() - self.recipe_argument.switch = 'testsubnet' - self.recipe_argument.validation_params = {'comma_separated': True} - - - def testInit(self): - """Tests initialisation.""" - self.assertEqual(self.validator.NAME, 'subnet') - - def testValidateSuccess(self): - """Test that correct values do not throw an exception.""" - values = ['1.2.3.4/32','192.168.0.0/24','1.2.3.4/32,192.168.0.0/24'] - for value in values: - valid_value = self.validator.Validate(value, self.recipe_argument) - self.assertEqual(valid_value, value) - - def testValidateFailure(self): - """Test Subnet test failure.""" - values = ['1.2.3.4/33', '267.0.0.1/32', 'text'] - - for value in values: - with self.assertRaisesRegex( - errors.RecipeArgsValidationFailure, - 'Not a valid subnet'): - self.validator.Validate(value, self.recipe_argument) - - -class DatetimeValidatorTest(unittest.TestCase): - """Tests the DatetimeValidator class.""" - - FORMAT_STRING = '%Y-%m-%d %H:%M:%S' - - def setUp(self): - """Setup.""" - self.validator = args_validator.DatetimeValidator() - self.recipe_argument = resources.RecipeArgument() - self.recipe_argument.switch = 'testdatetime' - - def testInit(self): - """Tests initialisation.""" - self.assertEqual(self.validator.NAME, 'datetime') - - def testRequiredParam(self): - """Tests an error is thrown if format_string is missing.""" - with self.assertRaisesRegex( - errors.RecipeArgsValidatorError, - 'Missing validator parameter: format_string'): - self.validator.Validate('value', self.recipe_argument) - - def testValidateSuccess(self): - """Tests a successful validation.""" - self.recipe_argument.validation_params['format_string'] = self.FORMAT_STRING - date = '2023-12-31 23:29:59' - val = self.validator.Validate(date, self.recipe_argument) - self.assertEqual(val, date) - - def testValidateSuccessWithOrder(self): - """Tests validation success with order parameters.""" - first = '2023-01-01 00:00:00' - second = '2023-01-02 00:00:00' - third = '2023-01-03 00:00:00' - fourth = '2023-01-04 00:00:00' - fifth = '2023-01-05 00:00:00' - - self.recipe_argument.validation_params['format_string'] = self.FORMAT_STRING - self.recipe_argument.validation_params['before'] = fourth - self.recipe_argument.validation_params['after'] = second - - val = self.validator.Validate(third, self.recipe_argument) - self.assertEqual(val, third) - - with self.assertRaisesRegex( - errors.RecipeArgsValidationFailure, - f'{first} is before {second} but it should be the other way around'): - self.validator.Validate(first, self.recipe_argument) - - with self.assertRaisesRegex( - errors.RecipeArgsValidationFailure, - f'{fourth} is before {fifth} but it should be the other way around'): - self.validator.Validate(fifth, self.recipe_argument) - - def testValidateFailureInvalidFormat(self): - """Tests invalid date formats correctly fail.""" - values = ['value', '2023-12-31', '2023-31-12 23:29:59'] - self.recipe_argument.validation_params['format_string'] = self.FORMAT_STRING - for value in values: - with self.assertRaisesRegex( - errors.RecipeArgsValidationFailure, - f'does not match format {self.FORMAT_STRING}'): - self.validator.Validate(value, self.recipe_argument) - - # pylint: disable=protected-access - def testValidateOrder(self): - """Tests the _ValidateOrder method.""" - first = '2023-01-01 00:00:00' - second = '2023-01-02 00:00:00' - - # Correct order passes - val = self.validator._ValidateOrder(first, second, self.FORMAT_STRING) - self.assertTrue(val) - - # Reverse order fails - val = self.validator._ValidateOrder(second, first, self.FORMAT_STRING) - self.assertFalse(val) - - -class HostnameValidatorTest(unittest.TestCase): - """Tests the HostnameValidator class.""" - - def setUp(self): - """Setup.""" - self.validator = args_validator.HostnameValidator() - self.recipe_argument = resources.RecipeArgument() - self.recipe_argument.switch = 'testhostname' - - def testInit(self): - """Tests initialization.""" - self.assertEqual(self.validator.NAME, 'hostname') - - def testValidateSuccess(self): - """Test successful validation.""" - fqdns = [ - 'github.com', - 'grr-client-ubuntu.c.ramoj-playground.internal', - 'www.google.com.au', - 'www.google.co.uk', - 'localhost', - 'grr-server' - ] - for fqdn in fqdns: - val = self.validator.Validate(fqdn, self.recipe_argument) - self.assertTrue(val) - - self.recipe_argument.validation_params['comma_separated'] = True - val = self.validator.Validate(','.join(fqdns), self.recipe_argument) - self.assertTrue(val) - - def testValidationFailure(self): - """Tests validation failures.""" - fqdns = ['a-.com', '-a.com'] - for fqdn in fqdns: - with self.assertRaisesRegex(errors.RecipeArgsValidationFailure, - 'Not a valid hostname'): - self.validator.Validate(fqdn, self.recipe_argument) - - self.recipe_argument.validation_params['comma_separated'] = True - with self.assertRaisesRegex( - errors.RecipeArgsValidationFailure, - 'Not a valid hostname'): - self.validator.Validate(','.join(fqdns), self.recipe_argument) - - - def testValidationFailureWithFQDNOnly(self): - """tests validation fails for flat names when FQDN_ONLY is set.""" - fqdns = ['localhost', 'grr-server'] - self.recipe_argument.validation_params['comma_separated'] = False - self.recipe_argument.validation_params['fqdn_only'] = True - for fqdn in fqdns: - with self.assertRaisesRegex( - errors.RecipeArgsValidationFailure, - 'Not a valid hostname'): - self.validator.Validate(fqdn, self.recipe_argument) - - self.recipe_argument.validation_params['comma_separated'] = True - with self.assertRaisesRegex( - errors.RecipeArgsValidationFailure, - 'Not a valid hostname'): - self.validator.Validate(','.join(fqdns), self.recipe_argument) - - -class GRRHostValidatorTest(unittest.TestCase): - """Tests the GRRHostValidator class.""" - - def setUp(self): - """Setup.""" - self.validator = args_validator.GRRHostValidator() - self.recipe_argument = resources.RecipeArgument() - self.recipe_argument.switch = 'testgrrhost' - - def testInit(self): - """Tests initialization.""" - self.assertEqual(self.validator.NAME, 'grr_host') - - def testValidateSuccess(self): - """Test successful validation.""" - client_ids = ['C.1facf5562db006ad', - 'grr-client-ubuntu.c.ramoj-playground.internal', - 'grr-client'] - for client_id in client_ids: - val = self.validator.Validate(client_id, self.recipe_argument) - self.assertEqual(val, client_id) - - self.recipe_argument.validation_params['comma_separated'] = True - val = self.validator.Validate(','.join(client_ids), self.recipe_argument) - self.assertEqual(val, ','.join(client_ids)) - - def testValidationFailure(self): - """Tests validation failures.""" - fqdns = ['a-.com', 'C.a', 'C.01234567890123456789'] - for fqdn in fqdns: - with self.assertRaisesRegex( - errors.RecipeArgsValidationFailure, - 'Not a GRR host identifier'): - self.validator.Validate(fqdn, self.recipe_argument) - - self.recipe_argument.validation_params['comma_separated'] = True - with self.assertRaisesRegex( - errors.RecipeArgsValidationFailure, - 'Not a GRR host identifier'): - self.validator.Validate(','.join(fqdns), self.recipe_argument) - - -class URLValidatorTest(unittest.TestCase): - """Tests the URLValidator class.""" - - def setUp(self): - """Setup.""" - self.validator = args_validator.URLValidator() - self.recipe_argument = resources.RecipeArgument() - self.recipe_argument.switch = 'testurl' - - def testInit(self): - """Tests initialization.""" - self.assertEqual(self.validator.NAME, 'url') - - def testValidateSuccess(self): - """Test successful validation.""" - fqdns = [ - 'http://10.100.0.100:8080', - 'http://10.100.0.100', - 'https://10.100.0.100', - 'http://localhost:8080', - 'http://grr-server:8080', - 'http://grr.ramoj-playground.internal:8080', - 'http://grr.ramoj-playground.internal', - 'https://grr.ramoj-playground.internal', - ] - for fqdn in fqdns: - val = self.validator.Validate(fqdn, self.recipe_argument) - self.assertTrue(val, f'{fqdn} failed validation') - - self.recipe_argument.validation_params['comma_separated'] = True - val = self.validator.Validate(','.join(fqdns), self.recipe_argument) - self.assertTrue(val) - - def testValidationFailure(self): - """Tests validation failures.""" - fqdns = [ - 'value', - '10.100.0.100', # Needs scheme - ] - for fqdn in fqdns: - with self.assertRaisesRegex( - errors.RecipeArgsValidationFailure, - "Not a valid URL"): - self.validator.Validate(fqdn, self.recipe_argument) - - self.recipe_argument.validation_params['comma_separated'] = True - with self.assertRaisesRegex( - errors.RecipeArgsValidationFailure, "Error: Not a valid URL"): - self.validator.Validate(','.join(fqdns), self.recipe_argument) - - -class _TestValidator(args_validator.AbstractValidator): - """Validator class for unit tests.""" - NAME = 'test' - - def Validate(self, argument_value, recipe_argument): - return argument_value - -class _TestValidator2(args_validator.AbstractValidator): - """Validator class for unit tests.""" - NAME = 'test2' - - def Validate(self, argument_value, recipe_argument): - return argument_value - - -# Tests for the ValidatorsManager class. -# pylint: disable=protected-access -class ValidatorsManagerTest(unittest.TestCase): - """Tests for the validators manager.""" - - # pylint: disable=protected-access - def testRegistration(self): - """Tests the RegisterValidator and DeregisterValidator functions.""" - number_of_validator_classes = len( - args_validator.ValidatorsManager._validator_classes) - - args_validator.ValidatorsManager.RegisterValidator(_TestValidator) - self.assertEqual( - len(args_validator.ValidatorsManager._validator_classes), - number_of_validator_classes + 1) - - args_validator.ValidatorsManager.DeregisterValidator(_TestValidator) - self.assertEqual( - len(args_validator.ValidatorsManager._validator_classes), - number_of_validator_classes) - - - def testRegisterValidators(self): - """Tests the RegisterValidators function.""" - number_of_validator_classes = len( - args_validator.ValidatorsManager._validator_classes) - - args_validator.ValidatorsManager.RegisterValidators( - [_TestValidator, _TestValidator2]) - self.assertEqual( - len(args_validator.ValidatorsManager._validator_classes), - number_of_validator_classes + 2) - - args_validator.ValidatorsManager.DeregisterValidator(_TestValidator) - args_validator.ValidatorsManager.DeregisterValidator(_TestValidator2) - - self.assertEqual( - number_of_validator_classes, - len(args_validator.ValidatorsManager._validator_classes)) - - - def testValidate(self): - """Tests the Validate function.""" - recipe_argument = resources.RecipeArgument() - recipe_argument.validation_params = {'format': 'test'} - - args_validator.ValidatorsManager.RegisterValidator(_TestValidator) - - validation_result = args_validator.ValidatorsManager.Validate( - 'test', recipe_argument) - self.assertEqual(validation_result, 'test') - - recipe_argument.validation_params['format'] = 'does_not_exist' - with self.assertRaisesRegex( - errors.RecipeArgsValidatorError, 'not a registered validator'): - args_validator.ValidatorsManager.Validate('test', recipe_argument) - - def testListValidators(self): - """Tests the ListValidators function.""" - registered_validators = args_validator.ValidatorsManager.ListValidators() - self.assertNotIn(_TestValidator.NAME, registered_validators) - args_validator.ValidatorsManager.RegisterValidator(_TestValidator) - - - registered_validators = args_validator.ValidatorsManager.ListValidators() - self.assertIn(_TestValidator.NAME, registered_validators) - args_validator.ValidatorsManager.DeregisterValidator(_TestValidator) if __name__ == '__main__': unittest.main() diff --git a/tests/lib/validators/__init__.py b/tests/lib/validators/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lib/validators/aws_region.py b/tests/lib/validators/aws_region.py new file mode 100644 index 000000000..fce858c02 --- /dev/null +++ b/tests/lib/validators/aws_region.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +"""Tests for the AWS region validator.""" + +import unittest + +from dftimewolf.lib import errors, resources +from dftimewolf.lib.validators import aws_region + +class AWSRegionValidatorTest(unittest.TestCase): + """Tests AWSRegionValidator.""" + + def setUp(self): + """Setup.""" + self.validator = aws_region.AWSRegionValidator() + self.recipe_argument = resources.RecipeArgument() + self.recipe_argument.switch = 'testawsregion' + + def testInit(self): + """Tests initialisation.""" + self.assertEqual(self.validator.NAME, 'aws_region') + + def testValidateSuccess(self): + """Test that correct values do not throw an exception.""" + regions = ['ap-southeast-2', 'us-east-1', 'me-central-1'] + + for region in regions: + val = self.validator.Validate(region, self.recipe_argument) + self.assertEqual(val, region) + + def testValidateFailure(self): + """Tests invalid values correctly throw an exception.""" + regions = ['invalid', '123456'] + + for r in regions: + with self.assertRaisesRegex( + errors.RecipeArgsValidationFailure, + 'Invalid AWS Region name'): + self.validator.Validate(r, self.recipe_argument) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/lib/validators/azure_region.py b/tests/lib/validators/azure_region.py new file mode 100644 index 000000000..f0e6ec737 --- /dev/null +++ b/tests/lib/validators/azure_region.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +"""Tests for the Azure region validator.""" + +import unittest + +from dftimewolf.lib import errors, resources +from dftimewolf.lib.validators import azure_region + + +class AzureRegionValidatorTest(unittest.TestCase): + """Tests AzureRegionValidator.""" + + def setUp(self): + """Setup.""" + self.validator = azure_region.AzureRegionValidator() + self.recipe_argument = resources.RecipeArgument() + self.recipe_argument.switch = 'testazureregion' + + def testInit(self): + """Tests initialisation.""" + self.assertEqual(self.validator.NAME, 'azure_region') + + def testValidateSuccess(self): + """Test that correct values do not throw an exception.""" + regions = ['eastasia', 'norwaywest', 'westindia'] + + for region in regions: + val = self.validator.Validate(region, self.recipe_argument) + self.assertEqual(val, region) + + def testValidateFailure(self): + """Tests invalid values correctly throw an exception.""" + regions = ['invalid', '123456'] + + for region in regions: + with self.assertRaisesRegex( + errors.RecipeArgsValidationFailure, + 'Invalid Azure Region name'): + self.validator.Validate(region, self.recipe_argument) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/lib/validators/datetime.py b/tests/lib/validators/datetime.py new file mode 100644 index 000000000..04a61a859 --- /dev/null +++ b/tests/lib/validators/datetime.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +"""Tests for the datetime validator.""" + +import unittest + +from dftimewolf.lib import errors, resources +from dftimewolf.lib.validators import datetime_validator + + +class DatetimeValidatorTest(unittest.TestCase): + """Tests the DatetimeValidator class.""" + + FORMAT_STRING = '%Y-%m-%d %H:%M:%S' + + def setUp(self): + """Setup.""" + self.validator = datetime_validator.DatetimeValidator() + self.recipe_argument = resources.RecipeArgument() + self.recipe_argument.switch = 'testdatetime' + + def testInit(self): + """Tests initialisation.""" + self.assertEqual(self.validator.NAME, 'datetime') + + def testRequiredParam(self): + """Tests an error is thrown if format_string is missing.""" + with self.assertRaisesRegex( + errors.RecipeArgsValidatorError, + 'Missing validator parameter: format_string'): + self.validator.Validate('value', self.recipe_argument) + + def testValidateSuccess(self): + """Tests a successful validation.""" + self.recipe_argument.validation_params['format_string'] = self.FORMAT_STRING + date = '2023-12-31 23:29:59' + val = self.validator.Validate(date, self.recipe_argument) + self.assertEqual(val, date) + + def testValidateSuccessWithOrder(self): + """Tests validation success with order parameters.""" + first = '2023-01-01 00:00:00' + second = '2023-01-02 00:00:00' + third = '2023-01-03 00:00:00' + fourth = '2023-01-04 00:00:00' + fifth = '2023-01-05 00:00:00' + + self.recipe_argument.validation_params['format_string'] = self.FORMAT_STRING + self.recipe_argument.validation_params['before'] = fourth + self.recipe_argument.validation_params['after'] = second + + val = self.validator.Validate(third, self.recipe_argument) + self.assertEqual(val, third) + + with self.assertRaisesRegex( + errors.RecipeArgsValidationFailure, + f'{first} is before {second} but it should be the other way around'): + self.validator.Validate(first, self.recipe_argument) + + with self.assertRaisesRegex( + errors.RecipeArgsValidationFailure, + f'{fourth} is before {fifth} but it should be the other way around'): + self.validator.Validate(fifth, self.recipe_argument) + + def testValidateFailureInvalidFormat(self): + """Tests invalid date formats correctly fail.""" + values = ['value', '2023-12-31', '2023-31-12 23:29:59'] + self.recipe_argument.validation_params['format_string'] = self.FORMAT_STRING + for value in values: + with self.assertRaisesRegex( + errors.RecipeArgsValidationFailure, + f'does not match format {self.FORMAT_STRING}'): + self.validator.Validate(value, self.recipe_argument) + + # pylint: disable=protected-access + def testValidateOrder(self): + """Tests the _ValidateOrder method.""" + first = '2023-01-01 00:00:00' + second = '2023-01-02 00:00:00' + + # Correct order passes + val = self.validator._ValidateOrder(first, second, self.FORMAT_STRING) + self.assertTrue(val) + + # Reverse order fails + val = self.validator._ValidateOrder(second, first, self.FORMAT_STRING) + self.assertFalse(val) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/lib/validators/gcp_zone.py b/tests/lib/validators/gcp_zone.py new file mode 100644 index 000000000..e192f9863 --- /dev/null +++ b/tests/lib/validators/gcp_zone.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +"""Tests for the AWS region validator.""" + +import unittest + +from dftimewolf.lib import errors, resources +from dftimewolf.lib.validators import gcp_zone + + +class GCPZoneValidatorTest(unittest.TestCase): + """Tests GCPZoneValidator.""" + + def setUp(self): + """Setup.""" + self.validator = gcp_zone.GCPZoneValidator() + self.recipe_argument = resources.RecipeArgument() + self.recipe_argument.switch = 'testgcpzone' + + def testInit(self): + """Tests initialisation.""" + self.assertEqual(self.validator.NAME, 'gcp_zone') + + def testValidateSuccess(self): + """Test that correct values do not throw an exception.""" + zones = ['asia-east1-a', 'europe-west2-a', 'us-central1-f'] + + for zone in zones: + val = self.validator.Validate(zone, self.recipe_argument) + self.assertEqual(val, zone) + + def testValidateFailure(self): + """Tests invalid values correctly throw an exception.""" + zones = ['nope', '123456'] + + for zone in zones: + with self.assertRaisesRegex( + errors.RecipeArgsValidationFailure, 'Invalid GCP Zone name'): + self.validator.Validate(zone, self.recipe_argument) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/lib/validators/grr_host.py b/tests/lib/validators/grr_host.py new file mode 100644 index 000000000..861361df5 --- /dev/null +++ b/tests/lib/validators/grr_host.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +"""Tests for the GRR host validator.""" + +import unittest + +from dftimewolf.lib import errors, resources +from dftimewolf.lib.validators import grr_host + + +class GRRHostValidatorTest(unittest.TestCase): + """Tests the GRRHostValidator class.""" + + def setUp(self): + """Setup.""" + self.validator = grr_host.GRRHostValidator() + self.recipe_argument = resources.RecipeArgument() + self.recipe_argument.switch = 'testgrrhost' + + def testInit(self): + """Tests initialization.""" + self.assertEqual(self.validator.NAME, 'grr_host') + + def testValidateSuccess(self): + """Test successful validation.""" + client_ids = ['C.1facf5562db006ad', + 'grr-client-ubuntu.c.ramoj-playground.internal', + 'grr-client'] + for client_id in client_ids: + val = self.validator.Validate(client_id, self.recipe_argument) + self.assertEqual(val, client_id) + + self.recipe_argument.validation_params['comma_separated'] = True + val = self.validator.Validate(','.join(client_ids), self.recipe_argument) + self.assertEqual(val, ','.join(client_ids)) + + def testValidationFailure(self): + """Tests validation failures.""" + fqdns = ['a-.com', 'C.a', 'C.01234567890123456789'] + for fqdn in fqdns: + with self.assertRaisesRegex( + errors.RecipeArgsValidationFailure, + 'Not a GRR host identifier'): + self.validator.Validate(fqdn, self.recipe_argument) + + self.recipe_argument.validation_params['comma_separated'] = True + with self.assertRaisesRegex( + errors.RecipeArgsValidationFailure, + 'Not a GRR host identifier'): + self.validator.Validate(','.join(fqdns), self.recipe_argument) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/lib/validators/hostname.py b/tests/lib/validators/hostname.py new file mode 100644 index 000000000..4fee9309f --- /dev/null +++ b/tests/lib/validators/hostname.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +"""Tests for the hostname validator.""" + +import unittest + +from dftimewolf.lib import errors, resources +from dftimewolf.lib.validators import hostname + + +class HostnameValidatorTest(unittest.TestCase): + """Tests the HostnameValidator class.""" + + def setUp(self): + """Setup.""" + self.validator = hostname.HostnameValidator() + self.recipe_argument = resources.RecipeArgument() + self.recipe_argument.switch = 'testhostname' + + def testInit(self): + """Tests initialization.""" + self.assertEqual(self.validator.NAME, 'hostname') + + def testValidateSuccess(self): + """Test successful validation.""" + fqdns = [ + 'github.com', + 'grr-client-ubuntu.c.ramoj-playground.internal', + 'www.google.com.au', + 'www.google.co.uk', + 'localhost', + 'grr-server' + ] + for fqdn in fqdns: + val = self.validator.Validate(fqdn, self.recipe_argument) + self.assertTrue(val) + + self.recipe_argument.validation_params['comma_separated'] = True + val = self.validator.Validate(','.join(fqdns), self.recipe_argument) + self.assertTrue(val) + + def testValidationFailure(self): + """Tests validation failures.""" + fqdns = ['a-.com', '-a.com'] + for fqdn in fqdns: + with self.assertRaisesRegex(errors.RecipeArgsValidationFailure, + 'Not a valid hostname'): + self.validator.Validate(fqdn, self.recipe_argument) + + self.recipe_argument.validation_params['comma_separated'] = True + with self.assertRaisesRegex( + errors.RecipeArgsValidationFailure, + 'Not a valid hostname'): + self.validator.Validate(','.join(fqdns), self.recipe_argument) + + + def testValidationFailureWithFQDNOnly(self): + """tests validation fails for flat names when FQDN_ONLY is set.""" + fqdns = ['localhost', 'grr-server'] + self.recipe_argument.validation_params['comma_separated'] = False + self.recipe_argument.validation_params['fqdn_only'] = True + for fqdn in fqdns: + with self.assertRaisesRegex( + errors.RecipeArgsValidationFailure, + 'Not a valid hostname'): + self.validator.Validate(fqdn, self.recipe_argument) + + self.recipe_argument.validation_params['comma_separated'] = True + with self.assertRaisesRegex( + errors.RecipeArgsValidationFailure, + 'Not a valid hostname'): + self.validator.Validate(','.join(fqdns), self.recipe_argument) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/lib/validators/manager.py b/tests/lib/validators/manager.py new file mode 100644 index 000000000..c87f303e5 --- /dev/null +++ b/tests/lib/validators/manager.py @@ -0,0 +1,91 @@ +# -*- coding: utf-8 -*- +"""Tests for the validator manager.""" + +import unittest + +from dftimewolf.lib import args_validator, errors, resources +from dftimewolf.lib.validators import manager + + +class _TestValidator(args_validator.AbstractValidator): + """Validator class for unit tests.""" + NAME = 'test' + + def Validate(self, argument_value, recipe_argument): + return argument_value + + +class _TestValidator2(args_validator.AbstractValidator): + """Validator class for unit tests.""" + NAME = 'test2' + + def Validate(self, argument_value, recipe_argument): + return argument_value + + +class ValidatorsManagerTest(unittest.TestCase): + """Tests for the validators manager.""" + + # pylint: disable=protected-access + def testRegistration(self): + """Tests the RegisterValidator and DeregisterValidator functions.""" + number_of_validator_classes = len( + manager.ValidatorsManager._validator_classes) + + manager.ValidatorsManager.RegisterValidator(_TestValidator) + self.assertEqual( + len(manager.ValidatorsManager._validator_classes), + number_of_validator_classes + 1) + + manager.ValidatorsManager.DeregisterValidator(_TestValidator) + self.assertEqual( + len(manager.ValidatorsManager._validator_classes), + number_of_validator_classes) + + def testRegisterValidators(self): + """Tests the RegisterValidators function.""" + number_of_validator_classes = len( + manager.ValidatorsManager._validator_classes) + + manager.ValidatorsManager.RegisterValidators( + [_TestValidator, _TestValidator2]) + self.assertEqual( + len(manager.ValidatorsManager._validator_classes), + number_of_validator_classes + 2) + + manager.ValidatorsManager.DeregisterValidator(_TestValidator) + manager.ValidatorsManager.DeregisterValidator(_TestValidator2) + + self.assertEqual( + number_of_validator_classes, + len(manager.ValidatorsManager._validator_classes)) + + def testValidate(self): + """Tests the Validate function.""" + recipe_argument = resources.RecipeArgument() + recipe_argument.validation_params = {'format': 'test'} + + manager.ValidatorsManager.RegisterValidator(_TestValidator) + + validation_result = manager.ValidatorsManager.Validate( + 'test', recipe_argument) + self.assertEqual(validation_result, 'test') + + recipe_argument.validation_params['format'] = 'does_not_exist' + with self.assertRaisesRegex( + errors.RecipeArgsValidatorError, 'not a registered validator'): + manager.ValidatorsManager.Validate('test', recipe_argument) + + def testListValidators(self): + """Tests the ListValidators function.""" + registered_validators = manager.ValidatorsManager.ListValidators() + self.assertNotIn(_TestValidator.NAME, registered_validators) + manager.ValidatorsManager.RegisterValidator(_TestValidator) + + registered_validators = manager.ValidatorsManager.ListValidators() + self.assertIn(_TestValidator.NAME, registered_validators) + manager.ValidatorsManager.DeregisterValidator(_TestValidator) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/lib/validators/regex.py b/tests/lib/validators/regex.py new file mode 100644 index 000000000..1b5c937c8 --- /dev/null +++ b/tests/lib/validators/regex.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +"""Tests for the regex validator.""" + +import unittest + +from dftimewolf.lib import errors, resources +from dftimewolf.lib.validators import regex + + +class RegexValidatorTest(unittest.TestCase): + """Tests RegexValidator.""" + + def setUp(self): + """Setup.""" + self.validator = regex.RegexValidator() + self.recipe_argument = resources.RecipeArgument() + self.recipe_argument.switch = 'testregex' + self.recipe_argument.validation_params = {'comma_separated': True} + + def testInit(self): + """Tests initialisation.""" + self.assertEqual(self.validator.NAME, 'regex') + + def testValidateSuccess(self): + """Test that correct values do not throw an exception.""" + values = ['abcdef', 'bcdefg', 'abcdef,bcdefg'] + self.recipe_argument.validation_params['regex'] = '.?bcdef.?' + for value in values: + valid_value = self.validator.Validate(value, self.recipe_argument) + self.assertEqual(valid_value, value) + + def testValidateFailure(self): + """Test Regex test failure.""" + self.recipe_argument.validation_params['regex'] = '.?bcdef.?' + + with self.assertRaisesRegex( + errors.RecipeArgsValidationFailure, + "does not match regex /.\?bcdef.\?"): # pylint: disable=anomalous-backslash-in-string + self.validator.Validate('tuvwxy', self.recipe_argument) + + def testRequiredParam(self): + """Tests an error is thrown is the regex param is missing.""" + self.recipe_argument.validation_params['regex'] = None + with self.assertRaisesRegex( + errors.RecipeArgsValidatorError, + 'Missing validator parameter: regex'): + self.validator.Validate('tuvwxy', self.recipe_argument) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/lib/validators/subnet.py b/tests/lib/validators/subnet.py new file mode 100644 index 000000000..a0bfc4cdd --- /dev/null +++ b/tests/lib/validators/subnet.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +"""Tests for the subnet validator.""" + +import unittest + +from dftimewolf.lib import errors, resources +from dftimewolf.lib.validators import subnet + + +class SubnetValidatorTest(unittest.TestCase): + """Tests SubnetValidator.""" + + def setUp(self): + """Setup.""" + self.validator = subnet.SubnetValidator() + self.recipe_argument = resources.RecipeArgument() + self.recipe_argument.switch = 'testsubnet' + self.recipe_argument.validation_params = {'comma_separated': True} + + + def testInit(self): + """Tests initialisation.""" + self.assertEqual(self.validator.NAME, 'subnet') + + def testValidateSuccess(self): + """Test that correct values do not throw an exception.""" + values = ['1.2.3.4/32','192.168.0.0/24','1.2.3.4/32,192.168.0.0/24'] + for value in values: + valid_value = self.validator.Validate(value, self.recipe_argument) + self.assertEqual(valid_value, value) + + def testValidateFailure(self): + """Test Subnet test failure.""" + values = ['1.2.3.4/33', '267.0.0.1/32', 'text'] + + for value in values: + with self.assertRaisesRegex( + errors.RecipeArgsValidationFailure, + 'Not a valid subnet'): + self.validator.Validate(value, self.recipe_argument) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/lib/validators/url.py b/tests/lib/validators/url.py new file mode 100644 index 000000000..5e8bf49ad --- /dev/null +++ b/tests/lib/validators/url.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +"""Tests for the URL validator.""" + +import unittest + +from dftimewolf.lib import errors, resources +from dftimewolf.lib.validators import url + + +class URLValidatorTest(unittest.TestCase): + """Tests the URLValidator class.""" + + def setUp(self): + """Setup.""" + self.validator = url.URLValidator() + self.recipe_argument = resources.RecipeArgument() + self.recipe_argument.switch = 'testurl' + + def testInit(self): + """Tests initialization.""" + self.assertEqual(self.validator.NAME, 'url') + + def testValidateSuccess(self): + """Test successful validation.""" + fqdns = [ + 'http://10.100.0.100:8080', + 'http://10.100.0.100', + 'https://10.100.0.100', + 'http://localhost:8080', + 'http://grr-server:8080', + 'http://grr.ramoj-playground.internal:8080', + 'http://grr.ramoj-playground.internal', + 'https://grr.ramoj-playground.internal', + ] + for fqdn in fqdns: + val = self.validator.Validate(fqdn, self.recipe_argument) + self.assertTrue(val, f'{fqdn} failed validation') + + self.recipe_argument.validation_params['comma_separated'] = True + val = self.validator.Validate(','.join(fqdns), self.recipe_argument) + self.assertTrue(val) + + def testValidationFailure(self): + """Tests validation failures.""" + fqdns = [ + 'value', + '10.100.0.100', # Needs scheme + ] + for fqdn in fqdns: + with self.assertRaisesRegex( + errors.RecipeArgsValidationFailure, + "Not a valid URL"): + self.validator.Validate(fqdn, self.recipe_argument) + + self.recipe_argument.validation_params['comma_separated'] = True + with self.assertRaisesRegex( + errors.RecipeArgsValidationFailure, "Error: Not a valid URL"): + self.validator.Validate(','.join(fqdns), self.recipe_argument) + +if __name__ == '__main__': + unittest.main()