diff --git a/kitty/constants.py b/kitty/constants.py index c4c25b8bc..201f4807b 100644 --- a/kitty/constants.py +++ b/kitty/constants.py @@ -28,6 +28,7 @@ _plat = sys.platform.lower() is_macos: bool = 'darwin' in _plat is_freebsd: bool = 'freebsd' in _plat is_running_from_develop: bool = False +RC_ENCRYPTION_PROTOCOL_VERSION = '1' if getattr(sys, 'frozen', False): extensions_dir: str = getattr(sys, 'kitty_run_data')['extensions_dir'] diff --git a/kitty/remote_control.py b/kitty/remote_control.py index e7a33a751..5a7e56030 100644 --- a/kitty/remote_control.py +++ b/kitty/remote_control.py @@ -1,13 +1,14 @@ #!/usr/bin/env python3 # License: GPL v3 Copyright: 2018, Kovid Goyal +import base64 import json import os import re import sys from contextlib import suppress from functools import partial -from time import monotonic +from time import monotonic, time_ns from types import GeneratorType from typing import ( Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union, cast @@ -15,15 +16,18 @@ from typing import ( from .cli import emph, parse_args from .cli_stub import RCOptions -from .constants import appname, version -from .fast_data_types import get_boss, read_command_response, send_data_to_peer +from .constants import RC_ENCRYPTION_PROTOCOL_VERSION, appname, version +from .fast_data_types import ( + AES256GCMEncrypt, EllipticCurveKey, get_boss, read_command_response, + send_data_to_peer +) from .rc.base import ( NoResponse, ParsingOfArgsFailed, PayloadGetter, all_command_names, command_for_name, parse_subcommand_cli ) from .types import AsyncResponse from .typing import BossType, WindowType -from .utils import TTYIO, parse_address_spec +from .utils import TTYIO, parse_address_spec, resolve_custom_file active_async_requests: Dict[str, float] = {} @@ -89,6 +93,36 @@ An address for the kitty instance to control. Corresponds to the address given to the kitty instance via the :option:`kitty --listen-on` option. If not specified, messages are sent to the controlling terminal for this process, i.e. they will only work if this process is run within an existing kitty window. + + +--password +A password to use when contacting kitty. This will cause kitty to ask the user +for permission to perform the specified action, unless the password has been +accepted before or is pre-configured in :file:`kitty.conf`. + + +--password-file +completion=type:file relative:conf kwds:- +default=rc-pass +A file from which to read the password. Trailing whitespace is ignored. Relative +paths are resolved from the kitty configuration directory. Use - to read from STDIN. +Used if no :option:`kitty @ --password` is supplied. Defaults to checking for the +:file:`rc-pass` file in the kitty configuration directory. + + +--password-env +default=KITTY_RC_PASSWORD +The name of an environment variable to read the password from. +Used if no :option:`kitty @ --password-file` is supplied. Defaults +to checking the KITTY_RC_PASSWORD. + + +--use-password +default=if-available +choices=if-available,never,always +If no password is available, kitty will usually just send the remote control command +without a password. This option can be used to force it to :code:`always` or :code:`never` use +the supplied password. '''.format, appname=appname) @@ -169,9 +203,9 @@ def do_io(to: Optional[str], send: Dict[str, Any], no_response: bool, response_t cli_msg = ( - 'Control {appname} by sending it commands. Set the' - ' :opt:`allow_remote_control` option in :file:`kitty.conf` or use a password, for this' - ' to work.' + 'Control {appname} by sending it commands. Set the' + ' :opt:`allow_remote_control` option in :file:`kitty.conf` or use a password, for this' + ' to work.' ).format(appname=appname) @@ -185,6 +219,31 @@ def parse_rc_args(args: List[str]) -> Tuple[RCOptions, List[str]]: return parse_args(args[1:], global_options_spec, 'command ...', msg, f'{appname} @', result_class=RCOptions) +def encode_as_base85(data: bytes) -> str: + return base64.b85encode(data).decode('ascii') + + +class CommandEncrypter: + + def __init__(self, pubkey: bytes, encryption_version: str, password: str) -> None: + skey = EllipticCurveKey() + secret = skey.derive_secret(pubkey) + self.pubkey = skey.public + self.encrypter = AES256GCMEncrypt(secret) + self.encryption_version = encryption_version + self.password = password + + def __call__(self, cmd: Dict[str, Any]) -> Dict[str, Any]: + cmd['timestamp'] = time_ns() + cmd['password'] = self.password + raw = json.dumps(cmd).encode('utf-8') + encrypted = self.encrypter.add_data_to_be_encrypted(raw, finished=True) + return { + 'version': version, 'iv': encode_as_base85(self.encrypter.iv), 'tag': encode_as_base85(self.encrypter.tag), + 'pubkey': encode_as_base85(self.pubkey), 'encrypted': encode_as_base85(encrypted), 'enc_proto': self.encryption_version + } + + def create_basic_command(name: str, payload: Any = None, no_response: bool = False, is_asynchronous: bool = False) -> Dict[str, Any]: ans = {'cmd': name, 'version': version, 'no_response': no_response} if payload is not None: @@ -211,12 +270,61 @@ def send_response_to_client(data: Any = None, error: str = '', peer_id: int = 0, w.send_cmd_response(response) +def get_password(opts: RCOptions) -> str: + if opts.use_password == 'never': + return '' + ans = '' + if opts.password: + ans = opts.password + if not ans and opts.password_file: + if opts.password_file == '-': + ans = sys.stdin.read().strip() + try: + tty_fd = os.open(os.ctermid(), os.O_RDONLY | os.O_CLOEXEC) + except OSError: + pass + else: + os.dup2(tty_fd, sys.stdin.fileno()) + else: + try: + with open(resolve_custom_file(opts.password_file)) as f: + ans = f.read().strip() + except OSError: + pass + if not ans and opts.password_env: + ans = os.environ.get(opts.password_env, '') + if not ans and opts.use_password == 'always': + raise SystemExit('No password was found') + return ans + + +def get_pubkey() -> Tuple[str, bytes]: + raw = os.environ.get('KITTY_PUBLIC_KEY', '') + if not raw: + raise SystemExit('Password usage requested but KITTY_PUBLIC_KEY environment variable is not available') + version, pubkey = raw.split(':', 1) + if version != RC_ENCRYPTION_PROTOCOL_VERSION: + raise SystemExit('KITTY_PUBLIC_KEY has unknown version, if you are running on a remote system, update kitty on this system') + from base64 import b85decode + return version, b85decode(pubkey) + + +def adjust_response_timeout_for_password(response_timeout: float) -> float: + return max(response_timeout, 120) + + def main(args: List[str]) -> None: global_opts, items = parse_rc_args(args) + password = get_password(global_opts) + if password: + encryption_version, pubkey = get_pubkey() + encrypter = CommandEncrypter(pubkey, encryption_version, password) + else: + encrypter = None if not items: from kitty.shell import main as smain - smain(global_opts) + smain(global_opts, encrypter) return cmd = items[0] try: @@ -235,7 +343,11 @@ def main(args: List[str]) -> None: response_timeout = c.response_timeout if hasattr(opts, 'response_timeout'): response_timeout = opts.response_timeout - send = create_basic_command(cmd, payload=payload, no_response=no_response, is_asynchronous=c.is_asynchronous) + if encrypter is not None: + response_timeout = adjust_response_timeout_for_password(response_timeout) + send = original_send_cmd = create_basic_command(cmd, payload=payload, no_response=no_response, is_asynchronous=c.is_asynchronous) + if encrypter is not None: + send = encrypter(original_send_cmd) listen_on_from_env = False if not global_opts.to and 'KITTY_LISTEN_ON' in os.environ: global_opts.to = os.environ['KITTY_LISTEN_ON'] @@ -252,8 +364,10 @@ def main(args: List[str]) -> None: try: response = do_io(global_opts.to, send, no_response, response_timeout) except (TimeoutError, socket.timeout): - send.pop('payload', None) - send['cancel_async'] = True + original_send_cmd.pop('payload', None) + original_send_cmd['cancel_async'] = True + if encrypter is not None: + send = encrypter(original_send_cmd) do_io(global_opts.to, send, True, 10) raise SystemExit(f'Timed out after {response_timeout} seconds waiting for response from kitty') if no_response: diff --git a/kitty/shell.py b/kitty/shell.py index 3c9cc1d03..9324b21e2 100644 --- a/kitty/shell.py +++ b/kitty/shell.py @@ -8,7 +8,9 @@ import sys import traceback from contextlib import suppress from functools import lru_cache -from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple +from typing import ( + Any, Callable, Dict, Generator, Iterable, List, Optional, Tuple +) from kittens.tui.operations import set_cursor_shape, set_window_title @@ -141,22 +143,33 @@ def print_help(which: Optional[str] = None) -> None: display_subcommand_help(func) -def run_cmd(global_opts: RCOptions, cmd: str, func: RemoteCommand, opts: Any, items: List[str]) -> None: - from .remote_control import create_basic_command, do_io +def run_cmd( + global_opts: RCOptions, cmd: str, func: RemoteCommand, opts: Any, items: List[str], + encrypter: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None +) -> None: + from .remote_control import ( + adjust_response_timeout_for_password, create_basic_command, do_io + ) print(end=set_window_title(cmd) + output_prefix, flush=True) payload = func.message_to_kitty(global_opts, opts, items) no_response = func.no_response if hasattr(opts, 'no_response'): no_response = opts.no_response - send = create_basic_command(cmd, payload=payload, is_asynchronous=func.is_asynchronous, no_response=no_response) + send = original_send_cmd = create_basic_command(cmd, payload=payload, is_asynchronous=func.is_asynchronous, no_response=no_response) + if encrypter is not None: + send = encrypter(original_send_cmd) response_timeout = func.response_timeout if hasattr(opts, 'response_timeout'): response_timeout = opts.response_timeout + if encrypter is not None: + response_timeout = adjust_response_timeout_for_password(response_timeout) try: response = do_io(global_opts.to, send, no_response, response_timeout) except TimeoutError: - send.pop('payload', None) - send['cancel_async'] = True + original_send_cmd.pop('payload', None) + original_send_cmd['cancel_async'] = True + if encrypter is not None: + send = encrypter(original_send_cmd) do_io(global_opts.to, send, True, 10) print_err(f'Timed out after {response_timeout} seconds waiting for response from kitty') return @@ -169,7 +182,7 @@ def run_cmd(global_opts: RCOptions, cmd: str, func: RemoteCommand, opts: Any, it print(response['data']) -def real_main(global_opts: RCOptions) -> None: +def real_main(global_opts: RCOptions, encrypter: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None) -> None: init_readline() print_help_for_seq.allow_pager = False print('Welcome to the kitty shell!') @@ -230,7 +243,7 @@ def real_main(global_opts: RCOptions) -> None: continue else: try: - run_cmd(global_opts, cmd, func, opts, items) + run_cmd(global_opts, cmd, func, opts, items, encrypter) except (SystemExit, ParsingOfArgsFailed) as e: print(end=output_prefix, flush=True) print_err(e) @@ -246,10 +259,10 @@ def real_main(global_opts: RCOptions) -> None: continue -def main(global_opts: RCOptions) -> None: +def main(global_opts: RCOptions, encrypter: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None) -> None: try: with Completer(): - real_main(global_opts) + real_main(global_opts, encrypter) except Exception: traceback.print_exc() input('Press Enter to quit')