Fix password based rc not working with generator responses

This commit is contained in:
Kovid Goyal 2022-08-10 17:24:10 +05:30
parent e9ce5c02d0
commit 0fe4f5a686
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 46 additions and 42 deletions

View File

@ -277,16 +277,18 @@ class RCIO(TTYIO):
return b''.join(ans)
def do_io(to: Optional[str], send: Dict[str, Any], no_response: bool, response_timeout: float) -> Dict[str, Any]:
payload = send.get('payload')
def do_io(
to: Optional[str], original_cmd: Dict[str, Any], no_response: bool, response_timeout: float, encrypter: 'CommandEncrypter'
) -> Dict[str, Any]:
payload = original_cmd.get('payload')
if not isinstance(payload, GeneratorType):
send_data: Union[bytes, Iterator[bytes]] = encode_send(send)
send_data: Union[bytes, Iterator[bytes]] = encode_send(encrypter(original_cmd))
else:
def send_generator() -> Iterator[bytes]:
assert payload is not None
for chunk in payload:
send['payload'] = chunk
yield encode_send(send)
original_cmd['payload'] = chunk
yield encode_send(encrypter(original_cmd))
send_data = send_generator()
io: Union[SocketIO, RCIO] = SocketIO(to) if to else RCIO()
@ -322,24 +324,42 @@ def encode_as_base85(data: bytes) -> str:
class CommandEncrypter:
encrypts: bool = True
def __init__(self, pubkey: bytes, encryption_version: str, password: str) -> None:
skey = EllipticCurveKey()
secret = skey.derive_secret(pubkey)
self.secret = skey.derive_secret(pubkey)
self.pubkey = skey.public
self.encrypter = AES256GCMEncrypt(secret)
self.encryption_version = encryption_version
self.password = password
def __call__(self, cmd: Dict[str, Any]) -> Dict[str, Any]:
encrypter = AES256GCMEncrypt(self.secret)
cmd['timestamp'] = time_ns()
cmd['password'] = self.password
raw = json.dumps(cmd).encode('utf-8')
encrypted = self.encrypter.add_data_to_be_encrypted(raw, True)
encrypted = encrypter.add_data_to_be_encrypted(raw, True)
return {
'version': version, 'iv': encode_as_base85(self.encrypter.iv), 'tag': encode_as_base85(self.encrypter.tag),
'version': version, 'iv': encode_as_base85(encrypter.iv), 'tag': encode_as_base85(encrypter.tag),
'pubkey': encode_as_base85(self.pubkey), 'encrypted': encode_as_base85(encrypted), 'enc_proto': self.encryption_version
}
def adjust_response_timeout_for_password(self, response_timeout: float) -> float:
return max(response_timeout, 120)
class NoEncryption(CommandEncrypter):
encrypts: bool = False
def __init__(self) -> None: ...
def __call__(self, cmd: Dict[str, Any]) -> Dict[str, Any]:
return cmd
def adjust_response_timeout_for_password(self, response_timeout: float) -> float:
return response_timeout
def create_basic_command(name: str, payload: Any = None, no_response: bool = False, is_asynchronous: bool = False) -> Dict[str, Any]:
ans = {'cmd': name, 'version': version, 'no_response': no_response}
@ -408,10 +428,6 @@ def get_pubkey() -> Tuple[str, bytes]:
return version, b85decode(pubkey)
def adjust_response_timeout_for_password(response_timeout: float) -> float:
return max(response_timeout, 120)
def main(args: List[str]) -> None:
global_opts, items = parse_rc_args(args)
password = get_password(global_opts)
@ -419,7 +435,7 @@ def main(args: List[str]) -> None:
encryption_version, pubkey = get_pubkey()
encrypter = CommandEncrypter(pubkey, encryption_version, password)
else:
encrypter = None
encrypter = NoEncryption()
if not items:
from kitty.shell import main as smain
@ -442,11 +458,8 @@ def main(args: List[str]) -> None:
response_timeout = c.response_timeout
if hasattr(opts, 'response_timeout'):
response_timeout = opts.response_timeout
if encrypter is not None:
response_timeout = adjust_response_timeout_for_password(response_timeout)
send = original_send_cmd = create_basic_command(cmd, payload=payload, no_response=no_response, is_asynchronous=c.is_asynchronous)
if encrypter is not None:
send = encrypter(original_send_cmd)
response_timeout = encrypter.adjust_response_timeout_for_password(response_timeout)
send = create_basic_command(cmd, payload=payload, no_response=no_response, is_asynchronous=c.is_asynchronous)
listen_on_from_env = False
if not global_opts.to and 'KITTY_LISTEN_ON' in os.environ:
global_opts.to = os.environ['KITTY_LISTEN_ON']
@ -461,14 +474,12 @@ def main(args: List[str]) -> None:
exit(msg)
import socket
try:
response = do_io(global_opts.to, send, no_response, response_timeout)
response = do_io(global_opts.to, send, no_response, response_timeout, encrypter)
except (TimeoutError, socket.timeout):
original_send_cmd.pop('payload', None)
original_send_cmd['cancel_async'] = True
if encrypter is not None:
send = encrypter(original_send_cmd)
send.pop('payload', None)
send['cancel_async'] = True
try:
do_io(global_opts.to, send, True, 10)
do_io(global_opts.to, send, True, 10, encrypter)
except KeyboardInterrupt:
sys.excepthook = lambda *a: print('Interrupted by user', file=sys.stderr)
raise

