Work on new permissions framework for rc commands

This commit is contained in:
Kovid Goyal 2022-08-10 11:06:40 +05:30
parent 56e83d7d07
commit fe07825ad9
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 71 additions and 15 deletions

View File

@ -68,6 +68,9 @@ from .utils import (
from .window import CommandOutput, CwdRequest, Window
RCResponse = Union[Dict[str, Any], None, AsyncResponse]
class OSWindowDict(TypedDict):
id: int
platform_window_id: Optional[int]
@ -441,8 +444,8 @@ class Boss:
self.child_monitor.add_child(window.id, window.child.pid, window.child.child_fd, window.screen)
self.window_id_map[window.id] = window
def _handle_remote_command(self, cmd: str, window: Optional[Window] = None, peer_id: int = 0) -> Union[Dict[str, Any], None, AsyncResponse]:
from .remote_control import handle_cmd, parse_cmd
def _handle_remote_command(self, cmd: str, window: Optional[Window] = None, peer_id: int = 0) -> RCResponse:
from .remote_control import parse_cmd
response = None
window = window or None
try:
@ -452,18 +455,25 @@ class Boss:
return response
if not pcmd:
return response
if self.allow_remote_control == 'y' or peer_id > 0 or getattr(window, 'allow_remote_control', False):
try:
response = handle_cmd(self, window, pcmd, peer_id)
except Exception as err:
import traceback
response = {'ok': False, 'error': str(err)}
if not getattr(err, 'hide_traceback', False):
response['tb'] = traceback.format_exc()
else:
no_response = pcmd.get('no_response') or False
if not no_response:
response = {'ok': False, 'error': 'Remote control is disabled. Add allow_remote_control to your kitty.conf'}
allowed_by_channel = (
self.allow_remote_control == 'y' or (peer_id > 0 and self.allow_remote_control == 'socket-only') or
getattr(window, 'allow_remote_control', False))
if allowed_by_channel:
return self._execute_remote_command(pcmd, window, peer_id)
no_response = pcmd.get('no_response') or False
if no_response:
return None
return {'ok': False, 'error': 'Remote control is disabled. Add allow_remote_control to your kitty.conf'}
def _execute_remote_command(self, pcmd: Dict[str, Any], window: Optional[Window] = None, peer_id: int = 0) -> RCResponse:
from .remote_control import handle_cmd
try:
response = handle_cmd(self, window, pcmd, peer_id)
except Exception as err:
import traceback
response = {'ok': False, 'error': str(err)}
if not getattr(err, 'hide_traceback', False):
response['tb'] = traceback.format_exc()
return response
@ac('misc', '''

View File

@ -11,7 +11,7 @@ from functools import partial
from time import monotonic, time_ns
from types import GeneratorType
from typing import (
Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union, cast
Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union, cast, TYPE_CHECKING
)
from .cli import emph, parse_args
@ -30,6 +30,8 @@ from .typing import BossType, WindowType
from .utils import TTYIO, log_error, parse_address_spec, resolve_custom_file
active_async_requests: Dict[str, float] = {}
if TYPE_CHECKING:
from .window import Window
def encode_response_for_peer(response: Any) -> bytes:
@ -65,6 +67,50 @@ def parse_cmd(serialized_cmd: str, encryption_key: EllipticCurveKey) -> Dict[str
return pcmd
class CMDChecker:
def __call__(self, pcmd: Dict[str, Any], window: Optional['Window'], from_socket: bool, extra_data: Dict[str, Any]) -> bool:
return False
class PasswordAuthorizer:
def __init__(self, password: str, auth_items: Tuple[str, ...]) -> None:
from fnmatch import translate
import runpy
self.password = password
self.command_patterns = []
self.function_checkers = []
self.user_permission: Optional[bool] = None
for item in auth_items:
if item.endswith('.py'):
path = resolve_custom_file(item)
try:
m = runpy.run_path(path)
func: CMDChecker = m['is_cmd_allowed']
except Exception as e:
log_error(f'Failed to load cmd check function from {path} with error: {e}')
self.function_checkers.append(CMDChecker())
else:
self.function_checkers.append(func)
else:
self.command_patterns.append(re.compile(translate(item)))
def is_cmd_allowed(self, pcmd: Dict[str, Any], window: Optional['Window'], from_socket: bool, extra_data: Dict[str, Any]) -> Optional[bool]:
cmd_name = pcmd.get('cmd')
if not cmd_name:
return False
if not self.function_checkers and not self.command_patterns:
return True
for x in self.command_patterns:
if x.match(cmd_name) is not None:
return True
for f in self.function_checkers:
if f(pcmd, window, from_socket, extra_data):
return True
return self.user_permission
def handle_cmd(boss: BossType, window: Optional[WindowType], cmd: Dict[str, Any], peer_id: int) -> Union[Dict[str, Any], None, AsyncResponse]:
v = cmd['version']
no_response = cmd.get('no_response', False)