Skip to content

Commit 8f9ed1b

Browse files
committed
wip
1 parent f410133 commit 8f9ed1b

File tree

2 files changed

+60
-33
lines changed

2 files changed

+60
-33
lines changed

pythoned/__init__.py

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,59 @@
11
import importlib
22
import os
33
import re
4+
import tempfile
45
from types import ModuleType
5-
from typing import Dict, Iterator
6+
from typing import Callable, Dict, Iterator, List, TextIO, Union, cast
67

78

89
_TYPE_ERROR_MSG = "The provided expression must be an str (editing) or a bool (filtering), but got {}."
910

11+
def _construct_edit_func(expression: str, modules: List[str]) -> str:
12+
return f"""
13+
{"\n".join("import " + module for module in modules)}
14+
from typing import Union
1015
11-
def edit(lines: Iterator[str], expression) -> Iterator[str]:
12-
modules: Dict[str, ModuleType] = {}
16+
def process_line(_: str) -> Union[str, bool]:
17+
return {expression}
18+
"""
19+
20+
def _get_process_line_func(
21+
expression: str,
22+
modules: List[str],
23+
process_line_file: TextIO,
24+
) -> Callable[[str], Union[str, bool]]:
25+
process_line_file.write(_construct_edit_func(expression, modules).encode())
26+
process_line_module = importlib.import_module(process_line_file.name)
27+
return cast(
28+
Callable[[str], Union[str, bool]],
29+
process_line_module.process_line,
30+
)
31+
32+
def edit(
33+
lines: Iterator[str],
34+
expression,
35+
process_line_file: TextIO,
36+
) -> Iterator[str]:
37+
modules: List[str] = []
38+
process_line = _get_process_line_func(expression, modules, process_line_file)
1339
for line in lines:
1440
linesep = ""
1541
if line.endswith(os.linesep):
1642
linesep, line = os.linesep, line[: -len(os.linesep)]
17-
globals = {"_": line, **modules}
1843
try:
19-
value = eval(expression, globals)
44+
value = process_line(line)
2045
except NameError as name_error:
2146
match = re.match(r"name '([A-Za-z]+)'.*", str(name_error))
2247
if match:
2348
module = match.group(1)
2449
else:
2550
raise name_error
2651
try:
27-
modules[module] = importlib.import_module(module)
28-
globals = {"_": line, **modules}
52+
modules.append(module)
53+
process_line = _get_process_line_func(expression, modules, process_line_file)
2954
except:
3055
raise name_error
31-
value = eval(expression, globals)
56+
value = process_line(line)
3257
if isinstance(value, str):
3358
yield value + linesep
3459
elif isinstance(value, bool):

tests/test.py

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import tempfile
12
from typing import Iterator
23
import unittest
34

@@ -10,28 +11,29 @@ def lines() -> Iterator[str]:
1011

1112
class TestStream(unittest.TestCase):
1213
def test_edit(self) -> None:
13-
self.assertEqual(
14-
list(edit(lines(), "_[-1]")),
15-
["0\n", "r\n", "r"],
16-
msg="str expression must edit the lines",
17-
)
18-
self.assertEqual(
19-
list(edit(lines(), 're.sub(r"\d", "X", _)')),
20-
["fXX\n", "bar\n", "fXXbar"],
21-
msg="re should be supported out-of-the-box",
22-
)
23-
self.assertEqual(
24-
list(edit(lines(), '"0" in _')),
25-
["f00\n", "f00bar"],
26-
msg="bool expression must filter the lines",
27-
)
28-
self.assertEqual(
29-
list(edit(lines(), "len(_) == 3")),
30-
["f00\n", "bar\n"],
31-
msg="_ must exclude linesep",
32-
)
33-
self.assertEqual(
34-
list(edit(lines(), "str(int(math.pow(10, len(_))))")),
35-
["1000\n", "1000\n", "1000000"],
36-
msg="modules should be auto-imported",
37-
)
14+
with tempfile.NamedTemporaryFile(delete=True) as process_line_file:
15+
self.assertEqual(
16+
list(edit(lines(), "_[-1]", process_line_file)),
17+
["0\n", "r\n", "r"],
18+
msg="str expression must edit the lines",
19+
)
20+
self.assertEqual(
21+
list(edit(lines(), 're.sub(r"\d", "X", _)', process_line_file)),
22+
["fXX\n", "bar\n", "fXXbar"],
23+
msg="re should be supported out-of-the-box",
24+
)
25+
self.assertEqual(
26+
list(edit(lines(), '"0" in _', process_line_file)),
27+
["f00\n", "f00bar"],
28+
msg="bool expression must filter the lines",
29+
)
30+
self.assertEqual(
31+
list(edit(lines(), "len(_) == 3", process_line_file)),
32+
["f00\n", "bar\n"],
33+
msg="_ must exclude linesep",
34+
)
35+
self.assertEqual(
36+
list(edit(lines(), "str(int(math.pow(10, len(_))))", process_line_file)),
37+
["1000\n", "1000\n", "1000000"],
38+
msg="modules should be auto-imported",
39+
)

0 commit comments

Comments
 (0)