diff --git a/kitty/remote_control.py b/kitty/remote_control.py index 33c2a141b..8d4b4e537 100644 --- a/kitty/remote_control.py +++ b/kitty/remote_control.py @@ -277,16 +277,18 @@ class RCIO(TTYIO): return b''.join(ans) -def do_io(to: Optional[str], send: Dict[str, Any], no_response: bool, response_timeout: float) -> Dict[str, Any]: - payload = send.get('payload') +def do_io( + to: Optional[str], original_cmd: Dict[str, Any], no_response: bool, response_timeout: float, encrypter: 'CommandEncrypter' +) -> Dict[str, Any]: + payload = original_cmd.get('payload') if not isinstance(payload, GeneratorType): - send_data: Union[bytes, Iterator[bytes]] = encode_send(send) + send_data: Union[bytes, Iterator[bytes]] = encode_send(encrypter(original_cmd)) else: def send_generator() -> Iterator[bytes]: assert payload is not None for chunk in payload: - send['payload'] = chunk - yield encode_send(send) + original_cmd['payload'] = chunk + yield encode_send(encrypter(original_cmd)) send_data = send_generator() io: Union[SocketIO, RCIO] = SocketIO(to) if to else RCIO() @@ -322,24 +324,42 @@ def encode_as_base85(data: bytes) -> str: class CommandEncrypter: + encrypts: bool = True + def __init__(self, pubkey: bytes, encryption_version: str, password: str) -> None: skey = EllipticCurveKey() - secret = skey.derive_secret(pubkey) + self.secret = skey.derive_secret(pubkey) self.pubkey = skey.public - self.encrypter = AES256GCMEncrypt(secret) self.encryption_version = encryption_version self.password = password def __call__(self, cmd: Dict[str, Any]) -> Dict[str, Any]: + encrypter = AES256GCMEncrypt(self.secret) cmd['timestamp'] = time_ns() cmd['password'] = self.password raw = json.dumps(cmd).encode('utf-8') - encrypted = self.encrypter.add_data_to_be_encrypted(raw, True) + encrypted = encrypter.add_data_to_be_encrypted(raw, True) return { - 'version': version, 'iv': encode_as_base85(self.encrypter.iv), 'tag': encode_as_base85(self.encrypter.tag), + 'version': version, 'iv': encode_as_base85(encrypter.iv), 'tag': encode_as_base85(encrypter.tag), 'pubkey': encode_as_base85(self.pubkey), 'encrypted': encode_as_base85(encrypted), 'enc_proto': self.encryption_version } + def adjust_response_timeout_for_password(self, response_timeout: float) -> float: + return max(response_timeout, 120) + + +class NoEncryption(CommandEncrypter): + + encrypts: bool = False + + def __init__(self) -> None: ... + + def __call__(self, cmd: Dict[str, Any]) -> Dict[str, Any]: + return cmd + + def adjust_response_timeout_for_password(self, response_timeout: float) -> float: + return response_timeout + def create_basic_command(name: str, payload: Any = None, no_response: bool = False, is_asynchronous: bool = False) -> Dict[str, Any]: ans = {'cmd': name, 'version': version, 'no_response': no_response} @@ -408,10 +428,6 @@ def get_pubkey() -> Tuple[str, bytes]: return version, b85decode(pubkey) -def adjust_response_timeout_for_password(response_timeout: float) -> float: - return max(response_timeout, 120) - - def main(args: List[str]) -> None: global_opts, items = parse_rc_args(args) password = get_password(global_opts) @@ -419,7 +435,7 @@ def main(args: List[str]) -> None: encryption_version, pubkey = get_pubkey() encrypter = CommandEncrypter(pubkey, encryption_version, password) else: - encrypter = None + encrypter = NoEncryption() if not items: from kitty.shell import main as smain @@ -442,11 +458,8 @@ def main(args: List[str]) -> None: response_timeout = c.response_timeout if hasattr(opts, 'response_timeout'): response_timeout = opts.response_timeout - if encrypter is not None: - response_timeout = adjust_response_timeout_for_password(response_timeout) - send = original_send_cmd = create_basic_command(cmd, payload=payload, no_response=no_response, is_asynchronous=c.is_asynchronous) - if encrypter is not None: - send = encrypter(original_send_cmd) + response_timeout = encrypter.adjust_response_timeout_for_password(response_timeout) + send = create_basic_command(cmd, payload=payload, no_response=no_response, is_asynchronous=c.is_asynchronous) listen_on_from_env = False if not global_opts.to and 'KITTY_LISTEN_ON' in os.environ: global_opts.to = os.environ['KITTY_LISTEN_ON'] @@ -461,14 +474,12 @@ def main(args: List[str]) -> None: exit(msg) import socket try: - response = do_io(global_opts.to, send, no_response, response_timeout) + response = do_io(global_opts.to, send, no_response, response_timeout, encrypter) except (TimeoutError, socket.timeout): - original_send_cmd.pop('payload', None) - original_send_cmd['cancel_async'] = True - if encrypter is not None: - send = encrypter(original_send_cmd) + send.pop('payload', None) + send['cancel_async'] = True try: - do_io(global_opts.to, send, True, 10) + do_io(global_opts.to, send, True, 10, encrypter) except KeyboardInterrupt: sys.excepthook = lambda *a: print('Interrupted by user', file=sys.stderr) raise diff --git a/kitty/shell.py b/kitty/shell.py index 9324b21e2..be266406b 100644 --- a/kitty/shell.py +++ b/kitty/shell.py @@ -8,9 +8,7 @@ import sys import traceback from contextlib import suppress from functools import lru_cache -from typing import ( - Any, Callable, Dict, Generator, Iterable, List, Optional, Tuple -) +from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple from kittens.tui.operations import set_cursor_shape, set_window_title @@ -24,6 +22,9 @@ from .rc.base import ( ParsingOfArgsFailed, RemoteCommand, all_command_names, command_for_name, display_subcommand_help, parse_subcommand_cli ) +from .remote_control import ( + CommandEncrypter, NoEncryption, create_basic_command, do_io +) from .types import run_once output_prefix = '\x1b]133;C\x1b\\' @@ -145,32 +146,24 @@ def print_help(which: Optional[str] = None) -> None: def run_cmd( global_opts: RCOptions, cmd: str, func: RemoteCommand, opts: Any, items: List[str], - encrypter: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None + encrypter: CommandEncrypter = NoEncryption() ) -> None: - from .remote_control import ( - adjust_response_timeout_for_password, create_basic_command, do_io - ) print(end=set_window_title(cmd) + output_prefix, flush=True) payload = func.message_to_kitty(global_opts, opts, items) no_response = func.no_response if hasattr(opts, 'no_response'): no_response = opts.no_response send = original_send_cmd = create_basic_command(cmd, payload=payload, is_asynchronous=func.is_asynchronous, no_response=no_response) - if encrypter is not None: - send = encrypter(original_send_cmd) response_timeout = func.response_timeout if hasattr(opts, 'response_timeout'): response_timeout = opts.response_timeout - if encrypter is not None: - response_timeout = adjust_response_timeout_for_password(response_timeout) + response_timeout = encrypter.adjust_response_timeout_for_password(response_timeout) try: - response = do_io(global_opts.to, send, no_response, response_timeout) + response = do_io(global_opts.to, send, no_response, response_timeout, encrypter) except TimeoutError: original_send_cmd.pop('payload', None) original_send_cmd['cancel_async'] = True - if encrypter is not None: - send = encrypter(original_send_cmd) - do_io(global_opts.to, send, True, 10) + do_io(global_opts.to, send, True, 10, encrypter) print_err(f'Timed out after {response_timeout} seconds waiting for response from kitty') return if not response.get('ok'): @@ -182,7 +175,7 @@ def run_cmd( print(response['data']) -def real_main(global_opts: RCOptions, encrypter: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None) -> None: +def real_main(global_opts: RCOptions, encrypter: CommandEncrypter = NoEncryption()) -> None: init_readline() print_help_for_seq.allow_pager = False print('Welcome to the kitty shell!') @@ -259,7 +252,7 @@ def real_main(global_opts: RCOptions, encrypter: Optional[Callable[[Dict[str, An continue -def main(global_opts: RCOptions, encrypter: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None) -> None: +def main(global_opts: RCOptions, encrypter: CommandEncrypter = NoEncryption()) -> None: try: with Completer(): real_main(global_opts, encrypter)