Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 78 additions & 2 deletions core/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
Configuration manager loads/saves button mappings to a JSON file.
Configuration manager -- loads/saves button mappings to a JSON file.
Supports per-application profiles (for future use).
"""

Expand Down Expand Up @@ -29,6 +29,7 @@
"gesture": "Gesture button",
"xbutton1": "Back button",
"xbutton2": "Forward button",
"thumb_button": "Thumb button",
"hscroll_left": "Horizontal scroll left",
"hscroll_right": "Horizontal scroll right",
"mode_shift": "Mode shift button",
Expand All @@ -50,6 +51,53 @@
"gesture_down": "Gesture swipe down",
}

# Sealed set of legal values for the ``settings.wheel_divert`` kill-switch.
# Treat this like a stringly-named enum: every engine read goes through
# :func:`coerce_wheel_divert_setting` so typos in the user's config file
# never silently flip into one of the live branches.
WHEEL_DIVERT_AUTO = "auto"
WHEEL_DIVERT_OFF = "off"
WHEEL_DIVERT_VALID_VALUES: frozenset[str] = frozenset(
(WHEEL_DIVERT_AUTO, WHEEL_DIVERT_OFF)
)
WHEEL_DIVERT_DEFAULT = WHEEL_DIVERT_AUTO

_WHEEL_DIVERT_WARNED_VALUES: set[str] = set()


def coerce_wheel_divert_setting(value: object) -> str:
"""Normalize an arbitrary value into the sealed ``wheel_divert`` set.

