#!/usr/bin/env python
# vim:fileencoding=utf-8
# License: GPLv3 Copyright: 2020, Kovid Goyal

import json
import os
import shlex
import shutil
import subprocess
import sys
import tempfile
import time
from contextlib import suppress
from typing import Any, List, Optional

from kitty.cli import parse_args
from kitty.cli_stub import RemoteFileCLIOptions
from kitty.typing import BossType
from kitty.utils import command_for_open, get_editor, open_cmd

from ..ssh.main import SSHConnectionData
from ..tui.handler import result_handler
from ..tui.operations import (
    faint, raw_mode, reset_terminal, set_cursor_visible, styled
)


def option_text() -> str:
    '''Return the command line option specification passed to parse_args().'''
    return '''\
--mode -m
choices=ask,edit
default=ask
Which mode to operate in.


--path -p
Path to the remote file.


--hostname -h
Hostname of the remote host.


--ssh-connection-data
The data used to connect over ssh.
'''


def show_error(msg: str) -> None:
    '''Print msg in red and block until a key is pressed (or the read is interrupted).'''
    print(styled(msg, fg='red'))
    print()
    print('Press any key to exit...')
    sys.stdout.flush()
    with raw_mode():
        while True:
            try:
                q = sys.stdin.buffer.read(1)
                if q:
                    break
            except (KeyboardInterrupt, EOFError):
                break


def ask_action(opts: RemoteFileCLIOptions) -> str:
    '''
    Show the edit/open/cancel menu and read a single key press.

    Returns 'edit', 'open' or 'cancel'. Escape, Ctrl-C and interrupted reads
    leave the selection as it was, which is 'cancel' unless e or o was the
    last key decoded.
    '''
    print('What would you like to do with the remote file on {}:'.format(styled(opts.hostname or 'unknown', bold=True, fg='magenta')))
    print(styled(opts.path or '', fg='yellow', fg_intense=True))
    print()

    def key(x: str) -> str:
        return styled(x, bold=True, fg='green')

    def help_text(x: str) -> str:
        return faint(x)

    print('{}dit the file'.format(key('E')))
    print(help_text('The file will be downloaded and opened in an editor. Any changes you save will'
                    ' be automatically sent back to the remote machine'))
    print()

    print('{}pen the file'.format(key('O')))
    print(help_text('The file will be downloaded and opened by the default open program'))
    print()

    print('{}ancel'.format(key('C')))
    print()

    sys.stdout.flush()
    response = 'c'
    with raw_mode():
        try:
            while True:
                q = sys.stdin.buffer.read(1)
                if q:
                    # ESC or Ctrl-C: stop reading, keep the current response
                    if q in b'\x1b\x03':
                        break
                    # bytes that cannot be decoded are ignored
                    with suppress(Exception):
                        response = q.decode('utf-8').lower()
                        if response in 'ceo':
                            break
        except (KeyboardInterrupt, EOFError):
            pass
    return {'e': 'edit', 'o': 'open'}.get(response, 'cancel')


def simple_copy_command(conn_data: SSHConnectionData, path: str) -> List[str]:
    '''
    Build the ssh command line that writes the remote file at path to stdout.

    The path is shell-quoted because ssh joins the command words with spaces
    and hands the result to the remote shell, so an unquoted path containing
    spaces or shell metacharacters would be misinterpreted.
    '''
    cmd = [conn_data.binary]
    if conn_data.port:
        cmd += ['-p', str(conn_data.port)]
    cmd += [conn_data.hostname, 'cat', shlex.quote(path)]
    return cmd


class ControlMaster:
    '''
    Context manager that runs an ssh control master for conn_data.

    On entry the master connection is started and checked, and a temporary
    directory is created; self.dest is the local path inside it that mirrors
    the remote file. On exit the master is asked to exit and the temporary
    directory is removed.
    '''

    def __init__(self, conn_data: SSHConnectionData, remote_path: str):
        self.conn_data = conn_data
        self.remote_path = remote_path
        # The control socket path embeds our pid, so each invocation gets its own master
        self.cmd_prefix = cmd = [
            conn_data.binary, '-o', f'ControlPath=~/.ssh/kitty-master-{os.getpid()}-%r@%h:%p',
            '-o', 'TCPKeepAlive=yes', '-o', 'ControlPersist=yes'
        ]
        if conn_data.port:
            # cmd is the same list object as self.cmd_prefix, so this extends both
            cmd += ['-p', str(conn_data.port)]
        self.batch_cmd_prefix = cmd + ['-o', 'BatchMode=yes']

    def __enter__(self) -> 'ControlMaster':
        # Raises subprocess.CalledProcessError if the master cannot be started or checked
        subprocess.check_call(
            self.cmd_prefix + ['-o', 'ControlMaster=auto', '-fN', self.conn_data.hostname])
        subprocess.check_call(
            self.batch_cmd_prefix + ['-O', 'check', self.conn_data.hostname])
        self.tdir = tempfile.mkdtemp()
        self.dest = os.path.join(self.tdir, os.path.basename(self.remote_path))
        return self

    def __exit__(self, *a: Any) -> bool:
        try:
            subprocess.Popen(
                self.batch_cmd_prefix + ['-O', 'exit', self.conn_data.hostname],
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL
            ).wait()
        finally:
            # Remove the local copy even if asking the master to exit failed
            shutil.rmtree(self.tdir)
        # Do not suppress exceptions raised inside the with block, so that
        # the caller can report them instead of failing silently
        return False

    @property
    def is_alive(self) -> bool:
        '''True if the ssh `-O check` command for the master exits with status 0.'''
        return subprocess.Popen(
            self.batch_cmd_prefix + ['-O', 'check', self.conn_data.hostname],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL
        ).wait() == 0

    def download(self) -> bool:
        '''Copy the remote file into self.dest. Returns True if ssh exited with status 0.'''
        with open(self.dest, 'wb') as f:
            return subprocess.run(
                # remote path is quoted for the remote shell
                self.batch_cmd_prefix + [self.conn_data.hostname, 'cat', shlex.quote(self.remote_path)],
                stdout=f, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL
            ).returncode == 0

    def upload(self, suppress_output: bool = True) -> bool:
        '''
        Send self.dest to the remote path by piping it into a remote `cat >`.

        When suppress_output is False the command line is printed and the
        output of ssh is not redirected. Returns True if ssh exited with
        status 0.
        '''
        cmd_prefix = self.cmd_prefix if suppress_output else self.batch_cmd_prefix
        # remote path is quoted for the remote shell, the redirection is not
        cmd = cmd_prefix + [self.conn_data.hostname, 'cat', '>', shlex.quote(self.remote_path)]
        if not suppress_output:
            print(' '.join(map(shlex.quote, cmd)))
        redirect = subprocess.DEVNULL if suppress_output else None
        with open(self.dest, 'rb') as f:
            return subprocess.run(cmd, stdout=redirect, stderr=redirect, stdin=f).returncode == 0


