diff --git a/Lib/_pyrepl/base_eventqueue.py b/Lib/_pyrepl/base_eventqueue.py index 0589a0f437ec7c..3393c563a120e8 100644 --- a/Lib/_pyrepl/base_eventqueue.py +++ b/Lib/_pyrepl/base_eventqueue.py @@ -31,6 +31,8 @@ from .trace import trace class BaseEventQueue: + _ESCAPE_TIMEOUT_MS = 50 + def __init__(self, encoding: str, keymap_dict: dict[bytes, str]) -> None: self.compiled_keymap = keymap.compile_keymap(keymap_dict) self.keymap = self.compiled_keymap @@ -38,6 +40,7 @@ def __init__(self, encoding: str, keymap_dict: dict[bytes, str]) -> None: self.encoding = encoding self.events: deque[Event] = deque() self.buf = bytearray() + self._pending_escape_deadline: float | None = None def get(self) -> Event | None: """ @@ -69,6 +72,50 @@ def insert(self, event: Event) -> None: trace('added event {event}', event=event) self.events.append(event) + def has_pending_escape_sequence(self) -> bool: + """ + Check if there's a potential escape sequence waiting for more input. + + Returns True if we have exactly one byte (ESC) in the buffer and + we're in the middle of keymap navigation, indicating we're waiting + to see if more bytes will arrive to complete an escape sequence. + """ + return ( + len(self.buf) == 1 + and self.buf[0] == 27 # ESC byte + and self.keymap is not self.compiled_keymap + ) + + def should_emit_standalone_escape(self, current_time_ms: float) -> bool: + """ + Check if a pending ESC should be emitted as a standalone escape key. + """ + if not self.has_pending_escape_sequence(): + return False + + if self._pending_escape_deadline is None: + # First time checking - set the deadline + self._pending_escape_deadline = current_time_ms + self._ESCAPE_TIMEOUT_MS + return False + + # Check if the deadline has passed + return current_time_ms >= self._pending_escape_deadline + + def emit_standalone_escape(self) -> None: + """ + Emit the buffered ESC byte as a standalone escape key event. + """ + self.keymap = self.compiled_keymap + # Standalone ESC event + self.insert(Event('key', '\033', b'\033')) + + # Just in case there are remaining bytes in the buffer + remaining = self.flush_buf()[1:] + for byte in remaining: + self.push(byte) + + self._pending_escape_deadline = None + def push(self, char: int | bytes) -> None: """ Processes a character by updating the buffer and handling special key mappings. @@ -78,6 +125,9 @@ def push(self, char: int | bytes) -> None: char = ord_char.to_bytes() self.buf.append(ord_char) + if self._pending_escape_deadline is not None: + self._pending_escape_deadline = None + if char in self.keymap: if self.keymap is self.compiled_keymap: # sanity check, buffer is empty when a special key comes diff --git a/Lib/_pyrepl/commands.py b/Lib/_pyrepl/commands.py index 10127e58897a58..9f0ed678c8197d 100644 --- a/Lib/_pyrepl/commands.py +++ b/Lib/_pyrepl/commands.py @@ -325,10 +325,19 @@ def do(self) -> None: b = r.buffer for _ in range(r.get_arg()): p = r.pos + 1 - if p <= len(b): - r.pos = p + # In vi normal mode, don't move past the last character + if r.editor_mode.is_normal(): + eol_pos = r.eol() + max_pos = max(r.bol(), eol_pos - 1) if eol_pos > r.bol() else r.bol() + if p <= max_pos: + r.pos = p + else: + self.reader.error("end of line") else: - self.reader.error("end of buffer") + if p <= len(b): + r.pos = p + else: + self.reader.error("end of buffer") class beginning_of_line(MotionCommand): @@ -336,9 +345,21 @@ def do(self) -> None: self.reader.pos = self.reader.bol() +class first_non_whitespace_character(MotionCommand): + def do(self) -> None: + self.reader.pos = self.reader.first_non_whitespace() + + class end_of_line(MotionCommand): def do(self) -> None: - self.reader.pos = self.reader.eol() + r = self.reader + eol_pos = r.eol() + if r.editor_mode.is_normal(): + bol_pos = r.bol() + # Don't go past the last character (but stay at bol if line is empty) + r.pos = max(bol_pos, eol_pos - 1) if eol_pos > bol_pos else bol_pos + else: + r.pos = eol_pos class home(MotionCommand): @@ -365,6 +386,20 @@ def do(self) -> None: r.pos = r.bow() +class end_of_word(MotionCommand): + def do(self) -> None: + r = self.reader + for _ in range(r.get_arg()): + r.pos = r.vi_eow() + + +class vi_forward_word(MotionCommand): + def do(self) -> None: + r = self.reader + for _ in range(r.get_arg()): + r.pos = r.vi_forward_word() + + class self_insert(EditCommand): def do(self) -> None: r = self.reader @@ -503,3 +538,55 @@ def do(self) -> None: ) self.reader.insert(data.replace(done, "")) self.reader.last_refresh_cache.invalidated = True + + +class vi_normal_mode(Command): + def do(self) -> None: + self.reader.enter_normal_mode() + + +class vi_insert_mode(Command): + def do(self) -> None: + self.reader.enter_insert_mode() + + +class vi_append_mode(Command): + def do(self) -> None: + if self.reader.pos < len(self.reader.buffer): + self.reader.pos += 1 + self.reader.enter_insert_mode() + + +class vi_append_eol(Command): + def do(self) -> None: + while self.reader.pos < len(self.reader.buffer): + if self.reader.buffer[self.reader.pos] == '\n': + break + self.reader.pos += 1 + self.reader.enter_insert_mode() + + +class vi_insert_bol(Command): + def do(self) -> None: + self.reader.pos = self.reader.first_non_whitespace() + self.reader.enter_insert_mode() + + +class vi_open_below(Command): + def do(self) -> None: + while self.reader.pos < len(self.reader.buffer): + if self.reader.buffer[self.reader.pos] == '\n': + break + self.reader.pos += 1 + + self.reader.insert('\n') + self.reader.enter_insert_mode() + +class vi_open_above(Command): + def do(self) -> None: + while self.reader.pos > 0 and self.reader.buffer[self.reader.pos - 1] != '\n': + self.reader.pos -= 1 + + self.reader.insert('\n') + self.reader.pos -= 1 + self.reader.enter_insert_mode() diff --git a/Lib/_pyrepl/historical_reader.py b/Lib/_pyrepl/historical_reader.py index c4b95fa2e81ee6..014ff3603086c1 100644 --- a/Lib/_pyrepl/historical_reader.py +++ b/Lib/_pyrepl/historical_reader.py @@ -257,19 +257,27 @@ def __post_init__(self) -> None: ) def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]: - return super().collect_keymap() + ( + bindings: list[tuple[KeySpec, CommandName]] = [ (r"\C-n", "next-history"), (r"\C-p", "previous-history"), (r"\C-o", "operate-and-get-next"), (r"\C-r", "reverse-history-isearch"), (r"\C-s", "forward-history-isearch"), - (r"\M-r", "restore-history"), - (r"\M-.", "yank-arg"), (r"\", "history-search-forward"), - (r"\x1b[6~", "history-search-forward"), (r"\", "history-search-backward"), - (r"\x1b[5~", "history-search-backward"), - ) + ] + + if not self.use_vi_mode: + bindings.extend( + [ + (r"\M-r", "restore-history"), + (r"\M-.", "yank-arg"), + (r"\x1b[6~", "history-search-forward"), + (r"\x1b[5~", "history-search-backward"), + ] + ) + + return super().collect_keymap() + tuple(bindings) def select_item(self, i: int) -> None: self.transient_history[self.historyi] = self.get_unicode() diff --git a/Lib/_pyrepl/reader.py b/Lib/_pyrepl/reader.py index 0ebd9162eca4bb..987e7144cc94d6 100644 --- a/Lib/_pyrepl/reader.py +++ b/Lib/_pyrepl/reader.py @@ -27,6 +27,8 @@ from contextlib import contextmanager from dataclasses import dataclass, field, fields +import enum + from . import commands, console, input from .utils import wlen, unbracket, disp_str, gen_colors, THEME from .trace import trace @@ -41,6 +43,22 @@ SYNTAX_WHITESPACE, SYNTAX_WORD, SYNTAX_SYMBOL = range(3) +class Mode(str, enum.Enum): + INSERT = "insert" + NORMAL = "normal" + + +class EditorMode: + def __init__(self, mode: Mode = Mode.INSERT) -> None: + self.mode = mode + + def is_insert(self) -> bool: + return self.mode == Mode.INSERT + + def is_normal(self) -> bool: + return self.mode == Mode.NORMAL + + def make_default_syntax_table() -> dict[str, int]: # XXX perhaps should use some unicodedata here? st: dict[str, int] = {} @@ -131,6 +149,67 @@ def make_default_commands() -> dict[CommandName, type[Command]]: ) +vi_insert_keymap: tuple[tuple[KeySpec, CommandName], ...] = tuple( + [binding for binding in default_keymap if not binding[0].startswith((r"\M-", r"\x1b", r"\EOF", r"\EOH"))] + + [(r"\", "vi-normal-mode")] +) + + +vi_normal_keymap: tuple[tuple[KeySpec, CommandName], ...] = tuple( + [ + # Basic motions + (r"h", "left"), + (r"j", "down"), + (r"k", "up"), + (r"l", "right"), + (r"0", "beginning-of-line"), + (r"$", "end-of-line"), + (r"w", "vi-forward-word"), + (r"b", "backward-word"), + (r"e", "end-of-word"), + (r"^", "first-non-whitespace-character"), + + # Edit commands + (r"x", "delete"), + (r"i", "vi-insert-mode"), + (r"a", "vi-append-mode"), + (r"A", "vi-append-eol"), + (r"I", "vi-insert-bol"), + (r"o", "vi-open-below"), + (r"O", "vi-open-above"), + + # Special keys still work in normal mode + (r"\", "left"), + (r"\", "right"), + (r"\", "up"), + (r"\", "down"), + (r"\", "beginning-of-line"), + (r"\", "end-of-line"), + (r"\", "delete"), + (r"\", "left"), + + # Control keys (important ones that work in both modes) + (r"\C-c", "interrupt"), + (r"\C-d", "delete"), + (r"\C-l", "clear-screen"), + (r"\C-r", "reverse-history-isearch"), + + # Digit args for counts (1-9, not 0 which is BOL) + (r"1", "digit-arg"), + (r"2", "digit-arg"), + (r"3", "digit-arg"), + (r"4", "digit-arg"), + (r"5", "digit-arg"), + (r"6", "digit-arg"), + (r"7", "digit-arg"), + (r"8", "digit-arg"), + (r"9", "digit-arg"), + + (r"\", "invalid-key"), + ] +) + + @dataclass(slots=True) class Reader: """The Reader class implements the bare bones of a command reader, @@ -214,6 +293,8 @@ class Reader: scheduled_commands: list[str] = field(default_factory=list) can_colorize: bool = False threading_hook: Callback | None = None + use_vi_mode: bool = False + editor_mode: EditorMode = field(default_factory=EditorMode) ## cached metadata to speed up screen refreshes @dataclass @@ -281,6 +362,11 @@ def __post_init__(self) -> None: self.last_refresh_cache.dimensions = (0, 0) def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]: + if self.use_vi_mode: + if self.editor_mode.is_insert(): + return vi_insert_keymap + elif self.editor_mode.is_normal(): + return vi_normal_keymap return default_keymap def calc_screen(self) -> list[str]: @@ -433,6 +519,57 @@ def eow(self, p: int | None = None) -> int: p += 1 return p + def vi_eow(self, p: int | None = None) -> int: + """Return the 0-based index of the last character of the word + following p most immediately (vi 'e' semantics). + + Unlike eow(), this returns the position ON the last word character, + not past it. p defaults to self.pos; word boundaries are determined + using self.syntax_table.""" + if p is None: + p = self.pos + st = self.syntax_table + b = self.buffer + + # If we're already at the end of a word, move past it + if (p < len(b) and st.get(b[p], SYNTAX_WORD) == SYNTAX_WORD and + (p + 1 >= len(b) or st.get(b[p + 1], SYNTAX_WORD) != SYNTAX_WORD)): + p += 1 + + # Skip non-word characters to find the start of next word + while p < len(b) and st.get(b[p], SYNTAX_WORD) != SYNTAX_WORD: + p += 1 + + # Move to the last character of this word (not past it) + while p + 1 < len(b) and st.get(b[p + 1], SYNTAX_WORD) == SYNTAX_WORD: + p += 1 + + # Clamp to valid buffer range + return min(p, len(b) - 1) if b else 0 + + def vi_forward_word(self, p: int | None = None) -> int: + """Return the 0-based index of the first character of the next word + (vi 'w' semantics). + + Unlike eow(), this lands ON the first character of the next word, + not past it. p defaults to self.pos; word boundaries are determined + using self.syntax_table.""" + if p is None: + p = self.pos + st = self.syntax_table + b = self.buffer + + # Skip the rest of the current word if we're on one + while p < len(b) and st.get(b[p], SYNTAX_WORD) == SYNTAX_WORD: + p += 1 + + # Skip non-word characters to find the start of next word + while p < len(b) and st.get(b[p], SYNTAX_WORD) != SYNTAX_WORD: + p += 1 + + # Clamp to valid buffer range + return min(p, len(b) - 1) if b else 0 + def bol(self, p: int | None = None) -> int: """Return the 0-based index of the line break preceding p most immediately. @@ -458,6 +595,18 @@ def eol(self, p: int | None = None) -> int: p += 1 return p + def first_non_whitespace(self, p: int | None = None) -> int: + """Return the 0-based index of the first non-whitespace character + on the current line. + + p defaults to self.pos.""" + bol_pos = self.bol(p) + eol_pos = self.eol(p) + pos = bol_pos + while pos < eol_pos and self.buffer[pos].isspace() and self.buffer[pos] != '\n': + pos += 1 + return pos + def max_column(self, y: int) -> int: """Return the last x-offset for line y""" return self.screeninfo[y][0] + sum(self.screeninfo[y][1]) @@ -481,9 +630,15 @@ def get_prompt(self, lineno: int, cursor_on_line: bool) -> str: elif self.paste_mode: prompt = "(paste) " elif "\n" in self.buffer: + newline_count = self.buffer.count("\n") + ends_with_newline = bool(self.buffer) and self.buffer[-1] == "\n" if lineno == 0: - prompt = self.ps2 - elif self.ps4 and lineno == self.buffer.count("\n"): + prompt = self.ps1 + elif lineno < newline_count: + prompt = self.ps3 + elif ends_with_newline and lineno == newline_count: + prompt = self.ps3 + elif self.ps4 and lineno == newline_count: prompt = self.ps4 else: prompt = self.ps3 @@ -589,6 +744,8 @@ def prepare(self) -> None: self.pos = 0 self.dirty = True self.last_command = None + if self.use_vi_mode: + self.enter_insert_mode() self.calc_screen() except BaseException: self.restore() @@ -760,3 +917,38 @@ def bind(self, spec: KeySpec, command: CommandName) -> None: def get_unicode(self) -> str: """Return the current buffer as a unicode string.""" return "".join(self.buffer) + + def enter_insert_mode(self) -> None: + if self.editor_mode.is_insert(): + return + + self.editor_mode.mode = Mode.INSERT + + # Switch translator to insert mode keymap + self.keymap = self.collect_keymap() + self.input_trans = input.KeymapTranslator( + self.keymap, invalid_cls="invalid-key", character_cls="self-insert" + ) + + self.dirty = True + + def enter_normal_mode(self) -> None: + if self.editor_mode.is_normal(): + return + + self.editor_mode.mode = Mode.NORMAL + + # Switch translator to normal mode keymap + self.keymap = self.collect_keymap() + self.input_trans = input.KeymapTranslator( + self.keymap, invalid_cls="invalid-key", character_cls="invalid-key" + ) + + # In vi normal mode, cursor should be ON a character, not after the last one + # If we're past the end of line, move back to the last character + bol_pos = self.bol() + eol_pos = self.eol() + if self.pos >= eol_pos and eol_pos > bol_pos: + self.pos = eol_pos - 1 + + self.dirty = True diff --git a/Lib/_pyrepl/readline.py b/Lib/_pyrepl/readline.py index 23b8fa6b9c7625..c9467ba742278f 100644 --- a/Lib/_pyrepl/readline.py +++ b/Lib/_pyrepl/readline.py @@ -65,7 +65,6 @@ MoreLinesCallable = Callable[[str], bool] - __all__ = [ "add_history", "clear_history", @@ -344,6 +343,10 @@ def do(self) -> None: # ____________________________________________________________ +def _is_vi_mode_enabled() -> bool: + return os.environ.get("PYREPL_VI_MODE", "").lower() in {"1", "true", "on", "yes"} + + @dataclass(slots=True) class _ReadlineWrapper: f_in: int = -1 @@ -362,7 +365,11 @@ def __post_init__(self) -> None: def get_reader(self) -> ReadlineAlikeReader: if self.reader is None: console = Console(self.f_in, self.f_out, encoding=ENCODING) - self.reader = ReadlineAlikeReader(console=console, config=self.config) + self.reader = ReadlineAlikeReader( + console=console, + config=self.config, + use_vi_mode=_is_vi_mode_enabled() + ) return self.reader def input(self, prompt: object = "") -> str: diff --git a/Lib/_pyrepl/unix_console.py b/Lib/_pyrepl/unix_console.py index 09247de748ee3b..04456ca35cad22 100644 --- a/Lib/_pyrepl/unix_console.py +++ b/Lib/_pyrepl/unix_console.py @@ -419,6 +419,21 @@ def get_event(self, block: bool = True) -> Event | None: return None while self.event_queue.empty(): + # Check if we have a pending escape sequence that needs timeout handling + if self.event_queue.has_pending_escape_sequence(): + current_time_ms = time.monotonic() * 1000 + + if self.event_queue.should_emit_standalone_escape(current_time_ms): + # Timeout expired - emit the ESC as a standalone key + self.event_queue.emit_standalone_escape() + break + + if not self.wait(timeout=10): + current_time_ms = time.monotonic() * 1000 + if self.event_queue.should_emit_standalone_escape(current_time_ms): + self.event_queue.emit_standalone_escape() + continue + while True: try: self.push_char(self.__read(1)) @@ -445,6 +460,7 @@ def wait(self, timeout: float | None = None) -> bool: or bool(self.pollob.poll(timeout)) ) + def set_cursor_vis(self, visible): """ Set the visibility of the cursor. diff --git a/Lib/_pyrepl/windows_console.py b/Lib/_pyrepl/windows_console.py index c56dcd6d7dd434..b16817a31e16bc 100644 --- a/Lib/_pyrepl/windows_console.py +++ b/Lib/_pyrepl/windows_console.py @@ -446,6 +446,24 @@ def get_event(self, block: bool = True) -> Event | None: return None while self.event_queue.empty(): + # Check if we have a pending escape sequence that needs timeout handling + if self.event_queue.has_pending_escape_sequence(): + import time + current_time_ms = time.monotonic() * 1000 + + if self.event_queue.should_emit_standalone_escape(current_time_ms): + # Timeout expired - emit the ESC as a standalone key + self.event_queue.emit_standalone_escape() + break + + # Wait for a short time to check for more input + if not self.wait(timeout=10): + # Check again after timeout + current_time_ms = time.monotonic() * 1000 + if self.event_queue.should_emit_standalone_escape(current_time_ms): + self.event_queue.emit_standalone_escape() + continue + rec = self._read_input() if rec is None: return None @@ -583,6 +601,7 @@ def repaint(self) -> None: raise NotImplementedError("No repaint support") + # Windows interop class CONSOLE_SCREEN_BUFFER_INFO(Structure): _fields_ = [ diff --git a/Lib/test/test_pyrepl/support.py b/Lib/test/test_pyrepl/support.py index 4f7f9d77933336..c5ce4a066ab3c5 100644 --- a/Lib/test/test_pyrepl/support.py +++ b/Lib/test/test_pyrepl/support.py @@ -83,6 +83,14 @@ def get_prompt(lineno, cursor_on_line) -> str: return reader +def prepare_vi_reader(console: Console, **kwargs): + reader = prepare_reader(console, **kwargs) + reader.use_vi_mode = True + reader.enter_normal_mode() + reader.enter_insert_mode() + return reader + + def prepare_console(events: Iterable[Event], **kwargs) -> MagicMock | Console: console = MagicMock() console.get_event.side_effect = events diff --git a/Lib/test/test_pyrepl/test_reader.py b/Lib/test/test_pyrepl/test_reader.py index b1b6ae16a1e592..dc27421b72b4ad 100644 --- a/Lib/test/test_pyrepl/test_reader.py +++ b/Lib/test/test_pyrepl/test_reader.py @@ -8,12 +8,11 @@ from .support import handle_all_events, handle_events_narrow_console from .support import ScreenEqualMixin, code_to_events -from .support import prepare_reader, prepare_console +from .support import prepare_reader, prepare_console, prepare_vi_reader from _pyrepl.console import Event from _pyrepl.reader import Reader from _colorize import default_theme - overrides = {"reset": "z", "soft_keyword": "K"} colors = {overrides.get(k, k[0].lower()): v for k, v in default_theme.syntax.items()} @@ -552,9 +551,444 @@ def test_syntax_highlighting_literal_brace_in_fstring_or_tstring(self): self.maxDiff=None self.assert_screen_equal(reader, expected) + def test_vi_escape_switches_to_normal_mode(self): + events = itertools.chain( + code_to_events("hello"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), + Event(evt="key", data="h", raw=bytearray(b"h")), + ], + ) + reader, _ = handle_all_events( + events, + prepare_reader=prepare_vi_reader, + ) + self.assertEqual(reader.get_unicode(), "hello") + self.assertTrue(reader.editor_mode.is_normal()) + self.assertEqual(reader.pos, len("hello") - 2) # After 'h' left movement + def test_control_characters(self): code = 'flag = "🏳️‍🌈"' events = code_to_events(code) reader, _ = handle_all_events(events) self.assert_screen_equal(reader, 'flag = "🏳️\\u200d🌈"', clean=True) self.assert_screen_equal(reader, 'flag {o}={z} {s}"🏳️\\u200d🌈"{z}'.format(**colors)) + + +@force_not_colorized_test_class +class TestViMode(TestCase): + def _run_vi(self, events, prepare_reader_hook=prepare_vi_reader): + return handle_all_events(events, prepare_reader=prepare_reader_hook) + + def test_insert_typing_and_ctrl_a_e(self): + events = itertools.chain( + code_to_events("hello"), + [ + Event(evt="key", data="\x01", raw=bytearray(b"\x01")), # Ctrl-A + Event(evt="key", data="X", raw=bytearray(b"X")), + Event(evt="key", data="\x05", raw=bytearray(b"\x05")), # Ctrl-E + Event(evt="key", data="!", raw=bytearray(b"!")), + ], + ) + reader, _ = self._run_vi(events) + self.assertEqual(reader.get_unicode(), "Xhello!") + self.assertTrue(reader.editor_mode.is_insert()) + + def test_escape_switches_to_normal_mode_and_is_idempotent(self): + events = itertools.chain( + code_to_events("hello"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), + Event(evt="key", data="h", raw=bytearray(b"h")), + ], + ) + reader, _ = self._run_vi(events) + self.assertEqual(reader.get_unicode(), "hello") + self.assertTrue(reader.editor_mode.is_normal()) + self.assertEqual(reader.pos, len("hello") - 2) # After 'h' left movement + + def test_normal_mode_motion_and_edit_commands(self): + events = itertools.chain( + code_to_events("hello"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), # ESC + Event(evt="key", data="0", raw=bytearray(b"0")), # go to bol + Event(evt="key", data="l", raw=bytearray(b"l")), # right + Event(evt="key", data="x", raw=bytearray(b"x")), # delete + Event(evt="key", data="a", raw=bytearray(b"a")), # append + Event(evt="key", data="!", raw=bytearray(b"!")), + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), + ], + ) + reader, _ = self._run_vi(events) + self.assertEqual(reader.get_unicode(), "hl!lo") + self.assertTrue(reader.editor_mode.is_normal()) + + def test_open_below_and_above(self): + events = itertools.chain( + code_to_events("first"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), + Event(evt="key", data="o", raw=bytearray(b"o")), + ], + code_to_events("second"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), + Event(evt="key", data="O", raw=bytearray(b"O")), + ], + code_to_events("zero"), + [Event(evt="key", data="\x1b", raw=bytearray(b"\x1b"))], + ) + reader, _ = self._run_vi(events) + self.assertEqual(reader.get_unicode(), "first\nzero\nsecond") + + def test_mode_resets_to_insert_on_prepare(self): + events = itertools.chain( + code_to_events("text"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), + ], + ) + reader, console = self._run_vi(events) + self.assertTrue(reader.editor_mode.is_normal()) + reader.prepare() + self.assertTrue(reader.editor_mode.is_insert()) + console.prepare.assert_called() # ensure console prepare called again + + def test_translator_stack_preserves_mode(self): + events_insert_path = itertools.chain( + code_to_events("hello"), + [ + Event(evt="key", data="\x12", raw=bytearray(b"\x12")), # Ctrl-R + Event(evt="key", data="h", raw=bytearray(b"h")), + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), + ], + ) + reader, _ = self._run_vi(events_insert_path) + self.assertTrue(reader.editor_mode.is_insert()) + + events_normal_path = itertools.chain( + code_to_events("hello"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), + Event(evt="key", data="\x12", raw=bytearray(b"\x12")), + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), + ], + ) + reader, _ = self._run_vi(events_normal_path) + self.assertTrue(reader.editor_mode.is_normal()) + + def test_insert_bol_and_append_eol(self): + events = itertools.chain( + code_to_events("hello"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), # ESC to normal + Event(evt="key", data="I", raw=bytearray(b"I")), # Insert at BOL + Event(evt="key", data="[", raw=bytearray(b"[")), + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), # Back to normal + Event(evt="key", data="A", raw=bytearray(b"A")), # Append at EOL + Event(evt="key", data="]", raw=bytearray(b"]")), + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), + ], + ) + reader, _ = self._run_vi(events) + self.assertEqual(reader.get_unicode(), "[hello]") + self.assertTrue(reader.editor_mode.is_normal()) + + def test_insert_mode_from_normal(self): + events = itertools.chain( + code_to_events("hello"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), # ESC to normal + Event(evt="key", data="0", raw=bytearray(b"0")), # Go to beginning + Event(evt="key", data="l", raw=bytearray(b"l")), # Move right + Event(evt="key", data="l", raw=bytearray(b"l")), # Move right again + Event(evt="key", data="i", raw=bytearray(b"i")), # Insert mode + Event(evt="key", data="X", raw=bytearray(b"X")), + ], + ) + reader, _ = self._run_vi(events) + self.assertEqual(reader.get_unicode(), "heXllo") + self.assertTrue(reader.editor_mode.is_insert()) + + def test_hjkl_motions(self): + events = itertools.chain( + code_to_events("hello"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), # ESC to normal + Event(evt="key", data="0", raw=bytearray(b"0")), # Go to start of line + Event(evt="key", data="l", raw=bytearray(b"l")), # Right (h->e) + Event(evt="key", data="l", raw=bytearray(b"l")), # Right (e->l) + Event(evt="key", data="h", raw=bytearray(b"h")), # Left (l->e) + Event(evt="key", data="x", raw=bytearray(b"x")), # Delete 'e' + ], + ) + reader, _ = self._run_vi(events) + self.assertEqual(reader.get_unicode(), "hllo") + self.assertTrue(reader.editor_mode.is_normal()) + + def test_dollar_end_of_line(self): + events = itertools.chain( + code_to_events("hello"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), # ESC + Event(evt="key", data="0", raw=bytearray(b"0")), # Beginning + Event(evt="key", data="$", raw=bytearray(b"$")), # End (on last char) + Event(evt="key", data="x", raw=bytearray(b"x")), # Delete 'o' + ], + ) + reader, _ = self._run_vi(events) + self.assertEqual(reader.get_unicode(), "hell") + + def test_word_motions(self): + events = itertools.chain( + code_to_events("one two"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), # ESC + Event(evt="key", data="0", raw=bytearray(b"0")), # Beginning + Event(evt="key", data="w", raw=bytearray(b"w")), # Forward word + Event(evt="key", data="x", raw=bytearray(b"x")), # Delete first char of 'two' + ], + ) + reader, _ = self._run_vi(events) + self.assertIn("one", reader.get_unicode()) + self.assertNotEqual(reader.get_unicode(), "one two") # Something was deleted + + def test_repeat_counts(self): + events = itertools.chain( + code_to_events("abcdefghij"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), # ESC + Event(evt="key", data="0", raw=bytearray(b"0")), # Beginning + Event(evt="key", data="3", raw=bytearray(b"3")), # Count 3 + Event(evt="key", data="l", raw=bytearray(b"l")), # Move right 3 times + Event(evt="key", data="2", raw=bytearray(b"2")), # Count 2 + Event(evt="key", data="x", raw=bytearray(b"x")), # Delete 2 chars (d, e) + ], + ) + reader, _ = self._run_vi(events) + self.assertEqual(reader.get_unicode(), "abcfghij") + self.assertTrue(reader.editor_mode.is_normal()) + + def test_multiline_navigation(self): + # Test j/k navigation across multiple lines + code = "first\nsecond\nthird" + events = itertools.chain( + code_to_events(code), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), # ESC + Event(evt="key", data="k", raw=bytearray(b"k")), # Up to "second" + Event(evt="key", data="0", raw=bytearray(b"0")), # Beginning of line + Event(evt="key", data="x", raw=bytearray(b"x")), # Delete 's' + Event(evt="key", data="j", raw=bytearray(b"j")), # Down to "third" + Event(evt="key", data="0", raw=bytearray(b"0")), # Beginning + Event(evt="key", data="x", raw=bytearray(b"x")), # Delete 't' + ], + ) + reader, _ = self._run_vi(events) + self.assertEqual(reader.get_unicode(), "first\necond\nhird") + + def test_arrow_keys_in_normal_mode(self): + events = itertools.chain( + code_to_events("test"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), # ESC + Event(evt="key", data="left", raw=bytearray(b"\x1b[D")), # Left arrow + Event(evt="key", data="left", raw=bytearray(b"\x1b[D")), # Left arrow + Event(evt="key", data="x", raw=bytearray(b"x")), # Delete 'e' + ], + ) + reader, _ = self._run_vi(events) + self.assertEqual(reader.get_unicode(), "tst") + + def test_escape_in_normal_mode_is_noop(self): + events = itertools.chain( + code_to_events("hello"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), # ESC to normal + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), # ESC again (no-op) + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), # ESC again (no-op) + ], + ) + reader, _ = self._run_vi(events) + self.assertTrue(reader.editor_mode.is_normal()) + self.assertEqual(reader.get_unicode(), "hello") + + def test_backspace_in_normal_mode(self): + events = itertools.chain( + code_to_events("abcd"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), # ESC + Event(evt="key", data="\x7f", raw=bytearray(b"\x7f")), # Backspace + Event(evt="key", data="\x7f", raw=bytearray(b"\x7f")), # Backspace again + ], + ) + reader, _ = self._run_vi(events) + self.assertTrue(reader.editor_mode.is_normal()) + self.assertIsNotNone(reader.get_unicode()) + + def test_end_of_word_motion(self): + events = itertools.chain( + code_to_events("hello world test"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), # ESC + Event(evt="key", data="0", raw=bytearray(b"0")), # Beginning + Event(evt="key", data="e", raw=bytearray(b"e")), # End of "hello" + ], + ) + reader, _ = self._run_vi(events) + # Should be on 'o' of "hello" (last char of word) + self.assertEqual(reader.pos, 4) + self.assertEqual(reader.buffer[reader.pos], 'o') + + # Test multiple 'e' commands + events2 = itertools.chain( + code_to_events("one two three"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), + Event(evt="key", data="0", raw=bytearray(b"0")), + Event(evt="key", data="e", raw=bytearray(b"e")), # End of "one" + Event(evt="key", data="e", raw=bytearray(b"e")), # End of "two" + ], + ) + reader2, _ = self._run_vi(events2) + # Should be on 'o' of "two" + self.assertEqual(reader2.buffer[reader2.pos], 'o') + + def test_backward_word_motion(self): + # Test from end of buffer + events = itertools.chain( + code_to_events("one two"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), # ESC at end + Event(evt="key", data="b", raw=bytearray(b"b")), # Back to start of "two" + ], + ) + reader, _ = self._run_vi(events) + self.assertEqual(reader.pos, 4) # At 't' of "two" + self.assertEqual(reader.buffer[reader.pos], 't') + + # Test multiple backwards + events2 = itertools.chain( + code_to_events("one two three"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), # ESC + Event(evt="key", data="b", raw=bytearray(b"b")), # Back to "three" + Event(evt="key", data="b", raw=bytearray(b"b")), # Back to "two" + Event(evt="key", data="b", raw=bytearray(b"b")), # Back to "one" + ], + ) + reader2, _ = self._run_vi(events2) + # Should be at beginning of "one" + self.assertEqual(reader2.pos, 0) + self.assertEqual(reader2.buffer[reader2.pos], 'o') + + def test_first_non_whitespace_character(self): + events = itertools.chain( + code_to_events(" hello world"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), # ESC + Event(evt="key", data="^", raw=bytearray(b"^")), # First non-ws + ], + ) + reader, _ = self._run_vi(events) + # Should be at 'h' of "hello", skipping the 3 spaces + self.assertEqual(reader.pos, 3) + self.assertEqual(reader.buffer[reader.pos], 'h') + + # Test with tabs and spaces + events2 = itertools.chain( + code_to_events("\t text"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), + Event(evt="key", data="0", raw=bytearray(b"0")), # Go to BOL first + Event(evt="key", data="^", raw=bytearray(b"^")), # Then to first non-ws + ], + ) + reader2, _ = self._run_vi(events2) + self.assertEqual(reader2.buffer[reader2.pos], 't') + + def test_word_motion_edge_cases(self): + # Test with punctuation - underscore should be a word boundary + events = itertools.chain( + code_to_events("hello_world"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), + Event(evt="key", data="0", raw=bytearray(b"0")), + Event(evt="key", data="w", raw=bytearray(b"w")), # Forward word + ], + ) + reader, _ = self._run_vi(events) + # 'w' moves to next word, underscore is not alphanumeric so treated as boundary + self.assertIn(reader.pos, [5, 6]) # Could be on '_' or 'w' depending on implementation + + # Test 'e' at end of buffer stays in bounds + events2 = itertools.chain( + code_to_events("end"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), + Event(evt="key", data="e", raw=bytearray(b"e")), # Already at end of word + Event(evt="key", data="e", raw=bytearray(b"e")), # Should stay in bounds + ], + ) + reader2, _ = self._run_vi(events2) + # Should not go past end of buffer + self.assertLessEqual(reader2.pos, len(reader2.buffer) - 1) + + # Test 'b' at beginning doesn't crash + events3 = itertools.chain( + code_to_events("start"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), + Event(evt="key", data="0", raw=bytearray(b"0")), + Event(evt="key", data="b", raw=bytearray(b"b")), # Should stay at 0 + ], + ) + reader3, _ = self._run_vi(events3) + self.assertEqual(reader3.pos, 0) + + def test_repeat_count_with_word_motions(self): + events = itertools.chain( + code_to_events("one two three four"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), + Event(evt="key", data="0", raw=bytearray(b"0")), + Event(evt="key", data="2", raw=bytearray(b"2")), # Count 2 + Event(evt="key", data="w", raw=bytearray(b"w")), # Forward 2 words + ], + ) + reader, _ = self._run_vi(events) + # Should be at start of "three" (2 words forward from "one") + self.assertEqual(reader.buffer[reader.pos], 't') # 't' of "three" + + # Test with 'e' + events2 = itertools.chain( + code_to_events("alpha beta gamma"), + [ + Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")), + Event(evt="key", data="0", raw=bytearray(b"0")), + Event(evt="key", data="2", raw=bytearray(b"2")), + Event(evt="key", data="e", raw=bytearray(b"e")), # End of 2nd word + ], + ) + reader2, _ = self._run_vi(events2) + # Should be at end of "beta" + self.assertEqual(reader2.buffer[reader2.pos], 'a') # Last 'a' of "beta" + + +@force_not_colorized_test_class +class TestHistoricalReaderBindings(TestCase): + def test_meta_bindings_present_only_in_emacs_mode(self): + console = prepare_console(iter(())) + reader = prepare_reader(console) + emacs_keymap = dict(reader.collect_keymap()) + self.assertIn(r"\M-r", emacs_keymap) + self.assertIn(r"\x1b[6~", emacs_keymap) + + reader.use_vi_mode = True + reader.enter_insert_mode() + vi_keymap = dict(reader.collect_keymap()) + self.assertNotIn(r"\M-r", vi_keymap) + self.assertNotIn(r"\x1b[6~", vi_keymap) + self.assertIn(r"\C-r", vi_keymap)