#!/usr/bin/env python3 # License: GPL v3 Copyright: 2018, Kovid Goyal import os import shlex import subprocess import sys from contextlib import suppress from typing import List, NoReturn, Optional, Set, Tuple from .completion import ssh_options, complete from kitty.utils import SSHConnectionData SHELL_SCRIPT = '''\ #!/bin/sh # macOS ships with an ancient version of tic that cannot read from stdin, so we # create a temp file for it tmp=$(mktemp) cat >$tmp << 'TERMEOF' TERMINFO TERMEOF tic_out=$(tic -x -o $HOME/.terminfo $tmp 2>&1) rc=$? rm $tmp if [ "$rc" != "0" ]; then echo "$tic_out"; exit 1; fi if [ -z "$USER" ]; then export USER=$(whoami); fi export TERMINFO="$HOME/.terminfo" login_shell="" python="" login_shell_is_ok() { if [ -z "$login_shell" ] || [ ! -x "$login_shell" ]; then return 1; fi case "$login_shell" in *sh) return 0; esac return 1; } detect_python() { python=$(command -v python3) if [ -z "$python" ]; then python=$(command -v python2); fi if [ -z "$python" ]; then python=python; fi } using_getent() { cmd=$(command -v getent) if [ -z "$cmd" ]; then return; fi output=$($cmd passwd $USER 2>/dev/null) if [ $? = 0 ]; then login_shell=$(echo $output | cut -d: -f7); fi } using_id() { cmd=$(command -v id) if [ -z "$cmd" ]; then return; fi output=$($cmd -P $USER 2>/dev/null) if [ $? = 0 ]; then login_shell=$(echo $output | cut -d: -f7); fi } using_passwd() { cmd=$(command -v grep) if [ -z "$cmd" ]; then return; fi output=$($cmd "^$USER:" /etc/passwd 2>/dev/null) if [ $? = 0 ]; then login_shell=$(echo $output | cut -d: -f7); fi } using_python() { detect_python if [ ! -x "$python" ]; then return; fi output=$($python -c "import pwd, os; print(pwd.getpwuid(os.geteuid()).pw_shell)") if [ $? = 0 ]; then login_shell=$output; fi } execute_with_python() { detect_python exec $python -c "import os; os.execl('$login_shell', '-' '$shell_name')" } die() { echo "$*" 1>&2 ; exit 1; } using_getent if ! login_shell_is_ok; then using_id; fi if ! login_shell_is_ok; then using_python; fi if ! login_shell_is_ok; then using_passwd; fi if ! login_shell_is_ok; then die "Could not detect login shell"; fi # If a command was passed to SSH execute it here EXEC_CMD # We need to pass the first argument to the executed program with a leading - # to make sure the shell executes as a login shell. Note that not all shells # support exec -a so we use the below to try to detect such shells shell_name=$(basename $login_shell) if [ -z "$PIPESTATUS" ]; then # the dash shell does not support exec -a and also does not define PIPESTATUS execute_with_python fi exec -a "-$shell_name" $login_shell ''' PYTHON_SCRIPT = '''\ #!/usr/bin/env python from __future__ import print_function from tempfile import NamedTemporaryFile import subprocess, os, sys, pwd, binascii, json # macOS ships with an ancient version of tic that cannot read from stdin, so we # create a temp file for it with NamedTemporaryFile() as tmp: tmp.write(binascii.unhexlify('{terminfo}')) p = subprocess.Popen(['tic', '-x', '-o', os.path.expanduser('~/.terminfo'), tmp.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate() if p.wait() != 0: getattr(sys.stderr, 'buffer', sys.stderr).write(stdout + stderr) raise SystemExit('Failed to compile terminfo using tic') command_to_execute = json.loads(binascii.unhexlify('{command_to_execute}')) try: shell_path = pwd.getpwuid(os.geteuid()).pw_shell or '/bin/sh' except KeyError: shell_path = '/bin/sh' shell_name = '-' + os.path.basename(shell_path) if command_to_execute: os.execlp(shell_path, shell_path, '-c', command_to_execute) os.execlp(shell_path, shell_name) ''' def get_ssh_cli() -> Tuple[Set[str], Set[str]]: other_ssh_args: Set[str] = set() boolean_ssh_args: Set[str] = set() for k, v in ssh_options().items(): k = '-' + k if v: other_ssh_args.add(k) else: boolean_ssh_args.add(k) return boolean_ssh_args, other_ssh_args def get_connection_data(args: List[str], cwd: str = '') -> Optional[SSHConnectionData]: boolean_ssh_args, other_ssh_args = get_ssh_cli() port: Optional[int] = None expecting_port = expecting_identity = False expecting_option_val = False expecting_hostname = False host_name = identity_file = found_ssh = '' for i, arg in enumerate(args): if not found_ssh: if os.path.basename(arg).lower() in ('ssh', 'ssh.exe'): found_ssh = arg continue if expecting_hostname: host_name = arg continue if arg.startswith('-') and not expecting_option_val: if arg in boolean_ssh_args: continue if arg == '--': expecting_hostname = True if arg.startswith('-p'): if arg[2:].isdigit(): with suppress(Exception): port = int(arg[2:]) continue elif arg == '-p': expecting_port = True elif arg.startswith('-i'): if arg == '-i': expecting_identity = True else: identity_file = arg[2:] continue expecting_option_val = True continue if expecting_option_val: if expecting_port: with suppress(Exception): port = int(arg) expecting_port = False elif expecting_identity: identity_file = arg expecting_option_val = False continue if not host_name: host_name = arg if not host_name: return None if identity_file: if not os.path.isabs(identity_file): identity_file = os.path.expanduser(identity_file) if not os.path.isabs(identity_file): identity_file = os.path.normpath(os.path.join(cwd or os.getcwd(), identity_file)) return SSHConnectionData(found_ssh, host_name, port, identity_file) class InvalidSSHArgs(ValueError): def __init__(self, msg: str = ''): super().__init__(msg) self.err_msg = msg def system_exit(self) -> None: if self.err_msg: print(self.err_msg, file=sys.stderr) os.execlp('ssh', 'ssh') def parse_ssh_args(args: List[str]) -> Tuple[List[str], List[str], bool]: boolean_ssh_args, other_ssh_args = get_ssh_cli() passthrough_args = {'-' + x for x in 'Nnf'} ssh_args = [] server_args: List[str] = [] expecting_option_val = False passthrough = False stop_option_processing = False for argument in args: if len(server_args) > 1 or stop_option_processing: server_args.append(argument) continue if argument.startswith('-') and not expecting_option_val: if argument == '--': stop_option_processing = True continue # could be a multi-character option all_args = argument[1:] for i, arg in enumerate(all_args): arg = '-' + arg if arg in passthrough_args: passthrough = True if arg in boolean_ssh_args: ssh_args.append(arg) continue if arg in other_ssh_args: ssh_args.append(arg) rest = all_args[i+1:] if rest: ssh_args.append(rest) else: expecting_option_val = True break raise InvalidSSHArgs(f'unknown option -- {arg[1:]}') continue if expecting_option_val: ssh_args.append(argument) expecting_option_val = False continue server_args.append(argument) if not server_args: raise InvalidSSHArgs() return ssh_args, server_args, passthrough def quote(x: str) -> str: # we have to escape unbalanced quotes and other unparsable # args as they will break the shell script # But we do not want to quote things like * or 'echo hello' # See https://github.com/kovidgoyal/kitty/issues/1787 try: shlex.split(x) except ValueError: x = shlex.quote(x) return x def get_posix_cmd(terminfo: str, remote_args: List[str]) -> List[str]: sh_script = SHELL_SCRIPT.replace('TERMINFO', terminfo, 1) command_to_execute = '' if remote_args: # ssh simply concatenates multiple commands using a space see # line 1129 of ssh.c and on the remote side sshd.c runs the # concatenated command as shell -c cmd args = [c.replace("'", """'"'"'""") for c in remote_args] command_to_execute = "exec $login_shell -c '{}'".format(' '.join(args)) sh_script = sh_script.replace('EXEC_CMD', command_to_execute) return [f'sh -c {shlex.quote(sh_script)}'] def get_python_cmd(terminfo: str, command_to_execute: List[str]) -> List[str]: import json script = PYTHON_SCRIPT.format( terminfo=terminfo.encode('utf-8').hex(), command_to_execute=json.dumps(' '.join(command_to_execute)).encode('utf-8').hex() ) return [f'python -c "{script}"'] def main(args: List[str]) -> NoReturn: args = args[1:] use_posix = True if args and args[0] == 'use-python': args = args[1:] use_posix = False try: ssh_args, server_args, passthrough = parse_ssh_args(args) except InvalidSSHArgs as e: e.system_exit() cmd = ['ssh'] + ssh_args if passthrough: cmd += server_args else: hostname, remote_args = server_args[0], server_args[1:] if not remote_args: cmd.append('-t') cmd.append('--') cmd.append(hostname) terminfo = subprocess.check_output(['infocmp', '-a']).decode('utf-8') f = get_posix_cmd if use_posix else get_python_cmd cmd += f(terminfo, remote_args) os.execvp('ssh', cmd) if __name__ == '__main__': main(sys.argv) elif __name__ == '__completer__': setattr(sys, 'kitten_completer', complete)