diff --git a/README.md b/README.md index 342451e..c5a750c 100644 --- a/README.md +++ b/README.md @@ -208,8 +208,89 @@ cd Mouser python -m venv .venv ``` -
-Windows +### Dependencies + +| Package | Purpose | +|---|---| +| `PySide6` | Qt Quick / QML UI framework | +| `hidapi` | HID++ communication with the mouse (gesture button, DPI) | +| `Pillow` | Image processing for icon generation | +| `pyobjc-framework-Quartz` | macOS CGEventTap / Quartz event support | +| `pyobjc-framework-Cocoa` | macOS app detection and media-key support | +| `evdev` | Linux mouse grab and virtual device forwarding (uinput) | +| `pyobjc-core` | Core PyObjC runtime required by the macOS framework bindings | +| `PyYAML` | YAML parsing for CLI config import | +| `jsonschema` | JSON Schema validation for imported CLI configs | +| `jsonschema-specifications` | Bundled JSON Schema metaschemas used by `jsonschema` | +| `referencing` | Reference resolution support used by `jsonschema` | +| `rpds-py` | Performance-oriented data structures used by `jsonschema` | +| `attrs` | Shared utility dependency used by the schema-validation stack | +| `typing-extensions` | Backported typing helpers required by some dependencies | +| `pyinstaller` | Build-time dependency for packaging standalone app bundles | + +### Linux Device Permissions + +Mouser's Linux portable build runs as a normal user. HID++ features need +Logitech `hidraw` access, while button remapping needs readable +`/dev/input/event*` nodes and writable `/dev/uinput`. If Mouser sees the mouse +only when launched with `sudo`, install the bundled udev rule instead of +running the app as root: + +```bash +cd /path/to/extracted/Mouser +./install-linux-permissions.sh +``` + +When running from source, use the same helper from the checkout: + +```bash +./packaging/linux/install-linux-permissions.sh +``` + +The helper installs `69-mouser-logitech.rules`, reloads udev, and tries to load +`uinput`. Reconnect the mouse, fully quit Mouser, and launch it normally. If a +desktop launcher or autostart entry still cannot access the devices, log out and +back in once so the session receives fresh device ACLs. On systems without +logind/uaccess support, adding the user to the `input` group may still be +required as a distro-specific fallback. + +### Running + +```bash +# Option A: Run directly +python main_qml.py + +# Option B: Start directly in the tray / menu bar +python main_qml.py --start-hidden + +# Option C: Use the batch file (shows a console window) +Mouser.bat + +# Option D: Use the desktop shortcut (no console window) +# Double-click Mouser.lnk + +# Option E: Use the CLI +python main_cli.py [...args] +``` + +> **Tip:** To run without a console window, use `pythonw.exe main_qml.py` or the `.lnk` shortcut. +> On macOS, `--start-hidden` is the same tray-first startup path used when you launch Mouser directly in the background. The login item uses your saved startup settings. + +Temporary macOS transport override for debugging: + +```bash +python main_qml.py --hid-backend=iokit +python main_qml.py --hid-backend=hidapi +python main_qml.py --hid-backend=auto +``` + +Use this only for troubleshooting. On macOS, Mouser now defaults to `iokit`; +`hidapi` and `auto` remain available as manual overrides for debugging. Other +platforms continue to default to `auto`. + +### Creating a Desktop Shortcut + +A `Mouser.lnk` shortcut is included. To create one manually: ```powershell .\.venv\Scripts\activate @@ -306,9 +387,10 @@ For project layout, the architecture diagram, the HID++ gesture detector, the En - [ ] **Improved scroll inversion** — explore driver-level or interception-driver approaches - [ ] **Gesture swipe tuning** — improve swipe reliability and defaults across more devices - [ ] **Per-app profile auto-creation** — detect new apps and prompt to create a profile -- [ ] **Export / import config** — share configurations between machines -- [ ] **Tray icon badge** — show the active profile name in the tray tooltip -- [ ] **Broader Wayland support** — extend app detection beyond X11 / KDE and validate across more distros +- [x] **Export/import config** — share configurations between machines (CLI only for now) +- [ ] **Tray icon badge** — show active profile name in tray tooltip +- [x] **macOS support** — added via CGEventTap, Quartz CGEvent, and NSWorkspace +- [ ] **Broader Wayland support and Linux validation** — extend app detection beyond KDE Wayland / X11 and validate across more distros and desktop environments - [ ] **Plugin system** — allow third-party action providers --- diff --git a/core/config.py b/core/config.py index 6b3ca47..e5b73cf 100644 --- a/core/config.py +++ b/core/config.py @@ -92,6 +92,7 @@ "start_minimized": True, "start_at_login": False, "hscroll_threshold": 1, + "hscroll_cooldown_ms": 350, "invert_hscroll": False, # swap horizontal scroll directions "invert_vscroll": False, # swap vertical scroll directions "dpi": 1000, # pointer speed / DPI setting diff --git a/core/config_validation.py b/core/config_validation.py new file mode 100644 index 0000000..c95c40e --- /dev/null +++ b/core/config_validation.py @@ -0,0 +1,289 @@ +""" +Schema-first config validation helpers shared by config import paths. +""" + +from __future__ import annotations + +import json +from functools import lru_cache +from typing import Any + +from jsonschema import Draft202012Validator +from core.config import ( + DEFAULT_CONFIG, + PROFILE_BUTTON_NAMES, + _merge_defaults, + _migrate, + _validate_types, +) +from core.device_layouts import get_manual_layout_choices +from core.logi_devices import DEFAULT_DPI_MAX, DEFAULT_DPI_MIN + +_VALID_BUTTON_KEYS = set(PROFILE_BUTTON_NAMES) +_VALID_LAYOUT_OVERRIDE_KEYS = { + choice["key"] for choice in get_manual_layout_choices() if choice["key"] +} + + +class ConfigValidationError(ValueError): + """Human-readable config validation error.""" + + +def _schema_path(path: str, key: str) -> str: + return f"{path}.{key}" if path else key + + +CONFIG_SCHEMA: dict[str, Any] = { + "type": "object", + "additionalProperties": False, + "required": ["version", "active_profile", "profiles", "settings"], + "properties": { + "version": {"type": "integer", "minimum": 1}, + "active_profile": {"type": "string", "minLength": 1}, + "profiles": { + "type": "object", + "minProperties": 1, + "additionalProperties": { + "type": "object", + "additionalProperties": False, + "required": ["label", "apps", "mappings"], + "properties": { + "label": {"type": "string", "minLength": 1}, + "apps": { + "type": "array", + "items": {"type": "string", "minLength": 1}, + }, + "mappings": { + "type": "object", + "additionalProperties": {"type": "string", "minLength": 1}, + }, + }, + }, + }, + "settings": { + "type": "object", + "additionalProperties": False, + "required": list(DEFAULT_CONFIG["settings"].keys()), + "properties": { + "start_minimized": {"type": "boolean"}, + "start_at_login": {"type": "boolean"}, + "hscroll_threshold": {"type": "number", "minimum": 0}, + "hscroll_cooldown_ms": {"type": "number", "minimum": 0}, + "invert_hscroll": {"type": "boolean"}, + "invert_vscroll": {"type": "boolean"}, + "dpi": { + "type": "integer", + "minimum": DEFAULT_DPI_MIN, + "maximum": DEFAULT_DPI_MAX, + }, + "smart_shift_mode": { + "type": "string", + "enum": ["ratchet", "freespin"], + }, + "smart_shift_enabled": {"type": "boolean"}, + "smart_shift_threshold": { + "type": "integer", + "minimum": 1, + "maximum": 50, + }, + "gesture_threshold": {"type": "integer", "minimum": 5}, + "gesture_deadzone": {"type": "integer", "minimum": 0}, + "gesture_timeout_ms": {"type": "integer", "minimum": 250}, + "gesture_cooldown_ms": {"type": "integer", "minimum": 0}, + "appearance_mode": { + "type": "string", + "enum": ["system", "light", "dark"], + }, + "debug_mode": {"type": "boolean"}, + "device_layout_overrides": { + "type": "object", + "additionalProperties": {"type": "string", "minLength": 1}, + }, + "language": {"type": "string", "minLength": 1}, + "ignore_trackpad": {"type": "boolean"}, + "dpi_presets": { + "type": "array", + "minItems": 1, + "items": { + "type": "integer", + "minimum": DEFAULT_DPI_MIN, + "maximum": DEFAULT_DPI_MAX, + }, + }, + }, + }, + }, +} + + +def _display_path(path: str) -> str: + return path or "config" + + +@lru_cache(maxsize=1) +def _schema_validator() -> Draft202012Validator: + return Draft202012Validator(CONFIG_SCHEMA) + + +def _error_path(error) -> str: + parts: list[str] = [] + for part in error.absolute_path: + if isinstance(part, int): + if parts: + parts[-1] = f"{parts[-1]}[{part}]" + else: + parts.append(f"[{part}]") + else: + parts.append(str(part)) + return ".".join(parts) + + +def _format_schema_error(error) -> str: + path = _display_path(_error_path(error)) + validator = error.validator + + if validator == "additionalProperties": + extras = sorted(set(error.instance) - set(error.schema.get("properties", {}))) + if extras: + return f"Unknown key at {_schema_path(_error_path(error), extras[0])}" + + if validator == "required": + missing = error.message.split("'")[1] + return f"{path} is missing required key '{missing}'" + + if validator == "type": + return f"{path} must be a {error.validator_value}" + + if validator == "enum": + allowed = ", ".join(repr(item) for item in error.validator_value) + return f"{path} must be one of: {allowed}" + + if validator == "minLength": + if error.validator_value == 1: + return f"{path} must not be empty" + return f"{path} must be at least {error.validator_value} characters" + + if validator == "minimum": + return f"{path} must be >= {error.validator_value}" + + if validator == "maximum": + return f"{path} must be <= {error.validator_value}" + + if validator == "minItems": + return ( + f"{path} must contain at least {error.validator_value} item" + f"{'' if error.validator_value == 1 else 's'}" + ) + + if validator == "minProperties": + return ( + f"{path} must contain at least {error.validator_value} entr" + f"{'y' if error.validator_value == 1 else 'ies'}" + ) + + return error.message + + +@lru_cache(maxsize=1) +def _action_metadata() -> tuple[set[str], set[str]]: + from core.key_simulator import ACTIONS, valid_custom_key_names + + return set(ACTIONS), set(valid_custom_key_names()) + + +def _validate_custom_action(action_id: str, path: str) -> None: + _, valid_custom_keys = _action_metadata() + parts = [part.strip().lower() for part in action_id[7:].split("+")] + if not parts or any(not part for part in parts): + raise ConfigValidationError( + f"{path} must contain at least one valid key in custom shortcut" + ) + invalid = [part for part in parts if part not in valid_custom_keys] + if invalid: + raise ConfigValidationError( + f"{path} contains unknown custom key(s): {', '.join(sorted(set(invalid)))}" + ) + + +def _validate_action_id(action_id: str, path: str) -> None: + valid_action_ids, _ = _action_metadata() + if action_id.startswith("custom:"): + _validate_custom_action(action_id, path) + return + if action_id not in valid_action_ids: + raise ConfigValidationError(f"{path} has unknown action '{action_id}'. Did you mean 'custom:{action_id}'?") + + +def validate_config(cfg: dict[str, Any]) -> None: + errors = sorted(_schema_validator().iter_errors(cfg), key=lambda e: (list(e.absolute_path), e.validator)) + if errors: + raise ConfigValidationError(_format_schema_error(errors[0])) + + active_profile = cfg["active_profile"] + profiles = cfg["profiles"] + if "default" not in profiles: + raise ConfigValidationError("Config must define a `default` profile") + if active_profile not in profiles: + raise ConfigValidationError( + f"Active profile '{active_profile}' not found in profiles" + ) + + for profile_name, profile in profiles.items(): + mappings = profile["mappings"] + for button_key, action_id in mappings.items(): + if button_key not in _VALID_BUTTON_KEYS: + raise ConfigValidationError( + f"profiles.{profile_name}.mappings.{button_key} is not a valid button mapping" + ) + _validate_action_id( + action_id, + f"profiles.{profile_name}.mappings.{button_key}", + ) + + overrides = cfg["settings"]["device_layout_overrides"] + for device_key, layout_key in overrides.items(): + if layout_key not in _VALID_LAYOUT_OVERRIDE_KEYS: + raise ConfigValidationError( + f"settings.device_layout_overrides.{device_key} has unknown layout '{layout_key}'" + ) + + +def normalize_config(raw_cfg: Any) -> dict[str, Any]: + """Return a migrated, default-filled, strictly validated config dict.""" + if not isinstance(raw_cfg, dict): + raise ConfigValidationError("Config document must be an object") + cfg = json.loads(json.dumps(raw_cfg)) + profiles = cfg.get("profiles") + if isinstance(profiles, dict) and "default" not in profiles: + raise ConfigValidationError("Config must define a `default` profile") + if "version" not in cfg: + cfg["version"] = DEFAULT_CONFIG["version"] + cfg = _migrate(cfg) + cfg = _merge_defaults(cfg, DEFAULT_CONFIG) + validate_config(cfg) + cfg = _validate_types(cfg, DEFAULT_CONFIG) + return cfg + + +def assemble_full_config(config: dict[str, Any]) -> dict[str, Any]: + active_profile = config.get("active_profile") + profiles = config.get("profiles") + if active_profile is None or not isinstance(profiles, dict): + raise ConfigValidationError("Config must specify an `active_profile`") + if active_profile not in profiles: + raise ConfigValidationError( + f"Active profile '{active_profile}' not found in profiles" + ) + if "default" not in profiles: + raise ConfigValidationError("Config must define a `default` profile") + if profiles["default"].get("apps") != []: + raise ConfigValidationError("Default profile must have an empty `apps` list") + + default_mappings = profiles["default"]["mappings"] + for profile_name, profile in profiles.items(): + if profile_name == "default": + continue + for mapping in default_mappings: + if mapping not in profile["mappings"]: + profile["mappings"][mapping] = default_mappings[mapping] + return config diff --git a/core/engine.py b/core/engine.py index 906165b..879b8b5 100644 --- a/core/engine.py +++ b/core/engine.py @@ -331,7 +331,11 @@ def handler(event): threshold = self._hscroll_threshold() now = getattr(event, "timestamp", None) or time.time() - cooldown = HSCROLL_VOLUME_COOLDOWN_S if action_id in _VOLUME_ACTIONS else HSCROLL_ACTION_COOLDOWN_S + cooldown = ( + HSCROLL_VOLUME_COOLDOWN_S + if action_id in _VOLUME_ACTIONS + else self._hscroll_action_cooldown() + ) if now - state["last_fire_at"] < cooldown: state["accum"] = 0.0 return @@ -363,6 +367,12 @@ def _hscroll_threshold(self): float(self.cfg.get("settings", {}).get("hscroll_threshold", 1)), ) + def _hscroll_action_cooldown(self): + return max( + 0.0, + float(self.cfg.get("settings", {}).get("hscroll_cooldown_ms", 350)) / 1000.0, + ) + # ------------------------------------------------------------------ # Per-app auto-switching # ------------------------------------------------------------------ diff --git a/core/hid_gesture.py b/core/hid_gesture.py index bbd66d9..4cf82f0 100644 --- a/core/hid_gesture.py +++ b/core/hid_gesture.py @@ -684,6 +684,11 @@ def _parse(raw): return dev, feat, func, sw, params +def _decode_s8(value): + value = int(value) & 0xFF + return value - 0x100 if value & 0x80 else value + + def _hex_bytes(data): if not data: return "-" @@ -745,6 +750,13 @@ def __init__(self, on_down=None, on_up=None, on_move=None, self._connected_device_info = None self._last_controls = [] # REPROG_V4 controls from last connection self._consecutive_request_timeouts = 0 + self._last_reported_cids = None + self._last_rawxy_diag = None + self._last_feat_report_diag = None + self._rx_sequence = 0 + self._last_short_report_diag = None + self._short_report_prev_xy = None + self._short_report_origin_xy = None # ── public API ──────────────────────────────────────────────── @@ -943,6 +955,11 @@ def _tx(self, report_id, feat, func, params): for i, b in enumerate(params): if 4 + i < LONG_LEN: buf[4 + i] = b & 0xFF + print( + "[HidGesture] TX " + f"rid=0x{LONG_ID:02X} dev=0x{self._dev_idx:02X} feat=0x{feat:02X} " + f"func=0x{func:X} params=[{_hex_bytes(params)}] raw=[{_hex_bytes(buf)}]" + ) self._dev.write(buf) def _rx(self, timeout_ms=2000): @@ -953,7 +970,16 @@ def _rx(self, timeout_ms=2000): if dev is None: return None d = dev.read(64, timeout_ms) - return list(d) if d else None + if not d: + return None + raw = list(d) + self._rx_sequence += 1 + print( + "[HidGesture] RX " + f"#{self._rx_sequence} timeout_ms={timeout_ms} " + f"bytes=[{_hex_bytes(raw)}]" + ) + return raw def _request(self, feat, func, params, timeout_ms=2000): """Send a long HID++ request, wait for matching response.""" @@ -1169,7 +1195,16 @@ def add_candidate(cid): if raw_xy_capable and virtual_or_named and flags & 0x0020: add_candidate(cid) - return ordered or list(preferred) + ordered = ordered or list(preferred) + + # On macOS, some devices expose both the physical gesture button + # (0x00C3) and a virtual gesture button (0x00D7). Prefer the + # virtual CID first when both are present; it tends to provide a + # cleaner logical press/release stream for gesture handling. + if sys.platform == "darwin" and 0x00D7 in ordered and 0x00C3 in ordered: + ordered = [0x00D7] + [cid for cid in ordered if cid != 0x00D7] + + return ordered def _divert(self): """Divert the selected gesture control and enable raw XY when supported.""" @@ -1180,13 +1215,19 @@ def _divert(self): resp = self._set_cid_reporting(cid, 0x33) if resp is not None: self._rawxy_enabled = True - print(f"[HidGesture] Divert {_format_cid(cid)} with RawXY: OK") + print( + f"[HidGesture] Divert {_format_cid(cid)} with RawXY: OK " + f"(selected gesture CID={_format_cid(self._gesture_cid)})" + ) return True self._rawxy_enabled = False resp = self._set_cid_reporting(cid, 0x03) ok = resp is not None - print(f"[HidGesture] Divert {_format_cid(cid)}: " - f"{'OK' if ok else 'FAILED'}") + print( + f"[HidGesture] Divert {_format_cid(cid)}: " + f"{'OK' if ok else 'FAILED'} " + f"(selected gesture CID={_format_cid(self._gesture_cid)})" + ) if ok: return True self._gesture_cid = DEFAULT_GESTURE_CID @@ -1521,6 +1562,8 @@ def _force_release_stale_holds(self): if self._held: self._held = False print("[HidGesture] Gesture force-released (stale hold)") + self._short_report_prev_xy = None + self._short_report_origin_xy = None if self._on_up: try: self._on_up() @@ -1539,21 +1582,50 @@ def _force_release_stale_holds(self): def _on_report(self, raw): """Inspect an incoming HID++ report for diverted button / raw XY events.""" + if self._on_short_report(raw): + return msg = _parse(raw) if msg is None: + print(f"[HidGesture] Unparsed report raw=[{_hex_bytes(raw)}]") return - _, feat, func, _sw, params = msg + report_id, feat, func, sw, params = msg if feat != self._feat_idx: return + feat_diag = (report_id, feat, func, sw, tuple(params)) + self._last_feat_report_diag = feat_diag + print( + "[HidGesture] Feature report " + f"rid=0x{report_id:02X} feat=0x{feat:02X} " + f"func=0x{func:X} sw=0x{sw:01X} " + f"params=[{_hex_bytes(params)}] raw=[{_hex_bytes(raw)}]" + ) + if func == 1: if not self._rawxy_enabled: + print( + "[HidGesture] Ignoring func=0x1 report because RawXY " + "is not enabled" + ) return - if len(params) < 4 or not self._held: + if len(params) < 4: + print( + "[HidGesture] Ignoring short RawXY report " + f"params=[{_hex_bytes(params)}]" + ) return dx = self._decode_s16(params[0], params[1]) dy = self._decode_s16(params[2], params[3]) + rawxy_diag = (dx, dy, self._held) + self._last_rawxy_diag = rawxy_diag + print( + "[HidGesture] RawXY report " + f"dx={dx} dy={dy} held={self._held} " + f"selected gesture CID={_format_cid(self._gesture_cid)}" + ) + if not self._held: + return if (dx or dy) and self._on_move: try: self._on_move(dx, dy) @@ -1562,6 +1634,10 @@ def _on_report(self, raw): return if func != 0: + print( + "[HidGesture] Unhandled feature report " + f"func=0x{func:X} params=[{_hex_bytes(params)}]" + ) return # Params: sequential CID pairs terminated by 0x0000 @@ -1574,6 +1650,15 @@ def _on_report(self, raw): cids.add(c) i += 2 + reported_cids = tuple(sorted(cids)) + self._last_reported_cids = reported_cids + formatted = ", ".join(_format_cid(cid) for cid in reported_cids) or "-" + print( + "[HidGesture] Button CID report " + f"selected={_format_cid(self._gesture_cid)} " + f"reported=[{formatted}] held={self._held}" + ) + gesture_now = self._gesture_cid in cids if gesture_now and not self._held: @@ -1616,6 +1701,80 @@ def _on_report(self, raw): except Exception as e: print(f"[HidGesture] extra up callback error: {e}") + def _on_short_report(self, raw): + """Handle the short 8-byte report shape seen on some macOS devices. + + Observed pattern: + - byte0 == 0x02 + - byte1 bit 0x20 reflects gesture-button held state + - bytes3/4 appear to carry small signed movement deltas while held + """ + if len(raw) != 8 or raw[0] != 0x02: + return False + + held_now = bool(raw[1] & 0x20) + x = _decode_s8(raw[3]) + # The short 0x02 macOS report stream appears to encode vertical + # movement with the opposite sign from the long RawXY path. + y = -_decode_s8(raw[4]) + diag = (held_now, x, y, tuple(raw)) + self._last_short_report_diag = diag + print( + "[HidGesture] Short report " + f"held={held_now} x={x} y={y} bytes=[{_hex_bytes(raw)}]" + ) + if held_now and not self._held: + self._held = True + self._short_report_prev_xy = (x, y) + self._short_report_origin_xy = (x, y) + print("[HidGesture] Gesture DOWN (short report)") + if self._on_down: + try: + self._on_down() + except Exception as e: + print(f"[HidGesture] down callback error: {e}") + elif not held_now and self._held: + self._held = False + self._short_report_prev_xy = None + self._short_report_origin_xy = None + print("[HidGesture] Gesture UP (short report)") + if self._on_up: + try: + self._on_up() + except Exception as e: + print(f"[HidGesture] up callback error: {e}") + + rel_x = 0 + rel_y = 0 + if held_now: + prev = self._short_report_prev_xy + origin = self._short_report_origin_xy + if prev is None: + self._short_report_prev_xy = (x, y) + elif abs(x - prev[0]) > 48 or abs(y - prev[1]) > 48: + print( + "[HidGesture] Short report outlier ignored " + f"x={x} y={y} prev_x={prev[0]} prev_y={prev[1]} held={held_now}" + ) + else: + self._short_report_prev_xy = (x, y) + if origin is None: + self._short_report_origin_xy = (x, y) + origin = self._short_report_origin_xy + rel_x = x - origin[0] + rel_y = y - origin[1] + + if held_now and (rel_x or rel_y) and self._on_move: + print( + "[HidGesture] Short report move " + f"rel_x={rel_x} rel_y={rel_y} held={held_now}" + ) + try: + self._on_move(rel_x, rel_y, {"mode": "absolute"}) + except Exception as e: + print(f"[HidGesture] move callback error: {e}") + return True + # ── connect / main loop ─────────────────────────────────────── def _try_connect(self): @@ -1903,6 +2062,7 @@ def _main_loop(self): self._consecutive_request_timeouts = 0 if self._held: self._held = False + self._short_report_prev_xy = None print("[HidGesture] Gesture force-released on disconnect") if self._on_up: try: diff --git a/core/key_simulator.py b/core/key_simulator.py index 7a11fd5..838469b 100644 --- a/core/key_simulator.py +++ b/core/key_simulator.py @@ -786,28 +786,50 @@ def is_mouse_button_action(action_id): kVK_Control: Quartz.kCGEventFlagMaskControl if _QUARTZ_OK else 0, } + def _split_modifier_keys(keys): + modifiers = [] + normals = [] + for key in keys: + if key in _MOD_FLAGS: + modifiers.append(key) + else: + normals.append(key) + return modifiers, normals + def send_key_combo(keys, hold_ms=50): """Press and release a combination of CGKeyCodes.""" if not _QUARTZ_OK: return - # Compute modifier flags - flags = 0 - for k in keys: - flags |= _MOD_FLAGS.get(k, 0) - - # Press all - for k in keys: - ev = Quartz.CGEventCreateKeyboardEvent(None, k, True) - if flags: - Quartz.CGEventSetFlags(ev, flags) + modifiers, normals = _split_modifier_keys(keys) + active_flags = 0 + + for key in modifiers: + active_flags |= _MOD_FLAGS[key] + ev = Quartz.CGEventCreateKeyboardEvent(None, key, True) + if active_flags: + Quartz.CGEventSetFlags(ev, active_flags) + Quartz.CGEventPost(Quartz.kCGHIDEventTap, ev) + + for key in normals: + ev = Quartz.CGEventCreateKeyboardEvent(None, key, True) + if active_flags: + Quartz.CGEventSetFlags(ev, active_flags) Quartz.CGEventPost(Quartz.kCGHIDEventTap, ev) if hold_ms: time.sleep(hold_ms / 1000.0) - # Release in reverse - for k in reversed(keys): - ev = Quartz.CGEventCreateKeyboardEvent(None, k, False) + for key in reversed(normals): + ev = Quartz.CGEventCreateKeyboardEvent(None, key, False) + if active_flags: + Quartz.CGEventSetFlags(ev, active_flags) + Quartz.CGEventPost(Quartz.kCGHIDEventTap, ev) + + for key in reversed(modifiers): + active_flags &= ~_MOD_FLAGS[key] + ev = Quartz.CGEventCreateKeyboardEvent(None, key, False) + if active_flags: + Quartz.CGEventSetFlags(ev, active_flags) Quartz.CGEventPost(Quartz.kCGHIDEventTap, ev) def send_key_press(vk): diff --git a/core/mouse_hook_macos.py b/core/mouse_hook_macos.py index e6d5e7a..10b4ad1 100644 --- a/core/mouse_hook_macos.py +++ b/core/mouse_hook_macos.py @@ -36,6 +36,7 @@ def _autoreleased(fn): def wrapper(*args, **kwargs): with objc.autorelease_pool(): return fn(*args, **kwargs) + return wrapper @@ -46,6 +47,7 @@ def wrapper(*args, **kwargs): _INJECTED_EVENT_MARKER = 0x4D4F5554 _kCGEventTapDisabledByTimeout = 0xFFFFFFFE _kCGEventTapDisabledByUserInput = 0xFFFFFFFF +_BTN_GESTURE_RAW_CANDIDATES = {5, 6} class MouseHook(BaseMouseHook): @@ -66,6 +68,70 @@ def __init__(self): self._dispatch_queue = queue.Queue() self._dispatch_thread = None self._first_event_logged = False + self._gesture_prefer_hid_until = 0.0 + self._pending_event_tap_dx = 0.0 + self._pending_event_tap_dy = 0.0 + self._gesture_consumed = False + + _EVENT_TAP_HID_GRACE_MS = 30 + _MAX_EFFECTIVE_GESTURE_COOLDOWN_MS = 70 + _HID_ABSOLUTE_VERTICAL_WEIGHT = 0.55 + + def _normalized_gesture_components(self, delta_x, delta_y): + if self._gesture_input_source == "hid_rawxy": + return delta_x, delta_y * self._HID_ABSOLUTE_VERTICAL_WEIGHT + return delta_x, delta_y + + def _clear_pending_event_tap_gesture(self): + self._pending_event_tap_dx = 0.0 + self._pending_event_tap_dy = 0.0 + + def _gesture_cooldown_duration_ms(self): + return min( + self._gesture_cooldown_ms, + self._MAX_EFFECTIVE_GESTURE_COOLDOWN_MS, + ) + + def _buffer_event_tap_gesture(self, delta_x, delta_y): + self._pending_event_tap_dx += delta_x + self._pending_event_tap_dy += delta_y + + def _has_pending_event_tap_gesture(self): + return bool(self._pending_event_tap_dx or self._pending_event_tap_dy) + + def _should_buffer_event_tap_gesture(self): + if not self._gesture_active or self._gesture_input_source is not None: + return False + if not self._hid_gesture_available(): + return False + return time.monotonic() < self._gesture_prefer_hid_until + + def _flush_pending_event_tap_gesture(self): + if not self._has_pending_event_tap_gesture(): + return + delta_x = self._pending_event_tap_dx + delta_y = self._pending_event_tap_dy + self._clear_pending_event_tap_gesture() + self._emit_debug( + "Gesture using buffered event_tap motion " + f"dx={delta_x} dy={delta_y}" + ) + self._accumulate_gesture_delta(delta_x, delta_y, "event_tap") + + def _gesture_binding_active(self): + if self._callbacks.get(MouseEvent.GESTURE_CLICK): + return True + if not self._gesture_direction_enabled: + return False + for event_type in ( + MouseEvent.GESTURE_SWIPE_LEFT, + MouseEvent.GESTURE_SWIPE_RIGHT, + MouseEvent.GESTURE_SWIPE_UP, + MouseEvent.GESTURE_SWIPE_DOWN, + ): + if self._callbacks.get(event_type): + return True + return False def _negate_scroll_axis(self, cg_event, axis): for field_name in ( @@ -132,20 +198,71 @@ def _post_inverted_scroll_event(self, cg_event): Quartz.CGEventPost(Quartz.kCGHIDEventTap, inverted) return True + def _gesture_distance(self): + delta_x, delta_y = self._normalized_gesture_components( + self._gesture_delta_x, + self._gesture_delta_y, + ) + return (delta_x ** 2 + delta_y ** 2) ** 0.5 + + def _gesture_click_radius(self): + return max(8.0, min(self._gesture_threshold * 0.5, self._gesture_deadzone)) + + def _classify_gesture_displacement(self): + delta_x, delta_y = self._normalized_gesture_components( + self._gesture_delta_x, + self._gesture_delta_y, + ) + abs_x = abs(delta_x) + abs_y = abs(delta_y) + if max(abs_x, abs_y) <= self._gesture_click_radius(): + return MouseEvent.GESTURE_CLICK + if abs_x >= abs_y: + return ( + MouseEvent.GESTURE_SWIPE_RIGHT + if delta_x > 0 + else MouseEvent.GESTURE_SWIPE_LEFT + ) + return ( + MouseEvent.GESTURE_SWIPE_DOWN + if delta_y > 0 + else MouseEvent.GESTURE_SWIPE_UP + ) + + def _detect_gesture_event(self): + delta_x, delta_y = self._normalized_gesture_components( + self._gesture_delta_x, + self._gesture_delta_y, + ) + + abs_x = abs(delta_x) + abs_y = abs(delta_y) + dominant = max(abs_x, abs_y) + if dominant < self._gesture_threshold: + return None + + cross_limit = max(self._gesture_deadzone, dominant * 0.35) + + if abs_x > abs_y: + if abs_y > cross_limit: + return None + if delta_x > 0: + return MouseEvent.GESTURE_SWIPE_RIGHT + return MouseEvent.GESTURE_SWIPE_LEFT + + if abs_x > cross_limit: + return None + if delta_y > 0: + return MouseEvent.GESTURE_SWIPE_DOWN + return MouseEvent.GESTURE_SWIPE_UP + def _accumulate_gesture_delta(self, delta_x, delta_y, source): if not (self._gesture_direction_enabled and self._gesture_active): return - if self._gesture_cooldown_active(): + if self._gesture_consumed: self._emit_debug( - f"Gesture cooldown active source={source} dx={delta_x} dy={delta_y}" - ) - self._emit_gesture_event( - { - "type": "cooldown_active", - "source": source, - "dx": delta_x, - "dy": delta_y, - } + f"Gesture move ignored after consume source={source} " + f"dx={delta_x} dy={delta_y}" ) return if not self._gesture_tracking: @@ -199,52 +316,133 @@ def _accumulate_gesture_delta(self, delta_x, delta_y, source): } ) - while True: - gesture_event = self._detect_gesture_event() - if not gesture_event: - return + gesture_event = self._detect_gesture_event() + if not gesture_event: + self._emit_debug( + "Gesture threshold not yet reached " + f"source={source} distance={self._gesture_distance():.1f} " + f"threshold={self._gesture_threshold:.1f}" + ) + return + + self._gesture_triggered = True + self._gesture_consumed = True + self._emit_debug( + "Gesture detected " + f"{gesture_event} source={source} " + f"delta_x={self._gesture_delta_x} delta_y={self._gesture_delta_y}" + ) + self._emit_gesture_event( + { + "type": "detected", + "event_name": gesture_event, + "source": source, + "dx": self._gesture_delta_x, + "dy": self._gesture_delta_y, + } + ) + self._dispatch_queue.put( + MouseEvent( + gesture_event, + { + "delta_x": self._gesture_delta_x, + "delta_y": self._gesture_delta_y, + "source": source, + }, + ) + ) + self._finish_gesture_tracking() + return - self._gesture_triggered = True + def _accumulate_gesture_position(self, pos_x, pos_y, source): + if not (self._gesture_direction_enabled and self._gesture_active): + return + if self._gesture_consumed: self._emit_debug( - "Gesture detected " - f"{gesture_event} source={source} " - f"delta_x={self._gesture_delta_x} delta_y={self._gesture_delta_y}" + f"Gesture position ignored after consume source={source} " + f"x={pos_x} y={pos_y}" ) + return + if not self._gesture_tracking: + self._emit_debug(f"Gesture tracking started source={source}") self._emit_gesture_event( { - "type": "detected", - "event_name": gesture_event, + "type": "tracking_started", "source": source, - "dx": self._gesture_delta_x, - "dy": self._gesture_delta_y, } ) - self._dispatch_queue.put( - MouseEvent( - gesture_event, - { - "delta_x": self._gesture_delta_x, - "delta_y": self._gesture_delta_y, - "source": source, - }, - ) + self._start_gesture_tracking() + + now = time.monotonic() + idle_ms = (now - self._gesture_last_move_at) * 1000.0 + if idle_ms > self._gesture_timeout_ms: + self._emit_debug( + f"Gesture segment reset timeout source={source} " + f"accum_x={self._gesture_delta_x} accum_y={self._gesture_delta_y}" ) - self._gesture_cooldown_until = ( - time.monotonic() + self._gesture_cooldown_ms / 1000.0 + self._start_gesture_tracking() + + if self._gesture_input_source not in (None, source): + self._emit_debug( + f"Gesture source locked to {self._gesture_input_source}; " + f"ignoring {source} x={pos_x} y={pos_y}" ) + return + self._gesture_input_source = source + + self._gesture_delta_x = pos_x + self._gesture_delta_y = pos_y + self._gesture_last_move_at = now + self._emit_debug( + f"Gesture position source={source} " + f"x={self._gesture_delta_x} y={self._gesture_delta_y}" + ) + self._emit_gesture_event( + { + "type": "segment", + "source": source, + "dx": self._gesture_delta_x, + "dy": self._gesture_delta_y, + } + ) + + gesture_event = self._detect_gesture_event() + if not gesture_event: self._emit_debug( - f"Gesture cooldown started source={source} " - f"for_ms={self._gesture_cooldown_ms}" + "Gesture threshold not yet reached " + f"source={source} distance={self._gesture_distance():.1f} " + f"threshold={self._gesture_threshold:.1f}" ) - self._emit_gesture_event( + return + + self._gesture_triggered = True + self._gesture_consumed = True + self._emit_debug( + "Gesture detected " + f"{gesture_event} source={source} " + f"delta_x={self._gesture_delta_x} delta_y={self._gesture_delta_y}" + ) + self._emit_gesture_event( + { + "type": "detected", + "event_name": gesture_event, + "source": source, + "dx": self._gesture_delta_x, + "dy": self._gesture_delta_y, + } + ) + self._dispatch_queue.put( + MouseEvent( + gesture_event, { - "type": "cooldown_started", + "delta_x": self._gesture_delta_x, + "delta_y": self._gesture_delta_y, "source": source, - "for_ms": self._gesture_cooldown_ms, - } + }, ) - self._finish_gesture_tracking() - return + ) + self._finish_gesture_tracking() + return def _dispatch_worker(self): while self._running: @@ -285,6 +483,23 @@ def _event_tap_callback(self, proxy, event_type, cg_event, refcon): pass mouse_event = None should_block = False + if event_type in ( + Quartz.kCGEventOtherMouseDown, + Quartz.kCGEventOtherMouseUp, + Quartz.kCGEventOtherMouseDragged, + ): + btn = Quartz.CGEventGetIntegerValueField( + cg_event, Quartz.kCGMouseEventButtonNumber + ) + if ( + btn in _BTN_GESTURE_RAW_CANDIDATES + and self._gesture_binding_active() + ): + self._emit_debug( + "Swallowing raw gesture button event " + f"type={int(event_type)} btn={btn}" + ) + return None if ( event_type @@ -315,6 +530,31 @@ def _event_tap_callback(self, proxy, event_type, cg_event, refcon): ) if self._gesture_input_source == "hid_rawxy": return None + if self._should_buffer_event_tap_gesture(): + self._buffer_event_tap_gesture( + Quartz.CGEventGetIntegerValueField( + cg_event, Quartz.kCGMouseEventDeltaX + ), + Quartz.CGEventGetIntegerValueField( + cg_event, Quartz.kCGMouseEventDeltaY + ), + ) + self._emit_debug( + "Gesture buffering event_tap motion while waiting " + "for hid_rawxy" + ) + return None + if self._gesture_input_source is None and self._has_pending_event_tap_gesture(): + self._buffer_event_tap_gesture( + Quartz.CGEventGetIntegerValueField( + cg_event, Quartz.kCGMouseEventDeltaX + ), + Quartz.CGEventGetIntegerValueField( + cg_event, Quartz.kCGMouseEventDeltaY + ), + ) + self._flush_pending_event_tap_gesture() + return None self._accumulate_gesture_delta( Quartz.CGEventGetIntegerValueField( cg_event, Quartz.kCGMouseEventDeltaX @@ -423,9 +663,17 @@ def _on_hid_gesture_down(self): if not self._gesture_active: self._gesture_active = True self._gesture_triggered = False + self._gesture_consumed = False + self._clear_pending_event_tap_gesture() + if self._hid_gesture_available(): + self._gesture_prefer_hid_until = ( + time.monotonic() + self._EVENT_TAP_HID_GRACE_MS / 1000.0 + ) + else: + self._gesture_prefer_hid_until = 0.0 self._emit_debug("HID gesture button down") self._emit_gesture_event({"type": "button_down"}) - if self._gesture_direction_enabled and not self._gesture_cooldown_active(): + if self._gesture_direction_enabled: self._start_gesture_tracking() else: self._gesture_tracking = False @@ -434,7 +682,51 @@ def _on_hid_gesture_down(self): def _on_hid_gesture_up(self): if self._gesture_active: should_click = not self._gesture_triggered + dispatch_event = None + if ( + self._gesture_direction_enabled + and not self._gesture_consumed + and self._gesture_tracking + ): + gesture_event = self._classify_gesture_displacement() + self._emit_debug( + "Gesture release classification " + f"event={gesture_event} delta_x={self._gesture_delta_x} " + f"delta_y={self._gesture_delta_y} " + f"distance={self._gesture_distance():.1f}" + ) + if gesture_event == MouseEvent.GESTURE_CLICK: + should_click = True + else: + should_click = False + dispatch_event = MouseEvent( + gesture_event, + { + "delta_x": self._gesture_delta_x, + "delta_y": self._gesture_delta_y, + "source": self._gesture_input_source, + }, + ) + self._gesture_triggered = True + self._gesture_consumed = True + self._emit_debug( + "Gesture released -> detected " + f"{gesture_event} delta_x={self._gesture_delta_x} " + f"delta_y={self._gesture_delta_y}" + ) + self._emit_gesture_event( + { + "type": "detected", + "event_name": gesture_event, + "source": self._gesture_input_source, + "dx": self._gesture_delta_x, + "dy": self._gesture_delta_y, + } + ) self._gesture_active = False + self._gesture_consumed = False + self._gesture_prefer_hid_until = 0.0 + self._clear_pending_event_tap_gesture() self._finish_gesture_tracking() self._gesture_triggered = False self._emit_debug( @@ -446,6 +738,8 @@ def _on_hid_gesture_up(self): "click_candidate": should_click, } ) + if dispatch_event is not None: + self._dispatch(dispatch_event) if should_click: self._dispatch(MouseEvent(MouseEvent.GESTURE_CLICK)) @@ -465,8 +759,18 @@ def _on_hid_dpi_switch_up(self): self._emit_debug("HID DPI switch button up") self._dispatch(MouseEvent(MouseEvent.DPI_SWITCH_UP)) - def _on_hid_gesture_move(self, delta_x, delta_y): - self._emit_debug(f"HID rawxy move dx={delta_x} dy={delta_y}") + def _on_hid_gesture_move(self, delta_x, delta_y, meta=None): + if self._has_pending_event_tap_gesture(): + self._emit_debug( + "Gesture discarding buffered event_tap motion in favor of " + "hid_rawxy" + ) + self._clear_pending_event_tap_gesture() + self._gesture_prefer_hid_until = 0.0 + self._emit_debug( + f"HID rawxy move dx={delta_x} dy={delta_y} " + f"mode={(meta or {}).get('mode', 'delta')}" + ) self._emit_gesture_event( { "type": "move", @@ -475,7 +779,10 @@ def _on_hid_gesture_move(self, delta_x, delta_y): "dy": delta_y, } ) - self._accumulate_gesture_delta(delta_x, delta_y, "hid_rawxy") + if (meta or {}).get("mode") == "absolute": + self._accumulate_gesture_position(delta_x, delta_y, "hid_rawxy") + else: + self._accumulate_gesture_delta(delta_x, delta_y, "hid_rawxy") def _register_wake_observer(self): try: @@ -506,11 +813,13 @@ def _on_session_resign(notification): def _on_session_activate(notification): _re_enable_tap_and_reconnect("user-switch") - self._wake_observer = notification_center.addObserverForName_object_queue_usingBlock_( - "NSWorkspaceDidWakeNotification", - None, - None, - _on_wake, + self._wake_observer = ( + notification_center.addObserverForName_object_queue_usingBlock_( + "NSWorkspaceDidWakeNotification", + None, + None, + _on_wake, + ) ) self._session_resign_observer = ( notification_center.addObserverForName_object_queue_usingBlock_( diff --git a/main_cli.py b/main_cli.py new file mode 100644 index 0000000..97bffe4 --- /dev/null +++ b/main_cli.py @@ -0,0 +1,274 @@ +""" +Simple command-line interface for exporting and loading Mouser configs. +""" + +from __future__ import annotations + +import argparse +import json +import os +import plistlib +import signal +import subprocess +import sys +import threading +from pathlib import Path +from typing import Any + +import Quartz +import yaml + +from core.config import ( + load_config, + save_config, +) +from core.config_validation import ( + ConfigValidationError, + assemble_full_config, + normalize_config, +) +from core.log_setup import setup_logging + +CLI_SERVICE_LABEL = "io.github.tombadash.mouser.headless" +CLI_SERVICE_PLIST_NAME = f"{CLI_SERVICE_LABEL}.plist" + + +def export_config(*, stdout=None) -> int: + stdout = stdout or sys.stdout + json.dump(load_config(), stdout, indent=2) + stdout.write("\n") + return 0 + + +def _read_config_json(path: str, ft: str=None) -> dict[str, Any]: + ft = ft or (Path(path).suffix.lower().lstrip(".") if path != "-" else "json") + if ft not in ("json", "yaml"): + raise ValueError(f"Unsupported config file type: {ft}") + + if path == "-": + raw = sys.stdin + if ft == "json": + processed = json.load(raw) + elif ft == "yaml": + processed = yaml.safe_load(raw) + else: + with open(path, "r", encoding="utf-8") as raw: + if ft == "json": + processed = json.load(raw) + elif ft == "yaml": + processed = yaml.safe_load(raw) + + return normalize_config(processed) + + +def _format_cli_error(exc: Exception) -> str: + if isinstance(exc, ConfigValidationError): + return str(exc) + if isinstance(exc, json.JSONDecodeError): + return f"Invalid JSON: {exc.msg} at line {exc.lineno}, column {exc.colno}" + yaml_error = getattr(yaml, "YAMLError", None) + if yaml_error and isinstance(exc, yaml_error): + return f"Invalid YAML: {exc}" + if isinstance(exc, FileNotFoundError): + return f"File not found: {exc.filename}" + if isinstance(exc, OSError): + return str(exc) + return str(exc) or exc.__class__.__name__ + + +def run_headless_instance(*, stop_event: threading.Event | None = None, engine_factory=None) -> int: + if engine_factory is None: + from core.engine import Engine + + engine_factory = Engine + + stop_event = stop_event or threading.Event() + engine = engine_factory() + + def _request_stop(_signum, _frame): + stop_event.set() + + previous_handlers: list[tuple[int, Any]] = [] + for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP): + try: + previous_handlers.append((sig, signal.getsignal(sig))) + signal.signal(sig, _request_stop) + except (AttributeError, ValueError): + continue + + try: + engine.start() + while not stop_event.is_set(): + _wait_for_headless_activity(stop_event, timeout_s=0.5) + return 0 + finally: + engine.stop() + for sig, previous in previous_handlers: + try: + signal.signal(sig, previous) + except (AttributeError, ValueError): + pass + + +def _wait_for_headless_activity( + stop_event: threading.Event, + *, + timeout_s: float, +) -> None: + """Keep the process responsive while the headless engine is running. + + On macOS, the mouse hook installs a CGEventTap onto the current CFRunLoop. + The Qt app naturally pumps that run loop, but the CLI path does not, so the + tap would otherwise remain idle even when Accessibility permission is + granted. + """ + remaining = max(float(timeout_s), 0.0) + slice_s = 0.1 + while remaining > 0 and not stop_event.is_set(): + step = min(slice_s, remaining) + Quartz.CFRunLoopRunInMode(Quartz.kCFRunLoopDefaultMode, step, False) + remaining -= step + + +def load_config_and_start( + path: str, + *, + filetype: str | None = None, +) -> int: + cfg = _read_config_json(path, ft=filetype) + cfg = assemble_full_config(cfg) + save_config(cfg) + return start_background_service() + + +def _service_program_arguments() -> list[str]: + exe = os.path.abspath(sys.executable) + if getattr(sys, "frozen", False): + return [exe, "run"] + return [exe, os.path.abspath(__file__), "run"] + + +def _service_plist_path() -> str: + return os.path.expanduser(os.path.join("~/Library/LaunchAgents", CLI_SERVICE_PLIST_NAME)) + + +def _launchctl_run(args: list[str]) -> subprocess.CompletedProcess: + return subprocess.run(args, capture_output=True, text=True) + + +def start_background_service() -> int: + plist_path = _service_plist_path() + launch_agents_dir = os.path.dirname(plist_path) + domain = f"gui/{os.getuid()}" + + os.makedirs(launch_agents_dir, exist_ok=True) + if os.path.isfile(plist_path): + _launchctl_run(["launchctl", "bootout", domain, plist_path]) + + payload = { + "Label": CLI_SERVICE_LABEL, + "ProgramArguments": _service_program_arguments(), + "RunAtLoad": True, + "KeepAlive": True, + "ProcessType": "Background", + } + with open(plist_path, "wb") as f: + plistlib.dump(payload, f, fmt=plistlib.FMT_XML) + + result = _launchctl_run(["launchctl", "bootstrap", domain, plist_path]) + if result.returncode != 0: + raise RuntimeError(f"launchctl bootstrap failed: {result.stderr.strip()}") + return 0 + + +def stop_background_service() -> int: + plist_path = _service_plist_path() + domain = f"gui/{os.getuid()}" + + if os.path.isfile(plist_path): + _launchctl_run(["launchctl", "bootout", domain, plist_path]) + try: + os.remove(plist_path) + except OSError: + pass + else: + _launchctl_run(["launchctl", "bootout", domain, CLI_SERVICE_LABEL]) + return 0 + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Mouser command line interface") + subparsers = parser.add_subparsers(dest="command", required=True) + + subparsers.add_parser( + "export", + help="Print the current Mouser config as JSON", + ) + + load_parser = subparsers.add_parser( + "load", + help="Load a config and delegate startup to the background service", + ) + + load_parser.add_argument( + "config", + help="Path to a Mouser config JSON file, or '-' to read from stdin", + ) + load_parser.add_argument( + "-t", + "--filetype", + choices=["json", "yaml"], + help="Override config file type detection", + ) + + subparsers.add_parser( + "start", + help="Start Mouser headlessly in the background", + ) + + subparsers.add_parser( + "stop", + help="Stop the background Mouser headless service", + ) + + subparsers.add_parser( + "run", + help="Run the headless Mouser engine in the foreground (for testing and debugging)", + ) + + return parser + + +def main(argv: list[str] | None = None) -> int: + if sys.platform != "darwin": + raise NotImplementedError("stop is currently only supported on macOS") + + parser = build_parser() + args = parser.parse_args(argv) + + try: + if args.command == "export": + return export_config() + setup_logging() + if args.command == "load": + return load_config_and_start(args.config, filetype=args.filetype) + if args.command == "start": + return start_background_service() + if args.command == "stop": + return stop_background_service() + if args.command == "run": + return run_headless_instance() + parser.error(f"Unknown command: {args.command}") + except (ConfigValidationError, json.JSONDecodeError, OSError, RuntimeError) as exc: + print(f"Error: {_format_cli_error(exc)}", file=sys.stderr) + return 2 + except Exception as exc: + yaml_error = getattr(yaml, "YAMLError", None) + if yaml_error and isinstance(exc, yaml_error): + print(f"Error: {_format_cli_error(exc)}", file=sys.stderr) + return 2 + raise + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/requirements.txt b/requirements.txt index 2660c83..df5fc7f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,15 @@ -pyinstaller>=6.0 -hidapi>=0.14 -PySide6>=6.6 Pillow>=10.0 -pyobjc-framework-Quartz>=10.0; sys_platform == "darwin" -pyobjc-framework-Cocoa>=10.0; sys_platform == "darwin" +PySide6>=6.6 +attrs==26.1.0 evdev>=1.6; sys_platform == "linux" +hidapi>=0.14 +jsonschema-specifications==2025.9.1 +jsonschema==4.26.0 +pyinstaller>=6.0 +pyobjc-core==12.1 +pyobjc-framework-Cocoa>=10.0; sys_platform == "darwin" +pyobjc-framework-Quartz>=10.0; sys_platform == "darwin" +pyyaml==6.0.3 +referencing==0.37.0 +rpds-py==0.30.0 +typing-extensions==4.15.0 diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..ae2a56d --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,319 @@ +import io +import json +import plistlib +import sys +import tempfile +import unittest +from pathlib import Path +from unittest.mock import patch + +sys.modules.setdefault("Quartz", type("QuartzStub", (), {})()) +sys.modules.setdefault("yaml", type("YamlStub", (), {"safe_load": staticmethod(lambda stream: None)})()) + +sys.platform = "linux" +import main_cli + + +class CliTests(unittest.TestCase): + def _valid_config(self): + return { + "version": 8, + "active_profile": "default", + "profiles": { + "default": { + "label": "Default", + "apps": [], + "mappings": { + "middle": "none", + "gesture": "none", + "gesture_left": "none", + "gesture_right": "none", + "gesture_up": "none", + "gesture_down": "none", + "xbutton1": "alt_tab", + "xbutton2": "alt_tab", + "hscroll_left": "browser_back", + "hscroll_right": "browser_forward", + "mode_shift": "switch_scroll_mode", + }, + } + }, + "settings": { + "start_minimized": True, + "start_at_login": False, + "hscroll_threshold": 1, + "hscroll_cooldown_ms": 350, + "invert_hscroll": False, + "invert_vscroll": False, + "dpi": 1000, + "smart_shift_mode": "ratchet", + "smart_shift_enabled": False, + "smart_shift_threshold": 25, + "gesture_threshold": 50, + "gesture_deadzone": 40, + "gesture_timeout_ms": 3000, + "gesture_cooldown_ms": 500, + "appearance_mode": "system", + "debug_mode": False, + "device_layout_overrides": {}, + "language": "en", + "ignore_trackpad": True, + }, + } + + def test_wait_for_headless_activity_pumps_macos_run_loop(self): + stop_event = main_cli.threading.Event() + calls = [] + + class _QuartzStub: + kCFRunLoopDefaultMode = "default" + + @staticmethod + def CFRunLoopRunInMode(mode, seconds, return_after_source_handled): + calls.append((mode, seconds, return_after_source_handled)) + stop_event.set() + + with ( + patch("main_cli.sys.platform", "darwin"), + patch.object(main_cli, "Quartz", _QuartzStub), + ): + main_cli._wait_for_headless_activity(stop_event, timeout_s=0.5) + + self.assertEqual(calls, [("default", 0.1, False)]) + + def test_export_prints_current_config_as_json(self): + buf = io.StringIO() + cfg = {"version": 8, "profiles": {}, "settings": {}} + + with patch("main_cli.load_config", return_value=cfg): + rc = main_cli.export_config(stdout=buf) + + self.assertEqual(rc, 0) + self.assertEqual(json.loads(buf.getvalue()), cfg) + + def test_normalize_config_migrates_and_fills_defaults(self): + legacy = { + "version": 1, + "active_profile": "default", + "profiles": { + "default": { + "label": "Default", + "mappings": {"xbutton1": "browser_back"}, + } + }, + "settings": {}, + } + + normalized = main_cli.normalize_config(legacy) + + self.assertEqual(normalized["profiles"]["default"]["apps"], []) + self.assertEqual( + normalized["profiles"]["default"]["mappings"]["mode_shift"], + "switch_scroll_mode", + ) + self.assertIn("language", normalized["settings"]) + + def test_load_persists_normalized_config_then_starts_engine(self): + raw = { + "version": 1, + "active_profile": "default", + "profiles": {"default": {"label": "Default", "mappings": {}}}, + "settings": {}, + } + + saved = {} + + with tempfile.TemporaryDirectory() as tmp_dir: + config_path = Path(tmp_dir) / "import.json" + config_path.write_text(json.dumps(raw), encoding="utf-8") + + with ( + patch("main_cli.save_config", side_effect=lambda cfg: saved.setdefault("cfg", cfg)), + patch("main_cli.start_background_service", return_value=0) as start_background_service, + ): + rc = main_cli.load_config_and_start( + str(config_path), + ) + + self.assertEqual(rc, 0) + self.assertEqual( + saved["cfg"]["profiles"]["default"]["mappings"]["mode_shift"], + "switch_scroll_mode", + ) + start_background_service.assert_called_once_with() + + def test_load_accepts_stdin_marker(self): + raw = self._valid_config() + saved = {} + + with ( + patch("sys.stdin", io.StringIO(json.dumps(raw))), + patch("main_cli.save_config", side_effect=lambda cfg: saved.setdefault("cfg", cfg)), + patch("main_cli.start_background_service", return_value=0) as start_background_service, + ): + rc = main_cli.load_config_and_start( + "-", + ) + + self.assertEqual(rc, 0) + start_background_service.assert_called_once_with() + + def test_normalize_rejects_unknown_top_level_key(self): + raw = self._valid_config() + raw["bogus"] = True + + with self.assertRaisesRegex(ValueError, r"Unknown key at bogus"): + main_cli.normalize_config(raw) + + def test_normalize_rejects_missing_default_profile(self): + raw = self._valid_config() + raw["profiles"] = { + "work": { + "label": "Work", + "apps": ["com.example.Work"], + "mappings": { + "middle": "copy", + }, + } + } + raw["active_profile"] = "work" + + with self.assertRaisesRegex(ValueError, r"Config must define a `default` profile"): + main_cli.normalize_config(raw) + + def test_normalize_rejects_unknown_mapping_key(self): + raw = self._valid_config() + raw["profiles"]["default"]["mappings"]["not_a_button"] = "none" + + with self.assertRaisesRegex(ValueError, r"not_a_button is not a valid button mapping"): + main_cli.normalize_config(raw) + + def test_normalize_rejects_unknown_action_id(self): + raw = self._valid_config() + raw["profiles"]["default"]["mappings"]["middle"] = "definitely_not_real" + + with self.assertRaisesRegex(ValueError, r"unknown action 'definitely_not_real'"): + main_cli.normalize_config(raw) + + def test_normalize_rejects_invalid_custom_shortcut(self): + raw = self._valid_config() + raw["profiles"]["default"]["mappings"]["middle"] = "custom:super+definitely_not_real" + + with self.assertRaisesRegex(ValueError, r"unknown custom key"): + main_cli.normalize_config(raw) + + def test_normalize_rejects_wrong_setting_type_instead_of_silently_resetting(self): + raw = self._valid_config() + raw["settings"]["start_minimized"] = "yes" + + with self.assertRaisesRegex(ValueError, r"settings.start_minimized must be a boolean"): + main_cli.normalize_config(raw) + + def test_load_rejects_default_profile_with_nonempty_apps(self): + raw = self._valid_config() + raw["profiles"]["default"]["apps"] = ["com.example.App"] + + with tempfile.TemporaryDirectory() as tmp_dir: + config_path = Path(tmp_dir) / "import.json" + config_path.write_text(json.dumps(raw), encoding="utf-8") + + with self.assertRaisesRegex(ValueError, r"Default profile must have an empty `apps` list"): + main_cli.load_config_and_start(str(config_path)) + + def test_main_load_invalid_config_prints_human_readable_error(self): + raw = self._valid_config() + raw["settings"]["start_minimized"] = "yes" + + with tempfile.TemporaryDirectory() as tmp_dir: + config_path = Path(tmp_dir) / "import.json" + config_path.write_text(json.dumps(raw), encoding="utf-8") + stderr = io.StringIO() + + with ( + patch("main_cli.sys.platform", "darwin"), + patch("main_cli.setup_logging"), + patch("sys.stderr", stderr), + ): + rc = main_cli.main(["load", str(config_path)]) + + self.assertEqual(rc, 2) + self.assertIn("Error:", stderr.getvalue()) + self.assertIn("settings.start_minimized must be a boolean", stderr.getvalue()) + + def test_start_writes_launch_agent_and_bootstraps_it(self): + with tempfile.TemporaryDirectory() as tmp_dir: + plist_path = Path(tmp_dir) / main_cli.CLI_SERVICE_PLIST_NAME + launchctl_calls = [] + + def _fake_launchctl(args): + launchctl_calls.append(args) + return type("Result", (), {"returncode": 0, "stderr": ""})() + + with ( + patch("sys.platform", "darwin"), + patch("main_cli._service_plist_path", return_value=str(plist_path)), + patch("main_cli._launchctl_run", side_effect=_fake_launchctl), + ): + rc = main_cli.start_background_service() + payload = plistlib.loads(plist_path.read_bytes()) + + self.assertEqual(rc, 0) + self.assertEqual( + launchctl_calls, + [["launchctl", "bootstrap", f"gui/{main_cli.os.getuid()}", str(plist_path)]], + ) + self.assertEqual(payload["Label"], main_cli.CLI_SERVICE_LABEL) + self.assertEqual(payload["ProgramArguments"], main_cli._service_program_arguments()) + self.assertTrue(payload["RunAtLoad"]) + self.assertTrue(payload["KeepAlive"]) + + def test_stop_boots_out_and_removes_launch_agent(self): + with tempfile.TemporaryDirectory() as tmp_dir: + plist_path = Path(tmp_dir) / main_cli.CLI_SERVICE_PLIST_NAME + plist_path.write_text("placeholder", encoding="utf-8") + launchctl_calls = [] + + def _fake_launchctl(args): + launchctl_calls.append(args) + return type("Result", (), {"returncode": 0, "stderr": ""})() + + with ( + patch("sys.platform", "darwin"), + patch("main_cli._service_plist_path", return_value=str(plist_path)), + patch("main_cli._launchctl_run", side_effect=_fake_launchctl), + ): + rc = main_cli.stop_background_service() + + self.assertEqual(rc, 0) + self.assertEqual( + launchctl_calls, + [["launchctl", "bootout", f"gui/{main_cli.os.getuid()}", str(plist_path)]], + ) + self.assertFalse(plist_path.exists()) + + def test_main_start_and_stop_dispatch_to_service_helpers(self): + with ( + patch("main_cli.sys.platform", "darwin"), + patch("main_cli.setup_logging"), + patch("main_cli.start_background_service", return_value=0) as start_background_service, + patch("main_cli.stop_background_service", return_value=0) as stop_background_service, + ): + self.assertEqual(main_cli.main(["start"]), 0) + self.assertEqual(main_cli.main(["stop"]), 0) + + start_background_service.assert_called_once_with() + stop_background_service.assert_called_once_with() + + def test_main_internal_run_dispatches_to_headless_runner(self): + with ( + patch("main_cli.sys.platform", "darwin"), + patch("main_cli.setup_logging"), + patch("main_cli.run_headless_instance", return_value=0) as run_headless_instance, + ): + self.assertEqual(main_cli.main(["run"]), 0) + + run_headless_instance.assert_called_once_with() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_config_validation.py b/tests/test_config_validation.py new file mode 100644 index 0000000..c72da85 --- /dev/null +++ b/tests/test_config_validation.py @@ -0,0 +1,214 @@ +import copy +import sys +import unittest + +sys.platform = "linux" + +from core.config import DEFAULT_CONFIG +from core.config_validation import ( + ConfigValidationError, + assemble_full_config, + normalize_config, + validate_config, +) + + +class ConfigValidationTests(unittest.TestCase): + def _valid_config(self): + return copy.deepcopy(DEFAULT_CONFIG) + + def test_validate_accepts_default_config(self): + cfg = self._valid_config() + + validate_config(cfg) + + def test_normalize_migrates_and_fills_defaults(self): + legacy = { + "version": 1, + "active_profile": "default", + "profiles": { + "default": { + "label": "Default", + "mappings": {"xbutton1": "browser_back"}, + } + }, + "settings": {}, + } + + normalized = normalize_config(legacy) + + self.assertEqual(normalized["profiles"]["default"]["apps"], []) + self.assertEqual( + normalized["profiles"]["default"]["mappings"]["mode_shift"], + "switch_scroll_mode", + ) + self.assertIn("language", normalized["settings"]) + + def test_normalize_unversioned_config_treats_it_as_current_schema(self): + raw = { + "active_profile": "default", + "profiles": { + "default": { + "label": "Default", + "apps": [], + "mappings": { + "mode_shift": "custom:super+shift+4", + }, + }, + "finder": { + "label": "Finder", + "apps": ["com.apple.finder"], + "mappings": { + "xbutton1": "custom:tab", + }, + }, + }, + "settings": {}, + } + + normalized = normalize_config(raw) + + self.assertEqual(normalized["version"], DEFAULT_CONFIG["version"]) + self.assertNotIn("mode_shift", normalized["profiles"]["finder"]["mappings"]) + + assembled = assemble_full_config(normalized) + + self.assertEqual( + assembled["profiles"]["finder"]["mappings"]["mode_shift"], + "custom:super+shift+4", + ) + + def test_validate_rejects_unknown_top_level_key(self): + cfg = self._valid_config() + cfg["bogus"] = True + + with self.assertRaisesRegex(ConfigValidationError, r"Unknown key at bogus"): + validate_config(cfg) + + def test_validate_rejects_missing_default_profile(self): + cfg = self._valid_config() + cfg["profiles"] = { + "work": { + "label": "Work", + "apps": ["com.example.Work"], + "mappings": { + "middle": "copy", + }, + } + } + cfg["active_profile"] = "work" + + with self.assertRaisesRegex( + ConfigValidationError, + r"Config must define a `default` profile", + ): + validate_config(cfg) + + def test_validate_rejects_wrong_type_via_schema(self): + cfg = self._valid_config() + cfg["settings"]["start_minimized"] = "yes" + + with self.assertRaisesRegex( + ConfigValidationError, + r"settings.start_minimized must be a boolean", + ): + validate_config(cfg) + + def test_validate_rejects_unknown_mapping_key(self): + cfg = self._valid_config() + cfg["profiles"]["default"]["mappings"]["not_a_button"] = "none" + + with self.assertRaisesRegex( + ConfigValidationError, + r"not_a_button is not a valid button mapping", + ): + validate_config(cfg) + + def test_validate_rejects_unknown_action_id(self): + cfg = self._valid_config() + cfg["profiles"]["default"]["mappings"]["middle"] = "definitely_not_real" + + with self.assertRaisesRegex( + ConfigValidationError, + r"unknown action 'definitely_not_real'", + ): + validate_config(cfg) + + def test_validate_rejects_invalid_custom_shortcut(self): + cfg = self._valid_config() + cfg["profiles"]["default"]["mappings"]["middle"] = "custom:super+definitely_not_real" + + with self.assertRaisesRegex( + ConfigValidationError, + r"unknown custom key", + ): + validate_config(cfg) + + def test_validate_rejects_unknown_layout_override(self): + cfg = self._valid_config() + cfg["settings"]["device_layout_overrides"] = {"mx_master_3s": "not_a_layout"} + + with self.assertRaisesRegex( + ConfigValidationError, + r"unknown layout 'not_a_layout'", + ): + validate_config(cfg) + + def test_assemble_full_config_rejects_default_profile_with_apps(self): + cfg = self._valid_config() + cfg["profiles"]["default"]["apps"] = ["com.example.App"] + + with self.assertRaisesRegex( + ConfigValidationError, + r"Default profile must have an empty `apps` list", + ): + assemble_full_config(cfg) + + def test_assemble_full_config_fills_missing_profile_mappings_from_default_profile(self): + cfg = self._valid_config() + cfg["profiles"]["work"] = { + "label": "Work", + "apps": ["com.example.Work"], + "mappings": { + "middle": "copy", + }, + } + + assembled = assemble_full_config(cfg) + + self.assertEqual(assembled["profiles"]["work"]["mappings"]["middle"], "copy") + self.assertEqual( + assembled["profiles"]["work"]["mappings"]["xbutton1"], + cfg["profiles"]["default"]["mappings"]["xbutton1"], + ) + self.assertEqual( + assembled["profiles"]["work"]["mappings"]["mode_shift"], + cfg["profiles"]["default"]["mappings"]["mode_shift"], + ) + + def test_assemble_full_config_uses_default_profile_even_when_active_profile_is_app_specific(self): + cfg = self._valid_config() + cfg["active_profile"] = "ghostty" + cfg["profiles"]["default"]["mappings"]["mode_shift"] = "custom:super+shift+4" + cfg["profiles"]["ghostty"] = { + "label": "Ghostty", + "apps": ["com.mitchellh.ghostty"], + "mappings": { + "hscroll_left": "next_tab", + }, + } + + assembled = assemble_full_config(cfg) + + self.assertEqual( + assembled["profiles"]["ghostty"]["mappings"]["mode_shift"], + "custom:super+shift+4", + ) + self.assertEqual( + assembled["profiles"]["ghostty"]["mappings"]["xbutton1"], + cfg["profiles"]["default"]["mappings"]["xbutton1"], + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_engine.py b/tests/test_engine.py index 6bfb42d..227042e 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -89,11 +89,13 @@ def run_target(self): class EngineHorizontalScrollTests(unittest.TestCase): - def _make_engine(self): + def _make_engine(self, *, hscroll_threshold=1, hscroll_cooldown_ms=None): from core.engine import Engine cfg = copy.deepcopy(DEFAULT_CONFIG) - cfg["settings"]["hscroll_threshold"] = 1 + cfg["settings"]["hscroll_threshold"] = hscroll_threshold + if hscroll_cooldown_ms is not None: + cfg["settings"]["hscroll_cooldown_ms"] = hscroll_cooldown_ms with ( patch("core.engine.MouseHook", _FakeMouseHook), @@ -148,6 +150,29 @@ def test_hscroll_accumulates_fractional_mac_deltas(self): self.assertEqual(execute_action_mock.call_count, 1) + def test_hscroll_uses_configured_cooldown(self): + engine = self._make_engine(hscroll_cooldown_ms=100) + handler = engine._make_hscroll_handler("space_left") + + with patch("core.engine.execute_action") as execute_action_mock: + handler(SimpleNamespace( + event_type=MouseEvent.HSCROLL_LEFT, + raw_data=1, + timestamp=1.00, + )) + handler(SimpleNamespace( + event_type=MouseEvent.HSCROLL_LEFT, + raw_data=1, + timestamp=1.05, + )) + handler(SimpleNamespace( + event_type=MouseEvent.HSCROLL_LEFT, + raw_data=1, + timestamp=1.15, + )) + + self.assertEqual(execute_action_mock.call_count, 2) + def test_connection_callback_receives_current_state_immediately(self): engine = self._make_engine() engine.hook.device_connected = True diff --git a/tests/test_hid_gesture.py b/tests/test_hid_gesture.py index a2aeb1a..f36a897 100644 --- a/tests/test_hid_gesture.py +++ b/tests/test_hid_gesture.py @@ -103,7 +103,8 @@ def test_choose_gesture_candidates_prefers_known_device_cids(self): device_spec=device_spec, ) - self.assertEqual(candidates[:2], [0x00C3, 0x00D7]) + expected = [0x00D7, 0x00C3] if sys.platform == "darwin" else [0x00C3, 0x00D7] + self.assertEqual(candidates[:2], expected) def test_choose_gesture_candidates_uses_capability_heuristic(self): listener = hid_gesture.HidGestureListener() @@ -117,12 +118,26 @@ def test_choose_gesture_candidates_uses_capability_heuristic(self): self.assertEqual(candidates[0], 0x00F1) + def test_choose_gesture_candidates_prefers_virtual_gesture_button_on_macos(self): + listener = hid_gesture.HidGestureListener() + + with patch.object(sys, "platform", "darwin"): + candidates = listener._choose_gesture_candidates( + [ + {"cid": 0x00D7, "flags": 0x03B0, "mapping_flags": 0x0051}, + {"cid": 0x00C3, "flags": 0x0130, "mapping_flags": 0x0011}, + ], + ) + + self.assertEqual(candidates[:2], [0x00D7, 0x00C3]) + def test_choose_gesture_candidates_falls_back_to_defaults(self): listener = hid_gesture.HidGestureListener() self.assertEqual( listener._choose_gesture_candidates([]), - list(hid_gesture.DEFAULT_GESTURE_CIDS), + ([0x00D7, 0x00C3] if sys.platform == "darwin" + else list(hid_gesture.DEFAULT_GESTURE_CIDS)), ) diff --git a/tests/test_key_simulator.py b/tests/test_key_simulator.py index fd5d89f..679b59a 100644 --- a/tests/test_key_simulator.py +++ b/tests/test_key_simulator.py @@ -196,5 +196,25 @@ def test_mouse_button_labels_are_non_empty_strings(self): self.assertTrue(len(label) > 0) +@unittest.skipUnless(sys.platform == "darwin", "macOS key ordering is platform-specific") +class MacKeyComboTests(unittest.TestCase): + def test_split_modifier_keys_preserves_modifier_first_order(self): + modifiers, normals = key_simulator._split_modifier_keys([ + key_simulator.kVK_Command, + key_simulator.kVK_ANSI_C, + key_simulator.kVK_Shift, + key_simulator.kVK_ANSI_W, + ]) + + self.assertEqual( + modifiers, + [key_simulator.kVK_Command, key_simulator.kVK_Shift], + ) + self.assertEqual( + normals, + [key_simulator.kVK_ANSI_C, key_simulator.kVK_ANSI_W], + ) + + if __name__ == "__main__": unittest.main() diff --git a/tests/test_mouse_hook.py b/tests/test_mouse_hook.py index 810fda6..299aa10 100644 --- a/tests/test_mouse_hook.py +++ b/tests/test_mouse_hook.py @@ -338,6 +338,72 @@ def fake_select(readable, writable, exceptional, timeout): self.assertEqual(select_calls, [0.5]) self.assertEqual(hook._evdev_device.read.call_count, 1) + +class LinuxGestureDetectionTests(unittest.TestCase): + def _reload_for_linux(self): + fake_evdev = SimpleNamespace( + ecodes=_FakeLinuxEcodes, + UInput=_FakeLinuxUInput, + InputDevice=Mock(name="InputDevice"), + ) + with ( + patch.object(sys, "platform", "linux"), + patch.dict(sys.modules, {"evdev": fake_evdev}), + ): + importlib.reload(mouse_hook) + self.addCleanup(importlib.reload, mouse_hook) + return mouse_hook + + def _make_hook(self): + module = self._reload_for_linux() + hook = module.MouseHook() + hook.configure_gestures(enabled=True, threshold=50, deadzone=40) + return module, hook + + def test_horizontal_swipe_with_vertical_noise_resolves_horizontal(self): + module, hook = self._make_hook() + hook._gesture_delta_x = -72 + hook._gesture_delta_y = -20 + + self.assertEqual( + hook._detect_gesture_event(), + module.MouseEvent.GESTURE_SWIPE_LEFT, + ) + + def test_equal_diagonal_stays_ambiguous_instead_of_becoming_vertical(self): + module, hook = self._make_hook() + hook._gesture_delta_x = -60 + hook._gesture_delta_y = -60 + + self.assertIsNone(hook._detect_gesture_event()) + + def test_repeated_left_swipes_ignore_rebound_right_within_same_hold(self): + module, hook = self._make_hook() + hook._gesture_cooldown_ms = 0 + seen = [] + hook.register( + module.MouseEvent.GESTURE_SWIPE_LEFT, + lambda event: seen.append(event.event_type), + ) + hook.register( + module.MouseEvent.GESTURE_SWIPE_RIGHT, + lambda event: seen.append(event.event_type), + ) + + hook._on_hid_gesture_down() + hook._accumulate_gesture_delta(-70, 0, "hid_rawxy") + hook._accumulate_gesture_delta(70, 0, "hid_rawxy") + hook._accumulate_gesture_delta(-70, 0, "hid_rawxy") + hook._on_hid_gesture_up() + + self.assertEqual( + seen, + [ + module.MouseEvent.GESTURE_SWIPE_LEFT, + module.MouseEvent.GESTURE_SWIPE_LEFT, + ], + ) + def test_evdev_loop_clears_rescan_and_retries_after_listen_returns(self): module = self._reload_for_linux() hook = module.MouseHook()