More work on completion for the SSH kitten

This commit is contained in:
Kovid Goyal 2021-06-25 17:50:08 +05:30
parent 21ce0e90bf
commit d8d5a8fada
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
4 changed files with 131 additions and 5 deletions

View File

@ -164,6 +164,14 @@ def get_kitten_cli_docs(kitten: str) -> Any:
return ans
def get_kitten_completer(kitten: str) -> Any:
run_kitten(kitten, run_name='__completer__')
ans = getattr(sys, 'kitten_completer', None)
if ans is not None:
delattr(sys, 'kitten_completer')
return ans
def get_kitten_conf_docs(kitten: str) -> Definition:
setattr(sys, 'options_definition', None)
run_kitten(kitten, run_name='__conf__')

View File

@ -5,10 +5,13 @@
import os
import re
import subprocess
from typing import Callable, Dict, Iterable, Iterator, Tuple
from typing import Callable, Dict, Iterable, Iterator, Sequence, Tuple
from kitty.complete import Completions, debug
from kitty.types import run_once
debug
def lines_from_file(path: str) -> Iterator[str]:
try:
@ -97,3 +100,107 @@ def ssh_options() -> Dict[str, str]:
else:
ans.update(dict.fromkeys(q[1:], ''))
return ans
# option help {{{
@run_once
def option_help_map() -> Dict[str, str]:
ans: Dict[str, str] = {}
lines = '''
-4 -- force ssh to use IPv4 addresses only
-6 -- force ssh to use IPv6 addresses only
-a -- disable forwarding of authentication agent connection
-A -- enable forwarding of the authentication agent connection
-B -- bind to specified interface before attempting to connect
-b -- specify interface to transmit on
-C -- compress data
-c -- select encryption cipher
-D -- specify a dynamic port forwarding
-E -- append log output to file instead of stderr
-e -- set escape character
-f -- go to background
-F -- specify alternate config file
-g -- allow remote hosts to connect to local forwarded ports
-G -- output configuration and exit
-i -- select identity file
-I -- specify smartcard device
-J -- connect via a jump host
-k -- disable forwarding of GSSAPI credentials
-K -- enable GSSAPI-based authentication and forwarding
-L -- specify local port forwarding
-l -- specify login name
-M -- master mode for connection sharing
-m -- specify mac algorithms
-N -- don't execute a remote command
-n -- redirect stdin from /dev/null
-O -- control an active connection multiplexing master process
-o -- specify extra options
-p -- specify port on remote host
-P -- use non privileged port
-Q -- query parameters
-q -- quiet operation
-R -- specify remote port forwarding
-s -- invoke subsystem
-S -- specify location of control socket for connection sharing
-T -- disable pseudo-tty allocation
-t -- force pseudo-tty allocation
-V -- show version number
-v -- verbose mode (multiple increase verbosity, up to 3)
-W -- forward standard input and output to host
-w -- request tunnel device forwarding
-x -- disable X11 forwarding
-X -- enable (untrusted) X11 forwarding
-Y -- enable trusted X11 forwarding
-y -- send log info via syslog instead of stderr
'''.splitlines()
for line in lines:
line = line.strip()
if line:
parts = line.split(maxsplit=2)
ans[parts[0]] = parts[2]
return ans
# }}}
def complete_arg(ans: Completions, option_name: str, prefix: str = '') -> None:
pass
def complete_destination(ans: Completions, prefix: str = '') -> None:
result = {k: '' for k in known_hosts() if k.startswith(prefix)}
ans.match_groups['remote host name'] = result
def complete_option(ans: Completions, prefix: str = '-') -> None:
result = {k: v for k, v in option_help_map().items() if k.startswith(prefix)}
ans.match_groups['option'] = result
def complete(ans: Completions, words: Sequence[str], new_word: bool) -> None:
options = ssh_options()
expecting_arg = False
types = ['' for i in range(len(words))]
for i, word in enumerate(words):
if expecting_arg:
types[i] = 'arg'
expecting_arg = False
continue
if word.startswith('-'):
types[i] = 'option'
if len(word) == 2 and options.get(word[1]):
expecting_arg = True
continue
types[i] = 'destination'
break
if new_word:
if words:
if expecting_arg:
return complete_arg(ans, words[-1])
return complete_destination(ans)
if words:
if types[-1] == 'arg' and len(words) > 1:
return complete_arg(ans, words[-2], words[-1])
if types[-1] == 'destination':
return complete_destination(ans, words[-1])
if words[-1] == '-':
return complete_option(ans)

View File

@ -8,7 +8,7 @@ import subprocess
import sys
from contextlib import suppress
from typing import List, NoReturn, Optional, Set, Tuple
from .completion import ssh_options
from .completion import ssh_options, complete
from kitty.utils import SSHConnectionData
@ -272,3 +272,5 @@ def main(args: List[str]) -> NoReturn:
if __name__ == '__main__':
main(sys.argv)
elif __name__ == '__completer__':
setattr(sys, 'kitten_completer', complete)

View File

@ -9,7 +9,9 @@ from typing import (
Any, Callable, Dict, Iterable, List, Optional, Sequence, Set, Tuple
)
from kittens.runner import all_kitten_names, get_kitten_cli_docs
from kittens.runner import (
all_kitten_names, get_kitten_cli_docs, get_kitten_completer
)
from .cli import (
OptionDict, OptionSpecSeq, options_for_completion, parse_option_spec
@ -39,7 +41,7 @@ them into something your shell will understand.
parsers: Dict[str, Callable] = {}
serializers: Dict[str, Callable] = {}
MathGroup = Dict[str, str]
MatchGroup = Dict[str, str]
def debug(*a: Any, **kw: Any) -> None:
@ -69,7 +71,7 @@ class Delegate:
class Completions:
def __init__(self) -> None:
self.match_groups: Dict[str, MathGroup] = {}
self.match_groups: Dict[str, MatchGroup] = {}
self.no_space_groups: Set[str] = set()
self.files_groups: Set[str] = set()
self.delegate: Delegate = Delegate()
@ -450,6 +452,13 @@ def complete_diff_args(ans: Completions, opt: Optional[OptionDict], prefix: str,
def complete_kitten(ans: Completions, kitten: str, words: Sequence[str], new_word: bool) -> None:
try:
completer = get_kitten_completer(kitten)
except SystemExit:
completer = None
if completer is not None:
completer(ans, words, new_word)
return
try:
cd = get_kitten_cli_docs(kitten)
except SystemExit: