Commit

Merge branch 'main' into torchscript
horheynm authored Jun 26, 2023
2 parents 48867dd + 79c4ada commit 580434d
Showing 11 changed files with 581 additions and 373 deletions.
setup.py: 18 changes (11 additions, 7 deletions)
@@ -180,13 +180,17 @@ def _check_supported_system():
)

if sys.platform.startswith("darwin"):
# mac is not supported, raise error on install
raise OSError(
"Native Mac is currently unsupported for DeepSparse. "
"Please run on a Linux system or within a Linux container on Mac. "
"More info can be found in our docs here: "
"https://docs.neuralmagic.com/deepsparse/source/hardware.html"
)
if os.getenv("NM_ALLOW_DARWIN", "0") != "0":
# experimental support for mac, allow install to go through
return
else:
# mac is not supported, raise error on install
raise OSError(
"Native Mac is currently unsupported for DeepSparse. "
"Please run on a Linux system or within a Linux container on Mac. "
"More info can be found in our docs here: "
"https://docs.neuralmagic.com/deepsparse/source/hardware.html"
)

# unknown system, raise error on install
raise OSError(
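The net effect of this hunk: macOS installs, previously a hard failure, can now proceed when the NM_ALLOW_DARWIN environment variable is set to anything other than "0". A minimal standalone sketch of the new gate (the guard is lifted from the diff; the surrounding script is illustrative):

    import os
    import sys

    # Mirrors the new guard in _check_supported_system(): any value of
    # NM_ALLOW_DARWIN other than "0" opts into the experimental macOS install.
    if sys.platform.startswith("darwin"):
        if os.getenv("NM_ALLOW_DARWIN", "0") != "0":
            print("experimental macOS support enabled; continuing install")
        else:
            raise OSError("Native Mac is currently unsupported for DeepSparse.")

In practice, running the install as NM_ALLOW_DARWIN=1 pip install deepsparse should get past this check on a Mac; the change only unblocks the install, not a supported runtime.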
src/deepsparse/__init__.py: 1 change (0 additions, 1 deletion)
@@ -32,7 +32,6 @@
)
from .engine import *
from .tasks import *
from .timing import *
from .pipeline import *
from .loggers import *
from .version import __version__, is_release
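This one-line removal goes hand in hand with the pipeline.py changes below and the deletion of src/deepsparse/timing at the bottom of the diff. The implied import migration for downstream code, reconstructed from the hunks in this commit:

    # Before this commit (module deleted below):
    # from deepsparse.timing import InferencePhases, Timer

    # After this commit:
    from deepsparse.utils import InferenceStages, StagedTimer, TimerManager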
src/deepsparse/pipeline.py: 219 changes (119 additions, 100 deletions)
@@ -36,7 +36,7 @@
validate_identifier,
)
from deepsparse.tasks import SupportedTasks, dynamic_import_task
from deepsparse.timing import InferencePhases, Timer
from deepsparse.utils import InferenceStages, StagedTimer, TimerManager


__all__ = [
@@ -157,13 +157,16 @@ def __init__(
context: Optional[Context] = None,
executor: Optional[Union[ThreadPoolExecutor, int]] = None,
logger: Optional[Union[BaseLogger, str]] = None,
benchmark: bool = False,
_delay_engine_initialize: bool = False, # internal use only
):
self._benchmark = benchmark
self._model_path_orig = model_path
self._model_path = model_path
self._engine_type = engine_type
self._batch_size = batch_size
self._alias = alias
self._timer_manager = TimerManager(enabled=True, multi=benchmark)
self.context = context
self.logger = (
logger
@@ -215,111 +218,89 @@ def __init__(
)

def __call__(self, *args, **kwargs) -> BaseModel:
if "engine_inputs" in kwargs:
raise ValueError(
"invalid kwarg engine_inputs. engine inputs determined "
f"by {self.__class__.__qualname__}.parse_inputs"
)
timer = Timer()

timer.start(InferencePhases.TOTAL_INFERENCE)

# ------ PREPROCESSING ------
timer.start(InferencePhases.PRE_PROCESS)
# parse inputs into input_schema
pipeline_inputs = self.parse_inputs(*args, **kwargs)

self.log(
identifier="pipeline_inputs",
value=pipeline_inputs,
category=MetricCategories.DATA,
)
with self.timer_manager.new_timer_context() as timer:
if "engine_inputs" in kwargs:
raise ValueError(
"invalid kwarg engine_inputs. engine inputs determined "
f"by {self.__class__.__qualname__}.parse_inputs"
)

if not isinstance(pipeline_inputs, self.input_schema):
raise RuntimeError(
f"Unable to parse {self.__class__} inputs into a "
f"{self.input_schema} object. Inputs parsed to {type(pipeline_inputs)}"
# ------ PREPROCESSING ------
timer.start(InferenceStages.PRE_PROCESS)
# parse inputs into input_schema
pipeline_inputs = self.parse_inputs(*args, **kwargs)
self.log(
identifier="pipeline_inputs",
value=pipeline_inputs,
category=MetricCategories.DATA,
)
# batch size of the inputs may be `> self._batch_size` at this point
engine_inputs: List[numpy.ndarray] = self.process_inputs(pipeline_inputs)
if isinstance(engine_inputs, tuple):
engine_inputs, postprocess_kwargs = engine_inputs
else:
postprocess_kwargs = {}
timer.stop(InferencePhases.PRE_PROCESS)

self.log(
identifier="engine_inputs",
value=engine_inputs,
category=MetricCategories.DATA,
)
self.log(
identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.PRE_PROCESS}_seconds", # noqa E501
value=timer.time_delta(InferencePhases.PRE_PROCESS),
category=MetricCategories.SYSTEM,
)

# ------ INFERENCE ------
# split inputs into batches of size `self._batch_size`
timer.start(InferencePhases.ENGINE_FORWARD)
batches = self.split_engine_inputs(engine_inputs, self._batch_size)

# submit split batches to engine threadpool
batch_outputs = list(self.executor.map(self.engine_forward, batches))

# join together the batches of size `self._batch_size`
engine_outputs = self.join_engine_outputs(batch_outputs)
timer.stop(InferencePhases.ENGINE_FORWARD)

self.log(
identifier=f"{SystemGroups.INFERENCE_DETAILS}/input_batch_size_total",
# to get the batch size of the inputs, we need to look
# to multiply the engine batch size (self._batch_size)
# by the number of batches processed by the engine during
# a single inference call
value=len(batch_outputs) * self._batch_size,
category=MetricCategories.SYSTEM,
)
if not isinstance(pipeline_inputs, self.input_schema):
raise RuntimeError(
f"Unable to parse {self.__class__} inputs into a "
f"{self.input_schema} object. "
f"Inputs parsed to {type(pipeline_inputs)}"
)
# batch size of the inputs may be `> self._batch_size` at this point
engine_inputs: List[numpy.ndarray] = self.process_inputs(pipeline_inputs)
if isinstance(engine_inputs, tuple):
engine_inputs, postprocess_kwargs = engine_inputs
else:
postprocess_kwargs = {}

timer.stop(InferenceStages.PRE_PROCESS)
self.log(
identifier="engine_inputs",
value=engine_inputs,
category=MetricCategories.DATA,
)

self.log(
identifier="engine_outputs",
value=engine_outputs,
category=MetricCategories.DATA,
)
self.log(
identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.ENGINE_FORWARD}_seconds", # noqa E501
value=timer.time_delta(InferencePhases.ENGINE_FORWARD),
category=MetricCategories.SYSTEM,
)
# ------ INFERENCE ------
# split inputs into batches of size `self._batch_size`
timer.start(InferenceStages.ENGINE_FORWARD)
batches = self.split_engine_inputs(engine_inputs, self._batch_size)

# submit split batches to engine threadpool
batch_outputs = list(self.executor.map(self.engine_forward, batches))

# join together the batches of size `self._batch_size`
engine_outputs = self.join_engine_outputs(batch_outputs)
timer.stop(InferenceStages.ENGINE_FORWARD)

self.log(
identifier=f"{SystemGroups.INFERENCE_DETAILS}/input_batch_size_total",
# the total input batch size is the engine batch size
# (self._batch_size) multiplied by the number of batches
# processed by the engine during a single inference call
value=len(batch_outputs) * self._batch_size,
category=MetricCategories.SYSTEM,
)
self.log(
identifier="engine_outputs",
value=engine_outputs,
category=MetricCategories.DATA,
)

# ------ POSTPROCESSING ------
timer.start(InferencePhases.POST_PROCESS)
pipeline_outputs = self.process_engine_outputs(
engine_outputs, **postprocess_kwargs
)
if not isinstance(pipeline_outputs, self.output_schema):
raise ValueError(
f"Outputs of {self.__class__} must be instances of "
f"{self.output_schema} found output of type {type(pipeline_outputs)}"
# ------ POSTPROCESSING ------
timer.start(InferenceStages.POST_PROCESS)
pipeline_outputs = self.process_engine_outputs(
engine_outputs, **postprocess_kwargs
)
if not isinstance(pipeline_outputs, self.output_schema):
raise ValueError(
f"Outputs of {self.__class__} must be instances of "
f"{self.output_schema} found output of type "
f"{type(pipeline_outputs)}"
)
timer.stop(InferenceStages.POST_PROCESS)
self.log(
identifier="pipeline_outputs",
value=pipeline_outputs,
category=MetricCategories.DATA,
)
timer.stop(InferencePhases.POST_PROCESS)
timer.stop(InferencePhases.TOTAL_INFERENCE)

self.log(
identifier="pipeline_outputs",
value=pipeline_outputs,
category=MetricCategories.DATA,
)
self.log(
identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.POST_PROCESS}_seconds", # noqa E501
value=timer.time_delta(InferencePhases.POST_PROCESS),
category=MetricCategories.SYSTEM,
)
self.log(
identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.TOTAL_INFERENCE}_seconds", # noqa E501
value=timer.time_delta(InferencePhases.TOTAL_INFERENCE),
category=MetricCategories.SYSTEM,
)
self.log_inference_times(timer)

return pipeline_outputs

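The rewrite wraps the whole inference path in a timer context obtained from the pipeline's TimerManager and defers all per-stage latency logging to a single log_inference_times call (shown further down), instead of logging after each stage inline. A minimal sketch of the timer pattern the new __call__ relies on, using simplified stand-ins for the real classes in deepsparse.utils (whose details are not shown in this diff):

    import time
    from contextlib import contextmanager


    class StagedTimer:
        """Collects elapsed seconds per named stage."""

        def __init__(self):
            self._starts = {}
            self.times = {}  # stage name -> elapsed seconds

        def start(self, stage):
            self._starts[stage] = time.perf_counter()

        def stop(self, stage):
            self.times[stage] = time.perf_counter() - self._starts.pop(stage)


    class TimerManager:
        def __init__(self, enabled=True, multi=False):
            self.enabled = enabled
            # multi=True (benchmark mode) keeps every timer; False keeps only the latest
            self.multi = multi
            self.timers = []

        @contextmanager
        def new_timer_context(self):
            timer = StagedTimer()
            if self.multi:
                self.timers.append(timer)
            else:
                self.timers = [timer]
            yield timer

Note that the new __call__ no longer starts an explicit TOTAL_INFERENCE stage; whether the real StagedTimer records an overall elapsed time is not visible in this diff.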
@@ -706,6 +687,31 @@ def engine_type(self) -> str:
"""
return self._engine_type

@property
def timer_manager(self) -> TimerManager:
return self._timer_manager

@property
def current_timer(self) -> Optional[StagedTimer]:
"""
:return: current timer for the pipeline, if any
"""
timer = self.timer_manager.current

if timer is None:
timer = self.timer_manager.latest

return timer

@property
def benchmark(self) -> bool:
return self._benchmark

@benchmark.setter
def benchmark(self, value: bool):
self._benchmark = value
self.timer_manager.multi = value

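Hypothetical usage of the new flag; the task name and inputs are illustrative, while benchmark, current_timer, and times all come from this commit:

    from deepsparse import Pipeline

    # benchmark=True puts the TimerManager in multi mode, keeping a StagedTimer
    # per call rather than only the most recent one
    pipeline = Pipeline.create(task="text_classification", benchmark=True)
    outputs = pipeline(sequences=["deepsparse is fast"])

    timer = pipeline.current_timer  # current timer, falling back to the latest
    if timer is not None:
        print(timer.times)  # per-stage seconds for the last inference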
def to_config(self) -> "PipelineConfig":
"""
:return: PipelineConfig that can be used to reload this object
@@ -741,7 +747,7 @@ def log(
self,
identifier: str,
value: Any,
category: str,
category: Union[str, MetricCategories],
):
"""
Pass the logged data to the DeepSparse logger object (if present).
@@ -793,6 +799,19 @@ def engine_forward(self, engine_inputs: List[numpy.ndarray]) -> List[numpy.ndarr
"""
return self.engine(engine_inputs)

def log_inference_times(self, timer: StagedTimer):
"""
logs stage times in the given timer
:param timer: timer to log
"""
for stage, time in timer.times.items():
self.log(
identifier=f"{SystemGroups.PREDICTION_LATENCY}/{stage}_seconds",
value=time,
category=MetricCategories.SYSTEM,
)

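With the three stages started in __call__ above, one inference should emit one SYSTEM-category metric per stage. The exact identifiers depend on the string values of InferenceStages and SystemGroups.PREDICTION_LATENCY, neither of which appears in this diff; assuming they mirror the old InferencePhases names, the emitted identifiers would look like:

    # assumed values, not shown in the diff:
    #   prediction_latency/pre_process_seconds
    #   prediction_latency/engine_forward_seconds
    #   prediction_latency/post_process_seconds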
def _initialize_engine(self) -> Union[Engine, ORTEngine]:
engine_type = self.engine_type.lower()

src/deepsparse/timing/__init__.py: 18 changes (0 additions, 18 deletions)

This file was deleted.

src/deepsparse/timing/inference_phases.py: 26 changes (0 additions, 26 deletions)

This file was deleted.

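The deleted inference_phases.py is not shown, but the pre-change __call__ above uses four of its members. A reconstruction, assuming each value matches the lowercase member name:

    class InferencePhases:
        TOTAL_INFERENCE = "total_inference"
        PRE_PROCESS = "pre_process"
        ENGINE_FORWARD = "engine_forward"
        POST_PROCESS = "post_process"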
(6 more changed files not shown)
