Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable defining per-key thresholds #11

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ By filtering such anomalies, we can hopefully remove chatter without impeding ac

## Installation

Download the repository as a zip and extract the file. The dependencies are listed in the requirements.txt. And you can install it with the command below.
Download the repository as a zip and extract the file. The dependencies are listed in the requirements.txt. And you can install it with the command below.

```shell
sudo pip3 install -r requirements.txt
Expand Down Expand Up @@ -71,6 +71,12 @@ sudo python3 -m src

- -v {0,1,2}, --verbosity {0,1,2}

- -c CONFIG_PATH, --config-path CONFIG_PATH
- Path to the configuration file specifying the threshold for each key (absolute or relative to the main project directory). The default path is `config.yaml` in the main project directory. If the file does not exist, the default threshold or the provided THRESHOLD will be used for all keys.

- -n, --new-config
- Create a new configuration file at CONFIG_PATH with commented-out entries for all available keys for a given keyboard. To customize the threshold for a key, uncomment the line and set the threshold value in milliseconds.

## Automation

Starting the script manually every time doesn't sound like the greatest idea, so
Expand All @@ -90,12 +96,17 @@ Then, copy the `chattering_fix.service` to `/etc/systemd/system/` and enable it
```shell
systemctl enable --now chattering_fix
```
You can check if the systemd unit file is properly working using
You can check if the systemd unit file is properly working using
```shell
systemctl status chattering_fix.service
```
You can also use
You can also use
```shell
journalctl -xeu chattering_fix.service
```
just to make sure that there are no errors.
just to make sure that there are no errors.

To change individual keys thresholds while the service is running, edit the config file as you normally would and then restart the service with the command below.
```shell
systemctl restart chattering_fix.service
```
43 changes: 37 additions & 6 deletions src/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@
import libevdev

from src.filtering import filter_chattering
from src.keyboard_retrieval import retrieve_keyboard_name, INPUT_DEVICES_PATH, abs_keyboard_path
from src.keyboard_retrieval import (
INPUT_DEVICES_PATH,
abs_keyboard_path,
create_config_file,
parse_config_file,
retrieve_keyboard_name,
)


@contextmanager
def get_device_handle(keyboard_name: str) -> libevdev.Device:
""" Safely get an evdev device handle. """
"""Safely get an evdev device handle."""

fd = open(abs_keyboard_path(keyboard_name), 'rb')
fd = open(abs_keyboard_path(keyboard_name), "rb")
evdev = libevdev.Device(fd)
try:
yield evdev
Expand All @@ -26,9 +32,12 @@ def get_device_handle(keyboard_name: str) -> libevdev.Device:
parser.add_argument('-k', '--keyboard', type=str, default=str(),
help=f"Name of your chattering keyboard device as listed in {INPUT_DEVICES_PATH}. "
f"If left unset, will be attempted to be retrieved automatically.")
parser.add_argument('-t', '--threshold', type=int, default=30, help="Filter time threshold in milliseconds. "
parser.add_argument('-t', '--threshold', type=int, default=None, help="Filter time threshold in milliseconds. "
"Default=30ms.")
parser.add_argument('-v', '--verbosity', type=int, default=1, choices=[0, 1, 2])
parser.add_argument('-c', '--config', type=str, default='config.yaml', help="Path to the configuration file.")
parser.add_argument('-n', '--new-config', action='store_true', help="Create a new configuration file at the path "
"defined by --config.")
args = parser.parse_args()

logging.basicConfig(
Expand All @@ -43,8 +52,30 @@ def get_device_handle(keyboard_name: str) -> libevdev.Device:
)
],
format="%(asctime)s - %(message)s",
datefmt="%H:%M:%S"
datefmt="%H:%M:%S",
)

# Load the config file if it exists.
try:
with open(args.config, "r") as file:
config = parse_config_file(file)
logging.info(f"Using configuration from {args.config}.")
logging.debug(f"Configuration: {config}")
except FileNotFoundError:
config = {}

# Use common threshold from the config if it was specified.
# Otherwise, use the default value of 30 ms.
if "default" not in config:
config["default"] = 30

# Allow overriding the threshold from the command line.
if args.threshold is not None:
config["default"] = args.threshold
logging.info(f"Overriding default threshold with {args.threshold} ms from command line.")

with get_device_handle(args.keyboard or retrieve_keyboard_name()) as device:
filter_chattering(device, args.threshold)
if args.new_config:
create_config_file(device, args.config, default_threshold=config["default"])
logging.info(f"New configuration file created at {args.config}.")
filter_chattering(device, config)
35 changes: 21 additions & 14 deletions src/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import libevdev


def filter_chattering(evdev: libevdev.Device, threshold: int) -> NoReturn:
def filter_chattering(evdev: libevdev.Device, thresholds: dict) -> NoReturn:
# grab the device - now only we see the events it emits
evdev.grab()
# create a copy of the device that we can write to - this will emit the filtered events to anyone who listens
Expand All @@ -16,43 +16,50 @@ def filter_chattering(evdev: libevdev.Device, threshold: int) -> NoReturn:
while True:
# since the descriptor is blocking, this blocks until there are events available
for e in evdev.events():
if _from_keystroke(e, threshold):
if _from_keystroke(e, thresholds):
ui_dev.send_events([e, libevdev.InputEvent(libevdev.EV_SYN.SYN_REPORT, 0)])


def _from_keystroke(event: libevdev.InputEvent, threshold: int) -> bool:
def _from_keystroke(event: libevdev.InputEvent, thresholds: dict) -> bool:
# no need to relay those - we are going to emit our own
if event.matches(libevdev.EV_SYN) or event.matches(libevdev.EV_MSC):
return False

# not sure when this would happen, but let's not crash
if event.code is None or event.value is None:
return False

# some events we don't want to filter, like EV_LED for toggling NumLock and the like, and also key hold events
if not event.matches(libevdev.EV_KEY) or event.value > 1:
logging.debug(f'FORWARDING {event.code}')
logging.debug(f"FORWARDING {event.code}")
return True

# the values are 0 for up, 1 for down and 2 for hold
if event.value == 0:
if _key_pressed[event.code]:
logging.debug(f'FORWARDING {event.code} up')
_last_key_up[event.code] = event.sec * 1E6 + event.usec
logging.debug(f"FORWARDING {event.code} up")
_last_key_up[event.code] = event.sec * 1e6 + event.usec
_key_pressed[event.code] = False
return True
else:
logging.info(f'FILTERING {event.code} up: key not pressed beforehand')
logging.info(f"FILTERING {event.code} up: key not pressed beforehand")
return False

prev = _last_key_up.get(event.code)
now = event.sec * 1E6 + event.usec
now = event.sec * 1e6 + event.usec

if prev is None or now - prev > threshold * 1E3:
logging.debug(f'FORWARDING {event.code} down')
if prev is None or now - prev > thresholds.get(event.code, thresholds["default"]) * 1e3:
logging.debug(f"FORWARDING {event.code} down")
if prev is not None and now - prev < thresholds.get(event.code, thresholds["default"]) * 1e3 * 2:
logging.debug(f"POTENTIAL chatter on {event.code} down: "
f"last key up event happened {(now - prev) / 1e3} ms ago. "
f"FORWARDING press")
_key_pressed[event.code] = True
return True

logging.info(
f'FILTERED {event.code} down: last key up event happened {(now - prev) / 1E3} ms ago')
logging.info(f"FILTERED {event.code} down: last key up event happened {(now - prev) / 1e3} ms ago")
return False


_last_key_up: Dict[libevdev.EventCode, int] = {}
_key_pressed: DefaultDict[libevdev.EventCode, bool] = defaultdict(bool)
_last_key_up: Dict[libevdev.EventCode, float] = {}
_key_pressed: DefaultDict[libevdev.EventCode, bool] = defaultdict(bool)
55 changes: 50 additions & 5 deletions src/keyboard_retrieval.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import logging
import os
import shutil
from io import TextIOWrapper
from typing import Final

INPUT_DEVICES_PATH: Final = '/dev/input/by-id'
_KEYBOARD_NAME_SUFFIX: Final = '-kbd'
import libevdev
import yaml

INPUT_DEVICES_PATH: Final = "/dev/input/by-id"
_KEYBOARD_NAME_SUFFIX: Final = "-kbd"


def retrieve_keyboard_name() -> str:
keyboard_devices = list(filter(lambda d: d.endswith(_KEYBOARD_NAME_SUFFIX), os.listdir(INPUT_DEVICES_PATH)))
Expand All @@ -14,12 +20,12 @@ def retrieve_keyboard_name() -> str:
if n_devices == 1:
logging.info(f"Found keyboard: {keyboard_devices[0]}")
return keyboard_devices[0]

# Use native Python input for user selection
print("Select a device:")
for idx, device in enumerate(keyboard_devices, start=1):
print(f"{idx}. {device}")

selected_idx = -1
while selected_idx < 1 or selected_idx > n_devices:
try:
Expand All @@ -28,9 +34,48 @@ def retrieve_keyboard_name() -> str:
print(f"Please select a number between 1 and {n_devices}")
except ValueError:
print("Please enter a valid number")

return keyboard_devices[selected_idx - 1]


def abs_keyboard_path(device: str) -> str:
return os.path.join(INPUT_DEVICES_PATH, device)


def create_config_file(device: libevdev.Device, config_path: str, default_threshold: int) -> None:
# Create a new configuration file with the default threshold.
with open(config_path, "w") as file:
file.write(f"default: {default_threshold}\n")

# Write each supported key event with the default threshold, but commented out.
supported = device.evbits
for event_type, event_codes in supported.items():
if event_type == libevdev.EV_KEY:
for event_code in event_codes:
file.write(f'# "{event_code.name}:{event_code.value}": {default_threshold}\n')

# Get the current user who invoked sudo (e.g., $SUDO_USER).
user = os.environ.get("SUDO_USER")
if user is None:
raise Exception("Script must be run with sudo.")

# Change ownership to the invoking user.
shutil.chown(config_path, user=user)

# Set permissions (read/write for the user, read-only for others).
os.chmod(config_path, 0o644) # rw-r--r--


def parse_config_file(config_file: TextIOWrapper) -> dict:
input_config = yaml.safe_load(config_file)

# Convert the keys to EventCodes.
config = {}
for key, value in input_config.items():
if key == "default":
config[key] = value
else:
event_code = libevdev.evbit("EV_KEY", key.split(":")[0])
config[event_code] = value

return config