diff --git a/kitty/boss.py b/kitty/boss.py index 46351eecb..c15a58be3 100644 --- a/kitty/boss.py +++ b/kitty/boss.py @@ -445,7 +445,11 @@ class Boss: from .remote_control import handle_cmd, parse_cmd response = None window = window or None - pcmd = parse_cmd(cmd) + try: + pcmd = parse_cmd(cmd, self.encryption_key) + except Exception as e: + log_error(f'Failed to parse remote command with error: {e}') + return response if not pcmd: return response if self.allow_remote_control == 'y' or peer_id > 0 or getattr(window, 'allow_remote_control', False): diff --git a/kitty/remote_control.py b/kitty/remote_control.py index 5a7e56030..56a295ade 100644 --- a/kitty/remote_control.py +++ b/kitty/remote_control.py @@ -18,8 +18,8 @@ from .cli import emph, parse_args from .cli_stub import RCOptions from .constants import RC_ENCRYPTION_PROTOCOL_VERSION, appname, version from .fast_data_types import ( - AES256GCMEncrypt, EllipticCurveKey, get_boss, read_command_response, - send_data_to_peer + AES256GCMDecrypt, AES256GCMEncrypt, EllipticCurveKey, get_boss, + read_command_response, send_data_to_peer ) from .rc.base import ( NoResponse, ParsingOfArgsFailed, PayloadGetter, all_command_names, @@ -27,7 +27,7 @@ from .rc.base import ( ) from .types import AsyncResponse from .typing import BossType, WindowType -from .utils import TTYIO, parse_address_spec, resolve_custom_file +from .utils import TTYIO, log_error, parse_address_spec, resolve_custom_file active_async_requests: Dict[str, float] = {} @@ -36,15 +36,32 @@ def encode_response_for_peer(response: Any) -> bytes: return b'\x1bP@kitty-cmd' + json.dumps(response).encode('utf-8') + b'\x1b\\' -def parse_cmd(serialized_cmd: str) -> Dict[str, Any]: +def parse_cmd(serialized_cmd: str, encryption_key: EllipticCurveKey) -> Dict[str, Any]: try: pcmd = json.loads(serialized_cmd) - if not isinstance(pcmd, dict): - return {} except Exception: return {} - if 'version' not in pcmd: + if not isinstance(pcmd, dict) or 'version' not in pcmd: return {} + pcmd.pop('password', None) + if 'encrypted' in pcmd: + if pcmd.get('enc_proto') != RC_ENCRYPTION_PROTOCOL_VERSION: + log_error(f'Ignoring encrypted rc command with unsupported protocol: {pcmd.get("enc_proto")}') + return {} + pubkey = pcmd.get('pubkey', '') + if not pubkey: + log_error('Ignoring encrypted rc command without a public key') + d = AES256GCMDecrypt(encryption_key.derive_secret(base64.b85decode(pubkey)), pcmd['iv'], pcmd['tag']) + data = d.add_data_to_be_decrypted(base64.b85decode(pcmd['encrypted']), finished=True) + pcmd = json.loads(data) + if not isinstance(pcmd, dict) or 'version' not in pcmd: + return {} + delta = time_ns() - pcmd.pop('timestamp') + if abs(delta) > 5 * 60 * 1e9: + log_error( + f'Ignoring encrypted rc command with timestamp {delta / 1e9:.1f} seconds from now.' + ' Could be an attempt at a replay attack or an incorrect clock on a remote machine.') + return {} return pcmd