From 7f77bf5bdf02b71fa90d92acbe7527a293b6d884 Mon Sep 17 00:00:00 2001 From: igo95862 Date: Sun, 9 Jun 2024 19:02:25 +0500 Subject: [PATCH] Format code with black Add formatting check to linters. --- .github/workflows/ci.yaml | 19 +- docs/man_generator.py | 80 +- src/bubblejail/bubblejail_cli.py | 92 +-- src/bubblejail/bubblejail_cli_autocomplete.py | 22 +- src/bubblejail/bubblejail_cli_metadata.py | 164 ++-- src/bubblejail/bubblejail_directories.py | 158 ++-- src/bubblejail/bubblejail_gui_qt.py | 131 ++-- src/bubblejail/bubblejail_helper.py | 120 ++- src/bubblejail/bubblejail_instance.py | 85 +- src/bubblejail/bubblejail_runner.py | 118 ++- src/bubblejail/bubblejail_seccomp.py | 51 +- src/bubblejail/bubblejail_utils.py | 12 +- src/bubblejail/bwrap_config.py | 49 +- src/bubblejail/exceptions.py | 27 +- src/bubblejail/services.py | 726 +++++++++--------- test/test_auto_completion.py | 14 +- test/test_full_run.py | 4 +- test/test_helper.py | 65 +- test/test_profiles.py | 9 +- test/test_service_info.py | 21 +- tools/base.py | 16 + tools/containers/build_ci_images.py | 5 +- tools/jinja2_run.py | 8 +- tools/run_format.py | 42 + tools/run_linters.py | 49 +- tools/run_test_bubblejail.py | 20 +- 26 files changed, 1049 insertions(+), 1058 deletions(-) create mode 100644 tools/base.py create mode 100644 tools/run_format.py diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index b48b8d2..af3f513 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -16,11 +16,12 @@ jobs: image: docker.io/archlinux:latest steps: - name: Install linters and Python dependencies - run: | - pacman --noconfirm -Syu \ - git \ - python-pyflakes reuse \ - mypy python-pyqt6 python-tomli-w + run: > + pacman --noconfirm -Syu + git + python-pyflakes reuse + mypy python-pyqt6 python-tomli-w + python-black python-isort - name: Checkout uses: actions/checkout@v4 - name: Add safe git directory @@ -28,7 +29,7 @@ jobs: git config --global --add safe.directory "$GITHUB_WORKSPACE" - name: Run linters run: | - python3 tools/run_linters.py + python3 -m tools.run_linters build: name: Build @@ -37,9 +38,9 @@ jobs: image: docker.io/archlinux:latest steps: - name: Install build dependencies - run: | - pacman --noconfirm -Syu \ - git python python-jinja scdoc meson + run: > + pacman --noconfirm -Syu + git python python-jinja scdoc meson - name: Checkout uses: actions/checkout@v4 - name: Run meson diff --git a/docs/man_generator.py b/docs/man_generator.py index 37f04ac..fc7ed51 100644 --- a/docs/man_generator.py +++ b/docs/man_generator.py @@ -19,15 +19,15 @@ def scdoc_paragraph(s: Iterator[str]) -> str: - return '\n\n'.join(s) + return "\n\n".join(s) def scdoc_indent(s: str, indent_level: int = 1) -> str: - return indent(s, '\t'*indent_level) + return indent(s, "\t" * indent_level) SUBCOMMAND_HELP = { - 'run': """The arguments are optional if you have + "run": """The arguments are optional if you have _executable_name_ key set in config. Otherwise, you *must* specify arguments to run. @@ -40,29 +40,29 @@ def scdoc_indent(s: str, indent_level: int = 1) -> str: the sandbox. If _--wait_ option is passed the output of the command will be returned. """, - 'create': """When a new instance is created a desktop + "create": """When a new instance is created a desktop entry will be also created. Creating an instance from profile will print some import tips that you can use to import configuration from unsandboxed application. """, - 'generate-desktop-entry': """Desktop entry can either be specified + "generate-desktop-entry": """Desktop entry can either be specified by profile, path, name or extracted from metadata when instance was created. """, - 'edit': """After exiting the editor, the file is validated and + "edit": """After exiting the editor, the file is validated and only written if validation is successful. _EDITOR_ environmental variable must be set. """, - 'list': '\n'.join( - f"- *{x}*" for x in - BUBBLEJAIL_CMD['list']['add_argument']['list_what']['choices'] - ) + "list": "\n".join( + f"- *{x}*" + for x in BUBBLEJAIL_CMD["list"]["add_argument"]["list_what"]["choices"] + ), } OPTION_HELP = { - 'run': { - '--debug-bwrap-args': """The instance name must be separated with + "run": { + "--debug-bwrap-args": """The instance name must be separated with extra -- from the instance name. This option can be repeated multiple times for multiple args to bwrap. @@ -75,8 +75,8 @@ def scdoc_indent(s: str, indent_level: int = 1) -> str: """ }, - 'create': { - '--profile': """If omitted an empty profile will be used and + "create": { + "--profile": """If omitted an empty profile will be used and the user will have to define the configuration manually. There is also a _generic_ profile which has some common settings such @@ -87,16 +87,16 @@ def scdoc_indent(s: str, indent_level: int = 1) -> str: def format_option(subcommand: str, option: str) -> Iterator[str]: - option_data = BUBBLEJAIL_CMD[subcommand]['add_argument'][option] + option_data = BUBBLEJAIL_CMD[subcommand]["add_argument"][option] yield f"*{option}*" - option_action = option_data.get('action') + option_action = option_data.get("action") match option_action: - case 'store_true' | 'store_false': + case "store_true" | "store_false": return - option_metavar = option_data.get('metavar') + option_metavar = option_data.get("metavar") match option_metavar: case str(): yield f"<{option_metavar}>" @@ -109,12 +109,12 @@ def format_option(subcommand: str, option: str) -> Iterator[str]: def get_option_description(subcommand: str, option: str) -> tuple[str, ...]: - option_data = BUBBLEJAIL_CMD[subcommand]['add_argument'][option] - option_help = option_data['help'] + option_data = BUBBLEJAIL_CMD[subcommand]["add_argument"][option] + option_help = option_data["help"] try: option_extra_description = OPTION_HELP[subcommand][option] except KeyError: - option_extra_description = '' + option_extra_description = "" return option_help, option_extra_description @@ -122,22 +122,22 @@ def get_option_description(subcommand: str, option: str) -> tuple[str, ...]: def get_options(subcommand: str) -> tuple[str, ...]: return tuple( filter( - lambda x: x.startswith('-'), - BUBBLEJAIL_CMD[subcommand]['add_argument'].keys(), + lambda x: x.startswith("-"), + BUBBLEJAIL_CMD[subcommand]["add_argument"].keys(), ) ) def format_arg_names(subcommand: str) -> Iterator[str]: if get_options(subcommand): - yield '[options...]' + yield "[options...]" - add_arguments_dict = BUBBLEJAIL_CMD[subcommand]['add_argument'] + add_arguments_dict = BUBBLEJAIL_CMD[subcommand]["add_argument"] for add_argument, options in add_arguments_dict.items(): - if add_argument.startswith('-'): + if add_argument.startswith("-"): continue - if options.get('nargs'): + if options.get("nargs"): yield f"[{add_argument}...]" else: yield f"[{add_argument}]" @@ -145,8 +145,8 @@ def format_arg_names(subcommand: str) -> Iterator[str]: def get_subcommand_description(subcommand: str) -> tuple[str, ...]: return ( - BUBBLEJAIL_CMD[subcommand]['description'], - SUBCOMMAND_HELP.get(subcommand, ''), + BUBBLEJAIL_CMD[subcommand]["description"], + SUBCOMMAND_HELP.get(subcommand, ""), ) @@ -155,10 +155,10 @@ def generate_cmd_man(template_dir: Path) -> None: loader=FileSystemLoader(template_dir), undefined=StrictUndefined, ) - env.filters['scdoc_indent'] = scdoc_indent - env.filters['scdoc_paragraph'] = scdoc_paragraph + env.filters["scdoc_indent"] = scdoc_indent + env.filters["scdoc_paragraph"] = scdoc_paragraph - template = env.get_template('bubblejail.1.scd.jinja2') + template = env.get_template("bubblejail.1.scd.jinja2") print( template.render( @@ -173,7 +173,7 @@ def generate_cmd_man(template_dir: Path) -> None: def generate_services_man(template_dir: Path) -> None: - modules['xdg'] = MagicMock() + modules["xdg"] = MagicMock() from bubblejail.services import SERVICES_CLASSES @@ -181,9 +181,9 @@ def generate_services_man(template_dir: Path) -> None: loader=FileSystemLoader(template_dir), undefined=StrictUndefined, ) - env.filters['scdoc_indent'] = scdoc_indent + env.filters["scdoc_indent"] = scdoc_indent - template = env.get_template('bubblejail.services.5.scd.jinja2') + template = env.get_template("bubblejail.services.5.scd.jinja2") print( template.render( @@ -193,25 +193,25 @@ def generate_services_man(template_dir: Path) -> None: GENERATORS = { - 'cmd': generate_cmd_man, - 'services': generate_services_man, + "cmd": generate_cmd_man, + "services": generate_services_man, } def main() -> None: arg_parse = ArgumentParser() arg_parse.add_argument( - '--template-dir', + "--template-dir", required=True, type=Path, ) arg_parse.add_argument( - 'generator', + "generator", choices=GENERATORS.keys(), ) args = vars(arg_parse.parse_args()) - generator_func_name = args.pop('generator') + generator_func_name = args.pop("generator") GENERATORS[generator_func_name](**args) diff --git a/src/bubblejail/bubblejail_cli.py b/src/bubblejail/bubblejail_cli.py index 26cb67d..d7b6e3a 100644 --- a/src/bubblejail/bubblejail_cli.py +++ b/src/bubblejail/bubblejail_cli.py @@ -28,41 +28,42 @@ def iter_subcommands() -> Generator[str, None, None]: def iter_subcommand_options( - subcommand_text: str, + subcommand_text: str, ) -> Generator[str, None, None]: yield from ( - x - for x in - BUBBLEJAIL_CMD[subcommand_text]['add_argument'] - if x.startswith('--') + x for x in BUBBLEJAIL_CMD[subcommand_text]["add_argument"] if x.startswith("--") ) def iter_list_choices() -> Iterable[str]: - choices = BUBBLEJAIL_CMD['list']['add_argument']['list_what']['choices'] + choices = BUBBLEJAIL_CMD["list"]["add_argument"]["list_what"]["choices"] assert isinstance(choices, set) return choices -def _extra_args_converter(command_sequence: list[str] - ) -> Generator[str, None, None]: +def _extra_args_converter(command_sequence: list[str]) -> Generator[str, None, None]: command_iter = iter(command_sequence) try: argword = next(command_iter) except StopIteration: - raise ValueError('Expected at least one argument') + raise ValueError("Expected at least one argument") yield f"--{argword}" yield from command_iter -def run_bjail(instance_name: str, - args_to_instance: list[str], - wait: bool, dry_run: bool, debug_bwrap_args: list[list[str]], - debug_shell: bool, debug_log_dbus: bool, - debug_helper_script: Optional[Path]) -> None: +def run_bjail( + instance_name: str, + args_to_instance: list[str], + wait: bool, + dry_run: bool, + debug_bwrap_args: list[list[str]], + debug_shell: bool, + debug_log_dbus: bool, + debug_helper_script: Optional[Path], +) -> None: try: instance = BubblejailDirectories.instance_get(instance_name) @@ -74,7 +75,8 @@ def run_bjail(instance_name: str, else: print("Instance already running.", file=stderr) print( - "Sending command to the instance: ", args_to_instance, + "Sending command to the instance: ", + args_to_instance, file=stderr, ) @@ -115,11 +117,13 @@ def run_bjail(instance_name: str, try: subprocess_run( ( - 'notify-send', - '--urgency', 'critical', - '--icon', 'bubblejail-config', + "notify-send", + "--urgency", + "critical", + "--icon", + "bubblejail-config", f"Failed to run instance: {instance_name}", - f"Exception: {format_exc(0)}" + f"Exception: {format_exc(0)}", ) ) except FileNotFoundError: @@ -131,22 +135,22 @@ def run_bjail(instance_name: str, def bjail_list(list_what: str) -> None: str_iterator: Iterator[str] - if list_what == 'instances': + if list_what == "instances": str_iterator = iter_instance_names() - elif list_what == 'profiles': + elif list_what == "profiles": str_iterator = BubblejailDirectories.iter_profile_names() - elif list_what == 'services': + elif list_what == "services": str_iterator = (x.name for x in SERVICES_CLASSES) - elif list_what == 'subcommands': + elif list_what == "subcommands": str_iterator = iter_subcommands() for string in str_iterator: print(string) -def bjail_create(new_instance_name: str, - profile: Optional[str], - no_desktop_entry: bool) -> None: +def bjail_create( + new_instance_name: str, profile: Optional[str], no_desktop_entry: bool +) -> None: BubblejailDirectories.create_new_instance( new_name=new_instance_name, profile_name=profile, @@ -160,9 +164,9 @@ def bjail_edit(instance_name: str) -> None: async_run(instance.edit_config_in_editor()) -def bjail_create_desktop_entry(instance_name: str, - profile: Optional[str], - desktop_entry: Optional[str]) -> None: +def bjail_create_desktop_entry( + instance_name: str, profile: Optional[str], desktop_entry: Optional[str] +) -> None: BubblejailDirectories.overwrite_desktop_entry_for_profile( instance_name=instance_name, profile_name=profile, @@ -171,28 +175,25 @@ def bjail_create_desktop_entry(instance_name: str, COMMANDS_FUNCS: dict[str, Callable[..., None]] = { - 'run': run_bjail, - 'create': bjail_create, - 'list': bjail_list, - 'edit': bjail_edit, - 'generate-desktop-entry': bjail_create_desktop_entry, + "run": run_bjail, + "create": bjail_create, + "list": bjail_list, + "edit": bjail_edit, + "generate-desktop-entry": bjail_create_desktop_entry, } def create_arg_parser() -> ArgumentParser: parser = ArgumentParser( - description=( - 'Bubblejail is a bubblewrap based sandboxing utility.' - ) + description=("Bubblejail is a bubblewrap based sandboxing utility.") ) subparsers = parser.add_subparsers( - required=True, - description='Available subcommands.' + required=True, description="Available subcommands." ) for subcommand_name, subcommand_data in BUBBLEJAIL_CMD.items(): subfunction = COMMANDS_FUNCS[subcommand_name] - description = subcommand_data['description'] - subcommand_add_argument = subcommand_data['add_argument'] + description = subcommand_data["description"] + subcommand_add_argument = subcommand_data["add_argument"] subparser = subparsers.add_parser( subcommand_name, description=description, @@ -211,20 +212,21 @@ def create_arg_parser() -> ArgumentParser: def bubblejail_main(arg_list: Optional[list[str]] = None) -> None: # Short circuit to auto-complete - if len(argv) > 1 and argv[1] == 'auto-complete': + if len(argv) > 1 and argv[1] == "auto-complete": from .bubblejail_cli_autocomplete import run_autocomplete + run_autocomplete() return parser = create_arg_parser() parser.add_argument( - '--version', - action='version', + "--version", + action="version", version=BubblejailSettings.VERSION, ) args_dict = vars(parser.parse_args(arg_list)) - func = args_dict.pop('func') + func = args_dict.pop("func") func(**args_dict) diff --git a/src/bubblejail/bubblejail_cli_autocomplete.py b/src/bubblejail/bubblejail_cli_autocomplete.py index 1e33a90..8989ced 100644 --- a/src/bubblejail/bubblejail_cli_autocomplete.py +++ b/src/bubblejail/bubblejail_cli_autocomplete.py @@ -27,10 +27,10 @@ def auto_complete_parser(self, current_cmd: str) -> None: self.last_auto_complete = iter_subcommands() if current_cmd[-1].isspace(): - words.append('') + words.append("") - want_instance_set = {'edit', 'run', 'generate-desktop-entry'} - base_options = {'--help', '--version'} + want_instance_set = {"edit", "run", "generate-desktop-entry"} + base_options = {"--help", "--version"} # enumerate words to allow LL parser lookahead enumer_words = enumerate(words) @@ -39,7 +39,7 @@ def auto_complete_parser(self, current_cmd: str) -> None: while True: index, token = next(enumer_words) # If its an option autocomplete to base options - if token.startswith('-'): + if token.startswith("-"): self.last_auto_complete = base_options continue else: @@ -74,21 +74,19 @@ def auto_complete_parser(self, current_cmd: str) -> None: self.last_auto_complete = tuple() return - if token.startswith('-'): + if token.startswith("-"): # Parse base options and subcommands self.last_auto_complete = subcommand_options continue - if subcommand == 'list': + if subcommand == "list": self.last_auto_complete = iter_list_choices() subject_set = True continue - if words[index - 1] == '--profile': + if words[index - 1] == "--profile": # Wants profile - self.last_auto_complete = ( - BubblejailDirectories.iter_profile_names() - ) + self.last_auto_complete = BubblejailDirectories.iter_profile_names() continue if subcommand in want_instance_set: @@ -111,8 +109,8 @@ def auto_complete(self, current_cmd: str) -> Iterable[str]: def run_autocomplete() -> None: parser = ArgumentParser() - parser.add_argument('auto_complete') - parser.add_argument('current_cmd') + parser.add_argument("auto_complete") + parser.add_argument("current_cmd") args = parser.parse_args() for x in AutoCompleteParser().auto_complete(args.current_cmd): diff --git a/src/bubblejail/bubblejail_cli_metadata.py b/src/bubblejail/bubblejail_cli_metadata.py index dd7a49e..700f5e4 100644 --- a/src/bubblejail/bubblejail_cli_metadata.py +++ b/src/bubblejail/bubblejail_cli_metadata.py @@ -12,120 +12,120 @@ class CmdMetaDataDict(TypedDict): add_argument: dict[str, dict[str, Any]] argument: str - description: 'str' + description: "str" + BUBBLEJAIL_CMD: dict[str, CmdMetaDataDict] = { - 'run': { - 'add_argument': { - '--debug-shell': { - 'action': 'store_true', - 'help': ( - 'Opens a shell inside the sandbox instead of ' - 'running program. Useful for debugging.' + "run": { + "add_argument": { + "--debug-shell": { + "action": "store_true", + "help": ( + "Opens a shell inside the sandbox instead of " + "running program. Useful for debugging." ), }, - '--dry-run': { - 'action': 'store_true', - 'help': ( - 'Prints the bwrap and xdg-desktop-entry arguments ' - 'instead of running.' + "--dry-run": { + "action": "store_true", + "help": ( + "Prints the bwrap and xdg-desktop-entry arguments " + "instead of running." ), }, - '--debug-helper-script': { - 'type': Path, - 'help': ( - 'Use the specified helper script. ' - 'This is mainly development command.' + "--debug-helper-script": { + "type": Path, + "help": ( + "Use the specified helper script. " + "This is mainly development command." ), - 'metavar': 'script_path', + "metavar": "script_path", }, - '--debug-log-dbus': { - 'action': 'store_true', - 'help': 'Enables D-Bus proxy logging.', + "--debug-log-dbus": { + "action": "store_true", + "help": "Enables D-Bus proxy logging.", }, - '--wait': { - 'action': 'store_true', - 'help': ( - 'Wait on the command inserted in to sandbox ' - 'and get the output.' + "--wait": { + "action": "store_true", + "help": ( + "Wait on the command inserted in to sandbox " "and get the output." ), }, - '--debug-bwrap-args': { - 'action': 'append', - 'nargs': '+', - 'help': ( - 'Add extra option to bwrap. ' - 'First argument will be prefixed with `--`.' + "--debug-bwrap-args": { + "action": "append", + "nargs": "+", + "help": ( + "Add extra option to bwrap. " + "First argument will be prefixed with `--`." ), - 'metavar': ('bwrap_option', 'bwrap_option_args'), + "metavar": ("bwrap_option", "bwrap_option_args"), }, - 'instance_name': { - 'help': 'Instance to run.', + "instance_name": { + "help": "Instance to run.", }, - 'args_to_instance': { - 'nargs': ARG_REMAINDER, - 'help': 'Command and its arguments to run inside instance.', + "args_to_instance": { + "nargs": ARG_REMAINDER, + "help": "Command and its arguments to run inside instance.", }, }, - 'argument': 'instance', - 'description': 'Launch instance or run command inside.', + "argument": "instance", + "description": "Launch instance or run command inside.", }, - 'create': { - 'add_argument': { - '--profile': { - 'help': 'Bubblejail profile to use.', - 'metavar': 'profile', + "create": { + "add_argument": { + "--profile": { + "help": "Bubblejail profile to use.", + "metavar": "profile", }, - '--no-desktop-entry': { - 'action': 'store_false', - 'help': 'Do not create desktop entry.', + "--no-desktop-entry": { + "action": "store_false", + "help": "Do not create desktop entry.", }, - 'new_instance_name': { - 'help': 'New instance name.', + "new_instance_name": { + "help": "New instance name.", }, }, - 'argument': 'any', - 'description': 'Create new bubblejail instance.', + "argument": "any", + "description": "Create new bubblejail instance.", }, - 'list': { - 'add_argument': { - 'list_what': { - 'choices': { - 'instances', - 'profiles', - 'services', + "list": { + "add_argument": { + "list_what": { + "choices": { + "instances", + "profiles", + "services", }, - 'default': 'instances', - 'help': 'Type of entity to list.', + "default": "instances", + "help": "Type of entity to list.", }, }, - 'argument': 'any', - 'description': 'List certain bubblejail entities.', + "argument": "any", + "description": "List certain bubblejail entities.", }, - 'edit': { - 'add_argument': { - 'instance_name': { - 'help': 'Instance to edit config.', + "edit": { + "add_argument": { + "instance_name": { + "help": "Instance to edit config.", }, }, - 'argument': 'instance', - 'description': 'Open instance config in $EDITOR.', + "argument": "instance", + "description": "Open instance config in $EDITOR.", }, - 'generate-desktop-entry': { - 'add_argument': { - '--profile': { - 'help': 'Use desktop entry specified in profile.', - 'metavar': 'profile', + "generate-desktop-entry": { + "add_argument": { + "--profile": { + "help": "Use desktop entry specified in profile.", + "metavar": "profile", }, - '--desktop-entry': { - 'help': 'Desktop entry name or path to use.', - 'metavar': 'name_or_path', + "--desktop-entry": { + "help": "Desktop entry name or path to use.", + "metavar": "name_or_path", }, - 'instance_name': { - 'help': 'Instance to generate desktop entry for', + "instance_name": { + "help": "Instance to generate desktop entry for", }, }, - 'argument': 'instance', - 'description': 'Generate XDG desktop entry for an instance.', + "argument": "instance", + "description": "Generate XDG desktop entry for an instance.", }, } diff --git a/src/bubblejail/bubblejail_directories.py b/src/bubblejail/bubblejail_directories.py index c190ad3..001c51b 100644 --- a/src/bubblejail/bubblejail_directories.py +++ b/src/bubblejail/bubblejail_directories.py @@ -21,11 +21,11 @@ UsrSharePath = Path(BubblejailSettings.SHARE_PATH_STR) SysConfPath = Path(BubblejailSettings.SYSCONF_PATH_STR) -UsrShareApplicationsPath = UsrSharePath / 'applications' +UsrShareApplicationsPath = UsrSharePath / "applications" -PackageConfisgPath = UsrSharePath / 'bubblejail' -SystemConfigsPath = SysConfPath / 'bubblejail' -UserConfigDir = Path(xdg_config_home) / 'bubblejail' +PackageConfisgPath = UsrSharePath / "bubblejail" +SystemConfigsPath = SysConfPath / "bubblejail" +UserConfigDir = Path(xdg_config_home) / "bubblejail" def convert_old_conf_to_new() -> None: @@ -35,14 +35,14 @@ def convert_old_conf_to_new() -> None: print(f"Converting {instance_directory.stem}") - old_conf_path = instance_directory / 'config.toml' - with open(old_conf_path, mode='rb') as old_conf_file: + old_conf_path = instance_directory / "config.toml" + with open(old_conf_path, mode="rb") as old_conf_file: old_conf_dict = toml_load(old_conf_file) new_conf: Dict[str, Any] = {} try: - services_list = old_conf_dict.pop('services') + services_list = old_conf_dict.pop("services") except KeyError: services_list = [] @@ -50,16 +50,16 @@ def convert_old_conf_to_new() -> None: new_conf[service_name] = {} try: - old_service_dict = old_conf_dict.pop('service') + old_service_dict = old_conf_dict.pop("service") except KeyError: old_service_dict = {} for service_name, service_dict in old_service_dict.items(): new_conf[service_name] = service_dict - new_conf['common'] = old_conf_dict + new_conf["common"] = old_conf_dict - with open(instance_directory / FILE_NAME_SERVICES, mode='xb') as f: + with open(instance_directory / FILE_NAME_SERVICES, mode="xb") as f: toml_dump(new_conf, f) @@ -78,12 +78,12 @@ def instance_get(cls, instance_name: str) -> BubblejailInstance: @classmethod def profile_get(cls, profile_name: str) -> BubblejailProfile: - profile_file_name = profile_name + '.toml' + profile_file_name = profile_name + ".toml" for profiles_directory in cls.iter_profile_directories(): possible_profile_path = profiles_directory / profile_file_name try: - with open(possible_profile_path, mode='rb') as profile_file: + with open(possible_profile_path, mode="rb") as profile_file: return BubblejailProfile(**toml_load(profile_file)) except FileNotFoundError: continue @@ -92,9 +92,7 @@ def profile_get(cls, profile_name: str) -> BubblejailProfile: @classmethod def iter_profile_names(cls) -> Generator[str, None, None]: - for profiles_directory in ( - BubblejailDirectories.iter_profile_directories() - ): + for profiles_directory in BubblejailDirectories.iter_profile_directories(): try: for profile_file in profiles_directory.iterdir(): yield profile_file.stem @@ -104,13 +102,13 @@ def iter_profile_names(cls) -> Generator[str, None, None]: @classmethod def iter_profile_directories(cls) -> PathGeneratorType: for conf_dir in cls.iterm_config_dirs(): - profiles_dir = conf_dir / 'profiles' + profiles_dir = conf_dir / "profiles" yield profiles_dir @classmethod def iterm_config_dirs(cls) -> PathGeneratorType: try: - conf_directories = environ['BUBBLEJAIL_CONFDIRS'] + conf_directories = environ["BUBBLEJAIL_CONFDIRS"] except KeyError: UserConfigDir.mkdir(parents=True, exist_ok=True) yield UserConfigDir @@ -118,15 +116,15 @@ def iterm_config_dirs(cls) -> PathGeneratorType: yield PackageConfisgPath return - yield from (Path(x) for x in conf_directories.split(':')) + yield from (Path(x) for x in conf_directories.split(":")) @classmethod def create_new_instance( - cls, - new_name: str, - profile_name: Optional[str] = None, - create_dot_desktop: bool = False, - print_import_tips: bool = False, + cls, + new_name: str, + profile_name: Optional[str] = None, + create_dot_desktop: bool = False, + print_import_tips: bool = False, ) -> BubblejailInstance: instance_directory = next(cls.iter_instances_directories()) / new_name @@ -134,18 +132,21 @@ def create_new_instance( # Exception will be raised if directory already exists instance_directory.mkdir(mode=0o700, parents=True) # Make home directory - (instance_directory / 'home').mkdir(mode=0o700) + (instance_directory / "home").mkdir(mode=0o700) # Profile - profile: BubblejailProfile = cls.profile_get( - profile_name) if profile_name is not None else BubblejailProfile() + profile: BubblejailProfile = ( + cls.profile_get(profile_name) + if profile_name is not None + else BubblejailProfile() + ) # Make config.json with (instance_directory / FILE_NAME_SERVICES).open( - mode='xb') as instance_conf_file: + mode="xb" + ) as instance_conf_file: - toml_dump(profile.config.get_service_conf_dict(), - instance_conf_file) + toml_dump(profile.config.get_service_conf_dict(), instance_conf_file) instance = BubblejailInstance(instance_directory) @@ -162,7 +163,7 @@ def create_new_instance( instance.metadata_creation_profile_name = profile_name if profile_name is not None and print_import_tips: - print('Import tips: ', profile.import_tips) + print("Import tips: ", profile.import_tips) return instance @@ -170,19 +171,19 @@ def create_new_instance( def iter_bubblejail_data_directories(cls) -> PathGeneratorType: # TODO: Add ability to create custom data directories try: - data_directories = environ['BUBBLEJAIL_DATADIRS'] + data_directories = environ["BUBBLEJAIL_DATADIRS"] except KeyError: - home_path = Path(xdg_data_home + '/bubblejail') + home_path = Path(xdg_data_home + "/bubblejail") home_path.mkdir(exist_ok=True, parents=True) yield home_path return - yield from (Path(x) for x in data_directories.split(':')) + yield from (Path(x) for x in data_directories.split(":")) @classmethod def iter_instances_directories(cls) -> PathGeneratorType: for data_dir in cls.iter_bubblejail_data_directories(): - instances_dir_path = (data_dir / 'instances') + instances_dir_path = data_dir / "instances" instances_dir_path.mkdir(exist_ok=True) yield instances_dir_path @@ -193,15 +194,14 @@ def iter_instances_path(cls) -> PathGeneratorType: @classmethod def desktop_entries_dir_get(cls) -> Path: - return Path(xdg_data_home + '/applications') + return Path(xdg_data_home + "/applications") @classmethod - def desktop_entry_name_to_path(cls, - desktop_entry_name: str) -> Optional[Path]: - if '/' not in desktop_entry_name: + def desktop_entry_name_to_path(cls, desktop_entry_name: str) -> Optional[Path]: + if "/" not in desktop_entry_name: # Desktop entry was passed without absolute or relative path - if not desktop_entry_name.endswith('.desktop'): - possible_name = desktop_entry_name + '.desktop' + if not desktop_entry_name.endswith(".desktop"): + possible_name = desktop_entry_name + ".desktop" else: possible_name = desktop_entry_name possible_path = UsrShareApplicationsPath / possible_name @@ -215,7 +215,8 @@ def desktop_entry_name_to_path(cls, @classmethod def overwrite_desktop_entry_for_profile( - cls, instance_name: str, + cls, + instance_name: str, profile_object: Optional[BubblejailProfile] = None, profile_name: Optional[str] = None, desktop_entry_name: Optional[str] = None, @@ -228,8 +229,7 @@ def overwrite_desktop_entry_for_profile( if desktop_entry_name is not None: # 1. Desktop entry path was passed. # 2. Desktop entry name was passed - dot_desktop_path = cls.desktop_entry_name_to_path( - desktop_entry_name) + dot_desktop_path = cls.desktop_entry_name_to_path(desktop_entry_name) elif profile_object is not None: # 3. Profile was passed directly profile = profile_object @@ -243,40 +243,37 @@ def overwrite_desktop_entry_for_profile( profile = cls.profile_get(instance.metadata_creation_profile_name) dot_desktop_path = profile.find_desktop_entry() else: - raise RuntimeError('No profile or desktop entry specified') + raise RuntimeError("No profile or desktop entry specified") if dot_desktop_path is None: - raise RuntimeError( - "Couldn't resolve desktop entry path." - ) + raise RuntimeError("Couldn't resolve desktop entry path.") - new_dot_desktop = IniFile.IniFile( - filename=str(dot_desktop_path)) + new_dot_desktop = IniFile.IniFile(filename=str(dot_desktop_path)) for group_name in new_dot_desktop.groups(): # Modify Exec - old_exec = new_dot_desktop.get( - key='Exec', group=group_name - ) + old_exec = new_dot_desktop.get(key="Exec", group=group_name) if not old_exec: continue new_dot_desktop.set( - key='Exec', - value=(f"bubblejail run -- {instance_name} " - f"{' '.join(old_exec.split())}"), - group=group_name) + key="Exec", + value=( + f"bubblejail run -- {instance_name} " + f"{' '.join(old_exec.split())}" + ), + group=group_name, + ) # Modify name new_dot_desktop.set( key="Name", - group='Desktop Entry', + group="Desktop Entry", value=f"{instance_name} bubble", ) # Three ways to resolve what file to write to - new_dot_desktop_path = (cls.desktop_entries_dir_get() - / dot_desktop_path.name) + new_dot_desktop_path = cls.desktop_entries_dir_get() / dot_desktop_path.name if not new_dot_desktop_path.exists(): # 1. If the entry under same name as the one # we are overwriting does not exist use the same name @@ -288,67 +285,62 @@ def overwrite_desktop_entry_for_profile( ... else: # 3. Use the generic name - new_dot_desktop_path = (cls.desktop_entries_dir_get() - / f"bubble_{instance_name}.desktop") + new_dot_desktop_path = ( + cls.desktop_entries_dir_get() / f"bubble_{instance_name}.desktop" + ) - new_dot_desktop.write( - filename=str(new_dot_desktop_path) - ) + new_dot_desktop.write(filename=str(new_dot_desktop_path)) # Update desktop MIME database # Requires `update-desktop-database` binary # Arch package desktop-file-utils - print('Updating desktop MIME database') + print("Updating desktop MIME database") cls.update_mime_database() @classmethod def update_mime_database(cls) -> None: try: subprocess_run( - args=( - 'update-desktop-database', - str(cls.desktop_entries_dir_get()) - ) + args=("update-desktop-database", str(cls.desktop_entries_dir_get())) ) except FileNotFoundError: from warnings import warn + warn( ( - 'Could not find update-desktop-database command. ' - 'Desktop entry database has not been updated.' + "Could not find update-desktop-database command. " + "Desktop entry database has not been updated." ) ) @classmethod def generate_empty_desktop_entry( - cls, - instance_name: str, + cls, + instance_name: str, ) -> None: new_dot_desktop = IniFile.IniFile() - new_dot_desktop.addGroup('Desktop Entry') + new_dot_desktop.addGroup("Desktop Entry") new_dot_desktop.set( - key='Exec', - value=f"bubblejail run {instance_name}", - group='Desktop Entry' + key="Exec", value=f"bubblejail run {instance_name}", group="Desktop Entry" ) # Modify name new_dot_desktop.set( key="Name", - group='Desktop Entry', + group="Desktop Entry", value=f"{instance_name} bubble", ) # Add type new_dot_desktop.set( - key='Type', - group='Desktop Entry', - value='Application', + key="Type", + group="Desktop Entry", + value="Application", ) new_dot_desktop_path_str = str( - cls.desktop_entries_dir_get() / - f"bubble_{instance_name}.desktop") + cls.desktop_entries_dir_get() / f"bubble_{instance_name}.desktop" + ) new_dot_desktop.write(filename=new_dot_desktop_path_str) diff --git a/src/bubblejail/bubblejail_gui_qt.py b/src/bubblejail/bubblejail_gui_qt.py index e1cd8e8..ac7da02 100644 --- a/src/bubblejail/bubblejail_gui_qt.py +++ b/src/bubblejail/bubblejail_gui_qt.py @@ -45,6 +45,7 @@ class BubblejailGuiWidget: def __init__(self) -> None: self.widget = QWidget() + # region Config edit classes @@ -91,12 +92,10 @@ def __init__( self.line_edit_widgets: List[QLineEdit] = [] - self.add_button = QPushButton('Add') + self.add_button = QPushButton("Add") self.add_button.setToolTip(self.description) self.vertical_layout.addWidget(self.add_button) - self.add_button.clicked.connect( - self.add_line_edit - ) + self.add_button.clicked.connect(self.add_line_edit) if not data: self.add_line_edit() else: @@ -107,9 +106,7 @@ def __init__( def set_data(self, str_list: List[str]) -> None: for string in str_list: - self.add_line_edit( - existing_string=string - ) + self.add_line_edit(existing_string=string) def remove_line_edit(self, line_edit_widget: QLineEdit) -> None: self.line_edit_widgets.remove(line_edit_widget) @@ -119,29 +116,28 @@ def remove_line_edit(self, line_edit_widget: QLineEdit) -> None: if not self.line_edit_widgets: self.add_line_edit() - def add_line_edit(self, *args: List[Any], - existing_string: Optional[str] = None,) -> None: + def add_line_edit( + self, + *args: List[Any], + existing_string: Optional[str] = None, + ) -> None: if isinstance(existing_string, str): # HACK: PyQt5 calls this function with bool when callsed by signal # to avoid passing bool to init check for str as existing string new_line_edit = QLineEdit(existing_string) else: - new_line_edit = QLineEdit('') + new_line_edit = QLineEdit("") new_line_edit.setToolTip(self.description) self.line_edit_widgets.append(new_line_edit) - new_push_button = QPushButton('❌') + new_push_button = QPushButton("❌") self.form_layout.addRow(new_push_button, new_line_edit) - new_push_button.clicked.connect( - partial( - self.remove_line_edit, new_line_edit - ) - ) + new_push_button.clicked.connect(partial(self.remove_line_edit, new_line_edit)) def get_string_list(self) -> list[str]: text_list = [x.text() for x in self.line_edit_widgets] @@ -243,7 +239,7 @@ def __init__( bubblejail_setting_name: str, ): if isinstance(data, list): - data = ' '.join(data) + data = " ".join(data) super().__init__( name=name, @@ -274,7 +270,7 @@ def __init__( super().__init__( name=name, description=description, - data='None', + data="None", bubblejail_setting_name=bubblejail_setting_name, ) self.horizontal_layout = QHBoxLayout() @@ -287,7 +283,7 @@ def __init__( self.combobox = QComboBox() self.combobox.setToolTip(description) self.horizontal_layout.addWidget(self.combobox) - self.combobox.addItem('None') + self.combobox.addItem("None") def add_item(self, new_item: str) -> None: self.combobox.addItem(new_item) @@ -325,13 +321,13 @@ def __init__( SettingFieldMetadata, setting_field.metadata, ) - if setting_metadata['is_deprecated']: + if setting_metadata["is_deprecated"]: continue match str(setting_field.type): - case 'bool': + case "bool": widget_class: Type[OptionWidgetBase] = OptionWidgetBool - case 'str': + case "str": widget_class = OptionWidgetStr case "str | list[str]": widget_class = OptionWidgetSpaceSeparatedStr @@ -356,8 +352,8 @@ def __init__( setting_value = default_value new_widget = widget_class( - name=setting_metadata['pretty_name'], - description=setting_metadata['description'], + name=setting_metadata["pretty_name"], + description=setting_metadata["description"], data=setting_value, bubblejail_setting_name=setting_field.name, ) @@ -401,6 +397,7 @@ def bubblejail_read_service_dict(self) -> dict[str, Any]: return new_dict + # endregion Config edit classes # region Central Widgets @@ -413,9 +410,7 @@ def __init__(self, parent: BubblejailConfigApp): class InstanceEditWidget(CentralWidgets): - def __init__(self, - parent: BubblejailConfigApp, - instance_name: str): + def __init__(self, parent: BubblejailConfigApp, instance_name: str): super().__init__(parent=parent) self.main_layout = QVBoxLayout() @@ -423,16 +418,15 @@ def __init__(self, header = QHBoxLayout() # Back button - back_button = QPushButton('Back') + back_button = QPushButton("Back") back_button.clicked.connect(self.parent.switch_to_selector) header.addWidget(back_button) # Label header_label = QLabel(f"Editing {instance_name}") header.addWidget(header_label) # Save button - save_button = QPushButton('Save') - save_button.clicked.connect( - partial(InstanceEditWidget.set_instance_data, self)) + save_button = QPushButton("Save") + save_button.clicked.connect(partial(InstanceEditWidget.set_instance_data, self)) header.addWidget(save_button) self.main_layout.addLayout(header) @@ -447,17 +441,18 @@ def __init__(self, self.scroll_area.setWidget(self.scrolled_widget) # Instance - self.bubblejail_instance = BubblejailDirectories.instance_get( - instance_name) + self.bubblejail_instance = BubblejailDirectories.instance_get(instance_name) self.instance_config = self.bubblejail_instance._read_config() services_settings_dicts: ServicesConfDictType = ( - self.instance_config.get_service_conf_dict()) + self.instance_config.get_service_conf_dict() + ) self.service_widgets: List[ServiceWidget] = [] for service in SERVICES_CLASSES: try: service_settings_dict: None | ServiceSettingsDict = ( - services_settings_dicts[service.name]) + services_settings_dicts[service.name] + ) except KeyError: service_settings_dict = None @@ -468,7 +463,8 @@ def __init__(self, new_service_widget.group_widget.clicked.connect( partial( - InstanceEditWidget.refresh_conflicts, self, + InstanceEditWidget.refresh_conflicts, + self, ) ) @@ -509,9 +505,10 @@ def refresh_conflicts(self, new_state: bool) -> None: class CreateInstanceWidget(CentralWidgets): - def __init__(self, - parent: BubblejailConfigApp, - ): + def __init__( + self, + parent: BubblejailConfigApp, + ): super().__init__(parent=parent) self.main_layout = QVBoxLayout() @@ -519,40 +516,40 @@ def __init__(self, header = QHBoxLayout() # Back button - back_button = QPushButton('Back') + back_button = QPushButton("Back") back_button.clicked.connect(self.parent.switch_to_selector) header.addWidget(back_button) # Save button - self.save_button = QPushButton('Create') + self.save_button = QPushButton("Create") self.save_button.clicked.connect( - partial(CreateInstanceWidget.create_instance, self)) + partial(CreateInstanceWidget.create_instance, self) + ) header.addWidget(self.save_button) self.main_layout.addLayout(header) self.name_widget = OptionWidgetStr( - name='Instance name', - description='Name with which the instance will be created', - data='', - bubblejail_setting_name='', + name="Instance name", + description="Name with which the instance will be created", + data="", + bubblejail_setting_name="", ) self.main_layout.addWidget(self.name_widget.widget) self.profile_select_widget = OptionWidgetCombobox( - name='Select profile:', - description='Select profile to create instance with.', - bubblejail_setting_name='', + name="Select profile:", + description="Select profile to create instance with.", + bubblejail_setting_name="", ) self.main_layout.addWidget(self.profile_select_widget.widget) self.profile_select_widget.combobox.textActivated.connect( - self.selection_changed) - - self.name_widget.line_edit.textChanged.connect( - self.refresh_create_button + self.selection_changed ) - self.profile_text = QLabel('No profile selected') + self.name_widget.line_edit.textChanged.connect(self.refresh_create_button) + + self.profile_text = QLabel("No profile selected") self.main_layout.addWidget(self.profile_text) self.current_profile: Optional[BubblejailProfile] = None @@ -567,25 +564,25 @@ def __init__(self, def can_be_created(self) -> Tuple[bool, str]: current_name = self.name_widget.get_str() if not current_name: - return False, '⚠ Name is empty' + return False, "⚠ Name is empty" else: try: BubblejailDirectories.instance_get(current_name) - return False, '⚠ Name is already used' + return False, "⚠ Name is already used" except BubblejailInstanceNotFoundError: ... if self.current_profile is None: - return True, 'Create empty profile' + return True, "Create empty profile" if ( self.current_profile.desktop_entries_paths and self.current_profile.find_desktop_entry() is None ): warn_text = ( - '⚠ WARNING \n' - 'Desktop entry does not exist\n' - 'Maybe you don\'t have application installed?' + "⚠ WARNING \n" + "Desktop entry does not exist\n" + "Maybe you don't have application installed?" ) return False, warn_text else: @@ -601,7 +598,7 @@ def refresh_create_button(self) -> None: self.profile_text.setText(new_text) def selection_changed(self, new_text: str) -> None: - if new_text == 'None': + if new_text == "None": self.current_profile = None else: self.current_profile = BubblejailDirectories.profile_get(new_text) @@ -618,9 +615,9 @@ def selection_changed(self, new_text: str) -> None: def create_instance(self) -> None: new_instance_name = self.name_widget.get_str() if not new_instance_name: - raise RuntimeError('No instance name given') + raise RuntimeError("No instance name given") profile_name: Optional[str] = self.profile_select_widget.get_selected() - if profile_name == 'None': + if profile_name == "None": profile_name = None BubblejailDirectories.create_new_instance( @@ -647,7 +644,8 @@ def __init__(self, parent: BubblejailConfigApp): self.layout_vertical.addWidget(self.scroll_area) self.list_of_instances_widget.clicked.connect( - self.parent.switch_to_instance_edit) + self.parent.switch_to_instance_edit + ) self.widget.setLayout(self.layout_vertical) @@ -657,10 +655,9 @@ def __init__(self, parent: BubblejailConfigApp): self.list_of_instances_widget.addItem(new_list_item_widgets) # Create button - self.create_button = QPushButton('Create instance') + self.create_button = QPushButton("Create instance") self.layout_vertical.addWidget(self.create_button) - self.create_button.clicked.connect( - self.parent.switch_to_create_instance) + self.create_button.clicked.connect(self.parent.switch_to_create_instance) # endregion Central Widgets diff --git a/src/bubblejail/bubblejail_helper.py b/src/bubblejail/bubblejail_helper.py index 5d55f91..b0c2d38 100644 --- a/src/bubblejail/bubblejail_helper.py +++ b/src/bubblejail/bubblejail_helper.py @@ -28,7 +28,7 @@ from collections.abc import Generator from typing import Any, Literal, Type - RpcMethods = Literal['ping', 'run'] + RpcMethods = Literal["ping", "run"] RpcData = dict[str, bool | str | list[str]] | list[str] RpcType = dict[str, str | RpcData | RpcMethods | None] @@ -36,23 +36,24 @@ class JsonRpcRequest: @staticmethod def _dict_to_json_byte_line(rpc_dict: dict[Any, Any]) -> bytes: - string_form = json_dumps(rpc_dict) + '\n' + string_form = json_dumps(rpc_dict) + "\n" return string_form.encode() @staticmethod def _json_byte_line_to_dict(data: bytes) -> dict[Any, Any]: decoded_dict: dict[Any, Any] = json_loads(data) # Replace 'id' with 'request_id' - decoded_dict['request_id'] = decoded_dict['id'] - decoded_dict.pop('id') + decoded_dict["request_id"] = decoded_dict["id"] + decoded_dict.pop("id") return decoded_dict - def __init__(self, - method: RpcMethods, - request_id: str | None = None, - params: RpcData | None = None, - ): + def __init__( + self, + method: RpcMethods, + request_id: str | None = None, + params: RpcData | None = None, + ): self.request_id = request_id self.method = method self.params = params @@ -76,12 +77,12 @@ def _get_reponse_bytes(self, rpc_data: RpcData) -> bytes: class RequestPing(JsonRpcRequest): def __init__(self, request_id: str | None = None) -> None: super().__init__( - method='ping', + method="ping", request_id=request_id, ) def response_ping(self) -> bytes: - return self._get_reponse_bytes(['pong']) + return self._get_reponse_bytes(["pong"]) class RequestRun(JsonRpcRequest): @@ -89,29 +90,29 @@ def __init__( self, args_to_run: list[str], wait_response: bool = False, - request_id: str | None = None + request_id: str | None = None, ) -> None: super().__init__( - method='run', + method="run", request_id=request_id, params={ - 'args_to_run': args_to_run, - 'wait_response': wait_response, + "args_to_run": args_to_run, + "wait_response": wait_response, }, ) self.args_to_run = args_to_run self.wait_response = wait_response def response_run(self, text: str) -> bytes: - return self._get_reponse_bytes({'return': text}) + return self._get_reponse_bytes({"return": text}) def decode_response(self, text: bytes) -> str: - possible_str = json_loads(text)['result']['return'] + possible_str = json_loads(text)["result"]["return"] if isinstance(possible_str, str): return possible_str else: - raise TypeError('Expected str in response.') + raise TypeError("Expected str in response.") if TYPE_CHECKING: @@ -121,20 +122,17 @@ def decode_response(self, text: bytes) -> str: def request_selector(data: bytes) -> RpcRequests: decoded_dict = JsonRpcRequest._json_byte_line_to_dict(data) - method = decoded_dict['method'] - request_id = decoded_dict['request_id'] - params = decoded_dict['params'] + method = decoded_dict["method"] + request_id = decoded_dict["request_id"] + params = decoded_dict["params"] match method: - case 'ping': + case "ping": return RequestPing(request_id=request_id) - case 'run': - return RequestRun( - request_id=request_id, - **params - ) + case "run": + return RequestRun(request_id=request_id, **params) case _: - raise TypeError('Unknown rpc method.') + raise TypeError("Unknown rpc method.") def handle_children() -> None: @@ -151,9 +149,7 @@ def handle_children() -> None: return if __debug__: - print('Reaped: ', exit_pid, - ' Exit code: ', exit_code, - flush=True) + print("Reaped: ", exit_pid, " Exit code: ", exit_code, flush=True) except ChildProcessError: ... @@ -168,9 +164,9 @@ def signal_all_children() -> None: nonlocal signal_counter signal_counter += 1 # Send SIGTERM to all our children - for task_dir in Path('/proc/self/task/').iterdir(): + for task_dir in Path("/proc/self/task/").iterdir(): # Open task - with open(task_dir / 'children') as children_file: + with open(task_dir / "children") as children_file: children_file_pids = children_file.read().split() for pid in (int(pid_str) for pid_str in children_file_pids): if signal_counter > 20: @@ -230,7 +226,7 @@ def __init__( @classmethod def iter_proc_process_directories(cls) -> Generator[Path, None, None]: - proc_path = Path('/proc') + proc_path = Path("/proc") # Iterate over items in /proc for proc_item in proc_path.iterdir(): # If we found something without number as name @@ -243,7 +239,7 @@ def proc_has_process_command(cls, process_command: str) -> bool: for process_dir in cls.iter_proc_process_directories(): # read cmdline file containing cmd arguments try: - with open(process_dir / 'stat') as stat_file: + with open(process_dir / "stat") as stat_file: # Read file and split by white space # The command argument is a second white space # separated argument so we only need to split 2 times @@ -260,10 +256,10 @@ def proc_has_process_command(cls, process_command: str) -> bool: @classmethod def process_has_child(cls) -> bool: - for task_dir in Path('/proc/self/task/').iterdir(): + for task_dir in Path("/proc/self/task/").iterdir(): # Open task - with open(task_dir / 'children') as children_file: + with open(task_dir / "children") as children_file: children_file_contents = children_file.read() if children_file_contents: # Children file will be empty if there are not children @@ -274,8 +270,9 @@ def process_has_child(cls) -> bool: async def termninator_watcher(self) -> None: if __debug__: print( - 'self.terminator_look_for_command: ', - repr(self.terminator_look_for_command)) + "self.terminator_look_for_command: ", + repr(self.terminator_look_for_command), + ) while True: try: @@ -290,7 +287,7 @@ async def termninator_watcher(self) -> None: if is_time_to_termniate: if __debug__: - print('No children found. Terminating.') + print("No children found. Terminating.") create_task(self.stop_async()) return except CancelledError: @@ -329,19 +326,16 @@ async def run_command( return None - async def client_handler( - self, - reader: StreamReader, - writer: StreamWriter) -> None: + async def client_handler(self, reader: StreamReader, writer: StreamWriter) -> None: if __debug__: - print('Client connected', flush=True) + print("Client connected", flush=True) while True: line = await reader.readline() if not line: if __debug__: - print('Reached end of reader. Returnning', flush=True) + print("Reached end of reader. Returnning", flush=True) writer.close() await writer.wait_closed() return @@ -351,8 +345,7 @@ async def client_handler( match request: case RequestPing(): response = request.response_ping() - case RequestRun(args_to_run=args_to_run, - wait_response=wait_response): + case RequestRun(args_to_run=args_to_run, wait_response=wait_response): run_stdout = await self.run_command( args_to_run=args_to_run, std_in_out_mode=PIPE if wait_response else None, @@ -376,7 +369,7 @@ async def start_async(self) -> None: sock=self.socket, ) if __debug__: - print('Started unix server', flush=True) + print("Started unix server", flush=True) self.termninator_watcher_task = create_task(self.termninator_watcher()) if self.startup_args: @@ -387,14 +380,13 @@ async def start_async(self) -> None: async def stop_async(self) -> None: self.terminated.set() - print('Terminated', flush=True) + print("Terminated", flush=True) - async def __aenter__(self) -> None: - ... + async def __aenter__(self) -> None: ... async def __aexit__( - self, exc_type: Type[Exception], - exc: Exception, tb: Any) -> None: + self, exc_type: Type[Exception], exc: Exception, tb: Any + ) -> None: if ( self.termninator_watcher_task is not None and not self.termninator_watcher_task.done() @@ -417,13 +409,13 @@ def get_helper_argument_parser() -> ArgumentParser: parser = ArgumentParser() parser.add_argument( - '--helper-socket', + "--helper-socket", type=int, required=True, ) parser.add_argument( - '--shell', - action='store_true', + "--shell", + action="store_true", ) parser.add_argument( "--ready-fd", @@ -431,7 +423,7 @@ def get_helper_argument_parser() -> ArgumentParser: ) parser.add_argument( - 'args_to_run', + "args_to_run", nargs="*", ) @@ -446,14 +438,12 @@ def bubblejail_helper_main() -> None: if parsed_args.ready_fd is not None: with open(parsed_args.ready_fd) as f: if "bubblejail-ready" != f.read(): - raise RuntimeError( - "Could not read 'bubblejail-ready' from ready fd." - ) + raise RuntimeError("Could not read 'bubblejail-ready' from ready fd.") if not parsed_args.shell: startup_args = parsed_args.args_to_run else: - startup_args = ['/bin/sh'] + startup_args = ["/bin/sh"] helper = BubblejailHelper( socket(AF_UNIX, fileno=parsed_args.helper_socket), @@ -466,17 +456,17 @@ async def run_helper() -> None: await helper event_loop = new_event_loop() - run_helper_task = event_loop.create_task(run_helper(), name='Run helper') + run_helper_task = event_loop.create_task(run_helper(), name="Run helper") event_loop.add_signal_handler(SIGCHLD, handle_children) event_loop.add_signal_handler(SIGTERM, terminate_children, run_helper_task) try: event_loop.run_until_complete(run_helper_task) except CancelledError: - print('Termninated by CancelledError') + print("Termninated by CancelledError") finally: event_loop.close() -if __name__ == '__main__': +if __name__ == "__main__": bubblejail_helper_main() diff --git a/src/bubblejail/bubblejail_instance.py b/src/bubblejail/bubblejail_instance.py index dd2fbea..8ad7591 100644 --- a/src/bubblejail/bubblejail_instance.py +++ b/src/bubblejail/bubblejail_instance.py @@ -41,7 +41,7 @@ def __init__(self, instance_home: Path): # region Paths @cached_property def runtime_dir(self) -> Path: - return Path(get_runtime_dir() + f'/bubblejail/{self.name}') + return Path(get_runtime_dir() + f"/bubblejail/{self.name}") @cached_property def path_config_file(self) -> Path: @@ -53,24 +53,24 @@ def path_metadata_file(self) -> Path: @cached_property def path_home_directory(self) -> Path: - return self.instance_directory / 'home' + return self.instance_directory / "home" @cached_property def path_runtime_helper_dir(self) -> Path: """Helper run-time directory""" - return self.runtime_dir / 'helper' + return self.runtime_dir / "helper" @cached_property def path_runtime_helper_socket(self) -> Path: - return self.path_runtime_helper_dir / 'helper.socket' + return self.path_runtime_helper_dir / "helper.socket" @cached_property def path_runtime_dbus_session_socket(self) -> Path: - return self.runtime_dir / 'dbus_session_proxy' + return self.runtime_dir / "dbus_session_proxy" @cached_property def path_runtime_dbus_system_socket(self) -> Path: - return self.runtime_dir / 'dbus_system_proxy' + return self.runtime_dir / "dbus_system_proxy" # endregion Paths @@ -87,7 +87,7 @@ def _save_metadata_key(self, key: str, value: Any) -> None: toml_dict = self._get_metadata_dict() toml_dict[key] = value - with open(self.path_metadata_file, mode='wb') as metadata_file: + with open(self.path_metadata_file, mode="wb") as metadata_file: toml_dump(toml_dict, metadata_file) def _get_metadata_value(self, key: str) -> str | None: @@ -102,23 +102,23 @@ def _get_metadata_value(self, key: str) -> str | None: @property def metadata_creation_profile_name(self) -> str | None: - return self._get_metadata_value('creation_profile_name') + return self._get_metadata_value("creation_profile_name") @metadata_creation_profile_name.setter def metadata_creation_profile_name(self, profile_name: str) -> None: self._save_metadata_key( - key='creation_profile_name', + key="creation_profile_name", value=profile_name, ) @property def metadata_desktop_entry_name(self) -> str | None: - return self._get_metadata_value('desktop_entry_name') + return self._get_metadata_value("desktop_entry_name") @metadata_desktop_entry_name.setter def metadata_desktop_entry_name(self, desktop_entry_name: str) -> None: self._save_metadata_key( - key='desktop_entry_name', + key="desktop_entry_name", value=desktop_entry_name, ) @@ -129,8 +129,8 @@ def _read_config_file(self) -> str: return f.read() def _read_config( - self, - config_contents: str | None = None) -> BubblejailInstanceConfig: + self, config_contents: str | None = None + ) -> BubblejailInstanceConfig: if config_contents is None: config_contents = self._read_config_file() @@ -140,7 +140,7 @@ def _read_config( return BubblejailInstanceConfig(conf_dict) def save_config(self, config: BubblejailInstanceConfig) -> None: - with open(self.path_config_file, mode='wb') as conf_file: + with open(self.path_config_file, mode="wb") as conf_file: toml_dump(config.get_service_conf_dict(), conf_file) async def send_run_rpc( @@ -203,23 +203,26 @@ async def async_run_init( if debug_helper_script is not None: with open(debug_helper_script) as f: runner.helper_executable = [ - 'python', '-X', 'dev', - '-c', f.read(), + "python", + "-X", + "dev", + "-c", + f.read(), ] if dry_run: runner.genetate_args() - print('Bwrap options:') - print(' '.join(runner.bwrap_options_args)) + print("Bwrap options:") + print(" ".join(runner.bwrap_options_args)) - print('Helper options:') - print(' '.join(runner.helper_arguments())) + print("Helper options:") + print(" ".join(runner.helper_arguments())) - print('Run args:') - print(' '.join(args_to_run)) + print("Run args:") + print(" ".join(args_to_run)) - print('D-Bus session args:') - print(' '.join(runner.dbus_proxy_args)) + print("D-Bus session args:") + print(" ".join(runner.dbus_proxy_args)) return async with AsyncExitStack() as exit_stack: @@ -231,14 +234,16 @@ async def async_run_init( try: await task_bwrap_main except CancelledError: - print('Bwrap cancelled') + print("Bwrap cancelled") if bwrap_process.returncode != 0: - raise BubblewrapRunError(( - "Bubblewrap failed. " - "Try running bubblejail in terminal to see the " - "exact error." - )) + raise BubblewrapRunError( + ( + "Bubblewrap failed. " + "Try running bubblejail in terminal to see the " + "exact error." + ) + ) print("Bubblewrap terminated") @@ -246,19 +251,19 @@ async def edit_config_in_editor(self) -> None: # Create temporary directory with TemporaryDirectory() as tempdir: # Create path to temporary file and write exists config - temp_file_path = Path(tempdir + 'temp.toml') - with open(temp_file_path, mode='w') as tempfile: + temp_file_path = Path(tempdir + "temp.toml") + with open(temp_file_path, mode="w") as tempfile: tempfile.write(self._read_config_file()) initial_modification_time = stat(temp_file_path).st_mtime # Launch EDITOR on the temporary file - run_args = [environ['EDITOR'], str(temp_file_path)] + run_args = [environ["EDITOR"], str(temp_file_path)] p = await create_subprocess_exec(*run_args) await p.wait() # If file was not modified do nothing if initial_modification_time >= stat(temp_file_path).st_mtime: - print('File not modified. Not overwriting config') + print("File not modified. Not overwriting config") return # Verify that the new config is valid and save to variable @@ -268,7 +273,7 @@ async def edit_config_in_editor(self) -> None: cast(ServicesConfDictType, toml_loads(new_config_toml)) ) # Write to instance config file - with open(self.path_config_file, mode='w') as conf_file: + with open(self.path_config_file, mode="w") as conf_file: conf_file.write(new_config_toml) @@ -277,16 +282,14 @@ def __init__( self, dot_desktop_path: list[str] | str | None = None, is_gtk_application: bool = False, - services: ServicesConfDictType | None = None, - description: str = 'No description', - import_tips: str = 'None', + services: ServicesConfDictType | None = None, + description: str = "No description", + import_tips: str = "None", ) -> None: match dot_desktop_path: case list(): - self.desktop_entries_paths = [ - Path(x) for x in dot_desktop_path - ] + self.desktop_entries_paths = [Path(x) for x in dot_desktop_path] case str(): self.desktop_entries_paths = [Path(dot_desktop_path)] case None: diff --git a/src/bubblejail/bubblejail_runner.py b/src/bubblejail/bubblejail_runner.py index ee3d8c9..5e3a85e 100644 --- a/src/bubblejail/bubblejail_runner.py +++ b/src/bubblejail/bubblejail_runner.py @@ -35,13 +35,7 @@ if TYPE_CHECKING: from asyncio import Task from asyncio.subprocess import Process - from collections.abc import ( - AsyncIterator, - Awaitable, - Callable, - Iterable, - Iterator, - ) + from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Iterator from contextlib import AsyncExitStack from typing import IO @@ -71,8 +65,7 @@ def __init__( self.bwrap_temp_files: list[IO[bytes]] = [] self.file_descriptors_to_pass: list[int] = [] # Helper - self.helper_executable: list[str] = [ - BubblejailSettings.HELPER_PATH_STR] + self.helper_executable: list[str] = [BubblejailSettings.HELPER_PATH_STR] self.helper_runtime_dir = parent.path_runtime_helper_dir self.helper_socket_path = parent.path_runtime_helper_socket self.helper_socket = socket(AF_UNIX) @@ -131,35 +124,31 @@ def genetate_args(self) -> None: dbus_system_opts: set[str] = set() seccomp_state: SeccompState | None = None # Unshare all - self.bwrap_options_args.append('--unshare-all') + self.bwrap_options_args.append("--unshare-all") # Die with parent - self.bwrap_options_args.append('--die-with-parent') + self.bwrap_options_args.append("--die-with-parent") # We have our own reaper - self.bwrap_options_args.append('--as-pid-1') + self.bwrap_options_args.append("--as-pid-1") if not self.is_shell_debug: # Set new session - self.bwrap_options_args.append('--new-session') + self.bwrap_options_args.append("--new-session") # Proc - self.bwrap_options_args.extend(('--proc', '/proc')) + self.bwrap_options_args.extend(("--proc", "/proc")) # Devtmpfs - self.bwrap_options_args.extend(('--dev', '/dev')) + self.bwrap_options_args.extend(("--dev", "/dev")) # Unset all variables - self.bwrap_options_args.append('--clearenv') + self.bwrap_options_args.append("--clearenv") # Pass terminal variables if debug shell activated if self.is_shell_debug: if term_env := environ.get("TERM"): - self.bwrap_options_args.extend( - ("--setenv", "TERM", term_env) - ) + self.bwrap_options_args.extend(("--setenv", "TERM", term_env)) if colorterm_env := environ.get("COLORTERM"): - self.bwrap_options_args.extend( - ("--setenv", "COLORTERM", colorterm_env) - ) + self.bwrap_options_args.extend(("--setenv", "COLORTERM", colorterm_env)) for service in self.instance_config.iter_services(): config_iterator = service.iter_bwrap_options() @@ -174,8 +163,7 @@ def genetate_args(self) -> None: if isinstance(config, ServiceWantsHomeBind): config = config_iterator.send(self.home_bind_path) elif isinstance(config, ServiceWantsDbusSessionBind): - config = config_iterator.send( - self.dbus_session_socket_path) + config = config_iterator.send(self.dbus_session_socket_path) if isinstance(config, BwrapConfigBase): self.bwrap_options_args.extend(config.to_args()) @@ -184,11 +172,10 @@ def genetate_args(self) -> None: temp_f = copy_data_to_temp_file(config.content) self.bwrap_temp_files.append(temp_f) temp_file_descriptor = temp_f.fileno() - self.file_descriptors_to_pass.append( - temp_file_descriptor) + self.file_descriptors_to_pass.append(temp_file_descriptor) self.bwrap_options_args.extend( ( - '--ro-bind-data', + "--ro-bind-data", str(temp_file_descriptor), config.dest, ) @@ -206,72 +193,70 @@ def genetate_args(self) -> None: # TODO: implement priority self.executable_args.extend(config.launch_args) else: - raise TypeError('Unknown bwrap config.') + raise TypeError("Unknown bwrap config.") if seccomp_state is not None: seccomp_temp_file = seccomp_state.export_to_temp_file() seccomp_fd = seccomp_temp_file.fileno() self.file_descriptors_to_pass.append(seccomp_fd) self.bwrap_temp_files.append(seccomp_temp_file) - self.bwrap_options_args.extend(('--seccomp', str(seccomp_fd))) + self.bwrap_options_args.extend(("--seccomp", str(seccomp_fd))) - self.post_init_hooks.extend( - self.instance_config.iter_post_init_hooks() - ) - self.post_shutdown_hooks.extend( - self.instance_config.iter_post_shutdown_hooks() - ) + self.post_init_hooks.extend(self.instance_config.iter_post_init_hooks()) + self.post_shutdown_hooks.extend(self.instance_config.iter_post_shutdown_hooks()) # region dbus # Session dbus - self.dbus_proxy_args.extend(( - 'xdg-dbus-proxy', - environ['DBUS_SESSION_BUS_ADDRESS'], - str(self.dbus_session_socket_path), - )) + self.dbus_proxy_args.extend( + ( + "xdg-dbus-proxy", + environ["DBUS_SESSION_BUS_ADDRESS"], + str(self.dbus_session_socket_path), + ) + ) - self.dbus_proxy_pipe_read, self.dbus_proxy_pipe_write \ - = pipe2(O_NONBLOCK | O_CLOEXEC) + self.dbus_proxy_pipe_read, self.dbus_proxy_pipe_write = pipe2( + O_NONBLOCK | O_CLOEXEC + ) self.dbus_proxy_args.append(f"--fd={self.dbus_proxy_pipe_write}") self.dbus_proxy_args.extend(dbus_session_opts) - self.dbus_proxy_args.append('--filter') + self.dbus_proxy_args.append("--filter") if self.is_log_dbus: - self.dbus_proxy_args.append('--log') + self.dbus_proxy_args.append("--log") # System dbus - self.dbus_proxy_args.extend(( - 'unix:path=/run/dbus/system_bus_socket', - str(self.dbus_system_socket_path), - )) + self.dbus_proxy_args.extend( + ( + "unix:path=/run/dbus/system_bus_socket", + str(self.dbus_system_socket_path), + ) + ) self.dbus_proxy_args.extend(dbus_system_opts) - self.dbus_proxy_args.append('--filter') + self.dbus_proxy_args.append("--filter") if self.is_log_dbus: - self.dbus_proxy_args.append('--log') + self.dbus_proxy_args.append("--log") # Bind twice, in /var and /run self.bwrap_options_args.extend( Bind( - str(self.dbus_system_socket_path), - '/var/run/dbus/system_bus_socket').to_args() + str(self.dbus_system_socket_path), "/var/run/dbus/system_bus_socket" + ).to_args() ) self.bwrap_options_args.extend( Bind( - str(self.dbus_system_socket_path), - '/run/dbus/system_bus_socket').to_args() + str(self.dbus_system_socket_path), "/run/dbus/system_bus_socket" + ).to_args() ) # endregion dbus # Info fd pipe - self.info_fd_pipe_read, self.info_fd_pipe_write = ( - pipe2(O_NONBLOCK | O_CLOEXEC) - ) + self.info_fd_pipe_read, self.info_fd_pipe_write = pipe2(O_NONBLOCK | O_CLOEXEC) self.file_descriptors_to_pass.append(self.info_fd_pipe_write) - self.bwrap_options_args.extend( - ("--info-fd", f"{self.info_fd_pipe_write}")) + self.bwrap_options_args.extend(("--info-fd", f"{self.info_fd_pipe_write}")) running_loop = get_running_loop() running_loop.add_reader( self.info_fd_pipe_read, @@ -279,8 +264,8 @@ def genetate_args(self) -> None: ) if self.post_init_hooks: - self.ready_fd_pipe_read, self.ready_fd_pipe_write = ( - pipe2(O_NONBLOCK | O_CLOEXEC) + self.ready_fd_pipe_read, self.ready_fd_pipe_write = pipe2( + O_NONBLOCK | O_CLOEXEC ) self.file_descriptors_to_pass.append(self.ready_fd_pipe_read) @@ -306,7 +291,7 @@ def read_info_fd(self) -> None: self.sandboxed_pid.set_result(info_dict["child-pid"]) def get_args_file_descriptor(self) -> int: - options_null = '\0'.join(self.bwrap_options_args) + options_null = "\0".join(self.bwrap_options_args) args_tempfile = copy_data_to_temp_file(options_null.encode()) args_tempfile_fileno = args_tempfile.fileno() @@ -326,7 +311,7 @@ async def _run_post_init_hooks(self) -> None: if self.ready_fd_pipe_read and self.ready_fd_pipe_write: with ( open(self.ready_fd_pipe_read), - open(self.ready_fd_pipe_write, mode="w") as f + open(self.ready_fd_pipe_write, mode="w") as f, ): f.write("bubblejail-ready") @@ -416,8 +401,7 @@ def proxy_ready_callback() -> None: await wait_for(dbus_proxy_ready_future, timeout=1) if dbus_proxy_process.returncode is not None: raise ValueError( - "dbus proxy error code: " - f"{self.dbus_proxy_process.returncode}" + "dbus proxy error code: " f"{self.dbus_proxy_process.returncode}" ) yield None finally: @@ -436,9 +420,9 @@ async def setup_bubblewrap_subprocess( self, run_args: Iterable[str] | None = None, ) -> AsyncIterator[Process]: - bwrap_args = ['/usr/bin/bwrap'] + bwrap_args = ["/usr/bin/bwrap"] # Pass option args file descriptor - bwrap_args.append('--args') + bwrap_args.append("--args") bwrap_args.append(str(self.get_args_file_descriptor())) bwrap_args.append("--") diff --git a/src/bubblejail/bubblejail_seccomp.py b/src/bubblejail/bubblejail_seccomp.py index d367857..479f4c6 100644 --- a/src/bubblejail/bubblejail_seccomp.py +++ b/src/bubblejail/bubblejail_seccomp.py @@ -9,23 +9,20 @@ from typing import TYPE_CHECKING from .bwrap_config import SeccompDirective, SeccompSyscallErrno -from .exceptions import ( - BubblejailLibseccompError, - LibseccompSyscallResolutionError, -) +from .exceptions import BubblejailLibseccompError, LibseccompSyscallResolutionError if TYPE_CHECKING: from collections.abc import Callable from typing import IO, Any -SCMP_ACT_ALLOW = c_uint(0x7fff0000) +SCMP_ACT_ALLOW = c_uint(0x7FFF0000) ARCH_X86 = c_uint32(3 | 0x40000000) def get_scmp_act_errno(error_code: int) -> c_uint32: - return c_uint32(0x00050000 | (error_code & 0x0000ffff)) + return c_uint32(0x00050000 | (error_code & 0x0000FFFF)) class Libseccomp: @@ -37,8 +34,7 @@ def check_libseccomp_ptr( ) -> int: if result is None: raise BubblejailLibseccompError( - f"Libseccomp null pointer " - f"when calling {func.__name__}." + f"Libseccomp null pointer " f"when calling {func.__name__}." ) return result @@ -53,9 +49,9 @@ def check_syscall_resolve( if result < 0: if (syscall_bytes := arguments[0].value) is None: - syscall_name = 'NULL' + syscall_name = "NULL" else: - syscall_name = syscall_bytes.decode('utf-8') + syscall_name = syscall_bytes.decode("utf-8") raise LibseccompSyscallResolutionError( f"Failed to resolve syscall {syscall_name}." @@ -71,37 +67,32 @@ def check_libseccomp_int( ) -> int: if result < 0: raise BubblejailLibseccompError( - f"Libseccomp returned error {result} " - f"when calling {func.__name__}." + f"Libseccomp returned error {result} " f"when calling {func.__name__}." ) return result def __init__(self) -> None: - libseccomp = CDLL(find_library('seccomp')) + libseccomp = CDLL(find_library("seccomp")) self.libseccomp = libseccomp # HACK: mypy is not smart enough to realize that # restype affects the function passed to errcheck seccomp_init = libseccomp.seccomp_init - seccomp_init.argtypes = (c_uint, ) + seccomp_init.argtypes = (c_uint,) seccomp_init.restype = c_void_p seccomp_init.errcheck = self.check_libseccomp_ptr # type: ignore - self.init: Callable[[c_uint], c_void_p] = ( - seccomp_init - ) + self.init: Callable[[c_uint], c_void_p] = seccomp_init seccomp_load = libseccomp.seccomp_load - seccomp_load.argtypes = (c_void_p, ) + seccomp_load.argtypes = (c_void_p,) seccomp_load.restype = c_int seccomp_load.errcheck = self.check_libseccomp_int # type: ignore - self.load: Callable[[c_void_p], c_int] = ( - seccomp_load - ) + self.load: Callable[[c_void_p], c_int] = seccomp_load seccomp_syscall_resolve_name = libseccomp.seccomp_syscall_resolve_name - seccomp_syscall_resolve_name.argtypes = (c_char_p, ) + seccomp_syscall_resolve_name.argtypes = (c_char_p,) seccomp_syscall_resolve_name.restype = c_int seccomp_syscall_resolve_name.errcheck = ( self.check_syscall_resolve # type: ignore @@ -122,25 +113,19 @@ def __init__(self) -> None: seccomp_export_pfc.argtypes = (c_void_p, c_int) seccomp_export_pfc.restype = c_int seccomp_export_pfc.errcheck = self.check_libseccomp_int # type: ignore - self.export_pfc: Callable[[c_void_p, c_int], c_int] = ( - seccomp_export_pfc - ) + self.export_pfc: Callable[[c_void_p, c_int], c_int] = seccomp_export_pfc seccomp_export_bpf = libseccomp.seccomp_export_bpf seccomp_export_bpf.argtypes = (c_void_p, c_int) seccomp_export_bpf.restype = c_int seccomp_export_bpf.errcheck = self.check_libseccomp_int # type: ignore - self.export_bpf: Callable[[c_void_p, c_int], c_int] = ( - seccomp_export_bpf - ) + self.export_bpf: Callable[[c_void_p, c_int], c_int] = seccomp_export_bpf seccomp_arch_add = libseccomp.seccomp_arch_add seccomp_arch_add.argtypes = (c_void_p, c_uint32) seccomp_arch_add.restype = c_int seccomp_arch_add.errcheck = self.check_libseccomp_int # type: ignore - self.arch_add: Callable[[c_void_p, c_uint32], c_int] = ( - seccomp_arch_add - ) + self.arch_add: Callable[[c_void_p, c_uint32], c_int] = seccomp_arch_add class SeccompState: @@ -150,7 +135,7 @@ def __init__(self) -> None: self._seccomp_ruleset_ptr = self.libseccomp.init(SCMP_ACT_ALLOW) - if machine() == 'x86_64': + if machine() == "x86_64": self.libseccomp.arch_add(self._seccomp_ruleset_ptr, ARCH_X86) # TODO: Add armv7 on aarch64 systems @@ -180,7 +165,7 @@ def add_directive(self, directive: SeccompDirective) -> None: if not skip_on_not_exists: raise case _: - raise TypeError('Unknown seccomp directive.') + raise TypeError("Unknown seccomp directive.") def load(self) -> None: self.libseccomp.load(self._seccomp_ruleset_ptr) diff --git a/src/bubblejail/bubblejail_utils.py b/src/bubblejail/bubblejail_utils.py index 04a12bb..6a1867f 100644 --- a/src/bubblejail/bubblejail_utils.py +++ b/src/bubblejail/bubblejail_utils.py @@ -2,12 +2,12 @@ # SPDX-FileCopyrightText: 2019-2022 igo95862 from __future__ import annotations -FILE_NAME_SERVICES = 'services.toml' -FILE_NAME_METADATA = 'metadata_v1.toml' +FILE_NAME_SERVICES = "services.toml" +FILE_NAME_METADATA = "metadata_v1.toml" class BubblejailSettings: - HELPER_PATH_STR: str = '/usr/lib/bubblejail/bubblejail-helper' - SHARE_PATH_STR: str = '/usr/share' - SYSCONF_PATH_STR: str = '/etc' - VERSION: str = 'UNDEFINED' + HELPER_PATH_STR: str = "/usr/lib/bubblejail/bubblejail-helper" + SHARE_PATH_STR: str = "/usr/share" + SYSCONF_PATH_STR: str = "/etc" + VERSION: str = "UNDEFINED" diff --git a/src/bubblejail/bwrap_config.py b/src/bubblejail/bwrap_config.py index f7fc641..0929819 100644 --- a/src/bubblejail/bwrap_config.py +++ b/src/bubblejail/bwrap_config.py @@ -28,14 +28,14 @@ def __init__(self, permissions: Optional[int] = None): def to_args(self) -> Generator[str, None, None]: if self.permissions is not None: - yield '--perms' - yield oct(self.permissions).lstrip('0o') + yield "--perms" + yield oct(self.permissions).lstrip("0o") yield from super().to_args() class DirCreate(BwrapOptionWithPermissions): - arg_word = '--dir' + arg_word = "--dir" def __init__(self, dest: Pathlike, permissions: Optional[int] = None): super().__init__(permissions) @@ -47,7 +47,7 @@ def to_args(self) -> Generator[str, None, None]: class Symlink(BwrapConfigBase): - arg_word = '--symlink' + arg_word = "--symlink" def __init__(self, source: Pathlike, dest: Pathlike): super().__init__() @@ -61,7 +61,7 @@ def to_args(self) -> Generator[str, None, None]: class EnvrimentalVar(BwrapConfigBase): - arg_word = '--setenv' + arg_word = "--setenv" def __init__(self, var_name: str, var_value: Optional[str] = None): super().__init__() @@ -72,12 +72,11 @@ def to_args(self) -> Generator[str, None, None]: yield from super().to_args() yield self.var_name - yield (self.var_value if self.var_value is not None - else environ[self.var_name]) + yield (self.var_value if self.var_value is not None else environ[self.var_name]) class ReadOnlyBind(BwrapConfigBase): - arg_word = '--ro-bind' + arg_word = "--ro-bind" def __init__(self, source: Pathlike, dest: Optional[Pathlike] = None): super().__init__() @@ -92,27 +91,27 @@ def to_args(self) -> Generator[str, None, None]: class ReadOnlyBindTry(ReadOnlyBind): - arg_word = '--ro-bind-try' + arg_word = "--ro-bind-try" class Bind(ReadOnlyBind): - arg_word = '--bind' + arg_word = "--bind" class BindTry(ReadOnlyBind): - arg_word = '--bind-try' + arg_word = "--bind-try" class DevBind(ReadOnlyBind): - arg_word = '--dev-bind' + arg_word = "--dev-bind" class DevBindTry(ReadOnlyBind): - arg_word = '--dev-bind-try' + arg_word = "--dev-bind-try" class ChangeDir(BwrapConfigBase): - arg_word = '--chdir' + arg_word = "--chdir" def __init__(self, dest: Pathlike): super().__init__() @@ -141,7 +140,7 @@ def __init__(self, content: bytes, dest: Pathlike): class DbusCommon: - arg_word: str = 'ERROR' + arg_word: str = "ERROR" def __init__(self, bus_name: str): self.bus_name = bus_name @@ -150,20 +149,18 @@ def to_args(self) -> str: return f"{self.arg_word}={self.bus_name}" -class DbusSessionArgs(DbusCommon): - ... +class DbusSessionArgs(DbusCommon): ... -class DbusSystemArgs(DbusCommon): - ... +class DbusSystemArgs(DbusCommon): ... class DbusSessionTalkTo(DbusSessionArgs): - arg_word = '--talk' + arg_word = "--talk" class DbusSessionOwn(DbusSessionArgs): - arg_word = '--own' + arg_word = "--own" class DbusSessionRule(DbusSessionArgs): @@ -202,8 +199,7 @@ def to_args(self) -> str: return self.bus_name -class SeccompDirective: - ... +class SeccompDirective: ... class SeccompSyscallErrno(SeccompDirective): @@ -220,8 +216,9 @@ def __init__( class LaunchArguments: def __init__( - self, - launch_args: List[str], - priority: int = 0,) -> None: + self, + launch_args: List[str], + priority: int = 0, + ) -> None: self.launch_args = launch_args self.priority = priority diff --git a/src/bubblejail/exceptions.py b/src/bubblejail/exceptions.py index 1043de8..bbb964a 100644 --- a/src/bubblejail/exceptions.py +++ b/src/bubblejail/exceptions.py @@ -3,37 +3,28 @@ from __future__ import annotations -class BubblejailException(Exception): - ... +class BubblejailException(Exception): ... -class ServiceError(BubblejailException): - ... +class ServiceError(BubblejailException): ... -class ServiceConflictError(ServiceError): - ... +class ServiceConflictError(ServiceError): ... -class BubblejailInstanceNotFoundError(BubblejailException): - ... +class BubblejailInstanceNotFoundError(BubblejailException): ... -class BubblewrapRunError(BubblejailException): - ... +class BubblewrapRunError(BubblejailException): ... -class BubblejailLibseccompError(BubblejailException): - ... +class BubblejailLibseccompError(BubblejailException): ... -class LibseccompSyscallResolutionError(BubblejailLibseccompError): - ... +class LibseccompSyscallResolutionError(BubblejailLibseccompError): ... -class BubblejailInitializationError(BubblejailException): - ... +class BubblejailInitializationError(BubblejailException): ... -class BubblejailDependencyError(BubblejailInitializationError): - ... +class BubblejailDependencyError(BubblejailInitializationError): ... diff --git a/src/bubblejail/services.py b/src/bubblejail/services.py index c6c5eba..e98f62c 100644 --- a/src/bubblejail/services.py +++ b/src/bubblejail/services.py @@ -9,20 +9,13 @@ get_running_loop, wait_for, ) -from dataclasses import ( - asdict, - dataclass, - field, - fields, - is_dataclass, - make_dataclass, -) +from contextlib import ExitStack +from dataclasses import asdict, dataclass, field, fields, is_dataclass, make_dataclass from multiprocessing import Process from os import O_CLOEXEC, O_NONBLOCK, environ, getpid, getuid, pipe2, readlink from pathlib import Path from shutil import which from typing import TYPE_CHECKING, TypedDict -from contextlib import ExitStack from xdg import BaseDirectory @@ -66,23 +59,23 @@ # region Service Typing -class ServiceWantsSend: - ... +class ServiceWantsSend: ... -class ServiceWantsHomeBind(ServiceWantsSend): - ... +class ServiceWantsHomeBind(ServiceWantsSend): ... -class ServiceWantsDbusSessionBind(ServiceWantsSend): - ... +class ServiceWantsDbusSessionBind(ServiceWantsSend): ... if TYPE_CHECKING: ServiceIterTypes = ( - BwrapConfigBase | FileTransfer | - SeccompDirective | LaunchArguments | - ServiceWantsSend | DbusCommon + BwrapConfigBase + | FileTransfer + | SeccompDirective + | LaunchArguments + | ServiceWantsSend + | DbusCommon ) ServiceSendType = Path @@ -103,34 +96,39 @@ class SettingFieldMetadata(TypedDict): description: str is_deprecated: bool + # endregion Service Options # region HelperFunctions -XDG_DESKTOP_VARS: frozenset[str] = frozenset({ - 'XDG_CURRENT_DESKTOP', 'DESKTOP_SESSION', - 'XDG_SESSION_TYPE', 'XDG_SESSION_DESKTOP'}) +XDG_DESKTOP_VARS: frozenset[str] = frozenset( + { + "XDG_CURRENT_DESKTOP", + "DESKTOP_SESSION", + "XDG_SESSION_TYPE", + "XDG_SESSION_DESKTOP", + } +) def generate_path_var() -> str: """Filters PATH variable to locations with /usr prefix""" # Split by semicolon - paths = environ['PATH'].split(':') + paths = environ["PATH"].split(":") # Filter by /usr and /tmp then join by semicolon - return ':'.join(filter( - lambda s: s.startswith('/usr/') - or s == '/bin' - or s == '/sbin', - paths)) + return ":".join( + filter(lambda s: s.startswith("/usr/") or s == "/bin" or s == "/sbin", paths) + ) def generate_toolkits() -> Generator[ServiceIterTypes, None, None]: config_home_path = Path(BaseDirectory.xdg_config_home) - kde_globals_conf = config_home_path / 'kdeglobals' + kde_globals_conf = config_home_path / "kdeglobals" if kde_globals_conf.exists(): yield ReadOnlyBind(kde_globals_conf) + # endregion HelperFunctions @@ -145,11 +143,9 @@ def __init__(self, context: BubblejailRunContext): def iter_bwrap_options(self) -> ServiceGeneratorType: yield from () - async def post_init_hook(self, pid: int) -> None: - ... + async def post_init_hook(self, pid: int) -> None: ... - async def post_shutdown_hook(self) -> None: - ... + async def post_shutdown_hook(self) -> None: ... @classmethod def has_settings(cls) -> bool: @@ -170,32 +166,33 @@ def iter_settings_fields(cls) -> Iterator[Field[Any]]: # Pre version 0.6.0 home bind path -OLD_HOME_BIND = Path('/home/user') +OLD_HOME_BIND = Path("/home/user") class BubblejailDefaults(BubblejailService): def iter_bwrap_options(self) -> ServiceGeneratorType: # Distro packaged libraries and binaries - yield ReadOnlyBind('/usr') - yield ReadOnlyBind('/opt') + yield ReadOnlyBind("/usr") + yield ReadOnlyBind("/opt") # Recreate symlinks in / or mount them read-only if its not a symlink. # Should be portable between distros. - for root_path in Path('/').iterdir(): + for root_path in Path("/").iterdir(): if ( - root_path.name.startswith('lib') # /lib /lib64 /lib32 - or root_path.name == 'bin' - or root_path.name == 'sbin'): + root_path.name.startswith("lib") # /lib /lib64 /lib32 + or root_path.name == "bin" + or root_path.name == "sbin" + ): if root_path.is_symlink(): yield Symlink(readlink(root_path), root_path) else: yield ReadOnlyBind(root_path) - yield ReadOnlyBind('/etc') + yield ReadOnlyBind("/etc") # Temporary directories - yield DirCreate('/tmp') - yield DirCreate('/var') + yield DirCreate("/tmp") + yield DirCreate("/var") yield DirCreate(self.xdg_runtime_dir, permissions=0o700) @@ -203,7 +200,7 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: real_home = Path.home() home_path = yield ServiceWantsHomeBind() yield Bind(home_path, real_home) - yield EnvrimentalVar('HOME', str(real_home)) + yield EnvrimentalVar("HOME", str(real_home)) # Compatibility symlink if real_home != OLD_HOME_BIND: yield Symlink(real_home, OLD_HOME_BIND) @@ -212,44 +209,64 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: # Set environmental variables from getpass import getuser - yield EnvrimentalVar('USER', getuser()) - yield EnvrimentalVar('USERNAME', getuser()) - yield EnvrimentalVar('PATH', generate_path_var()) - yield EnvrimentalVar('XDG_RUNTIME_DIR', str(self.xdg_runtime_dir)) - yield EnvrimentalVar('LANG') + yield EnvrimentalVar("USER", getuser()) + yield EnvrimentalVar("USERNAME", getuser()) + yield EnvrimentalVar("PATH", generate_path_var()) + yield EnvrimentalVar("XDG_RUNTIME_DIR", str(self.xdg_runtime_dir)) + + yield EnvrimentalVar("LANG") - if not environ.get('BUBBLEJAIL_DISABLE_SECCOMP_DEFAULTS'): + if not environ.get("BUBBLEJAIL_DISABLE_SECCOMP_DEFAULTS"): for blocked_syscal in ( - "bdflush", "io_pgetevents", - "kexec_file_load", "kexec_load", - "migrate_pages", "move_pages", - "nfsservctl", "nice", "oldfstat", - "oldlstat", "oldolduname", "oldstat", - "olduname", "pciconfig_iobase", "pciconfig_read", - "pciconfig_write", "sgetmask", "ssetmask", "swapcontext", - "swapoff", "swapon", "sysfs", "uselib", "userfaultfd", - "ustat", "vm86", "vm86old", "vmsplice", - - "bpf", "fanotify_init", "lookup_dcookie", - "perf_event_open", "quotactl", "setdomainname", + "bdflush", + "io_pgetevents", + "kexec_file_load", + "kexec_load", + "migrate_pages", + "move_pages", + "nfsservctl", + "nice", + "oldfstat", + "oldlstat", + "oldolduname", + "oldstat", + "olduname", + "pciconfig_iobase", + "pciconfig_read", + "pciconfig_write", + "sgetmask", + "ssetmask", + "swapcontext", + "swapoff", + "swapon", + "sysfs", + "uselib", + "userfaultfd", + "ustat", + "vm86", + "vm86old", + "vmsplice", + "bpf", + "fanotify_init", + "lookup_dcookie", + "perf_event_open", + "quotactl", + "setdomainname", "sethostname", - # "chroot", # Firefox and Chromium fails if chroot is not available - - "delete_module", "init_module", - "finit_module", "query_module", - + "delete_module", + "init_module", + "finit_module", + "query_module", "acct", - - "iopl", "ioperm", - - "settimeofday", "stime", - "clock_settime", "clock_settime64" - - "vhangup", - + "iopl", + "ioperm", + "settimeofday", + "stime", + "clock_settime", + "clock_settime64" "vhangup", ): yield SeccompSyscallErrno( blocked_syscal, @@ -258,11 +275,11 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: ) # Bind session socket inside the sandbox - dbus_session_inside_path = self.xdg_runtime_dir / 'bus' + dbus_session_inside_path = self.xdg_runtime_dir / "bus" dbus_session_outside_path = yield ServiceWantsDbusSessionBind() yield EnvrimentalVar( - 'DBUS_SESSION_BUS_ADDRESS', - f"unix:path={dbus_session_inside_path}") + "DBUS_SESSION_BUS_ADDRESS", f"unix:path={dbus_session_inside_path}" + ) yield Bind( dbus_session_outside_path, dbus_session_inside_path, @@ -271,9 +288,9 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: def __repr__(self) -> str: return "Bubblejail defaults." - name = 'default' - pretty_name = 'Default settings' - description = ('Settings that must be present in any instance') + name = "default" + pretty_name = "Default settings" + description = "Settings that must be present in any instance" class CommonSettings(BubblejailService): @@ -281,42 +298,42 @@ class CommonSettings(BubblejailService): @dataclass class Settings: executable_name: str | list[str] = field( - default='', + default="", metadata=SettingFieldMetadata( - pretty_name='Default arguments', + pretty_name="Default arguments", description=( - 'Default arguments to run when no arguments were provided' + "Default arguments to run when no arguments were provided" ), is_deprecated=False, - ) + ), ) share_local_time: bool = field( default=False, metadata=SettingFieldMetadata( - pretty_name='Share local time', - description='This option has no effect since version 0.6.0', + pretty_name="Share local time", + description="This option has no effect since version 0.6.0", is_deprecated=True, - ) + ), ) filter_disk_sync: bool = field( default=False, metadata=SettingFieldMetadata( - pretty_name='Filter disk sync', + pretty_name="Filter disk sync", description=( - 'Do not allow flushing disk. ' - 'Useful for EA Origin client that tries to flush ' - 'to disk too often.' + "Do not allow flushing disk. " + "Useful for EA Origin client that tries to flush " + "to disk too often." ), is_deprecated=False, - ) + ), ) dbus_name: str = field( - default='', + default="", metadata=SettingFieldMetadata( - pretty_name='Application\'s D-Bus name', - description='D-Bus name allowed to acquire and own', + pretty_name="Application's D-Bus name", + description="D-Bus name allowed to acquire and own", is_deprecated=False, - ) + ), ) def iter_bwrap_options(self) -> ServiceGeneratorType: @@ -330,14 +347,14 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: yield LaunchArguments(settings.executable_name) if settings.filter_disk_sync: - yield SeccompSyscallErrno('sync', 0) - yield SeccompSyscallErrno('fsync', 0) + yield SeccompSyscallErrno("sync", 0) + yield SeccompSyscallErrno("fsync", 0) if dbus_name := settings.dbus_name: yield DbusSessionOwn(dbus_name) - name = 'common' - pretty_name = 'Common Settings' + name = "common" + pretty_name = "Common Settings" description = "Settings that don't fit in any particular category" @@ -384,29 +401,31 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: if x in environ: yield EnvrimentalVar(x) - yield EnvrimentalVar('DISPLAY') + yield EnvrimentalVar("DISPLAY") if x11_socket_path := self.x11_socket_path(environ["DISPLAY"]): yield ReadOnlyBind(x11_socket_path) - x_authority_path_str = environ.get('XAUTHORITY') + x_authority_path_str = environ.get("XAUTHORITY") if x_authority_path_str is not None: - yield ReadOnlyBind(x_authority_path_str, '/tmp/.Xauthority') - yield EnvrimentalVar('XAUTHORITY', '/tmp/.Xauthority') + yield ReadOnlyBind(x_authority_path_str, "/tmp/.Xauthority") + yield EnvrimentalVar("XAUTHORITY", "/tmp/.Xauthority") yield from generate_toolkits() - name = 'x11' - pretty_name = 'X11 windowing system' - description = ('Gives access to X11 socket.\n' - 'This is generally the default Linux windowing system.') + name = "x11" + pretty_name = "X11 windowing system" + description = ( + "Gives access to X11 socket.\n" + "This is generally the default Linux windowing system." + ) class Wayland(BubblejailService): def iter_bwrap_options(self) -> ServiceGeneratorType: try: - wayland_display_env = environ['WAYLAND_DISPLAY'] + wayland_display_env = environ["WAYLAND_DISPLAY"] except KeyError: print("No wayland display.") @@ -414,23 +433,24 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: if x in environ: yield EnvrimentalVar(x) - yield EnvrimentalVar('GDK_BACKEND', 'wayland') - yield EnvrimentalVar('MOZ_DBUS_REMOTE', '1') - yield EnvrimentalVar('MOZ_ENABLE_WAYLAND', '1') + yield EnvrimentalVar("GDK_BACKEND", "wayland") + yield EnvrimentalVar("MOZ_DBUS_REMOTE", "1") + yield EnvrimentalVar("MOZ_ENABLE_WAYLAND", "1") - yield EnvrimentalVar('WAYLAND_DISPLAY', 'wayland-0') - original_socket_path = (Path(BaseDirectory.get_runtime_dir()) - / wayland_display_env) + yield EnvrimentalVar("WAYLAND_DISPLAY", "wayland-0") + original_socket_path = ( + Path(BaseDirectory.get_runtime_dir()) / wayland_display_env + ) - new_socket_path = self.xdg_runtime_dir / 'wayland-0' + new_socket_path = self.xdg_runtime_dir / "wayland-0" yield Bind(original_socket_path, new_socket_path) yield from generate_toolkits() - name = 'wayland' - pretty_name = 'Wayland windowing system' + name = "wayland" + pretty_name = "Wayland windowing system" description = ( - 'Make sure you are running Wayland session\n' - 'and your application supports Wayland' + "Make sure you are running Wayland session\n" + "and your application supports Wayland" ) @@ -446,10 +466,10 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: yield ShareNetwork() - name = 'network' - pretty_name = 'Network access' - description = 'Gives access to network.' - conflicts = frozenset(('slirp4netns', )) + name = "network" + pretty_name = "Network access" + description = "Gives access to network." + conflicts = frozenset(("slirp4netns",)) class PulseAudio(BubblejailService): @@ -457,12 +477,12 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: yield Bind( f"{BaseDirectory.get_runtime_dir()}/pulse/native", - self.xdg_runtime_dir / 'pulse/native', + self.xdg_runtime_dir / "pulse/native", ) - name = 'pulse_audio' - pretty_name = 'Pulse Audio' - description = 'Default audio system in most distros' + name = "pulse_audio" + pretty_name = "Pulse Audio" + description = "Default audio system in most distros" class HomeShare(BubblejailService): @@ -472,10 +492,10 @@ class Settings: home_paths: list[str] = field( default_factory=list, metadata=SettingFieldMetadata( - pretty_name='List of paths', - description='Path to share with sandbox', + pretty_name="List of paths", + description="Path to share with sandbox", is_deprecated=False, - ) + ), ) def iter_bwrap_options(self) -> ServiceGeneratorType: @@ -487,9 +507,9 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: Path.home() / path_relative_to_home, ) - name = 'home_share' - pretty_name = 'Home Share' - description = 'Share directories or files relative to home' + name = "home_share" + pretty_name = "Home Share" + description = "Share directories or files relative to home" class DirectRendering(BubblejailService): @@ -499,14 +519,14 @@ class Settings: enable_aco: bool = field( default=False, metadata=SettingFieldMetadata( - pretty_name='Enable ACO', + pretty_name="Enable ACO", description=( - 'Enables high performance vulkan shader ' - 'compiler for AMD GPUs. Enabled by default ' - 'since mesa 20.02' + "Enables high performance vulkan shader " + "compiler for AMD GPUs. Enabled by default " + "since mesa 20.02" ), is_deprecated=True, - ) + ), ) def iter_bwrap_options(self) -> ServiceGeneratorType: @@ -515,14 +535,14 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: # Bind /dev/dri and /sys/dev/char and /sys/devices # Get names of cardX and renderX in /dev/dri - dev_dri_path = Path('/dev/dri/') + dev_dri_path = Path("/dev/dri/") device_names = set() for x in dev_dri_path.iterdir(): if x.is_char_device(): device_names.add(x.stem) # Resolve links in /sys/dev/char/ - sys_dev_char_path = Path('/sys/dev/char/') + sys_dev_char_path = Path("/sys/dev/char/") # For each symlink in /sys/dev/char/ resolve # and see if they point to cardX or renderX for x in sys_dev_char_path.iterdir(): @@ -537,16 +557,16 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: # We want to bind the /sys/devices/..pcie_id../ yield DevBind(x_resolved.parents[1]) - yield DevBind('/dev/dri') + yield DevBind("/dev/dri") # Nvidia specific binds - for x in Path('/dev/').iterdir(): - if x.name.startswith('nvidia'): + for x in Path("/dev/").iterdir(): + if x.name.startswith("nvidia"): yield DevBind(x) - name = 'direct_rendering' - pretty_name = 'Direct Rendering' - description = 'Provides access to GPU' + name = "direct_rendering" + pretty_name = "Direct Rendering" + description = "Provides access to GPU" class Systray(BubblejailService): @@ -556,12 +576,12 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: object_path="/StatusNotifierWatcher", ) - name = 'systray' - pretty_name = 'System tray icons' + name = "systray" + pretty_name = "System tray icons" description = ( - 'Provides access to D-Bus API for creating tray icons\n' - 'This is not the only way to create tray icons but\n' - 'the most common one.' + "Provides access to D-Bus API for creating tray icons\n" + "This is not the only way to create tray icons but\n" + "the most common one." ) @@ -569,8 +589,8 @@ class Joystick(BubblejailService): def iter_bwrap_options(self) -> ServiceGeneratorType: look_for_names: set[str] = set() - dev_input_path = Path('/dev/input') - sys_class_input_path = Path('/sys/class/input') + dev_input_path = Path("/dev/input") + sys_class_input_path = Path("/sys/class/input") js_names: set[str] = set() for input_dev in dev_input_path.iterdir(): if not input_dev.is_char_device(): @@ -595,7 +615,7 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: js_reloved = sys_class_input_js.resolve() js_input_path = js_reloved.parents[0] for input_element in js_input_path.iterdir(): - if input_element.name.startswith('event'): + if input_element.name.startswith("event"): look_for_names.add(input_element.name) # Find the *-joystick in /dev/input/by-path/ @@ -613,12 +633,12 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: pci_path = sys_class_path.resolve() yield DevBind(pci_path.parents[2]) - name = 'joystick' - pretty_name = 'Joysticks and gamepads' + name = "joystick" + pretty_name = "Joysticks and gamepads" description = ( - 'Windowing systems (x11 and wayland) do not support gamepads.\n' - 'Every game has to read from device files directly.\n' - 'This service provides access to them.' + "Windowing systems (x11 and wayland) do not support gamepads.\n" + "Every game has to read from device files directly.\n" + "This service provides access to them." ) @@ -629,18 +649,18 @@ class Settings: paths: list[str] = field( default_factory=list, metadata=SettingFieldMetadata( - pretty_name='Read/Write paths', - description='Path to share with sandbox', + pretty_name="Read/Write paths", + description="Path to share with sandbox", is_deprecated=False, - ) + ), ) read_only_paths: list[str] = field( default_factory=list, metadata=SettingFieldMetadata( - pretty_name='Read only paths', - description='Path to share read-only with sandbox', + pretty_name="Read only paths", + description="Path to share read-only with sandbox", is_deprecated=False, - ) + ), ) def iter_bwrap_options(self) -> ServiceGeneratorType: @@ -652,20 +672,15 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: for x in settings.read_only_paths: yield ReadOnlyBind(x) - name = 'root_share' - pretty_name = 'Root share' - description = ( - 'Share directories or files relative to root /' - ) + name = "root_share" + pretty_name = "Root share" + description = "Share directories or files relative to root /" class OpenJDK(BubblejailService): - name = 'openjdk' - pretty_name = 'Java' - description = ( - 'Enable for applications that require Java\n' - 'Example: Minecraft' - ) + name = "openjdk" + pretty_name = "Java" + description = "Enable for applications that require Java\n" "Example: Minecraft" display_in_gui = False @@ -677,9 +692,9 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: object_path="/org/freedesktop/Notifications", ) - name = 'notify' - pretty_name = 'Notifications' - description = 'Ability to send notifications to desktop' + name = "notify" + pretty_name = "Notifications" + description = "Ability to send notifications to desktop" class GnomeToolkit(BubblejailService): @@ -689,79 +704,80 @@ class Settings: gnome_portal: bool = field( default=False, metadata=SettingFieldMetadata( - pretty_name='GNOME Portal', - description='Access to GNOME Portal D-Bus API', + pretty_name="GNOME Portal", + description="Access to GNOME Portal D-Bus API", is_deprecated=False, - ) + ), ) dconf_dbus: bool = field( default=False, metadata=SettingFieldMetadata( - pretty_name='Dconf D-Bus', - description='Access to dconf D-Bus API', + pretty_name="Dconf D-Bus", + description="Access to dconf D-Bus API", is_deprecated=False, - ) + ), ) gnome_vfs_dbus: bool = field( default=False, metadata=SettingFieldMetadata( - pretty_name='GNOME VFS', - description='Access to GNOME Virtual File System D-Bus API', + pretty_name="GNOME VFS", + description="Access to GNOME Virtual File System D-Bus API", is_deprecated=False, - ) + ), ) def iter_bwrap_options(self) -> ServiceGeneratorType: settings = self.context.get_settings(GnomeToolkit.Settings) if settings.gnome_portal: - yield EnvrimentalVar('GTK_USE_PORTAL', '1') - yield DbusSessionTalkTo('org.freedesktop.portal.*') + yield EnvrimentalVar("GTK_USE_PORTAL", "1") + yield DbusSessionTalkTo("org.freedesktop.portal.*") if settings.dconf_dbus: - yield DbusSessionTalkTo('ca.desrt.dconf') + yield DbusSessionTalkTo("ca.desrt.dconf") if settings.gnome_vfs_dbus: - yield DbusSessionTalkTo('org.gtk.vfs.*') + yield DbusSessionTalkTo("org.gtk.vfs.*") # TODO: org.a11y.Bus accessibility services # Needs both dbus and socket, socket is address is # acquired from dbus - name = 'gnome_toolkit' - pretty_name = 'GNOME toolkit' - description = 'Access to GNOME APIs' + name = "gnome_toolkit" + pretty_name = "GNOME toolkit" + description = "Access to GNOME APIs" display_in_gui = False class Pipewire(BubblejailService): def iter_bwrap_options(self) -> ServiceGeneratorType: - PIPEWIRE_SOCKET_NAME = 'pipewire-0' - original_socket_path = (Path(BaseDirectory.get_runtime_dir()) - / PIPEWIRE_SOCKET_NAME) + PIPEWIRE_SOCKET_NAME = "pipewire-0" + original_socket_path = ( + Path(BaseDirectory.get_runtime_dir()) / PIPEWIRE_SOCKET_NAME + ) - new_socket_path = self.xdg_runtime_dir / 'pipewire-0' + new_socket_path = self.xdg_runtime_dir / "pipewire-0" yield ReadOnlyBind(original_socket_path, new_socket_path) - name = 'pipewire' - pretty_name = 'Pipewire' - description = 'Pipewire sound and screencapture system' + name = "pipewire" + pretty_name = "Pipewire" + description = "Pipewire sound and screencapture system" class VideoForLinux(BubblejailService): def iter_bwrap_options(self) -> ServiceGeneratorType: - yield DevBindTry('/dev/v4l') - yield DevBindTry('/sys/class/video4linux') - yield DevBindTry('/sys/bus/media/') + yield DevBindTry("/dev/v4l") + yield DevBindTry("/sys/class/video4linux") + yield DevBindTry("/sys/bus/media/") try: - sys_v4l_iterator = Path('/sys/class/video4linux').iterdir() + sys_v4l_iterator = Path("/sys/class/video4linux").iterdir() for sys_path in sys_v4l_iterator: pcie_path = sys_path.resolve() - for char_path in Path('/sys/dev/char/').iterdir(): + for char_path in Path("/sys/dev/char/").iterdir(): if char_path.resolve() == pcie_path: yield Symlink(readlink(char_path), char_path) @@ -769,11 +785,11 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: except FileNotFoundError: ... - for dev_path in Path('/dev').iterdir(): + for dev_path in Path("/dev").iterdir(): name = dev_path.name - if not (name.startswith('video') or name.startswith('media')): + if not (name.startswith("video") or name.startswith("media")): continue if not name[5:].isnumeric(): @@ -781,47 +797,47 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: yield DevBind(dev_path) - name = 'v4l' - pretty_name = 'Video4Linux' - description = 'Video capture. (webcams and etc.)' + name = "v4l" + pretty_name = "Video4Linux" + description = "Video capture. (webcams and etc.)" class IBus(BubblejailService): def iter_bwrap_options(self) -> ServiceGeneratorType: - yield EnvrimentalVar('IBUS_USE_PORTAL', '1') - yield EnvrimentalVar('GTK_IM_MODULE', 'ibus') - yield EnvrimentalVar('QT_IM_MODULE', 'ibus') - yield EnvrimentalVar('XMODIFIERS', '@im=ibus') - yield EnvrimentalVar('GLFW_IM_MODULE', 'ibus') - yield DbusSessionTalkTo('org.freedesktop.portal.IBus.*') - - name = 'ibus' - pretty_name = 'IBus input method' + yield EnvrimentalVar("IBUS_USE_PORTAL", "1") + yield EnvrimentalVar("GTK_IM_MODULE", "ibus") + yield EnvrimentalVar("QT_IM_MODULE", "ibus") + yield EnvrimentalVar("XMODIFIERS", "@im=ibus") + yield EnvrimentalVar("GLFW_IM_MODULE", "ibus") + yield DbusSessionTalkTo("org.freedesktop.portal.IBus.*") + + name = "ibus" + pretty_name = "IBus input method" description = ( - 'Gives access to IBus input method.\n' - 'This is generally the default input method for multilingual input.' + "Gives access to IBus input method.\n" + "This is generally the default input method for multilingual input." ) - conflicts = frozenset(('fcitx', )) + conflicts = frozenset(("fcitx",)) class Fcitx(BubblejailService): def iter_bwrap_options(self) -> ServiceGeneratorType: - yield EnvrimentalVar('GTK_IM_MODULE', 'fcitx') - yield EnvrimentalVar('QT_IM_MODULE', 'fcitx') - yield EnvrimentalVar('XMODIFIERS', '@im=fcitx') - yield EnvrimentalVar('SDL_IM_MODULE', 'fcitx') - yield EnvrimentalVar('GLFW_IM_MODULE', 'ibus') - yield DbusSessionTalkTo('org.freedesktop.portal.Fcitx.*') - yield DbusSessionTalkTo('org.freedesktop.portal.IBus.*') - - name = 'fcitx' - pretty_name = 'Fcitx/Fcitx5 input method' + yield EnvrimentalVar("GTK_IM_MODULE", "fcitx") + yield EnvrimentalVar("QT_IM_MODULE", "fcitx") + yield EnvrimentalVar("XMODIFIERS", "@im=fcitx") + yield EnvrimentalVar("SDL_IM_MODULE", "fcitx") + yield EnvrimentalVar("GLFW_IM_MODULE", "ibus") + yield DbusSessionTalkTo("org.freedesktop.portal.Fcitx.*") + yield DbusSessionTalkTo("org.freedesktop.portal.IBus.*") + + name = "fcitx" + pretty_name = "Fcitx/Fcitx5 input method" description = ( - 'Gives access to Fcitx/Fcitx5 input method.\n' - 'This is another popular input method framework.' + "Gives access to Fcitx/Fcitx5 input method.\n" + "This is another popular input method framework." ) - conflicts = frozenset(('ibus', )) + conflicts = frozenset(("ibus",)) class Slirp4netns(BubblejailService): @@ -831,34 +847,31 @@ class Settings: dns_servers: list[str] = field( default_factory=list, metadata=SettingFieldMetadata( - pretty_name='DNS servers', + pretty_name="DNS servers", description=( - 'DNS servers used. ' - 'Internal DNS server is always used.' + "DNS servers used. " "Internal DNS server is always used." ), is_deprecated=False, - ) + ), ) outbound_addr: str = field( - default='', + default="", metadata=SettingFieldMetadata( - pretty_name='Outbound address or deivce', + pretty_name="Outbound address or deivce", description=( - 'Address or device to bind to. ' - 'If not set the default address would be used.' + "Address or device to bind to. " + "If not set the default address would be used." ), is_deprecated=False, - ) + ), ) disable_host_loopback: bool = field( default=True, metadata=SettingFieldMetadata( - pretty_name='Disable host loopback access', - description=( - 'Prohibit connecting to host\'s loopback interface' - ), + pretty_name="Disable host loopback access", + description=("Prohibit connecting to host's loopback interface"), is_deprecated=False, - ) + ), ) def __init__(self, context: BubblejailRunContext) -> None: @@ -875,11 +888,8 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: dns_servers.append("10.0.2.3") yield FileTransfer( - b"\n".join( - f"nameserver {x}".encode() - for x in dns_servers - ), - '/etc/resolv.conf' + b"\n".join(f"nameserver {x}".encode() for x in dns_servers), + "/etc/resolv.conf", ) async def post_init_hook(self, pid: int) -> None: @@ -891,18 +901,12 @@ async def post_init_hook(self, pid: int) -> None: from lxns.namespaces import NetworkNamespace with ExitStack() as exit_stack: - target_namespace = exit_stack.enter_context( - NetworkNamespace.from_pid(pid) - ) - parent_ns = exit_stack.enter_context( - target_namespace.get_user_namespace() - ) + target_namespace = exit_stack.enter_context(NetworkNamespace.from_pid(pid)) + parent_ns = exit_stack.enter_context(target_namespace.get_user_namespace()) parent_ns_fd = parent_ns.fileno() parent_ns_path = f"/proc/{getpid()}/fd/{parent_ns_fd}" - ready_pipe_read, ready_pipe_write = ( - pipe2(O_NONBLOCK | O_CLOEXEC) - ) + ready_pipe_read, ready_pipe_write = pipe2(O_NONBLOCK | O_CLOEXEC) exit_stack.enter_context(open(ready_pipe_write)) ready_pipe = exit_stack.enter_context(open(ready_pipe_read)) @@ -917,20 +921,18 @@ async def post_init_hook(self, pid: int) -> None: slirp4netns_args = [ slirp_bin_path, f"--ready={ready_pipe_write}", - '--configure', + "--configure", f"--userns-path={parent_ns_path}", ] if outbound_addr: - slirp4netns_args.append( - (f"--outbound-addr={outbound_addr}") - ) + slirp4netns_args.append((f"--outbound-addr={outbound_addr}")) if disable_host_loopback: - slirp4netns_args.append('--disable-host-loopback') + slirp4netns_args.append("--disable-host-loopback") slirp4netns_args.append(str(pid)) - slirp4netns_args.append('tap0') + slirp4netns_args.append("tap0") self.slirp_process = await create_subprocess_exec( *slirp4netns_args, @@ -943,8 +945,7 @@ async def post_init_hook(self, pid: int) -> None: ) early_process_end_task = loop.create_task( - self.slirp_process.wait(), - name="Early slirp4netns process end" + self.slirp_process.wait(), name="Early slirp4netns process end" ) early_process_end_task.add_done_callback( lambda _: slirp_ready_task.cancel() @@ -953,9 +954,7 @@ async def post_init_hook(self, pid: int) -> None: try: await wait_for(slirp_ready_task, timeout=3) except CancelledError: - raise BubblejailInitializationError( - "Slirp4netns initialization failed" - ) + raise BubblejailInitializationError("Slirp4netns initialization failed") finally: loop.remove_reader(ready_pipe_read) early_process_end_task.cancel() @@ -970,13 +969,12 @@ async def post_shutdown_hook(self) -> None: except ProcessLookupError: ... - name = 'slirp4netns' - pretty_name = 'Slirp4netns networking' + name = "slirp4netns" + pretty_name = "Slirp4netns networking" description = ( - "Independent networking stack for sandbox. " - "Requires slirp4netns executable." + "Independent networking stack for sandbox. " "Requires slirp4netns executable." ) - conflicts = frozenset(('network', )) + conflicts = frozenset(("network",)) class NamespacesLimits(BubblejailService): @@ -986,83 +984,69 @@ class Settings: user: int = field( default=0, metadata=SettingFieldMetadata( - pretty_name='Max number of user namespaces', + pretty_name="Max number of user namespaces", description=( - 'Limiting user namespaces blocks acquiring new ' - 'capabilities and privileges inside namespaces.' + "Limiting user namespaces blocks acquiring new " + "capabilities and privileges inside namespaces." ), is_deprecated=False, - ) + ), ) mount: int = field( default=0, metadata=SettingFieldMetadata( - pretty_name='Max number of mount namespaces', - description=( - 'Limits number mount namespaces.' - ), + pretty_name="Max number of mount namespaces", + description=("Limits number mount namespaces."), is_deprecated=False, - ) + ), ) pid: int = field( default=0, metadata=SettingFieldMetadata( - pretty_name='Max number of PID namespaces', - description=( - 'Limits number PID namespaces.' - ), + pretty_name="Max number of PID namespaces", + description=("Limits number PID namespaces."), is_deprecated=False, - ) + ), ) ipc: int = field( default=0, metadata=SettingFieldMetadata( - pretty_name='Max number of IPC namespaces', - description=( - 'Limits number IPC namespaces.' - ), + pretty_name="Max number of IPC namespaces", + description=("Limits number IPC namespaces."), is_deprecated=False, - ) + ), ) net: int = field( default=0, metadata=SettingFieldMetadata( - pretty_name='Max number of net namespaces', - description=( - 'Limits number net namespaces.' - ), + pretty_name="Max number of net namespaces", + description=("Limits number net namespaces."), is_deprecated=False, - ) + ), ) time: int = field( default=0, metadata=SettingFieldMetadata( - pretty_name='Max number of time namespaces', - description=( - 'Limits number time namespaces.' - ), + pretty_name="Max number of time namespaces", + description=("Limits number time namespaces."), is_deprecated=False, - ) + ), ) uts: int = field( default=0, metadata=SettingFieldMetadata( - pretty_name='Max number of UTS namespaces', - description=( - 'Limits number UTS namespaces.' - ), + pretty_name="Max number of UTS namespaces", + description=("Limits number UTS namespaces."), is_deprecated=False, - ) + ), ) cgroup: int = field( default=0, metadata=SettingFieldMetadata( - pretty_name='Max number of cgroups namespaces', - description=( - 'Limits number cgroups namespaces.' - ), + pretty_name="Max number of cgroups namespaces", + description=("Limits number cgroups namespaces."), is_deprecated=False, - ) + ), ) @staticmethod @@ -1093,19 +1077,23 @@ async def post_init_hook(self, pid: int) -> None: namespace_files_to_limits: dict[str, int] = {} if (user_ns_limit := settings.user) >= 0: namespace_files_to_limits["max_user_namespaces"] = ( - user_ns_limit and user_ns_limit + 1) + user_ns_limit and user_ns_limit + 1 + ) if (mount_ns_limit := settings.mount) >= 0: namespace_files_to_limits["max_mnt_namespaces"] = ( - mount_ns_limit and mount_ns_limit + 1) + mount_ns_limit and mount_ns_limit + 1 + ) if (pid_ns_limit := settings.pid) >= 0: namespace_files_to_limits["max_pid_namespaces"] = ( - pid_ns_limit and pid_ns_limit + 1) + pid_ns_limit and pid_ns_limit + 1 + ) if (ipc_ns_limit := settings.ipc) >= 0: namespace_files_to_limits["max_ipc_namespaces"] = ( - ipc_ns_limit and ipc_ns_limit + 1) + ipc_ns_limit and ipc_ns_limit + 1 + ) if (net_ns_limit := settings.net) >= 0: if not self.context.is_service_enabled(Network): @@ -1118,15 +1106,16 @@ async def post_init_hook(self, pid: int) -> None: if (uts_ns_limit := settings.uts) >= 0: namespace_files_to_limits["max_uts_namespaces"] = ( - uts_ns_limit and uts_ns_limit + 1) + uts_ns_limit and uts_ns_limit + 1 + ) if (cgroup_ns_limit := settings.cgroup) >= 0: namespace_files_to_limits["max_cgroup_namespaces"] = ( - cgroup_ns_limit and cgroup_ns_limit + 1) + cgroup_ns_limit and cgroup_ns_limit + 1 + ) setter_process = Process( - target=self.set_namespaces_limits, - args=(pid, namespace_files_to_limits) + target=self.set_namespaces_limits, args=(pid, namespace_files_to_limits) ) try: setter_process.start() @@ -1160,35 +1149,35 @@ class Settings: raw_bwrap_args: list[str] = field( default_factory=list, metadata=SettingFieldMetadata( - pretty_name='Raw bwrap args', + pretty_name="Raw bwrap args", description=( - 'Raw arguments to add to bwrap invocation. ' - 'See bubblewrap documentation.' + "Raw arguments to add to bwrap invocation. " + "See bubblewrap documentation." ), is_deprecated=False, - ) + ), ) raw_dbus_session_args: list[str] = field( default_factory=list, metadata=SettingFieldMetadata( - pretty_name='Raw xdg-dbus-proxy session args', + pretty_name="Raw xdg-dbus-proxy session args", description=( - 'Raw arguments to add to xdg-dbus-proxy session ' - 'invocation. See xdg-dbus-proxy documentation.' + "Raw arguments to add to xdg-dbus-proxy session " + "invocation. See xdg-dbus-proxy documentation." ), is_deprecated=False, - ) + ), ) raw_dbus_system_args: list[str] = field( default_factory=list, metadata=SettingFieldMetadata( - pretty_name='Raw xdg-dbus-proxy system args', + pretty_name="Raw xdg-dbus-proxy system args", description=( - 'Raw arguments to add to xdg-dbus-proxy system ' - 'invocation. See xdg-dbus-proxy documentation.' + "Raw arguments to add to xdg-dbus-proxy system " + "invocation. See xdg-dbus-proxy documentation." ), is_deprecated=False, - ) + ), ) def iter_bwrap_options(self) -> ServiceGeneratorType: @@ -1213,11 +1202,26 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: SERVICES_CLASSES: tuple[Type[BubblejailService], ...] = ( - CommonSettings, X11, Wayland, - Network, PulseAudio, HomeShare, DirectRendering, - Systray, Joystick, RootShare, OpenJDK, Notifications, - GnomeToolkit, Pipewire, VideoForLinux, IBus, Fcitx, - Slirp4netns, NamespacesLimits, Debug, + CommonSettings, + X11, + Wayland, + Network, + PulseAudio, + HomeShare, + DirectRendering, + Systray, + Joystick, + RootShare, + OpenJDK, + Notifications, + GnomeToolkit, + Pipewire, + VideoForLinux, + IBus, + Fcitx, + Slirp4netns, + NamespacesLimits, + Debug, ) SERVICES_MAP: dict[str, Type[BubblejailService]] = { @@ -1226,7 +1230,7 @@ def iter_bwrap_options(self) -> ServiceGeneratorType: if TYPE_CHECKING: - T = TypeVar('T', bound="object") + T = TypeVar("T", bound="object") class BubblejailRunContext: @@ -1262,9 +1266,7 @@ def __init__(self, conf_dict: ServicesConfDictType | None = None): if conf_dict is not None: self.set_services(conf_dict) - def set_services( - self, - new_services_datas: ServicesConfDictType) -> None: + def set_services(self, new_services_datas: ServicesConfDictType) -> None: declared_services: set[str] = set() self.services.clear() @@ -1279,15 +1281,12 @@ def set_services( service_settings = service_settings_class(**service_options_dict) - self.service_settings_to_type[service_settings_class] = ( - service_settings - ) + self.service_settings_to_type[service_settings_class] = service_settings self.service_settings[service_name] = service_settings declared_services.add(service_name) - if conflicting_services := ( - declared_services & service_class.conflicts): + if conflicting_services := (declared_services & service_class.conflicts): raise ServiceConflictError( f"Service conflict between {service_name} and " f"{', '.join(conflicting_services)}" @@ -1314,22 +1313,19 @@ def iter_services( yield from self.services.values() - def iter_post_init_hooks( - self - ) -> Iterator[Callable[[int], Awaitable[None]]]: + def iter_post_init_hooks(self) -> Iterator[Callable[[int], Awaitable[None]]]: for service in self.services.values(): - if (service.__class__.post_init_hook - is BubblejailService.post_init_hook): + if service.__class__.post_init_hook is BubblejailService.post_init_hook: continue yield service.post_init_hook - def iter_post_shutdown_hooks( - self - ) -> Iterator[Callable[[], Awaitable[None]]]: + def iter_post_shutdown_hooks(self) -> Iterator[Callable[[], Awaitable[None]]]: for service in self.services.values(): - if (service.__class__.post_shutdown_hook - is BubblejailService.post_shutdown_hook): + if ( + service.__class__.post_shutdown_hook + is BubblejailService.post_shutdown_hook + ): continue yield service.post_shutdown_hook diff --git a/test/test_auto_completion.py b/test/test_auto_completion.py index d1d8e91..bd39a1f 100644 --- a/test/test_auto_completion.py +++ b/test/test_auto_completion.py @@ -15,26 +15,26 @@ def setUp(self) -> None: def test_second_arg(self) -> None: self.assertEqual( - tuple(self.parser.auto_complete('bubblejail ')), - tuple(BUBBLEJAIL_CMD.keys()) + tuple(self.parser.auto_complete("bubblejail ")), + tuple(BUBBLEJAIL_CMD.keys()), ) self.assertEqual( - tuple(self.parser.auto_complete('bubblejail lis')), - tuple(BUBBLEJAIL_CMD.keys()) + tuple(self.parser.auto_complete("bubblejail lis")), + tuple(BUBBLEJAIL_CMD.keys()), ) self.assertEqual( - tuple(self.parser.auto_complete('bubblejail asd ')), + tuple(self.parser.auto_complete("bubblejail asd ")), tuple(), ) def test_subcommand(self) -> None: self.assertEqual( - tuple(self.parser.auto_complete('bubblejail list ')), + tuple(self.parser.auto_complete("bubblejail list ")), tuple(iter_list_choices()), ) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/test/test_full_run.py b/test/test_full_run.py index 828e138..42f6ed4 100644 --- a/test/test_full_run.py +++ b/test/test_full_run.py @@ -92,9 +92,7 @@ async def test_full_run(self) -> None: self.assertTrue(self.bubblejail_data_dir.exists()) with self.subTest("Validate desktop entry"): - for desktop_entry_file in ( - self.data_dir / "applications" - ).iterdir(): + for desktop_entry_file in (self.data_dir / "applications").iterdir(): subprocess_run( ("desktop-file-validate", str(desktop_entry_file)), check=True, diff --git a/test/test_helper.py b/test/test_helper.py index 1b4817c..471a1ce 100644 --- a/test/test_helper.py +++ b/test/test_helper.py @@ -30,7 +30,7 @@ def setUp(self) -> None: self.temp_dir = TemporaryDirectory() self.addCleanup(self.temp_dir.cleanup) self.temp_dir_path = Path(self.temp_dir.name) - self.test_socket_path = self.temp_dir_path / 'test_socket' + self.test_socket_path = self.temp_dir_path / "test_socket" self.test_socket = socket(AF_UNIX) self.test_socket.bind(bytes(self.test_socket_path)) @@ -55,14 +55,14 @@ async def asyncSetUp(self) -> None: async def test_ping(self) -> None: """Test pinging helper over unix socket""" - ping_request = RequestPing('test') + ping_request = RequestPing("test") self.writer.write(ping_request.to_json_byte_line()) await self.writer.drain() response = await self.reader.readline() - print('Response bytes:', response) - self.assertIn(b'pong', response, 'No pong in response') + print("Response bytes:", response) + self.assertIn(b"pong", response, "No pong in response") async def asyncTearDown(self) -> None: create_task(self.helper.stop_async()) @@ -78,38 +78,34 @@ def setUp(self) -> None: def test_parser(self) -> None: """Test how helper argument parser works""" - required_args = ['--helper-socket', '0', '--'] + required_args = ["--helper-socket", "0", "--"] - with self.subTest('No shell'): + with self.subTest("No shell"): no_shell_example_args = [ - '/bin/true', - '--long-opt', '-e', '-test', - '/bin/false', '--shell', + "/bin/true", + "--long-opt", + "-e", + "-test", + "/bin/false", + "--shell", ] - parsed_args = self.parser.parse_args( - required_args + no_shell_example_args - ) + parsed_args = self.parser.parse_args(required_args + no_shell_example_args) self.assertFalse(parsed_args.shell) self.assertEqual(parsed_args.args_to_run, no_shell_example_args) - with self.subTest('Shell plus args'): - with_shell_example_args = [ - '/bin/ls', '-l' - ] + with self.subTest("Shell plus args"): + with_shell_example_args = ["/bin/ls", "-l"] parsed_args = self.parser.parse_args( - ['--shell'] + required_args + with_shell_example_args + ["--shell"] + required_args + with_shell_example_args ) self.assertTrue(parsed_args.shell) - self.assertEqual( - parsed_args.args_to_run, with_shell_example_args) + self.assertEqual(parsed_args.args_to_run, with_shell_example_args) - with self.subTest('Only shell'): - parsed_args = self.parser.parse_args( - ['--shell'] + required_args - ) + with self.subTest("Only shell"): + parsed_args = self.parser.parse_args(["--shell"] + required_args) self.assertTrue(parsed_args.shell) self.assertEqual(parsed_args.args_to_run, []) @@ -120,33 +116,34 @@ class PidTrackerTest(IsolatedAsyncioTestCase): async def test_process_detection(self) -> None: """Test process detection""" - with self.subTest('PID tracking: no child of this process'): + with self.subTest("PID tracking: no child of this process"): self.assertFalse(BubblejailHelper.process_has_child()) child_process = await create_subprocess_exec( - 'sleep', '1d', + "sleep", + "1d", ) - with self.subTest('PID tracking: has child method'): - self.assertTrue( - BubblejailHelper.process_has_child()) + with self.subTest("PID tracking: has child method"): + self.assertTrue(BubblejailHelper.process_has_child()) - with self.subTest('PID tracking by command: right command'): + with self.subTest("PID tracking by command: right command"): # WARN: This will give false positive if you have # sleep running anywhere on the system # Maybein the future we can setup quick pid namespace - self.assertTrue( - BubblejailHelper.proc_has_process_command('sleep')) + self.assertTrue(BubblejailHelper.proc_has_process_command("sleep")) - with self.subTest('PID tracking by command: wrong command'): + with self.subTest("PID tracking by command: wrong command"): self.assertFalse( BubblejailHelper.proc_has_process_command( - 'asdjhaikefrasendiklfnsmzkjledf')) + "asdjhaikefrasendiklfnsmzkjledf" + ) + ) # Cleanup child_process.terminate() await child_process.wait() -if __name__ == '__main__': +if __name__ == "__main__": unittest_main() diff --git a/test/test_profiles.py b/test/test_profiles.py index bcb3b24..2b5ae23 100644 --- a/test/test_profiles.py +++ b/test/test_profiles.py @@ -9,21 +9,18 @@ from bubblejail.bubblejail_instance import BubblejailProfile - PROJECT_ROOT_PATH = Path(__file__).parent.parent class TestProfiles(TestCase): def test_profiles(self) -> None: - profiles_str_path = ( - PROJECT_ROOT_PATH / 'data/usr-share/bubblejail/profiles' - ) + profiles_str_path = PROJECT_ROOT_PATH / "data/usr-share/bubblejail/profiles" for profile_path in profiles_str_path.iterdir(): with self.subTest(profile_path.stem): - with open(profile_path, mode='rb') as f: + with open(profile_path, mode="rb") as f: BubblejailProfile(**toml_load(f)) -if __name__ == '__main__': +if __name__ == "__main__": unittest_main() diff --git a/test/test_service_info.py b/test/test_service_info.py index 0cb1f15..a619fa1 100644 --- a/test/test_service_info.py +++ b/test/test_service_info.py @@ -6,14 +6,8 @@ from unittest import TestCase from unittest import main as unittest_main - from bubblejail.exceptions import ServiceConflictError -from bubblejail.services import ( - SERVICES_CLASSES, - SERVICES_MAP, - X11, - ServiceContainer, -) +from bubblejail.services import SERVICES_CLASSES, SERVICES_MAP, X11, ServiceContainer class TestServices(TestCase): @@ -41,13 +35,9 @@ def test_x11_socket_bind(self) -> None: "/tmp/.X11-unix/X1", ) - self.assertIsNone( - X11.x11_socket_path("tcp/localhost:1") - ) + self.assertIsNone(X11.x11_socket_path("tcp/localhost:1")) - self.assertIsNone( - X11.x11_socket_path("unix/localhost:1") - ) + self.assertIsNone(X11.x11_socket_path("unix/localhost:1")) def test_service_conflict_relationship(self) -> None: # Test that conflict points to existing service @@ -56,7 +46,8 @@ def test_service_conflict_relationship(self) -> None: for conflict in service.conflicts: conflict_service = SERVICES_MAP[conflict] self.assertIn( - service.name, conflict_service.conflicts, + service.name, + conflict_service.conflicts, msg=( f"Reverse conflict of {service.name} " f"to {conflict_service.name} not found" @@ -78,5 +69,5 @@ def test_service_conflict_load(self) -> None: ServiceContainer(test_good_config) -if __name__ == '__main__': +if __name__ == "__main__": unittest_main() diff --git a/tools/base.py b/tools/base.py new file mode 100644 index 0000000..1ed7d82 --- /dev/null +++ b/tools/base.py @@ -0,0 +1,16 @@ +# SPDX-License-Identifier: GPL-3.0-or-later +# SPDX-FileCopyrightText: 2024 igo95862 +from __future__ import annotations + +from pathlib import Path + +PROJECT_ROOT_PATH = Path(__file__).parent.parent +BUILD_DIR = PROJECT_ROOT_PATH / "build" +PYTHON_SOURCES: list[Path] = [ + PROJECT_ROOT_PATH / "src", + PROJECT_ROOT_PATH / "tools", + PROJECT_ROOT_PATH / "test", + PROJECT_ROOT_PATH / "docs/man_generator.py", +] + +__all__ = ("PROJECT_ROOT_PATH", "BUILD_DIR", "PYTHON_SOURCES") diff --git a/tools/containers/build_ci_images.py b/tools/containers/build_ci_images.py index 1c242e8..edd2847 100644 --- a/tools/containers/build_ci_images.py +++ b/tools/containers/build_ci_images.py @@ -4,9 +4,9 @@ from __future__ import annotations from argparse import ArgumentParser +from contextlib import contextmanager from functools import partial from subprocess import run as subprocess_run -from contextlib import contextmanager from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -169,8 +169,7 @@ def build_analysis_image(distro: str) -> None: install_packages_for_distro( container_name, distro, - DISTRO_CODE_ANALYSIS_TOOLS[distro] - + DISTRO_PYTHON_RUNTIME_DEPS[distro], + DISTRO_CODE_ANALYSIS_TOOLS[distro] + DISTRO_PYTHON_RUNTIME_DEPS[distro], ) diff --git a/tools/jinja2_run.py b/tools/jinja2_run.py index e29119b..380bc50 100644 --- a/tools/jinja2_run.py +++ b/tools/jinja2_run.py @@ -31,18 +31,18 @@ def execute_template( def main() -> None: arg_parse = ArgumentParser() arg_parse.add_argument( - '--define', - action='append', + "--define", + action="append", nargs=2, default=[], ) arg_parse.add_argument( - '--template-dir', + "--template-dir", required=True, type=Path, ) arg_parse.add_argument( - 'template_name', + "template_name", ) execute_template(**vars(arg_parse.parse_args())) diff --git a/tools/run_format.py b/tools/run_format.py new file mode 100644 index 0000000..9dc3103 --- /dev/null +++ b/tools/run_format.py @@ -0,0 +1,42 @@ +# SPDX-License-Identifier: GPL-3.0-or-later +# SPDX-FileCopyrightText: 2024 igo95862 +from __future__ import annotations + +from subprocess import run + +from .base import PROJECT_ROOT_PATH, PYTHON_SOURCES + + +def format_with_black(check: bool = False) -> None: + black_args = ["black"] + + if check: + black_args.extend(("--check", "--diff")) + + black_args.extend(map(str, PYTHON_SOURCES)) + + run( + args=black_args, + cwd=PROJECT_ROOT_PATH, + check=check, + ) + + +def format_with_isort(check: bool = False) -> None: + isort_args = ["isort", "--profile", "black"] + + if check: + isort_args.extend(("--check", "--diff")) + + isort_args.extend(map(str, PYTHON_SOURCES)) + + run( + args=isort_args, + cwd=PROJECT_ROOT_PATH, + check=check, + ) + + +if __name__ == "__main__": + format_with_isort() + format_with_black() diff --git a/tools/run_linters.py b/tools/run_linters.py index 945c712..938c92f 100644 --- a/tools/run_linters.py +++ b/tools/run_linters.py @@ -4,22 +4,16 @@ from pathlib import Path from subprocess import CalledProcessError, run +from sys import stderr -PROJECT_ROOT_PATH = Path(__file__).parent.parent -BUILD_DIR = PROJECT_ROOT_PATH / "build" -PYTHON_SOURCES: list[Path] = [ - PROJECT_ROOT_PATH / "src", - PROJECT_ROOT_PATH / "tools", - PROJECT_ROOT_PATH / "test", - PROJECT_ROOT_PATH / "docs/man_generator.py", -] -LXNS_SUBPROJECT_PYTHON_SOURCE = ( - PROJECT_ROOT_PATH / "subprojects/python-lxns/src/" -) +from .base import BUILD_DIR, PROJECT_ROOT_PATH, PYTHON_SOURCES +from .run_format import format_with_black, format_with_isort + +LXNS_SUBPROJECT_PYTHON_SOURCE = PROJECT_ROOT_PATH / "subprojects/python-lxns/src/" def run_linter(args: list[str | Path]) -> bool: - print("Running:", args[0]) + print("Running:", args[0], file=stderr) try: run( args=args, @@ -42,22 +36,41 @@ def run_mypy() -> bool: "mypy", "--pretty", "--strict", - "--cache-dir", cache_dir, + "--cache-dir", + cache_dir, "--ignore-missing-imports", - *PYTHON_SOURCES + *PYTHON_SOURCES, ] if LXNS_SUBPROJECT_PYTHON_SOURCE.exists(): mypy_args.append(LXNS_SUBPROJECT_PYTHON_SOURCE) - return run_linter( - mypy_args - ) + return run_linter(mypy_args) def run_reuse() -> bool: return run_linter(["reuse", "lint"]) +def run_black() -> bool: + print("Running: black", file=stderr) + try: + format_with_black(check=True) + except CalledProcessError: + return True + + return False + + +def run_isort() -> bool: + print("Running: isort", file=stderr) + try: + format_with_isort(check=True) + except CalledProcessError: + return True + + return False + + def main() -> None: BUILD_DIR.mkdir(exist_ok=True) @@ -66,6 +79,8 @@ def main() -> None: has_failed |= run_pyflakes() has_failed |= run_mypy() has_failed |= run_reuse() + has_failed |= run_black() + has_failed |= run_isort() if has_failed: raise SystemExit(1) diff --git a/tools/run_test_bubblejail.py b/tools/run_test_bubblejail.py index 5fb7831..283f107 100755 --- a/tools/run_test_bubblejail.py +++ b/tools/run_test_bubblejail.py @@ -47,7 +47,7 @@ def setup_test_env() -> None: def setup_mocks() -> None: from bubblejail.bubblejail_instance import BubblejailInstance - helper_path = PROJECT_ROOT_PATH / 'src/bubblejail/bubblejail_helper.py' + helper_path = PROJECT_ROOT_PATH / "src/bubblejail/bubblejail_helper.py" original_run = BubblejailInstance.async_run_init async def run_with_helper_script( @@ -64,7 +64,7 @@ async def run_with_helper_script( setattr( BubblejailInstance, - 'async_run_init', + "async_run_init", run_with_helper_script, ) @@ -72,17 +72,17 @@ async def run_with_helper_script( runtime_dir_path.mkdir(exist_ok=True) def runtime_dir(self: BubblejailInstance) -> Path: - return runtime_dir_path / f'bubblejail/{self.name}' + return runtime_dir_path / f"bubblejail/{self.name}" setattr( BubblejailInstance, - 'runtime_dir', + "runtime_dir", property(fget=runtime_dir), ) def shell_main() -> None: - history_file = BUILD_DIR / 'bubblejail_cmd_history' + history_file = BUILD_DIR / "bubblejail_cmd_history" history_file.touch(exist_ok=True) read_history_file(history_file) set_history_length(1000) @@ -93,7 +93,7 @@ def shell_main() -> None: while True: try: - input_line = input('bubblejail>> ') + input_line = input("bubblejail>> ") except EOFError: print() return @@ -119,15 +119,15 @@ def gui_main() -> None: TEST_RUNNERS = { - 'shell': shell_main, - 'gui': gui_main, + "shell": shell_main, + "gui": gui_main, } def main() -> None: arg_parser = ArgumentParser() arg_parser.add_argument( - 'runner', + "runner", choices=TEST_RUNNERS.keys(), ) args = arg_parser.parse_args() @@ -135,5 +135,5 @@ def main() -> None: TEST_RUNNERS[args.runner]() -if __name__ == '__main__': +if __name__ == "__main__": main()