Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

PoC of lowering compilation time using Python threading #3

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# limitations under the License.

import multiprocessing
import concurrent.futures
from typing import Any, List, Optional

import numpy as np
Expand All @@ -33,10 +34,10 @@
from nncf.quantization.algorithms.accuracy_control.backend import AsyncPreparedModel


def compile_model(model: ov.Model, done_queue: multiprocessing.Queue) -> None:
    """Compile `model` for CPU and publish its serialized form on `done_queue`.

    Intended to run in a worker process: the compiled model is exported to a
    byte stream (picklable) and handed back to the parent via the queue.

    :param model: OpenVINO model to compile.
    :param done_queue: Queue on which the exported model stream is placed.
    """
    core = ov.Core()
    exported_stream = core.compile_model(model, "CPU").export_model()
    done_queue.put(exported_stream)
def compile_model(model: ov.Model) -> ov.CompiledModel:
    """Compile `model` for CPU execution and return the compiled model.

    Safe to call from worker threads: `ov.Core().compile_model` releases the
    GIL while the native compilation runs, so concurrent calls overlap.

    :param model: OpenVINO model to compile.
    :return: The compiled model, ready for inference on CPU.
    """
    # Fix: the original annotated the return type as None although the
    # compiled model is returned to the caller.
    ov_core = ov.Core()
    compiled_model = ov_core.compile_model(model, "CPU")
    return compiled_model


class OVAsyncPreparedModel(AsyncPreparedModel):
Expand Down Expand Up @@ -117,8 +118,7 @@ def prepare_for_inference(model: ov.Model) -> Any:
return ov.compile_model(model)

@staticmethod
def prepare_for_inference_async(models: List[ov.Model], max_workers: int = 20) -> Any:
    """Compile several models for CPU inference concurrently.

    Uses a thread pool rather than processes: compilation happens in native
    code that releases the GIL, so threads give parallelism without the
    serialization cost of `multiprocessing`.

    :param models: Iterable of OpenVINO models to compile.
    :param max_workers: Upper bound on the number of compilation threads.
    :return: List of compiled models, in the same order as `models`.
    """
    # Fix: the original hardcoded max_workers=20 in the executor, silently
    # ignoring the caller-supplied `max_workers` argument.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(compile_model, models))
27 changes: 26 additions & 1 deletion nncf/quantization/algorithms/accuracy_control/ranker.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def rank_groups_of_quantizers(
with timer():
# Calculate ranking score for groups of quantizers.
if self._num_processes > 1:
ranking_scores = self._multiprocessing_calculation_ranking_score(
ranking_scores = self._multithreading_calculation_ranking_score(
quantized_model,
quantized_model_graph,
groups_to_rank,
Expand Down Expand Up @@ -228,6 +228,31 @@ def _multiprocessing_calculation_ranking_score(

return ranking_scores

def _multithreading_calculation_ranking_score(
    self,
    quantized_model: TModel,
    quantized_model_graph: NNCFGraph,
    groups_to_rank: List[GroupToRank],
    ranking_subset_indices: List[int],
) -> List[float]:
    """Calculate a ranking score for every group of quantizers using threads.

    For each group, a copy of the quantized model is created with that
    group's operations reverted to floating-point precision; all copies are
    then compiled concurrently via the backend's async preparation, and a
    ranking score is computed for each prepared model.

    :param quantized_model: The fully quantized model.
    :param quantized_model_graph: NNCF graph of the quantized model.
    :param groups_to_rank: Groups of quantizers to score.
    :param ranking_subset_indices: Dataset indices used to compute scores.
    :return: Scores where result[i] corresponds to groups_to_rank[i].
    """
    # NOTE(review): this keeps one reverted copy of the model per group in
    # memory at the same time; for models with hundreds of ranking groups
    # this may exhaust memory — consider batching. TODO confirm acceptable.
    modified_models = [
        revert_operations_to_floating_point_precision(
            group.operations, group.quantizers, quantized_model, quantized_model_graph
        )
        for group in groups_to_rank
    ]

    prepared_models = self._algo_backend.prepare_for_inference_async(modified_models)

    # ranking_scores[i] is the ranking score for groups_to_rank[i].
    ranking_scores = [
        float(self._calculate_ranking_score(model, ranking_subset_indices))
        for model in prepared_models
    ]

    return ranking_scores

def _calculate_ranking_score(
self,
prepared_model: TPModel,
Expand Down