kitty/kitty/boss.py

263 lines
8.9 KiB
Python

#!/usr/bin/env python
# vim:fileencoding=utf-8
# License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
import os
import io
import signal
import select
import subprocess
import struct
from itertools import repeat
from time import monotonic
from threading import Thread, current_thread
from queue import Queue, Empty
import glfw
from pyte.streams import Stream, DebugStream
from pyte import modes as mo
from .char_grid import CharGrid
from .keys import interpret_text_event, interpret_key_event
from .screen import Screen
from .tracker import ChangeTracker
from .utils import resize_pty, create_pty
def handle_unix_signals():
read_fd, write_fd = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
for sig in (signal.SIGINT, signal.SIGTERM):
signal.signal(sig, lambda x, y: None)
signal.siginterrupt(sig, False)
signal.set_wakeup_fd(write_fd)
return read_fd
class Boss(Thread):
daemon = True
shutting_down = False
pending_title_change = pending_icon_change = None
pending_color_changes = {}
SCREEN_UPDATE_DELAY = 1 / 100 # seconds
def __init__(self, window, window_width, window_height, opts, args):
Thread.__init__(self, name='ChildMonitor')
self.pending_update_screen = None
self.action_queue = Queue()
self.child_fd = create_pty()[0]
self.read_wakeup_fd, self.write_wakeup_fd = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
self.signal_fd = handle_unix_signals()
self.readers = [self.child_fd, self.signal_fd, self.read_wakeup_fd]
self.writers = [self.child_fd]
self.queue_action(self.initialize)
self.profile = args.profile
self.window, self.opts = window, opts
self.tracker = ChangeTracker(self.mark_dirtied)
self.screen = Screen(self.opts, self.tracker, self)
self.char_grid = CharGrid(self.screen, opts, window_width, window_height)
sclass = DebugStream if args.dump_commands else Stream
self.stream = sclass(self.screen)
self.write_buf = memoryview(b'')
glfw.glfwSetCharModsCallback(window, self.on_text_input)
glfw.glfwSetKeyCallback(window, self.on_key)
glfw.glfwSetMouseButtonCallback(window, self.on_mouse_button)
glfw.glfwSetWindowFocusCallback(window, self.on_focus)
resize_pty(80, 24)
def queue_action(self, func, *args):
self.action_queue.put((func, args))
self.wakeup()
def wakeup(self):
os.write(self.write_wakeup_fd, b'1')
def on_wakeup(self):
try:
os.read(self.read_wakeup_fd, io.DEFAULT_BUFFER_SIZE)
except (EnvironmentError, BlockingIOError):
pass
while not self.shutting_down:
try:
func, args = self.action_queue.get_nowait()
except Empty:
break
func(*args)
def signal_received(self):
try:
data = os.read(self.signal_fd, io.DEFAULT_BUFFER_SIZE)
except BlockingIOError:
return
if data:
signals = struct.unpack('%uB' % len(data), data)
if signal.SIGINT in signals or signal.SIGTERM in signals:
self.shutdown()
def initialize(self):
self.char_grid.initialize()
glfw.glfwPostEmptyEvent()
def on_focus(self, window, focused):
if focused:
if self.screen.enable_focus_tracking:
self.write_to_child(b'\x1b[I')
else:
if self.screen.enable_focus_tracking:
self.write_to_child(b'\x1b[O')
def on_mouse_button(self, window, button, action, mods):
if action == glfw.GLFW_RELEASE:
if button == glfw.GLFW_MOUSE_BUTTON_MIDDLE:
# glfw has no way to get the primary selection
# text = glfw.glfwGetClipboardString(window)
text = subprocess.check_output(['xsel'])
if text:
if self.screen.in_bracketed_paste_mode:
text = mo.BRACKETED_PASTE_START + text + mo.BRACKETED_PASTE_END
self.write_to_child(text)
def on_key(self, window, key, scancode, action, mods):
if action == glfw.GLFW_PRESS or action == glfw.GLFW_REPEAT:
data = interpret_key_event(key, scancode, mods)
if data:
self.write_to_child(data)
def on_text_input(self, window, codepoint, mods):
data = interpret_text_event(codepoint, mods)
if data:
self.write_to_child(data)
def on_window_resize(self, window, w, h):
self.queue_action(self.apply_resize_screen, w, h)
def apply_resize_screen(self, w, h):
self.char_grid.resize_screen(w, h)
sg = self.char_grid.screen_geometry
resize_pty(sg.xnum, sg.ynum)
glfw.glfwPostEmptyEvent()
def apply_opts(self, opts):
self.opts = opts
self.queue_action(self.apply_opts_to_screen)
def apply_opts_to_screen(self):
self.screen.apply_opts(self.opts)
self.char_grid.apply_opts(self.opts)
self.char_grid.dirty_everything()
def render(self):
if self.pending_title_change is not None:
glfw.glfwSetWindowTitle(self.window, self.pending_title_change)
self.pending_title_change = None
if self.pending_icon_change is not None:
self.pending_icon_change = None # TODO: Implement this
self.char_grid.render()
def run(self):
if self.profile:
import cProfile
import pstats
pr = cProfile.Profile()
pr.enable()
self.loop()
if self.profile:
pr.disable()
pr.create_stats()
s = pstats.Stats(pr)
s.dump_stats(self.profile)
def loop(self):
all_readers, all_writers = self.readers, self.writers
dispatch = list(repeat(None, max(all_readers) + 1))
dispatch[self.child_fd] = self.read_ready
dispatch[self.read_wakeup_fd] = self.on_wakeup
dispatch[self.signal_fd] = self.signal_received
while not self.shutting_down:
timeout = None if self.pending_update_screen is None else max(0, self.pending_update_screen - monotonic())
readers, writers, _ = select.select(all_readers, all_writers if self.write_buf else [], [], timeout)
for r in readers:
dispatch[r]()
if writers:
self.write_ready()
if self.pending_update_screen is not None and monotonic() > self.pending_update_screen:
self.apply_update_screen()
def close(self):
if not self.shutting_down:
self.queue_action(self.shutdown)
def destroy(self):
# Must be called in the main thread as it manipulates signal handlers
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
self.char_grid.destroy()
def shutdown(self):
self.shutting_down = True
glfw.glfwSetWindowShouldClose(self.window, True)
glfw.glfwPostEmptyEvent()
def read_ready(self):
if self.shutting_down:
return
try:
data = os.read(self.child_fd, io.DEFAULT_BUFFER_SIZE)
except BlockingIOError:
return
except EnvironmentError:
data = b''
if data:
self.stream.feed(data)
else: # EOF
self.shutdown()
def write_ready(self):
if not self.shutting_down:
while self.write_buf:
try:
n = os.write(self.child_fd, self.write_buf)
except BlockingIOError:
n = 0
if not n:
return
self.write_buf = self.write_buf[n:]
def write_to_child(self, data):
if data:
if current_thread() is self:
self.queue_write(data)
else:
self.queue_action(self.queue_write, data)
def queue_write(self, data):
self.write_buf = memoryview(self.write_buf.tobytes() + data)
def mark_dirtied(self):
# Batch screen updates
if self.pending_update_screen is None:
self.pending_update_screen = monotonic() + self.SCREEN_UPDATE_DELAY
def apply_update_screen(self):
self.pending_update_screen = None
changes = self.tracker.consolidate_changes()
self.char_grid.update_cell_data(changes)
glfw.glfwPostEmptyEvent()
def title_changed(self, new_title):
self.pending_title_change = new_title
glfw.glfwPostEmptyEvent()
def icon_changed(self, new_icon):
self.pending_icon_change = new_icon
glfw.glfwPostEmptyEvent()
def change_default_color(self, which, value):
self.pending_color_changes[which] = value
self.queue_action(self.apply_change_colors)
def apply_change_colors(self):
self.char_grid.change_colors(self.pending_color_changes)
self.pending_color_changes = {}
glfw.glfwPostEmptyEvent()