Make the socket prewarm test a bit more robust
This commit is contained in:
parent
991fbacb99
commit
8998970adc
@ -11,7 +11,7 @@ import tempfile
|
|||||||
import time
|
import time
|
||||||
from contextlib import suppress
|
from contextlib import suppress
|
||||||
|
|
||||||
from kitty.constants import kitty_exe
|
from kitty.constants import kitty_exe, terminfo_dir
|
||||||
from kitty.fast_data_types import (
|
from kitty.fast_data_types import (
|
||||||
CLD_EXITED, CLD_KILLED, get_options, has_sigqueue, install_signal_handlers,
|
CLD_EXITED, CLD_KILLED, get_options, has_sigqueue, install_signal_handlers,
|
||||||
read_signals, remove_signal_handlers, sigqueue
|
read_signals, remove_signal_handlers, sigqueue
|
||||||
@ -28,17 +28,13 @@ class Prewarm(BaseTest):
|
|||||||
from kitty.prewarm import fork_prewarm_process, wait_for_child_death
|
from kitty.prewarm import fork_prewarm_process, wait_for_child_death
|
||||||
exit_code = 17
|
exit_code = 17
|
||||||
src = '''\
|
src = '''\
|
||||||
def socket_child_main(exit_code=0, for_signal=False):
|
def socket_child_main(exit_code=0, initial_print=''):
|
||||||
if for_signal:
|
import os, sys, json
|
||||||
print('child ready')
|
|
||||||
sys.stdin.read()
|
|
||||||
raise SystemExit(exit_code)
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
|
|
||||||
from kitty.fast_data_types import get_options
|
from kitty.fast_data_types import get_options
|
||||||
from kitty.utils import read_screen_size
|
from kitty.utils import read_screen_size
|
||||||
|
if initial_print:
|
||||||
|
print(initial_print, flush=True, file=sys.stderr)
|
||||||
|
|
||||||
output = {
|
output = {
|
||||||
'test_env': os.environ.get('TEST_ENV_PASS', ''),
|
'test_env': os.environ.get('TEST_ENV_PASS', ''),
|
||||||
'cwd': os.path.realpath(os.getcwd()),
|
'cwd': os.path.realpath(os.getcwd()),
|
||||||
@ -59,8 +55,24 @@ def socket_child_main(exit_code=0, for_signal=False):
|
|||||||
if p is None:
|
if p is None:
|
||||||
return
|
return
|
||||||
env = os.environ.copy()
|
env = os.environ.copy()
|
||||||
env.update({'TEST_ENV_PASS': 'xyz', 'KITTY_PREWARM_SOCKET': p.socket_env_var()})
|
env.update({'TEST_ENV_PASS': 'xyz', 'KITTY_PREWARM_SOCKET': p.socket_env_var(), 'TERM': 'xterm-kitty', 'TERMINFO': terminfo_dir})
|
||||||
cols = 117
|
cols = 117
|
||||||
|
|
||||||
|
def wait_for_death(exit_code):
|
||||||
|
status = wait_for_child_death(pty.child_pid, timeout=5)
|
||||||
|
if status is None:
|
||||||
|
os.kill(pty.child_pid, signal.SIGKILL)
|
||||||
|
self.assertIsNotNone(status, 'prewarm wrapper process did not exit')
|
||||||
|
with suppress(AttributeError):
|
||||||
|
self.assertEqual(os.waitstatus_to_exitcode(status), exit_code, pty.screen_contents())
|
||||||
|
|
||||||
|
pty = self.create_pty(
|
||||||
|
argv=[kitty_exe(), '+runpy', src + 'socket_child_main(initial_print="child ready:")'], cols=cols, env=env, cwd=cwd)
|
||||||
|
pty.wait_till(lambda: 'child ready:' in pty.screen_contents())
|
||||||
|
os.kill(pty.child_pid, signal.SIGINT)
|
||||||
|
wait_for_death(128 + signal.SIGINT)
|
||||||
|
pty.wait_till(lambda: 'KeyboardInterrupt' in pty.screen_contents())
|
||||||
|
|
||||||
stdin_r, stdin_w = os.pipe()
|
stdin_r, stdin_w = os.pipe()
|
||||||
os.set_inheritable(stdin_w, False)
|
os.set_inheritable(stdin_w, False)
|
||||||
stdout_r, stdout_w = os.pipe()
|
stdout_r, stdout_w = os.pipe()
|
||||||
@ -72,36 +84,17 @@ def socket_child_main(exit_code=0, for_signal=False):
|
|||||||
with open(stdin_w, 'w') as f:
|
with open(stdin_w, 'w') as f:
|
||||||
f.write(stdin_data)
|
f.write(stdin_data)
|
||||||
pty.wait_till(lambda: 'hello' in pty.screen_contents())
|
pty.wait_till(lambda: 'hello' in pty.screen_contents())
|
||||||
with open(stdout_r) as f:
|
wait_for_death(exit_code)
|
||||||
stdout_data = f.read()
|
|
||||||
status = wait_for_child_death(pty.child_pid, timeout=5)
|
|
||||||
if status is None:
|
|
||||||
os.kill(pty.child_pid, signal.SIGKILL)
|
|
||||||
self.assertIsNotNone(status, 'prewarm wrapper process did not exit')
|
|
||||||
with suppress(AttributeError):
|
|
||||||
self.assertEqual(os.waitstatus_to_exitcode(status), exit_code)
|
|
||||||
output = json.loads(pty.screen_contents().strip())
|
output = json.loads(pty.screen_contents().strip())
|
||||||
self.assertEqual(output['test_env'], env['TEST_ENV_PASS'])
|
self.assertEqual(output['test_env'], env['TEST_ENV_PASS'])
|
||||||
self.assertEqual(output['cwd'], cwd)
|
self.assertEqual(output['cwd'], cwd)
|
||||||
self.assertEqual(output['font_family'], 'prewarm')
|
self.assertEqual(output['font_family'], 'prewarm')
|
||||||
self.assertEqual(output['cols'], cols)
|
self.assertEqual(output['cols'], cols)
|
||||||
self.assertEqual(output['stdin_data'], stdin_data)
|
self.assertEqual(output['stdin_data'], stdin_data)
|
||||||
|
with open(stdout_r) as f:
|
||||||
|
stdout_data = f.read()
|
||||||
self.assertEqual(stdout_data, 'testing stdout')
|
self.assertEqual(stdout_data, 'testing stdout')
|
||||||
|
|
||||||
stdin_r, stdin_w = os.pipe()
|
|
||||||
os.set_inheritable(stdin_w, False)
|
|
||||||
pty = self.create_pty(
|
|
||||||
argv=[kitty_exe(), '+runpy', src + 'socket_child_main(for_signal=True)'], cols=cols, env=env, cwd=cwd, stdin_fd=stdin_r)
|
|
||||||
pty.wait_till(lambda: 'child ready' in pty.screen_contents())
|
|
||||||
os.kill(pty.child_pid, signal.SIGINT)
|
|
||||||
pty.wait_till(lambda: 'Traceback' in pty.screen_contents())
|
|
||||||
status = wait_for_child_death(pty.child_pid, timeout=5)
|
|
||||||
if status is None:
|
|
||||||
os.kill(pty.child_pid, signal.SIGKILL)
|
|
||||||
self.assertIsNotNone(status, 'prewarm wrapper process did not exit')
|
|
||||||
with suppress(AttributeError):
|
|
||||||
self.assertEqual(os.waitstatus_to_exitcode(status), 128 + signal.SIGINT)
|
|
||||||
|
|
||||||
def test_prewarming(self):
|
def test_prewarming(self):
|
||||||
from kitty.prewarm import fork_prewarm_process
|
from kitty.prewarm import fork_prewarm_process
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user