Unknown values resolve to :data:`WHEEL_DIVERT_DEFAULT` and log a one-shot
warning per distinct typo so the user gets a single, actionable nudge
instead of a per-event flood. Type-narrows to ``str`` so call sites can
compare against the named constants without `is None` or `isinstance`
guards.
"""
if isinstance(value, str):
normalized = value.strip().lower()
if normalized in WHEEL_DIVERT_VALID_VALUES:
return normalized
key = normalized or "<empty>"
if key not in _WHEEL_DIVERT_WARNED_VALUES:
_WHEEL_DIVERT_WARNED_VALUES.add(key)
print(
f"[Config] Unknown settings.wheel_divert={value!r}; "
f"falling back to {WHEEL_DIVERT_DEFAULT!r}. "
f"Valid values: {sorted(WHEEL_DIVERT_VALID_VALUES)}."
)
return WHEEL_DIVERT_DEFAULT
if value is not None:
marker = type(value).__name__
if marker not in _WHEEL_DIVERT_WARNED_VALUES:
_WHEEL_DIVERT_WARNED_VALUES.add(marker)
print(
f"[Config] settings.wheel_divert is not a string "
f"(got {type(value).__name__}); falling back to "
f"{WHEEL_DIVERT_DEFAULT!r}."
)
return WHEEL_DIVERT_DEFAULT

# Maps config button keys to the MouseEvent types they correspond to
BUTTON_TO_EVENTS = {
"middle": ("middle_down", "middle_up"),
Expand All @@ -60,14 +108,15 @@
"gesture_down": ("gesture_swipe_down",),
"xbutton1": ("xbutton1_down", "xbutton1_up"),
"xbutton2": ("xbutton2_down", "xbutton2_up"),
"thumb_button": ("thumb_button_down", "thumb_button_up"),
"hscroll_left": ("hscroll_left",),
"hscroll_right": ("hscroll_right",),
"mode_shift": ("mode_shift_down", "mode_shift_up"),
"dpi_switch": ("dpi_switch_down", "dpi_switch_up"),
}

DEFAULT_CONFIG = {
"version": 9,
"version": 11,
"active_profile": "default",
"profiles": {
"default": {
Expand All @@ -82,6 +131,7 @@
"gesture_down": "none",
"xbutton1": "alt_tab",
"xbutton2": "alt_tab",
"thumb_button": "none",
"hscroll_left": "browser_back",
"hscroll_right": "browser_forward",
"mode_shift": "switch_scroll_mode",
Expand Down Expand Up @@ -109,6 +159,11 @@
"ignore_trackpad": True,
"check_for_updates": True,
"update_check_state": {},
# HID++ wheel divert kill-switch:
# "auto" → enable on capable devices (MX Master family).
# "off" → never divert; force OS-layer inversion fallback.
# Sealed set; ``coerce_wheel_divert_setting`` normalizes user typos.
"wheel_divert": WHEEL_DIVERT_DEFAULT,
},
}

Expand Down Expand Up @@ -329,6 +384,24 @@ def _migrate(cfg):
settings.setdefault("ignore_trackpad", True)
cfg["version"] = 9

if version < 10:
# v10: HID++ wheel divert kill-switch. Default "auto" enables divert
# on capable devices (MX Master family). Existing installs keep
# working unchanged when the device exposes 0x2121 / 0x2150.
settings = cfg.setdefault("settings", {})
settings.setdefault("wheel_divert", WHEEL_DIVERT_DEFAULT)
cfg["version"] = 10

if version < 11:
# v11: MX Master 4 Thumb button (the small button on the front face,
# CID 0x00c3 in HID++). Existing profiles get "thumb_button": "none"
# so users opt in by mapping it in the UI. Devices without this
# button simply ignore the entry.
for pdata in cfg.get("profiles", {}).values():
mappings = pdata.setdefault("mappings", {})
mappings.setdefault("thumb_button", "none")
cfg["version"] = 11

cfg.setdefault("settings", {})
cfg["settings"].setdefault("appearance_mode", "system")
cfg["settings"].setdefault("debug_mode", False)
Expand All @@ -337,6 +410,9 @@ def _migrate(cfg):
cfg["settings"].setdefault("ignore_trackpad", True)
cfg["settings"].setdefault("check_for_updates", True)
cfg["settings"].setdefault("update_check_state", {})
cfg["settings"]["wheel_divert"] = coerce_wheel_divert_setting(
cfg["settings"].get("wheel_divert", WHEEL_DIVERT_DEFAULT)
)

# Always migrate old wmplayer.exe → Microsoft.Media.Player.exe in profile apps
for pdata in cfg.get("profiles", {}).values():
Expand Down
151 changes: 145 additions & 6 deletions core/engine.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
Engine wires the mouse hook to the key simulator using the
Engine -- wires the mouse hook to the key simulator using the
current configuration. Sits between the hook layer and the UI.
Supports per-application auto-switching of profiles.
"""
Expand All @@ -14,6 +14,7 @@
from core.config import (
load_config, get_active_mappings, get_profile_for_app,
BUTTON_TO_EVENTS, GESTURE_DIRECTION_BUTTONS, save_config,
WHEEL_DIVERT_OFF, coerce_wheel_divert_setting,
)
from core.app_detector import AppDetector
from core.mouse_hook_types import HidRuntimeState
Expand Down Expand Up @@ -67,6 +68,12 @@ def __init__(self):
self._replay_lock = threading.Lock()
self._mouse_release_timers = {} # action_id → Timer for safety auto-release
self._lock = threading.Lock()
# HID++ native-invert tracking. `_last_native_invert_target` is
# the most recently acknowledged (invert_v, invert_h) so the
# fast-path can skip redundant device round-trips on profile changes.
self._wheel_divert_change_cb = None
self._wheel_divert_active_local = False
self._last_native_invert_target = (False, False)
self.hook.set_debug_callback(self._emit_debug)
self.hook.set_gesture_callback(self._emit_gesture_event)
self.hook.set_status_callback(self._emit_status)
Expand Down Expand Up @@ -140,6 +147,8 @@ def _setup_hooks(self):
)

self._emit_mapping_snapshot("Hook mappings refreshed", mappings)
# Drive HID++ firmware wheel-invert from settings + device capability.
self._apply_wheel_invert_setting()

for btn_key, action_id in mappings.items():
events = list(BUTTON_TO_EVENTS.get(btn_key, ()))
Expand Down Expand Up @@ -250,7 +259,7 @@ def _toggle_smart_shift(self):

