Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: print lines debug #6182

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions jina/clients/base/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from jina.types.request import Request
from jina.types.request.data import DataRequest
from jina.types.request.status import StatusMessage

import timeit
if TYPE_CHECKING: # pragma: no cover
from opentelemetry import trace

Expand Down Expand Up @@ -122,8 +122,11 @@ async def __aenter__(self):

:return: start self
"""
return await self.start()

_start = timeit.default_timer()
res = await self.start()
_end = timeit.default_timer()
print(f'ASYNC ENTER {_end - _start}s')
return res
async def start(self):
"""Create ClientSession and enter context

Expand All @@ -139,7 +142,11 @@ async def start(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close(exc_type, exc_val, exc_tb)
_start = timeit.default_timer()
res = await self.close(exc_type, exc_val, exc_tb)
_end = timeit.default_timer()
print(f'ASYNC EXIT {_end - _start}s')
return res

async def close(self, *args, **kwargs):
"""Close ClientSession
Expand All @@ -160,25 +167,32 @@ async def send_message(self, request: 'Request'):
:param request: request as dict
:return: send post message
"""
_start = timeit.default_timer()
print(f'{_start} => HTTPClient send message lets start')
req_dict = request.to_dict()
req_dict['exec_endpoint'] = req_dict['header']['exec_endpoint']
if 'target_executor' in req_dict['header']:
req_dict['target_executor'] = req_dict['header']['target_executor']
for attempt in range(1, self.max_attempts + 1):
try:
_start_req = timeit.default_timer()
request_kwargs = {'url': self.url}
if not docarray_v2:
request_kwargs['json'] = req_dict
else:
from docarray.base_doc.io.json import orjson_dumps

request_kwargs['data'] = JinaJsonPayload(value=req_dict)
_end_req = timeit.default_timer()
print(f'{_end_req} => HTTPClient prepare request took {_end_req - _start_req}s')
response = await self.session.post(**request_kwargs).__aenter__()
try:
r_str = await response.json()
except aiohttp.ContentTypeError:
r_str = await response.text()
handle_response_status(response.status, r_str, self.url)
_end = timeit.default_timer()
print(f'{_end} => HTTPClient send_message total took {_end - _start}s')
return response
except (ValueError, ConnectionError, BadClient, aiohttp.ClientError) as err:
await retry.wait_or_raise_err(
Expand Down
95 changes: 64 additions & 31 deletions jina/clients/base/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from jina.serve.stream import RequestStreamer
from jina.types.request import Request
from jina.types.request.data import DataRequest
import timeit

if TYPE_CHECKING: # pragma: no cover
from jina.clients.base import CallbackFnType, InputType
Expand Down Expand Up @@ -96,20 +97,20 @@ async def _is_flow_ready(self, **kwargs) -> bool:
return False

async def _get_results(
self,
inputs: 'InputType',
on_done: 'CallbackFnType',
on_error: Optional['CallbackFnType'] = None,
on_always: Optional['CallbackFnType'] = None,
max_attempts: int = 1,
initial_backoff: float = 0.5,
max_backoff: float = 0.1,
backoff_multiplier: float = 1.5,
results_in_order: bool = False,
prefetch: Optional[int] = None,
timeout: Optional[int] = None,
return_type: Type[DocumentArray] = DocumentArray,
**kwargs,
self,
inputs: 'InputType',
on_done: 'CallbackFnType',
on_error: Optional['CallbackFnType'] = None,
on_always: Optional['CallbackFnType'] = None,
max_attempts: int = 1,
initial_backoff: float = 0.5,
max_backoff: float = 0.1,
backoff_multiplier: float = 1.5,
results_in_order: bool = False,
prefetch: Optional[int] = None,
timeout: Optional[int] = None,
return_type: Type[DocumentArray] = DocumentArray,
**kwargs,
):
"""
:param inputs: the callable
Expand All @@ -130,17 +131,24 @@ async def _get_results(
with ImportExtensions(required=True):
pass

_start_total = timeit.default_timer()
print(f'## {_start_total} => I AM in _get_results')

self.inputs = inputs
_aada = timeit.default_timer()
print(f'Setting inputs took {_aada - _start_total}s')
request_iterator = self._get_requests(**kwargs)
_aada2 = timeit.default_timer()
print(f'Getting req_it {_aada2 - _aada}s')
on = kwargs.get('on', '/post')
if len(self._endpoints) == 0:
await self._get_endpoints_from_openapi(**kwargs)

async with AsyncExitStack() as stack:
cm1 = ProgressBar(
total_length=self._inputs_length, disable=not self.show_progress
)
p_bar = stack.enter_context(cm1)
# cm1 = ProgressBar(
# total_length=self._inputs_length, disable=not self.show_progress
# )
# p_bar = stack.enter_context(cm1)
proto = 'https' if self.args.tls else 'http'
endpoint = on.strip('/')
has_default_endpoint = 'default' in self._endpoints
Expand All @@ -167,7 +175,7 @@ async def _get_results(
)

def _request_handler(
request: 'Request', **kwargs
request: 'Request', **kwargs
) -> 'Tuple[asyncio.Future, Optional[asyncio.Future]]':
"""
For HTTP Client, for each request in the iterator, we `send_message` using
Expand All @@ -176,27 +184,40 @@ def _request_handler(
:param kwargs: kwargs
:return: asyncio Task for sending message
"""
return asyncio.ensure_future(iolet.send_message(request=request)), None
res = asyncio.ensure_future(iolet.send_message(request=request)), None
return res

def _result_handler(result):
return result

streamer_args = vars(self.args)
if prefetch:
streamer_args['prefetch'] = prefetch

_start_streamer = timeit.default_timer()
print(f'## {_start_streamer} => I AM creating streamer')
streamer = RequestStreamer(
request_handler=_request_handler,
result_handler=_result_handler,
logger=self.logger,
**streamer_args,
)

_start = timeit.default_timer()
print(f'## Streamer created in {_start - _start_streamer}s')

async for response in streamer.stream(
request_iterator=request_iterator, results_in_order=results_in_order
request_iterator=request_iterator, results_in_order=results_in_order
):

r_status = response.status

r_str = await response.json()
_end = timeit.default_timer()
print(f'{_end} => GETTING RESPONSE from streamer took {_end - _start}s')
_start = timeit.default_timer()
handle_response_status(r_status, r_str, url)
_st = timeit.default_timer()

da = None
if 'data' in r_str and r_str['data'] is not None:
Expand All @@ -216,10 +237,15 @@ def _result_handler(result):
[return_type(**v) for v in r_str['data']]
)
del r_str['data']

_e = timeit.default_timer()
print(f'Create DocList took {_e - _st}s')
resp = DataRequest(r_str)
_e2 = timeit.default_timer()
print(f'Create DataRequest from r_str took {_e2 - _e}s')
if da is not None:
resp.direct_docs = da
_e3 = timeit.default_timer()
print(f'Setting direct_docs took {_e3 - _e2}s')

callback_exec(
response=resp,
Expand All @@ -229,18 +255,25 @@ def _result_handler(result):
on_always=on_always,
continue_on_error=self.continue_on_error,
)
if self.show_progress:
p_bar.update()
_e4 = timeit.default_timer()
print(f'Calling callback took {_e4 - _e3}s')
# if self.show_progress:
# p_bar.update()
_end = timeit.default_timer()
print(f'{_end} => YIELD RESPONSE TOOK {_end - _start}s')
yield resp

_end_total = timeit.default_timer()
print(f'## {_end_total} => I AM in _get_results took {_end_total - _start_total}s')

async def _get_streaming_results(
self,
on: str,
inputs: 'Document',
parameters: Optional[Dict] = None,
return_type: Type[Document] = Document,
timeout: Optional[int] = None,
**kwargs,
self,
on: str,
inputs: 'Document',
parameters: Optional[Dict] = None,
return_type: Type[Document] = Document,
timeout: Optional[int] = None,
**kwargs,
):
proto = 'https' if self.args.tls else 'http'
endpoint = on.strip('/')
Expand Down
17 changes: 14 additions & 3 deletions jina/clients/mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from jina.excepts import InternalNetworkError
from jina.helper import deprecate_by, get_or_reuse_loop, run_async
from jina.importer import ImportExtensions

import timeit
if TYPE_CHECKING: # pragma: no cover
from pydantic import BaseModel
from jina.clients.base import CallbackFnType, InputType
Expand Down Expand Up @@ -387,8 +387,10 @@ def post(
.. warning::
``target_executor`` uses ``re.match`` for checking if the pattern is matched. ``target_executor=='foo'`` will match both deployments with the name ``foo`` and ``foo_what_ever_suffix``.
"""

print(f'##### I AM POSTING')
_post_start = timeit.default_timer()
c = self.client
_cl = timeit.default_timer()
c.show_progress = show_progress
c.continue_on_error = continue_on_error

Expand All @@ -397,6 +399,9 @@ def post(
return_results = (on_always is None) and (on_done is None)

async def _get_results(*args, **kwargs):
_start = timeit.default_timer()
print(f'{_start} ######## I AM GETTING RESULTS')

is_singleton = False
inferred_return_type = return_type
if docarray_v2:
Expand All @@ -415,13 +420,16 @@ async def _get_results(*args, **kwargs):
result.append(resp)
else:
result.extend(resp.docs)

_end = timeit.default_timer()
print(f'######## {_end} => I AM GETTING RESULTS took {_end - _start}s')
if return_results:
if not return_responses and is_singleton and len(result) == 1:
return result[0]
else:
return result

return self._with_retry(
res = self._with_retry(
func=_get_results,
inputs=inputs,
on_done=on_done,
Expand All @@ -442,6 +450,9 @@ async def _get_results(*args, **kwargs):
on=on,
**kwargs,
)
_post_end = timeit.default_timer()
print(f'##### I AM POSTING took {_post_end - _post_start}s')
return res

# ONLY CRUD, for other request please use `.post`
index = partialmethod(post, '/index')
Expand Down
4 changes: 3 additions & 1 deletion jina/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from rich.console import Console

from jina.constants import __windows__

import timeit
__all__ = [
'batch_iterator',
'parse_arg',
Expand Down Expand Up @@ -1316,6 +1316,8 @@ def run(self):
'something wrong when running the eventloop, result can not be retrieved'
)
else:

print(f'{timeit.default_timer()} ==> HEY HERE ASYNCIO RUN {func.__name__}')
return asyncio.run(func(*args, **kwargs))


Expand Down
19 changes: 18 additions & 1 deletion jina/serve/runtimes/worker/http_fastapi_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,11 @@ def add_post_route(

app_kwargs['response_class'] = DocArrayResponse

from timeit import default_timer

@app.api_route(**app_kwargs)
async def post(body: input_model, response: Response):
_start = default_timer()
req = DataRequest()
if body.header is not None:
req.header.request_id = body.header.request_id
Expand All @@ -111,8 +114,19 @@ async def post(body: input_model, response: Response):
req.direct_docs = DocList[input_doc_list_model]([data])
if body.header is None:
req.header.request_id = req.docs[0].id
_end = default_timer()

logger.info(
f'Creating Request took {_end - _start}s'
)
_start = default_timer()
resp = await caller(req)
_end = default_timer()

logger.info(
f'Respon to Request took {_end - _start}s'
)
_start = default_timer()
status = resp.header.status

if status.code == jina_pb2.StatusProto.ERROR:
Expand All @@ -123,7 +137,10 @@ async def post(body: input_model, response: Response):
else:
docs_response = resp.docs
ret = output_model(data=docs_response, parameters=resp.parameters)

_end = default_timer()
logger.info(
f'Extra time {_end - _start}s'
)
return ret

def add_streaming_routes(
Expand Down
Loading
Loading