diff --git a/jina/clients/base/helper.py b/jina/clients/base/helper.py index 0a2d7481164a8..d512c09568ff3 100644 --- a/jina/clients/base/helper.py +++ b/jina/clients/base/helper.py @@ -15,7 +15,7 @@ from jina.types.request import Request from jina.types.request.data import DataRequest from jina.types.request.status import StatusMessage - +import timeit if TYPE_CHECKING: # pragma: no cover from opentelemetry import trace @@ -122,8 +122,11 @@ async def __aenter__(self): :return: start self """ - return await self.start() - + _start = timeit.default_timer() + res = await self.start() + _end = timeit.default_timer() + print(f'ASYNC ENTER {_end - _start}s') + return res async def start(self): """Create ClientSession and enter context @@ -139,7 +142,11 @@ async def start(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): - await self.close(exc_type, exc_val, exc_tb) + _start = timeit.default_timer() + res = await self.close(exc_type, exc_val, exc_tb) + _end = timeit.default_timer() + print(f'ASYNC EXIT {_end - _start}s') + return res async def close(self, *args, **kwargs): """Close ClientSession @@ -160,12 +167,15 @@ async def send_message(self, request: 'Request'): :param request: request as dict :return: send post message """ + _start = timeit.default_timer() + print(f'{_start} => HTTPClient send message lets start') req_dict = request.to_dict() req_dict['exec_endpoint'] = req_dict['header']['exec_endpoint'] if 'target_executor' in req_dict['header']: req_dict['target_executor'] = req_dict['header']['target_executor'] for attempt in range(1, self.max_attempts + 1): try: + _start_req = timeit.default_timer() request_kwargs = {'url': self.url} if not docarray_v2: request_kwargs['json'] = req_dict @@ -173,12 +183,16 @@ async def send_message(self, request: 'Request'): from docarray.base_doc.io.json import orjson_dumps request_kwargs['data'] = JinaJsonPayload(value=req_dict) + _end_req = timeit.default_timer() + print(f'{_end_req} => HTTPClient prepare request took {_end_req - _start_req}s') response = await self.session.post(**request_kwargs).__aenter__() try: r_str = await response.json() except aiohttp.ContentTypeError: r_str = await response.text() handle_response_status(response.status, r_str, self.url) + _end = timeit.default_timer() + print(f'{_end} => HTTPClient send_message total took {_end - _start}s') return response except (ValueError, ConnectionError, BadClient, aiohttp.ClientError) as err: await retry.wait_or_raise_err( diff --git a/jina/clients/base/http.py b/jina/clients/base/http.py index c55156bf69365..2ad5ca0426028 100644 --- a/jina/clients/base/http.py +++ b/jina/clients/base/http.py @@ -12,6 +12,7 @@ from jina.serve.stream import RequestStreamer from jina.types.request import Request from jina.types.request.data import DataRequest +import timeit if TYPE_CHECKING: # pragma: no cover from jina.clients.base import CallbackFnType, InputType @@ -96,20 +97,20 @@ async def _is_flow_ready(self, **kwargs) -> bool: return False async def _get_results( - self, - inputs: 'InputType', - on_done: 'CallbackFnType', - on_error: Optional['CallbackFnType'] = None, - on_always: Optional['CallbackFnType'] = None, - max_attempts: int = 1, - initial_backoff: float = 0.5, - max_backoff: float = 0.1, - backoff_multiplier: float = 1.5, - results_in_order: bool = False, - prefetch: Optional[int] = None, - timeout: Optional[int] = None, - return_type: Type[DocumentArray] = DocumentArray, - **kwargs, + self, + inputs: 'InputType', + on_done: 'CallbackFnType', + on_error: Optional['CallbackFnType'] = None, + on_always: Optional['CallbackFnType'] = None, + max_attempts: int = 1, + initial_backoff: float = 0.5, + max_backoff: float = 0.1, + backoff_multiplier: float = 1.5, + results_in_order: bool = False, + prefetch: Optional[int] = None, + timeout: Optional[int] = None, + return_type: Type[DocumentArray] = DocumentArray, + **kwargs, ): """ :param inputs: the callable @@ -130,17 +131,24 @@ async def _get_results( with ImportExtensions(required=True): pass + _start_total = timeit.default_timer() + print(f'## {_start_total} => I AM in _get_results') + self.inputs = inputs + _aada = timeit.default_timer() + print(f'Setting inputs took {_aada - _start_total}s') request_iterator = self._get_requests(**kwargs) + _aada2 = timeit.default_timer() + print(f'Getting req_it {_aada2 - _aada}s') on = kwargs.get('on', '/post') if len(self._endpoints) == 0: await self._get_endpoints_from_openapi(**kwargs) async with AsyncExitStack() as stack: - cm1 = ProgressBar( - total_length=self._inputs_length, disable=not self.show_progress - ) - p_bar = stack.enter_context(cm1) + # cm1 = ProgressBar( + # total_length=self._inputs_length, disable=not self.show_progress + # ) + # p_bar = stack.enter_context(cm1) proto = 'https' if self.args.tls else 'http' endpoint = on.strip('/') has_default_endpoint = 'default' in self._endpoints @@ -167,7 +175,7 @@ async def _get_results( ) def _request_handler( - request: 'Request', **kwargs + request: 'Request', **kwargs ) -> 'Tuple[asyncio.Future, Optional[asyncio.Future]]': """ For HTTP Client, for each request in the iterator, we `send_message` using @@ -176,7 +184,8 @@ def _request_handler( :param kwargs: kwargs :return: asyncio Task for sending message """ - return asyncio.ensure_future(iolet.send_message(request=request)), None + res = asyncio.ensure_future(iolet.send_message(request=request)), None + return res def _result_handler(result): return result @@ -184,19 +193,31 @@ def _result_handler(result): streamer_args = vars(self.args) if prefetch: streamer_args['prefetch'] = prefetch + + _start_streamer = timeit.default_timer() + print(f'## {_start_streamer} => I AM creating streamer') streamer = RequestStreamer( request_handler=_request_handler, result_handler=_result_handler, logger=self.logger, **streamer_args, ) + + _start = timeit.default_timer() + print(f'## Streamer created in {_start - _start_streamer}s') + async for response in streamer.stream( - request_iterator=request_iterator, results_in_order=results_in_order + request_iterator=request_iterator, results_in_order=results_in_order ): + r_status = response.status r_str = await response.json() + _end = timeit.default_timer() + print(f'{_end} => GETTING RESPONSE from streamer took {_end - _start}s') + _start = timeit.default_timer() handle_response_status(r_status, r_str, url) + _st = timeit.default_timer() da = None if 'data' in r_str and r_str['data'] is not None: @@ -216,10 +237,15 @@ def _result_handler(result): [return_type(**v) for v in r_str['data']] ) del r_str['data'] - + _e = timeit.default_timer() + print(f'Create DocList took {_e - _st}s') resp = DataRequest(r_str) + _e2 = timeit.default_timer() + print(f'Create DataRequest from r_str took {_e2 - _e}s') if da is not None: resp.direct_docs = da + _e3 = timeit.default_timer() + print(f'Setting direct_docs took {_e3 - _e2}s') callback_exec( response=resp, @@ -229,18 +255,25 @@ def _result_handler(result): on_always=on_always, continue_on_error=self.continue_on_error, ) - if self.show_progress: - p_bar.update() + _e4 = timeit.default_timer() + print(f'Calling callback took {_e4 - _e3}s') + # if self.show_progress: + # p_bar.update() + _end = timeit.default_timer() + print(f'{_end} => YIELD RESPONSE TOOK {_end - _start}s') yield resp + _end_total = timeit.default_timer() + print(f'## {_end_total} => I AM in _get_results took {_end_total - _start_total}s') + async def _get_streaming_results( - self, - on: str, - inputs: 'Document', - parameters: Optional[Dict] = None, - return_type: Type[Document] = Document, - timeout: Optional[int] = None, - **kwargs, + self, + on: str, + inputs: 'Document', + parameters: Optional[Dict] = None, + return_type: Type[Document] = Document, + timeout: Optional[int] = None, + **kwargs, ): proto = 'https' if self.args.tls else 'http' endpoint = on.strip('/') diff --git a/jina/clients/mixin.py b/jina/clients/mixin.py index a6960fa355f63..8d643e75362e9 100644 --- a/jina/clients/mixin.py +++ b/jina/clients/mixin.py @@ -10,7 +10,7 @@ from jina.excepts import InternalNetworkError from jina.helper import deprecate_by, get_or_reuse_loop, run_async from jina.importer import ImportExtensions - +import timeit if TYPE_CHECKING: # pragma: no cover from pydantic import BaseModel from jina.clients.base import CallbackFnType, InputType @@ -387,8 +387,10 @@ def post( .. warning:: ``target_executor`` uses ``re.match`` for checking if the pattern is matched. ``target_executor=='foo'`` will match both deployments with the name ``foo`` and ``foo_what_ever_suffix``. """ - + print(f'##### I AM POSTING') + _post_start = timeit.default_timer() c = self.client + _cl = timeit.default_timer() c.show_progress = show_progress c.continue_on_error = continue_on_error @@ -397,6 +399,9 @@ def post( return_results = (on_always is None) and (on_done is None) async def _get_results(*args, **kwargs): + _start = timeit.default_timer() + print(f'{_start} ######## I AM GETTING RESULTS') + is_singleton = False inferred_return_type = return_type if docarray_v2: @@ -415,13 +420,16 @@ async def _get_results(*args, **kwargs): result.append(resp) else: result.extend(resp.docs) + + _end = timeit.default_timer() + print(f'######## {_end} => I AM GETTING RESULTS took {_end - _start}s') if return_results: if not return_responses and is_singleton and len(result) == 1: return result[0] else: return result - return self._with_retry( + res = self._with_retry( func=_get_results, inputs=inputs, on_done=on_done, @@ -442,6 +450,9 @@ async def _get_results(*args, **kwargs): on=on, **kwargs, ) + _post_end = timeit.default_timer() + print(f'##### I AM POSTING took {_post_end - _post_start}s') + return res # ONLY CRUD, for other request please use `.post` index = partialmethod(post, '/index') diff --git a/jina/helper.py b/jina/helper.py index b5bcd9759b4b8..80bd8dd4b4d26 100644 --- a/jina/helper.py +++ b/jina/helper.py @@ -38,7 +38,7 @@ from rich.console import Console from jina.constants import __windows__ - +import timeit __all__ = [ 'batch_iterator', 'parse_arg', @@ -1316,6 +1316,8 @@ def run(self): 'something wrong when running the eventloop, result can not be retrieved' ) else: + + print(f'{timeit.default_timer()} ==> HEY HERE ASYNCIO RUN {func.__name__}') return asyncio.run(func(*args, **kwargs)) diff --git a/jina/serve/runtimes/worker/http_fastapi_app.py b/jina/serve/runtimes/worker/http_fastapi_app.py index 889166d8aeb63..61ed8ad1a1fe3 100644 --- a/jina/serve/runtimes/worker/http_fastapi_app.py +++ b/jina/serve/runtimes/worker/http_fastapi_app.py @@ -87,8 +87,11 @@ def add_post_route( app_kwargs['response_class'] = DocArrayResponse + from timeit import default_timer + @app.api_route(**app_kwargs) async def post(body: input_model, response: Response): + _start = default_timer() req = DataRequest() if body.header is not None: req.header.request_id = body.header.request_id @@ -111,8 +114,19 @@ async def post(body: input_model, response: Response): req.direct_docs = DocList[input_doc_list_model]([data]) if body.header is None: req.header.request_id = req.docs[0].id + _end = default_timer() + logger.info( + f'Creating Request took {_end - _start}s' + ) + _start = default_timer() resp = await caller(req) + _end = default_timer() + + logger.info( + f'Respon to Request took {_end - _start}s' + ) + _start = default_timer() status = resp.header.status if status.code == jina_pb2.StatusProto.ERROR: @@ -123,7 +137,10 @@ async def post(body: input_model, response: Response): else: docs_response = resp.docs ret = output_model(data=docs_response, parameters=resp.parameters) - + _end = default_timer() + logger.info( + f'Extra time {_end - _start}s' + ) return ret def add_streaming_routes( diff --git a/jina/serve/runtimes/worker/request_handling.py b/jina/serve/runtimes/worker/request_handling.py index 7d9958c35c049..bc62bf9f0d135 100644 --- a/jina/serve/runtimes/worker/request_handling.py +++ b/jina/serve/runtimes/worker/request_handling.py @@ -30,6 +30,7 @@ from jina.serve.instrumentation import MetricsTimer from jina.serve.runtimes.worker.batch_queue import BatchQueue from jina.types.request.data import DataRequest, SingleDocumentRequest +from timeit import default_timer if docarray_v2: from docarray import DocList @@ -671,6 +672,7 @@ async def handle( :returns: the processed message """ # skip executor if endpoints mismatch + _start = default_timer() exec_endpoint: str = requests[0].header.exec_endpoint if exec_endpoint not in self._executor.requests: if __default_endpoint__ in self._executor.requests: @@ -716,6 +718,8 @@ async def handle( docs_matrix, docs_map = WorkerRequestHandler._get_docs_matrix_from_request( requests ) + _end = default_timer() + print(f'EXTRA BEFORE EXECUTOR TOOK {_end - _start}s') return_data = await self._executor.__acall__( req_endpoint=exec_endpoint, docs=docs, @@ -724,6 +728,7 @@ async def handle( docs_map=docs_map, tracing_context=tracing_context, ) + _start = default_timer() _ = self._set_result(requests, return_data, docs, http=http) for req in requests: @@ -735,7 +740,8 @@ async def handle( except AttributeError: pass self._record_response_size_monitoring(requests) - + _end = default_timer() + print(f'EXTRA AFTER EXECUTOR TOOK {_end - _start}s') return requests[0] @staticmethod diff --git a/jina/serve/stream/__init__.py b/jina/serve/stream/__init__.py index 03c488d78f114..1ae5bf666d529 100644 --- a/jina/serve/stream/__init__.py +++ b/jina/serve/stream/__init__.py @@ -20,7 +20,7 @@ from jina._docarray import DocumentArray from jina.types.request.data import Response - +import timeit if TYPE_CHECKING: # pragma: no cover from jina.types.request import Request @@ -178,6 +178,8 @@ async def stream( :param args: positional arguments :yield: responses from Executors """ + _start_streaming = timeit.default_timer() + print(f'## {_start_streaming} Start streaming') prefetch = prefetch or self._prefetch if context is not None: for metadatum in context.invocation_metadata(): @@ -197,6 +199,8 @@ async def stream( return_type=return_type, ) async for response in async_iter: + _first_resp_streaming = timeit.default_timer() + print(f'## {_first_resp_streaming} Got first response in {_first_resp_streaming - _start_streaming}s') yield response except InternalNetworkError as err: if (