Skip to content

Commit

Permalink
Add additional methods to Pipeline class
Browse files Browse the repository at this point in the history
  • Loading branch information
andrey-churkin committed Sep 21, 2023
1 parent 96e8d76 commit d3d90a1
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 103 deletions.
27 changes: 10 additions & 17 deletions nncf/quantization/pipelines/hyperparameter_tuner/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
from nncf.quantization.algorithms.accuracy_control.subset_selection import select_subset
from nncf.quantization.pipelines.pipeline import Pipeline
from nncf.quantization.pipelines.pipeline import collect_statistics
from nncf.quantization.pipelines.pipeline import get_statistic_points
from nncf.quantization.pipelines.pipeline import run_pipeline_from_step
from nncf.quantization.pipelines.pipeline import run_pipeline_step

TModel = TypeVar("TModel")
TTensor = TypeVar("TTensor")
Expand Down Expand Up @@ -290,10 +287,10 @@ def run(self, model: TModel, dataset: Dataset) -> TModel:
if not step_param_grid:
# TODO(andrey-churkin): Think about how it can be avoided.
params = apply_combination(self._init_params, best_settings)
pipeline_step = self._pipeline_cls(**params).pipeline_steps[step_index]
container = get_statistic_points(pipeline_step, step_model, step_graph)
pipeline = self._pipeline_cls(**params)
container = pipeline.get_statistic_points_for_step(step_index, step_model, step_graph)
step_statistics = collect_statistics(container, step_model, step_graph, self._calibration_dataset)
step_model = run_pipeline_step(pipeline_step, step_statistics, step_model, step_graph)
step_model = pipeline.run_step(step_index, step_statistics, step_model, step_graph)
continue

step_combinations = create_combinations(step_param_grid)
Expand All @@ -318,9 +315,9 @@ def run(self, model: TModel, dataset: Dataset) -> TModel:
)

