diff --git a/fiddy/derivative_check.py b/fiddy/derivative_check.py
index ac3be92..8fbb703 100644
--- a/fiddy/derivative_check.py
+++ b/fiddy/derivative_check.py
@@ -1,9 +1,10 @@
 import abc
+from typing import Any, Callable, Dict, List, Union
+from itertools import chain
 from dataclasses import dataclass
-from typing import Any, Dict, List
 
 import numpy as np
 import pandas as pd
 
 from .constants import Type
 from .derivative import Derivative
@@ -95,15 +96,15 @@ class NumpyIsCloseDerivativeCheck(DerivativeCheck):
     def method(self, *args, **kwargs):
         directional_derivative_check_results = []
-        for direction_index, directional_derivative in enumerate(
-            self.derivative.directional_derivatives
-        ):
-            test_value = directional_derivative.value
-
-            expected_value = []
-            for output_index in np.ndindex(self.output_indices):
-                element = self.expectation[output_index][direction_index]
-                expected_value.append(element)
-            expected_value = np.array(expected_value).reshape(test_value.shape)
+        expected_values, test_values = get_expected_and_test_values(
+            self.derivative.directional_derivatives,
+            self.output_indices,
+            self.expectation,
+        )
+        for directional_derivative, expected_value, test_value in zip(
+            self.derivative.directional_derivatives,
+            expected_values,
+            test_values,
+        ):
             test_result = np.isclose(
                 test_value,
                 expected_value,
@@ -137,3 +138,128 @@ def method(self, *args, **kwargs):
             success=success,
         )
         return derivative_check_result
+
+
+def get_expected_and_test_values(
+    directional_derivatives,
+    output_indices,
+    expectation,
+):
+    """Collect expected and test values across all directional derivatives."""
+    expected_values = []
+    test_values = []
+    for direction_index, directional_derivative in enumerate(
+        directional_derivatives
+    ):
+        test_value = directional_derivative.value
+        test_values.append(test_value)
+
+        expected_value = []
+        for output_index in np.ndindex(output_indices):
+            element = expectation[output_index][direction_index]
+            expected_value.append(element)
+        expected_value = np.array(expected_value).reshape(test_value.shape)
+        expected_values.append(expected_value)
+
+    return expected_values, test_values
+
+
+class HybridDerivativeCheck(DerivativeCheck):
+    """Hybrid derivative check.
+
+    This check tests whether the gradients lie within the range
+    [min, max] of the forward, backward and central finite difference
+    approximations, for potentially multiple step sizes eps. If not,
+    the gradient of each parameter is additionally compared against
+    the finite difference approximations via the relative criterion
+
+    .. math::
+        \\frac{|\\mu - \\kappa|}{\\lambda} < \\epsilon
+
+    where :math:`\\mu` is the mean gradient, :math:`\\kappa` the mean
+    finite difference approximation, :math:`\\lambda` the range of the
+    finite difference approximations, and :math:`\\epsilon` the
+    relative tolerance.
+    """
+
+    method_id = "hybrid"
+
+    def method(self, *args, **kwargs):
+        expected_values, test_values = get_expected_and_test_values(
+            self.derivative.directional_derivatives,
+            self.output_indices,
+            self.expectation,
+        )
+
+        results_all = []
+        directional_derivative_check_results = []
+        for step_size in range(len(expected_values)):
+            approxs_for_param = []
+            grads_for_param = []
+            results = []
+            for diff_index, directional_derivative in enumerate(
+                self.derivative.directional_derivatives
+            ):
+                try:
+                    for grad, approx in zip(
+                        expected_values[diff_index - 1][step_size - 1],
+                        test_values[diff_index - 1][step_size - 1],
+                    ):
+                        approxs_for_param.append(approx)
+                        grads_for_param.append(grad)
+                    fd_range = np.percentile(approxs_for_param, [0, 100])
+                    fd_mean = np.mean(approxs_for_param)
+                    grad_mean = np.mean(grads_for_param)
+                    if not (fd_range[0] <= grad_mean <= fd_range[1]):
+                        if np.any(
+                            [
+                                abs(x - y) > kwargs["atol"]
+                                for i, x in enumerate(approxs_for_param)
+                                for j, y in enumerate(approxs_for_param)
+                                if i != j
+                            ]
+                        ):
+                            fd_range = abs(fd_range[1] - fd_range[0])
+                            if (
+                                abs(grad_mean - fd_mean)
+                                / abs(fd_range + np.finfo(float).eps)
+                            ) > kwargs["rtol"]:
+                                results.append(False)
+                            else:
+                                results.append(True)
+                        else:
+                            # Cannot judge consistency: the finite
+                            # difference approximations themselves are
+                            # questionable.
+                            results.append(None)
+                    else:
+                        fd_range = abs(fd_range[1] - fd_range[0])
+                        if not np.isfinite([fd_range, fd_mean]).all():
+                            results.append(None)
+                        else:
+                            results.append(True)
+                except (IndexError, TypeError) as err:
+                    raise ValueError(
+                        "Unexpected error encountered"
+                        " (this should never happen!)"
+                    ) from err
+
+            directional_derivative_check_result = (
+                DirectionalDerivativeCheckResult(
+                    direction_id=directional_derivative.id,
+                    method_id=self.method_id,
+                    test=test_values[step_size],
+                    expectation=expected_values[step_size],
+                    output={"return": results},
+                    success=all(results),
+                )
+            )
+            directional_derivative_check_results.append(
+                directional_derivative_check_result
+            )
+            results_all.append(results)
+
+        success = all(chain(*results_all))
+        derivative_check_result = DerivativeCheckResult(
+            method_id=self.method_id,
+            directional_derivative_check_results=directional_derivative_check_results,
+            test=self.derivative.value,
+            expectation=self.expectation,
+            success=success,
+        )
+        return derivative_check_result
diff --git a/fiddy/gradient_check.py b/fiddy/gradient_check.py
new file mode 100644
index 0000000..1bf15c8
--- /dev/null
+++ b/fiddy/gradient_check.py
@@ -0,0 +1,285 @@
+# FIXME `simplify_results_df` and `keep_lowest_error` currently overlap;
+# refactor to just one method
+from dataclasses import dataclass
+from functools import partial
+from typing import Callable, Iterable, List, Optional, Tuple, Union
+
+import numpy as np
+import pandas as pd
+
+from . import quotient
+from .constants import (
+    TYPE_DIMENSION,
+    TYPE_FUNCTION,
+    TYPE_POINT,
+    GradientCheckMethod,
+)
+from .step import dstep
+
+
+DEFAULT_ATOL = float(np.finfo(np.float64).resolution)
+
+
+def gradient_check(
+    function: TYPE_FUNCTION,
+    point: TYPE_POINT,
+    gradient: TYPE_FUNCTION,
+    sizes: Optional[Iterable[float]] = None,
+    dimensions: Optional[Iterable[TYPE_DIMENSION]] = None,
+    stop_at_success: bool = True,
+    # TODO or custom Callable
+    fd_gradient_method: Optional[GradientCheckMethod] = None,
+    # atol: float = 1e-2,  # TODO
+    # rtol: float = 1e-2,  # TODO
+    check_protocol: Optional[List[Callable[[pd.DataFrame], None]]] = None,
+    postprocessor_protocol: Optional[
+        List[Callable[[pd.DataFrame], None]]
+    ] = None,
+    sort: bool = True,
+) -> Tuple[bool, pd.DataFrame]:
+    """Manage a gradient check.
+
+    Args:
+        function:
+            The function.
+        point:
+            The point about which to check the gradient.
+        gradient:
+            A function to compute the expected gradient at a point.
+        sizes:
+            The sizes of the steps to take.
+            Defaults to `[1e-1, 1e-3, 1e-5, 1e-7, 1e-9]`.
+        dimensions:
+            The dimensions along which to step.
+            Defaults to all dimensions of the point.
+        stop_at_success:
+            Whether to stop gradient checks for a specific parameter
+            as soon as a tolerance is satisfied for its corresponding
+            gradient.
+        fd_gradient_method:
+            The finite difference method by which to check the gradient.
+        atol:
+            The absolute tolerance. REMOVE?
+        rtol:
+            The relative tolerance. REMOVE?
+        check_protocol:
+            These methods are applied to the results, to perform the checks.
+            Defaults to `default_check_protocol`.
+            Methods in this protocol should set the `"success"` column to
+            `True` if the check passes, and put the reason for success in
+            the `"success_reason"` column.
+        postprocessor_protocol:
+            Similar to `check_protocol`, but applied after `check_protocol`.
+        sort:
+            Whether to sort the results by dimension and size.
+
+    Returns:
+        (1) Whether the gradient check passed, and (2) the full results,
+        for debugging incorrect gradients and further analysis.
+    """
+    # Setup, default values
+    results: List[Result] = []
+    if dimensions is None:
+        dimensions = [index for index, _ in enumerate(point)]
+    if sizes is None:
+        sizes = [1e-1, 1e-3, 1e-5, 1e-7, 1e-9]
+    if fd_gradient_method is None:
+        fd_gradient_method = "central"
+    if check_protocol is None:
+        check_protocol = default_check_protocol
+    if postprocessor_protocol is None:
+        # or? postprocessor_protocol = default_postprocessor_protocol
+        postprocessor_protocol = []
+
+    # TODO allow supplying this, optionally instead of `gradient` callable
+    expected_gradient = gradient(point)
+
+    # Create a method to approximate the gradient. The resulting
+    # `fd_gradient_callable` should require only a step as its single
+    # (keyword) argument.
+    if fd_gradient_method == "forward":
+        fd_gradient_callable = partial(
+            quotient.forward, function=function, point=point
+        )
+    elif fd_gradient_method == "backward":
+        fd_gradient_callable = partial(
+            quotient.backward, function=function, point=point
+        )
+    elif fd_gradient_method == "central":
+        fd_gradient_callable = partial(
+            quotient.central, function=function, point=point
+        )
+    else:
+        raise NotImplementedError(f"Method: {fd_gradient_method}")
+
+    for size in sizes:
+        for dimension in dimensions:
+            step = dstep(point=point, dimension=dimension, size=size)
+            test_gradient = fd_gradient_callable(step=step)
+            results.append(
+                Result(
+                    dimension=dimension,
+                    size=size,
+                    test_gradient=test_gradient,
+                    expected_gradient=expected_gradient[dimension],
+                    method=fd_gradient_method,
+                )
+            )
+
+    results_df = pd.DataFrame(results)
+    results_df["success"] = False
+    results_df["success_reason"] = None
+    for check in check_protocol:
+        check(results_df)
+    if postprocessor_protocol is not None:
+        for postprocessor in postprocessor_protocol:
+            postprocessor(results_df)
+
+    # Success is true if each dimension has at least one success.
+    # TODO better name than "simplify" for this
+    simplified_results_df = simplify_results_df(results_df=results_df)
+    success = simplified_results_df["success"].all()
+
+    if sort:
+        results_df.sort_values(
+            by=["dimension", "size"],
+            inplace=True,
+        )
+
+    return success, results_df
+
+
+# FIXME refactor to some `gradient.py` where these FD methods can be used to
+# compute gradients.
+# would result in or require a similar method
+def simplify_results_df(results_df: pd.DataFrame) -> pd.DataFrame:
+    """Keep only one row per successful dimension in the dataframe.
+
+    Can be useful for debugging problematic dimensions.
+
+    Args:
+        results_df:
+            The results.
+
+    Returns:
+        The simplified results.
+    """
+    dimension_result_dfs = []
+    for dimension, df in results_df.groupby("dimension"):
+        # If any checks were successful for this dimension, only include the
+        # first successful check.
+        if df["success"].any():
+            # TODO somehow only include "best of" successes?
+            dimension_result_dfs.append(df[df["success"]].head(1))
+        # Else include all checks.
+        else:
+            # TODO somehow only include "best of" failures?
+            dimension_result_dfs.append(df)
+    simplified_results_df = pd.concat(dimension_result_dfs)
+    return simplified_results_df
+
+
+@dataclass
+class Result:
+    """Information about a single finite difference gradient computation."""
+
+    size: float
+    """The size of the step taken."""
+    dimension: TYPE_DIMENSION
+    """The dimension along which the gradient was checked."""
+    method: GradientCheckMethod
+    """The method used to compute the gradient."""
+    test_gradient: float
+    """The (finite difference) test gradient."""
+    expected_gradient: float
+    """The expected gradient."""
+
+
+# FIXME string literals
+def add_absolute_error(results_df):
+    results_df["|aerr|"] = abs(
+        results_df["test_gradient"] - results_df["expected_gradient"]
+    )
+
+
+def check_absolute_error(results_df, tolerance: float = DEFAULT_ATOL):
+    success = results_df["|aerr|"] < tolerance
+    set_success(results_df, success, reason="|aerr|")
+
+
+def add_relative_error(results_df):
+    if "|aerr|" not in results_df.columns:
+        add_absolute_error(results_df)
+    # Small offset to avoid division by zero when an expected gradient
+    # is zero.
+    epsilon = results_df["|aerr|"].min() * 1e-10
+
+    results_df["|rerr|"] = abs(
+        results_df["|aerr|"] / (results_df["expected_gradient"] + epsilon)
+    )
+
+
+def check_relative_error(results_df, tolerance: float = 1e-2):
+    success = results_df["|rerr|"] < tolerance
+    set_success(results_df, success, reason="|rerr|")
+
+
+def set_success(results_df: pd.DataFrame, success: pd.Series, reason: str):
+    # Only mark rows that were not already successful, so that the first
+    # passing check provides the `success_reason`.
+    new_success = success & ~results_df["success"]
+    results_df["success"] = results_df["success"] | new_success
+    results_df["success_reason"].mask(
+        new_success,
+        reason,
+        inplace=True,
+    )
+
+
+default_check_protocol = [
+    # set_all_failed,
+    add_absolute_error,
+    add_relative_error,
+    check_absolute_error,
+    check_relative_error,
+]
+
+
+def keep_lowest_error(
+    results_df,
+    error: str = "|rerr|",
+    inplace: bool = True,
+    sort: bool = True,
+) -> Union[None, pd.DataFrame]:
+    keep_indices = []
+
+    for dimension, df in results_df.groupby("dimension"):
+        # Keep the best success for each dimension.
+        if df["success"].any():
+            keep_index = df.loc[df["success"], error].idxmin()
+            keep_indices.append(keep_index)
+        # Include all results if there is no success.
+        else:
+            keep_indices.extend(df.index)
+
+    minimal_results_df = results_df.drop(
+        [index for index in results_df.index if index not in keep_indices],
+        inplace=inplace,
+    )
+
+    sort_df = results_df
+    if not inplace:
+        sort_df = minimal_results_df
+    if sort:
+        sort_df.sort_values(
+            ["success", "dimension", "size"],
+            ascending=[False, True, True],
+            inplace=True,
+        )
+
+    # Kept like this for readability, but simply `return minimal_results_df`
+    # should be equivalent.
+    if not inplace:
+        return minimal_results_df
+
+
+default_postprocessor_protocol = [
+    keep_lowest_error,
+]
diff --git a/tests/test_derivative.py b/tests/test_derivative.py
index 8f09222..09df8d3 100644
--- a/tests/test_derivative.py
+++ b/tests/test_derivative.py
@@ -5,11 +5,14 @@
 from more_itertools import one
 from scipy.optimize import rosen, rosen_der
 
 from fiddy import MethodId, get_derivative, methods
 from fiddy.analysis import ApproximateCentral
 from fiddy.derivative import Computer
-from fiddy.derivative_check import NumpyIsCloseDerivativeCheck
+from fiddy.derivative_check import (
+    HybridDerivativeCheck,
+    NumpyIsCloseDerivativeCheck,
+)
 from fiddy.success import Consistency
 
 RTOL = 1e-2
 ATOL = 1e-15
@@ -106,7 +109,7 @@ def test_get_derivative(point, sizes, output_shape):
         # FIXME default?
         sizes=[1e-10, 1e-5],
         # FIXME default?
-        method_ids=[MethodId.FORWARD, MethodId.BACKWARD],
+        method_ids=[MethodId.FORWARD, MethodId.BACKWARD, MethodId.CENTRAL],
         # FIXME default?
         analysis_classes=[ApproximateCentral],
         # FIXME default? not just "True" ...
@@ -119,7 +122,55 @@ def test_get_derivative(point, sizes, output_shape):
         expectation=expected_value,
         point=point,
     )
-    result = check(rtol=1e-2)
+    result = check(rtol=1e-2, atol=1e-3)
     assert result.success
+
+
+@pytest.mark.parametrize(
+    "point, sizes, output_shape",
+    [
+        (np.array(point), sizes, output_shape)
+        for point in [
+            (1, 0, 0),
+            (0.9, 0.1, 0.2, 0.4),
+        ]
+        for sizes in [
+            [1e-10, 1e-5],
+        ]
+        for output_shape in [
+            (1,),
+            (1, 2),
+            (5, 3, 6, 2, 4),
+        ]
+    ],
+)
+def test_get_derivative_hybrid(point, sizes, output_shape):
+    function = partial(rosenbrock, output_shape=output_shape)
+    expected_derivative_function = partial(
+        rosenbrock_der, output_shape=output_shape
+    )
+    derivative = get_derivative(
+        function=function,
+        point=point,
+        # FIXME default?
+        sizes=[1e-10, 1e-5],
+        # FIXME default?
+        method_ids=[MethodId.FORWARD, MethodId.BACKWARD, MethodId.CENTRAL],
+        # FIXME default?
+        analysis_classes=[ApproximateCentral],
+        # FIXME default? not just "True" ...
+        success_checker=Consistency(),
+    )
+    test_value = derivative.value
+    expected_value = expected_derivative_function(point)
+
+    check = HybridDerivativeCheck(
+        derivative=derivative,
+        expectation=expected_value,
+        point=point,
+    )
+    # Based on the given tolerances, the hybrid gradient check should pass.
+    result = check(rtol=1e-2, atol=1e-3)
+    assert result.success
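
Usage sketch (not part of the patch): the snippet below shows how the new
`gradient_check` API in fiddy/gradient_check.py can be called, based only on
the signature, defaults, and result columns defined above. SciPy's `rosen`
and `rosen_der` serve as the function and expected gradient, as in the tests.

import numpy as np
from scipy.optimize import rosen, rosen_der

from fiddy.gradient_check import gradient_check

point = np.array([0.9, 0.1, 0.2, 0.4])

# `success` is True if every dimension passes at least one check;
# `results_df` holds one row per (dimension, size) combination, with
# "|aerr|", "|rerr|", "success" and "success_reason" columns filled in
# by the default check protocol.
success, results_df = gradient_check(
    function=rosen,
    point=point,
    gradient=rosen_der,
    sizes=[1e-3, 1e-5, 1e-7],  # defaults to [1e-1, ..., 1e-9] if omitted
    fd_gradient_method="central",
)
assert success
print(results_df[["dimension", "size", "|aerr|", "|rerr|", "success"]])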
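
A second sketch, under the same caveats: extending `check_protocol` with a
custom check. `check_sum_of_errors` and its tolerance are hypothetical; the
only contract assumed is the one documented in `gradient_check`, namely that
a check sets the `"success"` column and records a `"success_reason"`, here
via the `set_success` helper.

import numpy as np
from scipy.optimize import rosen, rosen_der

from fiddy.gradient_check import (
    add_absolute_error,
    add_relative_error,
    gradient_check,
    set_success,
)


def check_sum_of_errors(results_df, tolerance: float = 1e-2):
    # Hypothetical check: a row passes if the sum of its absolute and
    # relative errors is below `tolerance`. `set_success` only marks rows
    # that no earlier check in the protocol already marked successful.
    success = (results_df["|aerr|"] + results_df["|rerr|"]) < tolerance
    set_success(results_df, success, reason="|aerr|+|rerr|")


success, results_df = gradient_check(
    function=rosen,
    point=np.array([0.9, 0.1, 0.2, 0.4]),
    gradient=rosen_der,
    check_protocol=[
        add_absolute_error,  # adds the "|aerr|" column
        add_relative_error,  # adds the "|rerr|" column
        check_sum_of_errors,
    ],
)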