Implement the permissions system for password based rc requests

This commit is contained in:
Kovid Goyal 2022-08-10 15:49:50 +05:30
parent fe07825ad9
commit 2c83b9902e
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
5 changed files with 126 additions and 35 deletions

View File

@ -38,10 +38,10 @@ from .fast_data_types import (
focus_os_window, get_boss, get_clipboard_string, get_options,
get_os_window_size, global_font_size, mark_os_window_for_close,
os_window_font_size, patch_global_colors, redirect_mouse_handling,
ring_bell, safe_pipe, set_application_quit_request, set_background_image,
set_boss, set_clipboard_string, set_in_sequence_mode, set_options,
set_os_window_size, set_os_window_title, thread_write, toggle_fullscreen,
toggle_maximized, toggle_secure_input
ring_bell, safe_pipe, send_data_to_peer, set_application_quit_request,
set_background_image, set_boss, set_clipboard_string, set_in_sequence_mode,
set_options, set_os_window_size, set_os_window_title, thread_write,
toggle_fullscreen, toggle_maximized, toggle_secure_input
)
from .key_encoding import get_name_to_functional_number_map
from .keys import get_shortcut, shortcut_matches
@ -67,7 +67,6 @@ from .utils import (
)
from .window import CommandOutput, CwdRequest, Window
RCResponse = Union[Dict[str, Any], None, AsyncResponse]
@ -445,7 +444,7 @@ class Boss:
self.window_id_map[window.id] = window
def _handle_remote_command(self, cmd: str, window: Optional[Window] = None, peer_id: int = 0) -> RCResponse:
from .remote_control import parse_cmd
from .remote_control import is_cmd_allowed, parse_cmd
response = None
window = window or None
try:
@ -460,11 +459,62 @@ class Boss:
getattr(window, 'allow_remote_control', False))
if allowed_by_channel:
return self._execute_remote_command(pcmd, window, peer_id)
q = is_cmd_allowed(pcmd, window, peer_id > 0, {})
if q is True:
return self._execute_remote_command(pcmd, window, peer_id)
if q is None:
if self.ask_if_remote_cmd_is_allowed(pcmd, window, peer_id):
return AsyncResponse()
no_response = pcmd.get('no_response') or False
if no_response:
return None
return {'ok': False, 'error': 'Remote control is disabled. Add allow_remote_control to your kitty.conf'}
def ask_if_remote_cmd_is_allowed(self, pcmd: Dict[str, Any], window: Optional[Window] = None, peer_id: int = 0) -> bool:
from kittens.tui.operations import styled
in_flight = 0
for w in self.window_id_map.values():
if w.window_custom_type == 'remote_command_permission_dialog':
in_flight += 1
if in_flight > 4:
log_error('Denying remote command permission as there are too many existing permission requests')
return False
wid = 0 if window is None else window.id
overlay_window = self.choose(
_('A program wishes to control kitty.\n\n'
'Password: {0}\n' 'Action: {1}\n\n'
'Note that allowing the password will allow all future actions using the same password, in this kitty instance.'
).format(styled(pcmd['password'], fg='yellow'), styled(pcmd['cmd'], fg='magenta')),
partial(self.remote_cmd_permission_received, pcmd, wid, peer_id),
't;green:Allow this request', 'p;yellow:Allow this password', 'r;magenta:Deny this request', 'd;red:Deny this password',
window=window, default='t'
)
if overlay_window is None:
return False
overlay_window.window_custom_type = 'remote_command_permission_dialog'
return True
def remote_cmd_permission_received(self, pcmd: Dict[str, Any], window_id: int, peer_id: int, choice: str) -> None:
from .remote_control import (
encode_response_for_peer, set_user_password_allowed
)
if choice in ('r', 'd'):
if choice == 'd':
set_user_password_allowed(pcmd['password'], False)
elif choice in ('t', 'p'):
if choice == 'p':
set_user_password_allowed(pcmd['password'], True)
window = self.window_id_map.get(window_id)
response = self._execute_remote_command(pcmd, window, peer_id)
if window is not None and response is not None and not isinstance(response, AsyncResponse):
window.send_cmd_response(response)
if peer_id > 0:
if response is None or isinstance(response, AsyncResponse):
data = b''
else:
data = encode_response_for_peer(response)
send_data_to_peer(peer_id, data)
def _execute_remote_command(self, pcmd: Dict[str, Any], window: Optional[Window] = None, peer_id: int = 0) -> RCResponse:
from .remote_control import handle_cmd
try:
@ -681,7 +731,7 @@ class Boss:
*choices: str, # The choices, see the help for the ask kitten for format of a choice
window: Optional[Window] = None, # the window associated with the confirmation
default: str = '', # the default choice when the user presses Enter
) -> None:
) -> Optional[Window]:
def callback_(res: Dict[str, Any], x: int, boss: Boss) -> None:
callback(res.get('response') or '')
cmd = ['--type=choices', '--message', msg]
@ -689,7 +739,10 @@ class Boss:
cmd += ['-d', default]
for c in choices:
cmd += ['-c', c]
self.run_kitten_with_metadata('ask', cmd, window=window, custom_callback=callback_, default_data={'response': ''})
ans = self.run_kitten_with_metadata('ask', cmd, window=window, custom_callback=callback_, default_data={'response': ''})
if isinstance(ans, Window):
return ans
return None
def get_line(
self, msg: str, # can contain newlines and ANSI formatting

View File

@ -610,7 +610,7 @@ class Options:
kitten_alias: typing.Dict[str, str] = {}
modify_font: typing.Dict[str, kitty.fonts.FontModification] = {}
narrow_symbols: typing.Dict[typing.Tuple[int, int], int] = {}
remote_control_password: typing.Dict[str, typing.Tuple[str, ...]] = {}
remote_control_password: typing.Dict[str, typing.FrozenSet[str]] = {}
symbol_map: typing.Dict[typing.Tuple[int, int], str] = {}
watcher: typing.Dict[str, str] = {}
map: typing.List[kitty.options.utils.KeyDefinition] = []

View File

@ -675,14 +675,14 @@ def config_or_absolute_path(x: str, env: Optional[Dict[str, str]] = None) -> Opt
return resolve_abs_or_config_path(x, env)
def remote_control_password(val: str, current_val: Dict[str, str]) -> Iterable[Tuple[str, Tuple[str, ...]]]:
def remote_control_password(val: str, current_val: Dict[str, str]) -> Iterable[Tuple[str, FrozenSet[str]]]:
val = val.strip()
if val:
parts = to_cmdline(val, expand=False)
if len(parts) == 1:
yield parts[0], ()
yield parts[0], frozenset()
else:
yield parts[0], tuple(parts[1:])
yield parts[0], frozenset(parts[1:])
def allow_remote_control(x: str) -> str:

View File

@ -7,11 +7,12 @@ import os
import re
import sys
from contextlib import suppress
from functools import partial
from functools import lru_cache, partial
from time import monotonic, time_ns
from types import GeneratorType
from typing import (
Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union, cast, TYPE_CHECKING
TYPE_CHECKING, Any, Dict, FrozenSet, Iterable, Iterator, List, Optional,
Tuple, Union, cast
)
from .cli import emph, parse_args
@ -19,7 +20,7 @@ from .cli_stub import RCOptions
from .constants import RC_ENCRYPTION_PROTOCOL_VERSION, appname, version
from .fast_data_types import (
AES256GCMDecrypt, AES256GCMEncrypt, EllipticCurveKey, get_boss,
read_command_response, send_data_to_peer
get_options, read_command_response, send_data_to_peer
)
from .rc.base import (
NoResponse, ParsingOfArgsFailed, PayloadGetter, all_command_names,
@ -53,8 +54,8 @@ def parse_cmd(serialized_cmd: str, encryption_key: EllipticCurveKey) -> Dict[str
pubkey = pcmd.get('pubkey', '')
if not pubkey:
log_error('Ignoring encrypted rc command without a public key')
d = AES256GCMDecrypt(encryption_key.derive_secret(base64.b85decode(pubkey)), pcmd['iv'], pcmd['tag'])
data = d.add_data_to_be_decrypted(base64.b85decode(pcmd['encrypted']), finished=True)
d = AES256GCMDecrypt(encryption_key.derive_secret(base64.b85decode(pubkey)), base64.b85decode(pcmd['iv']), base64.b85decode(pcmd['tag']))
data = d.add_data_to_be_decrypted(base64.b85decode(pcmd['encrypted']), True)
pcmd = json.loads(data)
if not isinstance(pcmd, dict) or 'version' not in pcmd:
return {}
@ -73,30 +74,37 @@ class CMDChecker:
return False
@lru_cache(maxsize=64)
def is_cmd_allowed_loader(path: str) -> CMDChecker:
import runpy
try:
m = runpy.run_path(path)
func: CMDChecker = m['is_cmd_allowed']
except Exception as e:
log_error(f'Failed to load cmd check function from {path} with error: {e}')
func = CMDChecker()
return func
@lru_cache(maxsize=1024)
def fnmatch_pattern(pat: str) -> 're.Pattern[str]':
from fnmatch import translate
return re.compile(translate(pat))
class PasswordAuthorizer:
def __init__(self, password: str, auth_items: Tuple[str, ...]) -> None:
from fnmatch import translate
import runpy
self.password = password
def __init__(self, auth_items: FrozenSet[str]) -> None:
self.command_patterns = []
self.function_checkers = []
self.user_permission: Optional[bool] = None
for item in auth_items:
if item.endswith('.py'):
path = resolve_custom_file(item)
try:
m = runpy.run_path(path)
func: CMDChecker = m['is_cmd_allowed']
except Exception as e:
log_error(f'Failed to load cmd check function from {path} with error: {e}')
self.function_checkers.append(CMDChecker())
else:
self.function_checkers.append(func)
path = os.path.abspath(resolve_custom_file(item))
self.function_checkers.append(is_cmd_allowed_loader(path))
else:
self.command_patterns.append(re.compile(translate(item)))
self.command_patterns.append(fnmatch_pattern(item))
def is_cmd_allowed(self, pcmd: Dict[str, Any], window: Optional['Window'], from_socket: bool, extra_data: Dict[str, Any]) -> Optional[bool]:
def is_cmd_allowed(self, pcmd: Dict[str, Any], window: Optional['Window'], from_socket: bool, extra_data: Dict[str, Any]) -> bool:
cmd_name = pcmd.get('cmd')
if not cmd_name:
return False
@ -108,7 +116,33 @@ class PasswordAuthorizer:
for f in self.function_checkers:
if f(pcmd, window, from_socket, extra_data):
return True
return self.user_permission
return False
@lru_cache(maxsize=256)
def password_authorizer(auth_items: FrozenSet[str]) -> PasswordAuthorizer:
return PasswordAuthorizer(auth_items)
user_password_allowed: Dict[str, bool] = {}
def is_cmd_allowed(pcmd: Dict[str, Any], window: Optional['Window'], from_socket: bool, extra_data: Dict[str, Any]) -> Optional[bool]:
pw = pcmd.get('password', '')
if not pw:
return False
q = user_password_allowed.get(pw)
if q is not None:
return q
auth_items = get_options().remote_control_password.get(pw)
if auth_items is None:
return None
pa = password_authorizer(auth_items)
return pa.is_cmd_allowed(pcmd, window, from_socket, extra_data)
def set_user_password_allowed(pwd: str, allowed: bool = True) -> None:
user_password_allowed[pwd] = allowed
def handle_cmd(boss: BossType, window: Optional[WindowType], cmd: Dict[str, Any], peer_id: int) -> Union[Dict[str, Any], None, AsyncResponse]:
@ -300,7 +334,7 @@ class CommandEncrypter:
cmd['timestamp'] = time_ns()
cmd['password'] = self.password
raw = json.dumps(cmd).encode('utf-8')
encrypted = self.encrypter.add_data_to_be_encrypted(raw, finished=True)
encrypted = self.encrypter.add_data_to_be_encrypted(raw, True)
return {
'version': version, 'iv': encode_as_base85(self.encrypter.iv), 'tag': encode_as_base85(self.encrypter.tag),
'pubkey': encode_as_base85(self.pubkey), 'encrypted': encode_as_base85(encrypted), 'enc_proto': self.encryption_version
@ -358,6 +392,8 @@ def get_password(opts: RCOptions) -> str:
ans = os.environ.get(opts.password_env, '')
if not ans and opts.use_password == 'always':
raise SystemExit('No password was found')
if ans and len(ans) > 1024:
raise SystemExit('Specified password is too long')
return ans

View File

@ -461,6 +461,8 @@ global_watchers = GlobalWatchers()
class Window:
window_custom_type: str = ''
def __init__(
self,
tab: TabType,