Finish up tui.Loop

This commit is contained in:
Kovid Goyal 2018-02-07 13:30:34 +05:30
parent 168bc58635
commit e3b4919c17
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 96 additions and 14 deletions

30
kittens/tui/handler.py Normal file
View File

@ -0,0 +1,30 @@
#!/usr/bin/env python
# vim:fileencoding=utf-8
# License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
class Handler:
def initialize(self, screen_size, quit_loop, wakeup):
self.screen_size, self.quit_loop = screen_size, quit_loop
self.wakeup = wakeup
def on_resize(self, screen_size):
self.screen_size = screen_size
def on_term(self):
self.quit_loop(1)
def on_text(self, text, in_bracketed_paste=False):
pass
def on_key(self, key_event):
pass
def on_mouse(self, mouse_event):
pass
def write(self, data):
if isinstance(data, str):
data = data.encode('utf-8')
self.write_buf.append(data)

View File

@ -8,6 +8,7 @@ import io
import os import os
import re import re
import selectors import selectors
import signal
import sys import sys
import termios import termios
import tty import tty
@ -16,10 +17,12 @@ from contextlib import closing, contextmanager
from functools import partial from functools import partial
from kitty.fast_data_types import parse_input_from_terminal from kitty.fast_data_types import parse_input_from_terminal
from kitty.icat import screen_size
from kitty.key_encoding import ( from kitty.key_encoding import (
ALT, CTRL, SHIFT, backspace_key, decode_key_event, enter_key ALT, CTRL, SHIFT, backspace_key, decode_key_event, enter_key
) )
from .handler import Handler
from .operations import init_state, reset_state from .operations import init_state, reset_state
@ -90,6 +93,22 @@ def decode_sgr_mouse(text):
return MouseEvent(x, y, typ, buttons, mods) return MouseEvent(x, y, typ, buttons, mods)
class UnhandledException(Handler):
def __init__(self, tb):
self.tb = tb
def initialize(self, screen_size, quit_loop, wakeup):
Handler.initialize(self, screen_size, quit_loop, wakeup)
self.write(self.tb)
self.write('\n')
self.write('Press the Enter key to quit')
def on_key(self, key_event):
if key_event is enter_key:
self.quit_loop(1)
class Loop: class Loop:
def __init__(self, input_fd=None, output_fd=None, sanitize_bracketed_paste='[\x0e\x0f\r\x07\x7f\x8d\x8e\x8f\x90\x9b\x9d\x9e\x9f]'): def __init__(self, input_fd=None, output_fd=None, sanitize_bracketed_paste='[\x0e\x0f\r\x07\x7f\x8d\x8e\x8f\x90\x9b\x9d\x9e\x9f]'):
@ -110,7 +129,7 @@ class Loop:
self.iov_limit = os.sysconf('SC_IOV_MAX') - 1 self.iov_limit = os.sysconf('SC_IOV_MAX') - 1
except Exception: except Exception:
self.iov_limit = 255 self.iov_limit = 255
self.parse_input_from_terminal = partial(parse_input_from_terminal, self.on_text, self.on_dcs, self.on_csi, self.on_osc, self.on_pm, self.on_apc) self.parse_input_from_terminal = partial(parse_input_from_terminal, self._on_text, self._on_dcs, self._on_csi, self.on_osc, self._on_pm, self._on_apc)
self.ebs_pat = re.compile('([\177\r])') self.ebs_pat = re.compile('([\177\r])')
self.in_bracketed_paste = False self.in_bracketed_paste = False
self.sanitize_bracketed_paste = bool(sanitize_bracketed_paste) self.sanitize_bracketed_paste = bool(sanitize_bracketed_paste)
@ -133,7 +152,7 @@ class Loop:
finally: finally:
del self.handler del self.handler
def on_text(self, text): def _on_text(self, text):
if self.in_bracketed_paste and self.sanitize_bracketed_paste: if self.in_bracketed_paste and self.sanitize_bracketed_paste:
text = self.sanitize_ibp_pat.sub('', text) text = self.sanitize_ibp_pat.sub('', text)
@ -145,10 +164,10 @@ class Loop:
else: else:
self.handler.on_text(chunk, self.in_bracketed_paste) self.handler.on_text(chunk, self.in_bracketed_paste)
def on_dcs(self, dcs): def _on_dcs(self, dcs):
pass pass
def on_csi(self, csi): def _on_csi(self, csi):
q = csi[-1] q = csi[-1]
if q in 'mM': if q in 'mM':
if csi.startswith('<'): if csi.startswith('<'):
@ -165,10 +184,10 @@ class Loop:
elif csi == '201~': elif csi == '201~':
self.in_bracketed_paste = False self.in_bracketed_paste = False
def on_pm(self, pm): def _on_pm(self, pm):
pass pass
def on_apc(self, apc): def _on_apc(self, apc):
if apc.startswith('K'): if apc.startswith('K'):
try: try:
k = decode_key_event(apc) k = decode_key_event(apc)
@ -200,25 +219,41 @@ class Loop:
break break
del handler.write_buf[:consumed] del handler.write_buf[:consumed]
def _on_unhandled_exception(self, tb):
pass
def _wakeup_ready(self, handler): def _wakeup_ready(self, handler):
os.read(self.wakeup_read_fd) data = os.read(self.wakeup_read_fd)
if b'r' in data:
screen_size.changed = True
handler.on_resize(screen_size())
if b't' in data:
handler.on_term()
def wakeup(self): def _wakeup_write(self, val):
os.write(self.wakeup_write_fd, b'1') while not os.write(self.wakeup_write_fd, val):
pass
def _on_sigwinch(self, signum, frame):
self._wakeup_write(b'r')
def _on_sigterm(self, signum, frame):
self._wakeup_write(b't')
def quit(self, return_code=None): def quit(self, return_code=None):
self.read_allowed = False self.read_allowed = False
if return_code is not None: if return_code is not None:
self.return_code = return_code self.return_code = return_code
def _loop(self, handler): def wakeup(self):
self._wakeup_write(b'1')
def loop(self, handler):
select = self.sel.select select = self.sel.select
tb = None tb = None
waiting_for_write = True waiting_for_write = True
with closing(self.sel), sanitize_term(self.output_fd), non_block(self.input_fd), non_block(self.output_fd), raw_terminal(self.input_fd): with closing(self.sel), sanitize_term(self.output_fd), non_block(self.input_fd), non_block(self.output_fd), raw_terminal(self.input_fd):
signal.signal(signal.SIGWINCH, self._on_sigwinch)
signal.signal(signal.SIGTERM, self._on_sigterm)
handler.write_buf = []
handler.initialize(screen_size(), self.quit, self.wakeup)
while True: while True:
has_data_to_write = bool(handler.write_buf) has_data_to_write = bool(handler.write_buf)
if not has_data_to_write and not self.read_allowed: if not has_data_to_write and not self.read_allowed:
@ -242,4 +277,21 @@ class Loop:
self._report_error_loop(tb) self._report_error_loop(tb)
def _report_error_loop(self, tb): def _report_error_loop(self, tb):
raise NotImplementedError('TODO: Implement') select = self.sel.select
waiting_for_write = False
handler = UnhandledException(tb)
handler.write_buf = []
handler.initialize(screen_size(), self.quit, self.wakeup)
while True:
has_data_to_write = bool(handler.write_buf)
if not has_data_to_write and not self.read_allowed:
break
if has_data_to_write != waiting_for_write:
waiting_for_write = has_data_to_write
self.sel.modify(
self.output_fd, selectors.EVENT_WRITE
if waiting_for_write else 0, self._write_ready
)
events = select()
for key, mask in events:
key.data(handler)