Remove unused code

Completion and remote control have moved into kitty-tool
This commit is contained in:
Kovid Goyal 2022-11-14 12:41:25 +05:30
parent 129646c199
commit 018811c96c
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
7 changed files with 20 additions and 1552 deletions

View File

@ -6,13 +6,14 @@ import re
import sys import sys
from contextlib import suppress from contextlib import suppress
from typing import ( from typing import (
TYPE_CHECKING, Callable, Dict, Iterator, List, NamedTuple, Optional, Tuple TYPE_CHECKING, Callable, Dict, Iterator, List, NamedTuple, Optional, Tuple,
) )
from kitty.cli import parse_args from kitty.cli import parse_args
from kitty.cli_stub import AskCLIOptions from kitty.cli_stub import AskCLIOptions
from kitty.constants import cache_dir from kitty.constants import cache_dir
from kitty.fast_data_types import truncate_point_for_length, wcswidth from kitty.fast_data_types import truncate_point_for_length, wcswidth
from kitty.types import run_once
from kitty.typing import BossType, KeyEventType, TypedDict from kitty.typing import BossType, KeyEventType, TypedDict
from kitty.utils import ScreenSize from kitty.utils import ScreenSize
@ -447,6 +448,17 @@ class Choose(Handler): # {{{
# }}} # }}}
@run_once
def init_readline() -> None:
import readline
with suppress(OSError):
readline.read_init_file()
if 'libedit' in readline.__doc__:
readline.parse_and_bind("bind ^I rl_complete")
else:
readline.parse_and_bind('tab: complete')
def main(args: List[str]) -> Response: def main(args: List[str]) -> Response:
# For some reason importing readline in a key handler in the main kitty process # For some reason importing readline in a key handler in the main kitty process
# causes a crash of the python interpreter, probably because of some global # causes a crash of the python interpreter, probably because of some global
@ -478,7 +490,6 @@ def main(args: List[str]) -> Response:
import readline as rl import readline as rl
readline = rl readline = rl
from kitty.shell import init_readline
init_readline() init_readline()
response = None response = None

View File

@ -1,321 +0,0 @@
#!/usr/bin/env python
# License: GPLv3 Copyright: 2021, Kovid Goyal <kovid at kovidgoyal.net>
import os
import re
import subprocess
from typing import Callable, Dict, Iterable, Iterator, Sequence, Tuple
from kitty.complete import Completions, complete_files_and_dirs, debug
from kitty.types import run_once
from .utils import ssh_options
debug
def lines_from_file(path: str) -> Iterator[str]:
try:
f = open(os.path.expanduser(path))
except OSError:
pass
else:
yield from f
def lines_from_command(*cmd: str) -> Iterator[str]:
try:
output = subprocess.check_output(cmd).decode('utf-8')
except Exception:
return
yield from output.splitlines()
def parts_yielder(lines: Iterable[str], pfilter: Callable[[str], Iterator[str]]) -> Iterator[str]:
for line in lines:
yield from pfilter(line)
def hosts_from_config_lines(line: str) -> Iterator[str]:
parts = line.strip().split()
if len(parts) > 1 and parts[0] == 'Host':
yield parts[1]
def hosts_from_known_hosts(line: str) -> Iterator[str]:
parts = line.strip().split()
if parts:
yield re.sub(r':\d+$', '', parts[0])
def hosts_from_hosts(line: str) -> Iterator[str]:
line = line.strip()
if not line.startswith('#'):
parts = line.split()
if parts:
yield parts[0]
if len(parts) > 1:
yield parts[1]
if len(parts) > 2:
yield parts[2]
def iter_known_hosts() -> Iterator[str]:
yield from parts_yielder(lines_from_file('~/.ssh/config'), hosts_from_config_lines)
yield from parts_yielder(lines_from_file('~/.ssh/known_hosts'), hosts_from_known_hosts)
yield from parts_yielder(lines_from_file('/etc/ssh/ssh_known_hosts'), hosts_from_known_hosts)
yield from parts_yielder(lines_from_file('/etc/hosts'), hosts_from_hosts)
yield from parts_yielder(lines_from_command('getent', 'hosts'), hosts_from_hosts)
@run_once
def known_hosts() -> Tuple[str, ...]:
return tuple(sorted(filter(lambda x: '*' not in x and '[' not in x, set(iter_known_hosts()))))
# option help {{{
@run_once
def option_help_map() -> Dict[str, str]:
ans: Dict[str, str] = {}
lines = '''
-4 -- force ssh to use IPv4 addresses only
-6 -- force ssh to use IPv6 addresses only
-a -- disable forwarding of authentication agent connection
-A -- enable forwarding of the authentication agent connection
-B -- bind to specified interface before attempting to connect
-b -- specify interface to transmit on
-C -- compress data
-c -- select encryption cipher
-D -- specify a dynamic port forwarding
-E -- append log output to file instead of stderr
-e -- set escape character
-f -- go to background
-F -- specify alternate config file
-g -- allow remote hosts to connect to local forwarded ports
-G -- output configuration and exit
-i -- select identity file
-I -- specify smartcard device
-J -- connect via a jump host
-k -- disable forwarding of GSSAPI credentials
-K -- enable GSSAPI-based authentication and forwarding
-L -- specify local port forwarding
-l -- specify login name
-M -- master mode for connection sharing
-m -- specify mac algorithms
-N -- don't execute a remote command
-n -- redirect stdin from /dev/null
-O -- control an active connection multiplexing master process
-o -- specify extra options
-p -- specify port on remote host
-P -- use non privileged port
-Q -- query parameters
-q -- quiet operation
-R -- specify remote port forwarding
-s -- invoke subsystem
-S -- specify location of control socket for connection sharing
-T -- disable pseudo-tty allocation
-t -- force pseudo-tty allocation
-V -- show version number
-v -- verbose mode (multiple increase verbosity, up to 3)
-W -- forward standard input and output to host
-w -- request tunnel device forwarding
-x -- disable X11 forwarding
-X -- enable (untrusted) X11 forwarding
-Y -- enable trusted X11 forwarding
-y -- send log info via syslog instead of stderr
'''.splitlines()
for line in lines:
line = line.strip()
if line:
parts = line.split(maxsplit=2)
ans[parts[0]] = parts[2]
return ans
# }}}
# option names {{{
@run_once
def option_names() -> Tuple[str, ...]:
return tuple(filter(None, (
line.strip() for line in '''
AddKeysToAgent
AddressFamily
BatchMode
BindAddress
CanonicalDomains
CanonicalizeFallbackLocal
CanonicalizeHostname
CanonicalizeMaxDots
CanonicalizePermittedCNAMEs
CASignatureAlgorithms
CertificateFile
ChallengeResponseAuthentication
CheckHostIP
Ciphers
ClearAllForwardings
Compression
ConnectionAttempts
ConnectTimeout
ControlMaster
ControlPath
ControlPersist
DynamicForward
EscapeChar
ExitOnForwardFailure
FingerprintHash
ForwardAgent
ForwardX11
ForwardX11Timeout
ForwardX11Trusted
GatewayPorts
GlobalKnownHostsFile
GSSAPIAuthentication
GSSAPIDelegateCredentials
HashKnownHosts
Host
HostbasedAcceptedAlgorithms
HostbasedAuthentication
HostKeyAlgorithms
HostKeyAlias
Hostname
IdentitiesOnly
IdentityAgent
IdentityFile
IPQoS
KbdInteractiveAuthentication
KbdInteractiveDevices
KexAlgorithms
KnownHostsCommand
LocalCommand
LocalForward
LogLevel
MACs
Match
NoHostAuthenticationForLocalhost
NumberOfPasswordPrompts
PasswordAuthentication
PermitLocalCommand
PermitRemoteOpen
PKCS11Provider
Port
PreferredAuthentications
ProxyCommand
ProxyJump
ProxyUseFdpass
PubkeyAcceptedAlgorithms
PubkeyAuthentication
RekeyLimit
RemoteCommand
RemoteForward
RequestTTY
SendEnv
ServerAliveInterval
ServerAliveCountMax
SetEnv
StreamLocalBindMask
StreamLocalBindUnlink
StrictHostKeyChecking
TCPKeepAlive
Tunnel
TunnelDevice
UpdateHostKeys
User
UserKnownHostsFile
VerifyHostKeyDNS
VisualHostKey
XAuthLocation
'''.splitlines())))
# }}}
def complete_choices(ans: Completions, prefix: str, title: str, choices: Iterable[str], comma_separated: bool = False) -> None:
matches: Dict[str, str] = {}
word_transforms = {}
effective_prefix = prefix
hidden_prefix = ''
if comma_separated:
effective_prefix = prefix.split(',')[-1]
hidden_prefix = ','.join(prefix.split(',')[:-1])
if hidden_prefix:
hidden_prefix += ','
for q in choices:
if q.startswith(effective_prefix):
if comma_separated:
tq = q
q = f'{hidden_prefix}{q},'
word_transforms[q] = tq
matches[q] = ''
ans.add_match_group(title, matches, trailing_space=not comma_separated, word_transforms=word_transforms)
def complete_q_choices(ans: Completions, prefix: str, title: str, key: str, comma_separated: bool) -> None:
choices = (line.strip() for line in lines_from_command('ssh', '-Q', key))
complete_choices(ans, prefix, title, choices, comma_separated)
def complete_arg(ans: Completions, option_flag: str, prefix: str = '') -> None:
options = ssh_options()
option_name = options.get(option_flag[1:])
if option_name and (option_name.endswith('file') or option_name.endswith('path')):
return complete_files_and_dirs(ans, prefix, option_name)
choices = {
'mac_spec': ('MAC algorithm', 'mac', True),
'cipher_spec': ('encryption cipher', 'cipher', True),
'query_option': ('query option', 'help', False),
}
if option_name in choices:
return complete_q_choices(ans, prefix, *choices[option_name])
if option_name == 'destination':
return complete_destination(ans, prefix)
if option_name == 'ctl_cmd':
return complete_choices(ans, prefix, 'control command', ('check', 'forward', 'cancel', 'exit'))
if option_name == 'option':
matches = (x+'=' for x in option_names() if x.startswith(prefix))
word_transforms = {x+'=': x for x in option_names()}
ans.add_match_group('configure file option', matches, trailing_space=False, word_transforms=word_transforms)
def complete_destination(ans: Completions, prefix: str = '') -> None:
result = (k for k in known_hosts() if k.startswith(prefix))
ans.add_match_group('remote host name', result)
def complete_option(ans: Completions, prefix: str = '-') -> None:
hm = option_help_map()
if len(prefix) <= 1:
result = {k: v for k, v in hm.items() if k.startswith(prefix)}
ans.add_match_group('option', result)
else:
ans.add_match_group('option', {prefix: ''})
def complete(ans: Completions, words: Sequence[str], new_word: bool) -> None:
options = ssh_options()
expecting_arg = False
seen_destination = False
types = ['' for i in range(len(words))]
for i, word in enumerate(words):
if expecting_arg:
types[i] = 'arg'
expecting_arg = False
continue
if word.startswith('-'):
types[i] = 'option'
if len(word) == 2 and options.get(word[1]):
expecting_arg = True
continue
if seen_destination:
break
types[i] = 'destination'
seen_destination = True
if new_word:
if words:
if expecting_arg:
return complete_arg(ans, words[-1])
return complete_destination(ans)
if words:
if types[-1] == 'arg' and len(words) > 1:
return complete_arg(ans, words[-2], words[-1])
if types[-1] == 'destination':
return complete_destination(ans, words[-1])
if types[-1] == 'option':
return complete_option(ans, words[-1])

