Skip to content

Commit 98d2086

Browse files
committed
Add tests for proper termination
QubesOS/qubes-issues#9779
1 parent ce3d443 commit 98d2086

File tree

1 file changed

+162
-0
lines changed

1 file changed

+162
-0
lines changed

splitgpg2/test_termination.py

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
#!/usr/bin/python3
2+
#
3+
# Copyright (C) 2025 Simon Gaiser <[email protected]>
4+
#
5+
# This program is free software; you can redistribute it and/or modify
6+
# it under the terms of the GNU General Public License as published by
7+
# the Free Software Foundation; either version 2 of the License, or
8+
# (at your option) any later version.
9+
#
10+
# This program is distributed in the hope that it will be useful,
11+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
# GNU General Public License for more details.
14+
#
15+
# You should have received a copy of the GNU Lesser General Public License along
16+
# with this program; if not, see <http://www.gnu.org/licenses/>.
17+
18+
import unittest
19+
import tempfile
20+
import subprocess
21+
import os
22+
import re
23+
import struct
24+
25+
26+
class DidNotTerminate(AssertionError):
27+
def __init__(self):
28+
super().__init__("splitgpg2 service did not terminate")
29+
30+
31+
# Test that the splitgpg2 service terminates itself as expected. IO happens
32+
# through stdin/-out when called from qrexec. This behaves a bit differently
33+
# than the Unix socket we use for other tests (for example on close). So
34+
# instead start the service script directly.
35+
class TC_Termination(unittest.TestCase):
36+
@staticmethod
37+
def path_prepend(env, name, value):
38+
if name in env:
39+
env[name] = ":".join([value, env[name]])
40+
else:
41+
env[name] = value
42+
43+
def setUp(self):
44+
super().setUp()
45+
46+
self.test_env = os.environ.copy()
47+
48+
self.tmp_dir = tempfile.TemporaryDirectory()
49+
50+
gpg_home = self.tmp_dir.name + "/gpg-home"
51+
self.test_env["GNUPGHOME"] = gpg_home
52+
os.mkdir(gpg_home, mode=0o700)
53+
54+
xdg_conf_dir = self.tmp_dir.name + "/xdg-config"
55+
os.mkdir(xdg_conf_dir)
56+
self.test_env["XDG_CONFIG_HOME"] = xdg_conf_dir
57+
58+
splitgpg2_conf_dir = xdg_conf_dir + "/qubes-split-gpg2"
59+
os.mkdir(splitgpg2_conf_dir)
60+
61+
with open(splitgpg2_conf_dir + "/qubes-split-gpg2.conf", "wb") as f:
62+
f.write(b"[DEFAULT]\nsource_keyring_dir = no\n")
63+
64+
path_dir = self.tmp_dir.name + "/path"
65+
os.mkdir(path_dir)
66+
self.path_prepend(self.test_env, "PATH", path_dir)
67+
68+
notify_path = path_dir + "/notify-send"
69+
with open(notify_path, "wb") as f:
70+
f.write(b"#!/bin/sh\n")
71+
os.chmod(notify_path, 0o755)
72+
73+
self.test_env["QREXEC_REMOTE_DOMAIN"] = "testvm"
74+
75+
top_dir = os.path.dirname(os.path.dirname(__file__))
76+
self.path_prepend(self.test_env, "PYTHONPATH", top_dir)
77+
78+
service_path = top_dir + "/qubes.Gpg2.service"
79+
self.service = subprocess.Popen(
80+
[service_path],
81+
env=self.test_env,
82+
stdin=subprocess.PIPE,
83+
stdout=subprocess.PIPE,
84+
)
85+
86+
self.addCleanup(self.cleanup_service)
87+
88+
hi = self.read_line()
89+
self.assertTrue(re.match(rb"\AOK\s", hi))
90+
91+
def tearDown(self):
92+
subprocess.run(["gpgconf", "--kill", "gpg-agent"], env=self.test_env)
93+
self.tmp_dir.cleanup()
94+
super().tearDown()
95+
96+
def cleanup_service(self):
97+
self.service.stdin.close()
98+
self.service.stdout.close()
99+
self.service.kill()
100+
self.service.wait()
101+
102+
def expect_termination(self):
103+
try:
104+
return self.service.wait(2)
105+
except subprocess.TimeoutExpired:
106+
raise DidNotTerminate()
107+
108+
def write(self, d):
109+
self.service.stdin.write(d)
110+
self.service.stdin.flush()
111+
112+
def read_line(self):
113+
return self.service.stdout.readline()
114+
115+
def test_000_bye(self):
116+
self.write(b"GETINFO version\n")
117+
self.assertTrue(re.match(rb"\AD\s", self.read_line()))
118+
self.assertTrue(re.match(rb"\AOK\s", self.read_line()))
119+
120+
self.write(b"BYE\n")
121+
self.assertTrue(re.match(rb"\AOK\s", self.read_line()))
122+
123+
self.expect_termination()
124+
125+
def test_001_close(self):
126+
self.service.stdin.close()
127+
128+
self.expect_termination()
129+
130+
def test_002_filterd(self):
131+
self.write(b"GETINFO asdf\n")
132+
self.assertEqual(
133+
self.read_line(), b"ERR 67109888 Command filtered by split-gpg2.\n"
134+
)
135+
136+
self.expect_termination()
137+
138+
def test_003_agent_kill(self):
139+
self.write(b"GETINFO version\n")
140+
self.assertTrue(re.match(rb"\AD\s", self.read_line()))
141+
self.assertTrue(re.match(rb"\AOK\s", self.read_line()))
142+
143+
# Simulate a sudden exit of gpg-agent. (Forcefully killing it is hard,
144+
# since gpg-agent doesn't like to be started in the foreground. So for
145+
# now ask it to terminate itself.)
146+
subprocess.run(["gpgconf", "--kill", "gpg-agent"], env=self.test_env)
147+
148+
# We currently don't detect a disconnected agent until we try to
149+
# communicate with it. So we have to trigger it.
150+
self.write(b"GETINFO version\n")
151+
152+
self.expect_termination()
153+
154+
def test_004_test_self_test(self):
155+
# Test out test method. With no reason to terminate it should still be
156+
# running.
157+
self.write(b"GETINFO version\n")
158+
self.assertTrue(re.match(rb"\AD\s", self.read_line()))
159+
self.assertTrue(re.match(rb"\AOK\s", self.read_line()))
160+
161+
with self.assertRaises(DidNotTerminate):
162+
self.expect_termination()

0 commit comments

Comments
 (0)