Skip to content

Commit

Permalink
Fix mypy errors
Browse files Browse the repository at this point in the history
  • Loading branch information
nhuet committed Mar 21, 2024
1 parent 537148d commit 1c85777
Show file tree
Hide file tree
Showing 12 changed files with 84 additions and 58 deletions.
2 changes: 1 addition & 1 deletion src/decomon/layers/activations/activation.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def compute_output_shape(
) = self.inputs_outputs_spec.split_input_shape(input_shape=input_shape)
return self.inputs_outputs_spec.flatten_outputs_shape(
affine_bounds_propagated_shape=affine_bounds_to_propagate_shape,
constant_bounds_propagated_shape=constant_oracle_bounds_shape,
constant_bounds_propagated_shape=constant_oracle_bounds_shape, # type: ignore
)


Expand Down
2 changes: 1 addition & 1 deletion src/decomon/layers/crown.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def call(self, inputs: list[list[BackendTensor]]) -> list[BackendTensor]:
b_l = add_tensors(b_l, b_l_i, missing_batchsize=missing_batchsize)
b_u = add_tensors(b_u, b_u_i, missing_batchsize=missing_batchsize)

affine_bounds = w_l, b_l, w_u, b_u
affine_bounds = [w_l, b_l, w_u, b_u]
return affine_bounds

