Move compile method back and add special case for TopKOutput
oliverholworthy committed Nov 9, 2023
1 parent aed1b27 commit 7e14b13
Showing 1 changed file with 149 additions and 145 deletions.
294 changes: 149 additions & 145 deletions merlin/models/tf/models/base.py
@@ -69,6 +69,7 @@
from merlin.models.tf.outputs.base import ModelOutput, ModelOutputType
from merlin.models.tf.outputs.classification import CategoricalOutput
from merlin.models.tf.outputs.contrastive import ContrastiveOutput
from merlin.models.tf.outputs.topk import TopKOutput
from merlin.models.tf.prediction_tasks.base import ParallelPredictionBlock, PredictionTask
from merlin.models.tf.transforms.features import PrepareFeatures, expected_input_cols_from_schema
from merlin.models.tf.transforms.sequence import SequenceTransform
@@ -324,6 +325,154 @@ def __init__(self, **kwargs):
initial_value=lambda: True,
)

def compile(
self,
optimizer="rmsprop",
loss: Optional[Union[str, Loss, Dict[str, Union[str, Loss]]]] = None,
metrics: Optional[
Union[
MetricType,
Sequence[MetricType],
Sequence[Sequence[MetricType]],
Dict[str, MetricType],
Dict[str, Sequence[MetricType]],
]
] = None,
loss_weights=None,
weighted_metrics: Optional[
Union[
MetricType,
Sequence[MetricType],
Sequence[Sequence[MetricType]],
Dict[str, MetricType],
Dict[str, Sequence[MetricType]],
]
] = None,
run_eagerly=None,
steps_per_execution=None,
jit_compile=None,
**kwargs,
):
"""Configures the model for training.
Example:
```python
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
loss=tf.keras.losses.BinaryCrossentropy(),
metrics=[tf.keras.metrics.BinaryAccuracy(),
tf.keras.metrics.FalseNegatives()])
```
Args:
optimizer: String (name of optimizer) or optimizer instance. See
`tf.keras.optimizers`.
loss : Optional[Union[str, Loss, Dict[str, Union[str, Loss]]]], optional
{LOSS_PARAMETERS_DOCSTRINGS}
See `tf.keras.losses`. A loss
function is any callable with the signature `loss = fn(y_true,
y_pred)`, where `y_true` are the ground truth values, and
`y_pred` are the model's predictions.
`y_true` should have shape
`(batch_size, d0, .. dN)` (except in the case of
sparse loss functions such as
sparse categorical crossentropy which expects integer arrays of shape
`(batch_size, d0, .. dN-1)`).
`y_pred` should have shape `(batch_size, d0, .. dN)`.
The loss function should return a float tensor.
If a custom `Loss` instance is
used and reduction is set to `None`, return value has shape
`(batch_size, d0, .. dN-1)` i.e. per-sample or per-timestep loss
values; otherwise, it is a scalar. If the model has multiple outputs,
you can use a different loss on each output by passing a dictionary
or a list of losses. The loss value that will be minimized by the
model will then be the sum of all individual losses, unless
`loss_weights` is specified.
loss_weights: Optional list or dictionary specifying scalar coefficients
(Python floats) to weight the loss contributions of different model
outputs. The loss value that will be minimized by the model will then
be the *weighted sum* of all individual losses, weighted by the
`loss_weights` coefficients.
If a list, it is expected to have a 1:1 mapping to the model's
outputs (Keras sorts tasks by name).
If a dict, it is expected to map output names (strings)
to scalar coefficients.
metrics: Optional[ Union[ MetricType, Sequence[MetricType],
Sequence[Sequence[MetricType]],
Dict[str, MetricType], Dict[str, Sequence[MetricType]], ] ], optional
{METRICS_PARAMETERS_DOCSTRING}
weighted_metrics: Optional[ Union[ MetricType, Sequence[MetricType],
Sequence[Sequence[MetricType]],
Dict[str, MetricType], Dict[str, Sequence[MetricType]], ] ], optional
List of metrics to be evaluated and weighted by
`sample_weight` or `class_weight` during training and testing.
{METRICS_PARAMETERS_DOCSTRING}
run_eagerly: Bool. Defaults to `False`. If `True`, this `Model`'s
logic will not be wrapped in a `tf.function`. Recommended to leave
this as `None` unless your `Model` cannot be run inside a
`tf.function`. `run_eagerly=True` is not supported when using
`tf.distribute.experimental.ParameterServerStrategy`.
steps_per_execution: Int. Defaults to 1. The number of batches to run
during each `tf.function` call. Running multiple batches inside a
single `tf.function` call can greatly improve performance on TPUs or
small models with a large Python overhead. At most, one full epoch
will be run each execution. If a number larger than the size of the
epoch is passed, the execution will be truncated to the size of the
epoch. Note that if `steps_per_execution` is set to `N`,
`Callback.on_batch_begin` and `Callback.on_batch_end` methods will
only be called every `N` batches (i.e. before/after each `tf.function`
execution).
jit_compile: If `True`, compile the model training step with XLA.
[XLA](https://www.tensorflow.org/xla) is an optimizing compiler for
machine learning.
`jit_compile` is not enabled by default.
This option cannot be enabled with `run_eagerly=True`.
Note that `jit_compile=True`
may not necessarily work for all models.
For more information on supported operations please refer to the
[XLA documentation](https://www.tensorflow.org/xla).
Also refer to
[known XLA issues](https://www.tensorflow.org/xla/known_issues) for
more details.
**kwargs: Arguments supported for backwards compatibility only.
"""

num_v1_blocks = len(self.prediction_tasks)
num_v2_blocks = len(self.model_outputs)

if num_v1_blocks > 1 and num_v2_blocks > 1:
raise ValueError(
"You cannot use both `prediction_tasks` and `prediction_blocks` at the same time.",
"`prediction_tasks` is deprecated and will be removed in a future version.",
)

if num_v1_blocks > 0:
self.output_names = [task.task_name for task in self.prediction_tasks]
else:
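# Special case (added in this commit): a model whose only output
# is a TopKOutput keeps Keras's default output names instead of
# deriving them from the block's `full_name`.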
if num_v2_blocks == 1 and isinstance(self.model_outputs[0], TopKOutput):
pass
else:
self.output_names = [block.full_name for block in self.model_outputs]

# This flag will make Keras change the metric-names which is not needed in v2
from_serialized = kwargs.pop("from_serialized", num_v2_blocks > 0)

if hvd_installed and hvd.size() > 1:
# Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow
# uses hvd.DistributedOptimizer() to compute gradients.
kwargs.update({"experimental_run_tf_function": False})

super(BaseModel, self).compile(
optimizer=self._create_optimizer(optimizer),
loss=self._create_loss(loss),
metrics=self._create_metrics(metrics, weighted=False),
weighted_metrics=self._create_metrics(weighted_metrics, weighted=True),
run_eagerly=run_eagerly,
loss_weights=loss_weights,
steps_per_execution=steps_per_execution,
jit_compile=jit_compile,
from_serialized=from_serialized,
**kwargs,
)

def _create_optimizer(self, optimizer):
def _create_single_distributed_optimizer(opt):
opt_config = opt.get_config()
@@ -1616,151 +1765,6 @@ def _maybe_build(self, inputs):
child._feature_dtypes = feature_dtypes
super()._maybe_build(inputs)

def compile(
self,
optimizer="rmsprop",
loss: Optional[Union[str, Loss, Dict[str, Union[str, Loss]]]] = None,
metrics: Optional[
Union[
MetricType,
Sequence[MetricType],
Sequence[Sequence[MetricType]],
Dict[str, MetricType],
Dict[str, Sequence[MetricType]],
]
] = None,
loss_weights=None,
weighted_metrics: Optional[
Union[
MetricType,
Sequence[MetricType],
Sequence[Sequence[MetricType]],
Dict[str, MetricType],
Dict[str, Sequence[MetricType]],
]
] = None,
run_eagerly=None,
steps_per_execution=None,
jit_compile=None,
**kwargs,
):
"""Configures the model for training.
Example:
```python
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
loss=tf.keras.losses.BinaryCrossentropy(),
metrics=[tf.keras.metrics.BinaryAccuracy(),
tf.keras.metrics.FalseNegatives()])
```
Args:
optimizer: String (name of optimizer) or optimizer instance. See
`tf.keras.optimizers`.
loss : Optional[Union[str, Loss, Dict[str, Union[str, Loss]]]], optional
{LOSS_PARAMETERS_DOCSTRINGS}
See `tf.keras.losses`. A loss
function is any callable with the signature `loss = fn(y_true,
y_pred)`, where `y_true` are the ground truth values, and
`y_pred` are the model's predictions.
`y_true` should have shape
`(batch_size, d0, .. dN)` (except in the case of
sparse loss functions such as
sparse categorical crossentropy which expects integer arrays of shape
`(batch_size, d0, .. dN-1)`).
`y_pred` should have shape `(batch_size, d0, .. dN)`.
The loss function should return a float tensor.
If a custom `Loss` instance is
used and reduction is set to `None`, return value has shape
`(batch_size, d0, .. dN-1)` i.e. per-sample or per-timestep loss
values; otherwise, it is a scalar. If the model has multiple outputs,
you can use a different loss on each output by passing a dictionary
or a list of losses. The loss value that will be minimized by the
model will then be the sum of all individual losses, unless
`loss_weights` is specified.
loss_weights: Optional list or dictionary specifying scalar coefficients
(Python floats) to weight the loss contributions of different model
outputs. The loss value that will be minimized by the model will then
be the *weighted sum* of all individual losses, weighted by the
`loss_weights` coefficients.
If a list, it is expected to have a 1:1 mapping to the model's
outputs (Keras sorts tasks by name).
If a dict, it is expected to map output names (strings)
to scalar coefficients.
metrics: Optional[ Union[ MetricType, Sequence[MetricType],
Sequence[Sequence[MetricType]],
Dict[str, MetricType], Dict[str, Sequence[MetricType]], ] ], optional
{METRICS_PARAMETERS_DOCSTRING}
weighted_metrics: Optional[ Union[ MetricType, Sequence[MetricType],
Sequence[Sequence[MetricType]],
Dict[str, MetricType], Dict[str, Sequence[MetricType]], ] ], optional
List of metrics to be evaluated and weighted by
`sample_weight` or `class_weight` during training and testing.
{METRICS_PARAMETERS_DOCSTRING}
run_eagerly: Bool. Defaults to `False`. If `True`, this `Model`'s
logic will not be wrapped in a `tf.function`. Recommended to leave
this as `None` unless your `Model` cannot be run inside a
`tf.function`. `run_eagerly=True` is not supported when using
`tf.distribute.experimental.ParameterServerStrategy`.
steps_per_execution: Int. Defaults to 1. The number of batches to run
during each `tf.function` call. Running multiple batches inside a
single `tf.function` call can greatly improve performance on TPUs or
small models with a large Python overhead. At most, one full epoch
will be run each execution. If a number larger than the size of the
epoch is passed, the execution will be truncated to the size of the
epoch. Note that if `steps_per_execution` is set to `N`,
`Callback.on_batch_begin` and `Callback.on_batch_end` methods will
only be called every `N` batches (i.e. before/after each `tf.function`
execution).
jit_compile: If `True`, compile the model training step with XLA.
[XLA](https://www.tensorflow.org/xla) is an optimizing compiler for
machine learning.
`jit_compile` is not enabled by default.
This option cannot be enabled with `run_eagerly=True`.
Note that `jit_compile=True`
may not necessarily work for all models.
For more information on supported operations please refer to the
[XLA documentation](https://www.tensorflow.org/xla).
Also refer to
[known XLA issues](https://www.tensorflow.org/xla/known_issues) for
more details.
**kwargs: Arguments supported for backwards compatibility only.
"""

num_v1_blocks = len(self.prediction_tasks)
num_v2_blocks = len(self.model_outputs)

if num_v1_blocks > 1 and num_v2_blocks > 1:
raise ValueError(
"You cannot use both `prediction_tasks` and `prediction_blocks` at the same time.",
"`prediction_tasks` is deprecated and will be removed in a future version.",
)

if num_v1_blocks > 0:
self.output_names = [task.task_name for task in self.prediction_tasks]
else:
self.output_names = [block.full_name for block in self.model_outputs]

# This flag will make Keras change the metric-names which is not needed in v2
from_serialized = kwargs.pop("from_serialized", num_v2_blocks > 0)

if hvd_installed and hvd.size() > 1:
# Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow
# uses hvd.DistributedOptimizer() to compute gradients.
kwargs.update({"experimental_run_tf_function": False})

super().compile(
optimizer=self._create_optimizer(optimizer),
loss=self._create_loss(loss),
metrics=self._create_metrics(metrics, weighted=False),
weighted_metrics=self._create_metrics(weighted_metrics, weighted=True),
run_eagerly=run_eagerly,
loss_weights=loss_weights,
steps_per_execution=steps_per_execution,
jit_compile=jit_compile,
from_serialized=from_serialized,
**kwargs,
)

def build(self, input_shape=None):
"""Builds the model
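For reference, a minimal sketch of the two `compile` paths handled above. This is plain Keras standing in for a Merlin model; the output names (`"click"`, `"rating"`), layers, and losses are hypothetical and only illustrate the dict forms that `loss` and `loss_weights` accept.

```python
import tensorflow as tf

# A tiny two-output model standing in for a Merlin model with two
# ModelOutput blocks; the names "click" and "rating" are hypothetical.
inputs = tf.keras.Input(shape=(8,))
click = tf.keras.layers.Dense(1, activation="sigmoid", name="click")(inputs)
rating = tf.keras.layers.Dense(1, name="rating")(inputs)
model = tf.keras.Model(inputs, [click, rating])

# Per-output losses and scalar loss weights, keyed by output name;
# the total loss is the weighted sum described in the docstring.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    loss={"click": "binary_crossentropy", "rating": "mse"},
    loss_weights={"click": 1.0, "rating": 0.5},
    metrics={"click": [tf.keras.metrics.AUC()]},
)
```

In Merlin's `BaseModel.compile`, these dict keys correspond to `output_names`, which are normally derived from each task's `task_name` or each `ModelOutput`'s `full_name`; after this commit, a model whose single output is a `TopKOutput` keeps Keras's default `output_names` instead.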
