#!/usr/bin/env python # License: GPLv3 Copyright: 2022, Kovid Goyal import atexit import base64 import getpass import io import os import pwd import re import select import shutil import subprocess import sys import tarfile import tempfile import termios import tty tty_fd = -1 original_termios_state = None data_dir = shell_integration_dir = '' leading_data = b'' HOME = os.path.expanduser('~') login_shell = pwd.getpwuid(os.geteuid()).pw_shell or 'sh' def cleanup(): global tty_fd, original_termios_state if tty_fd > -1: if original_termios_state is not None: termios.tcsetattr(tty_fd, termios.TCSANOW, original_termios_state) original_termios_state = None os.close(tty_fd) tty_fd = -1 def write_all(fd, data) -> None: if isinstance(data, str): data = data.encode('utf-8') data = memoryview(data) while data: try: n = os.write(fd, data) except BlockingIOError: continue if not n: break data = data[n:] def dcs_to_kitty(type, payload): if isinstance(payload, str): payload = payload.encode('utf-8') payload = base64.standard_b64encode(payload) return b'\033P@kitty-' + type.encode('ascii') + b'|' + payload + b'\033\\' def send_data_request(): hostname = os.environ.get('HOSTNAME') or os.uname().nodename write_all(tty_fd, dcs_to_kitty( 'ssh', 'id=REQUEST_ID:hostname={}:pwfile=PASSWORD_FILENAME:user={}:compression=bz2:pw=DATA_PASSWORD'.format(hostname, getpass.getuser()))) def debug(msg): data = dcs_to_kitty('print', 'debug: {}'.format(msg)) if tty_fd == -1: with open(os.ctermid(), 'wb') as fl: write_all(fl.fileno(), data) else: write_all(tty_fd, data) def unquote_env_val(x): return re.sub('\\\\([\\$`\x22\n])', r'\1', x[1:-1]) def apply_env_vars(raw): global login_shell def process_defn(defn): parts = defn.split('=', 1) if len(parts) == 1: key, val = parts[0], '' else: key, val = parts val = os.path.expandvars(unquote_env_val(val)) os.environ[key] = val for line in raw.splitlines(): if line.startswith('export '): process_defn(line.split(' ', 1)[-1]) elif line.startswith('unset '): os.environ.pop(line.split(' ', 1)[-1], None) login_shell = os.environ.pop('KITTY_LOGIN_SHELL', login_shell) def move(src, base_dest): for x in os.scandir(src): dest = os.path.join(base_dest, x.name) if x.is_dir(follow_symlinks=False): os.makedirs(dest, exist_ok=True) move(x.path, dest) else: os.rename(x.path, dest) def compile_terminfo(base): tic = shutil.which('tic') if not tic: return tname = '.terminfo' if os.path.exists('/usr/share/misc/terminfo.cdb'): tname += '.cdb' os.environ['TERMINFO'] = os.path.join(HOME, tname) cp = subprocess.run( [tic, '-x', '-o', os.path.join(base, tname), os.path.join(base, '.terminfo', 'kitty.terminfo')], stdout=subprocess.PIPE, stderr=subprocess.STDOUT ) if cp.returncode != 0: sys.stderr.buffer.write(cp.stdout) raise SystemExit('Failed to compile the terminfo database') def get_data(): global data_dir, shell_integration_dir, leading_data data = b'' while data.count(b'\036') < 2: select.select([tty_fd], [], []) n = os.read(tty_fd, 64) if not n: raise SystemExit('Unexpected EOF while reading data from terminal') data += n leading_data, size, data = data.split(b'\036', 2) if size.startswith(b'!'): raise SystemExit(size[1:].decode('utf-8', 'replace')) size = int(size) while len(data) < size: select.select([tty_fd], [], []) n = os.read(tty_fd, size - len(data)) if not n: raise SystemExit('Unexpected EOF while reading data from terminal') data += n cleanup() if leading_data: # clear current line as it might have things echoed on it from leading_data # because we only turn off echo in this script whereas the leading bytes could # have been sent before the script had a chance to run print(end='\r\033[K') data = base64.standard_b64decode(data) with tempfile.TemporaryDirectory(dir=HOME, prefix='.kitty-ssh-kitten-untar-') as tdir, tarfile.open(fileobj=io.BytesIO(data)) as tf: tf.extractall(tdir) with open(tdir + '/data.sh') as f: env_vars = f.read() apply_env_vars(env_vars) data_dir = os.path.join(HOME, os.environ.pop('KITTY_SSH_KITTEN_DATA_DIR')) shell_integration_dir = os.path.join(data_dir, 'shell-integration') compile_terminfo(tdir + '/home') move(tdir + '/home', HOME) if os.path.exists(tdir + '/root'): move(tdir + '/root', '/') def exec_zsh_with_integration(): zdotdir = os.environ.get('ZDOTDIR') or '' if not zdotdir: zdotdir = HOME os.environ.pop('KITTY_ORIG_ZDOTDIR', None) # ensure this is not propagated else: os.environ['KITTY_ORIG_ZDOTDIR'] = zdotdir # dont prevent zsh-newuser-install from running for q in ('.zshrc', '.zshenv', '.zprofile', '.zlogin'): if os.path.exists(os.path.join(HOME, q)): os.environ['ZDOTDIR'] = shell_integration_dir + '/zsh' os.execlp(login_shell, os.path.basename(login_shell), '-l') os.environ.pop('KITTY_ORIG_ZDOTDIR', None) # ensure this is not propagated def exec_fish_with_integration(): if not os.environ.get('XDG_DATA_DIRS'): os.environ['XDG_DATA_DIRS'] = shell_integration_dir else: os.environ['XDG_DATA_DIRS'] = shell_integration_dir + ':' + os.environ['XDG_DATA_DIRS'] os.environ['KITTY_FISH_XDG_DATA_DIR'] = shell_integration_dir os.execlp(login_shell, os.path.basename(login_shell), '-l') def exec_bash_with_integration(): os.environ['ENV'] = os.path.join(shell_integration_dir, 'bash', 'kitty.bash') os.environ['KITTY_BASH_INJECT'] = '1' if not os.environ.get('HISTFILE'): os.environ['HISTFILE'] = os.path.join(HOME, '.bash_history') os.environ['KITTY_BASH_UNEXPORT_HISTFILE'] = '1' os.execlp(login_shell, os.path.basename('login_shell'), '--posix') def exec_with_shell_integration(): shell_name = os.path.basename(login_shell).lower() if shell_name == 'zsh': exec_zsh_with_integration() if shell_name == 'fish': exec_fish_with_integration() if shell_name == 'bash': exec_bash_with_integration() def main(): global tty_fd, original_termios_state, login_shell try: tty_fd = os.open(os.ctermid(), os.O_RDWR | os.O_NONBLOCK | os.O_CLOEXEC) except OSError: pass else: try: original_termios_state = termios.tcgetattr(tty_fd) except OSError: pass else: tty.setraw(tty_fd, termios.TCSANOW) new_state = termios.tcgetattr(tty_fd) new_state[3] &= ~termios.ECHO new_state[-1][termios.VMIN] = 1 new_state[-1][termios.VTIME] = 0 termios.tcsetattr(tty_fd, termios.TCSANOW, new_state) if original_termios_state is not None: try: send_data_request() get_data() finally: cleanup() cwd = os.environ.pop('KITTY_LOGIN_CWD', '') if cwd: os.chdir(cwd) ksi = frozenset(filter(None, os.environ.get('KITTY_SHELL_INTEGRATION', '').split())) exec_cmd = b'EXEC_CMD' if exec_cmd: cmd = base64.standard_b64decode(exec_cmd).decode('utf-8') os.execlp(login_shell, os.path.basename(login_shell), '-c', cmd) TEST_SCRIPT # noqa if ksi and 'no-rc' not in ksi: exec_with_shell_integration() os.environ.pop('KITTY_SHELL_INTEGRATION', None) os.execlp(login_shell, '-' + os.path.basename(login_shell)) atexit.register(cleanup) main()