diff --git a/SI_Toolkit_ASF_Template/__init__.py b/SI_Toolkit_ASF_Template/__init__.py
index 51fa92b..62daba1 100644
--- a/SI_Toolkit_ASF_Template/__init__.py
+++ b/SI_Toolkit_ASF_Template/__init__.py
@@ -1,2 +1,13 @@
-GLOBALLY_DISABLE_COMPILATION = False  # Set to False to use tf.function
-USE_JIT_COMPILATION = True  # XLA ignores random seeds. Set to False for reproducibility
+### Choose whether to run TensorFlow in eager mode (slow, interpreted) or graph mode (fast, compiled)
+# Set `USE_TENSORFLOW_EAGER_MODE=False` to
+# - decorate functions in optimizers and predictors with `@tf.function`,
+# - and thereby enable TensorFlow graph mode, which is much faster than the default eager mode.
+USE_TENSORFLOW_EAGER_MODE = False
+
+
+### Choose whether to use TensorFlow Accelerated Linear Algebra (XLA).
+# XLA applies machine-specific optimizations to speed up the compiled TensorFlow graph.
+# Set `USE_TENSORFLOW_XLA=True` to accelerate execution (e.g. for real-time use);
+# it adds `jit_compile=True` to the `tf.function` decorator.
+# However, XLA ignores random seeds. Set to False for guaranteed reproducibility, e.g. in simulations.
+USE_TENSORFLOW_XLA = True
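
For context, a minimal sketch (not part of the patch) of what these two flags amount to in plain TensorFlow:

    import tensorflow as tf

    def rollout(x):
        return x * x + 1.0  # stand-in for an optimizer/predictor function

    # USE_TENSORFLOW_EAGER_MODE=True  -> call `rollout` as-is (eager, interpreted, slow)
    # USE_TENSORFLOW_EAGER_MODE=False -> wrap it with tf.function:
    rollout_graph = tf.function(rollout)                   # graph mode
    rollout_xla = tf.function(rollout, jit_compile=True)   # graph mode + XLA JIT (ignores random seeds)

    print(rollout_xla(tf.constant(2.0)))  # traced and compiled on the first call
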
diff --git a/src/SI_Toolkit/Functions/General/Dataset.py b/src/SI_Toolkit/Functions/General/Dataset.py
index 688214a..3344857 100644
--- a/src/SI_Toolkit/Functions/General/Dataset.py
+++ b/src/SI_Toolkit/Functions/General/Dataset.py
@@ -191,7 +191,7 @@ def get_batch(self, idx_batch):
 
     def reset_batch_size(self, batch_size=None):
         if batch_size is None:
-            self.batch_size = self.args.batch_size
+            self.batch_size = self.args.num_rollouts
         else:
             self.batch_size = batch_size
 
diff --git a/src/SI_Toolkit/Functions/Pytorch/Training.py b/src/SI_Toolkit/Functions/Pytorch/Training.py
index 34438d3..bcd255a 100644
--- a/src/SI_Toolkit/Functions/Pytorch/Training.py
+++ b/src/SI_Toolkit/Functions/Pytorch/Training.py
@@ -35,7 +35,7 @@ def train_network_core(net, net_info, training_dfs_norm, validation_dfs_norm, te
     del training_dfs_norm, validation_dfs_norm, test_dfs_norm
 
     # Create PyTorch dataloaders for train and dev set
-    training_generator = data.DataLoader(dataset=training_dataset, batch_size=a.batch_size, shuffle=True)
+    training_generator = data.DataLoader(dataset=training_dataset, batch_size=a.num_rollouts, shuffle=True)
     validation_generator = data.DataLoader(dataset=validation_dataset, batch_size=512, shuffle=False)
 
     print('')
diff --git a/src/SI_Toolkit/Functions/TF/Compile.py b/src/SI_Toolkit/Functions/TF/Compile.py
index 3199794..b3189ad 100644
--- a/src/SI_Toolkit/Functions/TF/Compile.py
+++ b/src/SI_Toolkit/Functions/TF/Compile.py
@@ -1,19 +1,22 @@
-import logging
 import platform
 
 import tensorflow as tf
-import torch
+
+from Control_Toolkit.others.get_logger import get_logger
+log = get_logger(__name__)
 
 from SI_Toolkit.computation_library import ComputationLibrary
+
+
 try:
-    from SI_Toolkit_ASF import GLOBALLY_DISABLE_COMPILATION, USE_JIT_COMPILATION
+    from SI_Toolkit_ASF import USE_TENSORFLOW_EAGER_MODE, USE_TENSORFLOW_XLA
 except ImportError:
-    logging.warn("No compilation option set in SI_Toolkit_ASF. Setting GLOBALLY_DISABLE_COMPILATION to True.")
-    GLOBALLY_DISABLE_COMPILATION = True
+    raise Exception("One or both of the compilation options USE_TENSORFLOW_EAGER_MODE and USE_TENSORFLOW_XLA are missing in SI_Toolkit_ASF/__init__.py.")
 
 
 def tf_function_jit(func):
-    return tf.function(func=func, jit_compile=True)
+    # log.debug(f'compiling tf.function from {func}')
+    return tf.function(func=func, jit_compile=True)
 
 
 def tf_function_experimental(func):
@@ -24,27 +27,39 @@ def identity(func):
     return func
 
 
-if GLOBALLY_DISABLE_COMPILATION:
+if USE_TENSORFLOW_EAGER_MODE:
+    log.warning('TensorFlow compilation is disabled by USE_TENSORFLOW_EAGER_MODE=True; execution will be extremely slow')
     CompileTF = identity
 else:
    if platform.machine() == 'arm64' and platform.system() == 'Darwin':  # For M1 Apple processor
+        log.info('TensorFlow compilation (but not XLA JIT) is enabled by tf.function; XLA is not used on Apple arm64 (M1) processors')
         CompileTF = tf.function
-    elif not USE_JIT_COMPILATION:
+    elif not USE_TENSORFLOW_XLA:
+        log.info('TensorFlow compilation (but not XLA JIT) is enabled by tf.function by USE_TENSORFLOW_EAGER_MODE=False and USE_TENSORFLOW_XLA=False')
         CompileTF = tf.function
     else:
+        log.info('TensorFlow compilation and XLA JIT are both enabled by tf_function_jit by USE_TENSORFLOW_EAGER_MODE=False and USE_TENSORFLOW_XLA=True')
         CompileTF = tf_function_jit
+    log.info(f'using {CompileTF} compilation')
+
 # CompileTF = tf_function_experimental  # Should be same as tf_function_jit, not appropriate for newer version of TF
 
 
 def CompileAdaptive(fun):
+    """
+    Compile the bound method `fun` according to the global flags USE_TENSORFLOW_EAGER_MODE and USE_TENSORFLOW_XLA,
+    using the computation library of the instance the method belongs to.
+
+    See SI_Toolkit_ASF/__init__.py
+
+    """
     instance = fun.__self__
     assert hasattr(instance, "lib"), "Instance with this method has no computation library defined"
     computation_library: "type[ComputationLibrary]" = instance.lib
     lib_name = computation_library.lib
 
-    if GLOBALLY_DISABLE_COMPILATION:
+    if USE_TENSORFLOW_EAGER_MODE:
         return identity(fun)
     elif lib_name == 'TF':
+        log.debug(f'compiling tensorflow {fun}')
         return CompileTF(fun)
     else:
-        print('Jit compilation for Pytorch not yet implemented.')
+        log.warning(f'JIT compilation for {lib_name} not yet implemented.')
         return identity(fun)
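
For reference, a minimal sketch (not part of the patch) of how `CompileAdaptive` is applied to a bound method; the `Predictor` class here is hypothetical, only its `lib` attribute matters:

    from SI_Toolkit.Functions.TF.Compile import CompileAdaptive
    from SI_Toolkit.computation_library import TensorFlowLibrary

    class Predictor:
        def __init__(self):
            self.lib = TensorFlowLibrary  # CompileAdaptive dispatches on this attribute

        def step(self, s):
            return s + 1.0

    predictor = Predictor()
    # With USE_TENSORFLOW_EAGER_MODE=False this returns a tf.function-wrapped callable
    # (with jit_compile=True if USE_TENSORFLOW_XLA=True); otherwise `step` is returned unchanged.
    predictor.step = CompileAdaptive(predictor.step)
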
diff --git a/src/SI_Toolkit/GP/DataSelector.py b/src/SI_Toolkit/GP/DataSelector.py
index f5194c1..708998e 100644
--- a/src/SI_Toolkit/GP/DataSelector.py
+++ b/src/SI_Toolkit/GP/DataSelector.py
@@ -138,8 +138,8 @@ def return_dataset_for_training(self,
                                 raw=False
                                 ):
 
-        if batch_size is None and self.args.batch_size is not None:
-            batch_size = self.args.batch_size
+        if batch_size is None and self.args.num_rollouts is not None:
+            batch_size = self.args.num_rollouts
 
         if inputs is None and self.args.inputs is not None:
             inputs = self.args.inputs
diff --git a/src/SI_Toolkit/GP/TimeGP.py b/src/SI_Toolkit/GP/TimeGP.py
index d720b9f..59cf22b 100644
--- a/src/SI_Toolkit/GP/TimeGP.py
+++ b/src/SI_Toolkit/GP/TimeGP.py
@@ -14,10 +14,10 @@ def timing_script_init():
     m_loaded = load_model(save_dir)
     print("Done!")
 
-    num_rollouts = 2000
+    batch_size = 2000
     horizon = 35
 
-    s = tf.zeros(shape=[num_rollouts, 6], dtype=tf.float64)
+    s = tf.zeros(shape=[batch_size, 6], dtype=tf.float64)
     m_loaded.predict_f(s)
 
     return m_loaded, s
diff --git a/src/SI_Toolkit/GP/Train_GPR.py b/src/SI_Toolkit/GP/Train_GPR.py
index e0b5e0b..b642946 100644
--- a/src/SI_Toolkit/GP/Train_GPR.py
+++ b/src/SI_Toolkit/GP/Train_GPR.py
@@ -26,7 +26,7 @@ a.wash_out_len = 0
 a.post_wash_out_len = 1
 outputs = a.outputs
-batch_size = a.batch_size
+batch_size = a.num_rollouts
 
 number_of_inducing_points = 10
diff --git a/src/SI_Toolkit/Predictors/__init__.py b/src/SI_Toolkit/Predictors/__init__.py
index af7ecb9..794fbe2 100644
--- a/src/SI_Toolkit/Predictors/__init__.py
+++ b/src/SI_Toolkit/Predictors/__init__.py
@@ -16,13 +16,15 @@ def __init__(self, horizon: float, batch_size: int) -> None:
         self.predictor_external_input_features = CONTROL_INPUTS
         self.predictor_output_features = STATE_VARIABLES
 
-    def predict_tf(self, s: tf.Tensor, Q: tf.Tensor):
+    def predict_tf(self, s: tf.Tensor, Q: tf.Tensor, time: float = None):
         """Predict the whole MPC horizon using tensorflow
 
         :param s: Initial state [batch_size x state_dim]
         :type s: tf.Tensor
         :param Q: Control inputs [batch_size x horizon_length x control_dim]
         :type Q: tf.Tensor
+        :param time: time in seconds
+        :type time: float
         """
         raise NotImplementedError()
diff --git a/src/SI_Toolkit/Predictors/predictor_ODE_tf.py b/src/SI_Toolkit/Predictors/predictor_ODE_tf.py
index 69375ba..a8195a9 100644
--- a/src/SI_Toolkit/Predictors/predictor_ODE_tf.py
+++ b/src/SI_Toolkit/Predictors/predictor_ODE_tf.py
@@ -51,26 +51,37 @@ def __init__(self, horizon: int, dt: float, intermediate_steps=10, disable_indiv
 
         self.predict_tf = CompileTF(self._predict_tf)
 
-    def predict(self, initial_state, Q):
+    def predict(self, initial_state, Q, time: float = None, horizon: int = None):
         initial_state, Q = convert_to_tensors(initial_state, Q)
         initial_state, Q = check_dimensions(initial_state, Q)
 
         self.batch_size = tf.shape(Q)[0]
         self.initial_state = initial_state
 
-        output = self.predict_tf(self.initial_state, Q)
+        output = self.predict_tf(self.initial_state, Q, params=None, time=time, horizon=horizon)
 
         return output.numpy()
 
-    def _predict_tf(self, initial_state, Q, params=None):
+    def _predict_tf(self, initial_state, Q, params=None, time: float = None, horizon: int = None):
+        """Predict the states over the next `horizon` timesteps.
+        Q must be a 3-dimensional tensor [num_rollouts, horizon, control_dim], where control_dim is the number of control inputs.
+
+        :param initial_state: the state now
+        :param Q: the control inputs over the next `horizon` steps
+        :param params: optional parameters
+        :param time: the current time in seconds
+        :param horizon: optional horizon; if None, use self.horizon
+
+        :returns: the predicted states, with the initial state as the first entry of the horizon dimension, [num_rollouts, horizon+1, state_dim]
+        """
+        horizon = self.horizon if horizon is None else horizon
 
-        self.output = tf.TensorArray(tf.float32, size=self.horizon + 1, dynamic_size=False)
+        self.output = tf.TensorArray(tf.float32, size=horizon + 1, dynamic_size=False)
         self.output = self.output.write(0, initial_state)
 
         next_state = initial_state
 
-        for k in tf.range(self.horizon):
+        for k in tf.range(horizon):
             next_state = self.next_step_predictor.step(next_state, Q[:, k, :], params)
             self.output = self.output.write(k + 1, next_state)
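
For reference, a sketch (not part of the patch) of the shapes `predict` now expects and returns; constructor arguments beyond `horizon`, `dt` and `batch_size` depend on the application config and are omitted here:

    import numpy as np
    from SI_Toolkit.Predictors.predictor_ODE_tf import predictor_ODE_tf

    num_rollouts, horizon, state_dim, control_dim = 32, 35, 6, 1
    predictor = predictor_ODE_tf(horizon=horizon, dt=0.02, batch_size=num_rollouts)

    s0 = np.zeros([num_rollouts, state_dim], dtype=np.float32)             # initial states
    Q = np.zeros([num_rollouts, horizon, control_dim], dtype=np.float32)   # control inputs
    trajectories = predictor.predict(s0, Q)  # [num_rollouts, horizon + 1, state_dim]; entry 0 is s0
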
diff --git a/src/SI_Toolkit/Predictors/predictor_wrapper.py b/src/SI_Toolkit/Predictors/predictor_wrapper.py
index 3c53b52..1338ccf 100644
--- a/src/SI_Toolkit/Predictors/predictor_wrapper.py
+++ b/src/SI_Toolkit/Predictors/predictor_wrapper.py
@@ -13,7 +13,7 @@ class PredictorWrapper:
     """Wrapper class for creating a predictor.
-    
+
     1) Instantiate this wrapper without parameters within the controller class
     2) Pass the instance of this wrapper to the optimizer, without the need to already know specifics about it
     3) Call this wrapper's `configure` method in controller class to set optimization-specific parameters
@@ -33,9 +33,10 @@ def __init__(self):
         self.predictor_type: str = self.predictor_config['predictor_type']
         self.model_name: str = self.predictor_config['model_name']
 
-    def configure(self, batch_size: int, horizon: int, dt: float, computation_library: "Optional[type[ComputationLibrary]]"=None, predictor_specification=None, compile_standalone=False, mode=None):
+    def configure(self, batch_size: int, horizon: int, dt: float, computation_library: Optional[ComputationLibrary] = None, predictor_specification=None, compile_standalone=False, mode=None):
         """Assign optimization-specific parameters to finalize instance creation.
 
+        :param batch_size: Batch size equals the number of parallel rollouts of the optimizer.
         :type batch_size: int
         :param horizon: Number of MPC horizon steps
@@ -75,11 +76,16 @@ def configure(self, batch_size: int, horizon: int, dt: float, computation_librar
             self.predictor = predictor_ODE_tf(horizon=self.horizon, dt=dt, batch_size=self.batch_size, **self.predictor_config, **compile_standalone)
 
         else:
-            raise NotImplementedError('Type of the predictor not recognised.')
-
+            raise NotImplementedError(f'Type of the predictor {self.predictor_type} is not recognised.')
+
         # computation_library defaults to None. In that case, do not check for conformity.
-        if computation_library is not None and computation_library not in self.predictor.supported_computation_libraries:
-            raise ValueError(f"Predictor {self.predictor.__class__.__name__} does not support {computation_library.__name__}")
+        # Otherwise, check after configuring the predictor that it supports the requested library.
+        if computation_library is not None and computation_library not in self.predictor.supported_computation_libraries:
+            raise ValueError(
+                f"Predictor {self.predictor.__class__.__name__} does not support {computation_library.__name__}")
+
+        self.predictor.lib = computation_library  # set the library type on the predictor object so we can use it to assign attributes later
+
 
     def configure_with_compilation(self, batch_size, horizon, dt, predictor_specification=None, mode=None):
         """
@@ -155,8 +161,8 @@ def update_predictor_config_from_specification(self, predictor_specification: st
     def predict(self, s, Q):
         return self.predictor.predict(s, Q)
 
-    def predict_tf(self, s, Q):  # TODO: This function should disappear: predict() should manage the right library
-        return self.predictor.predict_tf(s, Q)
+    def predict_tf(self, state, Q, time=None):  # TODO: This function should disappear: predict() should manage the right library
+        return self.predictor.predict_tf(state, Q, time=time)
 
     def update(self, Q0, s):
         if self.predictor_type == 'neural':
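
A usage sketch (not part of the patch) of the three-step protocol from the docstring above; the optimizer hand-off is elided and the parameter values are hypothetical:

    from SI_Toolkit.Predictors.predictor_wrapper import PredictorWrapper

    predictor = PredictorWrapper()   # 1) instantiate without parameters in the controller
    # 2) pass `predictor` to the optimizer, which need not yet know its specifics
    predictor.configure(             # 3) finalize with optimization-specific parameters
        batch_size=32,               #    = number of parallel rollouts of the optimizer
        horizon=35,
        dt=0.02,
    )
    # afterwards predictor.predict(s0, Q) and predictor.predict_tf(s0, Q, time=None) are available,
    # with s0 and Q shaped as in the predictor_ODE_tf sketch above
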
diff --git a/src/SI_Toolkit/computation_library.py b/src/SI_Toolkit/computation_library.py
index cde8bf8..ca5a971 100644
--- a/src/SI_Toolkit/computation_library.py
+++ b/src/SI_Toolkit/computation_library.py
@@ -55,7 +55,7 @@ class ComputationLibrary:
     gather: Callable[[TensorType, TensorType, int], TensorType] = None
     gather_last: Callable[[TensorType, TensorType], TensorType] = None
     arange: Callable[[Optional[NumericType], NumericType, Optional[NumericType]], TensorType] = None
-    zeros: Callable[["tuple[int]"], TensorType] = None
+    zeros: Callable[["tuple[int, ...]"], TensorType] = None
     zeros_like: Callable[[TensorType], TensorType] = None
     ones: Callable[["tuple[int]"], TensorType] = None
     ones_like: Callable[[TensorType], TensorType] = None
@@ -92,9 +92,16 @@ class ComputationLibrary:
     dot: Callable[[TensorType, TensorType], TensorType] = None
     stop_gradient: Callable[[TensorType], TensorType] = None
     assign: Callable[[Union[TensorType, tf.Variable], TensorType], Union[TensorType, tf.Variable]] = None
+    nan: TensorType = None
+    isnan: Callable[[TensorType], bool] = None
+    string = None
+    equal = lambda x, y: x == y
+    pow = lambda x, p: x ** p
     where: Callable[[TensorType, TensorType, TensorType], TensorType] = None
     logical_and: Callable[[TensorType, TensorType], TensorType] = None
     logical_or: Callable[[TensorType, TensorType], TensorType] = None
+    dtype = lambda x: x.dtype
+    fill = None
 
 
 class NumpyLibrary(ComputationLibrary):
@@ -169,19 +176,24 @@ class NumpyLibrary(ComputationLibrary):
     dot = np.dot
     stop_gradient = lambda x: x
     assign = LibraryHelperFunctions.set_to_value
+    nan = np.nan
+    isnan = np.isnan
+    string = str
     where = np.where
     logical_and = np.logical_and
     logical_or = np.logical_or
-
-
+    equal = lambda x, y: x == y
+    cond = lambda c, t, f: t if c else f
+    pow = lambda x, p: np.power(x, p)
+    fill = lambda dims, value: np.full(dims, value)  # was x.np.fill(y), which would raise AttributeError
 
 class TensorFlowLibrary(ComputationLibrary):
     lib = 'TF'
     reshape = tf.reshape
     permute = tf.transpose
     newaxis = tf.newaxis
-    shape = lambda x: x.get_shape()  # .as_list()
-    to_numpy = lambda x: x.numpy()
+    shape = tf.shape  # previously lambda x: x.get_shape(), which returns only the static shape; tf.shape also works on symbolic tensors in graph mode
+    to_numpy = lambda x: x.numpy() if isinstance(x, (tf.Tensor, tf.Variable)) else x
     to_variable = lambda x, dtype: tf.Variable(x, dtype=dtype)
     to_tensor = lambda x, dtype: tf.convert_to_tensor(x, dtype=dtype)
     constant = lambda x, t: tf.constant(x, dtype=t)
@@ -247,9 +259,16 @@ class TensorFlowLibrary(ComputationLibrary):
     dot = lambda a, b: tf.tensordot(a, b, 1)
     stop_gradient = tf.stop_gradient
     assign = LibraryHelperFunctions.set_to_variable
+    nan = tf.constant(np.nan)
+    isnan = tf.math.is_nan
+    string = tf.string
     where = tf.where
     logical_and = tf.math.logical_and
     logical_or = tf.math.logical_or
+    equal = lambda x, y: tf.math.equal(x, y)
+    cond = lambda c, t, f: tf.cond(c, t, f)
+    pow = lambda x, p: tf.pow(x, p)
+    fill = lambda dims, value: tf.fill(dims, value)
 
 
 class PyTorchLibrary(ComputationLibrary):
@@ -332,6 +351,12 @@ def gather_last_pytorch(a, index_vector):
     dot = torch.dot
     stop_gradient = tf.stop_gradient  # FIXME: How to implement this in torch?
     assign = LibraryHelperFunctions.set_to_value
+    nan = torch.nan
+    isnan = torch.isnan
+    string = lambda x: torch.ByteTensor(bytes(x, 'utf8'))
     where = torch.where
     logical_and = torch.logical_and
     logical_or = torch.logical_or
+    equal = lambda x, y: torch.equal(x, y)
+    pow = lambda x, p: torch.pow(x, p)
+    fill = lambda dims, value: torch.full(dims, value)  # was x.torch.Tensor.fill(x, y), which would raise AttributeError
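
A quick sketch (not part of the patch) exercising the newly added library members across backends; it assumes the corrected `fill` above:

    from SI_Toolkit.computation_library import NumpyLibrary, TensorFlowLibrary

    for lib in (NumpyLibrary, TensorFlowLibrary):
        ones = lib.fill((2, 3), 1.0)     # mirrors tf.fill(dims, value)
        squared = lib.pow(ones, 2)       # elementwise power
        mask = lib.equal(ones, squared)  # elementwise comparison
        print(lib.lib, mask)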