diff --git a/kitty/utils.py b/kitty/utils.py index 9d6500360..78649162b 100644 --- a/kitty/utils.py +++ b/kitty/utils.py @@ -9,7 +9,7 @@ import os import re import string import sys -from contextlib import suppress +from contextlib import contextmanager, suppress from functools import lru_cache from time import monotonic from typing import ( @@ -501,6 +501,21 @@ class TTYIO: break +@contextmanager +def no_echo(fd: int = -1) -> Generator[None, None, None]: + import termios + if fd < 0: + fd = sys.stdin.fileno() + old = termios.tcgetattr(fd) + new = termios.tcgetattr(fd) + new[3] = new[3] & ~termios.ECHO + try: + termios.tcsetattr(fd, termios.TCSADRAIN, new) + yield + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old) + + def natsort_ints(iterable: Iterable[str]) -> List[str]: def convert(text: str) -> Union[int, str]: diff --git a/kitty_tests/__init__.py b/kitty_tests/__init__.py index 2caf0cd56..c85b8023d 100644 --- a/kitty_tests/__init__.py +++ b/kitty_tests/__init__.py @@ -1,23 +1,31 @@ #!/usr/bin/env python3 # License: GPL v3 Copyright: 2016, Kovid Goyal +import fcntl +import io import os +import select +import shlex +import struct +import sys +import termios +from pty import CHILD, fork from unittest import TestCase from kitty.config import finalize_keys, finalize_mouse_mappings from kitty.fast_data_types import ( - Cursor, HistoryBuf, LineBuf, Screen, set_options + Cursor, HistoryBuf, LineBuf, Screen, get_options, parse_bytes, set_options ) from kitty.options.parse import merge_result_dicts from kitty.options.types import Options, defaults from kitty.types import MouseEvent +from kitty.utils import no_echo, write_all class Callbacks: - def __init__(self, opts) -> None: + def __init__(self) -> None: self.clear() - self.opts = opts def write(self, data) -> None: self.wtcbuf += data @@ -68,11 +76,12 @@ class Callbacks: def on_mouse_event(self, event): ev = MouseEvent(**event) - action_def = self.opts.mousemap.get(ev) + opts = get_options() + action_def = opts.mousemap.get(ev) if not action_def: return False self.current_mouse_button = ev.button - for action in self.opts.alias_map.resolve_aliases(action_def, 'mouse_map'): + for action in opts.alias_map.resolve_aliases(action_def, 'mouse_map'): getattr(self, action.func)(*action.args) self.current_mouse_button = 0 return True @@ -121,11 +130,15 @@ class BaseTest(TestCase): return options def create_screen(self, cols=5, lines=5, scrollback=5, cell_width=10, cell_height=20, options=None): - opts = self.set_options(options) - c = Callbacks(opts) + self.set_options(options) + c = Callbacks() s = Screen(c, lines, cols, scrollback, cell_width, cell_height, 0, c) return s + def create_pty(self, argv, cols=80, lines=25, scrollback=100, cell_width=10, cell_height=20, options=None, cwd=None): + self.set_options(options) + return PTY(argv, lines, cols, scrollback, cell_width, cell_height, cwd) + def assertEqualAttributes(self, c1, c2): x1, y1, c1.x, c1.y = c1.x, c1.y, 0, 0 x2, y2, c2.x, c2.y = c2.x, c2.y, 0, 0 @@ -133,3 +146,63 @@ class BaseTest(TestCase): self.assertEqual(c1, c2) finally: c1.x, c1.y, c2.x, c2.y = x1, y1, x2, y2 + + +class PTY: + + def __init__(self, argv, rows=25, columns=80, scrollback=100, cell_width=10, cell_height=20, cwd=None): + pid, self.master_fd = fork() + self.is_child = pid == CHILD + if self.is_child: + if cwd: + os.chdir(cwd) + if isinstance(argv, str): + argv = shlex.split(argv) + with no_echo(): + sys.stdin.readline() + os.execlp(argv[0], *argv) + os.set_blocking(self.master_fd, False) + self.set_window_size(rows=rows, columns=columns) + new = termios.tcgetattr(self.master_fd) + new[3] = new[3] & ~termios.ECHO + termios.tcsetattr(self.master_fd, termios.TCSADRAIN, new) + self.write_to_child('ready\r\n') + self.callbacks = Callbacks() + self.screen = Screen(self.callbacks, rows, columns, scrollback, cell_width, cell_height, 0, self.callbacks) + + def __del__(self): + if not self.is_child: + os.close(self.master_fd) + del self.master_fd + + def write_to_child(self, data): + write_all(self.master_fd, data) + + def wait_for_input_from_child(self, timeout=2): + rd = select.select([self.master_fd], [], [], timeout)[0] + return bool(rd) + + def process_input_from_child(self): + bytes_read = 0 + while True: + try: + data = os.read(self.master_fd, io.DEFAULT_BUFFER_SIZE) + except (BlockingIOError, OSError): + data = b'' + if not data: + break + bytes_read += len(data) + parse_bytes(self.screen, data) + return bytes_read + + def set_window_size(self, rows=25, columns=80, x_pixels=0, y_pixels=0): + s = struct.pack('HHHH', rows, columns, x_pixels, y_pixels) + fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, s) + + def screen_contents(self): + lines = [] + for i in range(self.screen.lines): + x = str(self.screen.line(i)) + if x: + lines.append(x) + return '\n'.join(lines) diff --git a/kitty_tests/ssh.py b/kitty_tests/ssh.py index 703b3b48f..04a49f460 100644 --- a/kitty_tests/ssh.py +++ b/kitty_tests/ssh.py @@ -3,14 +3,30 @@ import os -from . import BaseTest +import sys from kittens.ssh.main import get_connection_data from kitty.utils import SSHConnectionData +from . import BaseTest + class SSHTest(BaseTest): + def test_basic_pty_operations(self): + pty = self.create_pty('echo hello') + self.assertTrue(pty.wait_for_input_from_child()) + pty.process_input_from_child() + self.ae(pty.screen_contents(), 'hello') + pty = self.create_pty([sys.executable, '-c', '''\ +import array, fcntl, sys, termios +buf = array.array('H', [0, 0, 0, 0]) +fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf) +print(' '.join(map(str, buf)))'''], lines=13, cols=17) + self.assertTrue(pty.wait_for_input_from_child()) + pty.process_input_from_child() + self.ae(pty.screen_contents(), '13 17 0 0') + def test_ssh_connection_data(self): def t(cmdline, binary='ssh', host='main', port=None, identity_file=''): if identity_file: