From fe07825ad9f69bac26b3414fdb6241cea2757554 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Wed, 10 Aug 2022 11:06:40 +0530 Subject: [PATCH] Work on new permissions framework for rc commands --- kitty/boss.py | 38 ++++++++++++++++++++------------ kitty/remote_control.py | 48 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 71 insertions(+), 15 deletions(-) diff --git a/kitty/boss.py b/kitty/boss.py index c15a58be3..bcf9f7a7f 100644 --- a/kitty/boss.py +++ b/kitty/boss.py @@ -68,6 +68,9 @@ from .utils import ( from .window import CommandOutput, CwdRequest, Window +RCResponse = Union[Dict[str, Any], None, AsyncResponse] + + class OSWindowDict(TypedDict): id: int platform_window_id: Optional[int] @@ -441,8 +444,8 @@ class Boss: self.child_monitor.add_child(window.id, window.child.pid, window.child.child_fd, window.screen) self.window_id_map[window.id] = window - def _handle_remote_command(self, cmd: str, window: Optional[Window] = None, peer_id: int = 0) -> Union[Dict[str, Any], None, AsyncResponse]: - from .remote_control import handle_cmd, parse_cmd + def _handle_remote_command(self, cmd: str, window: Optional[Window] = None, peer_id: int = 0) -> RCResponse: + from .remote_control import parse_cmd response = None window = window or None try: @@ -452,18 +455,25 @@ class Boss: return response if not pcmd: return response - if self.allow_remote_control == 'y' or peer_id > 0 or getattr(window, 'allow_remote_control', False): - try: - response = handle_cmd(self, window, pcmd, peer_id) - except Exception as err: - import traceback - response = {'ok': False, 'error': str(err)} - if not getattr(err, 'hide_traceback', False): - response['tb'] = traceback.format_exc() - else: - no_response = pcmd.get('no_response') or False - if not no_response: - response = {'ok': False, 'error': 'Remote control is disabled. Add allow_remote_control to your kitty.conf'} + allowed_by_channel = ( + self.allow_remote_control == 'y' or (peer_id > 0 and self.allow_remote_control == 'socket-only') or + getattr(window, 'allow_remote_control', False)) + if allowed_by_channel: + return self._execute_remote_command(pcmd, window, peer_id) + no_response = pcmd.get('no_response') or False + if no_response: + return None + return {'ok': False, 'error': 'Remote control is disabled. Add allow_remote_control to your kitty.conf'} + + def _execute_remote_command(self, pcmd: Dict[str, Any], window: Optional[Window] = None, peer_id: int = 0) -> RCResponse: + from .remote_control import handle_cmd + try: + response = handle_cmd(self, window, pcmd, peer_id) + except Exception as err: + import traceback + response = {'ok': False, 'error': str(err)} + if not getattr(err, 'hide_traceback', False): + response['tb'] = traceback.format_exc() return response @ac('misc', ''' diff --git a/kitty/remote_control.py b/kitty/remote_control.py index 56a295ade..d518f1432 100644 --- a/kitty/remote_control.py +++ b/kitty/remote_control.py @@ -11,7 +11,7 @@ from functools import partial from time import monotonic, time_ns from types import GeneratorType from typing import ( - Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union, cast + Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union, cast, TYPE_CHECKING ) from .cli import emph, parse_args @@ -30,6 +30,8 @@ from .typing import BossType, WindowType from .utils import TTYIO, log_error, parse_address_spec, resolve_custom_file active_async_requests: Dict[str, float] = {} +if TYPE_CHECKING: + from .window import Window def encode_response_for_peer(response: Any) -> bytes: @@ -65,6 +67,50 @@ def parse_cmd(serialized_cmd: str, encryption_key: EllipticCurveKey) -> Dict[str return pcmd +class CMDChecker: + + def __call__(self, pcmd: Dict[str, Any], window: Optional['Window'], from_socket: bool, extra_data: Dict[str, Any]) -> bool: + return False + + +class PasswordAuthorizer: + + def __init__(self, password: str, auth_items: Tuple[str, ...]) -> None: + from fnmatch import translate + import runpy + self.password = password + self.command_patterns = [] + self.function_checkers = [] + self.user_permission: Optional[bool] = None + for item in auth_items: + if item.endswith('.py'): + path = resolve_custom_file(item) + try: + m = runpy.run_path(path) + func: CMDChecker = m['is_cmd_allowed'] + except Exception as e: + log_error(f'Failed to load cmd check function from {path} with error: {e}') + self.function_checkers.append(CMDChecker()) + else: + self.function_checkers.append(func) + else: + self.command_patterns.append(re.compile(translate(item))) + + def is_cmd_allowed(self, pcmd: Dict[str, Any], window: Optional['Window'], from_socket: bool, extra_data: Dict[str, Any]) -> Optional[bool]: + cmd_name = pcmd.get('cmd') + if not cmd_name: + return False + if not self.function_checkers and not self.command_patterns: + return True + for x in self.command_patterns: + if x.match(cmd_name) is not None: + return True + for f in self.function_checkers: + if f(pcmd, window, from_socket, extra_data): + return True + return self.user_permission + + def handle_cmd(boss: BossType, window: Optional[WindowType], cmd: Dict[str, Any], peer_id: int) -> Union[Dict[str, Any], None, AsyncResponse]: v = cmd['version'] no_response = cmd.get('no_response', False)