329 lines
10 KiB
Python
329 lines
10 KiB
Python
#!/usr/bin/env python
|
|
# vim:fileencoding=utf-8
|
|
# License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
|
|
|
|
import codecs
|
|
import fcntl
|
|
import io
|
|
import os
|
|
import re
|
|
import selectors
|
|
import signal
|
|
import sys
|
|
import termios
|
|
import tty
|
|
from collections import namedtuple
|
|
from contextlib import closing, contextmanager
|
|
from functools import partial
|
|
|
|
from kitty.fast_data_types import parse_input_from_terminal
|
|
from kitty.icat import screen_size
|
|
from kitty.key_encoding import (
|
|
ALT, CTRL, PRESS, RELEASE, REPEAT, SHIFT, C, D, backspace_key,
|
|
decode_key_event, enter_key
|
|
)
|
|
|
|
from .handler import Handler
|
|
from .operations import init_state, reset_state
|
|
|
|
|
|
@contextmanager
|
|
def non_block(fd):
|
|
oldfl = fcntl.fcntl(fd, fcntl.F_GETFL)
|
|
fcntl.fcntl(fd, fcntl.F_SETFL, oldfl | os.O_NONBLOCK)
|
|
yield
|
|
fcntl.fcntl(fd, fcntl.F_SETFL, oldfl)
|
|
|
|
|
|
@contextmanager
|
|
def raw_terminal(fd):
|
|
isatty = os.isatty(fd)
|
|
if isatty:
|
|
old = termios.tcgetattr(fd)
|
|
tty.setraw(fd)
|
|
yield
|
|
if isatty:
|
|
termios.tcsetattr(fd, termios.TCSADRAIN, old)
|
|
|
|
|
|
def write_all(fd, data):
|
|
while data:
|
|
n = os.write(fd, data)
|
|
if not n:
|
|
break
|
|
data = data[n:]
|
|
|
|
|
|
@contextmanager
|
|
def sanitize_term(output_fd):
|
|
write_all(output_fd, init_state())
|
|
yield
|
|
write_all(output_fd, reset_state())
|
|
|
|
|
|
LEFT, MIDDLE, RIGHT, FOURTH, FIFTH = 1, 2, 4, 8, 16
|
|
DRAG = REPEAT
|
|
MouseEvent = namedtuple('MouseEvent', 'x y type buttons mods')
|
|
bmap = {0: LEFT, 1: MIDDLE, 2: RIGHT}
|
|
MOTION_INDICATOR = 1 << 5
|
|
EXTRA_BUTTON_INDICATOR = 1 << 6
|
|
SHIFT_INDICATOR = 1 << 2
|
|
ALT_INDICATOR = 1 << 3
|
|
CTRL_INDICATOR = 1 << 4
|
|
|
|
|
|
def decode_sgr_mouse(text):
|
|
cb, x, y = text.split(';')
|
|
m, y = y[-1], y[:-1]
|
|
cb, x, y = map(int, (cb, x, y))
|
|
typ = RELEASE if m == 'm' else (DRAG if cb & MOTION_INDICATOR else PRESS)
|
|
buttons = 0
|
|
cb3 = cb & 3
|
|
if cb3 != 3:
|
|
if cb & EXTRA_BUTTON_INDICATOR:
|
|
buttons |= FIFTH if cb3 & 1 else FOURTH
|
|
else:
|
|
buttons |= bmap[cb3]
|
|
mods = 0
|
|
if cb & SHIFT_INDICATOR:
|
|
mods |= SHIFT
|
|
if cb & ALT_INDICATOR:
|
|
mods |= ALT
|
|
if cb & CTRL_INDICATOR:
|
|
mods |= CTRL
|
|
return MouseEvent(x, y, typ, buttons, mods)
|
|
|
|
|
|
class UnhandledException(Handler):
|
|
|
|
def __init__(self, tb):
|
|
self.tb = tb
|
|
|
|
def initialize(self, screen_size, quit_loop, wakeup):
|
|
Handler.initialize(self, screen_size, quit_loop, wakeup)
|
|
self.write(self.tb)
|
|
self.write('\n')
|
|
self.write('Press the Enter key to quit')
|
|
|
|
def on_key(self, key_event):
|
|
if key_event is enter_key:
|
|
self.quit_loop(1)
|
|
|
|
def on_interrupt(self):
|
|
self.quit_loop(1)
|
|
|
|
def on_eot(self):
|
|
self.quit_loop(1)
|
|
|
|
|
|
class Loop:
|
|
|
|
def __init__(self, input_fd=None, output_fd=None,
|
|
sanitize_bracketed_paste='[\x03\x04\x0e\x0f\r\x07\x7f\x8d\x8e\x8f\x90\x9b\x9d\x9e\x9f]'):
|
|
self.input_fd = input_fd or sys.stdin.fileno()
|
|
self.output_fd = output_fd or sys.stdout.fileno()
|
|
self.wakeup_read_fd, self.wakeup_write_fd = os.pipe()
|
|
self.sel = s = selectors.DefaultSelector()
|
|
s.register(self.input_fd, selectors.EVENT_READ, self._read_ready)
|
|
s.register(
|
|
self.wakeup_read_fd, selectors.EVENT_READ, self._wakeup_ready
|
|
)
|
|
s.register(self.output_fd, selectors.EVENT_WRITE, self._write_ready)
|
|
self.return_code = 0
|
|
self.read_allowed = True
|
|
self.read_buf = ''
|
|
self.decoder = codecs.IncrementalDecoder(errors='ignore')
|
|
try:
|
|
self.iov_limit = os.sysconf('SC_IOV_MAX') - 1
|
|
except Exception:
|
|
self.iov_limit = 255
|
|
self.parse_input_from_terminal = partial(parse_input_from_terminal, self._on_text, self._on_dcs, self._on_csi, self._on_osc, self._on_pm, self._on_apc)
|
|
self.ebs_pat = re.compile('([\177\r\x03\x04])')
|
|
self.in_bracketed_paste = False
|
|
self.sanitize_bracketed_paste = bool(sanitize_bracketed_paste)
|
|
if self.sanitize_bracketed_paste:
|
|
self.sanitize_ibp_pat = re.compile(sanitize_bracketed_paste)
|
|
|
|
def _read_ready(self, handler):
|
|
if not self.read_allowed:
|
|
return
|
|
data = os.read(self.input_fd, io.DEFAULT_BUFFER_SIZE)
|
|
if not data:
|
|
raise EOFError('The input stream is closed')
|
|
data = self.decoder.decode(data)
|
|
if self.read_buf:
|
|
data = self.read_buf + data
|
|
self.read_buf = data
|
|
self.handler = handler
|
|
try:
|
|
self.read_buf = self.parse_input_from_terminal(self.read_buf, self.in_bracketed_paste)
|
|
finally:
|
|
del self.handler
|
|
|
|
def _on_text(self, text):
|
|
if self.in_bracketed_paste and self.sanitize_bracketed_paste:
|
|
text = self.sanitize_ibp_pat.sub('', text)
|
|
|
|
for chunk in self.ebs_pat.split(text):
|
|
if len(chunk) == 1:
|
|
if chunk == '\r':
|
|
self.handler.on_key(enter_key)
|
|
elif chunk == '\177':
|
|
self.handler.on_key(backspace_key)
|
|
elif chunk == '\x03':
|
|
self.handler.on_interrupt()
|
|
elif chunk == '\x04':
|
|
self.handler.on_eot()
|
|
else:
|
|
self.handler.on_text(chunk, self.in_bracketed_paste)
|
|
else:
|
|
self.handler.on_text(chunk, self.in_bracketed_paste)
|
|
|
|
def _on_dcs(self, dcs):
|
|
pass
|
|
|
|
def _on_csi(self, csi):
|
|
q = csi[-1]
|
|
if q in 'mM':
|
|
if csi.startswith('<'):
|
|
# SGR mouse event
|
|
try:
|
|
ev = decode_sgr_mouse(csi[1:])
|
|
except Exception:
|
|
pass
|
|
else:
|
|
self.handler.on_mouse(ev)
|
|
elif q == '~':
|
|
if csi == '200~':
|
|
self.in_bracketed_paste = True
|
|
elif csi == '201~':
|
|
self.in_bracketed_paste = False
|
|
|
|
def _on_pm(self, pm):
|
|
pass
|
|
|
|
def _on_osc(self, osc):
|
|
pass
|
|
|
|
def _on_apc(self, apc):
|
|
if apc.startswith('K'):
|
|
try:
|
|
k = decode_key_event(apc)
|
|
except Exception:
|
|
pass
|
|
else:
|
|
if k.mods is CTRL and k.type is not RELEASE:
|
|
if k.key is C:
|
|
self.handler.on_interrupt()
|
|
return
|
|
if k.key is D:
|
|
self.handler.on_eot()
|
|
return
|
|
self.handler.on_key(k)
|
|
|
|
def _write_ready(self, handler):
|
|
if len(handler.write_buf) > self.iov_limit:
|
|
handler.write_buf[self.iov_limit - 1] = b''.join(handler.write_buf[self.iov_limit - 1:])
|
|
del handler.write_buf[self.iov_limit:]
|
|
sizes = tuple(map(len, handler.write_buf))
|
|
written = os.writev(self.output_fd, handler.write_buf)
|
|
if not written:
|
|
raise EOFError('The output stream is closed')
|
|
if written >= sum(sizes):
|
|
handler.write_buf = []
|
|
else:
|
|
consumed = 0
|
|
for i, buf in enumerate(handler.write_buf):
|
|
if not written:
|
|
break
|
|
if len(buf) <= written:
|
|
written -= len(buf)
|
|
consumed += 1
|
|
continue
|
|
handler.write_buf[i] = buf[written:]
|
|
break
|
|
del handler.write_buf[:consumed]
|
|
|
|
def _wakeup_ready(self, handler):
|
|
data = os.read(self.wakeup_read_fd)
|
|
if b'r' in data:
|
|
screen_size.changed = True
|
|
handler.on_resize(screen_size())
|
|
if b't' in data:
|
|
handler.on_term()
|
|
if b'i' in data:
|
|
handler.on_interrupt()
|
|
|
|
def _wakeup_write(self, val):
|
|
while not os.write(self.wakeup_write_fd, val):
|
|
pass
|
|
|
|
def _on_sigwinch(self, signum, frame):
|
|
self._wakeup_write(b'r')
|
|
|
|
def _on_sigterm(self, signum, frame):
|
|
self._wakeup_write(b't')
|
|
|
|
def _on_sigint(self, signum, frame):
|
|
self._wakeup_write(b'i')
|
|
|
|
def quit(self, return_code=None):
|
|
self.read_allowed = False
|
|
if return_code is not None:
|
|
self.return_code = return_code
|
|
|
|
def wakeup(self):
|
|
self._wakeup_write(b'1')
|
|
|
|
def _modify_output_selector(self, waiting_for_write):
|
|
if waiting_for_write:
|
|
self.sel.register(self.output_fd, selectors.EVENT_WRITE)
|
|
else:
|
|
self.sel.unregister(self.output_fd)
|
|
|
|
def loop(self, handler):
|
|
select = self.sel.select
|
|
tb = None
|
|
waiting_for_write = True
|
|
with closing(self.sel), sanitize_term(self.output_fd), non_block(self.input_fd), non_block(self.output_fd), raw_terminal(self.input_fd):
|
|
signal.signal(signal.SIGWINCH, self._on_sigwinch)
|
|
signal.signal(signal.SIGTERM, self._on_sigterm)
|
|
signal.signal(signal.SIGINT, self._on_sigint)
|
|
handler.write_buf = []
|
|
handler.initialize(screen_size(), self.quit, self.wakeup)
|
|
while True:
|
|
has_data_to_write = bool(handler.write_buf)
|
|
if not has_data_to_write and not self.read_allowed:
|
|
break
|
|
if has_data_to_write != waiting_for_write:
|
|
waiting_for_write = has_data_to_write
|
|
self._modify_output_selector(waiting_for_write)
|
|
events = select()
|
|
for key, mask in events:
|
|
try:
|
|
key.data(handler)
|
|
except Exception:
|
|
import traceback
|
|
tb = traceback.format_exc()
|
|
self.return_code = 1
|
|
break
|
|
if tb is not None:
|
|
self._report_error_loop(tb)
|
|
|
|
def _report_error_loop(self, tb):
|
|
select = self.sel.select
|
|
waiting_for_write = False
|
|
handler = UnhandledException(tb)
|
|
handler.write_buf = []
|
|
handler.initialize(screen_size(), self.quit, self.wakeup)
|
|
while True:
|
|
has_data_to_write = bool(handler.write_buf)
|
|
if not has_data_to_write and not self.read_allowed:
|
|
break
|
|
if has_data_to_write != waiting_for_write:
|
|
waiting_for_write = has_data_to_write
|
|
self._modify_output_selector(waiting_for_write)
|
|
events = select()
|
|
for key, mask in events:
|
|
key.data(handler)
|