benchmark server pipeline #1600

Merged (12 commits, Mar 6, 2024)
61 changes: 61 additions & 0 deletions src/deepsparse/benchmark/benchmark_pipeline.py
@@ -401,6 +401,67 @@ def _get_statistics(batch_times):
return sections, all_sections


def benchmark_from_pipeline(
pipeline: Pipeline,
batch_size: int = 1,
seconds_to_run: int = 10,
warmup_time: int = 2,
thread_pinning: str = "core",
scenario: str = "sync",
num_streams: int = 1,
data_type: str = "dummy",
**kwargs,
):
decide_thread_pinning(thread_pinning)
scenario = parse_scenario(scenario.lower())

input_type = data_type

config = PipelineBenchmarkConfig(
data_type=data_type,
**kwargs,
)
inputs = create_input_schema(pipeline, input_type, batch_size, config)

def _clear_measurements():
# Helper method to handle variations between v1 and v2 timers
if hasattr(pipeline.timer_manager, "clear"):
pipeline.timer_manager.clear()
else:
pipeline.timer_manager.measurements.clear()

if scenario == "singlestream":
singlestream_benchmark(pipeline, inputs, warmup_time)
_clear_measurements()
start_time = time.perf_counter()
singlestream_benchmark(pipeline, inputs, seconds_to_run)
elif scenario == "multistream":
multistream_benchmark(pipeline, inputs, warmup_time, num_streams)
_clear_measurements()
start_time = time.perf_counter()
multistream_benchmark(pipeline, inputs, seconds_to_run, num_streams)
elif scenario == "elastic":
multistream_benchmark(pipeline, inputs, warmup_time, num_streams)
_clear_measurements()
start_time = time.perf_counter()
multistream_benchmark(pipeline, inputs, seconds_to_run, num_streams)
else:
raise Exception(f"Unknown scenario '{scenario}'")

end_time = time.perf_counter()
total_run_time = end_time - start_time
if hasattr(pipeline.timer_manager, "all_times"):
batch_times = pipeline.timer_manager.all_times
else:
batch_times = pipeline.timer_manager.measurements
if len(batch_times) == 0:
raise Exception(
"Generated no batch timings, try extending benchmark time with '--time'"
)

return batch_times, total_run_time, num_streams


@click.command()
@click.argument("task_name", type=str)
@click.argument("model_path", type=str)
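A minimal sketch of driving the new benchmark_from_pipeline helper directly from Python; the task name and the lack of an explicit model path are illustrative assumptions, not part of this PR:

from deepsparse import Pipeline
from deepsparse.benchmark.benchmark_pipeline import benchmark_from_pipeline

# Build any pipeline; "text_classification" is a placeholder task choice and is
# assumed to resolve to the task's default model when no model path is given.
pipeline = Pipeline.create(task="text_classification")

# Run a short synchronous benchmark against the live pipeline object.
batch_times, total_run_time, num_streams = benchmark_from_pipeline(
    pipeline=pipeline,
    batch_size=1,
    seconds_to_run=5,
    scenario="sync",
    data_type="dummy",
)
print(f"{len(batch_times)} timed sections over {total_run_time:.2f}s, {num_streams} stream(s)")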
34 changes: 34 additions & 0 deletions src/deepsparse/benchmark/config.py
@@ -83,3 +83,37 @@ class PipelineBenchmarkConfig(BaseModel):
default={},
description=("Additional arguments passed to input schema creations "),
)


class PipelineBenchmarkServerConfig(PipelineBenchmarkConfig):
batch_size: int = Field(
default=1,
description="The batch size of the inputs to be used with the engine",
)
seconds_to_run: int = Field(
default=10,
description="The number of seconds to run the benchmark for",
)
warmup_time: int = Field(
default=2,
description="The number of seconds to run the pipeline before the benchmark starts",
)
thread_pinning: str = Field(
default="core",
description="How to pin threads: 'core', 'numa', or 'none'",
)
scenario: str = Field(
default="sync",
description=(
"Benchmark scenario to run: 'sync' (single stream), 'async' "
"(multi-stream), or 'elastic'"
),
)
num_streams: int = Field(
default=1,
description=(
"The number of streams to run concurrently in the 'async' and "
"'elastic' scenarios"
),
)
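A sketch of constructing the new config; the field values are illustrative and mirror the defaults above:

from deepsparse.benchmark.config import PipelineBenchmarkServerConfig

# Collect the benchmark knobs into one pydantic model; data_type comes from
# the parent PipelineBenchmarkConfig, the rest are the new server-side fields.
config = PipelineBenchmarkServerConfig(
    data_type="dummy",
    batch_size=1,
    seconds_to_run=5,
    warmup_time=2,
    scenario="sync",
    num_streams=1,
)
payload = config.dict()
print(payload["scenario"], payload["seconds_to_run"])  # sync 5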
8 changes: 7 additions & 1 deletion src/deepsparse/middlewares/middleware.py
@@ -112,7 +112,9 @@ class MiddlewareManager:
:param _lock: lock for the state
"""

def __init__(self, middleware: Optional[Sequence[MiddlewareSpec]], *args, **kwargs):
def __init__(
self, middleware: Optional[Sequence[MiddlewareSpec]] = None, *args, **kwargs
):

self.middleware: Optional[
Sequence[MiddlewareSpec]
@@ -172,3 +174,7 @@ def _update_middleware_spec_send(
next_middleware.send = self.recieve

self.middleware.append(MiddlewareSpec(next_middleware, **init_args))

@property
def middlewares(self):
return [middleware.cls for middleware in self.middleware]
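A sketch of how the now-optional constructor argument and the new middlewares property work together, mirroring the check the server performs later in this diff:

from deepsparse.middlewares import MiddlewareManager, MiddlewareSpec, TimerMiddleware

# MiddlewareManager() is now valid with no arguments; attach TimerMiddleware
# only if it is not already registered.
manager = MiddlewareManager()
if TimerMiddleware not in manager.middlewares:
    manager.add_middleware([MiddlewareSpec(TimerMiddleware)])

assert TimerMiddleware in manager.middlewares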
4 changes: 4 additions & 0 deletions src/deepsparse/pipeline_config.py
@@ -73,6 +73,10 @@ class PipelineConfig(BaseModel):
"with multiple models. Default is None"
),
)
middlewares: Optional[List[str]] = Field(
default=None,
description="Middlewares to use",
)
kwargs: Optional[Dict[str, Any]] = Field(
default={},
description=(
4 changes: 4 additions & 0 deletions src/deepsparse/server/config.py
@@ -127,6 +127,9 @@ class EndpointConfig(BaseModel):
"```\n"
),
)
middlewares: Optional[List[str]] = Field(
default=None, description="Middleware names to add to the pipeline"
)

kwargs: Dict[str, Any] = Field(
default={}, description="Additional arguments to pass to the Pipeline"
@@ -147,6 +150,7 @@ def to_pipeline_config(self) -> PipelineConfig:
num_cores=None, # this will be set from Context
alias=self.name,
input_shapes=input_shapes,
middlewares=self.middlewares,
kwargs=kwargs,
)

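A sketch of the plumbing added here: an endpoint's middlewares list is carried into the PipelineConfig it produces. The endpoint name, task, and model stub are placeholders, and referencing middlewares by class name is an assumption not spelled out in this diff:

from deepsparse.server.config import EndpointConfig

endpoint = EndpointConfig(
    name="text_classification",
    task="text_classification",
    model="zoo:placeholder/model/stub",  # placeholder model stub
    middlewares=["TimerMiddleware"],     # assumed: middlewares named by class
)
pipeline_config = endpoint.to_pipeline_config()
print(pipeline_config.middlewares)  # ["TimerMiddleware"]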
69 changes: 69 additions & 0 deletions src/deepsparse/server/deepsparse_server.py
@@ -17,6 +17,7 @@
from functools import partial

from deepsparse import Pipeline
from deepsparse.middlewares import MiddlewareManager, MiddlewareSpec, TimerMiddleware
from deepsparse.server.config import EndpointConfig
from deepsparse.server.server import CheckReady, ModelMetaData, ProxyPipeline, Server
from deepsparse.tasks import SupportedTasks
@@ -105,6 +106,11 @@ def _add_endpoint(
endpoint_config,
pipeline,
)
self._add_benchmark_endpoints(
app,
endpoint_config,
pipeline,
)
self._add_status_and_metadata_endpoints(app, endpoint_config, pipeline)

def _add_status_and_metadata_endpoints(
@@ -199,3 +205,66 @@ def _add_inference_endpoints(
methods=["POST"],
tags=["model", "inference"],
)

def _add_benchmark_endpoints(
self,
app: FastAPI,
endpoint_config: EndpointConfig,
pipeline: Pipeline,
):
if not hasattr(pipeline, "middleware_manager"):
pipeline.middleware_manager = MiddlewareManager()
if TimerMiddleware not in pipeline.middleware_manager.middlewares:
pipeline.middleware_manager.add_middleware(
[MiddlewareSpec(TimerMiddleware)]
)

routes_and_fns = []
if endpoint_config.route:
endpoint_config.route = self.clean_up_route(endpoint_config.route)
route = f"/v2/models{endpoint_config.route}/benchmark"
else:
route = f"/v2/models/{endpoint_config.name}/benchmark"

routes_and_fns.append(
(
route,
partial(
Server.benchmark,
ProxyPipeline(pipeline),
self.server_config.system_logging,
),
)
)

legacy_pipeline = not isinstance(pipeline, Pipeline) and hasattr(
pipeline.input_schema, "from_files"
)
# New pipelines do not have to have an input_schema. Just checking task
# names for now but can keep a list of supported from_files tasks in
# SupportedTasks as more pipelines are migrated as well as output schemas.
new_pipeline = SupportedTasks.is_image_classification(endpoint_config.task)

if legacy_pipeline or new_pipeline:
routes_and_fns.append(
(
route + "/from_files",
partial(
Server.predict_from_files,
ProxyPipeline(pipeline),
self.server_config.system_logging,
),
)
)
if isinstance(pipeline, Pipeline):
response_model = None
else:
response_model = pipeline.output_schema

self._update_routes(
app=app,
routes_and_fns=routes_and_fns,
response_model=response_model,
methods=["POST"],
tags=["model", "pipeline"],
)
16 changes: 16 additions & 0 deletions src/deepsparse/server/server.py
@@ -23,6 +23,8 @@
from pydantic import BaseModel

import uvicorn
from deepsparse.benchmark.benchmark_pipeline import benchmark_from_pipeline
from deepsparse.benchmark.config import PipelineBenchmarkConfig
from deepsparse.engine import Context
from deepsparse.pipeline import Pipeline
from deepsparse.server.config import ServerConfig, SystemLoggingConfig
@@ -268,6 +270,20 @@ async def format_response():

return prep_for_serialization(pipeline_outputs)

@staticmethod
async def benchmark(
proxy_pipeline: ProxyPipeline,
system_logging_config: SystemLoggingConfig,
raw_request: Request,
):
json_params = await raw_request.json()
benchmark_config = PipelineBenchmarkConfig(**json_params)
results = benchmark_from_pipeline(
pipeline=proxy_pipeline.pipeline, **benchmark_config.dict()
)

return results

@staticmethod
async def predict_from_files(
proxy_pipeline: ProxyPipeline,
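A sketch of exercising the new endpoint from a client once a deepsparse server is running; the host, port, endpoint name, and payload are illustrative, and note that Server.benchmark parses the body into PipelineBenchmarkConfig, so only that config's fields apply:

import requests

# POST the benchmark request to the route registered by _add_benchmark_endpoints;
# 5543 is assumed to be the server's default port and "text_classification" is a
# placeholder endpoint name.
response = requests.post(
    "http://localhost:5543/v2/models/text_classification/benchmark",
    json={"data_type": "dummy"},
)

# benchmark_from_pipeline returns (batch_times, total_run_time, num_streams),
# which should serialize as a three-element JSON array.
batch_times, total_run_time, num_streams = response.json()
print(total_run_time, num_streams)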