Make using kitty askpass optional

This commit is contained in:
Kovid Goyal 2022-03-14 11:38:31 +05:30
parent 71027e74e0
commit 90561682cf
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
7 changed files with 71 additions and 32 deletions

View File

@ -34,7 +34,7 @@ from kitty.constants import (
from kitty.options.types import Options from kitty.options.types import Options
from kitty.shm import SharedMemory from kitty.shm import SharedMemory
from kitty.types import run_once from kitty.types import run_once
from kitty.utils import SSHConnectionData, no_echo from kitty.utils import SSHConnectionData, set_echo as turn_off_echo
from .completion import complete, ssh_options from .completion import complete, ssh_options
from .config import init_config, options_for_host from .config import init_config, options_for_host
@ -435,15 +435,15 @@ def wrap_bootstrap_script(sh_script: str, interpreter: str) -> List[str]:
def get_remote_command( def get_remote_command(
remote_args: List[str], ssh_opts: SSHOptions, remote_args: List[str], ssh_opts: SSHOptions, hostname: str = 'localhost', cli_hostname: str = '', cli_uname: str = '',
hostname: str = 'localhost', cli_hostname: str = '', cli_uname: str = '', echo_on: bool = True, echo_on: bool = True, request_data: bool = False
) -> Tuple[List[str], Dict[str, str], str]: ) -> Tuple[List[str], Dict[str, str], str]:
interpreter = ssh_opts.interpreter interpreter = ssh_opts.interpreter
q = os.path.basename(interpreter).lower() q = os.path.basename(interpreter).lower()
is_python = 'python' in q is_python = 'python' in q
sh_script, replacements, shm = bootstrap_script( sh_script, replacements, shm = bootstrap_script(
ssh_opts, script_type='py' if is_python else 'sh', remote_args=remote_args, ssh_opts, script_type='py' if is_python else 'sh', remote_args=remote_args,
cli_hostname=cli_hostname, cli_uname=cli_uname, echo_on=echo_on) cli_hostname=cli_hostname, cli_uname=cli_uname, echo_on=echo_on, request_data=request_data)
return wrap_bootstrap_script(sh_script, interpreter), replacements, shm.name return wrap_bootstrap_script(sh_script, interpreter), replacements, shm.name
@ -503,8 +503,10 @@ def dcs_to_kitty(payload: Union[bytes, str], type: str = 'ssh') -> bytes:
def drain_potential_tty_garbage(p: 'subprocess.Popen[bytes]', data_request: str) -> Iterator[None]: def drain_potential_tty_garbage(p: 'subprocess.Popen[bytes]', data_request: str) -> Iterator[None]:
ssh_started_at = time.monotonic() ssh_started_at = time.monotonic()
with open(os.open(os.ctermid(), os.O_CLOEXEC | os.O_RDWR | os.O_NOCTTY), 'wb') as tty: with open(os.open(os.ctermid(), os.O_CLOEXEC | os.O_RDWR | os.O_NOCTTY), 'wb') as tty:
tty.write(dcs_to_kitty(data_request)) if data_request:
tty.flush() turn_off_echo(tty.fileno())
tty.write(dcs_to_kitty(data_request))
tty.flush()
try: try:
yield yield
finally: finally:
@ -556,25 +558,26 @@ def run_ssh(ssh_args: List[str], server_args: List[str], found_extra_args: Tuple
use_control_master = host_opts.share_connections use_control_master = host_opts.share_connections
if use_control_master: if use_control_master:
cmd[insertion_point:insertion_point] = connection_sharing_args(host_opts, int(os.environ['KITTY_PID'])) cmd[insertion_point:insertion_point] = connection_sharing_args(host_opts, int(os.environ['KITTY_PID']))
with restore_terminal_state() as echo_on: use_kitty_askpass = host_opts.askpass == 'native' or (host_opts.askpass == 'unless-set' and 'SSH_ASKPASS' not in os.environ)
rcmd, replacements, shm_name = get_remote_command(remote_args, host_opts, hostname, hostname_for_match, uname, echo_on) need_to_request_data = not use_kitty_askpass
cmd += rcmd if use_kitty_askpass:
# We force use of askpass so that OpenSSH does not use the tty leaving
# it free for us to use
os.environ['SSH_ASKPASS_REQUIRE'] = 'force' os.environ['SSH_ASKPASS_REQUIRE'] = 'force'
if not os.environ.get('SSH_ASKPASS'): os.environ['SSH_ASKPASS'] = os.path.join(shell_integration_dir, 'ssh', 'askpass.py')
os.environ['SSH_ASKPASS'] = os.path.join(shell_integration_dir, 'ssh', 'askpass.py') with restore_terminal_state() as echo_on:
with no_echo(sys.stdin.fileno()): rcmd, replacements, shm_name = get_remote_command(
try: remote_args, host_opts, hostname, hostname_for_match, uname, echo_on, request_data=need_to_request_data)
p = subprocess.Popen(cmd) cmd += rcmd
except FileNotFoundError: try:
raise SystemExit('Could not find the ssh executable, is it in your PATH?') p = subprocess.Popen(cmd)
else: except FileNotFoundError:
with drain_potential_tty_garbage(p, 'id={REQUEST_ID}:pwfile={PASSWORD_FILENAME}:pw={DATA_PASSWORD}'.format(**replacements)): raise SystemExit('Could not find the ssh executable, is it in your PATH?')
try: else:
raise SystemExit(p.wait()) rq = '' if need_to_request_data else 'id={REQUEST_ID}:pwfile={PASSWORD_FILENAME}:pw={DATA_PASSWORD}'.format(**replacements)
except KeyboardInterrupt: with drain_potential_tty_garbage(p, rq):
raise SystemExit(1) try:
raise SystemExit(p.wait())
except KeyboardInterrupt:
raise SystemExit(1)
def main(args: List[str]) -> NoReturn: def main(args: List[str]) -> NoReturn:

View File

@ -107,4 +107,11 @@ opt('login_shell', '', long_text='''
The login shell to execute on the remote host. By default, the remote user account's The login shell to execute on the remote host. By default, the remote user account's
login shell is used.''') login shell is used.''')
opt('askpass', 'unless-set', long_text='''
Control the program SSH uses to ask for passwords or confirmation of host keys etc.
The default is to use kitty's native askpass, unless the SSH_ASKPASS environment variable
is set. Set it to :code:`ssh` to not interfere with the normal ssh askpass mechanism at all,
which typically means that ssh will prompt at the terminal. Set it to :code:`native` to always use
kitty's native, built-in askpass implementation.
''')
egr() # }}} egr() # }}}

View File

@ -7,6 +7,9 @@ from kitty.conf.utils import merge_dicts, to_bool
class Parser: class Parser:
def askpass(self, val: str, ans: typing.Dict[str, typing.Any]) -> None:
ans['askpass'] = str(val)
def copy(self, val: str, ans: typing.Dict[str, typing.Any]) -> None: def copy(self, val: str, ans: typing.Dict[str, typing.Any]) -> None:
for k, v in copy(val, ans["copy"]): for k, v in copy(val, ans["copy"]):
ans["copy"][k] = v ans["copy"][k] = v

View File

@ -5,6 +5,7 @@ import kittens.ssh.copy
option_names = ( # {{{ option_names = ( # {{{
'askpass',
'copy', 'copy',
'cwd', 'cwd',
'env', 'env',
@ -17,6 +18,7 @@ option_names = ( # {{{
class Options: class Options:
askpass: str = 'unless-set'
cwd: str = '' cwd: str = ''
hostname: str = '*' hostname: str = '*'
interpreter: str = 'sh' interpreter: str = 'sh'

View File

@ -508,16 +508,25 @@ class TTYIO:
break break
@contextmanager def set_echo(fd: int = -1, on: bool = False) -> Tuple[int, List[Union[int, List[Union[bytes, int]]]]]:
def no_echo(fd: int = -1) -> Generator[None, None, None]:
import termios import termios
if fd < 0: if fd < 0:
fd = sys.stdin.fileno() fd = sys.stdin.fileno()
old = termios.tcgetattr(fd) old = termios.tcgetattr(fd)
new = termios.tcgetattr(fd) new = termios.tcgetattr(fd)
new[3] = new[3] & ~termios.ECHO if on:
new[3] |= termios.ECHO
else:
new[3] &= ~termios.ECHO
termios.tcsetattr(fd, termios.TCSADRAIN, new)
return fd, old
@contextmanager
def no_echo(fd: int = -1) -> Generator[None, None, None]:
import termios
fd, old = set_echo(fd)
try: try:
termios.tcsetattr(fd, termios.TCSADRAIN, new)
yield yield
finally: finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old) termios.tcsetattr(fd, termios.TCSADRAIN, old)

View File

@ -23,13 +23,24 @@ HOME = os.path.expanduser('~')
login_shell = pwd.getpwuid(os.geteuid()).pw_shell or 'sh' login_shell = pwd.getpwuid(os.geteuid()).pw_shell or 'sh'
def set_echo(fd, on=False):
if fd < 0:
fd = sys.stdin.fileno()
old = termios.tcgetattr(fd)
new = termios.tcgetattr(fd)
if on:
new[3] |= termios.ECHO
else:
new[3] &= ~termios.ECHO
termios.tcsetattr(fd, termios.TCSANOW, new)
return fd, old
def cleanup(): def cleanup():
global tty_fd global tty_fd
if tty_fd > -1: if tty_fd > -1:
if echo_on: if echo_on:
s = termios.tcgetattr(tty_fd) set_echo(tty_fd, True)
s[3] |= termios.ECHO
termios.tcsetattr(tty_fd, termios.TCSANOW, s)
os.close(tty_fd) os.close(tty_fd)
tty_fd = -1 tty_fd = -1
@ -218,6 +229,7 @@ def main():
tty_fd = os.open(os.ctermid(), os.O_RDWR | getattr(os, 'O_CLOEXEC', 16777216)) tty_fd = os.open(os.ctermid(), os.O_RDWR | getattr(os, 'O_CLOEXEC', 16777216))
try: try:
if request_data: if request_data:
set_echo(tty_fd, on=False)
send_data_request() send_data_request()
get_data() get_data()
finally: finally:

View File

@ -73,7 +73,10 @@ login_cwd=""
request_data="REQUEST_DATA" request_data="REQUEST_DATA"
trap "cleanup_on_bootstrap_exit" EXIT trap "cleanup_on_bootstrap_exit" EXIT
[ "$request_data" = "1" ] && dcs_to_kitty "ssh" "id="REQUEST_ID":pwfile="PASSWORD_FILENAME":pw="DATA_PASSWORD"" [ "$request_data" = "1" ] && {
command stty "-echo" < /dev/tty
dcs_to_kitty "ssh" "id="REQUEST_ID":pwfile="PASSWORD_FILENAME":pw="DATA_PASSWORD""
}
mv_files_and_dirs() { mv_files_and_dirs() {
cwd="$PWD" cwd="$PWD"