|
1 | 1 | import win32pipe, win32file, win32api, winerror, win32con, pywintypes
|
2 | 2 | from os import path
|
3 | 3 | import io
|
4 |
| -from typing import TypeVar, NewType, Literal, IO, Optional |
| 4 | +from typing import TypeVar, NewType, Literal, IO, Optional, Union |
5 | 5 |
|
6 | 6 | WritableBuffer = TypeVar("WritableBuffer")
|
7 | 7 | PyHANDLE = NewType("PyHANDLE", int)
|
@@ -44,7 +44,7 @@ def __init__(
|
44 | 44 | if not isinstance(bufsize, int):
|
45 | 45 | raise TypeError("bufsize must be an integer")
|
46 | 46 |
|
47 |
| - self.stream: IO | None = None # I/O stream of the pipe |
| 47 | + self.stream: Union[IO, None] = None # I/O stream of the pipe |
48 | 48 | self._path = _name_pipe() if name is None else rf"\\.\pipe\{name}"
|
49 | 49 | self._rd = any(c in mode for c in "r+")
|
50 | 50 | self._wr = any(c in mode for c in "wax+")
|
@@ -143,9 +143,7 @@ def wait(self):
|
143 | 143 | Wrapper = (
|
144 | 144 | io.BufferedRandom
|
145 | 145 | if self._rd and self._wr
|
146 |
| - else io.BufferedReader |
147 |
| - if self._rd |
148 |
| - else io.BufferedWriter |
| 146 | + else io.BufferedReader if self._rd else io.BufferedWriter |
149 | 147 | )
|
150 | 148 | stream = Wrapper(
|
151 | 149 | stream, self._bufsize if self._bufsize > 0 else io.DEFAULT_BUFFER_SIZE
|
@@ -207,7 +205,7 @@ def close(self) -> None:
|
207 | 205 |
|
208 | 206 | super().close()
|
209 | 207 |
|
210 |
| - def readinto(self, b: WritableBuffer) -> int | None: |
| 208 | + def readinto(self, b: WritableBuffer) -> Union[int, None]: |
211 | 209 | """Read bytes into a pre-allocated, writable bytes-like object ``b`` and
|
212 | 210 | return the number of bytes read. For example, ``b`` might be a ``bytearray``."""
|
213 | 211 |
|
|
0 commit comments