def save_output(cmd: List[str], dest_path: str) -> bool:
    '''Run cmd with its stdout written to dest_path. Returns True if it exited with status 0.'''
    with open(dest_path, 'wb') as f:
        cp = subprocess.run(cmd, stdout=f)
        return cp.returncode == 0


Result = Optional[str]


def main(args: List[str]) -> Result:
    '''
    Entry point: parse args, ask the user what to do and perform the action.

    Returns whatever handle_action() returns, or None if it raised an
    Exception (the traceback is printed and an error is shown instead).
    '''
    msg = 'Ask the user what to do with the remote file'
    try:
        cli_opts, items = parse_args(args[1:], option_text, '', msg, 'kitty remote_file', result_class=RemoteFileCLIOptions)
    except SystemExit as e:
        if e.code != 0:
            print(e.args[0])
            # give the user a chance to read the message before exiting
            input('Press enter to quit...')
        raise SystemExit(e.code)

    print(set_cursor_visible(False), end='', flush=True)
    try:
        action = ask_action(cli_opts)
    finally:
        print(reset_terminal(), end='', flush=True)
    try:
        return handle_action(action, cli_opts)
    except Exception:
        print(reset_terminal(), end='', flush=True)
        import traceback
        traceback.print_exc()
        show_error('Failed with unhandled exception')
    return None


def handle_action(action: str, cli_opts: RemoteFileCLIOptions) -> Result:
    '''
    Perform action ('open' or 'edit'; anything else does nothing).

    For 'open' the file is copied into a new temporary directory and the
    local path is returned on success. For 'edit' the file is downloaded, an
    editor is run on it and the file is uploaded whenever its modification
    time increases, plus once more after the editor exits. Returns None in
    every case except a successful 'open'.
    '''
    conn_data = SSHConnectionData(*json.loads(cli_opts.ssh_connection_data or ''))
    remote_path = cli_opts.path or ''
    if action == 'open':
        print('Opening', cli_opts.path, 'from', cli_opts.hostname)
        cmd = simple_copy_command(conn_data, remote_path)
        # The directory is not removed here: the returned path is used later by handle_result()
        dest = os.path.join(tempfile.mkdtemp(), os.path.basename(remote_path))
        if save_output(cmd, dest):
            return dest
        show_error('Failed to copy file from remote machine')
    elif action == 'edit':
        print('Editing', cli_opts.path, 'from', cli_opts.hostname)
        with ControlMaster(conn_data, remote_path) as master:
            if not master.download():
                show_error(f'Failed to download {remote_path}')
                return None
            mtime = os.path.getmtime(master.dest)
            print(reset_terminal(), end='', flush=True)
            editor = get_editor()
            editor_process = subprocess.Popen(editor + [master.dest])
            while editor_process.poll() is None:
                time.sleep(0.1)
                try:
                    newmtime = os.path.getmtime(master.dest)
                except OSError:
                    # The file may be momentarily missing (for example if the
                    # editor replaces it when saving); check again next tick
                    continue
                if newmtime > mtime:
                    mtime = newmtime
                    if master.is_alive:
                        master.upload()
            print(reset_terminal(), end='', flush=True)
            # Final upload after the editor exits, this time with visible output
            if master.is_alive:
                if not master.upload(suppress_output=False):
                    show_error(f'Failed to upload {remote_path}')
            else:
                show_error(f'Failed to upload {remote_path}, SSH master process died')
    return None


@result_handler()
def handle_result(args: List[str], data: Result, target_window_id: int, boss: BossType) -> None:
    '''Open the local path in data, if any, using the command derived from boss.opts.open_url_with.'''
    if data:
        cmd = command_for_open(boss.opts.open_url_with)
        open_cmd(cmd, data)


if __name__ == '__main__':
    main(sys.argv)