diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 8338449aaa0a3e..d0b7100f5e0563 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -179,11 +179,12 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, """Transport for read pipes.""" def __init__(self, loop, sock, protocol, waiter=None, - extra=None, server=None): - self._pending_data = None + extra=None, server=None, buffer_size=65536): + self._pending_data_length = -1 self._paused = True super().__init__(loop, sock, protocol, waiter, extra, server) + self._data = bytearray(buffer_size) self._loop.call_soon(self._loop_reading) self._paused = False @@ -217,12 +218,12 @@ def resume_reading(self): if self._read_fut is None: self._loop.call_soon(self._loop_reading, None) - data = self._pending_data - self._pending_data = None - if data is not None: + length = self._pending_data_length + self._pending_data_length = -1 + if length > -1: # Call the protocol methode after calling _loop_reading(), # since the protocol can decide to pause reading again. - self._loop.call_soon(self._data_received, data) + self._loop.call_soon(self._data_received, self._data[:length], length) if self._loop.get_debug(): logger.debug("%r resumes reading", self) @@ -243,15 +244,15 @@ def _eof_received(self): if not keep_open: self.close() - def _data_received(self, data): + def _data_received(self, data, length): if self._paused: # Don't call any protocol method while reading is paused. # The protocol will be called on resume_reading(). - assert self._pending_data is None - self._pending_data = data + assert self._pending_data_length == -1 + self._pending_data_length = length return - if not data: + if length == 0: self._eof_received() return @@ -269,6 +270,7 @@ def _data_received(self, data): self._protocol.data_received(data) def _loop_reading(self, fut=None): + length = -1 data = None try: if fut is not None: @@ -277,18 +279,18 @@ def _loop_reading(self, fut=None): self._read_fut = None if fut.done(): # deliver data later in "finally" clause - data = fut.result() + length = fut.result() + if length == 0: + # we got end-of-file so no need to reschedule a new read + return + + data = self._data[:length] else: # the future will be replaced by next proactor.recv call fut.cancel() if self._closing: # since close() has been called we ignore any read data - data = None - return - - if data == b'': - # we got end-of-file so no need to reschedule a new read return # bpo-33694: buffer_updated() has currently no fast path because of @@ -296,7 +298,7 @@ def _loop_reading(self, fut=None): if not self._paused: # reschedule a new read - self._read_fut = self._loop._proactor.recv(self._sock, 32768) + self._read_fut = self._loop._proactor.recv_into(self._sock, self._data) except ConnectionAbortedError as exc: if not self._closing: self._fatal_error(exc, 'Fatal read error on pipe transport') @@ -314,8 +316,8 @@ def _loop_reading(self, fut=None): if not self._paused: self._read_fut.add_done_callback(self._loop_reading) finally: - if data is not None: - self._data_received(data) + if length > -1: + self._data_received(data, length) class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index b5d1df93efd650..50ba4c19d425ca 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -40,6 +40,7 @@ def setUp(self): self.loop._proactor = self.proactor self.protocol = test_utils.make_test_protocol(asyncio.Protocol) self.sock = mock.Mock(socket.socket) + self.buffer_size = 65536 def socket_transport(self, waiter=None): transport = _ProactorSocketTransport(self.loop, self.sock, @@ -53,28 +54,32 @@ def test_ctor(self): test_utils.run_briefly(self.loop) self.assertIsNone(fut.result()) self.protocol.connection_made(tr) - self.proactor.recv.assert_called_with(self.sock, 32768) + self.proactor.recv_into.assert_called_with(self.sock, bytearray(self.buffer_size)) def test_loop_reading(self): tr = self.socket_transport() tr._loop_reading() - self.loop._proactor.recv.assert_called_with(self.sock, 32768) + self.loop._proactor.recv_into.assert_called_with(self.sock, bytearray(self.buffer_size)) self.assertFalse(self.protocol.data_received.called) self.assertFalse(self.protocol.eof_received.called) def test_loop_reading_data(self): + buf = b'data' res = self.loop.create_future() - res.set_result(b'data') + res.set_result(len(buf)) tr = self.socket_transport() tr._read_fut = res + tr._data[:len(buf)] = buf tr._loop_reading(res) - self.loop._proactor.recv.assert_called_with(self.sock, 32768) - self.protocol.data_received.assert_called_with(b'data') + called_buf = bytearray(self.buffer_size) + called_buf[:len(buf)] = buf + self.loop._proactor.recv_into.assert_called_with(self.sock, called_buf) + self.protocol.data_received.assert_called_with(bytearray(buf)) def test_loop_reading_no_data(self): res = self.loop.create_future() - res.set_result(b'') + res.set_result(0) tr = self.socket_transport() self.assertRaises(AssertionError, tr._loop_reading, res) @@ -82,12 +87,12 @@ def test_loop_reading_no_data(self): tr.close = mock.Mock() tr._read_fut = res tr._loop_reading(res) - self.assertFalse(self.loop._proactor.recv.called) + self.assertFalse(self.loop._proactor.recv_into.called) self.assertTrue(self.protocol.eof_received.called) self.assertTrue(tr.close.called) def test_loop_reading_aborted(self): - err = self.loop._proactor.recv.side_effect = ConnectionAbortedError() + err = self.loop._proactor.recv_into.side_effect = ConnectionAbortedError() tr = self.socket_transport() tr._fatal_error = mock.Mock() @@ -97,7 +102,7 @@ def test_loop_reading_aborted(self): 'Fatal read error on pipe transport') def test_loop_reading_aborted_closing(self): - self.loop._proactor.recv.side_effect = ConnectionAbortedError() + self.loop._proactor.recv_into.side_effect = ConnectionAbortedError() tr = self.socket_transport() tr._closing = True @@ -106,7 +111,7 @@ def test_loop_reading_aborted_closing(self): self.assertFalse(tr._fatal_error.called) def test_loop_reading_aborted_is_fatal(self): - self.loop._proactor.recv.side_effect = ConnectionAbortedError() + self.loop._proactor.recv_into.side_effect = ConnectionAbortedError() tr = self.socket_transport() tr._closing = False tr._fatal_error = mock.Mock() @@ -114,7 +119,7 @@ def test_loop_reading_aborted_is_fatal(self): self.assertTrue(tr._fatal_error.called) def test_loop_reading_conn_reset_lost(self): - err = self.loop._proactor.recv.side_effect = ConnectionResetError() + err = self.loop._proactor.recv_into.side_effect = ConnectionResetError() tr = self.socket_transport() tr._closing = False @@ -125,7 +130,7 @@ def test_loop_reading_conn_reset_lost(self): tr._force_close.assert_called_with(err) def test_loop_reading_exception(self): - err = self.loop._proactor.recv.side_effect = (OSError()) + err = self.loop._proactor.recv_into.side_effect = (OSError()) tr = self.socket_transport() tr._fatal_error = mock.Mock() @@ -351,20 +356,31 @@ def test_write_eof_duplex_pipe(self): def test_pause_resume_reading(self): tr = self.socket_transport() - futures = [] - for msg in [b'data1', b'data2', b'data3', b'data4', b'data5', b'']: + index = 0 + msgs = [b'data1', b'data2', b'data3', b'data4', b'data5', b''] + reversed_msgs = list(reversed(msgs)) + + def recv_into(sock, data): f = self.loop.create_future() - f.set_result(msg) - futures.append(f) + msg = reversed_msgs.pop() + + result = f.result + def monkey(): + data[:len(msg)] = msg + return result() + f.result = monkey + + f.set_result(len(msg)) + return f - self.loop._proactor.recv.side_effect = futures + self.loop._proactor.recv_into.side_effect = recv_into self.loop._run_once() self.assertFalse(tr._paused) self.assertTrue(tr.is_reading()) - self.loop._run_once() - self.protocol.data_received.assert_called_with(b'data1') - self.loop._run_once() - self.protocol.data_received.assert_called_with(b'data2') + + for msg in msgs[:2]: + self.loop._run_once() + self.protocol.data_received.assert_called_with(bytearray(msg)) tr.pause_reading() tr.pause_reading() @@ -372,23 +388,23 @@ def test_pause_resume_reading(self): self.assertFalse(tr.is_reading()) for i in range(10): self.loop._run_once() - self.protocol.data_received.assert_called_with(b'data2') + self.protocol.data_received.assert_called_with(bytearray(msgs[1])) tr.resume_reading() tr.resume_reading() self.assertFalse(tr._paused) self.assertTrue(tr.is_reading()) - self.loop._run_once() - self.protocol.data_received.assert_called_with(b'data3') - self.loop._run_once() - self.protocol.data_received.assert_called_with(b'data4') + + for msg in msgs[2:4]: + self.loop._run_once() + self.protocol.data_received.assert_called_with(bytearray(msg)) tr.pause_reading() tr.resume_reading() self.loop.call_exception_handler = mock.Mock() self.loop._run_once() self.loop.call_exception_handler.assert_not_called() - self.protocol.data_received.assert_called_with(b'data5') + self.protocol.data_received.assert_called_with(bytearray(msgs[4])) tr.close() self.assertFalse(tr.is_reading()) diff --git a/Misc/NEWS.d/next/Library/2020-07-11-00-15-01.bpo-41273.SVrsJh.rst b/Misc/NEWS.d/next/Library/2020-07-11-00-15-01.bpo-41273.SVrsJh.rst new file mode 100644 index 00000000000000..c08204b9908c63 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-07-11-00-15-01.bpo-41273.SVrsJh.rst @@ -0,0 +1,3 @@ +Speed up any transport using ``_ProactorReadPipeTransport`` by calling +``recv_into`` instead of ``recv``, thus not creating a new buffer for each +``recv`` call in the transport's read loop.