Use sub tests for the various prewarm things
This commit is contained in:
parent
cb5157f2d1
commit
cb0d23bae8
@ -78,54 +78,53 @@ def socket_child_main(exit_code=0, initial_print=''):
|
|||||||
with suppress(AttributeError):
|
with suppress(AttributeError):
|
||||||
self.assertEqual(os.waitstatus_to_exitcode(status), exit_code, pty.screen_contents())
|
self.assertEqual(os.waitstatus_to_exitcode(status), exit_code, pty.screen_contents())
|
||||||
|
|
||||||
# signal delivery tests are pretty flakey on CI so give up on them
|
if not self.is_ci: # signal delivery tests are pretty flakey on CI so give up on them
|
||||||
|
with self.subTest(msg='test SIGINT via signal to wrapper'):
|
||||||
|
pty = self.create_pty(
|
||||||
|
argv=[kitty_exe(), '+runpy', src + 'socket_child_main(initial_print="child ready:")'], cols=cols, env=env, cwd=cwd)
|
||||||
|
pty.wait_till(lambda: 'child ready:' in pty.screen_contents())
|
||||||
|
os.kill(pty.child_pid, signal.SIGINT)
|
||||||
|
pty.wait_till(lambda: 'KeyboardInterrupt' in pty.screen_contents())
|
||||||
|
wait_for_death(signal.SIGINT)
|
||||||
|
|
||||||
# # test SIGINT via signal to wrapper
|
with self.subTest(msg='test SIGINT via Ctrl-c'):
|
||||||
# pty = self.create_pty(
|
pty = self.create_pty(
|
||||||
# argv=[kitty_exe(), '+runpy', src + 'socket_child_main(initial_print="child ready:")'], cols=cols, env=env, cwd=cwd)
|
argv=[kitty_exe(), '+runpy', src + 'socket_child_main(initial_print="child ready:")'], cols=cols, env=env, cwd=cwd)
|
||||||
# pty.wait_till(lambda: 'child ready:' in pty.screen_contents())
|
pty.wait_till(lambda: 'child ready:' in pty.screen_contents())
|
||||||
# os.kill(pty.child_pid, signal.SIGINT)
|
pty.write_to_child('\x03', flush=True)
|
||||||
# pty.wait_till(lambda: 'KeyboardInterrupt' in pty.screen_contents())
|
pty.wait_till(lambda: 'KeyboardInterrupt' in pty.screen_contents())
|
||||||
# wait_for_death(signal.SIGINT)
|
wait_for_death(signal.SIGINT)
|
||||||
#
|
|
||||||
# # test SIGINT via Ctrl-c
|
|
||||||
# pty = self.create_pty(
|
|
||||||
# argv=[kitty_exe(), '+runpy', src + 'socket_child_main(initial_print="child ready:")'], cols=cols, env=env, cwd=cwd)
|
|
||||||
# pty.wait_till(lambda: 'child ready:' in pty.screen_contents())
|
|
||||||
# pty.write_to_child('\x03', flush=True)
|
|
||||||
# pty.wait_till(lambda: 'KeyboardInterrupt' in pty.screen_contents())
|
|
||||||
# wait_for_death(signal.SIGINT)
|
|
||||||
|
|
||||||
# test SIGWINCH handling
|
with self.subTest(msg='test SIGWINCH handling'):
|
||||||
pty = self.create_pty(
|
pty = self.create_pty(
|
||||||
argv=[kitty_exe(), '+runpy', src + 'socket_child_main(initial_print="child ready:")'], cols=cols, env=env, cwd=cwd)
|
argv=[kitty_exe(), '+runpy', src + 'socket_child_main(initial_print="child ready:")'], cols=cols, env=env, cwd=cwd)
|
||||||
pty.wait_till(lambda: 'child ready:' in pty.screen_contents())
|
pty.wait_till(lambda: 'child ready:' in pty.screen_contents())
|
||||||
pty.set_window_size(columns=cols + 3)
|
pty.set_window_size(columns=cols + 3)
|
||||||
pty.wait_till(lambda: f'Screen size changed: {cols + 3}' in pty.screen_contents())
|
pty.wait_till(lambda: f'Screen size changed: {cols + 3}' in pty.screen_contents())
|
||||||
os.close(pty.master_fd)
|
os.close(pty.master_fd)
|
||||||
|
|
||||||
# test passing of data via cwd, env vars and stdin/stdout redirection
|
with self.subTest(msg='test passing of data via cwd, env vars and stdin/stdout redirection'):
|
||||||
stdin_r, stdin_w = os.pipe()
|
stdin_r, stdin_w = os.pipe()
|
||||||
os.set_inheritable(stdin_w, False)
|
os.set_inheritable(stdin_w, False)
|
||||||
stdout_r, stdout_w = os.pipe()
|
stdout_r, stdout_w = os.pipe()
|
||||||
os.set_inheritable(stdout_r, False)
|
os.set_inheritable(stdout_r, False)
|
||||||
pty = self.create_pty(
|
pty = self.create_pty(
|
||||||
argv=[kitty_exe(), '+runpy', src + f'socket_child_main({exit_code})'], cols=cols, env=env, cwd=cwd,
|
argv=[kitty_exe(), '+runpy', src + f'socket_child_main({exit_code})'], cols=cols, env=env, cwd=cwd,
|
||||||
stdin_fd=stdin_r, stdout_fd=stdout_w)
|
stdin_fd=stdin_r, stdout_fd=stdout_w)
|
||||||
stdin_data = 'testing--stdin-read'
|
stdin_data = 'testing--stdin-read'
|
||||||
with open(stdin_w, 'w') as f:
|
with open(stdin_w, 'w') as f:
|
||||||
f.write(stdin_data)
|
f.write(stdin_data)
|
||||||
pty.wait_till(lambda: 'hello' in pty.screen_contents())
|
pty.wait_till(lambda: 'hello' in pty.screen_contents())
|
||||||
wait_for_death(exit_code)
|
wait_for_death(exit_code)
|
||||||
output = json.loads(pty.screen_contents().strip())
|
output = json.loads(pty.screen_contents().strip())
|
||||||
self.assertEqual(output['test_env'], env['TEST_ENV_PASS'])
|
self.assertEqual(output['test_env'], env['TEST_ENV_PASS'])
|
||||||
self.assertEqual(output['cwd'], cwd)
|
self.assertEqual(output['cwd'], cwd)
|
||||||
self.assertEqual(output['font_family'], 'prewarm')
|
self.assertEqual(output['font_family'], 'prewarm')
|
||||||
self.assertEqual(output['cols'], cols)
|
self.assertEqual(output['cols'], cols)
|
||||||
self.assertEqual(output['stdin_data'], stdin_data)
|
self.assertEqual(output['stdin_data'], stdin_data)
|
||||||
with open(stdout_r) as f:
|
with open(stdout_r) as f:
|
||||||
stdout_data = f.read()
|
stdout_data = f.read()
|
||||||
self.assertEqual(stdout_data, 'testing stdout')
|
self.assertEqual(stdout_data, 'testing stdout')
|
||||||
|
|
||||||
def test_prewarming(self):
|
def test_prewarming(self):
|
||||||
from kitty.prewarm import fork_prewarm_process
|
from kitty.prewarm import fork_prewarm_process
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user