-
Notifications
You must be signed in to change notification settings - Fork 355
/
Copy pathextensions.py
executable file
·165 lines (132 loc) · 5.06 KB
/
extensions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#!/usr/bin/env python3
import os
import subprocess
import select
import time
import uuid
# Path of the limbo shell binary that the tests drive.
sqlite_exec = "./target/debug/limbo"
# Extra command-line flags for the shell, read from the SQLITE_FLAGS
# environment variable ("-q" when it is unset). Splitting on arbitrary
# whitespace instead of a single space avoids handing empty-string arguments
# to the shell when the variable is empty or contains repeated spaces.
sqlite_flags = os.getenv("SQLITE_FLAGS", "-q").split()
def init_limbo():
    """Spawn the limbo shell as a child process connected through pipes.

    Returns:
        The ``subprocess.Popen`` handle. Its stdin, stdout and stderr are all
        pipes, opened with ``bufsize=0``.
    """
    command = [sqlite_exec] + list(sqlite_flags)
    popen_options = {
        "stdin": subprocess.PIPE,
        "stdout": subprocess.PIPE,
        "stderr": subprocess.PIPE,
        "bufsize": 0,
    }
    return subprocess.Popen(command, **popen_options)
def execute_sql(pipe, sql):
    """Send *sql* to the shell behind *pipe* and return its cleaned-up output.

    A sentinel ``SELECT`` is written right after the statement; everything
    the shell prints on stdout before the sentinel is taken as the result.

    Args:
        pipe: Process handle exposing ``stdin``, ``stdout`` and ``stderr``
            pipes.
        sql: Text to send to the shell (a SQL statement or a dot-command).

    Returns:
        The output with the sentinel removed, empty lines dropped and every
        remaining line stripped of surrounding whitespace.

    Raises:
        RuntimeError: If stdout yields no data before the sentinel is seen
            (for example because the shell exited), or if the pipe has no
            stdin.
        SystemExit: Raised through ``exit_on_error`` when the shell writes to
            stderr.
    """
    end_suffix = "END_OF_RESULT"
    marker = end_suffix.encode()
    write_to_pipe(pipe, sql)
    write_to_pipe(pipe, f"SELECT '{end_suffix}';\n")
    stdout = pipe.stdout
    stderr = pipe.stderr
    # Collect raw bytes and decode once at the end: a multi-byte UTF-8
    # character may be split across two reads, and decoding each fragment
    # separately would then raise UnicodeDecodeError.
    received = bytearray()
    while True:
        ready_to_read, _, error_in_pipe = select.select(
            [stdout, stderr], [], [stdout, stderr]
        )
        ready_to_read_or_err = set(ready_to_read + error_in_pipe)
        if stderr in ready_to_read_or_err:
            exit_on_error(stderr)
        if stdout in ready_to_read_or_err:
            fragment = stdout.read(select.PIPE_BUF)
            if not fragment:
                # An empty read means end-of-file. Without this check the
                # loop would spin forever, because a pipe at EOF is always
                # reported as readable.
                raise RuntimeError(
                    "Shell closed stdout before the end-of-result marker"
                )
            received += fragment
            if received.rstrip().endswith(marker):
                break
    output = received.decode().rstrip().removesuffix(end_suffix)
    return strip_each_line(output)
def strip_each_line(lines: str) -> str:
    """Drop empty lines from *lines* and strip whitespace from the rest.

    Only lines that are exactly empty are dropped; a line made solely of
    whitespace is kept and becomes an empty line after stripping.
    """
    cleaned = []
    for raw_line in lines.split("\n"):
        if raw_line == "":
            continue
        cleaned.append(raw_line.strip())
    return "\n".join(cleaned)
def write_to_pipe(pipe, command):
    """Write *command* plus a trailing newline to the shell's stdin and flush.

    Raises:
        RuntimeError: If ``pipe.stdin`` is ``None``.
    """
    shell_input = pipe.stdin
    if shell_input is None:
        raise RuntimeError("Failed to write to shell")
    payload = (command + "\n").encode()
    shell_input.write(payload)
    shell_input.flush()
def exit_on_error(stderr):
    """Print whatever is pending on *stderr* and exit with status 1.

    Args:
        stderr: Readable binary stream of the shell's stderr; it must be
            usable with ``select.select``.

    Raises:
        SystemExit: Always, with exit code 1.
    """
    # Short grace period for further error output. Without a timeout,
    # ``select`` blocks until the stream is readable and so never returns an
    # empty list, which made the loop's exit condition unreachable.
    idle_timeout = 0.1
    while True:
        ready_to_read, _, _ = select.select([stderr], [], [], idle_timeout)
        if not ready_to_read:
            break
        # Bounded read: an unbounded ``read()`` waits for end-of-file, which
        # does not arrive while the shell is still running.
        chunk = stderr.read(select.PIPE_BUF)
        if not chunk:
            # End-of-file: nothing more will arrive.
            break
        # A chunk may end in the middle of a multi-byte character; do not
        # let that mask the error being reported.
        print(chunk.decode(errors="replace"), end="")
    raise SystemExit(1)
def run_test(pipe, sql, validator=None):
    """Execute *sql* through the shell and check its output.

    Args:
        pipe: Process handle passed on to ``execute_sql``.
        sql: Statement or dot-command to run.
        validator: Optional callable taking the output string and returning
            a truthy value when the output is acceptable. When omitted, the
            statement is run and the output is not checked.

    Raises:
        Exception: With the message "Validation failed" when the validator
            rejects the output.
    """
    print(f"Running test: {sql}")
    result = execute_sql(pipe, sql)
    rejected = validator is not None and not validator(result)
    if rejected:
        print(f"Test FAILED: {sql}")
        print(f"Returned: {result}")
        raise Exception("Validation failed")
    print("Test PASSED")
def validate_true(result):
    """Return True when the shell output is exactly the string ``"1"``."""
    expected = "1"
    return result == expected
def validate_false(result):
    """Return True when the shell output is exactly the string ``"0"``."""
    expected = "0"
    return result == expected
def validate_blob(result):
    """Return True when *result* is a non-empty string of hex digits.

    HACK: blobs are difficult to test because the shell tries to return them
    as utf8 strings, so the tests wrap them in ``hex()`` and only assert that
    the output consists of valid hex digits.

    Args:
        result: Output of a ``SELECT hex(...)`` statement.

    Returns:
        ``True`` for a non-empty ``str`` made solely of ``0-9``/``a-f``/``A-F``,
        ``False`` otherwise (including non-string input).
    """
    # ``int(result, 16)`` was previously used here, but it raises on bad input
    # instead of returning False and also accepts "0x" prefixes, underscores,
    # signs and surrounding whitespace.
    if not isinstance(result, str) or not result:
        return False
    hex_digits = "0123456789abcdefABCDEF"
    return all(char in hex_digits for char in result)
def validate_string_uuid(result):
    """Return True when *result* is a UUID in canonical hyphenated form.

    The canonical form is 36 characters: 32 hex digits grouped 8-4-4-4-12 and
    separated by hyphens. Letter case is ignored.

    Args:
        result: Output of a statement expected to yield a UUID string.

    Returns:
        ``True`` if *result* parses as a UUID and matches its canonical
        rendering, ``False`` otherwise.
    """
    try:
        parsed = uuid.UUID(result)
    except (ValueError, AttributeError, TypeError):
        return False
    # ``uuid.UUID`` also accepts braces, a "urn:uuid:" prefix and misplaced
    # hyphens; comparing against the canonical rendering rejects those.
    return str(parsed) == result.lower()
def returns_null(result):
    """Return True when the shell produced no output for a statement.

    Empty output is accepted as an empty string, an empty bytes object, or a
    bytes object holding a single newline.
    """
    empty_outputs = ("", b"\n", b"")
    return any(result == candidate for candidate in empty_outputs)
def assert_now_unixtime(result, tolerance=1):
    """Return True when *result* is the current Unix time, within a tolerance.

    Comparing for exact equality is racy: the clock can tick over to the next
    second between the shell producing the value and this check running.

    Args:
        result: Output string expected to hold a Unix timestamp in whole
            seconds.
        tolerance: Maximum allowed difference, in seconds, between *result*
            and the current time.

    Returns:
        ``True`` if *result* is an integer within *tolerance* seconds of
        ``time.time()``, ``False`` otherwise (including non-numeric input).
    """
    try:
        reported = int(result)
    except (TypeError, ValueError):
        return False
    return abs(int(time.time()) - reported) <= tolerance
