Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add inferRequest capability to python clinet #269

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 172 additions & 0 deletions src/python/library/tritonclient/http/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2248,3 +2248,175 @@ def get_response(self):
The underlying response dict.
"""
return self._result


class InferRequest:
"""An object of InferRequest class holds the request body and
provide methods to retrieve request values from an encoded
request body.

Parameters
----------
request : The inference request body sent to the server
verbose : bool
If True generate verbose output. Default value is False.
"""

def __init__(self, request, verbose):
header_length = request.get("Inference-Header-Content-Length")

# Internal class that simulate the interface of 'response'
class DecompressedResponse:
def __init__(self, decompressed_data):
self.decompressed_data_ = decompressed_data
self.offset_ = 0

def read(self, length=-1):
if length == -1:
return self.decompressed_data_[self.offset_:]
else:
prev_offset = self.offset_
self.offset_ += length
return self.decompressed_data_[prev_offset: self.offset_]

content_encoding = request.get("Content-Encoding")
if content_encoding is not None:
if content_encoding == "gzip":
request = DecompressedResponse(gzip.decompress(request.read()))
elif content_encoding == "deflate":
request = DecompressedResponse(zlib.decompress(request.read()))
if header_length is None:
content = request.read()
if verbose:
print(content)
try:
self._content = json.loads(content)
except UnicodeDecodeError as e:
raise_error(
f"Failed to encode using UTF-8. Please use binary_data=True, if"
f" you want to pass a byte array. UnicodeError: {e}"
)
else:
header_length = int(header_length)
content = request.read(length=header_length)
if verbose:
print(content)
self._content = json.loads(content)

# Maps the output name to the index in buffer for quick retrieval
self._input_name_to_buffer_map = {}
# Read the remaining data off the response body.
self._buffer = request.read()
buffer_index = 0
for input in self._content["inputs"]:
parameters = input.get("parameters")
if parameters is not None:
this_data_size = parameters.get("binary_data_size")
if this_data_size is not None:
self._input_name_to_buffer_map[input["name"]] = buffer_index
buffer_index = buffer_index + this_data_size

@classmethod
def from_request_body(
cls,
request_body,
verbose=False,
header_length=None,
content_encoding=None,
):

# Internal class that simulate the interface of 'response'
class Request:
def __init__(
self,
request_body,
header_length,
content_encoding,
):
self.request_body_ = request_body
self.offset_ = 0
self.parameters_ = {
"Inference-Header-Content-Length": header_length,
"Content-Encoding": content_encoding,
}

def get(self, key):
return self.parameters_.get(key)

def read(self, length=-1):
if length == -1:
return self.request_body_[self.offset_:]
else:
prev_offset = self.offset_
self.offset_ += length
return self.request_body_[prev_offset: self.offset_]

return cls(
Request(request_body, header_length, content_encoding),
verbose,
)

def as_numpy(self, name):
"""Get the tensor data for input associated with this object
in numpy format

Parameters
----------
name : str
The name of the input tensor whose result is to be retrieved.

Returns
-------
numpy array
The numpy array containing the request data for the tensor or
None if the data for specified tensor name is not found.
"""
if self._content.get("inputs") is not None:
for input in self._content["inputs"]:
if input["name"] == name:
datatype = input["datatype"]
has_binary_data = False
parameters = input.get("parameters")
if parameters is not None:
this_data_size = parameters.get("binary_data_size")
if this_data_size is not None:
has_binary_data = True
if this_data_size != 0:
start_index = self._input_name_to_buffer_map[name]
end_index = start_index + this_data_size
if datatype == "BYTES":
# String results contain a 4-byte string length
# followed by the actual string characters. Hence,
# need to decode the raw bytes to convert into
# array elements.
np_array = deserialize_bytes_tensor(
self._buffer[start_index:end_index]
)
elif datatype == "BF16":
np_array = deserialize_bf16_tensor(
self._buffer[start_index:end_index]
)
else:
np_array = np.frombuffer(
self._buffer[start_index:end_index],
dtype=triton_to_np_dtype(datatype),
)
else:
np_array = np.empty(0)
if not has_binary_data:
np_array = np.array(
input["data"], dtype=triton_to_np_dtype(datatype)
)
np_array = np_array.reshape(input["shape"])
return np_array
return None

def get_input(self, name):
for input in self._content["inputs"]:
if input["name"] == name:
return input

return None

def get_request(self):
return self._content