Write code to decrypt commands received in kitty

This commit is contained in:
Kovid Goyal 2022-08-09 21:05:09 +05:30
parent c7e3c92a0a
commit 56e83d7d07
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 29 additions and 8 deletions

View File

@ -445,7 +445,11 @@ class Boss:
from .remote_control import handle_cmd, parse_cmd from .remote_control import handle_cmd, parse_cmd
response = None response = None
window = window or None window = window or None
pcmd = parse_cmd(cmd) try:
pcmd = parse_cmd(cmd, self.encryption_key)
except Exception as e:
log_error(f'Failed to parse remote command with error: {e}')
return response
if not pcmd: if not pcmd:
return response return response
if self.allow_remote_control == 'y' or peer_id > 0 or getattr(window, 'allow_remote_control', False): if self.allow_remote_control == 'y' or peer_id > 0 or getattr(window, 'allow_remote_control', False):

View File

@ -18,8 +18,8 @@ from .cli import emph, parse_args
from .cli_stub import RCOptions from .cli_stub import RCOptions
from .constants import RC_ENCRYPTION_PROTOCOL_VERSION, appname, version from .constants import RC_ENCRYPTION_PROTOCOL_VERSION, appname, version
from .fast_data_types import ( from .fast_data_types import (
AES256GCMEncrypt, EllipticCurveKey, get_boss, read_command_response, AES256GCMDecrypt, AES256GCMEncrypt, EllipticCurveKey, get_boss,
send_data_to_peer read_command_response, send_data_to_peer
) )
from .rc.base import ( from .rc.base import (
NoResponse, ParsingOfArgsFailed, PayloadGetter, all_command_names, NoResponse, ParsingOfArgsFailed, PayloadGetter, all_command_names,
@ -27,7 +27,7 @@ from .rc.base import (
) )
from .types import AsyncResponse from .types import AsyncResponse
from .typing import BossType, WindowType from .typing import BossType, WindowType
from .utils import TTYIO, parse_address_spec, resolve_custom_file from .utils import TTYIO, log_error, parse_address_spec, resolve_custom_file
active_async_requests: Dict[str, float] = {} active_async_requests: Dict[str, float] = {}
@ -36,15 +36,32 @@ def encode_response_for_peer(response: Any) -> bytes:
return b'\x1bP@kitty-cmd' + json.dumps(response).encode('utf-8') + b'\x1b\\' return b'\x1bP@kitty-cmd' + json.dumps(response).encode('utf-8') + b'\x1b\\'
def parse_cmd(serialized_cmd: str) -> Dict[str, Any]: def parse_cmd(serialized_cmd: str, encryption_key: EllipticCurveKey) -> Dict[str, Any]:
try: try:
pcmd = json.loads(serialized_cmd) pcmd = json.loads(serialized_cmd)
if not isinstance(pcmd, dict):
return {}
except Exception: except Exception:
return {} return {}
if 'version' not in pcmd: if not isinstance(pcmd, dict) or 'version' not in pcmd:
return {} return {}
pcmd.pop('password', None)
if 'encrypted' in pcmd:
if pcmd.get('enc_proto') != RC_ENCRYPTION_PROTOCOL_VERSION:
log_error(f'Ignoring encrypted rc command with unsupported protocol: {pcmd.get("enc_proto")}')
return {}
pubkey = pcmd.get('pubkey', '')
if not pubkey:
log_error('Ignoring encrypted rc command without a public key')
d = AES256GCMDecrypt(encryption_key.derive_secret(base64.b85decode(pubkey)), pcmd['iv'], pcmd['tag'])
data = d.add_data_to_be_decrypted(base64.b85decode(pcmd['encrypted']), finished=True)
pcmd = json.loads(data)
if not isinstance(pcmd, dict) or 'version' not in pcmd:
return {}
delta = time_ns() - pcmd.pop('timestamp')
if abs(delta) > 5 * 60 * 1e9:
log_error(
f'Ignoring encrypted rc command with timestamp {delta / 1e9:.1f} seconds from now.'
' Could be an attempt at a replay attack or an incorrect clock on a remote machine.')
return {}
return pcmd return pcmd