Skip to content

Commit

Permalink
Improve the cancellation of reading and writing
Browse files Browse the repository at this point in the history
  • Loading branch information
mrjohannchang committed Jan 2, 2020
1 parent c514334 commit 0fcb578
Showing 1 changed file with 40 additions and 15 deletions.
55 changes: 40 additions & 15 deletions src/aioserial/aioserial.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,12 @@ def __init__(
self._loop: Optional[asyncio.AbstractEventLoop] = loop
self._read_executor = \
concurrent.futures.ThreadPoolExecutor(max_workers=1)
self._cancel_read_executor = \
concurrent.futures.ThreadPoolExecutor(max_workers=1)
self._write_executor = \
concurrent.futures.ThreadPoolExecutor(max_workers=1)
self._cancel_write_executor = \
concurrent.futures.ThreadPoolExecutor(max_workers=1)

@property
def loop(self) -> Optional[asyncio.AbstractEventLoop]:
Expand All @@ -64,8 +68,11 @@ async def read_async(self, size: int = 1) -> bytes:
try:
return await self.loop.run_in_executor(
self._read_executor, self.read, size)
except asyncio.CancelledError:
self.cancel_read()
except (asyncio.CancelledError, asyncio.TimeoutError):
await asyncio.wait_for(
self.loop.run_in_executor(
self._cancel_read_executor, self.cancel_read),
timeout=self.timeout)
raise

async def read_until_async(
Expand All @@ -75,48 +82,66 @@ async def read_until_async(
try:
return await self.loop.run_in_executor(
self._read_executor, self.read_until, expected, size)
except asyncio.CancelledError:
self.cancel_read()
except (asyncio.CancelledError, asyncio.TimeoutError):
await asyncio.wait_for(
self.loop.run_in_executor(
self._cancel_read_executor, self.cancel_read),
timeout=self.timeout)
raise

async def readinto_async(self, b: Union[array.array, bytearray]):
try:
return await self.loop.run_in_executor(
self._read_executor, self.readinto, b)
except asyncio.CancelledError:
self.cancel_read()
except (asyncio.CancelledError, asyncio.TimeoutError):
await asyncio.wait_for(
self.loop.run_in_executor(
self._cancel_read_executor, self.cancel_read),
timeout=self.timeout)
raise

async def readline_async(self, size: int = -1) -> bytes:
try:
return await self.loop.run_in_executor(
self._read_executor, self.readline, size)
except asyncio.CancelledError:
self.cancel_read()
except (asyncio.CancelledError, asyncio.TimeoutError):
await asyncio.wait_for(
self.loop.run_in_executor(
self._cancel_read_executor, self.cancel_read),
timeout=self.timeout)
raise

async def readlines_async(self, hint: int = -1) -> List[bytes]:
try:
return await self.loop.run_in_executor(
self._read_executor, self.readlines, hint)
except asyncio.CancelledError:
self.cancel_read()
except (asyncio.CancelledError, asyncio.TimeoutError):
await asyncio.wait_for(
self.loop.run_in_executor(
self._cancel_read_executor, self.cancel_read),
timeout=self.timeout)
raise

async def write_async(
self, data: Union[bytearray, bytes, memoryview]) -> int:
try:
return await self.loop.run_in_executor(
self._write_executor, self.write, data)
except asyncio.CancelledError:
self.cancel_write()
except (asyncio.CancelledError, asyncio.TimeoutError):
await asyncio.wait_for(
self.loop.run_in_executor(
self._cancel_write_executor, self.cancel_write),
timeout=self.timeout)
raise

async def writelines_async(
self, lines: List[Union[bytearray, bytes, memoryview]]) -> int:
self, lines: List[Union[bytearray, bytes, memoryview]]):
try:
return await self.loop.run_in_executor(
self._write_executor, self.writelines, lines)
except asyncio.CancelledError:
self.cancel_write()
except (asyncio.CancelledError, asyncio.TimeoutError):
await asyncio.wait_for(
self.loop.run_in_executor(
self._cancel_write_executor, self.cancel_write),
timeout=self.timeout)
raise

0 comments on commit 0fcb578

Please sign in to comment.