def build(self, input_shape: list[list[tuple[Optional[int], ...]]]) -> None:
Expand Down
28 changes: 19 additions & 9 deletions src/decomon/layers/fuse.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(
m1_input_shape: tuple[int, ...],
m_1_output_shapes: list[tuple[int, ...]],
from_linear_2: list[bool],
**kwargs,
**kwargs: Any,
):
"""
Expand Down Expand Up @@ -138,7 +138,7 @@ def _is_from_linear_m1_ith_affine_bounds(self, affine_bounds: list[Tensor], i: i
return len(affine_bounds) == 0 or affine_bounds[1].shape == self.m_1_output_shapes[i]

def _is_from_linear_m1_ith_affine_bounds_shape(
self, affine_bounds_shape: list[tuple[Optional[int]]], i: int
self, affine_bounds_shape: list[tuple[Optional[int], ...]], i: int
) -> bool:
return len(affine_bounds_shape) == 0 or affine_bounds_shape[1] == self.m_1_output_shapes[i]

Expand Down Expand Up @@ -226,14 +226,16 @@ def compute_output_shape(
) -> list[tuple[Optional[int], ...]]:
bounds_1_shape, bounds_2_shape = input_shape

bounds_fused_shape: list[tuple[int, ...]] = []
bounds_fused_shape: list[tuple[Optional[int], ...]] = []
for i in range(self.nb_outputs_first_model):
bounds_1_i_shape = bounds_1_shape[
i
* self.inputs_outputs_spec_1.nb_output_tensors : (i + 1)
* self.inputs_outputs_spec_1.nb_output_tensors
]
affine_bounds_1_shape, constant_bounds_1_shape = self.inputs_outputs_spec_1.split_output_shape(
affine_bounds_1_shape: list[tuple[Optional[int], ...]]
constant_bounds_1_shape: list[tuple[Optional[int], ...]]
affine_bounds_1_shape, constant_bounds_1_shape = self.inputs_outputs_spec_1.split_output_shape( # type: ignore
bounds_1_i_shape
)

Expand All @@ -242,9 +244,14 @@ def compute_output_shape(
* self.inputs_outputs_spec_2[0].nb_output_tensors : (i + 1)
* self.inputs_outputs_spec_2[0].nb_output_tensors
]
affine_bounds_2_shape, constant_bounds_2_shape = self.inputs_outputs_spec_2[0].split_output_shape(
bounds_2_i_shape
)
affine_bounds_2_shape: list[tuple[Optional[int], ...]]
constant_bounds_2_shape: list[tuple[Optional[int], ...]]
(
affine_bounds_2_shape,
constant_bounds_2_shape,
) = self.inputs_outputs_spec_2[ # type:ignore
0
].split_output_shape(bounds_2_i_shape)

# constant bounds
if self.ibp_2:
Expand All @@ -264,10 +271,11 @@ def compute_output_shape(
# affine bounds
if self.affine_1 and self.affine_2:
_, b2_shape, _, _ = affine_bounds_2_shape
model_2_output_shape_wo_batchisze: tuple[int, ...]
if self.from_linear_2[i]:
model_2_output_shape_wo_batchisze = b2_shape
model_2_output_shape_wo_batchisze = b2_shape # type: ignore
else:
model_2_output_shape_wo_batchisze = b2_shape[1:]
model_2_output_shape_wo_batchisze = b2_shape[1:] # type: ignore

diagonal = self.inputs_outputs_spec_1.is_diagonal_bounds_shape(
affine_bounds_1_shape
Expand All @@ -281,6 +289,8 @@ def compute_output_shape(
self._is_from_linear_m1_ith_affine_bounds_shape(affine_bounds_shape=affine_bounds_1_shape, i=i)
and self.from_linear_2[i]
)
w_fused_shape: tuple[Optional[int], ...]
b_fused_shape: tuple[Optional[int], ...]
if from_linear_layer:
w_fused_shape = w_fused_shape_wo_batchsize
b_fused_shape = model_2_output_shape_wo_batchisze
Expand Down
12 changes: 5 additions & 7 deletions src/decomon/layers/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def compute_output_shape(
self,
input_shape: tuple[Optional[int], ...],
) -> list[tuple[Optional[int], ...]]:
perturbation_domain_input_shape_wo_batchsize = input_shape[1:]
perturbation_domain_input_shape_wo_batchsize: tuple[int, ...] = input_shape[1:] # type: ignore
keras_input_shape_wo_batchsize = self.perturbation_domain.get_keras_input_shape_wo_batchsize(
x_shape=perturbation_domain_input_shape_wo_batchsize
)
Expand All @@ -112,7 +112,7 @@ def compute_output_shape(
else:
constant_bounds_shape = []
return self.inputs_outputs_spec.flatten_inputs_shape(
affine_bounds_to_propagate_shape=affine_bounds_shape,
affine_bounds_to_propagate_shape=affine_bounds_shape, # type: ignore
constant_oracle_bounds_shape=constant_bounds_shape,
perturbation_domain_inputs_shape=[],
)
Expand Down Expand Up @@ -165,7 +165,7 @@ def compute_output_shape(
self,
input_shape: tuple[Optional[int], ...],
) -> list[tuple[Optional[int], ...]]:
perturbation_domain_input_shape_wo_batchsize = input_shape[1:]
perturbation_domain_input_shape_wo_batchsize: tuple[int, ...] = input_shape[1:] # type: ignore
keras_input_shape_wo_batchsize = self.perturbation_domain.get_keras_input_shape_wo_batchsize(
x_shape=perturbation_domain_input_shape_wo_batchsize
)
Expand Down Expand Up @@ -312,11 +312,13 @@ def compute_output_shape(
else:
w_shape_wo_batchsize = w_shape[1:]
is_diag = w_shape_wo_batchsize == model_output_shape
m2_output_shape: tuple[Optional[int], ...]
if is_diag:
m2_output_shape = model_output_shape
else:
m2_output_shape = w_shape_wo_batchsize[len(model_output_shape) :]
b_shape_wo_batchsize = m2_output_shape
b_shape: tuple[Optional[int], ...]
if from_linear:
b_shape = b_shape_wo_batchsize
else:
Expand All @@ -332,10 +334,6 @@ def compute_output_shape(
return input_shape


def _is_keras_tensor_shape(shape):
return len(shape) > 0 and (shape[0] is None or isinstance(shape[0], int))


def flatten_backward_bounds(
backward_bounds: Union[keras.KerasTensor, list[keras.KerasTensor], list[list[keras.KerasTensor]]]
) -> list[keras.KerasTensor]:
Expand Down
6 changes: 3 additions & 3 deletions src/decomon/layers/inputs_outputs_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ def has_multiple_bounds_inputs(self) -> bool:
return self.propagation == Propagation.FORWARD and self.is_merging_layer

@overload
def extract_shapes_from_affine_bounds(
def extract_shapes_from_affine_bounds( # type:ignore
self, affine_bounds: list[Tensor], i: int = -1
) -> list[tuple[Optional[int], ...]]:
...
Expand Down Expand Up @@ -575,14 +575,14 @@ def is_wo_batch_bounds_shape(
b_shape = affine_bounds_shape[1]
if self.propagation == Propagation.FORWARD:
if i > -1:
return len(b_shape) == len(self.layer_input_shape[i])
return len(b_shape) == len(self.layer_input_shape[i]) # type: ignore
else:
return len(b_shape) == len(self.layer_input_shape)
else:
return len(b_shape) == len(self.model_output_shape)

@overload
def is_wo_batch_bounds_by_keras_input(
def is_wo_batch_bounds_by_keras_input( # type: ignore
self,
affine_bounds: list[Tensor],
) -> bool:
Expand Down
13 changes: 8 additions & 5 deletions src/decomon/layers/layer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from inspect import Parameter, signature
from typing import Any, Optional
from typing import Any, Optional, Union

import keras
import keras.ops as K
Expand Down Expand Up @@ -169,7 +169,7 @@ def is_merging_layer(self) -> bool:

@property
def layer_input_shape(self) -> tuple[int, ...]:
return self.inputs_outputs_spec.layer_input_shape
return self.inputs_outputs_spec.layer_input_shape # type: ignore

@property
def model_input_shape(self) -> tuple[int, ...]:
Expand Down Expand Up @@ -640,6 +640,7 @@ def compute_output_shape(
# outputs shape depends if layer and inputs are diagonal / linear (w/o batch)
b_shape_wo_batchisze = model_output_shape_wo_batchsize
if self.diagonal and self.inputs_outputs_spec.is_diagonal_bounds_shape(affine_bounds_to_propagate_shape):
w_shape_wo_batchsize: Union[tuple[int, ...], list[tuple[int, ...]]]
if self._is_merging_layer:
w_shape_wo_batchsize = [model_output_shape_wo_batchsize] * self.inputs_outputs_spec.nb_keras_inputs
else:
Expand All @@ -652,15 +653,17 @@ def compute_output_shape(
]
else:
w_shape_wo_batchsize = self.layer.input.shape[1:] + model_output_shape_wo_batchsize
b_shape: tuple[Optional[int], ...]
w_shape: Union[tuple[Optional[int], ...], list[tuple[Optional[int], ...]]]
if self.linear and self.inputs_outputs_spec.is_wo_batch_bounds_shape(affine_bounds_to_propagate_shape):
b_shape = b_shape_wo_batchisze
w_shape = w_shape_wo_batchsize
w_shape = w_shape_wo_batchsize # type: ignore
else:
b_shape = (None,) + b_shape_wo_batchisze
if self._is_merging_layer:
w_shape = [(None,) + sub_w_shape_wo_batchsize for sub_w_shape_wo_batchsize in w_shape_wo_batchsize]
w_shape = [(None,) + sub_w_shape_wo_batchsize for sub_w_shape_wo_batchsize in w_shape_wo_batchsize] # type: ignore
else:
w_shape = (None,) + w_shape_wo_batchsize
w_shape = (None,) + w_shape_wo_batchsize # type: ignore
if self._is_merging_layer:
affine_bounds_propagated_shape = [
[
Expand Down
7 changes: 4 additions & 3 deletions src/decomon/layers/merging/base_merge.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Any

import keras
import keras.ops as K

from decomon.keras_utils import add_tensors, batch_multid_dot
Expand All @@ -12,7 +13,7 @@ class DecomonMerge(DecomonLayer):
_is_merging_layer = True

@property
def keras_layer_input(self):
def keras_layer_input(self) -> list[keras.KerasTensor]:
"""self.layer.input returned as a list.
In the degenerate case where only 1 input is merged, self.layer.input is a single keras tensor.
Expand All @@ -25,7 +26,7 @@ def keras_layer_input(self):
return [self.layer.input]

