diff --git a/kitty_tests/prewarm.py b/kitty_tests/prewarm.py index 9c2b8b000..07364fb52 100644 --- a/kitty_tests/prewarm.py +++ b/kitty_tests/prewarm.py @@ -11,7 +11,7 @@ import tempfile import time from contextlib import suppress -from kitty.constants import kitty_exe +from kitty.constants import kitty_exe, terminfo_dir from kitty.fast_data_types import ( CLD_EXITED, CLD_KILLED, get_options, has_sigqueue, install_signal_handlers, read_signals, remove_signal_handlers, sigqueue @@ -28,17 +28,13 @@ class Prewarm(BaseTest): from kitty.prewarm import fork_prewarm_process, wait_for_child_death exit_code = 17 src = '''\ -def socket_child_main(exit_code=0, for_signal=False): - if for_signal: - print('child ready') - sys.stdin.read() - raise SystemExit(exit_code) - import json - import os - import sys - +def socket_child_main(exit_code=0, initial_print=''): + import os, sys, json from kitty.fast_data_types import get_options from kitty.utils import read_screen_size + if initial_print: + print(initial_print, flush=True, file=sys.stderr) + output = { 'test_env': os.environ.get('TEST_ENV_PASS', ''), 'cwd': os.path.realpath(os.getcwd()), @@ -59,8 +55,24 @@ def socket_child_main(exit_code=0, for_signal=False): if p is None: return env = os.environ.copy() - env.update({'TEST_ENV_PASS': 'xyz', 'KITTY_PREWARM_SOCKET': p.socket_env_var()}) + env.update({'TEST_ENV_PASS': 'xyz', 'KITTY_PREWARM_SOCKET': p.socket_env_var(), 'TERM': 'xterm-kitty', 'TERMINFO': terminfo_dir}) cols = 117 + + def wait_for_death(exit_code): + status = wait_for_child_death(pty.child_pid, timeout=5) + if status is None: + os.kill(pty.child_pid, signal.SIGKILL) + self.assertIsNotNone(status, 'prewarm wrapper process did not exit') + with suppress(AttributeError): + self.assertEqual(os.waitstatus_to_exitcode(status), exit_code, pty.screen_contents()) + + pty = self.create_pty( + argv=[kitty_exe(), '+runpy', src + 'socket_child_main(initial_print="child ready:")'], cols=cols, env=env, cwd=cwd) + pty.wait_till(lambda: 'child ready:' in pty.screen_contents()) + os.kill(pty.child_pid, signal.SIGINT) + wait_for_death(128 + signal.SIGINT) + pty.wait_till(lambda: 'KeyboardInterrupt' in pty.screen_contents()) + stdin_r, stdin_w = os.pipe() os.set_inheritable(stdin_w, False) stdout_r, stdout_w = os.pipe() @@ -72,36 +84,17 @@ def socket_child_main(exit_code=0, for_signal=False): with open(stdin_w, 'w') as f: f.write(stdin_data) pty.wait_till(lambda: 'hello' in pty.screen_contents()) - with open(stdout_r) as f: - stdout_data = f.read() - status = wait_for_child_death(pty.child_pid, timeout=5) - if status is None: - os.kill(pty.child_pid, signal.SIGKILL) - self.assertIsNotNone(status, 'prewarm wrapper process did not exit') - with suppress(AttributeError): - self.assertEqual(os.waitstatus_to_exitcode(status), exit_code) + wait_for_death(exit_code) output = json.loads(pty.screen_contents().strip()) self.assertEqual(output['test_env'], env['TEST_ENV_PASS']) self.assertEqual(output['cwd'], cwd) self.assertEqual(output['font_family'], 'prewarm') self.assertEqual(output['cols'], cols) self.assertEqual(output['stdin_data'], stdin_data) + with open(stdout_r) as f: + stdout_data = f.read() self.assertEqual(stdout_data, 'testing stdout') - stdin_r, stdin_w = os.pipe() - os.set_inheritable(stdin_w, False) - pty = self.create_pty( - argv=[kitty_exe(), '+runpy', src + 'socket_child_main(for_signal=True)'], cols=cols, env=env, cwd=cwd, stdin_fd=stdin_r) - pty.wait_till(lambda: 'child ready' in pty.screen_contents()) - os.kill(pty.child_pid, signal.SIGINT) - pty.wait_till(lambda: 'Traceback' in pty.screen_contents()) - status = wait_for_child_death(pty.child_pid, timeout=5) - if status is None: - os.kill(pty.child_pid, signal.SIGKILL) - self.assertIsNotNone(status, 'prewarm wrapper process did not exit') - with suppress(AttributeError): - self.assertEqual(os.waitstatus_to_exitcode(status), 128 + signal.SIGINT) - def test_prewarming(self): from kitty.prewarm import fork_prewarm_process