best_settings.update(step_combinations[step_best_combination_key])
pipeline_step = self._pipelines[step_best_combination_key].pipeline_steps[step_index]
step_model = run_pipeline_step(
pipeline_step, self._step_index_to_statistics[step_index], step_model, step_graph
pipeline = self._pipelines[step_best_combination_key]
step_model = pipeline.run_step(
step_index, self._step_index_to_statistics[step_index], step_model, step_graph
)

# TODO(andrey-churkin): Show final best settings
Expand Down Expand Up @@ -357,7 +354,7 @@ def _prepare_pipeline_step(

# Collect statistics required to execute `step_index`-th pipeline step
containers = [
get_statistic_points(pipeline.pipeline_steps[step_index], step_model, step_graph)
pipeline.get_statistic_points_for_step(step_index, step_model, step_graph)
for pipeline in self._pipelines.values()
]
self._step_index_to_statistics[step_index] = collect_statistics(
Expand Down Expand Up @@ -386,13 +383,9 @@ def _calculate_combination_score(
if combination_key in self._calculated_scores:
return self._calculated_scores[combination_key]

model = run_pipeline_from_step(
self._pipelines[combination_key],
step_model,
self._calibration_dataset,
step_graph,
step_index,
self._step_index_to_statistics,
pipeline = self._pipelines[combination_key]
model = pipeline.run_from_step(
step_model, self._calibration_dataset, step_graph, step_index, self._step_index_to_statistics
)

score = self._validate_model(model, dataset, subset_indices)
Expand Down
172 changes: 86 additions & 86 deletions nncf/quantization/pipelines/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,6 @@
PipelineStep = List[Algorithm]


def get_statistic_points(pipeline_step: PipelineStep, model: TModel, graph: NNCFGraph) -> StatisticPointsContainer:
"""
TODO
:param pipeline_step:
:param model:
:param graph:
:return:
"""
container = StatisticPointsContainer()
for algorithm in pipeline_step:
for statistic_points in algorithm.get_statistic_points(model, graph).values():
for statistic_point in statistic_points:
container.add_statistic_point(statistic_point)

return container


def collect_statistics(
containers: Union[StatisticPointsContainer, List[StatisticPointsContainer]],
model: TModel,
Expand Down Expand Up @@ -105,81 +87,99 @@ def run(self, model: TModel, dataset: Dataset) -> TModel:
:param dataset: A dataset that holds the data items for algorithms.
:return: The updated model after executing the entire pipeline.
"""
return run_pipeline_from_step(self, model, dataset)


def run_pipeline_step(
pipeline_step: PipelineStep,
pipeline_step_statistics: StatisticPointsContainer,
model: TModel,
graph: NNCFGraph,
) -> TModel:
"""
Executes a provided pipeline step on the provided model.
return self.run_from_step(model, dataset)

def run_step(
self,
step_index: int,
step_statistics: StatisticPointsContainer,
model: TModel,
graph: NNCFGraph,
) -> TModel:
"""
Executes a provided pipeline step on the provided model.
:param pipeline_step: A sequence of algorithms representing a pipeline step.
:param pipeline_step_statistics: Statistics required to execute a pipeline step.
:param model: A model to which a pipeline step will be applied.
:param graph: A graph assosiated with a model.
:return: The updated model after executing the pipeline step.
"""
current_model = model
current_graph = graph
:param pipeline_step: A sequence of algorithms representing a pipeline step.
:param pipeline_step_statistics: Statistics required to execute a pipeline step.
:param model: A model to which a pipeline step will be applied.
:param graph: A graph assosiated with a model.
:return: The updated model after executing the pipeline step.
"""
current_model = model
current_graph = graph

pipeline_step = self.pipeline_steps[step_index]
for algorithm in pipeline_step[:-1]:
current_model = algorithm.apply(current_model, current_graph, step_statistics)
current_graph = NNCFGraphFactory.create(current_model)
current_model = pipeline_step[-1].apply(current_model, current_graph, step_statistics)

return current_model

def run_from_step(
self,
model: TModel,
dataset: Dataset,
graph: Optional[NNCFGraph] = None,
start_step_index: int = 0,
step_index_to_statistics: Optional[Dict[int, StatisticPointsContainer]] = None,
) -> TModel:
"""
Executes the pipeline from the specified pipeline step to the end.
:param model: This is the model after the (start_step_index - 1)-th pipeline
step, or the initial model if start_step_index is 0.
:param dataset: A dataset that holds the data items for pipeline steps.
:param graph: A graph assosiated with a model.
:param start_step_index: Zero-based pipeline step index from which the pipeline
should be executed.
:param step_index_to_statistics: A mapping from pipeline step index to statistics
required to execute pipeline step.
:return: The updated model after executing the pipeline from the specified pipeline
step to the end.
"""
if step_index_to_statistics is None:
step_index_to_statistics = {}

for algorithm in pipeline_step[:-1]:
current_model = algorithm.apply(current_model, current_graph, pipeline_step_statistics)
current_graph = NNCFGraphFactory.create(current_model)
current_model = pipeline_step[-1].apply(current_model, current_graph, pipeline_step_statistics)
# The `step_model` and `step_graph` entities are required to execute `step_index`-th pipeline step
step_model = model
step_graph = graph
step_index = start_step_index

return current_model
for step_index in range(start_step_index, len(self.pipeline_steps)):
# Create graph required to run current pipeline step
if step_graph is None:
step_graph = NNCFGraphFactory.create(step_model)

# Collect statistics required to run current pipeline step
step_statistics = step_index_to_statistics.get(step_index)
if step_statistics is None:
statistic_points = self.get_statistic_points_for_step(step_index, step_model, step_graph)
step_statistics = collect_statistics(statistic_points, step_model, step_graph, dataset)

def run_pipeline_from_step(
pipeline: Pipeline,
model: TModel,
dataset: Dataset,
graph: Optional[NNCFGraph] = None,
start_step_index: int = 0,
step_index_to_statistics: Optional[Dict[int, StatisticPointsContainer]] = None,
) -> TModel:
"""
Execute the pipeline from the specified pipeline step to the end.
:param pipeline: A pipeline part of which should be executed.
:param model: This is the model after the (start_step_index - 1)-th pipeline
step, or the initial model if start_step_index is 0.
:param dataset: A dataset that holds the data items for pipeline steps.
:param graph: A graph assosiated with a model.
:param start_step_index: Zero-based pipeline step index from which the pipeline
should be executed.
:param step_index_to_statistics: A mapping from pipeline step index to statistics
required to execute pipeline step.
:return: The updated model after executing the pipeline from the specified pipeline
step to the end.
"""
if step_index_to_statistics is None:
step_index_to_statistics = {}
# Run current pipeline step
step_model = self.run_step(step_index, step_statistics, step_model, step_graph)

# The `step_model` and `step_graph` entities are required to execute `step_index`-th pipeline step
step_model = model
step_graph = graph
step_index = start_step_index
step_graph = None # We should rebuild the graph for the next pipeline step
step_index += 1

for pipeline_step in pipeline.pipeline_steps[start_step_index:]:
# Create graph required to run current pipeline step
if step_graph is None:
step_graph = NNCFGraphFactory.create(step_model)
return step_model

# Collect statistics required to run current pipeline step
step_statistics = step_index_to_statistics.get(step_index)
if step_statistics is None:
statistic_points = get_statistic_points(pipeline_step, step_model, step_graph)
step_statistics = collect_statistics(statistic_points, step_model, step_graph, dataset)

# Run current pipeline step
step_model = run_pipeline_step(pipeline_step, step_statistics, step_model, step_graph)
def get_statistic_points_for_step(
self, step_index: int, model: TModel, graph: NNCFGraph
) -> StatisticPointsContainer:
"""
TODO
step_graph = None # We should rebuild the graph for the next pipeline step
step_index += 1
:param pipeline_step:
:param model:
:param graph:
:return:
"""
container = StatisticPointsContainer()
for algorithm in self.pipeline_steps[step_index]:
for statistic_points in algorithm.get_statistic_points(model, graph).values():
for statistic_point in statistic_points:
container.add_statistic_point(statistic_point)

return step_model
return container

0 comments on commit d3d90a1

Please sign in to comment.