@property
def nb_keras_inputs(self):
def nb_keras_inputs(self) -> int:
"""Number of inputs merged by the underlying layer."""
return len(self.keras_layer_input)

Expand Down Expand Up @@ -274,7 +275,7 @@ def forward_affine_propagate(
from_linear_layer_new = all(from_linear_add)
return w_l_new, b_l_new, w_u_new, b_u_new

def backward_affine_propagate(
def backward_affine_propagate( # type: ignore
self, output_affine_bounds: list[Tensor], input_constant_bounds: list[list[Tensor]]
) -> list[tuple[Tensor, Tensor, Tensor, Tensor]]:
"""Propagate model affine bounds in backward direction.
Expand Down
9 changes: 6 additions & 3 deletions src/decomon/layers/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,13 @@ def compute_output_shape(
"""Compute output shape in case of symbolic call."""
if self.is_merging_layer:
output_shape = []
for layer_input_shape_i in self.layer_input_shape:
layer_input_shape_i: tuple[int, ...]
for layer_input_shape_i in self.layer_input_shape: # type: ignore
layer_input_shape_w_batchsize_i = (None,) + layer_input_shape_i
output_shape.append([layer_input_shape_w_batchsize_i, layer_input_shape_w_batchsize_i])
return output_shape
else:
layer_input_shape_w_batchsize = (None,) + self.layer_input_shape
layer_input_shape_w_batchsize = (None,) + self.layer_input_shape # type: ignore
return [layer_input_shape_w_batchsize, layer_input_shape_w_batchsize]


Expand Down Expand Up @@ -222,7 +223,7 @@ def get_forward_oracle(
x = perturbation_domain_inputs[0]
if is_merging_layer:
constant_bounds = []
for affine_bounds_i, from_linear_i in zip(affine_bounds, from_linear):
for affine_bounds_i, from_linear_i in zip(affine_bounds, from_linear): # type: ignore
if len(affine_bounds_i) == 0:
# special case: empty affine bounds => identity bounds
l_affine = perturbation_domain.get_lower_x(x)
Expand All @@ -239,6 +240,8 @@ def get_forward_oracle(
l_affine = perturbation_domain.get_lower_x(x)
u_affine = perturbation_domain.get_upper_x(x)
else:
if not isinstance(from_linear, bool):
raise ValueError("from_linear must be a boolean for a non-merging layer")
w_l, b_l, w_u, b_u = affine_bounds
l_affine = perturbation_domain.get_lower(x, w_l, b_l, missing_batchsize=from_linear)
u_affine = perturbation_domain.get_upper(x, w_u, b_u, missing_batchsize=from_linear)
Expand Down
10 changes: 8 additions & 2 deletions src/decomon/layers/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,16 @@ def compute_output_shape(
self,
input_shape: list[tuple[Optional[int], ...]],
) -> list[tuple[Optional[int], ...]]:
affine_bounds_from_shape: list[list[tuple[Optional[int], ...]]]
constant_bounds_from_shape: list[list[tuple[Optional[int], ...]]]
perturbation_domain_inputs_shape: list[tuple[Optional[int], ...]]
(
affine_bounds_from_shape,
constant_bounds_from_shape,
perturbation_domain_inputs_shape,
) = self.inputs_outputs_spec.split_input_shape(input_shape)
) = self.inputs_outputs_spec.split_input_shape( # type: ignore
input_shape
)
constant_bounds_to_shape: list[list[tuple[Optional[int], ...]]]
affine_bounds_to_shape: list[list[tuple[Optional[int], ...]]]

Expand All @@ -167,7 +172,8 @@ def compute_output_shape(
affine_bounds_to_shape = affine_bounds_from_shape
else:
x_shape = perturbation_domain_inputs_shape[0]
keras_input_shape = self.perturbation_domain.get_keras_input_shape_wo_batchsize(x_shape[1:])
x_shape_wo_batchsize: tuple[int, ...] = x_shape[1:] # type: ignore
keras_input_shape = self.perturbation_domain.get_keras_input_shape_wo_batchsize(x_shape_wo_batchsize)
affine_bounds_to_shape = []
for model_output_shape in self.model_output_shapes:
b_shape = (None,) + model_output_shape
Expand Down
7 changes: 4 additions & 3 deletions src/decomon/models/backward_cloning.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ def crown_model(
model_output_shape = get_model_output_shape(
node=node, backward_bounds=backward_bounds_node, from_linear=from_linear
)
backward_map_node = {}
backward_map_node: dict[int, DecomonLayer] = {}

output_crown = crown(
node=node,
Expand Down Expand Up @@ -406,7 +406,7 @@ def convert_backward(
layer_fn: Callable[..., DecomonLayer] = to_decomon,
backward_bounds: Optional[list[keras.KerasTensor]] = None,
from_linear_backward_bounds: Union[bool, list[bool]] = False,
slope: Union[str, Slope] = Slope.V_SLOPE,
slope: Slope = Slope.V_SLOPE,
forward_output_map: Optional[dict[int, list[keras.KerasTensor]]] = None,
forward_layer_map: Optional[dict[int, DecomonLayer]] = None,
mapping_keras2decomon_classes: Optional[dict[type[Layer], type[DecomonLayer]]] = None,
Expand Down Expand Up @@ -440,6 +440,7 @@ def convert_backward(
"""
if perturbation_domain is None:
perturbation_domain = BoxDomain()
backward_bounds_for_crown_model: list[list[keras.KerasTensor]]
if backward_bounds is None:
backward_bounds_for_crown_model = [[]] * len(model.outputs)
else:
Expand Down Expand Up @@ -474,7 +475,7 @@ def convert_backward(
return output


def get_model_output_shape(node: Node, backward_bounds: list[Tensor], from_linear: bool = False):
def get_model_output_shape(node: Node, backward_bounds: list[Tensor], from_linear: bool = False) -> tuple[int, ...]:
"""Get outer model output shape w/o batchsize.
If any backward bounds are passed, we deduce the outer keras model output shape from it.
Expand Down
12 changes: 6 additions & 6 deletions src/decomon/models/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,18 +311,18 @@ def clone(
if perturbation_domain is None:
perturbation_domain = BoxDomain()

default_final_ibp, default_final_affine = get_final_ibp_affine_from_method(method)
if final_ibp is None:
final_ibp = default_final_ibp
if final_affine is None:
final_affine = default_final_affine

if isinstance(method, str):
method = ConvertMethod(method.lower())

if isinstance(slope, str):
slope = Slope(slope.lower())

default_final_ibp, default_final_affine = get_final_ibp_affine_from_method(method)
if final_ibp is None:
final_ibp = default_final_ibp
if final_affine is None:
final_affine = default_final_affine

# preprocess backward_bounds
backward_bounds_flattened: Optional[list[keras.KerasTensor]]
backward_bounds_for_convert: Optional[list[keras.KerasTensor]]
Expand Down
Loading

0 comments on commit 1c85777

Please sign in to comment.