View File

@ -8,9 +8,7 @@ import sys
import traceback
from contextlib import suppress
from functools import lru_cache
from typing import (
Any, Callable, Dict, Generator, Iterable, List, Optional, Tuple
)
from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple
from kittens.tui.operations import set_cursor_shape, set_window_title
@ -24,6 +22,9 @@ from .rc.base import (
ParsingOfArgsFailed, RemoteCommand, all_command_names, command_for_name,
display_subcommand_help, parse_subcommand_cli
)
from .remote_control import (
CommandEncrypter, NoEncryption, create_basic_command, do_io
)
from .types import run_once
output_prefix = '\x1b]133;C\x1b\\'
@ -145,32 +146,24 @@ def print_help(which: Optional[str] = None) -> None:
def run_cmd(
global_opts: RCOptions, cmd: str, func: RemoteCommand, opts: Any, items: List[str],
encrypter: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None
encrypter: CommandEncrypter = NoEncryption()
) -> None:
from .remote_control import (
adjust_response_timeout_for_password, create_basic_command, do_io
)
print(end=set_window_title(cmd) + output_prefix, flush=True)
payload = func.message_to_kitty(global_opts, opts, items)
no_response = func.no_response
if hasattr(opts, 'no_response'):
no_response = opts.no_response
send = original_send_cmd = create_basic_command(cmd, payload=payload, is_asynchronous=func.is_asynchronous, no_response=no_response)
if encrypter is not None:
send = encrypter(original_send_cmd)
response_timeout = func.response_timeout
if hasattr(opts, 'response_timeout'):
response_timeout = opts.response_timeout
if encrypter is not None:
response_timeout = adjust_response_timeout_for_password(response_timeout)
response_timeout = encrypter.adjust_response_timeout_for_password(response_timeout)
try:
response = do_io(global_opts.to, send, no_response, response_timeout)
response = do_io(global_opts.to, send, no_response, response_timeout, encrypter)
except TimeoutError:
original_send_cmd.pop('payload', None)
original_send_cmd['cancel_async'] = True
if encrypter is not None:
send = encrypter(original_send_cmd)
do_io(global_opts.to, send, True, 10)
do_io(global_opts.to, send, True, 10, encrypter)
print_err(f'Timed out after {response_timeout} seconds waiting for response from kitty')
return
if not response.get('ok'):
@ -182,7 +175,7 @@ def run_cmd(
print(response['data'])
def real_main(global_opts: RCOptions, encrypter: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None) -> None:
def real_main(global_opts: RCOptions, encrypter: CommandEncrypter = NoEncryption()) -> None:
init_readline()
print_help_for_seq.allow_pager = False
print('Welcome to the kitty shell!')
@ -259,7 +252,7 @@ def real_main(global_opts: RCOptions, encrypter: Optional[Callable[[Dict[str, An
continue
def main(global_opts: RCOptions, encrypter: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None) -> None:
def main(global_opts: RCOptions, encrypter: CommandEncrypter = NoEncryption()) -> None:
try:
with Completer():
real_main(global_opts, encrypter)