Skip to content

Commit 29d9497

Browse files
authored
Fixed race condition with native clients when global timeout setting … (#353)
* Fixed race condition with native clients when global timeout setting is used and running short lived commands. Resolves #344. * Updated changelog * Updated default log formatter set by `pssh.utils` enable logger functions.
1 parent a0787bd commit 29d9497

File tree

7 files changed

+41
-19
lines changed

7 files changed

+41
-19
lines changed

Changelog.rst

+14
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,20 @@
11
Change Log
22
============
33

4+
2.11.1
5+
+++++++
6+
7+
Changes
8+
--------
9+
10+
* Updated default log formatter set by `pssh.utils` enable logger functions.
11+
12+
Fixes
13+
------
14+
15+
* Using native clients under `pssh.clients.native` with very short lived commands would sometimes cause unexpected
16+
stalls/delays in reading output from completed commands when a client ``timeout`` setting was used - #344.
17+
418
2.11.0
519
+++++++
620

pssh/clients/base/single.py

+6-10
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,7 @@ def _eagain_write_errcode(self, write_func, data, eagain, timeout=None):
559559
rc, bytes_written = write_func(data[total_written:])
560560
total_written += bytes_written
561561
if rc == eagain:
562-
self.poll(timeout=timeout)
562+
self.poll()
563563
sleep()
564564

565565
def _eagain_errcode(self, func, eagain, *args, **kwargs):
@@ -685,21 +685,17 @@ def _remote_paths_split(self, file_path):
685685
if _sep > 0:
686686
return file_path[:_sep]
687687

688-
def poll(self, timeout=None):
688+
def poll(self):
689689
raise NotImplementedError
690690

691-
def _poll_socket(self, events, timeout=None):
691+
def _poll_socket(self, events):
692692
if self.sock is None:
693693
return
694-
# gevent.select.poll converts seconds to miliseconds to match python socket
695-
# implementation
696-
timeout = timeout * 1000 if timeout is not None else 100
697694
poller = poll()
698695
poller.register(self.sock, eventmask=events)
699-
poller.poll(timeout=timeout)
696+
poller.poll(timeout=1)
700697

701-
def _poll_errcodes(self, directions_func, inbound, outbound, timeout=None):
702-
timeout = self.timeout if timeout is None else timeout
698+
def _poll_errcodes(self, directions_func, inbound, outbound):
703699
directions = directions_func()
704700
if directions == 0:
705701
return
@@ -708,4 +704,4 @@ def _poll_errcodes(self, directions_func, inbound, outbound, timeout=None):
708704
events = POLLIN
709705
if directions & outbound:
710706
events |= POLLOUT
711-
self._poll_socket(events, timeout=timeout)
707+
self._poll_socket(events)

pssh/clients/native/single.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -726,12 +726,12 @@ def poll(self, timeout=None):
726726
727727
Blocks current greenlet only if socket has pending read or write operations
728728
in the appropriate direction.
729+
:param timeout: Deprecated and unused - to be removed.
729730
"""
730731
self._poll_errcodes(
731732
self.session.block_directions,
732733
LIBSSH2_SESSION_BLOCK_INBOUND,
733734
LIBSSH2_SESSION_BLOCK_OUTBOUND,
734-
timeout=timeout,
735735
)
736736

737737
def _eagain_write(self, write_func, data, timeout=None):

pssh/clients/reader.py

+1-4
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@
1515
# License along with this library; if not, write to the Free Software
1616
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
1717

18-
try:
19-
from io import BytesIO
20-
except ImportError:
21-
from cStringIO import StringIO as BytesIO
18+
from io import BytesIO
2219

2320
from gevent import sleep
2421
from gevent.event import Event

pssh/clients/ssh/single.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ def execute(self, cmd, use_pty=False, channel=None):
243243

244244
def _read_output_to_buffer(self, channel, _buffer, is_stderr=False):
245245
while True:
246-
self.poll(timeout=self.timeout)
246+
self.poll()
247247
try:
248248
size, data = channel.read_nonblocking(is_stderr=is_stderr)
249249
except EOF:
@@ -316,12 +316,13 @@ def close_channel(self, channel):
316316
self._eagain(channel.close, timeout=self.timeout)
317317

318318
def poll(self, timeout=None):
319-
"""ssh-python based co-operative gevent poll on session socket."""
319+
"""ssh-python based co-operative gevent poll on session socket.
320+
:param timeout: Deprecated and unused - to be removed.
321+
"""
320322
self._poll_errcodes(
321323
self.session.get_poll_flags,
322324
SSH_READ_PENDING,
323325
SSH_WRITE_PENDING,
324-
timeout=timeout,
325326
)
326327

327328
def _eagain(self, func, *args, **kwargs):

pssh/utils.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def enable_logger(_logger, level=logging.INFO):
3333
logger.warning("Logger already has a StreamHandler attached")
3434
return
3535
handler = logging.StreamHandler()
36-
host_log_format = logging.Formatter('%(message)s')
36+
host_log_format = logging.Formatter('%(asctime)s %(levelname)-8s %(name)-15s %(message)s')
3737
handler.setFormatter(host_log_format)
3838
_logger.addHandler(handler)
3939

tests/native/test_single_client.py

+14
Original file line numberDiff line numberDiff line change
@@ -1039,6 +1039,20 @@ def test_copy_remote_dir_encoding(self):
10391039
]
10401040
self.assertListEqual(remote_file_mock.call_args_list, call_args)
10411041

1042+
def test_many_short_lived_commands(self):
1043+
for _ in range(20):
1044+
timeout = 2
1045+
start = datetime.now()
1046+
client = SSHClient(self.host, port=self.port,
1047+
pkey=self.user_key,
1048+
num_retries=1,
1049+
allow_agent=False,
1050+
timeout=timeout)
1051+
host_out = client.run_command(self.cmd)
1052+
_ = list(host_out.stdout)
1053+
end = datetime.now() - start
1054+
duration = end.total_seconds()
1055+
self.assertTrue(duration < timeout * 0.9, msg=f"Duration of instant cmd is {duration}")
10421056

10431057
# TODO
10441058
# * read output callback

0 commit comments

Comments
 (0)