def assert_specific_time(result):
    """Return True when *result* is exactly the string ``"1736720789"``."""
    expected_seconds = "1736720789"
    return result == expected_seconds
def test_uuid(pipe):
    """Exercise the uuid extension through the shell behind *pipe*.

    The uuid functions are first called before the extension is loaded and
    must produce no output; the extension is then loaded and each function's
    output is checked with the matching validator.
    """
    extension_path = "./target/debug/liblimbo_uuid.so"
    specific_time = "01945ca0-3189-76c0-9a8f-caf310fc8b8e"

    # before extension loads, assert no function
    for missing_call in ("SELECT uuid4();", "SELECT uuid4_str();"):
        run_test(pipe, missing_call, returns_null)

    run_test(pipe, f".load {extension_path}", returns_null)
    print(f"Extension {extension_path} loaded successfully.")

    # (statement, validator) pairs; a validator of None means the statement
    # is only executed and its output is not checked.
    cases = [
        ("SELECT hex(uuid4());", validate_blob),
        ("SELECT uuid4_str();", validate_string_uuid),
        ("SELECT hex(uuid7());", validate_blob),
        ("SELECT uuid7_timestamp_ms(uuid7()) / 1000;", None),
        ("SELECT uuid7_str();", validate_string_uuid),
        ("SELECT uuid_str(uuid7());", validate_string_uuid),
        ("SELECT hex(uuid_blob(uuid7_str()));", validate_blob),
        ("SELECT uuid_str(uuid_blob(uuid7_str()));", validate_string_uuid),
        (
            f"SELECT uuid7_timestamp_ms('{specific_time}') / 1000;",
            assert_specific_time,
        ),
    ]
    for statement, check in cases:
        run_test(pipe, statement, check)
def test_regexp(pipe):
    """Exercise the regexp extension through the shell behind *pipe*.

    ``regexp`` is first called before the extension is loaded and must
    produce no output; the extension is then loaded and ``regexp``,
    ``regexp_like`` and ``regexp_substr`` are checked on matching and
    non-matching input.
    """
    extension_path = "./target/debug/liblimbo_regexp.so"

    # before extension loads, assert no function
    run_test(pipe, "SELECT regexp('a.c', 'abc');", returns_null)

    run_test(pipe, f".load {extension_path}", returns_null)
    print(f"Extension {extension_path} loaded successfully.")

    # (statement, validator) pairs, run in order once the extension is loaded.
    cases = [
        ("SELECT regexp('a.c', 'abc');", validate_true),
        ("SELECT regexp('a.c', 'ac');", validate_false),
        ("SELECT regexp('[0-9]+', 'the year is 2021');", validate_true),
        ("SELECT regexp('[0-9]+', 'the year is unknow');", validate_false),
        ("SELECT regexp_like('the year is 2021', '[0-9]+');", validate_true),
        (
            "SELECT regexp_like('the year is unknow', '[0-9]+');",
            validate_false,
        ),
        (
            "SELECT regexp_substr('the year is 2021', '[0-9]+') = '2021';",
            validate_true,
        ),
        (
            "SELECT regexp_substr('the year is unknow', '[0-9]+');",
            returns_null,
        ),
    ]
    for statement, check in cases:
        run_test(pipe, statement, check)
def main():
    """Start the shell, run every extension test and report the outcome.

    Prints "All tests passed successfully." when every test passes.

    Raises:
        SystemExit: With exit code 1 when any test raises an exception.
    """
    pipe = init_limbo()
    try:
        test_regexp(pipe)
        test_uuid(pipe)
    except Exception as e:
        print(f"Test FAILED: {e}")
        raise SystemExit(1)
    finally:
        # Runs on every exit path, including SystemExit and
        # KeyboardInterrupt, so the child shell is never left running.
        pipe.terminate()
        try:
            # Reap the child so it does not linger as a zombie.
            pipe.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # The shell ignored the terminate request; force it down.
            pipe.kill()
            pipe.wait()
    print("All tests passed successfully.")
# Run the extension tests only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()