MLServer example #1110

Open · wants to merge 2 commits into base: main
75 changes: 75 additions & 0 deletions examples/mlserver/README.md
@@ -0,0 +1,75 @@
# **Step 1: Installation**

> **Contributor:** best to add an intro paragraph to give users a heads up of what this example does.

Install DeepSparse and MLServer.

```bash
pip install -r requirements.txt
```

# **Step 2: Write Custom Runtime**

We need to write a [Custom Inference Runtime](https://mlserver.readthedocs.io/en/stable/user-guide/custom.html) to use DeepSparse within MLServer.

### Implement `load()` and `predict()`

First, we implement the `load()` and `predict()` methods in `models/text-classification-model/models.py`. Note that your implementation of `load()` and `predict()` will vary by the task that you choose (see the sketch for a second task after the example below).

Here's an example for text classification:
```python
from mlserver import MLModel
from mlserver.codecs import decode_args
from typing import List
from deepsparse import Pipeline

class DeepSparseRuntime(MLModel):
    async def load(self) -> bool:
        # compiles the pipeline
        self._pipeline = Pipeline.create(
            task = self._settings.parameters.task,                        # from model-settings.json
            model_path = self._settings.parameters.model_path,            # from model-settings.json
            batch_size = self._settings.parameters.batch_size,            # from model-settings.json
            sequence_length = self._settings.parameters.sequence_length,  # from model-settings.json
        )
        return True

    @decode_args
    async def predict(self, sequences: List[str]) -> List[str]:
        # runs the inference
        prediction = self._pipeline(sequences=sequences)
        return prediction.labels
```
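
For contrast, here is a rough sketch of how the same runtime might look for a question-answering task. This sketch is not part of the example: the class name `DeepSparseQARuntime`, the two-input `predict()` signature, and the per-pair loop are illustrative assumptions, and the corresponding `model-settings.json` would need to point at a question-answering model.

```python
from mlserver import MLModel
from mlserver.codecs import decode_args
from typing import List
from deepsparse import Pipeline

class DeepSparseQARuntime(MLModel):
    async def load(self) -> bool:
        # compiles a question-answering pipeline instead of text classification
        self._pipeline = Pipeline.create(
            task = self._settings.parameters.task,              # e.g. "question-answering"
            model_path = self._settings.parameters.model_path,  # from model-settings.json
        )
        return True

    @decode_args
    async def predict(self, question: List[str], context: List[str]) -> List[str]:
        # runs one inference per (question, context) pair and returns the answers
        return [
            self._pipeline(question=q, context=c).answer
            for q, c in zip(question, context)
        ]
```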

### Create `model-settings.json`

Second, we create a config at `models/text-classification-model/model-settings.json`. In this file, we specify the location of the custom runtime implementation as well as the parameters of the DeepSparse inference session.

```json
{
    "name": "text-classification-model",
    "implementation": "models.DeepSparseRuntime",
    "parameters": {
        "task": "text-classification",
        "model_path": "zoo:nlp/sentiment_analysis/obert-base/pytorch/huggingface/sst2/pruned90_quant-none",
        "batch_size": 1,
        "sequence_length": 128
    }
}
```

# **Step 3: Launch MLServer**

Launch the server with the CLI:

```bash
mlserver start ./models/text-classification-model/
```
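
Before sending traffic, you can optionally check that the server and model are ready. This is a minimal sketch (not part of the example) that assumes MLServer's default HTTP port of 8080 and the standard V2 inference protocol health endpoints:

```python
import requests

# readiness endpoints from the V2 inference protocol served by MLServer
server_ready = requests.get("http://localhost:8080/v2/health/ready")
model_ready = requests.get("http://localhost:8080/v2/models/text-classification-model/ready")

print("server ready:", server_ready.status_code == 200)
print("model ready:", model_ready.status_code == 200)
```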

# **Step 4: Send Inference Requests**

Now, an inference endpoint is exposed at `http://localhost:8080/v2/models/text-classification-model/infer`. `client.py` is a sample script that sends requests to the endpoint.

Run the following:
```bash
python3 client.py
```
27 changes: 27 additions & 0 deletions examples/mlserver/client.py
@@ -0,0 +1,27 @@
import requests, threading
> **Contributor:** would suggest a few in line comments for self-documentation


NUM_THREADS = 2
URL = "http://localhost:8080/v2/models/text-classification-model/infer"
sentences = ["I hate using GPUs for inference", "I love using DeepSparse on CPUs"] * 100
> **Member:** Should `* 100` be `* NUM_THREADS` if we are taking only `sentences[:NUM_THREADS]` elements?

> **Contributor:** @rsnm2 see suggestion below


def tfunc(text):
> **Contributor:** would rename to something more descriptive like `inference_request`

    inference_request = {
        "inputs": [
            {
                "name": "sequences",
                "shape": [1],
                "datatype": "BYTES",
                "data": [text],
            },
        ]
    }
    resp = requests.post(URL, json=inference_request).json()
    for output in resp["outputs"]:
        print(output["data"])
> **Contributor** (on lines 19-20): executing a list printout while multithreaded may cause a race condition, any reason to not return the value and print in sequence at the end? (ie consider thread 1 and thread 2 happen to execute exactly at the same time, they will print their lines at the same time and might not tell which is which)



threads = [threading.Thread(target=tfunc, args=(sentence,)) for sentence in sentences[:NUM_THREADS]]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
> **Contributor** (on lines 23-27): it looks like this creates NUM_THREADS threads to make the request, is that intended? Might make more sense to create len(sentences) threads and execute NUM_THREADS at a time.
>
> You can do this out of the box with ThreadPoolExecutor with something like:

**Suggested change**

```diff
-threads = [threading.Thread(target=tfunc, args=(sentence,)) for sentence in sentences[:NUM_THREADS]]
-for thread in threads:
-    thread.start()
-for thread in threads:
-    thread.join()
+from concurrent.futures.thread import ThreadPoolExecutor
+threadpool = ThreadPoolExecutor(max_workers=NUM_THREADS)
+results = threadpool.map(tfunc, sentences)
```
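
Putting the review suggestions above together (a more descriptive worker name, returning results instead of printing from worker threads, and running every sentence through a thread pool), a revised client might look roughly like this sketch; it is illustrative and not the code in this PR:

```python
import requests
from concurrent.futures import ThreadPoolExecutor

NUM_THREADS = 2
URL = "http://localhost:8080/v2/models/text-classification-model/infer"
sentences = ["I hate using GPUs for inference", "I love using DeepSparse on CPUs"] * 100

def inference_request(text):
    # build a V2 inference payload with a single BYTES tensor named "sequences"
    payload = {
        "inputs": [
            {
                "name": "sequences",
                "shape": [1],
                "datatype": "BYTES",
                "data": [text],
            },
        ]
    }
    resp = requests.post(URL, json=payload).json()
    # return the labels instead of printing from the worker thread
    return [output["data"] for output in resp["outputs"]]

# run every sentence through the pool, NUM_THREADS requests at a time
with ThreadPoolExecutor(max_workers=NUM_THREADS) as threadpool:
    results = list(threadpool.map(inference_request, sentences))

# print sequentially at the end to avoid interleaved output
for sentence, labels in zip(sentences, results):
    print(sentence, "->", labels)
```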

10 changes: 10 additions & 0 deletions examples/mlserver/models/text-classification-model/model-settings.json
@@ -0,0 +1,10 @@
{
    "name": "text-classification-model",
    "implementation": "models.DeepSparseRuntime",
    "parameters": {
        "task": "text-classification",
        "model_path": "zoo:nlp/sentiment_analysis/obert-base/pytorch/huggingface/sst2/pruned90_quant-none",
        "batch_size": 1,
        "sequence_length": 128
    }
}
19 changes: 19 additions & 0 deletions examples/mlserver/models/text-classification-model/models.py
@@ -0,0 +1,19 @@
from mlserver import MLModel
> **Contributor:** this is great, love that it works out of the box - let's throw in the serving command as a comment just for convenience

from mlserver.codecs import decode_args
from typing import List
from deepsparse import Pipeline

class DeepSparseRuntime(MLModel):
    async def load(self) -> bool:
        self._pipeline = Pipeline.create(
            task = self._settings.parameters.task,
            model_path = self._settings.parameters.model_path,
            batch_size = self._settings.parameters.batch_size,
            sequence_length = self._settings.parameters.sequence_length,
> **Contributor:** is there a place for generic kwargs in the settings? Would be cool if we could use that instead to dump extra pipeline args so we can get full generic pipeline support out of the box

        )
        return True

    @decode_args
    async def predict(self, sequences: List[str]) -> List[str]:
        prediction = self._pipeline(sequences=sequences)
        return prediction.labels
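
On the generic-kwargs question above, one possible approach (a sketch, not part of this PR) is to keep any extra pipeline arguments under a single dictionary key in `model-settings.json` and splat it into `Pipeline.create()`. The key name `pipeline_kwargs` is hypothetical, and the sketch assumes the settings object tolerates arbitrary extra keys, as the existing example already relies on for `task` and `batch_size`:

```python
from mlserver import MLModel
from deepsparse import Pipeline

class GenericDeepSparseRuntime(MLModel):
    async def load(self) -> bool:
        params = self._settings.parameters
        # "pipeline_kwargs" is a hypothetical dict of additional Pipeline.create() arguments;
        # batch_size, sequence_length, etc. could live inside it instead of as top-level keys
        extra_kwargs = getattr(params, "pipeline_kwargs", None) or {}
        self._pipeline = Pipeline.create(
            task = params.task,
            model_path = params.model_path,
            **extra_kwargs,
        )
        return True
```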
2 changes: 2 additions & 0 deletions examples/mlserver/requirements.txt
@@ -0,0 +1,2 @@
mlserver
deepsparse[transformers]