-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathlocal_connection.py
79 lines (58 loc) · 2.5 KB
/
local_connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from typing import Optional, Tuple
import asyncio
import aioshutil
from asyncssh import SSHCompletedProcess, ProcessError
class LocalConnection:
    """Local stand-in for the connection object of asyncssh.create_connection().

    Instead of opening a remote SSH session, commands are executed on the
    local machine through the shell. Only one method is supported: run().

    The calling syntax mirrors asyncssh:

        async with asyncssh.create_connection() as connection:
            await connection.run("uname -a", check=True)

        async with LocalConnection() as connection:
            await connection.run("uname -a", check=True)
    """

    def __init__(self, *args, **kwargs):
        # Connection arguments are accepted only so call sites written for
        # asyncssh keep working; none of them are used.
        pass

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Nothing to release. Returning None lets any exception propagate.
        return None

    async def run(self, *args: str, check: bool = False,
                  timeout: Optional[float] = None,
                  **kwargs: object) -> "SSHCompletedProcess":
        """Run one shell command locally and return its collected output.

        Args:
            *args: Exactly one positional argument, the command line handed
                to the shell.
            check: When true, a non-zero return code raises ProcessError
                instead of returning a result.
            timeout: Maximum number of seconds to wait for the command, or
                None to wait indefinitely.
            **kwargs: Accepted for signature compatibility and ignored.

        Returns:
            An SSHCompletedProcess carrying the command, the return code and
            the decoded stdout/stderr text.

        Raises:
            ValueError: If the number of positional arguments is not one.
            ProcessError: If ``check`` is true and the command exits with a
                non-zero return code.
            asyncssh.TimeoutError: If ``timeout`` expires before the command
                finishes (also an ``asyncio.TimeoutError`` subclass).
        """
        if len(args) != 1:
            raise ValueError("LocalConnection supports only one-line command")
        command = args[0]

        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        try:
            # communicate() drains both pipes while waiting for exit. Calling
            # wait() first and reading afterwards deadlocks as soon as the
            # child writes more than the OS pipe buffer can hold.
            stdout_bytes, stderr_bytes = await asyncio.wait_for(
                process.communicate(), timeout
            )
        except asyncio.TimeoutError:
            # Best-effort kill. The exit is deliberately not awaited so that
            # a lingering grandchild holding the pipes cannot stall this path.
            try:
                process.kill()
            except ProcessLookupError:
                pass  # the child exited on its own in the meantime
            # Imported here because only the timeout path needs it and the
            # name would otherwise shadow the built-in TimeoutError.
            from asyncssh import TimeoutError as SSHTimeoutError
            raise SSHTimeoutError(None, command, None,
                                  None, None,
                                  process.returncode, "", "") from None

        stdout_data = stdout_bytes.decode()
        stderr_data = stderr_bytes.decode()

        if check and process.returncode:
            raise ProcessError(None, command, None,
                               None, None,
                               process.returncode, stdout_data, stderr_data)
        return SSHCompletedProcess(None, command, None,
                                   None, None,
                                   process.returncode, stdout_data,
                                   stderr_data)
async def local_copy(source: str, destination: Tuple[object, str], preserve: Optional[bool] = True):
    """Copy a file locally with the call shape of asyncssh.scp.

    Drop-in replacement for asyncssh.scp that performs a local copy:

        asyncssh.scp(source, (ssh, destination))   vs
        local_copy(source, (ssh, destination))

    Args:
        source: Path of the file to copy.
        destination: ``(connection, path)`` pair as passed to asyncssh.scp.
            Only the path (second element) is used; the connection is ignored.
        preserve: When truthy, file metadata such as timestamps is copied
            along with the contents. When falsy, only the contents and
            permission bits are copied.
    """
    full_destination = destination[1]
    # copy2 also carries over timestamps; plain copy does not. The flag was
    # previously ignored and plain copy was always used.
    copier = aioshutil.copy2 if preserve else aioshutil.copy
    await copier(source, full_destination)
async def main():
    """Demo entry point: echo a greeting through LocalConnection and print its stdout."""
    async with LocalConnection() as local:
        completed = await local.run("echo 'Hello world'")
        print(completed.stdout)
if __name__ == "__main__":
    # Executed as a script: drive the demo coroutine to completion.
    demo = main()
    asyncio.run(demo)