View File

@ -1,857 +0,0 @@
#!/usr/bin/env python3
# License: GPLv3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
import os
import shlex
import sys
from typing import (
Any, Callable, Dict, Iterable, Iterator, List, Optional, Sequence, Tuple,
Union
)
from kittens.runner import (
all_kitten_names, get_kitten_cli_docs, get_kitten_completer
)
from .cli import (
CompletionSpec, CompletionType, OptionDict, options_for_completion,
parse_option_spec, prettify
)
from .constants import config_dir, shell_integration_dir
from .fast_data_types import truncate_point_for_length, wcswidth
from .rc.base import all_command_names
from .remote_control import global_options_spec
from .shell import options_for_cmd
from .types import run_once
from .utils import screen_size_function
'''
To add completion for a new shell, you need to:
1) Add an entry to completion scripts for your shell, this is
a simple function that calls kitty's completion code and passes the
results to the shell's completion system. This can be output by
`kitty +complete setup shell_name` and its output goes into
your shell's rc file.
2) Add an input_parser function, this takes the input from
the shell for the text being completed and returns a list of words
and a boolean indicating if we are on a new word or not. This
is passed to kitty's completion system.
3) An output_serializer function that is responsible for
taking the results from kitty's completion system and converting
them into something your shell will understand.
'''
parsers: Dict[str, 'ParserFunc'] = {}
serializers: Dict[str, 'SerializerFunc'] = {}
shell_state: Dict[str, str] = {}
class MatchGroup:
def __init__(
self, x: Union[Dict[str, str], Iterable[str]],
trailing_space: bool = True,
is_files: bool = False,
word_transforms: Optional[Dict[str, str]] = None,
):
self.mdict = x if isinstance(x, dict) else dict.fromkeys(x, '')
self.trailing_space = trailing_space
self.is_files = is_files
self.word_transforms = word_transforms or {}
def __iter__(self) -> Iterator[str]:
return iter(self.mdict)
def __bool__(self) -> bool:
return bool(self.mdict)
def transformed_words(self) -> Iterator[str]:
for w in self:
yield self.word_transforms.get(w, w)
def transformed_items(self) -> Iterator[Tuple[str, str]]:
for w, desc in self.items():
yield self.word_transforms.get(w, w), desc
def items(self) -> Iterator[Tuple[str, str]]:
return iter(self.mdict.items())
def values(self) -> Iterator[str]:
return iter(self.mdict.values())
def add_prefix(self, prefix: str) -> None:
nmap = {k: prefix + k for k in self.mdict}
for k, nk in nmap.items():
self.word_transforms[nk] = self.word_transforms.pop(k, k)
self.mdict = {prefix + k: v for k, v in self.mdict.items()}
def debug(*a: Any, **kw: Any) -> None:
from kittens.tui.loop import debug_write
debug_write(*a, **kw)
class Delegate:
def __init__(self, words: Sequence[str] = (), pos: int = -1, new_word: bool = False):
self.words: Sequence[str] = words
self.pos = pos
self.num_of_unknown_args = len(words) - pos
self.new_word = new_word
def __bool__(self) -> bool:
return self.pos > -1 and self.num_of_unknown_args > 0
@property
def precommand(self) -> str:
try:
return self.words[self.pos]
except IndexError:
return ''
class Completions:
def __init__(self) -> None:
self.match_groups: Dict[str, MatchGroup] = {}
self.delegate: Delegate = Delegate()
def add_match_group(
self, name: str, x: Union[Dict[str, str], Iterable[str]],
trailing_space: bool = True,
is_files: bool = False,
word_transforms: Optional[Dict[str, str]] = None
) -> MatchGroup:
self.match_groups[name] = m = MatchGroup(x, trailing_space, is_files, word_transforms)
return m
def add_prefix(self, prefix: str) -> None:
for mg in self.match_groups.values():
mg.add_prefix(prefix)
@run_once
def remote_control_command_names() -> Tuple[str, ...]:
return tuple(sorted(x.replace('_', '-') for x in all_command_names()))
# Shell specific code {{{
def load_fish2_completion() -> str:
with open(os.path.join(shell_integration_dir, 'fish', 'vendor_completions.d', 'kitty.fish')) as f:
return f.read()
completion_scripts = {
'zsh': '''#compdef kitty
_kitty() {
local src
# Send all words up to the word the cursor is currently on
src=$(printf "%s\n" "${(@)words[1,$CURRENT]}" | kitty +complete zsh)
if [[ $? == 0 ]]; then
eval ${src}
fi
}
compdef _kitty kitty
'''.__str__,
'bash': '''
_kitty_completions() {
local src
local limit
# Send all words up to the word the cursor is currently on
let limit=1+$COMP_CWORD
src=$(printf "%s\n" "${COMP_WORDS[@]: 0:$limit}" | kitty +complete bash)
if [[ $? == 0 ]]; then
eval ${src}
fi
}
complete -o nospace -F _kitty_completions kitty
'''.__str__,
'fish': '''
function __kitty_completions
# Send all words up to the one before the cursor
commandline -cop | kitty +complete fish
end
complete -f -c kitty -a "(__kitty_completions)"
'''.__str__,
'fish2': load_fish2_completion,
}
ParseResult = Tuple[List[str], bool]
ParserFunc = Callable[[str], ParseResult]
SerializerFunc = Callable[[Completions], str]
def input_parser(func: ParserFunc) -> ParserFunc:
name = func.__name__.split('_')[0]
parsers[name] = func
return func
def output_serializer(func: SerializerFunc) -> SerializerFunc:
name = func.__name__.split('_')[0]
serializers[name] = func
return func
@input_parser
def zsh_input_parser(data: str) -> ParseResult:
matcher = shell_state.get('_matcher', '')
q = matcher.lower().split(':', maxsplit=1)[0]
if q in ('l', 'r', 'b', 'e'):
# this is zsh anchor based matching
# https://zsh.sourceforge.io/Doc/Release/Completion-Widgets.html#Completion-Matching-Control
# can be specified with matcher-list and some systems do it by default,
# for example, Debian, which adds the following to zshrc
# zstyle ':completion:*' matcher-list '' 'm:{a-z}={A-Z}' 'm:{a-zA-Z}={A-Za-z}' 'r:|[._-]=* r:|=* l:|=*'
# For some reason that I dont have the
# time/interest to figure out, returning completion candidates for
# these matcher types break completion, so just abort in this case.
raise SystemExit(1)
new_word = data.endswith('\n\n')
words = data.rstrip().splitlines()
return words, new_word
@input_parser
def bash_input_parser(data: str) -> ParseResult:
new_word = data.endswith('\n\n')
words = data.rstrip().splitlines()
return words, new_word
@input_parser
def fish_input_parser(data: str) -> ParseResult:
return data.rstrip().splitlines(), True
@input_parser
def fish2_input_parser(data: str) -> ParseResult:
return bash_input_parser(data)
@output_serializer
def zsh_output_serializer(ans: Completions) -> str:
lines = []
try:
screen = screen_size_function(sys.stderr.fileno())()
except OSError:
width = 80
else:
width = screen.cols
def fmt_desc(word: str, desc: str, max_word_len: int) -> Iterator[str]:
if not desc:
yield word
return
desc = prettify(desc.splitlines()[0])
multiline = False
if wcswidth(word) > max_word_len:
max_desc_len = width - 2
multiline = True
else:
word = word.ljust(max_word_len)
max_desc_len = width - max_word_len - 3
if wcswidth(desc) > max_desc_len:
desc = desc[:truncate_point_for_length(desc, max_desc_len - 2)]
desc += ''
if multiline:
ans = f'{word}\n {desc}'
else:
ans = f'{word} {desc}'
yield ans
for description, matches in ans.match_groups.items():
cmd = ['compadd', '-U', '-J', shlex.quote(description), '-X', shlex.quote(f'%B{description}%b')]
if not matches.trailing_space:
cmd += ['-S', '""']
if matches.is_files:
cmd.append('-f')
allm = tuple(matches)
if len(allm) > 1:
common_prefix = os.path.commonprefix(allm)
if common_prefix and os.sep in common_prefix:
common_prefix = os.path.dirname(common_prefix).rstrip(os.sep) + os.sep
cmd.extend(('-p', shlex.quote(common_prefix)))
matches = MatchGroup({k[len(common_prefix):]: v for k, v in matches.items()})
has_descriptions = any(matches.values())
if has_descriptions or matches.word_transforms:
lines.append('compdescriptions=(')
try:
sz = max(map(wcswidth, matches.transformed_words()))
except ValueError:
sz = 0
limit = min(16, sz)
for word, desc in matches.transformed_items():
lines.extend(map(shlex.quote, fmt_desc(word, desc, limit)))
lines.append(')')
if has_descriptions:
cmd.append('-l')
cmd.append('-d')
cmd.append('compdescriptions')
cmd.append('--')
for word in matches:
cmd.append(shlex.quote(word))
lines.append(' '.join(cmd) + ';')
if ans.delegate:
if ans.delegate.num_of_unknown_args == 1 and not ans.delegate.new_word:
lines.append('_command_names -e')
elif ans.delegate.precommand:
for i in range(ans.delegate.pos + 1):
lines.append('shift words')
lines.append('(( CURRENT-- ))')
lines.append(f'_normal -p "{ans.delegate.precommand}"')
result = '\n'.join(lines)
# debug(result)
return result
@output_serializer
def bash_output_serializer(ans: Completions) -> str:
lines = []
for description, matches in ans.match_groups.items():
for word in matches:
if matches.trailing_space:
word += ' '
lines.append(f'COMPREPLY+=({shlex.quote(word)})')
# debug('\n'.join(lines))
return '\n'.join(lines)
@output_serializer
def fish_output_serializer(ans: Completions) -> str:
lines = []
for description, matches in ans.match_groups.items():
for word in matches:
lines.append(word.replace('\n', ' '))
# debug('\n'.join(lines))
return '\n'.join(lines)
@output_serializer
def fish2_output_serializer(ans: Completions) -> str:
lines = []
for description, matches in ans.match_groups.items():
for word, desc in matches.items():
q = word
if desc:
q = f'{q}\t{desc}'
lines.append(q.replace('\n', ' '))
# debug('\n'.join(lines))
return '\n'.join(lines)
# }}}
def completions_for_first_word(ans: Completions, prefix: str, entry_points: Iterable[str], namespaced_entry_points: Iterable[str]) -> None:
cmds = [f'@{c}' for c in remote_control_command_names()]
ans.add_match_group('Entry points', {
k: '' for k in
list(entry_points) + cmds + [f'+{k}' for k in namespaced_entry_points]
if not prefix or k.startswith(prefix)
})
if prefix:
ans.delegate = Delegate([prefix], 0)
def kitty_cli_opts(ans: Completions, prefix: Optional[str] = None) -> None:
if not prefix:
return
matches = {}
for opt in options_for_completion():
if isinstance(opt, str):
continue
aliases = frozenset(x for x in opt['aliases'] if x.startswith(prefix)) if prefix else opt['aliases']
for alias in aliases:
matches[alias] = opt['help'].strip()
ans.add_match_group('Options', matches)
def complete_kitty_cli_arg(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None:
prefix = prefix or ''
if not opt:
if unknown_args.num_of_unknown_args > 0:
ans.delegate = unknown_args
return
dest = opt['dest']
if dest == 'override':
from kitty.config import option_names_for_completion
k = 'Config directives'
ans.add_match_group(k, {k+'=': '' for k in option_names_for_completion() if k.startswith(prefix)}, trailing_space=False)
elif dest == 'listen_on':
if ':' not in prefix:
k = 'Address type'
ans.add_match_group(k, {x: x for x in ('unix:', 'tcp:') if x.startswith(prefix)}, trailing_space=False)
elif prefix.startswith('unix:') and not prefix.startswith('@'):
complete_files_and_dirs(ans, prefix[len('unix:'):], files_group_name='UNIX sockets', add_prefix='unix:')
else:
complete_basic_option_args(ans, opt, prefix)
def basic_option_arg_completer(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None:
prefix = prefix or ''
if not opt:
if unknown_args.num_of_unknown_args > 0:
ans.delegate = unknown_args
return
complete_basic_option_args(ans, opt, prefix)
CompleteArgsFunc = Callable[[Completions, Optional[OptionDict], str, Delegate], None]
def complete_alias_map(
ans: Completions,
words: Sequence[str],
new_word: bool,
option_map: Dict[str, OptionDict],
complete_args: CompleteArgsFunc = basic_option_arg_completer,
) -> None:
expecting_arg = False
opt: Optional[OptionDict] = None
last_word = words[-1] if words else ''
for i, w in enumerate(words):
if expecting_arg:
prev_word = '' if i == 0 else words[i-1]
if w == '=' and i > 0 and prev_word.startswith('--') and prev_word != '--':
if w is not last_word:
continue
long_opt = option_map.get(prev_word)
if long_opt is not None:
complete_args(ans, long_opt, '', Delegate())
return
if w is last_word and not new_word:
if opt is not None:
complete_args(ans, opt, w, Delegate())
return
expecting_arg = False
continue
if w is last_word and not new_word and w.startswith('--') and w != '--':
parts = w.split('=', 1)
if len(parts) == 2:
long_opt = option_map.get(parts[0])
if long_opt is not None:
complete_args(ans, long_opt, parts[1], Delegate())
ans.add_prefix(f'{parts[0]}=')
return
opt = option_map.get(w)
if w is last_word and not new_word:
if w.startswith('-'):
ans.add_match_group('Options', {k: opt['help'] for k, opt in option_map.items() if k.startswith(last_word)})
else:
complete_args(ans, None, last_word, Delegate(words, i))
return
if opt is None:
complete_args(ans, None, '' if new_word else last_word, Delegate(words, i, new_word))
if w.startswith('--') and '=' in w:
continue
return # some non-option word encountered
expecting_arg = not opt.get('type', '').startswith('bool-')
if expecting_arg:
if opt is not None:
complete_args(ans, opt, '' if new_word else last_word, Delegate())
else:
prefix = '' if new_word else last_word
complete_args(ans, None, prefix, Delegate())
ans.add_match_group('Options', {k: opt['help'] for k, opt in option_map.items() if k.startswith(prefix)})
def complete_cli(
ans: Completions,
words: Sequence[str],
new_word: bool,
) -> None:
option_map = {}
for opt in options_for_completion():
if not isinstance(opt, str):
for alias in opt['aliases']:
option_map[alias] = opt
complete_alias_map(ans, words, new_word, option_map, complete_kitty_cli_arg)
def global_options_for_remote_cmd() -> Dict[str, OptionDict]:
seq, disabled = parse_option_spec(global_options_spec())
ans: Dict[str, OptionDict] = {}
for opt in seq:
if isinstance(opt, str):
continue
for alias in opt['aliases']:
ans[alias] = opt
return ans
def complete_remote_command(ans: Completions, cmd_name: str, words: Sequence[str], new_word: bool) -> None:
aliases, alias_map = options_for_cmd(cmd_name)
args_completer: CompleteArgsFunc = basic_option_arg_completer
complete_alias_map(ans, words, new_word, alias_map, complete_args=args_completer)
def complete_launch_wrapper(ans: Completions, words: Sequence[str], new_word: bool, allow_files: bool = True) -> None:
from kitty.launch import clone_safe_opts
aliases, alias_map = options_for_cmd('launch')
alias_map = {k: v for k, v in alias_map.items() if v['dest'] in clone_safe_opts()}
args_completer: CompleteArgsFunc = basic_option_arg_completer
if allow_files:
args_completer = remote_files_completer('Files', ('*',))
complete_alias_map(ans, words, new_word, alias_map, complete_args=args_completer)
def path_completion(prefix: str = '') -> Tuple[List[str], List[str]]:
prefix = prefix.replace(r'\ ', ' ')
dirs, files = [], []
base = '.'
if prefix.endswith('/'):
base = prefix
elif '/' in prefix:
base = os.path.dirname(prefix)
src = os.path.expandvars(os.path.expanduser(base))
src_prefix = os.path.abspath(os.path.expandvars(os.path.expanduser(prefix))) if prefix else ''
try:
items: Iterable['os.DirEntry[str]'] = os.scandir(src)
except FileNotFoundError:
items = ()
for x in items:
abspath = os.path.abspath(x.path)
if prefix and not abspath.startswith(src_prefix):
continue
if prefix:
q = prefix + abspath[len(src_prefix):].lstrip(os.sep)
q = os.path.expandvars(os.path.expanduser(q))
else:
q = os.path.relpath(abspath)
if x.is_dir():
dirs.append(q.rstrip(os.sep) + os.sep)
else:
files.append(q)
return dirs, files
def complete_files_and_dirs(
ans: Completions,
prefix: str,
files_group_name: str = 'Files',
predicate: Optional[Callable[[str], bool]] = None,
add_prefix: Optional[str] = None
) -> None:
dirs, files_ = path_completion(prefix or '')
files: Iterable[str] = filter(predicate, files_)
if add_prefix:
dirs = list(add_prefix + x for x in dirs)
files = (add_prefix + x for x in files)
if dirs:
ans.add_match_group('Directories', dirs, trailing_space=False, is_files=True)
if files:
ans.add_match_group(files_group_name, files, is_files=True)
def filter_files_from_completion_spec(spec: CompletionSpec) -> Callable[['os.DirEntry[str]', str], bool]:
if spec.extensions:
extensions = frozenset(os.extsep + x.lower() for x in spec.extensions)
else:
extensions = frozenset()
if spec.mime_patterns:
import re
from fnmatch import translate
mimes = tuple(re.compile(translate(x)) for x in spec.mime_patterns)
from .guess_mime_type import guess_type
else:
mimes = ()
if mimes or extensions:
def check_file(x: 'os.DirEntry[str]', result: str) -> bool:
if extensions:
q = result.lower()
for ext in extensions:
if q.endswith(ext):
return True
if mimes:
mq = guess_type(result)
if mq:
for mime in mimes:
if mime.match(mq):
return True
return False
else:
def check_file(x: 'os.DirEntry[str]', result: str) -> bool:
return True
return check_file
def complete_file_path(ans: Completions, spec: CompletionSpec, prefix: str, only_dirs: bool = False) -> None:
prefix = prefix.replace(r'\ ', ' ')
if spec.relative_to is spec.relative_to.__class__.cwd:
relative_to = os.getcwd()
else:
relative_to = config_dir
src_dir = relative_to
check_against = prefix
prefix_result_with = prefix
files, dirs = [], []
if prefix:
expanded_prefix = os.path.expandvars(os.path.expanduser(prefix))
check_against = os.path.basename(expanded_prefix)
prefix_result_with = os.path.dirname(expanded_prefix).rstrip(os.sep) + os.sep
if os.path.isabs(expanded_prefix):
src_dir = os.path.dirname(expanded_prefix)
elif os.sep in expanded_prefix or (os.altsep and os.altsep in expanded_prefix):
src_dir = os.path.join(relative_to, os.path.dirname(expanded_prefix))
else:
prefix_result_with = ''
try:
items: Iterable['os.DirEntry[str]'] = os.scandir(src_dir)
except OSError:
items = ()
check_file = filter_files_from_completion_spec(spec)
for x in items:
if not x.name.startswith(check_against):
continue
result = prefix_result_with + x.name
if x.is_dir():
dirs.append(result.rstrip(os.sep) + os.sep)
else:
if check_file(x, result):
files.append(result)
if dirs:
ans.add_match_group('Directories', dirs, trailing_space=False, is_files=True)
if not only_dirs and files:
ans.add_match_group(spec.group or 'Files', files, is_files=True)
def complete_path(ans: Completions, opt: OptionDict, prefix: str) -> None:
spec = opt['completion']
if spec.kwds:
kwds = [x for x in spec.kwds if x.startswith(prefix)]
if kwds:
ans.add_match_group('Keywords', kwds)
if spec.type is CompletionType.file:
complete_file_path(ans, spec, prefix)
elif spec.type is CompletionType.directory:
complete_file_path(ans, spec, prefix, only_dirs=True)
def complete_basic_option_args(ans: Completions, opt: OptionDict, prefix: str) -> None:
if opt['choices']:
ans.add_match_group(f'Choices for {opt["dest"]}', tuple(k for k in opt['choices'] if k.startswith(prefix)))
elif opt['completion'].type is not CompletionType.none:
complete_path(ans, opt, prefix)
def complete_dirs(ans: Completions, prefix: str = '') -> None:
dirs, files_ = path_completion(prefix or '')
if dirs:
ans.add_match_group('Directories', dirs, trailing_space=False, is_files=True)
def complete_icat_args(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None:
from .guess_mime_type import guess_type
def icat_file_predicate(filename: str) -> bool:
mt = guess_type(filename, allow_filesystem_access=True)
if mt and mt.startswith('image/'):
return True
return False
if opt is None:
complete_files_and_dirs(ans, prefix, 'Images', icat_file_predicate)
else:
complete_basic_option_args(ans, opt, prefix)
def complete_themes_args(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None:
if opt is None:
from kittens.themes.collection import load_themes
themes = load_themes(cache_age=-1, ignore_no_cache=True)
names = tuple(t.name for t in themes if t.name.startswith(prefix))
ans.add_match_group('Themes', names)
else:
complete_basic_option_args(ans, opt, prefix)
def remote_files_completer(name: str, matchers: Tuple[str, ...]) -> CompleteArgsFunc:
def complete_files_map(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None:
def predicate(filename: str) -> bool:
for m in matchers:
if isinstance(m, str):
from fnmatch import fnmatch
return fnmatch(filename, m)
return False
if opt is None:
complete_files_and_dirs(ans, prefix, name, predicate)
else:
complete_basic_option_args(ans, opt, prefix)
return complete_files_map
def remote_args_completer(title: str, words: Iterable[str]) -> CompleteArgsFunc:
items = sorted(words)
def complete_names_for_arg(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None:
if opt is None:
ans.add_match_group(title, {c: '' for c in items if c.startswith(prefix)})
else:
complete_basic_option_args(ans, opt, prefix)
return complete_names_for_arg
def remote_command_completer(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None:
if opt is None:
words = unknown_args.words[unknown_args.pos:]
new_word = unknown_args.new_word
if not words or (len(words) == 1 and not new_word):
prefix = (words or ('',))[0]
ans.add_match_group('Remote control commands', {c: '' for c in remote_control_command_names() if c.startswith(prefix)})
else:
complete_remote_command(ans, words[0], words[1:], new_word)
else:
basic_option_arg_completer(ans, opt, prefix, unknown_args)
def config_file_predicate(filename: str) -> bool:
return filename.endswith('.conf')
def complete_diff_args(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None:
if opt is None:
complete_files_and_dirs(ans, prefix, 'Files')
elif opt['dest'] == 'config':
complete_files_and_dirs(ans, prefix, 'Config Files', config_file_predicate)
else:
complete_basic_option_args(ans, opt, prefix)
def complete_kitten(ans: Completions, kitten: str, words: Sequence[str], new_word: bool) -> None:
try:
completer = get_kitten_completer(kitten)
except SystemExit:
completer = None
if completer is not None:
completer(ans, words, new_word)
return
try:
cd = get_kitten_cli_docs(kitten)
except SystemExit:
cd = None
if cd is None:
return
options = cd['options']()
seq = parse_option_spec(options)[0]
option_map = {}
for opt in seq:
if not isinstance(opt, str):
for alias in opt['aliases']:
option_map[alias] = opt
complete_alias_map(ans, words, new_word, option_map, {
'icat': complete_icat_args,
'diff': complete_diff_args,
'themes': complete_themes_args,
}.get(kitten, basic_option_arg_completer))
def find_completions(words: Sequence[str], new_word: bool, entry_points: Iterable[str], namespaced_entry_points: Iterable[str]) -> Completions:
ans = Completions()
if not words:
return ans
exe = os.path.basename(words[0])
if exe in ('edit-in-kitty', 'clone-in-kitty'):
complete_launch_wrapper(ans, words[1:], new_word, allow_files=words[0] != 'clone-in-kitty')
return ans
if exe != 'kitty':
return ans
words = words[1:]
if not words or (len(words) == 1 and not new_word):
if words and words[0].startswith('--') and '=' in words[0]:
complete_cli(ans, words, new_word)
return ans
prefix = words[0] if words else ''
completions_for_first_word(ans, prefix, entry_points, namespaced_entry_points)
kitty_cli_opts(ans, prefix)
return ans
if words[0] == '@':
complete_alias_map(ans, words[1:], new_word, global_options_for_remote_cmd(), remote_command_completer)
return ans
if words[0].startswith('@'):
if len(words) == 1 and not new_word:
prefix = words[0]
ans.add_match_group('Remote control commands', {f'@{c}': '' for c in remote_control_command_names() if c.startswith(prefix)})
else:
complete_remote_command(ans, words[0][1:], words[1:], new_word)
return ans
if words[0] == '+':
if len(words) == 1 or (len(words) == 2 and not new_word):
prefix = words[1] if len(words) > 1 else ''
ans.add_match_group('Entry points', {c: '' for c in namespaced_entry_points if c.startswith(prefix)})
else:
if words[1] == 'kitten':
if len(words) == 2 or (len(words) == 3 and not new_word):
ans.add_match_group('Kittens', (k for k in all_kitten_names() if k.startswith('' if len(words) == 2 else words[2])))
else:
complete_kitten(ans, words[2], words[3:], new_word)
elif words[1] == 'open':
complete_cli(ans, words[2:], new_word)
return ans
if words[0].startswith('+'):
if len(words) == 1:
if new_word:
if words[0] == '+kitten':
ans.add_match_group('Kittens', all_kitten_names())
elif words[0] == '+open':
complete_cli(ans, words[1:], new_word)
else:
prefix = words[0]
ans.add_match_group('Entry points', (c for c in namespaced_entry_points if c.startswith(prefix)))
else:
if words[0] == '+kitten':
if len(words) == 2 and not new_word:
ans.add_match_group('Kittens', (k for k in all_kitten_names() if k.startswith(words[1])))
else:
complete_kitten(ans, words[1], words[2:], new_word)
elif words[0] == '+open':
complete_cli(ans, words[1:], new_word)
else:
complete_cli(ans, words, new_word)
return ans
def setup(cstyle: str) -> None:
print(completion_scripts[cstyle]())
def main(args: Sequence[str], entry_points: Iterable[str], namespaced_entry_points: Iterable[str]) -> None:
if not args:
raise SystemExit('Must specify completion style')
cstyle = args[0]
if cstyle == 'setup':
return setup(args[1])
data = sys.stdin.read()
shell_state.clear()
for x in args[1:]:
parts = x.split('=', maxsplit=1)
if len(parts) == 2:
shell_state[parts[0]] = parts[1]
try:
parser = parsers[cstyle]
serializer = serializers[cstyle]
except KeyError:
raise SystemExit(f'Unknown completion style: {cstyle}')
words, new_word = parser(data)
ans = find_completions(words, new_word, entry_points, namespaced_entry_points)
print(serializer(ans), end='')

View File

@ -18,11 +18,6 @@ def list_fonts(args: List[str]) -> None:
list_main(args) list_main(args)
def remote_control(args: List[str]) -> None:
from kitty.remote_control import main as rc_main
rc_main(args)
def runpy(args: List[str]) -> None: def runpy(args: List[str]) -> None:
if len(args) < 2: if len(args) < 2:
raise SystemExit('Usage: kitty +runpy "some python code"') raise SystemExit('Usage: kitty +runpy "some python code"')
@ -46,11 +41,6 @@ def hold(args: List[str]) -> None:
raise SystemExit(ret) raise SystemExit(ret)
def complete(args: List[str]) -> None:
from kitty.complete import main as complete_main
complete_main(args[1:], entry_points, namespaced_entry_points)
def open_urls(args: List[str]) -> None: def open_urls(args: List[str]) -> None:
setattr(sys, 'cmdline_args_for_open', True) setattr(sys, 'cmdline_args_for_open', True)
sys.argv = ['kitty'] + args[1:] sys.argv = ['kitty'] + args[1:]
@ -159,12 +149,10 @@ entry_points = {
'icat': icat, 'icat': icat,
'list-fonts': list_fonts, 'list-fonts': list_fonts,
'@': remote_control,
'+': namespaced, '+': namespaced,
} }
namespaced_entry_points = {k: v for k, v in entry_points.items() if k[0] not in '+@'} namespaced_entry_points = {k: v for k, v in entry_points.items() if k[0] not in '+@'}
namespaced_entry_points['hold'] = hold namespaced_entry_points['hold'] = hold
namespaced_entry_points['complete'] = complete
namespaced_entry_points['runpy'] = runpy namespaced_entry_points['runpy'] = runpy
namespaced_entry_points['launch'] = launch namespaced_entry_points['launch'] = launch
namespaced_entry_points['open'] = open_urls namespaced_entry_points['open'] = open_urls
@ -195,9 +183,7 @@ def main() -> None:
first_arg = '' if len(sys.argv) < 2 else sys.argv[1] first_arg = '' if len(sys.argv) < 2 else sys.argv[1]
func = entry_points.get(first_arg) func = entry_points.get(first_arg)
if func is None: if func is None:
if first_arg.startswith('@'): if first_arg.startswith('+'):
remote_control(['@', first_arg[1:]] + sys.argv[2:])
elif first_arg.startswith('+'):
namespaced(['+', first_arg[1:]] + sys.argv[2:]) namespaced(['+', first_arg[1:]] + sys.argv[2:])
else: else:
from kitty.main import main as kitty_main from kitty.main import main as kitty_main

View File

@ -227,7 +227,6 @@ def prewarm() -> None:
for kitten in all_kitten_names(): for kitten in all_kitten_names():
with suppress(Exception): with suppress(Exception):
import_module(f'kittens.{kitten}.main') import_module(f'kittens.{kitten}.main')
import_module('kitty.complete')
class MemoryViewReadWrapperBytes(io.BufferedIOBase): class MemoryViewReadWrapperBytes(io.BufferedIOBase):

View File

@ -11,21 +11,18 @@ from functools import lru_cache, partial
from time import monotonic, time_ns from time import monotonic, time_ns
from types import GeneratorType from types import GeneratorType
from typing import ( from typing import (
TYPE_CHECKING, Any, Dict, FrozenSet, Iterable, Iterator, List, Optional, TYPE_CHECKING, Any, Dict, FrozenSet, Iterable, Iterator, List, Optional, Tuple,
Tuple, Union, cast Union, cast,
) )
from .cli import emph, parse_args from .cli import parse_args
from .cli_stub import RCOptions from .cli_stub import RCOptions
from .constants import RC_ENCRYPTION_PROTOCOL_VERSION, appname, version from .constants import RC_ENCRYPTION_PROTOCOL_VERSION, appname, version
from .fast_data_types import ( from .fast_data_types import (
AES256GCMDecrypt, AES256GCMEncrypt, EllipticCurveKey, get_boss, AES256GCMDecrypt, AES256GCMEncrypt, EllipticCurveKey, get_boss, get_options,
get_options, read_command_response, send_data_to_peer read_command_response, send_data_to_peer,
)
from .rc.base import (
NoResponse, ParsingOfArgsFailed, PayloadGetter, all_command_names,
command_for_name, parse_subcommand_cli
) )
from .rc.base import NoResponse, PayloadGetter, all_command_names, command_for_name
from .types import AsyncResponse from .types import AsyncResponse
from .typing import BossType, WindowType from .typing import BossType, WindowType
from .utils import TTYIO, log_error, parse_address_spec, resolve_custom_file from .utils import TTYIO, log_error, parse_address_spec, resolve_custom_file
@ -481,81 +478,3 @@ def get_pubkey() -> Tuple[str, bytes]:
raise SystemExit('KITTY_PUBLIC_KEY has unknown version, if you are running on a remote system, update kitty on this system') raise SystemExit('KITTY_PUBLIC_KEY has unknown version, if you are running on a remote system, update kitty on this system')
from base64 import b85decode from base64 import b85decode
return version, b85decode(pubkey) return version, b85decode(pubkey)
def main(args: List[str]) -> None:
global_opts, items = parse_rc_args(args)
password = get_password(global_opts)
if password:
encryption_version, pubkey = get_pubkey()
encrypter = CommandEncrypter(pubkey, encryption_version, password)
else:
encrypter = NoEncryption()
if not items:
from kitty.shell import main as smain
smain(global_opts, encrypter)
return
cmd = items[0]
try:
c = command_for_name(cmd)
except KeyError:
raise SystemExit('{} is not a known command. Known commands are: {}'.format(
emph(cmd), ', '.join(x.replace('_', '-') for x in all_command_names())))
opts, items = parse_subcommand_cli(c, items)
try:
payload = c.message_to_kitty(global_opts, opts, items)
except ParsingOfArgsFailed as err:
exit(str(err))
no_response = False
if hasattr(opts, 'no_response'):
no_response = opts.no_response
response_timeout = c.response_timeout
if hasattr(opts, 'response_timeout'):
response_timeout = opts.response_timeout
response_timeout = encrypter.adjust_response_timeout_for_password(response_timeout)
send = create_basic_command(cmd, payload=payload, no_response=no_response, is_asynchronous=c.is_asynchronous)
listen_on_from_env = False
if not global_opts.to and 'KITTY_LISTEN_ON' in os.environ:
global_opts.to = os.environ['KITTY_LISTEN_ON']
listen_on_from_env = False
if global_opts.to:
try:
parse_address_spec(global_opts.to)
except Exception:
msg = f'Invalid listen on address: {global_opts.to}'
if listen_on_from_env:
msg += '. The KITTY_LISTEN_ON environment variable is set incorrectly'
exit(msg)
import socket
try:
response = do_io(global_opts.to, send, no_response, response_timeout, encrypter)
except (TimeoutError, socket.timeout):
send.pop('payload', None)
send['cancel_async'] = True
try:
do_io(global_opts.to, send, True, 10, encrypter)
except KeyboardInterrupt:
sys.excepthook = lambda *a: print('Interrupted by user', file=sys.stderr)
raise
except SocketClosed as e:
raise SystemExit(str(e))
raise SystemExit(f'Timed out after {response_timeout} seconds waiting for response from kitty')
except KeyboardInterrupt:
sys.excepthook = lambda *a: print('Interrupted by user', file=sys.stderr)
raise
except FileNotFoundError:
raise SystemExit(f'No listen on socket found at: {global_opts.to}')
except SocketClosed as e:
raise SystemExit(str(e))
if no_response:
return
if not response.get('ok'):
if response.get('tb'):
print(response['tb'], file=sys.stderr)
raise SystemExit(response['error'])
data = response.get('data')
if data is not None:
if c.string_return_is_error and isinstance(data, str):
raise SystemExit(data)
print(data)

View File

@ -1,269 +0,0 @@
#!/usr/bin/env python3
# License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
import os
import shlex
import sys
import traceback
from contextlib import suppress
from functools import lru_cache
from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple
from kittens.tui.operations import set_cursor_shape, set_window_title
from .cli import (
OptionDict, emph, green, italic, parse_option_spec, print_help_for_seq,
title
)
from .cli_stub import RCOptions
from .constants import cache_dir, kitty_face
from .rc.base import (
ParsingOfArgsFailed, RemoteCommand, all_command_names, command_for_name,
display_subcommand_help, parse_subcommand_cli
)
from .remote_control import (
CommandEncrypter, NoEncryption, create_basic_command, do_io
)
from .types import run_once
output_prefix = '\x1b]133;C\x1b\\'
is_libedit = False
@run_once
def match_commands() -> Tuple[str, ...]:
all_commands = tuple(sorted(x.replace('_', '-') for x in all_command_names()))
return tuple(sorted(all_commands + ('exit', 'help', 'quit')))
@run_once
def init_readline() -> None:
import readline
global is_libedit
with suppress(OSError):
readline.read_init_file()
if 'libedit' in readline.__doc__:
readline.parse_and_bind("bind ^I rl_complete")
is_libedit = True
else:
readline.parse_and_bind('tab: complete')
def cmd_names_matching(prefix: str) -> Generator[str, None, None]:
for cmd in match_commands():
if not prefix or cmd.startswith(prefix):
yield cmd + ' '
@lru_cache()
def options_for_cmd(cmd: str) -> Tuple[Tuple[str, ...], Dict[str, OptionDict]]:
alias_map: Dict[str, OptionDict] = {}
try:
func = command_for_name(cmd)
except KeyError:
return (), alias_map
if not func.options_spec:
return (), alias_map
seq, disabled = parse_option_spec(func.options_spec)
ans = []
for opt in seq:
if isinstance(opt, str):
continue
for alias in opt['aliases']:
ans.append(alias)
alias_map[alias] = opt
return tuple(sorted(ans)), alias_map
def options_matching(prefix: str, cmd: str, last_word: str, aliases: Iterable[str], alias_map: Dict[str, OptionDict]) -> Generator[str, None, None]:
for alias in aliases:
if (not prefix or alias.startswith(prefix)) and alias.startswith('--'):
yield alias + ' '
class Completer:
def __init__(self) -> None:
self.matches: List[str] = []
ddir = cache_dir()
os.makedirs(ddir, exist_ok=True)
self.history_path = os.path.join(ddir, 'shell.history')
def complete(self, text: str, state: int) -> Optional[str]:
import readline
if state == 0:
line = readline.get_line_buffer()
cmdline = shlex.split(line)
if len(cmdline) < 2 and not line.endswith(' '):
self.matches = list(cmd_names_matching(text))
else:
self.matches = list(options_matching(text, cmdline[0], cmdline[-1], *options_for_cmd(cmdline[0])))
if state < len(self.matches):
return self.matches[state]
return None
def __enter__(self) -> 'Completer':
import readline
with suppress(Exception):
readline.read_history_file(self.history_path)
readline.set_completer(self.complete)
delims = readline.get_completer_delims()
readline.set_completer_delims(delims.replace('-', ''))
return self
def __exit__(self, *a: Any) -> None:
import readline
readline.write_history_file(self.history_path)
def print_err(*a: Any, **kw: Any) -> None:
kw['file'] = sys.stderr
print(*a, **kw)
def print_help(which: Optional[str] = None) -> None:
if which is None:
print('Control kitty by sending it commands.')
print()
print(title('Commands') + ':')
for cmd in all_command_names():
c = command_for_name(cmd)
print(' ', green(c.name))
print(' ', c.short_desc)
print(' ', green('exit'))
print(' ', 'Exit this shell')
print('\nUse help {} for help on individual commands'.format(italic('command')))
else:
try:
func = command_for_name(which)
except KeyError:
if which == 'exit':
print('Exit this shell')
elif which == 'help':
print('Show help')
else:
print(f'Unknown command: {emph(which)}')
return
display_subcommand_help(func)
def run_cmd(
global_opts: RCOptions, cmd: str, func: RemoteCommand, opts: Any, items: List[str],
encrypter: CommandEncrypter = NoEncryption()
) -> None:
print(end=set_window_title(cmd) + output_prefix, flush=True)
payload = func.message_to_kitty(global_opts, opts, items)
no_response = False
if hasattr(opts, 'no_response'):
no_response = opts.no_response
send = original_send_cmd = create_basic_command(cmd, payload=payload, is_asynchronous=func.is_asynchronous, no_response=no_response)
response_timeout = func.response_timeout
if hasattr(opts, 'response_timeout'):
response_timeout = opts.response_timeout
response_timeout = encrypter.adjust_response_timeout_for_password(response_timeout)
try:
response = do_io(global_opts.to, send, no_response, response_timeout, encrypter)
except TimeoutError:
original_send_cmd.pop('payload', None)
original_send_cmd['cancel_async'] = True
do_io(global_opts.to, send, True, 10, encrypter)
print_err(f'Timed out after {response_timeout} seconds waiting for response from kitty')
return
if not response.get('ok'):
if response.get('tb'):
print_err(response['tb'])
print_err(response['error'])
return
if 'data' in response:
print(response['data'])
def real_main(global_opts: RCOptions, encrypter: CommandEncrypter = NoEncryption()) -> None:
init_readline()
print_help_for_seq.allow_pager = False
print('Welcome to the kitty shell!')
print('Use {} for assistance or {} to quit'.format(green('help'), green('exit')))
awid = os.environ.pop('KITTY_SHELL_ACTIVE_WINDOW_ID', None)
if awid is not None:
atid = os.environ.pop('KITTY_SHELL_ACTIVE_TAB_ID', None)
am = f'Previously active window id: {awid}'
if atid is not None:
am += f' and tab id: {atid}'
print(am)
pre_prompt = set_window_title('The kitty shell') + set_cursor_shape('bar')
pre_prompt += f'\x1b]133;A;redraw={0 if is_libedit else 1}\x1b\\'
while True:
try:
print(end=pre_prompt)
try:
scmdline = input(f'{kitty_face} ')
except UnicodeEncodeError:
scmdline = input('kitty> ')
except EOFError:
break
except KeyboardInterrupt:
print()
continue
print(end=set_cursor_shape(), flush=True)
if not scmdline:
continue
try:
cmdline = shlex.split(scmdline)
except Exception:
print_err(f'"{emph(scmdline)}" is invalid. Use "help" to see a list of commands.')
continue
cmd = cmdline[0].lower()
try:
func = command_for_name(cmd)
except KeyError:
if cmd in ('exit', 'quit'):
break
print(end=output_prefix, flush=True)
if cmd == 'help':
print_help(cmdline[1] if len(cmdline) > 1 else None)
continue
print_err(f'"{emph(cmd)}" is an unknown command. Use "help" to see a list of commands.')
continue
try:
opts, items = parse_subcommand_cli(func, cmdline)
except SystemExit as e:
if e.code != 0:
print(end=output_prefix, flush=True)
print_err(e)
print_err('Use "{}" to see how to use this command.'.format(emph(f'help {cmd}')))
continue
except Exception:
print(end=output_prefix, flush=True)
print_err('Unhandled error:')
traceback.print_exc()
continue
else:
try:
run_cmd(global_opts, cmd, func, opts, items, encrypter)
except (SystemExit, ParsingOfArgsFailed) as e:
print(end=output_prefix, flush=True)
print_err(e)
continue
except KeyboardInterrupt:
print(end=output_prefix, flush=True)
print()
continue
except Exception:
print(end=output_prefix, flush=True)
print_err('Unhandled error:')
traceback.print_exc()
continue
def main(global_opts: RCOptions, encrypter: CommandEncrypter = NoEncryption()) -> None:
try:
with Completer():
real_main(global_opts, encrypter)
except Exception:
traceback.print_exc()
input('Press Enter to quit')
raise SystemExit(1)