IMPORTANT: this is called from a HID event callback which runs on the HID
loop thread. Calling hg.set_smart_shift() directly would block waiting for
the same loop to process the pending request a deadlock that causes the
the same loop to process the pending request -- a deadlock that causes the
3-second timeout seen in the logs. Config and UI are updated synchronously;
the device write is dispatched to a separate thread.
"""
Expand All @@ -277,7 +286,7 @@ def _switch_scroll_mode(self):
"""Switch between ratchet and free-spin (Logi Options+ physical button behaviour).

SmartShift auto-switching is disabled so the chosen fixed mode takes effect.
Same deadlock caveat as _toggle_smart_shift device write runs off-thread.
Same deadlock caveat as _toggle_smart_shift -- device write runs off-thread.
"""
settings = self.cfg.get("settings", {})
current_mode = settings.get("smart_shift_mode", "ratchet")
Expand Down Expand Up @@ -333,6 +342,104 @@ def _write():
hg.set_dpi(new_dpi)
threading.Thread(target=_write, daemon=True, name="CycleDPI").start()

def _apply_wheel_invert_setting(self, *, force: bool = False) -> None:
"""Drive HID++ firmware wheel-invert from settings + device
capability. ``force=True`` re-issues writes even when cached
state matches the target -- used by ``_run_saved_settings_replay``
to realign firmware that forgot state after sleep. On success the
engine + platform hook flip ``wheel_native_invert_active`` so the
OS-layer inversion path is suppressed; on failure the OS-layer
path handles inversion."""
settings = self.cfg.get("settings", {})
kill_switch_off = (
coerce_wheel_divert_setting(settings.get("wheel_divert")) == WHEEL_DIVERT_OFF
)
invert_v = bool(settings.get("invert_vscroll", False))
invert_h = bool(settings.get("invert_hscroll", False))
device = self.connected_device
capable = bool(device and (
getattr(device, "has_hires_wheel", False)
or getattr(device, "has_thumbwheel", False)
))
# Stay True even when both invert flags are False so we own the
# wheel-mode write and a stale invert lease from a crashed Mouser
# session is reset to native on reconnect.
target_active = bool(capable and not kill_switch_off)
hg = self.hook._hid_gesture
if (
not force
and target_active == self._wheel_divert_active_local
and target_active == bool(getattr(self.hook, "wheel_native_invert_active", False))
and (not target_active or self._last_native_invert_target == (invert_v, invert_h))
):
return
ack = False
if target_active and hg is not None and hasattr(hg, "request_wheel_native_invert"):
try:
ack = bool(hg.request_wheel_native_invert(invert_v, invert_h))
except Exception as exc:
print(f"[Engine] wheel native-invert request failed: {exc}")
ack = False
elif not target_active and hg is not None and hasattr(hg, "request_wheel_native_invert"):
try:
hg.request_wheel_native_invert(False, False)
except Exception as exc:
print(f"[Engine] wheel native-invert release failed: {exc}")
new_active = bool(target_active and ack)
prev_active = self._wheel_divert_active_local
self._wheel_divert_active_local = new_active
self.hook.wheel_native_invert_active = new_active
self._last_native_invert_target = (invert_v, invert_h) if new_active else (False, False)
if hg is not None and hasattr(hg, "set_wheel_divert_active_flags"):
try:
hg.set_wheel_divert_active_flags(
bool(new_active and invert_v
and getattr(hg, "_hires_wheel_idx", None) is not None),
bool(new_active and invert_h
and getattr(hg, "_thumbwheel_idx", None) is not None),
)
except Exception as exc:
print(f"[Engine] set_wheel_divert_active_flags failed: {exc}")
if new_active != prev_active:
print(
f"[Engine] wheel native-invert -> "
f"{'ON (HID++)' if new_active else 'OFF (OS fallback)'} "
f"capable={capable} kill_switch_off={kill_switch_off} "
f"invert_v={invert_v} invert_h={invert_h} ack={ack}"
)
if not new_active and target_active:
self._emit_status(
"Firmware wheel invert FAILED on a capable device -- "
"falling back to OS-level inversion."
)
self._notify_wheel_divert_change(new_active)

def _notify_wheel_divert_change(self, active: bool) -> None:
if self._wheel_divert_change_cb is None:
return
try:
self._wheel_divert_change_cb(bool(active))
except Exception as exc:
print(f"[Engine] wheel divert change callback raised: {exc}")

def set_wheel_divert_change_callback(self, cb) -> None:
"""Register ``cb(active: bool)`` invoked whenever the HID++ wheel
divert lease toggles. Fires once immediately with the current
state. Pass ``None`` to detach the currently registered callback."""
self._wheel_divert_change_cb = cb
if cb is None:
return
try:
cb(bool(self._wheel_divert_active_local))
except Exception as exc:
print(f"[Engine] wheel divert change callback (initial) raised: {exc}")

@property
def wheel_native_invert_active(self) -> bool:
"""True iff the connected device is performing scroll inversion at
the firmware level (so the OS-layer inversion path is suppressed)."""
return bool(self._wheel_divert_active_local)

def _make_hscroll_handler(self, action_id):
def handler(event):
if not self._enabled:
Expand Down Expand Up @@ -516,6 +623,19 @@ def _run_saved_settings_replay(self):
except Exception:
pass

# Phase A.5: re-apply HID++ native wheel invert with force=True so
# firmware that forgot invert state after sleep is realigned.
self._apply_wheel_invert_setting(force=True)
native_invert_target = (
coerce_wheel_divert_setting(
self.cfg.get("settings", {}).get("wheel_divert")
) != WHEEL_DIVERT_OFF
and bool(getattr(self.connected_device, "has_hires_wheel", False)
or getattr(self.connected_device, "has_thumbwheel", False))
)
if native_invert_target and not self._wheel_divert_active_local:
replay_ok = False

time.sleep(3)
hg = self.hook._hid_gesture
if hg is None or getattr(hg, "connected_device", None) is None:
Expand Down Expand Up @@ -658,8 +778,15 @@ def _battery_poll_loop(self, stop_event):
except Exception:
pass

# Read ``_replay_inflight`` under the same lock that the
# replay thread uses to flip it, otherwise the battery
# loop can issue a Smart Shift poll partway through a
# replay round-trip and the firmware queues conflicting
# HID++ writes.
with self._replay_lock:
replay_inflight = self._replay_inflight
if (
not self._replay_inflight
not replay_inflight
and now - _last_ss >= _ss_poll_interval
and hg.smart_shift_supported
):
Expand Down Expand Up @@ -725,7 +852,7 @@ def set_dpi(self, dpi_value):
hg = self.hook._hid_gesture
if hg:
return hg.set_dpi(dpi)
print("[Engine] No HID++ connection DPI not applied")
print("[Engine] No HID++ connection -- DPI not applied")
return False

def set_smart_shift(self, mode, smart_shift_enabled=False, threshold=25):
Expand All @@ -744,7 +871,7 @@ def set_smart_shift(self, mode, smart_shift_enabled=False, threshold=25):
result = hg.set_smart_shift(mode, smart_shift_enabled, threshold)
print(f"[Engine] set_smart_shift -> {'OK' if result else 'FAILED'}")
return result
print("[Engine] set_smart_shift: No HID++ connection not applied")
print("[Engine] set_smart_shift: No HID++ connection -- not applied")
return False

@property
Expand Down Expand Up @@ -808,3 +935,15 @@ def stop(self):
self._battery_poll_thread = None
self._app_detector.stop()
self.hook.stop()
# Cancel any pending safety auto-release timers. Without this the
# threading.Timer scheduled by execute_action can still fire after
# ``stop()`` returns and call ``inject_mouse_up`` against a hook
# that has already been torn down -- one of the timers fires a
# phantom release every time Mouser quits during a long press.
timers = list(self._mouse_release_timers.values())
self._mouse_release_timers.clear()
for timer in timers:
try:
timer.cancel()
except Exception as exc: # noqa: BLE001 - shutdown must complete
print(f"[Engine] stop: failed to cancel release timer: {exc!r}")
Loading
Loading