Framework for testing with external programs via a PTY

This commit is contained in:
Kovid Goyal 2022-02-21 14:08:10 +05:30
parent a559210923
commit 63f974531b
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
3 changed files with 113 additions and 9 deletions

View File

@ -9,7 +9,7 @@ import os
import re
import string
import sys
from contextlib import suppress
from contextlib import contextmanager, suppress
from functools import lru_cache
from time import monotonic
from typing import (
@ -501,6 +501,21 @@ class TTYIO:
break
@contextmanager
def no_echo(fd: int = -1) -> Generator[None, None, None]:
import termios
if fd < 0:
fd = sys.stdin.fileno()
old = termios.tcgetattr(fd)
new = termios.tcgetattr(fd)
new[3] = new[3] & ~termios.ECHO
try:
termios.tcsetattr(fd, termios.TCSADRAIN, new)
yield
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old)
def natsort_ints(iterable: Iterable[str]) -> List[str]:
def convert(text: str) -> Union[int, str]:

View File

@ -1,23 +1,31 @@
#!/usr/bin/env python3
# License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
import fcntl
import io
import os
import select
import shlex
import struct
import sys
import termios
from pty import CHILD, fork
from unittest import TestCase
from kitty.config import finalize_keys, finalize_mouse_mappings
from kitty.fast_data_types import (
Cursor, HistoryBuf, LineBuf, Screen, set_options
Cursor, HistoryBuf, LineBuf, Screen, get_options, parse_bytes, set_options
)
from kitty.options.parse import merge_result_dicts
from kitty.options.types import Options, defaults
from kitty.types import MouseEvent
from kitty.utils import no_echo, write_all
class Callbacks:
def __init__(self, opts) -> None:
def __init__(self) -> None:
self.clear()
self.opts = opts
def write(self, data) -> None:
self.wtcbuf += data
@ -68,11 +76,12 @@ class Callbacks:
def on_mouse_event(self, event):
ev = MouseEvent(**event)
action_def = self.opts.mousemap.get(ev)
opts = get_options()
action_def = opts.mousemap.get(ev)
if not action_def:
return False
self.current_mouse_button = ev.button
for action in self.opts.alias_map.resolve_aliases(action_def, 'mouse_map'):
for action in opts.alias_map.resolve_aliases(action_def, 'mouse_map'):
getattr(self, action.func)(*action.args)
self.current_mouse_button = 0
return True
@ -121,11 +130,15 @@ class BaseTest(TestCase):
return options
def create_screen(self, cols=5, lines=5, scrollback=5, cell_width=10, cell_height=20, options=None):
opts = self.set_options(options)
c = Callbacks(opts)
self.set_options(options)
c = Callbacks()
s = Screen(c, lines, cols, scrollback, cell_width, cell_height, 0, c)
return s
def create_pty(self, argv, cols=80, lines=25, scrollback=100, cell_width=10, cell_height=20, options=None, cwd=None):
self.set_options(options)
return PTY(argv, lines, cols, scrollback, cell_width, cell_height, cwd)
def assertEqualAttributes(self, c1, c2):
x1, y1, c1.x, c1.y = c1.x, c1.y, 0, 0
x2, y2, c2.x, c2.y = c2.x, c2.y, 0, 0
@ -133,3 +146,63 @@ class BaseTest(TestCase):
self.assertEqual(c1, c2)
finally:
c1.x, c1.y, c2.x, c2.y = x1, y1, x2, y2
class PTY:
def __init__(self, argv, rows=25, columns=80, scrollback=100, cell_width=10, cell_height=20, cwd=None):
pid, self.master_fd = fork()
self.is_child = pid == CHILD
if self.is_child:
if cwd:
os.chdir(cwd)
if isinstance(argv, str):
argv = shlex.split(argv)
with no_echo():
sys.stdin.readline()
os.execlp(argv[0], *argv)
os.set_blocking(self.master_fd, False)
self.set_window_size(rows=rows, columns=columns)
new = termios.tcgetattr(self.master_fd)
new[3] = new[3] & ~termios.ECHO
termios.tcsetattr(self.master_fd, termios.TCSADRAIN, new)
self.write_to_child('ready\r\n')
self.callbacks = Callbacks()
self.screen = Screen(self.callbacks, rows, columns, scrollback, cell_width, cell_height, 0, self.callbacks)
def __del__(self):
if not self.is_child:
os.close(self.master_fd)
del self.master_fd
def write_to_child(self, data):
write_all(self.master_fd, data)
def wait_for_input_from_child(self, timeout=2):
rd = select.select([self.master_fd], [], [], timeout)[0]
return bool(rd)
def process_input_from_child(self):
bytes_read = 0
while True:
try:
data = os.read(self.master_fd, io.DEFAULT_BUFFER_SIZE)
except (BlockingIOError, OSError):
data = b''
if not data:
break
bytes_read += len(data)
parse_bytes(self.screen, data)
return bytes_read
def set_window_size(self, rows=25, columns=80, x_pixels=0, y_pixels=0):
s = struct.pack('HHHH', rows, columns, x_pixels, y_pixels)
fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, s)
def screen_contents(self):
lines = []
for i in range(self.screen.lines):
x = str(self.screen.line(i))
if x:
lines.append(x)
return '\n'.join(lines)

View File

@ -3,14 +3,30 @@
import os
from . import BaseTest
import sys
from kittens.ssh.main import get_connection_data
from kitty.utils import SSHConnectionData
from . import BaseTest
class SSHTest(BaseTest):
def test_basic_pty_operations(self):
pty = self.create_pty('echo hello')
self.assertTrue(pty.wait_for_input_from_child())
pty.process_input_from_child()
self.ae(pty.screen_contents(), 'hello')
pty = self.create_pty([sys.executable, '-c', '''\
import array, fcntl, sys, termios
buf = array.array('H', [0, 0, 0, 0])
fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf)
print(' '.join(map(str, buf)))'''], lines=13, cols=17)
self.assertTrue(pty.wait_for_input_from_child())
pty.process_input_from_child()
self.ae(pty.screen_contents(), '13 17 0 0')
def test_ssh_connection_data(self):
def t(cmdline, binary='ssh', host='main', port=None, identity_file=''):
if identity_file: