Skip to content
80 changes: 78 additions & 2 deletions core/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
Configuration manager loads/saves button mappings to a JSON file.
Configuration manager -- loads/saves button mappings to a JSON file.
Supports per-application profiles (for future use).
"""

Expand Down Expand Up @@ -29,6 +29,7 @@
"gesture": "Gesture button",
"xbutton1": "Back button",
"xbutton2": "Forward button",
"thumb_button": "Thumb button",
"hscroll_left": "Horizontal scroll left",
"hscroll_right": "Horizontal scroll right",
"mode_shift": "Mode shift button",
Expand All @@ -50,6 +51,53 @@
"gesture_down": "Gesture swipe down",
}

# Sealed set of legal values for the ``settings.wheel_divert`` kill-switch.
# Treat this like a stringly-named enum: every engine read goes through
# :func:`coerce_wheel_divert_setting` so typos in the user's config file
# never silently flip into one of the live branches.
WHEEL_DIVERT_AUTO = "auto"
WHEEL_DIVERT_OFF = "off"
WHEEL_DIVERT_VALID_VALUES: frozenset[str] = frozenset(
(WHEEL_DIVERT_AUTO, WHEEL_DIVERT_OFF)
)
WHEEL_DIVERT_DEFAULT = WHEEL_DIVERT_AUTO

_WHEEL_DIVERT_WARNED_VALUES: set[str] = set()


def coerce_wheel_divert_setting(value: object) -> str:
"""Normalize an arbitrary value into the sealed ``wheel_divert`` set.

Unknown values resolve to :data:`WHEEL_DIVERT_DEFAULT` and log a one-shot
warning per distinct typo so the user gets a single, actionable nudge
instead of a per-event flood. Type-narrows to ``str`` so call sites can
compare against the named constants without `is None` or `isinstance`
guards.
"""
if isinstance(value, str):
normalized = value.strip().lower()
if normalized in WHEEL_DIVERT_VALID_VALUES:
return normalized
key = normalized or "<empty>"
if key not in _WHEEL_DIVERT_WARNED_VALUES:
_WHEEL_DIVERT_WARNED_VALUES.add(key)
print(
f"[Config] Unknown settings.wheel_divert={value!r}; "
f"falling back to {WHEEL_DIVERT_DEFAULT!r}. "
f"Valid values: {sorted(WHEEL_DIVERT_VALID_VALUES)}."
)
return WHEEL_DIVERT_DEFAULT
if value is not None:
marker = type(value).__name__
if marker not in _WHEEL_DIVERT_WARNED_VALUES:
_WHEEL_DIVERT_WARNED_VALUES.add(marker)
print(
f"[Config] settings.wheel_divert is not a string "
f"(got {type(value).__name__}); falling back to "
f"{WHEEL_DIVERT_DEFAULT!r}."
)
return WHEEL_DIVERT_DEFAULT

# Maps config button keys to the MouseEvent types they correspond to
BUTTON_TO_EVENTS = {
"middle": ("middle_down", "middle_up"),
Expand All @@ -60,14 +108,15 @@
"gesture_down": ("gesture_swipe_down",),
"xbutton1": ("xbutton1_down", "xbutton1_up"),
"xbutton2": ("xbutton2_down", "xbutton2_up"),
"thumb_button": ("thumb_button_down", "thumb_button_up"),
"hscroll_left": ("hscroll_left",),
"hscroll_right": ("hscroll_right",),
"mode_shift": ("mode_shift_down", "mode_shift_up"),
"dpi_switch": ("dpi_switch_down", "dpi_switch_up"),
}

DEFAULT_CONFIG = {
"version": 9,
"version": 11,
"active_profile": "default",
"profiles": {
"default": {
Expand All @@ -82,6 +131,7 @@
"gesture_down": "none",
"xbutton1": "alt_tab",
"xbutton2": "alt_tab",
"thumb_button": "none",
"hscroll_left": "browser_back",
"hscroll_right": "browser_forward",
"mode_shift": "switch_scroll_mode",
Expand Down Expand Up @@ -109,6 +159,11 @@
"ignore_trackpad": True,
"check_for_updates": True,
"update_check_state": {},
# HID++ wheel divert kill-switch:
# "auto" → enable on capable devices (MX Master family).
# "off" → never divert; force OS-layer inversion fallback.
# Sealed set; ``coerce_wheel_divert_setting`` normalizes user typos.
"wheel_divert": WHEEL_DIVERT_DEFAULT,
},
}

Expand Down Expand Up @@ -329,6 +384,24 @@ def _migrate(cfg):
settings.setdefault("ignore_trackpad", True)
cfg["version"] = 9

if version < 10:
# v10: HID++ wheel divert kill-switch. Default "auto" enables divert
# on capable devices (MX Master family). Existing installs keep
# working unchanged when the device exposes 0x2121 / 0x2150.
settings = cfg.setdefault("settings", {})
settings.setdefault("wheel_divert", WHEEL_DIVERT_DEFAULT)
cfg["version"] = 10

if version < 11:
# v11: MX Master 4 Thumb button (the small button on the front face,
# CID 0x00c3 in HID++). Existing profiles get "thumb_button": "none"
# so users opt in by mapping it in the UI. Devices without this
# button simply ignore the entry.
for pdata in cfg.get("profiles", {}).values():
mappings = pdata.setdefault("mappings", {})
mappings.setdefault("thumb_button", "none")
cfg["version"] = 11

cfg.setdefault("settings", {})
cfg["settings"].setdefault("appearance_mode", "system")
cfg["settings"].setdefault("debug_mode", False)
Expand All @@ -337,6 +410,9 @@ def _migrate(cfg):
cfg["settings"].setdefault("ignore_trackpad", True)
cfg["settings"].setdefault("check_for_updates", True)
cfg["settings"].setdefault("update_check_state", {})
cfg["settings"]["wheel_divert"] = coerce_wheel_divert_setting(
cfg["settings"].get("wheel_divert", WHEEL_DIVERT_DEFAULT)
)

# Always migrate old wmplayer.exe → Microsoft.Media.Player.exe in profile apps
for pdata in cfg.get("profiles", {}).values():
Expand Down
151 changes: 145 additions & 6 deletions core/engine.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
Engine wires the mouse hook to the key simulator using the
Engine -- wires the mouse hook to the key simulator using the
current configuration. Sits between the hook layer and the UI.
Supports per-application auto-switching of profiles.
"""
Expand All @@ -14,6 +14,7 @@
from core.config import (
load_config, get_active_mappings, get_profile_for_app,
BUTTON_TO_EVENTS, GESTURE_DIRECTION_BUTTONS, save_config,
WHEEL_DIVERT_OFF, coerce_wheel_divert_setting,
)
from core.app_detector import AppDetector
from core.mouse_hook_types import HidRuntimeState
Expand Down Expand Up @@ -67,6 +68,12 @@ def __init__(self):
self._replay_lock = threading.Lock()
self._mouse_release_timers = {} # action_id → Timer for safety auto-release
self._lock = threading.Lock()
# HID++ native-invert tracking. `_last_native_invert_target` is
# the most recently acknowledged (invert_v, invert_h) so the
# fast-path can skip redundant device round-trips on profile changes.
self._wheel_divert_change_cb = None
self._wheel_divert_active_local = False
self._last_native_invert_target = (False, False)
self.hook.set_debug_callback(self._emit_debug)
self.hook.set_gesture_callback(self._emit_gesture_event)
self.hook.set_status_callback(self._emit_status)
Expand Down Expand Up @@ -140,6 +147,8 @@ def _setup_hooks(self):
)

self._emit_mapping_snapshot("Hook mappings refreshed", mappings)
# Drive HID++ firmware wheel-invert from settings + device capability.
self._apply_wheel_invert_setting()

for btn_key, action_id in mappings.items():
events = list(BUTTON_TO_EVENTS.get(btn_key, ()))
Expand Down Expand Up @@ -250,7 +259,7 @@ def _toggle_smart_shift(self):

IMPORTANT: this is called from a HID event callback which runs on the HID
loop thread. Calling hg.set_smart_shift() directly would block waiting for
the same loop to process the pending request a deadlock that causes the
the same loop to process the pending request -- a deadlock that causes the
3-second timeout seen in the logs. Config and UI are updated synchronously;
the device write is dispatched to a separate thread.
"""
Expand All @@ -277,7 +286,7 @@ def _switch_scroll_mode(self):
"""Switch between ratchet and free-spin (Logi Options+ physical button behaviour).

SmartShift auto-switching is disabled so the chosen fixed mode takes effect.
Same deadlock caveat as _toggle_smart_shift device write runs off-thread.
Same deadlock caveat as _toggle_smart_shift -- device write runs off-thread.
"""
settings = self.cfg.get("settings", {})
current_mode = settings.get("smart_shift_mode", "ratchet")
Expand Down Expand Up @@ -333,6 +342,104 @@ def _write():
hg.set_dpi(new_dpi)
threading.Thread(target=_write, daemon=True, name="CycleDPI").start()

def _apply_wheel_invert_setting(self, *, force: bool = False) -> None:
"""Drive HID++ firmware wheel-invert from settings + device
capability. ``force=True`` re-issues writes even when cached
state matches the target -- used by ``_run_saved_settings_replay``
to realign firmware that forgot state after sleep. On success the
engine + platform hook flip ``wheel_native_invert_active`` so the
OS-layer inversion path is suppressed; on failure the OS-layer
path handles inversion."""
settings = self.cfg.get("settings", {})
kill_switch_off = (
coerce_wheel_divert_setting(settings.get("wheel_divert")) == WHEEL_DIVERT_OFF
)
invert_v = bool(settings.get("invert_vscroll", False))
invert_h = bool(settings.get("invert_hscroll", False))
device = self.connected_device
capable = bool(device and (
getattr(device, "has_hires_wheel", False)
or getattr(device, "has_thumbwheel", False)
))
# Stay True even when both invert flags are False so we own the
# wheel-mode write and a stale invert lease from a crashed Mouser
# session is reset to native on reconnect.
target_active = bool(capable and not kill_switch_off)
hg = self.hook._hid_gesture
if (
not force
and target_active == self._wheel_divert_active_local
and target_active == bool(getattr(self.hook, "wheel_native_invert_active", False))
and (not target_active or self._last_native_invert_target == (invert_v, invert_h))
):
return
ack = False
if target_active and hg is not None and hasattr(hg, "request_wheel_native_invert"):
try:
ack = bool(hg.request_wheel_native_invert(invert_v, invert_h))
except Exception as exc:
print(f"[Engine] wheel native-invert request failed: {exc}")
ack = False
elif not target_active and hg is not None and hasattr(hg, "request_wheel_native_invert"):
try:
hg.request_wheel_native_invert(False, False)
except Exception as exc:
print(f"[Engine] wheel native-invert release failed: {exc}")
new_active = bool(target_active and ack)
prev_active = self._wheel_divert_active_local
self._wheel_divert_active_local = new_active
self.hook.wheel_native_invert_active = new_active
self._last_native_invert_target = (invert_v, invert_h) if new_active else (False, False)
if hg is not None and hasattr(hg, "set_wheel_divert_active_flags"):
try:
hg.set_wheel_divert_active_flags(
bool(new_active and invert_v
and getattr(hg, "_hires_wheel_idx", None) is not None),
bool(new_active and invert_h
and getattr(hg, "_thumbwheel_idx", None) is not None),
)
except Exception as exc:
print(f"[Engine] set_wheel_divert_active_flags failed: {exc}")
if new_active != prev_active:
print(
f"[Engine] wheel native-invert -> "
f"{'ON (HID++)' if new_active else 'OFF (OS fallback)'} "
f"capable={capable} kill_switch_off={kill_switch_off} "
f"invert_v={invert_v} invert_h={invert_h} ack={ack}"
)
if not new_active and target_active:
self._emit_status(
"Firmware wheel invert FAILED on a capable device -- "
"falling back to OS-level inversion."
)
self._notify_wheel_divert_change(new_active)

def _notify_wheel_divert_change(self, active: bool) -> None:
if self._wheel_divert_change_cb is None:
return
try:
self._wheel_divert_change_cb(bool(active))
except Exception as exc:
print(f"[Engine] wheel divert change callback raised: {exc}")

def set_wheel_divert_change_callback(self, cb) -> None:
"""Register ``cb(active: bool)`` invoked whenever the HID++ wheel
divert lease toggles. Fires once immediately with the current
state. Pass ``None`` to detach the currently registered callback."""
self._wheel_divert_change_cb = cb
if cb is None:
return
try:
cb(bool(self._wheel_divert_active_local))
except Exception as exc:
print(f"[Engine] wheel divert change callback (initial) raised: {exc}")

@property
def wheel_native_invert_active(self) -> bool:
"""True iff the connected device is performing scroll inversion at
the firmware level (so the OS-layer inversion path is suppressed)."""
return bool(self._wheel_divert_active_local)

def _make_hscroll_handler(self, action_id):
def handler(event):
if not self._enabled:
Expand Down Expand Up @@ -516,6 +623,19 @@ def _run_saved_settings_replay(self):
except Exception:
pass

# Phase A.5: re-apply HID++ native wheel invert with force=True so
# firmware that forgot invert state after sleep is realigned.
self._apply_wheel_invert_setting(force=True)
native_invert_target = (
coerce_wheel_divert_setting(
self.cfg.get("settings", {}).get("wheel_divert")
) != WHEEL_DIVERT_OFF
and bool(getattr(self.connected_device, "has_hires_wheel", False)
or getattr(self.connected_device, "has_thumbwheel", False))
)
if native_invert_target and not self._wheel_divert_active_local:
replay_ok = False

time.sleep(3)
hg = self.hook._hid_gesture
if hg is None or getattr(hg, "connected_device", None) is None:
Expand Down Expand Up @@ -658,8 +778,15 @@ def _battery_poll_loop(self, stop_event):
except Exception:
pass

# Read ``_replay_inflight`` under the same lock that the
# replay thread uses to flip it, otherwise the battery
# loop can issue a Smart Shift poll partway through a
# replay round-trip and the firmware queues conflicting
# HID++ writes.
with self._replay_lock:
replay_inflight = self._replay_inflight
if (
not self._replay_inflight
not replay_inflight
and now - _last_ss >= _ss_poll_interval
and hg.smart_shift_supported
):
Expand Down Expand Up @@ -725,7 +852,7 @@ def set_dpi(self, dpi_value):
hg = self.hook._hid_gesture
if hg:
return hg.set_dpi(dpi)
print("[Engine] No HID++ connection DPI not applied")
print("[Engine] No HID++ connection -- DPI not applied")
return False

def set_smart_shift(self, mode, smart_shift_enabled=False, threshold=25):
Expand All @@ -744,7 +871,7 @@ def set_smart_shift(self, mode, smart_shift_enabled=False, threshold=25):
result = hg.set_smart_shift(mode, smart_shift_enabled, threshold)
print(f"[Engine] set_smart_shift -> {'OK' if result else 'FAILED'}")
return result
print("[Engine] set_smart_shift: No HID++ connection not applied")
print("[Engine] set_smart_shift: No HID++ connection -- not applied")
return False

@property
Expand Down Expand Up @@ -808,3 +935,15 @@ def stop(self):
self._battery_poll_thread = None
self._app_detector.stop()
self.hook.stop()
# Cancel any pending safety auto-release timers. Without this the
# threading.Timer scheduled by execute_action can still fire after
# ``stop()`` returns and call ``inject_mouse_up`` against a hook
# that has already been torn down -- one of the timers fires a
# phantom release every time Mouser quits during a long press.
timers = list(self._mouse_release_timers.values())
self._mouse_release_timers.clear()
for timer in timers:
try:
timer.cancel()
except Exception as exc: # noqa: BLE001 - shutdown must complete
print(f"[Engine] stop: failed to cancel release timer: {exc!r}")
Loading
Loading