diff --git a/kittens/ask/main.py b/kittens/ask/main.py index 332fad47e..fe1fe3264 100644 --- a/kittens/ask/main.py +++ b/kittens/ask/main.py @@ -6,13 +6,14 @@ import re import sys from contextlib import suppress from typing import ( - TYPE_CHECKING, Callable, Dict, Iterator, List, NamedTuple, Optional, Tuple + TYPE_CHECKING, Callable, Dict, Iterator, List, NamedTuple, Optional, Tuple, ) from kitty.cli import parse_args from kitty.cli_stub import AskCLIOptions from kitty.constants import cache_dir from kitty.fast_data_types import truncate_point_for_length, wcswidth +from kitty.types import run_once from kitty.typing import BossType, KeyEventType, TypedDict from kitty.utils import ScreenSize @@ -447,6 +448,17 @@ class Choose(Handler): # {{{ # }}} +@run_once +def init_readline() -> None: + import readline + with suppress(OSError): + readline.read_init_file() + if 'libedit' in readline.__doc__: + readline.parse_and_bind("bind ^I rl_complete") + else: + readline.parse_and_bind('tab: complete') + + def main(args: List[str]) -> Response: # For some reason importing readline in a key handler in the main kitty process # causes a crash of the python interpreter, probably because of some global @@ -478,7 +490,6 @@ def main(args: List[str]) -> Response: import readline as rl readline = rl - from kitty.shell import init_readline init_readline() response = None diff --git a/kittens/ssh/completion.py b/kittens/ssh/completion.py deleted file mode 100644 index 8a2710451..000000000 --- a/kittens/ssh/completion.py +++ /dev/null @@ -1,321 +0,0 @@ -#!/usr/bin/env python -# License: GPLv3 Copyright: 2021, Kovid Goyal - -import os -import re -import subprocess -from typing import Callable, Dict, Iterable, Iterator, Sequence, Tuple - -from kitty.complete import Completions, complete_files_and_dirs, debug -from kitty.types import run_once -from .utils import ssh_options - -debug - - -def lines_from_file(path: str) -> Iterator[str]: - try: - f = open(os.path.expanduser(path)) - except OSError: - pass - else: - yield from f - - -def lines_from_command(*cmd: str) -> Iterator[str]: - try: - output = subprocess.check_output(cmd).decode('utf-8') - except Exception: - return - yield from output.splitlines() - - -def parts_yielder(lines: Iterable[str], pfilter: Callable[[str], Iterator[str]]) -> Iterator[str]: - for line in lines: - yield from pfilter(line) - - -def hosts_from_config_lines(line: str) -> Iterator[str]: - parts = line.strip().split() - if len(parts) > 1 and parts[0] == 'Host': - yield parts[1] - - -def hosts_from_known_hosts(line: str) -> Iterator[str]: - parts = line.strip().split() - if parts: - yield re.sub(r':\d+$', '', parts[0]) - - -def hosts_from_hosts(line: str) -> Iterator[str]: - line = line.strip() - if not line.startswith('#'): - parts = line.split() - if parts: - yield parts[0] - if len(parts) > 1: - yield parts[1] - if len(parts) > 2: - yield parts[2] - - -def iter_known_hosts() -> Iterator[str]: - yield from parts_yielder(lines_from_file('~/.ssh/config'), hosts_from_config_lines) - yield from parts_yielder(lines_from_file('~/.ssh/known_hosts'), hosts_from_known_hosts) - yield from parts_yielder(lines_from_file('/etc/ssh/ssh_known_hosts'), hosts_from_known_hosts) - yield from parts_yielder(lines_from_file('/etc/hosts'), hosts_from_hosts) - yield from parts_yielder(lines_from_command('getent', 'hosts'), hosts_from_hosts) - - -@run_once -def known_hosts() -> Tuple[str, ...]: - return tuple(sorted(filter(lambda x: '*' not in x and '[' not in x, set(iter_known_hosts())))) - - -# option help {{{ -@run_once -def option_help_map() -> Dict[str, str]: - ans: Dict[str, str] = {} - lines = ''' --4 -- force ssh to use IPv4 addresses only --6 -- force ssh to use IPv6 addresses only --a -- disable forwarding of authentication agent connection --A -- enable forwarding of the authentication agent connection --B -- bind to specified interface before attempting to connect --b -- specify interface to transmit on --C -- compress data --c -- select encryption cipher --D -- specify a dynamic port forwarding --E -- append log output to file instead of stderr --e -- set escape character --f -- go to background --F -- specify alternate config file --g -- allow remote hosts to connect to local forwarded ports --G -- output configuration and exit --i -- select identity file --I -- specify smartcard device --J -- connect via a jump host --k -- disable forwarding of GSSAPI credentials --K -- enable GSSAPI-based authentication and forwarding --L -- specify local port forwarding --l -- specify login name --M -- master mode for connection sharing --m -- specify mac algorithms --N -- don't execute a remote command --n -- redirect stdin from /dev/null --O -- control an active connection multiplexing master process --o -- specify extra options --p -- specify port on remote host --P -- use non privileged port --Q -- query parameters --q -- quiet operation --R -- specify remote port forwarding --s -- invoke subsystem --S -- specify location of control socket for connection sharing --T -- disable pseudo-tty allocation --t -- force pseudo-tty allocation --V -- show version number --v -- verbose mode (multiple increase verbosity, up to 3) --W -- forward standard input and output to host --w -- request tunnel device forwarding --x -- disable X11 forwarding --X -- enable (untrusted) X11 forwarding --Y -- enable trusted X11 forwarding --y -- send log info via syslog instead of stderr -'''.splitlines() - for line in lines: - line = line.strip() - if line: - parts = line.split(maxsplit=2) - ans[parts[0]] = parts[2] - return ans -# }}} - - -# option names {{{ -@run_once -def option_names() -> Tuple[str, ...]: - return tuple(filter(None, ( - line.strip() for line in ''' -AddKeysToAgent -AddressFamily -BatchMode -BindAddress -CanonicalDomains -CanonicalizeFallbackLocal -CanonicalizeHostname -CanonicalizeMaxDots -CanonicalizePermittedCNAMEs -CASignatureAlgorithms -CertificateFile -ChallengeResponseAuthentication -CheckHostIP -Ciphers -ClearAllForwardings -Compression -ConnectionAttempts -ConnectTimeout -ControlMaster -ControlPath -ControlPersist -DynamicForward -EscapeChar -ExitOnForwardFailure -FingerprintHash -ForwardAgent -ForwardX11 -ForwardX11Timeout -ForwardX11Trusted -GatewayPorts -GlobalKnownHostsFile -GSSAPIAuthentication -GSSAPIDelegateCredentials -HashKnownHosts -Host -HostbasedAcceptedAlgorithms -HostbasedAuthentication -HostKeyAlgorithms -HostKeyAlias -Hostname -IdentitiesOnly -IdentityAgent -IdentityFile -IPQoS -KbdInteractiveAuthentication -KbdInteractiveDevices -KexAlgorithms -KnownHostsCommand -LocalCommand -LocalForward -LogLevel -MACs -Match -NoHostAuthenticationForLocalhost -NumberOfPasswordPrompts -PasswordAuthentication -PermitLocalCommand -PermitRemoteOpen -PKCS11Provider -Port -PreferredAuthentications -ProxyCommand -ProxyJump -ProxyUseFdpass -PubkeyAcceptedAlgorithms -PubkeyAuthentication -RekeyLimit -RemoteCommand -RemoteForward -RequestTTY -SendEnv -ServerAliveInterval -ServerAliveCountMax -SetEnv -StreamLocalBindMask -StreamLocalBindUnlink -StrictHostKeyChecking -TCPKeepAlive -Tunnel -TunnelDevice -UpdateHostKeys -User -UserKnownHostsFile -VerifyHostKeyDNS -VisualHostKey -XAuthLocation -'''.splitlines()))) -# }}} - - -def complete_choices(ans: Completions, prefix: str, title: str, choices: Iterable[str], comma_separated: bool = False) -> None: - matches: Dict[str, str] = {} - word_transforms = {} - effective_prefix = prefix - hidden_prefix = '' - if comma_separated: - effective_prefix = prefix.split(',')[-1] - hidden_prefix = ','.join(prefix.split(',')[:-1]) - if hidden_prefix: - hidden_prefix += ',' - for q in choices: - if q.startswith(effective_prefix): - if comma_separated: - tq = q - q = f'{hidden_prefix}{q},' - word_transforms[q] = tq - matches[q] = '' - ans.add_match_group(title, matches, trailing_space=not comma_separated, word_transforms=word_transforms) - - -def complete_q_choices(ans: Completions, prefix: str, title: str, key: str, comma_separated: bool) -> None: - choices = (line.strip() for line in lines_from_command('ssh', '-Q', key)) - complete_choices(ans, prefix, title, choices, comma_separated) - - -def complete_arg(ans: Completions, option_flag: str, prefix: str = '') -> None: - options = ssh_options() - option_name = options.get(option_flag[1:]) - if option_name and (option_name.endswith('file') or option_name.endswith('path')): - return complete_files_and_dirs(ans, prefix, option_name) - choices = { - 'mac_spec': ('MAC algorithm', 'mac', True), - 'cipher_spec': ('encryption cipher', 'cipher', True), - 'query_option': ('query option', 'help', False), - } - if option_name in choices: - return complete_q_choices(ans, prefix, *choices[option_name]) - if option_name == 'destination': - return complete_destination(ans, prefix) - if option_name == 'ctl_cmd': - return complete_choices(ans, prefix, 'control command', ('check', 'forward', 'cancel', 'exit')) - if option_name == 'option': - matches = (x+'=' for x in option_names() if x.startswith(prefix)) - word_transforms = {x+'=': x for x in option_names()} - ans.add_match_group('configure file option', matches, trailing_space=False, word_transforms=word_transforms) - - -def complete_destination(ans: Completions, prefix: str = '') -> None: - result = (k for k in known_hosts() if k.startswith(prefix)) - ans.add_match_group('remote host name', result) - - -def complete_option(ans: Completions, prefix: str = '-') -> None: - hm = option_help_map() - if len(prefix) <= 1: - result = {k: v for k, v in hm.items() if k.startswith(prefix)} - ans.add_match_group('option', result) - else: - ans.add_match_group('option', {prefix: ''}) - - -def complete(ans: Completions, words: Sequence[str], new_word: bool) -> None: - options = ssh_options() - expecting_arg = False - seen_destination = False - types = ['' for i in range(len(words))] - for i, word in enumerate(words): - if expecting_arg: - types[i] = 'arg' - expecting_arg = False - continue - if word.startswith('-'): - types[i] = 'option' - if len(word) == 2 and options.get(word[1]): - expecting_arg = True - continue - if seen_destination: - break - types[i] = 'destination' - seen_destination = True - if new_word: - if words: - if expecting_arg: - return complete_arg(ans, words[-1]) - return complete_destination(ans) - if words: - if types[-1] == 'arg' and len(words) > 1: - return complete_arg(ans, words[-2], words[-1]) - if types[-1] == 'destination': - return complete_destination(ans, words[-1]) - if types[-1] == 'option': - return complete_option(ans, words[-1]) diff --git a/kitty/complete.py b/kitty/complete.py deleted file mode 100644 index dd53f9ebd..000000000 --- a/kitty/complete.py +++ /dev/null @@ -1,857 +0,0 @@ -#!/usr/bin/env python3 -# License: GPLv3 Copyright: 2018, Kovid Goyal - -import os -import shlex -import sys -from typing import ( - Any, Callable, Dict, Iterable, Iterator, List, Optional, Sequence, Tuple, - Union -) - -from kittens.runner import ( - all_kitten_names, get_kitten_cli_docs, get_kitten_completer -) - -from .cli import ( - CompletionSpec, CompletionType, OptionDict, options_for_completion, - parse_option_spec, prettify -) -from .constants import config_dir, shell_integration_dir -from .fast_data_types import truncate_point_for_length, wcswidth -from .rc.base import all_command_names -from .remote_control import global_options_spec -from .shell import options_for_cmd -from .types import run_once -from .utils import screen_size_function - -''' -To add completion for a new shell, you need to: - -1) Add an entry to completion scripts for your shell, this is -a simple function that calls kitty's completion code and passes the -results to the shell's completion system. This can be output by -`kitty +complete setup shell_name` and its output goes into -your shell's rc file. - -2) Add an input_parser function, this takes the input from -the shell for the text being completed and returns a list of words -and a boolean indicating if we are on a new word or not. This -is passed to kitty's completion system. - -3) An output_serializer function that is responsible for -taking the results from kitty's completion system and converting -them into something your shell will understand. -''' - -parsers: Dict[str, 'ParserFunc'] = {} -serializers: Dict[str, 'SerializerFunc'] = {} -shell_state: Dict[str, str] = {} - - -class MatchGroup: - - def __init__( - self, x: Union[Dict[str, str], Iterable[str]], - trailing_space: bool = True, - is_files: bool = False, - word_transforms: Optional[Dict[str, str]] = None, - ): - self.mdict = x if isinstance(x, dict) else dict.fromkeys(x, '') - self.trailing_space = trailing_space - self.is_files = is_files - self.word_transforms = word_transforms or {} - - def __iter__(self) -> Iterator[str]: - return iter(self.mdict) - - def __bool__(self) -> bool: - return bool(self.mdict) - - def transformed_words(self) -> Iterator[str]: - for w in self: - yield self.word_transforms.get(w, w) - - def transformed_items(self) -> Iterator[Tuple[str, str]]: - for w, desc in self.items(): - yield self.word_transforms.get(w, w), desc - - def items(self) -> Iterator[Tuple[str, str]]: - return iter(self.mdict.items()) - - def values(self) -> Iterator[str]: - return iter(self.mdict.values()) - - def add_prefix(self, prefix: str) -> None: - nmap = {k: prefix + k for k in self.mdict} - for k, nk in nmap.items(): - self.word_transforms[nk] = self.word_transforms.pop(k, k) - self.mdict = {prefix + k: v for k, v in self.mdict.items()} - - -def debug(*a: Any, **kw: Any) -> None: - from kittens.tui.loop import debug_write - debug_write(*a, **kw) - - -class Delegate: - - def __init__(self, words: Sequence[str] = (), pos: int = -1, new_word: bool = False): - self.words: Sequence[str] = words - self.pos = pos - self.num_of_unknown_args = len(words) - pos - self.new_word = new_word - - def __bool__(self) -> bool: - return self.pos > -1 and self.num_of_unknown_args > 0 - - @property - def precommand(self) -> str: - try: - return self.words[self.pos] - except IndexError: - return '' - - -class Completions: - - def __init__(self) -> None: - self.match_groups: Dict[str, MatchGroup] = {} - self.delegate: Delegate = Delegate() - - def add_match_group( - self, name: str, x: Union[Dict[str, str], Iterable[str]], - trailing_space: bool = True, - is_files: bool = False, - word_transforms: Optional[Dict[str, str]] = None - ) -> MatchGroup: - self.match_groups[name] = m = MatchGroup(x, trailing_space, is_files, word_transforms) - return m - - def add_prefix(self, prefix: str) -> None: - for mg in self.match_groups.values(): - mg.add_prefix(prefix) - - -@run_once -def remote_control_command_names() -> Tuple[str, ...]: - return tuple(sorted(x.replace('_', '-') for x in all_command_names())) - - -# Shell specific code {{{ - - -def load_fish2_completion() -> str: - with open(os.path.join(shell_integration_dir, 'fish', 'vendor_completions.d', 'kitty.fish')) as f: - return f.read() - - -completion_scripts = { - 'zsh': '''#compdef kitty - -_kitty() { - local src - # Send all words up to the word the cursor is currently on - src=$(printf "%s\n" "${(@)words[1,$CURRENT]}" | kitty +complete zsh) - if [[ $? == 0 ]]; then - eval ${src} - fi -} -compdef _kitty kitty -'''.__str__, - 'bash': ''' -_kitty_completions() { - local src - local limit - # Send all words up to the word the cursor is currently on - let limit=1+$COMP_CWORD - src=$(printf "%s\n" "${COMP_WORDS[@]: 0:$limit}" | kitty +complete bash) - if [[ $? == 0 ]]; then - eval ${src} - fi -} - -complete -o nospace -F _kitty_completions kitty -'''.__str__, - 'fish': ''' -function __kitty_completions - # Send all words up to the one before the cursor - commandline -cop | kitty +complete fish -end - -complete -f -c kitty -a "(__kitty_completions)" -'''.__str__, - 'fish2': load_fish2_completion, -} - -ParseResult = Tuple[List[str], bool] -ParserFunc = Callable[[str], ParseResult] -SerializerFunc = Callable[[Completions], str] - - -def input_parser(func: ParserFunc) -> ParserFunc: - name = func.__name__.split('_')[0] - parsers[name] = func - return func - - -def output_serializer(func: SerializerFunc) -> SerializerFunc: - name = func.__name__.split('_')[0] - serializers[name] = func - return func - - -@input_parser -def zsh_input_parser(data: str) -> ParseResult: - matcher = shell_state.get('_matcher', '') - q = matcher.lower().split(':', maxsplit=1)[0] - if q in ('l', 'r', 'b', 'e'): - # this is zsh anchor based matching - # https://zsh.sourceforge.io/Doc/Release/Completion-Widgets.html#Completion-Matching-Control - # can be specified with matcher-list and some systems do it by default, - # for example, Debian, which adds the following to zshrc - # zstyle ':completion:*' matcher-list '' 'm:{a-z}={A-Z}' 'm:{a-zA-Z}={A-Za-z}' 'r:|[._-]=* r:|=* l:|=*' - # For some reason that I dont have the - # time/interest to figure out, returning completion candidates for - # these matcher types break completion, so just abort in this case. - raise SystemExit(1) - new_word = data.endswith('\n\n') - words = data.rstrip().splitlines() - return words, new_word - - -@input_parser -def bash_input_parser(data: str) -> ParseResult: - new_word = data.endswith('\n\n') - words = data.rstrip().splitlines() - return words, new_word - - -@input_parser -def fish_input_parser(data: str) -> ParseResult: - return data.rstrip().splitlines(), True - - -@input_parser -def fish2_input_parser(data: str) -> ParseResult: - return bash_input_parser(data) - - -@output_serializer -def zsh_output_serializer(ans: Completions) -> str: - lines = [] - - try: - screen = screen_size_function(sys.stderr.fileno())() - except OSError: - width = 80 - else: - width = screen.cols - - def fmt_desc(word: str, desc: str, max_word_len: int) -> Iterator[str]: - if not desc: - yield word - return - desc = prettify(desc.splitlines()[0]) - multiline = False - if wcswidth(word) > max_word_len: - max_desc_len = width - 2 - multiline = True - else: - word = word.ljust(max_word_len) - max_desc_len = width - max_word_len - 3 - if wcswidth(desc) > max_desc_len: - desc = desc[:truncate_point_for_length(desc, max_desc_len - 2)] - desc += '…' - - if multiline: - ans = f'{word}\n {desc}' - else: - ans = f'{word} {desc}' - yield ans - - for description, matches in ans.match_groups.items(): - cmd = ['compadd', '-U', '-J', shlex.quote(description), '-X', shlex.quote(f'%B{description}%b')] - if not matches.trailing_space: - cmd += ['-S', '""'] - if matches.is_files: - cmd.append('-f') - allm = tuple(matches) - if len(allm) > 1: - common_prefix = os.path.commonprefix(allm) - if common_prefix and os.sep in common_prefix: - common_prefix = os.path.dirname(common_prefix).rstrip(os.sep) + os.sep - cmd.extend(('-p', shlex.quote(common_prefix))) - matches = MatchGroup({k[len(common_prefix):]: v for k, v in matches.items()}) - has_descriptions = any(matches.values()) - if has_descriptions or matches.word_transforms: - lines.append('compdescriptions=(') - try: - sz = max(map(wcswidth, matches.transformed_words())) - except ValueError: - sz = 0 - limit = min(16, sz) - for word, desc in matches.transformed_items(): - lines.extend(map(shlex.quote, fmt_desc(word, desc, limit))) - lines.append(')') - if has_descriptions: - cmd.append('-l') - cmd.append('-d') - cmd.append('compdescriptions') - cmd.append('--') - for word in matches: - cmd.append(shlex.quote(word)) - lines.append(' '.join(cmd) + ';') - - if ans.delegate: - if ans.delegate.num_of_unknown_args == 1 and not ans.delegate.new_word: - lines.append('_command_names -e') - elif ans.delegate.precommand: - for i in range(ans.delegate.pos + 1): - lines.append('shift words') - lines.append('(( CURRENT-- ))') - lines.append(f'_normal -p "{ans.delegate.precommand}"') - result = '\n'.join(lines) - # debug(result) - return result - - -@output_serializer -def bash_output_serializer(ans: Completions) -> str: - lines = [] - for description, matches in ans.match_groups.items(): - for word in matches: - if matches.trailing_space: - word += ' ' - lines.append(f'COMPREPLY+=({shlex.quote(word)})') - # debug('\n'.join(lines)) - return '\n'.join(lines) - - -@output_serializer -def fish_output_serializer(ans: Completions) -> str: - lines = [] - for description, matches in ans.match_groups.items(): - for word in matches: - lines.append(word.replace('\n', ' ')) - # debug('\n'.join(lines)) - return '\n'.join(lines) - - -@output_serializer -def fish2_output_serializer(ans: Completions) -> str: - lines = [] - for description, matches in ans.match_groups.items(): - for word, desc in matches.items(): - q = word - if desc: - q = f'{q}\t{desc}' - lines.append(q.replace('\n', ' ')) - # debug('\n'.join(lines)) - return '\n'.join(lines) - -# }}} - - -def completions_for_first_word(ans: Completions, prefix: str, entry_points: Iterable[str], namespaced_entry_points: Iterable[str]) -> None: - cmds = [f'@{c}' for c in remote_control_command_names()] - ans.add_match_group('Entry points', { - k: '' for k in - list(entry_points) + cmds + [f'+{k}' for k in namespaced_entry_points] - if not prefix or k.startswith(prefix) - }) - if prefix: - ans.delegate = Delegate([prefix], 0) - - -def kitty_cli_opts(ans: Completions, prefix: Optional[str] = None) -> None: - if not prefix: - return - matches = {} - for opt in options_for_completion(): - if isinstance(opt, str): - continue - aliases = frozenset(x for x in opt['aliases'] if x.startswith(prefix)) if prefix else opt['aliases'] - for alias in aliases: - matches[alias] = opt['help'].strip() - ans.add_match_group('Options', matches) - - -def complete_kitty_cli_arg(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None: - prefix = prefix or '' - if not opt: - if unknown_args.num_of_unknown_args > 0: - ans.delegate = unknown_args - return - dest = opt['dest'] - if dest == 'override': - from kitty.config import option_names_for_completion - k = 'Config directives' - ans.add_match_group(k, {k+'=': '' for k in option_names_for_completion() if k.startswith(prefix)}, trailing_space=False) - elif dest == 'listen_on': - if ':' not in prefix: - k = 'Address type' - ans.add_match_group(k, {x: x for x in ('unix:', 'tcp:') if x.startswith(prefix)}, trailing_space=False) - elif prefix.startswith('unix:') and not prefix.startswith('@'): - complete_files_and_dirs(ans, prefix[len('unix:'):], files_group_name='UNIX sockets', add_prefix='unix:') - else: - complete_basic_option_args(ans, opt, prefix) - - -def basic_option_arg_completer(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None: - prefix = prefix or '' - if not opt: - if unknown_args.num_of_unknown_args > 0: - ans.delegate = unknown_args - return - complete_basic_option_args(ans, opt, prefix) - - -CompleteArgsFunc = Callable[[Completions, Optional[OptionDict], str, Delegate], None] - - -def complete_alias_map( - ans: Completions, - words: Sequence[str], - new_word: bool, - option_map: Dict[str, OptionDict], - complete_args: CompleteArgsFunc = basic_option_arg_completer, -) -> None: - expecting_arg = False - opt: Optional[OptionDict] = None - last_word = words[-1] if words else '' - for i, w in enumerate(words): - if expecting_arg: - prev_word = '' if i == 0 else words[i-1] - if w == '=' and i > 0 and prev_word.startswith('--') and prev_word != '--': - if w is not last_word: - continue - long_opt = option_map.get(prev_word) - if long_opt is not None: - complete_args(ans, long_opt, '', Delegate()) - return - if w is last_word and not new_word: - if opt is not None: - complete_args(ans, opt, w, Delegate()) - return - expecting_arg = False - continue - if w is last_word and not new_word and w.startswith('--') and w != '--': - parts = w.split('=', 1) - if len(parts) == 2: - long_opt = option_map.get(parts[0]) - if long_opt is not None: - complete_args(ans, long_opt, parts[1], Delegate()) - ans.add_prefix(f'{parts[0]}=') - return - opt = option_map.get(w) - if w is last_word and not new_word: - if w.startswith('-'): - ans.add_match_group('Options', {k: opt['help'] for k, opt in option_map.items() if k.startswith(last_word)}) - else: - complete_args(ans, None, last_word, Delegate(words, i)) - return - if opt is None: - complete_args(ans, None, '' if new_word else last_word, Delegate(words, i, new_word)) - if w.startswith('--') and '=' in w: - continue - return # some non-option word encountered - expecting_arg = not opt.get('type', '').startswith('bool-') - if expecting_arg: - if opt is not None: - complete_args(ans, opt, '' if new_word else last_word, Delegate()) - else: - prefix = '' if new_word else last_word - complete_args(ans, None, prefix, Delegate()) - ans.add_match_group('Options', {k: opt['help'] for k, opt in option_map.items() if k.startswith(prefix)}) - - -def complete_cli( - ans: Completions, - words: Sequence[str], - new_word: bool, -) -> None: - option_map = {} - for opt in options_for_completion(): - if not isinstance(opt, str): - for alias in opt['aliases']: - option_map[alias] = opt - complete_alias_map(ans, words, new_word, option_map, complete_kitty_cli_arg) - - -def global_options_for_remote_cmd() -> Dict[str, OptionDict]: - seq, disabled = parse_option_spec(global_options_spec()) - ans: Dict[str, OptionDict] = {} - for opt in seq: - if isinstance(opt, str): - continue - for alias in opt['aliases']: - ans[alias] = opt - return ans - - -def complete_remote_command(ans: Completions, cmd_name: str, words: Sequence[str], new_word: bool) -> None: - aliases, alias_map = options_for_cmd(cmd_name) - args_completer: CompleteArgsFunc = basic_option_arg_completer - complete_alias_map(ans, words, new_word, alias_map, complete_args=args_completer) - - -def complete_launch_wrapper(ans: Completions, words: Sequence[str], new_word: bool, allow_files: bool = True) -> None: - from kitty.launch import clone_safe_opts - aliases, alias_map = options_for_cmd('launch') - alias_map = {k: v for k, v in alias_map.items() if v['dest'] in clone_safe_opts()} - args_completer: CompleteArgsFunc = basic_option_arg_completer - if allow_files: - args_completer = remote_files_completer('Files', ('*',)) - complete_alias_map(ans, words, new_word, alias_map, complete_args=args_completer) - - -def path_completion(prefix: str = '') -> Tuple[List[str], List[str]]: - prefix = prefix.replace(r'\ ', ' ') - dirs, files = [], [] - base = '.' - if prefix.endswith('/'): - base = prefix - elif '/' in prefix: - base = os.path.dirname(prefix) - src = os.path.expandvars(os.path.expanduser(base)) - src_prefix = os.path.abspath(os.path.expandvars(os.path.expanduser(prefix))) if prefix else '' - try: - items: Iterable['os.DirEntry[str]'] = os.scandir(src) - except FileNotFoundError: - items = () - for x in items: - abspath = os.path.abspath(x.path) - if prefix and not abspath.startswith(src_prefix): - continue - if prefix: - q = prefix + abspath[len(src_prefix):].lstrip(os.sep) - q = os.path.expandvars(os.path.expanduser(q)) - else: - q = os.path.relpath(abspath) - if x.is_dir(): - dirs.append(q.rstrip(os.sep) + os.sep) - else: - files.append(q) - return dirs, files - - -def complete_files_and_dirs( - ans: Completions, - prefix: str, - files_group_name: str = 'Files', - predicate: Optional[Callable[[str], bool]] = None, - add_prefix: Optional[str] = None -) -> None: - dirs, files_ = path_completion(prefix or '') - files: Iterable[str] = filter(predicate, files_) - if add_prefix: - dirs = list(add_prefix + x for x in dirs) - files = (add_prefix + x for x in files) - - if dirs: - ans.add_match_group('Directories', dirs, trailing_space=False, is_files=True) - if files: - ans.add_match_group(files_group_name, files, is_files=True) - - -def filter_files_from_completion_spec(spec: CompletionSpec) -> Callable[['os.DirEntry[str]', str], bool]: - - if spec.extensions: - extensions = frozenset(os.extsep + x.lower() for x in spec.extensions) - else: - extensions = frozenset() - - if spec.mime_patterns: - import re - from fnmatch import translate - mimes = tuple(re.compile(translate(x)) for x in spec.mime_patterns) - from .guess_mime_type import guess_type - else: - mimes = () - - if mimes or extensions: - def check_file(x: 'os.DirEntry[str]', result: str) -> bool: - if extensions: - q = result.lower() - for ext in extensions: - if q.endswith(ext): - return True - if mimes: - mq = guess_type(result) - if mq: - for mime in mimes: - if mime.match(mq): - return True - return False - else: - def check_file(x: 'os.DirEntry[str]', result: str) -> bool: - return True - - return check_file - - -def complete_file_path(ans: Completions, spec: CompletionSpec, prefix: str, only_dirs: bool = False) -> None: - prefix = prefix.replace(r'\ ', ' ') - if spec.relative_to is spec.relative_to.__class__.cwd: - relative_to = os.getcwd() - else: - relative_to = config_dir - src_dir = relative_to - check_against = prefix - prefix_result_with = prefix - files, dirs = [], [] - if prefix: - expanded_prefix = os.path.expandvars(os.path.expanduser(prefix)) - check_against = os.path.basename(expanded_prefix) - prefix_result_with = os.path.dirname(expanded_prefix).rstrip(os.sep) + os.sep - if os.path.isabs(expanded_prefix): - src_dir = os.path.dirname(expanded_prefix) - elif os.sep in expanded_prefix or (os.altsep and os.altsep in expanded_prefix): - src_dir = os.path.join(relative_to, os.path.dirname(expanded_prefix)) - else: - prefix_result_with = '' - try: - items: Iterable['os.DirEntry[str]'] = os.scandir(src_dir) - except OSError: - items = () - check_file = filter_files_from_completion_spec(spec) - for x in items: - if not x.name.startswith(check_against): - continue - result = prefix_result_with + x.name - if x.is_dir(): - dirs.append(result.rstrip(os.sep) + os.sep) - else: - if check_file(x, result): - files.append(result) - if dirs: - ans.add_match_group('Directories', dirs, trailing_space=False, is_files=True) - if not only_dirs and files: - ans.add_match_group(spec.group or 'Files', files, is_files=True) - - -def complete_path(ans: Completions, opt: OptionDict, prefix: str) -> None: - spec = opt['completion'] - if spec.kwds: - kwds = [x for x in spec.kwds if x.startswith(prefix)] - if kwds: - ans.add_match_group('Keywords', kwds) - if spec.type is CompletionType.file: - complete_file_path(ans, spec, prefix) - elif spec.type is CompletionType.directory: - complete_file_path(ans, spec, prefix, only_dirs=True) - - -def complete_basic_option_args(ans: Completions, opt: OptionDict, prefix: str) -> None: - if opt['choices']: - ans.add_match_group(f'Choices for {opt["dest"]}', tuple(k for k in opt['choices'] if k.startswith(prefix))) - elif opt['completion'].type is not CompletionType.none: - complete_path(ans, opt, prefix) - - -def complete_dirs(ans: Completions, prefix: str = '') -> None: - dirs, files_ = path_completion(prefix or '') - if dirs: - ans.add_match_group('Directories', dirs, trailing_space=False, is_files=True) - - -def complete_icat_args(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None: - from .guess_mime_type import guess_type - - def icat_file_predicate(filename: str) -> bool: - mt = guess_type(filename, allow_filesystem_access=True) - if mt and mt.startswith('image/'): - return True - return False - - if opt is None: - complete_files_and_dirs(ans, prefix, 'Images', icat_file_predicate) - else: - complete_basic_option_args(ans, opt, prefix) - - -def complete_themes_args(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None: - if opt is None: - from kittens.themes.collection import load_themes - themes = load_themes(cache_age=-1, ignore_no_cache=True) - names = tuple(t.name for t in themes if t.name.startswith(prefix)) - ans.add_match_group('Themes', names) - else: - complete_basic_option_args(ans, opt, prefix) - - -def remote_files_completer(name: str, matchers: Tuple[str, ...]) -> CompleteArgsFunc: - - def complete_files_map(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None: - - def predicate(filename: str) -> bool: - for m in matchers: - if isinstance(m, str): - from fnmatch import fnmatch - return fnmatch(filename, m) - return False - - if opt is None: - complete_files_and_dirs(ans, prefix, name, predicate) - else: - complete_basic_option_args(ans, opt, prefix) - return complete_files_map - - -def remote_args_completer(title: str, words: Iterable[str]) -> CompleteArgsFunc: - items = sorted(words) - - def complete_names_for_arg(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None: - if opt is None: - ans.add_match_group(title, {c: '' for c in items if c.startswith(prefix)}) - else: - complete_basic_option_args(ans, opt, prefix) - - return complete_names_for_arg - - -def remote_command_completer(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None: - if opt is None: - words = unknown_args.words[unknown_args.pos:] - new_word = unknown_args.new_word - if not words or (len(words) == 1 and not new_word): - prefix = (words or ('',))[0] - ans.add_match_group('Remote control commands', {c: '' for c in remote_control_command_names() if c.startswith(prefix)}) - else: - complete_remote_command(ans, words[0], words[1:], new_word) - else: - basic_option_arg_completer(ans, opt, prefix, unknown_args) - - -def config_file_predicate(filename: str) -> bool: - return filename.endswith('.conf') - - -def complete_diff_args(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None: - if opt is None: - complete_files_and_dirs(ans, prefix, 'Files') - elif opt['dest'] == 'config': - complete_files_and_dirs(ans, prefix, 'Config Files', config_file_predicate) - else: - complete_basic_option_args(ans, opt, prefix) - - -def complete_kitten(ans: Completions, kitten: str, words: Sequence[str], new_word: bool) -> None: - try: - completer = get_kitten_completer(kitten) - except SystemExit: - completer = None - if completer is not None: - completer(ans, words, new_word) - return - try: - cd = get_kitten_cli_docs(kitten) - except SystemExit: - cd = None - if cd is None: - return - options = cd['options']() - seq = parse_option_spec(options)[0] - option_map = {} - for opt in seq: - if not isinstance(opt, str): - for alias in opt['aliases']: - option_map[alias] = opt - complete_alias_map(ans, words, new_word, option_map, { - 'icat': complete_icat_args, - 'diff': complete_diff_args, - 'themes': complete_themes_args, - }.get(kitten, basic_option_arg_completer)) - - -def find_completions(words: Sequence[str], new_word: bool, entry_points: Iterable[str], namespaced_entry_points: Iterable[str]) -> Completions: - ans = Completions() - if not words: - return ans - exe = os.path.basename(words[0]) - if exe in ('edit-in-kitty', 'clone-in-kitty'): - complete_launch_wrapper(ans, words[1:], new_word, allow_files=words[0] != 'clone-in-kitty') - return ans - if exe != 'kitty': - return ans - words = words[1:] - if not words or (len(words) == 1 and not new_word): - if words and words[0].startswith('--') and '=' in words[0]: - complete_cli(ans, words, new_word) - return ans - prefix = words[0] if words else '' - completions_for_first_word(ans, prefix, entry_points, namespaced_entry_points) - kitty_cli_opts(ans, prefix) - return ans - if words[0] == '@': - complete_alias_map(ans, words[1:], new_word, global_options_for_remote_cmd(), remote_command_completer) - return ans - if words[0].startswith('@'): - if len(words) == 1 and not new_word: - prefix = words[0] - ans.add_match_group('Remote control commands', {f'@{c}': '' for c in remote_control_command_names() if c.startswith(prefix)}) - else: - complete_remote_command(ans, words[0][1:], words[1:], new_word) - return ans - if words[0] == '+': - if len(words) == 1 or (len(words) == 2 and not new_word): - prefix = words[1] if len(words) > 1 else '' - ans.add_match_group('Entry points', {c: '' for c in namespaced_entry_points if c.startswith(prefix)}) - else: - if words[1] == 'kitten': - if len(words) == 2 or (len(words) == 3 and not new_word): - ans.add_match_group('Kittens', (k for k in all_kitten_names() if k.startswith('' if len(words) == 2 else words[2]))) - else: - complete_kitten(ans, words[2], words[3:], new_word) - elif words[1] == 'open': - complete_cli(ans, words[2:], new_word) - return ans - if words[0].startswith('+'): - if len(words) == 1: - if new_word: - if words[0] == '+kitten': - ans.add_match_group('Kittens', all_kitten_names()) - elif words[0] == '+open': - complete_cli(ans, words[1:], new_word) - else: - prefix = words[0] - ans.add_match_group('Entry points', (c for c in namespaced_entry_points if c.startswith(prefix))) - else: - if words[0] == '+kitten': - if len(words) == 2 and not new_word: - ans.add_match_group('Kittens', (k for k in all_kitten_names() if k.startswith(words[1]))) - else: - complete_kitten(ans, words[1], words[2:], new_word) - elif words[0] == '+open': - complete_cli(ans, words[1:], new_word) - else: - complete_cli(ans, words, new_word) - - return ans - - -def setup(cstyle: str) -> None: - print(completion_scripts[cstyle]()) - - -def main(args: Sequence[str], entry_points: Iterable[str], namespaced_entry_points: Iterable[str]) -> None: - if not args: - raise SystemExit('Must specify completion style') - cstyle = args[0] - if cstyle == 'setup': - return setup(args[1]) - data = sys.stdin.read() - shell_state.clear() - for x in args[1:]: - parts = x.split('=', maxsplit=1) - if len(parts) == 2: - shell_state[parts[0]] = parts[1] - try: - parser = parsers[cstyle] - serializer = serializers[cstyle] - except KeyError: - raise SystemExit(f'Unknown completion style: {cstyle}') - words, new_word = parser(data) - ans = find_completions(words, new_word, entry_points, namespaced_entry_points) - print(serializer(ans), end='') diff --git a/kitty/entry_points.py b/kitty/entry_points.py index f210e4025..bdae03d96 100644 --- a/kitty/entry_points.py +++ b/kitty/entry_points.py @@ -18,11 +18,6 @@ def list_fonts(args: List[str]) -> None: list_main(args) -def remote_control(args: List[str]) -> None: - from kitty.remote_control import main as rc_main - rc_main(args) - - def runpy(args: List[str]) -> None: if len(args) < 2: raise SystemExit('Usage: kitty +runpy "some python code"') @@ -46,11 +41,6 @@ def hold(args: List[str]) -> None: raise SystemExit(ret) -def complete(args: List[str]) -> None: - from kitty.complete import main as complete_main - complete_main(args[1:], entry_points, namespaced_entry_points) - - def open_urls(args: List[str]) -> None: setattr(sys, 'cmdline_args_for_open', True) sys.argv = ['kitty'] + args[1:] @@ -159,12 +149,10 @@ entry_points = { 'icat': icat, 'list-fonts': list_fonts, - '@': remote_control, '+': namespaced, } namespaced_entry_points = {k: v for k, v in entry_points.items() if k[0] not in '+@'} namespaced_entry_points['hold'] = hold -namespaced_entry_points['complete'] = complete namespaced_entry_points['runpy'] = runpy namespaced_entry_points['launch'] = launch namespaced_entry_points['open'] = open_urls @@ -195,9 +183,7 @@ def main() -> None: first_arg = '' if len(sys.argv) < 2 else sys.argv[1] func = entry_points.get(first_arg) if func is None: - if first_arg.startswith('@'): - remote_control(['@', first_arg[1:]] + sys.argv[2:]) - elif first_arg.startswith('+'): + if first_arg.startswith('+'): namespaced(['+', first_arg[1:]] + sys.argv[2:]) else: from kitty.main import main as kitty_main diff --git a/kitty/prewarm.py b/kitty/prewarm.py index 3ebe24ace..95356b4b2 100644 --- a/kitty/prewarm.py +++ b/kitty/prewarm.py @@ -227,7 +227,6 @@ def prewarm() -> None: for kitten in all_kitten_names(): with suppress(Exception): import_module(f'kittens.{kitten}.main') - import_module('kitty.complete') class MemoryViewReadWrapperBytes(io.BufferedIOBase): diff --git a/kitty/remote_control.py b/kitty/remote_control.py index f90a2a778..4fbc855c3 100644 --- a/kitty/remote_control.py +++ b/kitty/remote_control.py @@ -11,21 +11,18 @@ from functools import lru_cache, partial from time import monotonic, time_ns from types import GeneratorType from typing import ( - TYPE_CHECKING, Any, Dict, FrozenSet, Iterable, Iterator, List, Optional, - Tuple, Union, cast + TYPE_CHECKING, Any, Dict, FrozenSet, Iterable, Iterator, List, Optional, Tuple, + Union, cast, ) -from .cli import emph, parse_args +from .cli import parse_args from .cli_stub import RCOptions from .constants import RC_ENCRYPTION_PROTOCOL_VERSION, appname, version from .fast_data_types import ( - AES256GCMDecrypt, AES256GCMEncrypt, EllipticCurveKey, get_boss, - get_options, read_command_response, send_data_to_peer -) -from .rc.base import ( - NoResponse, ParsingOfArgsFailed, PayloadGetter, all_command_names, - command_for_name, parse_subcommand_cli + AES256GCMDecrypt, AES256GCMEncrypt, EllipticCurveKey, get_boss, get_options, + read_command_response, send_data_to_peer, ) +from .rc.base import NoResponse, PayloadGetter, all_command_names, command_for_name from .types import AsyncResponse from .typing import BossType, WindowType from .utils import TTYIO, log_error, parse_address_spec, resolve_custom_file @@ -481,81 +478,3 @@ def get_pubkey() -> Tuple[str, bytes]: raise SystemExit('KITTY_PUBLIC_KEY has unknown version, if you are running on a remote system, update kitty on this system') from base64 import b85decode return version, b85decode(pubkey) - - -def main(args: List[str]) -> None: - global_opts, items = parse_rc_args(args) - password = get_password(global_opts) - if password: - encryption_version, pubkey = get_pubkey() - encrypter = CommandEncrypter(pubkey, encryption_version, password) - else: - encrypter = NoEncryption() - - if not items: - from kitty.shell import main as smain - smain(global_opts, encrypter) - return - cmd = items[0] - try: - c = command_for_name(cmd) - except KeyError: - raise SystemExit('{} is not a known command. Known commands are: {}'.format( - emph(cmd), ', '.join(x.replace('_', '-') for x in all_command_names()))) - opts, items = parse_subcommand_cli(c, items) - try: - payload = c.message_to_kitty(global_opts, opts, items) - except ParsingOfArgsFailed as err: - exit(str(err)) - no_response = False - if hasattr(opts, 'no_response'): - no_response = opts.no_response - response_timeout = c.response_timeout - if hasattr(opts, 'response_timeout'): - response_timeout = opts.response_timeout - response_timeout = encrypter.adjust_response_timeout_for_password(response_timeout) - send = create_basic_command(cmd, payload=payload, no_response=no_response, is_asynchronous=c.is_asynchronous) - listen_on_from_env = False - if not global_opts.to and 'KITTY_LISTEN_ON' in os.environ: - global_opts.to = os.environ['KITTY_LISTEN_ON'] - listen_on_from_env = False - if global_opts.to: - try: - parse_address_spec(global_opts.to) - except Exception: - msg = f'Invalid listen on address: {global_opts.to}' - if listen_on_from_env: - msg += '. The KITTY_LISTEN_ON environment variable is set incorrectly' - exit(msg) - import socket - try: - response = do_io(global_opts.to, send, no_response, response_timeout, encrypter) - except (TimeoutError, socket.timeout): - send.pop('payload', None) - send['cancel_async'] = True - try: - do_io(global_opts.to, send, True, 10, encrypter) - except KeyboardInterrupt: - sys.excepthook = lambda *a: print('Interrupted by user', file=sys.stderr) - raise - except SocketClosed as e: - raise SystemExit(str(e)) - raise SystemExit(f'Timed out after {response_timeout} seconds waiting for response from kitty') - except KeyboardInterrupt: - sys.excepthook = lambda *a: print('Interrupted by user', file=sys.stderr) - raise - except FileNotFoundError: - raise SystemExit(f'No listen on socket found at: {global_opts.to}') - except SocketClosed as e: - raise SystemExit(str(e)) - if no_response: - return - if not response.get('ok'): - if response.get('tb'): - print(response['tb'], file=sys.stderr) - raise SystemExit(response['error']) - data = response.get('data') - if data is not None: - if c.string_return_is_error and isinstance(data, str): - raise SystemExit(data) - print(data) diff --git a/kitty/shell.py b/kitty/shell.py deleted file mode 100644 index 21cae8bc4..000000000 --- a/kitty/shell.py +++ /dev/null @@ -1,269 +0,0 @@ -#!/usr/bin/env python3 -# License: GPL v3 Copyright: 2018, Kovid Goyal - -import os -import shlex -import sys -import traceback -from contextlib import suppress -from functools import lru_cache -from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple - -from kittens.tui.operations import set_cursor_shape, set_window_title - -from .cli import ( - OptionDict, emph, green, italic, parse_option_spec, print_help_for_seq, - title -) -from .cli_stub import RCOptions -from .constants import cache_dir, kitty_face -from .rc.base import ( - ParsingOfArgsFailed, RemoteCommand, all_command_names, command_for_name, - display_subcommand_help, parse_subcommand_cli -) -from .remote_control import ( - CommandEncrypter, NoEncryption, create_basic_command, do_io -) -from .types import run_once - -output_prefix = '\x1b]133;C\x1b\\' -is_libedit = False - - -@run_once -def match_commands() -> Tuple[str, ...]: - all_commands = tuple(sorted(x.replace('_', '-') for x in all_command_names())) - return tuple(sorted(all_commands + ('exit', 'help', 'quit'))) - - -@run_once -def init_readline() -> None: - import readline - global is_libedit - with suppress(OSError): - readline.read_init_file() - if 'libedit' in readline.__doc__: - readline.parse_and_bind("bind ^I rl_complete") - is_libedit = True - else: - readline.parse_and_bind('tab: complete') - - -def cmd_names_matching(prefix: str) -> Generator[str, None, None]: - for cmd in match_commands(): - if not prefix or cmd.startswith(prefix): - yield cmd + ' ' - - -@lru_cache() -def options_for_cmd(cmd: str) -> Tuple[Tuple[str, ...], Dict[str, OptionDict]]: - alias_map: Dict[str, OptionDict] = {} - try: - func = command_for_name(cmd) - except KeyError: - return (), alias_map - if not func.options_spec: - return (), alias_map - seq, disabled = parse_option_spec(func.options_spec) - ans = [] - for opt in seq: - if isinstance(opt, str): - continue - for alias in opt['aliases']: - ans.append(alias) - alias_map[alias] = opt - return tuple(sorted(ans)), alias_map - - -def options_matching(prefix: str, cmd: str, last_word: str, aliases: Iterable[str], alias_map: Dict[str, OptionDict]) -> Generator[str, None, None]: - for alias in aliases: - if (not prefix or alias.startswith(prefix)) and alias.startswith('--'): - yield alias + ' ' - - -class Completer: - - def __init__(self) -> None: - self.matches: List[str] = [] - ddir = cache_dir() - os.makedirs(ddir, exist_ok=True) - self.history_path = os.path.join(ddir, 'shell.history') - - def complete(self, text: str, state: int) -> Optional[str]: - import readline - if state == 0: - line = readline.get_line_buffer() - cmdline = shlex.split(line) - if len(cmdline) < 2 and not line.endswith(' '): - self.matches = list(cmd_names_matching(text)) - else: - self.matches = list(options_matching(text, cmdline[0], cmdline[-1], *options_for_cmd(cmdline[0]))) - if state < len(self.matches): - return self.matches[state] - return None - - def __enter__(self) -> 'Completer': - import readline - with suppress(Exception): - readline.read_history_file(self.history_path) - readline.set_completer(self.complete) - delims = readline.get_completer_delims() - readline.set_completer_delims(delims.replace('-', '')) - return self - - def __exit__(self, *a: Any) -> None: - import readline - readline.write_history_file(self.history_path) - - -def print_err(*a: Any, **kw: Any) -> None: - kw['file'] = sys.stderr - print(*a, **kw) - - -def print_help(which: Optional[str] = None) -> None: - if which is None: - print('Control kitty by sending it commands.') - print() - print(title('Commands') + ':') - for cmd in all_command_names(): - c = command_for_name(cmd) - print(' ', green(c.name)) - print(' ', c.short_desc) - print(' ', green('exit')) - print(' ', 'Exit this shell') - print('\nUse help {} for help on individual commands'.format(italic('command'))) - else: - try: - func = command_for_name(which) - except KeyError: - if which == 'exit': - print('Exit this shell') - elif which == 'help': - print('Show help') - else: - print(f'Unknown command: {emph(which)}') - return - display_subcommand_help(func) - - -def run_cmd( - global_opts: RCOptions, cmd: str, func: RemoteCommand, opts: Any, items: List[str], - encrypter: CommandEncrypter = NoEncryption() -) -> None: - print(end=set_window_title(cmd) + output_prefix, flush=True) - payload = func.message_to_kitty(global_opts, opts, items) - no_response = False - if hasattr(opts, 'no_response'): - no_response = opts.no_response - send = original_send_cmd = create_basic_command(cmd, payload=payload, is_asynchronous=func.is_asynchronous, no_response=no_response) - response_timeout = func.response_timeout - if hasattr(opts, 'response_timeout'): - response_timeout = opts.response_timeout - response_timeout = encrypter.adjust_response_timeout_for_password(response_timeout) - try: - response = do_io(global_opts.to, send, no_response, response_timeout, encrypter) - except TimeoutError: - original_send_cmd.pop('payload', None) - original_send_cmd['cancel_async'] = True - do_io(global_opts.to, send, True, 10, encrypter) - print_err(f'Timed out after {response_timeout} seconds waiting for response from kitty') - return - if not response.get('ok'): - if response.get('tb'): - print_err(response['tb']) - print_err(response['error']) - return - if 'data' in response: - print(response['data']) - - -def real_main(global_opts: RCOptions, encrypter: CommandEncrypter = NoEncryption()) -> None: - init_readline() - print_help_for_seq.allow_pager = False - print('Welcome to the kitty shell!') - print('Use {} for assistance or {} to quit'.format(green('help'), green('exit'))) - awid = os.environ.pop('KITTY_SHELL_ACTIVE_WINDOW_ID', None) - if awid is not None: - atid = os.environ.pop('KITTY_SHELL_ACTIVE_TAB_ID', None) - am = f'Previously active window id: {awid}' - if atid is not None: - am += f' and tab id: {atid}' - print(am) - - pre_prompt = set_window_title('The kitty shell') + set_cursor_shape('bar') - pre_prompt += f'\x1b]133;A;redraw={0 if is_libedit else 1}\x1b\\' - while True: - try: - print(end=pre_prompt) - try: - scmdline = input(f'{kitty_face} ') - except UnicodeEncodeError: - scmdline = input('kitty> ') - except EOFError: - break - except KeyboardInterrupt: - print() - continue - print(end=set_cursor_shape(), flush=True) - if not scmdline: - continue - try: - cmdline = shlex.split(scmdline) - except Exception: - print_err(f'"{emph(scmdline)}" is invalid. Use "help" to see a list of commands.') - continue - - cmd = cmdline[0].lower() - - try: - func = command_for_name(cmd) - except KeyError: - if cmd in ('exit', 'quit'): - break - print(end=output_prefix, flush=True) - if cmd == 'help': - print_help(cmdline[1] if len(cmdline) > 1 else None) - continue - print_err(f'"{emph(cmd)}" is an unknown command. Use "help" to see a list of commands.') - continue - - try: - opts, items = parse_subcommand_cli(func, cmdline) - except SystemExit as e: - if e.code != 0: - print(end=output_prefix, flush=True) - print_err(e) - print_err('Use "{}" to see how to use this command.'.format(emph(f'help {cmd}'))) - continue - except Exception: - print(end=output_prefix, flush=True) - print_err('Unhandled error:') - traceback.print_exc() - continue - else: - try: - run_cmd(global_opts, cmd, func, opts, items, encrypter) - except (SystemExit, ParsingOfArgsFailed) as e: - print(end=output_prefix, flush=True) - print_err(e) - continue - except KeyboardInterrupt: - print(end=output_prefix, flush=True) - print() - continue - except Exception: - print(end=output_prefix, flush=True) - print_err('Unhandled error:') - traceback.print_exc() - continue - - -def main(global_opts: RCOptions, encrypter: CommandEncrypter = NoEncryption()) -> None: - try: - with Completer(): - real_main(global_opts, encrypter) - except Exception: - traceback.print_exc() - input('Press Enter to quit') - raise SystemExit(1)