diff --git a/kitty/boss.py b/kitty/boss.py index bcf9f7a7f..85f345d05 100644 --- a/kitty/boss.py +++ b/kitty/boss.py @@ -38,10 +38,10 @@ from .fast_data_types import ( focus_os_window, get_boss, get_clipboard_string, get_options, get_os_window_size, global_font_size, mark_os_window_for_close, os_window_font_size, patch_global_colors, redirect_mouse_handling, - ring_bell, safe_pipe, set_application_quit_request, set_background_image, - set_boss, set_clipboard_string, set_in_sequence_mode, set_options, - set_os_window_size, set_os_window_title, thread_write, toggle_fullscreen, - toggle_maximized, toggle_secure_input + ring_bell, safe_pipe, send_data_to_peer, set_application_quit_request, + set_background_image, set_boss, set_clipboard_string, set_in_sequence_mode, + set_options, set_os_window_size, set_os_window_title, thread_write, + toggle_fullscreen, toggle_maximized, toggle_secure_input ) from .key_encoding import get_name_to_functional_number_map from .keys import get_shortcut, shortcut_matches @@ -67,7 +67,6 @@ from .utils import ( ) from .window import CommandOutput, CwdRequest, Window - RCResponse = Union[Dict[str, Any], None, AsyncResponse] @@ -445,7 +444,7 @@ class Boss: self.window_id_map[window.id] = window def _handle_remote_command(self, cmd: str, window: Optional[Window] = None, peer_id: int = 0) -> RCResponse: - from .remote_control import parse_cmd + from .remote_control import is_cmd_allowed, parse_cmd response = None window = window or None try: @@ -460,11 +459,62 @@ class Boss: getattr(window, 'allow_remote_control', False)) if allowed_by_channel: return self._execute_remote_command(pcmd, window, peer_id) + q = is_cmd_allowed(pcmd, window, peer_id > 0, {}) + if q is True: + return self._execute_remote_command(pcmd, window, peer_id) + if q is None: + if self.ask_if_remote_cmd_is_allowed(pcmd, window, peer_id): + return AsyncResponse() no_response = pcmd.get('no_response') or False if no_response: return None return {'ok': False, 'error': 'Remote control is disabled. Add allow_remote_control to your kitty.conf'} + def ask_if_remote_cmd_is_allowed(self, pcmd: Dict[str, Any], window: Optional[Window] = None, peer_id: int = 0) -> bool: + from kittens.tui.operations import styled + in_flight = 0 + for w in self.window_id_map.values(): + if w.window_custom_type == 'remote_command_permission_dialog': + in_flight += 1 + if in_flight > 4: + log_error('Denying remote command permission as there are too many existing permission requests') + return False + wid = 0 if window is None else window.id + overlay_window = self.choose( + _('A program wishes to control kitty.\n\n' + 'Password: {0}\n' 'Action: {1}\n\n' + 'Note that allowing the password will allow all future actions using the same password, in this kitty instance.' + ).format(styled(pcmd['password'], fg='yellow'), styled(pcmd['cmd'], fg='magenta')), + partial(self.remote_cmd_permission_received, pcmd, wid, peer_id), + 't;green:Allow this request', 'p;yellow:Allow this password', 'r;magenta:Deny this request', 'd;red:Deny this password', + window=window, default='t' + ) + if overlay_window is None: + return False + overlay_window.window_custom_type = 'remote_command_permission_dialog' + return True + + def remote_cmd_permission_received(self, pcmd: Dict[str, Any], window_id: int, peer_id: int, choice: str) -> None: + from .remote_control import ( + encode_response_for_peer, set_user_password_allowed + ) + if choice in ('r', 'd'): + if choice == 'd': + set_user_password_allowed(pcmd['password'], False) + elif choice in ('t', 'p'): + if choice == 'p': + set_user_password_allowed(pcmd['password'], True) + window = self.window_id_map.get(window_id) + response = self._execute_remote_command(pcmd, window, peer_id) + if window is not None and response is not None and not isinstance(response, AsyncResponse): + window.send_cmd_response(response) + if peer_id > 0: + if response is None or isinstance(response, AsyncResponse): + data = b'' + else: + data = encode_response_for_peer(response) + send_data_to_peer(peer_id, data) + def _execute_remote_command(self, pcmd: Dict[str, Any], window: Optional[Window] = None, peer_id: int = 0) -> RCResponse: from .remote_control import handle_cmd try: @@ -681,7 +731,7 @@ class Boss: *choices: str, # The choices, see the help for the ask kitten for format of a choice window: Optional[Window] = None, # the window associated with the confirmation default: str = '', # the default choice when the user presses Enter - ) -> None: + ) -> Optional[Window]: def callback_(res: Dict[str, Any], x: int, boss: Boss) -> None: callback(res.get('response') or '') cmd = ['--type=choices', '--message', msg] @@ -689,7 +739,10 @@ class Boss: cmd += ['-d', default] for c in choices: cmd += ['-c', c] - self.run_kitten_with_metadata('ask', cmd, window=window, custom_callback=callback_, default_data={'response': ''}) + ans = self.run_kitten_with_metadata('ask', cmd, window=window, custom_callback=callback_, default_data={'response': ''}) + if isinstance(ans, Window): + return ans + return None def get_line( self, msg: str, # can contain newlines and ANSI formatting diff --git a/kitty/options/types.py b/kitty/options/types.py index 45a5aa00a..78f42d486 100644 --- a/kitty/options/types.py +++ b/kitty/options/types.py @@ -610,7 +610,7 @@ class Options: kitten_alias: typing.Dict[str, str] = {} modify_font: typing.Dict[str, kitty.fonts.FontModification] = {} narrow_symbols: typing.Dict[typing.Tuple[int, int], int] = {} - remote_control_password: typing.Dict[str, typing.Tuple[str, ...]] = {} + remote_control_password: typing.Dict[str, typing.FrozenSet[str]] = {} symbol_map: typing.Dict[typing.Tuple[int, int], str] = {} watcher: typing.Dict[str, str] = {} map: typing.List[kitty.options.utils.KeyDefinition] = [] diff --git a/kitty/options/utils.py b/kitty/options/utils.py index 871286fc7..9e77d5446 100644 --- a/kitty/options/utils.py +++ b/kitty/options/utils.py @@ -675,14 +675,14 @@ def config_or_absolute_path(x: str, env: Optional[Dict[str, str]] = None) -> Opt return resolve_abs_or_config_path(x, env) -def remote_control_password(val: str, current_val: Dict[str, str]) -> Iterable[Tuple[str, Tuple[str, ...]]]: +def remote_control_password(val: str, current_val: Dict[str, str]) -> Iterable[Tuple[str, FrozenSet[str]]]: val = val.strip() if val: parts = to_cmdline(val, expand=False) if len(parts) == 1: - yield parts[0], () + yield parts[0], frozenset() else: - yield parts[0], tuple(parts[1:]) + yield parts[0], frozenset(parts[1:]) def allow_remote_control(x: str) -> str: diff --git a/kitty/remote_control.py b/kitty/remote_control.py index d518f1432..6d9677d0c 100644 --- a/kitty/remote_control.py +++ b/kitty/remote_control.py @@ -7,11 +7,12 @@ import os import re import sys from contextlib import suppress -from functools import partial +from functools import lru_cache, partial from time import monotonic, time_ns from types import GeneratorType from typing import ( - Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union, cast, TYPE_CHECKING + TYPE_CHECKING, Any, Dict, FrozenSet, Iterable, Iterator, List, Optional, + Tuple, Union, cast ) from .cli import emph, parse_args @@ -19,7 +20,7 @@ from .cli_stub import RCOptions from .constants import RC_ENCRYPTION_PROTOCOL_VERSION, appname, version from .fast_data_types import ( AES256GCMDecrypt, AES256GCMEncrypt, EllipticCurveKey, get_boss, - read_command_response, send_data_to_peer + get_options, read_command_response, send_data_to_peer ) from .rc.base import ( NoResponse, ParsingOfArgsFailed, PayloadGetter, all_command_names, @@ -53,8 +54,8 @@ def parse_cmd(serialized_cmd: str, encryption_key: EllipticCurveKey) -> Dict[str pubkey = pcmd.get('pubkey', '') if not pubkey: log_error('Ignoring encrypted rc command without a public key') - d = AES256GCMDecrypt(encryption_key.derive_secret(base64.b85decode(pubkey)), pcmd['iv'], pcmd['tag']) - data = d.add_data_to_be_decrypted(base64.b85decode(pcmd['encrypted']), finished=True) + d = AES256GCMDecrypt(encryption_key.derive_secret(base64.b85decode(pubkey)), base64.b85decode(pcmd['iv']), base64.b85decode(pcmd['tag'])) + data = d.add_data_to_be_decrypted(base64.b85decode(pcmd['encrypted']), True) pcmd = json.loads(data) if not isinstance(pcmd, dict) or 'version' not in pcmd: return {} @@ -73,30 +74,37 @@ class CMDChecker: return False +@lru_cache(maxsize=64) +def is_cmd_allowed_loader(path: str) -> CMDChecker: + import runpy + try: + m = runpy.run_path(path) + func: CMDChecker = m['is_cmd_allowed'] + except Exception as e: + log_error(f'Failed to load cmd check function from {path} with error: {e}') + func = CMDChecker() + return func + + +@lru_cache(maxsize=1024) +def fnmatch_pattern(pat: str) -> 're.Pattern[str]': + from fnmatch import translate + return re.compile(translate(pat)) + + class PasswordAuthorizer: - def __init__(self, password: str, auth_items: Tuple[str, ...]) -> None: - from fnmatch import translate - import runpy - self.password = password + def __init__(self, auth_items: FrozenSet[str]) -> None: self.command_patterns = [] self.function_checkers = [] - self.user_permission: Optional[bool] = None for item in auth_items: if item.endswith('.py'): - path = resolve_custom_file(item) - try: - m = runpy.run_path(path) - func: CMDChecker = m['is_cmd_allowed'] - except Exception as e: - log_error(f'Failed to load cmd check function from {path} with error: {e}') - self.function_checkers.append(CMDChecker()) - else: - self.function_checkers.append(func) + path = os.path.abspath(resolve_custom_file(item)) + self.function_checkers.append(is_cmd_allowed_loader(path)) else: - self.command_patterns.append(re.compile(translate(item))) + self.command_patterns.append(fnmatch_pattern(item)) - def is_cmd_allowed(self, pcmd: Dict[str, Any], window: Optional['Window'], from_socket: bool, extra_data: Dict[str, Any]) -> Optional[bool]: + def is_cmd_allowed(self, pcmd: Dict[str, Any], window: Optional['Window'], from_socket: bool, extra_data: Dict[str, Any]) -> bool: cmd_name = pcmd.get('cmd') if not cmd_name: return False @@ -108,7 +116,33 @@ class PasswordAuthorizer: for f in self.function_checkers: if f(pcmd, window, from_socket, extra_data): return True - return self.user_permission + return False + + +@lru_cache(maxsize=256) +def password_authorizer(auth_items: FrozenSet[str]) -> PasswordAuthorizer: + return PasswordAuthorizer(auth_items) + + +user_password_allowed: Dict[str, bool] = {} + + +def is_cmd_allowed(pcmd: Dict[str, Any], window: Optional['Window'], from_socket: bool, extra_data: Dict[str, Any]) -> Optional[bool]: + pw = pcmd.get('password', '') + if not pw: + return False + q = user_password_allowed.get(pw) + if q is not None: + return q + auth_items = get_options().remote_control_password.get(pw) + if auth_items is None: + return None + pa = password_authorizer(auth_items) + return pa.is_cmd_allowed(pcmd, window, from_socket, extra_data) + + +def set_user_password_allowed(pwd: str, allowed: bool = True) -> None: + user_password_allowed[pwd] = allowed def handle_cmd(boss: BossType, window: Optional[WindowType], cmd: Dict[str, Any], peer_id: int) -> Union[Dict[str, Any], None, AsyncResponse]: @@ -300,7 +334,7 @@ class CommandEncrypter: cmd['timestamp'] = time_ns() cmd['password'] = self.password raw = json.dumps(cmd).encode('utf-8') - encrypted = self.encrypter.add_data_to_be_encrypted(raw, finished=True) + encrypted = self.encrypter.add_data_to_be_encrypted(raw, True) return { 'version': version, 'iv': encode_as_base85(self.encrypter.iv), 'tag': encode_as_base85(self.encrypter.tag), 'pubkey': encode_as_base85(self.pubkey), 'encrypted': encode_as_base85(encrypted), 'enc_proto': self.encryption_version @@ -358,6 +392,8 @@ def get_password(opts: RCOptions) -> str: ans = os.environ.get(opts.password_env, '') if not ans and opts.use_password == 'always': raise SystemExit('No password was found') + if ans and len(ans) > 1024: + raise SystemExit('Specified password is too long') return ans diff --git a/kitty/window.py b/kitty/window.py index 9f2b5991c..bee2e9f63 100644 --- a/kitty/window.py +++ b/kitty/window.py @@ -461,6 +461,8 @@ global_watchers = GlobalWatchers() class Window: + window_custom_type: str = '' + def __init__( self, tab: TabType,