kitty/kitty/boss.py

832 lines
30 KiB
Python

#!/usr/bin/env python
# vim:fileencoding=utf-8
# License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
import atexit
import json
import os
import re
from functools import partial
from gettext import gettext as _
from weakref import WeakValueDictionary
from .cli import create_opts, parse_args
from .conf.utils import to_cmdline
from .config import initial_window_size_func, prepare_config_file_for_editing
from .config_data import MINIMUM_FONT_SIZE
from .constants import (
appname, config_dir, kitty_exe, set_boss, supports_primary_selection
)
from .fast_data_types import (
ChildMonitor, background_opacity_of, change_background_opacity,
create_os_window, current_os_window, destroy_global_data,
get_clipboard_string, glfw_post_empty_event, global_font_size,
mark_os_window_for_close, os_window_font_size, patch_global_colors,
set_clipboard_string, set_in_sequence_mode, toggle_fullscreen
)
from .keys import get_shortcut, shortcut_matches
from .layout import set_draw_minimal_borders
from .remote_control import handle_cmd
from .rgb import Color, color_from_int
from .session import create_session
from .tabs import SpecialWindow, SpecialWindowInstance, TabManager
from .utils import (
get_editor, get_primary_selection, log_error, open_url, parse_address_spec,
remove_socket_file, safe_print, set_primary_selection, single_instance,
startup_notification_handler
)
def listen_on(spec):
import socket
family, address, socket_path = parse_address_spec(spec)
s = socket.socket(family)
atexit.register(remove_socket_file, s, socket_path)
s.bind(address)
s.listen()
return s.fileno()
class DumpCommands: # {{{
def __init__(self, args):
self.draw_dump_buf = []
if args.dump_bytes:
self.dump_bytes_to = open(args.dump_bytes, 'wb')
def __call__(self, *a):
if a:
if a[0] == 'draw':
if a[1] is None:
if self.draw_dump_buf:
safe_print('draw', ''.join(self.draw_dump_buf))
self.draw_dump_buf = []
else:
self.draw_dump_buf.append(a[1])
elif a[0] == 'bytes':
self.dump_bytes_to.write(a[1])
self.dump_bytes_to.flush()
else:
if self.draw_dump_buf:
safe_print('draw', ''.join(self.draw_dump_buf))
self.draw_dump_buf = []
safe_print(*a)
# }}}
class Boss:
def __init__(self, os_window_id, opts, args, cached_values, new_os_window_trigger):
set_draw_minimal_borders(opts)
self.window_id_map = WeakValueDictionary()
self.startup_colors = {k: opts[k] for k in opts if isinstance(opts[k], Color)}
self.pending_sequences = None
self.cached_values = cached_values
self.os_window_map = {}
self.os_window_death_actions = {}
self.cursor_blinking = True
self.shutting_down = False
talk_fd = getattr(single_instance, 'socket', None)
talk_fd = -1 if talk_fd is None else talk_fd.fileno()
listen_fd = -1
if opts.allow_remote_control and args.listen_on:
listen_fd = listen_on(args.listen_on)
self.child_monitor = ChildMonitor(
self.on_child_death,
DumpCommands(args) if args.dump_commands or args.dump_bytes else None,
talk_fd, listen_fd
)
set_boss(self)
self.opts, self.args = opts, args
startup_session = create_session(opts, args, default_session=opts.startup_session)
self.keymap = self.opts.keymap.copy()
if new_os_window_trigger is not None:
self.keymap.pop(new_os_window_trigger, None)
self.add_os_window(startup_session, os_window_id=os_window_id)
def add_os_window(self, startup_session, os_window_id=None, wclass=None, wname=None, opts_for_size=None, startup_id=None):
if os_window_id is None:
opts_for_size = opts_for_size or self.opts
cls = wclass or self.args.cls or appname
with startup_notification_handler(do_notify=startup_id is not None, startup_id=startup_id) as pre_show_callback:
os_window_id = create_os_window(
initial_window_size_func(opts_for_size, self.cached_values),
pre_show_callback,
appname, wname or self.args.name or cls, cls)
tm = TabManager(os_window_id, self.opts, self.args, startup_session)
self.os_window_map[os_window_id] = tm
return os_window_id
def list_os_windows(self):
for os_window_id, tm in self.os_window_map.items():
yield {
'id': os_window_id,
'tabs': list(tm.list_tabs()),
}
@property
def all_tab_managers(self):
yield from self.os_window_map.values()
@property
def all_tabs(self):
for tm in self.all_tab_managers:
yield from tm
@property
def all_windows(self):
for tab in self.all_tabs:
yield from tab
def match_windows(self, match):
try:
field, exp = match.split(':', 1)
except ValueError:
return
if field == 'num':
tab = self.active_tab
if tab is not None:
try:
w = tab.get_nth_window(int(exp))
except Exception:
return
if w is not None:
yield w
return
if field == 'env':
kp, vp = exp.partition('=')[::2]
if vp:
pat = tuple(map(re.compile, (kp, vp)))
else:
pat = re.compile(kp), None
else:
pat = re.compile(exp)
for window in self.all_windows:
if window.matches(field, pat):
yield window
def tab_for_window(self, window):
for tab in self.all_tabs:
for w in tab:
if w.id == window.id:
return tab
def match_tabs(self, match):
try:
field, exp = match.split(':', 1)
except ValueError:
return
pat = re.compile(exp)
found = False
if field in ('title', 'id'):
for tab in self.all_tabs:
if tab.matches(field, pat):
yield tab
found = True
if not found:
tabs = {self.tab_for_window(w) for w in self.match_windows(match)}
for tab in tabs:
if tab:
yield tab
def set_active_window(self, window):
for tm in self.os_window_map.values():
for tab in tm:
for w in tab:
if w.id == window.id:
if tab is not self.active_tab:
tm.set_active_tab(tab)
tab.set_active_window(w)
return
def _new_os_window(self, args, cwd_from=None):
sw = self.args_to_special_window(args, cwd_from) if args else None
startup_session = create_session(self.opts, special_window=sw, cwd_from=cwd_from)
return self.add_os_window(startup_session)
def new_os_window(self, *args):
self._new_os_window(args)
def new_os_window_with_cwd(self, *args):
w = self.active_window
cwd_from = w.child.pid if w is not None else None
self._new_os_window(args, cwd_from)
def add_child(self, window):
self.child_monitor.add_child(window.id, window.child.pid, window.child.child_fd, window.screen)
self.window_id_map[window.id] = window
def _handle_remote_command(self, cmd, window=None):
response = None
if self.opts.allow_remote_control or getattr(window, 'allow_remote_control', False):
try:
response = handle_cmd(self, window, cmd)
except Exception as err:
import traceback
response = {'ok': False, 'error': str(err)}
if not getattr(err, 'hide_traceback', False):
response['tb'] = traceback.format_exc()
else:
response = {'ok': False, 'error': 'Remote control is disabled. Add allow_remote_control yes to your kitty.conf'}
return response
def peer_message_received(self, msg):
msg = msg.decode('utf-8')
cmd_prefix = '\x1bP@kitty-cmd'
if msg.startswith(cmd_prefix):
cmd = msg[len(cmd_prefix):-2]
response = self._handle_remote_command(cmd)
if response is not None:
response = (cmd_prefix + json.dumps(response) + '\x1b\\').encode('utf-8')
return response
else:
msg = json.loads(msg)
if isinstance(msg, dict) and msg.get('cmd') == 'new_instance':
startup_id = msg.get('startup_id')
args, rest = parse_args(msg['args'][1:])
args.args = rest
opts = create_opts(args)
if not os.path.isabs(args.directory):
args.directory = os.path.join(msg['cwd'], args.directory)
session = create_session(opts, args, respect_cwd=True)
os_window_id = self.add_os_window(session, wclass=args.cls, wname=args.name, opts_for_size=opts, startup_id=startup_id)
if msg.get('notify_on_os_window_death'):
self.os_window_death_actions[os_window_id] = partial(self.notify_on_os_window_death, msg['notify_on_os_window_death'])
else:
log_error('Unknown message received from peer, ignoring')
def handle_remote_cmd(self, cmd, window=None):
response = self._handle_remote_command(cmd, window)
if response is not None:
if window is not None:
window.send_cmd_response(response)
def on_child_death(self, window_id):
window = self.window_id_map.pop(window_id, None)
if window is None:
return
if window.action_on_close:
try:
window.action_on_close(window)
except Exception:
import traceback
traceback.print_exc()
os_window_id = window.os_window_id
window.destroy()
tm = self.os_window_map.get(os_window_id)
if tm is None:
return
for tab in tm:
if window in tab:
break
else:
return
tab.remove_window(window)
if len(tab) == 0:
tm.remove(tab)
tab.destroy()
if len(tm) == 0:
if not self.shutting_down:
mark_os_window_for_close(os_window_id)
glfw_post_empty_event()
def close_window(self, window=None):
if window is None:
window = self.active_window
self.child_monitor.mark_for_close(window.id)
def close_tab(self, tab=None):
if tab is None:
tab = self.active_tab
for window in tab:
self.close_window(window)
def toggle_fullscreen(self):
toggle_fullscreen()
def start(self):
if not getattr(self, 'io_thread_started', False):
self.child_monitor.start()
self.io_thread_started = True
def activate_tab_at(self, os_window_id, x):
tm = self.os_window_map.get(os_window_id)
if tm is not None:
tm.activate_tab_at(x)
def on_window_resize(self, os_window_id, w, h, dpi_changed):
if dpi_changed:
self.on_dpi_change(os_window_id)
else:
tm = self.os_window_map.get(os_window_id)
if tm is not None:
tm.resize()
def increase_font_size(self): # legacy
cfs = global_font_size()
self.set_font_size(min(self.opts.font_size * 5, cfs + 2.0))
def decrease_font_size(self): # legacy
cfs = global_font_size()
self.set_font_size(max(MINIMUM_FONT_SIZE, cfs - 2.0))
def restore_font_size(self): # legacy
self.set_font_size(self.opts.font_size)
def set_font_size(self, new_size): # legacy
self.change_font_size(True, None, new_size)
def change_font_size(self, all_windows, increment_operation, amt):
def calc_new_size(old_size):
new_size = old_size
if amt == 0:
new_size = self.opts.font_size
else:
if increment_operation:
new_size += (1 if increment_operation == '+' else -1) * amt
else:
new_size = amt
new_size = max(MINIMUM_FONT_SIZE, min(new_size, self.opts.font_size * 5))
return new_size
if all_windows:
current_global_size = global_font_size()
new_size = calc_new_size(current_global_size)
if new_size != current_global_size:
global_font_size(new_size)
os_windows = tuple(self.os_window_map.keys())
else:
os_windows = []
w = self.active_window
if w is not None:
os_windows.append(w.os_window_id)
if os_windows:
final_windows = {}
for wid in os_windows:
current_size = os_window_font_size(wid)
if current_size:
new_size = calc_new_size(current_size)
if new_size != current_size:
final_windows[wid] = new_size
if final_windows:
self._change_font_size(final_windows)
def _change_font_size(self, sz_map):
for os_window_id, sz in sz_map.items():
tm = self.os_window_map.get(os_window_id)
if tm is not None:
os_window_font_size(os_window_id, sz)
tm.resize()
def on_dpi_change(self, os_window_id):
tm = self.os_window_map.get(os_window_id)
if tm is not None:
sz = os_window_font_size(os_window_id)
if sz:
os_window_font_size(os_window_id, sz, True)
tm.resize()
def _set_os_window_background_opacity(self, os_window_id, opacity):
change_background_opacity(os_window_id, max(0.1, min(opacity, 1.0)))
def set_background_opacity(self, opacity):
window = self.active_window
if window is None or not opacity:
return
if not self.opts.dynamic_background_opacity:
return self.show_error(
_('Cannot change background opacity'),
_('You must set the dynamic_background_opacity option in kitty.conf to be able to change background opacity'))
os_window_id = window.os_window_id
if opacity[0] in '+-':
opacity = background_opacity_of(os_window_id)
if opacity is None:
return
opacity += float(opacity)
elif opacity == 'default':
opacity = self.opts.background_opacity
else:
opacity = float(opacity)
self._set_os_window_background_opacity(os_window_id, opacity)
@property
def active_tab_manager(self):
os_window_id = current_os_window()
return self.os_window_map.get(os_window_id)
@property
def active_tab(self):
tm = self.active_tab_manager
if tm is not None:
return tm.active_tab
@property
def active_window(self):
t = self.active_tab
if t is not None:
return t.active_window
def dispatch_special_key(self, key, scancode, action, mods):
# Handles shortcuts, return True if the key was consumed
key_action = get_shortcut(self.keymap, mods, key, scancode)
if key_action is None:
sequences = get_shortcut(self.opts.sequence_map, mods, key, scancode)
if sequences:
self.pending_sequences = sequences
set_in_sequence_mode(True)
return True
else:
self.current_key_press_info = key, scancode, action, mods
return self.dispatch_action(key_action)
def process_sequence(self, key, scancode, action, mods):
if not self.pending_sequences:
set_in_sequence_mode(False)
remaining = {}
matched_action = None
for seq, key_action in self.pending_sequences.items():
if shortcut_matches(seq[0], mods, key, scancode):
seq = seq[1:]
if seq:
remaining[seq] = key_action
else:
matched_action = key_action
if remaining:
self.pending_sequences = remaining
else:
self.pending_sequences = None
set_in_sequence_mode(False)
if matched_action is not None:
self.dispatch_action(matched_action)
def start_resizing_window(self):
w = self.active_window
if w is None:
return
overlay_window = self._run_kitten('resize_window', args=[
'--horizontal-increment={}'.format(self.opts.window_resize_step_cells),
'--vertical-increment={}'.format(self.opts.window_resize_step_lines)
])
if overlay_window is not None:
overlay_window.allow_remote_control = True
def resize_layout_window(self, window, increment, is_horizontal, reset=False):
tab = window.tabref()
if tab is None or not increment:
return False
if reset:
return tab.reset_window_sizes()
return tab.resize_window_by(window.id, increment, is_horizontal)
def default_bg_changed_for(self, window_id):
w = self.window_id_map.get(window_id)
if w is not None:
tm = self.os_window_map.get(w.os_window_id)
if tm is not None:
t = tm.tab_for_id(w.tab_id)
if t is not None:
t.relayout_borders()
def dispatch_action(self, key_action):
if key_action is not None:
f = getattr(self, key_action.func, None)
if f is not None:
passthrough = f(*key_action.args)
if passthrough is not True:
return True
tab = self.active_tab
if tab is None:
return False
window = self.active_window
if window is None:
return False
if key_action is not None:
f = getattr(tab, key_action.func, getattr(window, key_action.func, None))
if f is not None:
passthrough = f(*key_action.args)
if passthrough is not True:
return True
return False
def combine(self, *actions):
for key_action in actions:
self.dispatch_action(key_action)
def on_focus(self, os_window_id, focused):
tm = self.os_window_map.get(os_window_id)
if tm is not None:
w = tm.active_window
if w is not None:
w.focus_changed(focused)
tm.mark_tab_bar_dirty()
def update_tab_bar_data(self, os_window_id):
tm = self.os_window_map.get(os_window_id)
if tm is not None:
tm.update_tab_bar_data()
def on_drop(self, os_window_id, paths):
tm = self.os_window_map.get(os_window_id)
if tm is not None:
w = tm.active_window
if w is not None:
w.paste('\n'.join(paths))
def on_os_window_closed(self, os_window_id, viewport_width, viewport_height):
self.cached_values['window-size'] = viewport_width, viewport_height
tm = self.os_window_map.pop(os_window_id, None)
if tm is not None:
tm.destroy()
for window_id in tuple(w.id for w in self.window_id_map.values() if getattr(w, 'os_window_id', None) == os_window_id):
self.window_id_map.pop(window_id, None)
action = self.os_window_death_actions.pop(os_window_id, None)
if action is not None:
action()
def notify_on_os_window_death(self, address):
import socket
s = socket.socket(family=socket.AF_UNIX)
try:
s.connect(address)
s.sendall(b'c')
try:
s.shutdown(socket.SHUT_RDWR)
except EnvironmentError:
pass
s.close()
except Exception:
pass
def display_scrollback(self, window, data, cmd):
tab = self.active_tab
if tab is not None and window.overlay_for is None:
tab.new_special_window(
SpecialWindow(
cmd, data, _('History'), overlay_for=window.id))
def edit_config_file(self, *a):
confpath = prepare_config_file_for_editing()
# On macOS vim fails to handle SIGWINCH if it occurs early, so add a
# small delay.
cmd = [kitty_exe(), '+runpy', 'import os, sys, time; time.sleep(0.05); os.execvp(sys.argv[1], sys.argv[1:])'] + get_editor() + [confpath]
self.new_os_window(*cmd)
def get_output(self, source_window, num_lines=1):
output = ''
s = source_window.screen
if num_lines is None:
num_lines = s.lines
for i in range(min(num_lines, s.lines)):
output += str(s.linebuf.line(i))
return output
def _run_kitten(self, kitten, args=(), input_data=None):
w = self.active_window
tab = self.active_tab
if w is not None and tab is not None and w.overlay_for is None:
orig_args, args = list(args), list(args)
from kittens.runner import create_kitten_handler
end_kitten = create_kitten_handler(kitten, orig_args)
args[0:0] = [config_dir, kitten]
if input_data is None:
type_of_input = end_kitten.type_of_input
if type_of_input in ('text', 'history', 'ansi', 'ansi-history', 'screen', 'screen-history', 'screen-ansi', 'screen-ansi-history'):
data = w.as_text(
as_ansi='ansi' in type_of_input,
add_history='history' in type_of_input,
add_wrap_markers='screen' in type_of_input
).encode('utf-8')
elif type_of_input is None:
data = None
else:
raise ValueError('Unknown type_of_input: {}'.format(type_of_input))
else:
data = input_data
if isinstance(data, str):
data = data.encode('utf-8')
copts = {k: self.opts[k] for k in ('select_by_word_characters', 'open_url_with')}
overlay_window = tab.new_special_window(
SpecialWindow(
[kitty_exe(), '+runpy', 'from kittens.runner import main; main()'] + args,
stdin=data,
env={
'KITTY_COMMON_OPTS': json.dumps(copts),
'KITTY_CHILD_PID': w.child.pid,
'PYTHONWARNINGS': 'ignore',
'OVERLAID_WINDOW_LINES': str(w.screen.lines),
'OVERLAID_WINDOW_COLS': str(w.screen.columns),
},
cwd=w.cwd_of_child,
overlay_for=w.id
))
overlay_window.action_on_close = partial(self.on_kitten_finish, w.id, end_kitten)
return overlay_window
def kitten(self, kitten, *args):
import shlex
cmdline = args[0] if args else ''
args = shlex.split(cmdline) if cmdline else []
self._run_kitten(kitten, args)
def on_kitten_finish(self, target_window_id, end_kitten, source_window):
output = self.get_output(source_window, num_lines=None)
from kittens.runner import deserialize
data = deserialize(output)
if data is not None:
end_kitten(data, target_window_id, self)
def input_unicode_character(self):
self._run_kitten('unicode_input')
def set_tab_title(self):
tab = self.active_tab
if tab:
args = ['--name=tab-title', '--message', _('Enter the new title for this tab below.'), 'do_set_tab_title', str(tab.id)]
self._run_kitten('ask', args)
def show_error(self, title, msg):
self._run_kitten('show_error', ['--title', title], input_data=msg)
def do_set_tab_title(self, title, tab_id):
tm = self.active_tab_manager
if tm is not None and title:
tab_id = int(tab_id)
for tab in tm.tabs:
if tab.id == tab_id:
tab.set_title(title)
break
def kitty_shell(self, window_type):
cmd = [kitty_exe(), '@']
if window_type == 'tab':
window = self._new_tab(cmd).active_window
elif window_type == 'os_window':
os_window_id = self._new_os_window(cmd)
window = self.os_window_map[os_window_id].active_window
elif window_type == 'overlay':
w = self.active_window
tab = self.active_tab
if w is not None and tab is not None and w.overlay_for is None:
window = tab.new_special_window(SpecialWindow(cmd, overlay_for=w.id))
else:
window = None
else:
window = self._new_window(cmd)
if window is not None:
window.allow_remote_control = True
def switch_focus_to(self, window_idx):
tab = self.active_tab
tab.set_active_window_idx(window_idx)
def open_url(self, url, program=None, cwd=None):
if url:
if isinstance(program, str):
program = to_cmdline(program)
open_url(url, program or self.opts.open_url_with, cwd=cwd)
def open_url_lines(self, lines, program=None):
self.open_url(''.join(lines), program)
def destroy(self):
self.shutting_down = True
self.child_monitor.shutdown_monitor()
del self.child_monitor
for tm in self.os_window_map.values():
tm.destroy()
self.os_window_map = {}
destroy_global_data()
def paste_to_active_window(self, text):
if text:
w = self.active_window
if w is not None:
w.paste(text)
def paste_from_clipboard(self):
text = get_clipboard_string()
self.paste_to_active_window(text)
def paste_from_selection(self):
text = get_primary_selection() if supports_primary_selection else get_clipboard_string()
self.paste_to_active_window(text)
def set_primary_selection(self):
w = self.active_window
if w is not None and not w.destroyed:
text = w.text_for_selection()
if text:
set_primary_selection(text)
if self.opts.copy_on_select:
set_clipboard_string(text)
def goto_tab(self, tab_num):
tm = self.active_tab_manager
if tm is not None:
tm.goto_tab(tab_num - 1)
def set_active_tab(self, tab):
tm = self.active_tab_manager
if tm is not None:
tm.set_active_tab(tab)
def next_tab(self):
tm = self.active_tab_manager
if tm is not None:
tm.next_tab()
def previous_tab(self):
tm = self.active_tab_manager
if tm is not None:
tm.next_tab(-1)
def args_to_special_window(self, args, cwd_from=None):
args = list(args)
stdin = None
w = self.active_window
def data_for_at(arg):
if arg == '@selection':
return w.text_for_selection()
if arg == '@ansi':
return w.as_text(as_ansi=True, add_history=True)
if arg == '@text':
return w.as_text(add_history=True)
if arg == '@screen':
return w.as_text()
if arg == '@ansi_screen':
return w.as_text(as_ansi=True)
if args[0].startswith('@'):
stdin = data_for_at(args[0]) or None
if stdin is not None:
stdin = stdin.encode('utf-8')
del args[0]
cmd = []
for arg in args:
if arg == '@selection':
arg = data_for_at(arg)
if not arg:
continue
cmd.append(arg)
return SpecialWindow(cmd, stdin, cwd_from=cwd_from)
def _new_tab(self, args, cwd_from=None):
special_window = None
if args:
if isinstance(args, SpecialWindowInstance):
special_window = args
else:
special_window = self.args_to_special_window(args, cwd_from=cwd_from)
tm = self.active_tab_manager
if tm is not None:
return tm.new_tab(special_window=special_window, cwd_from=cwd_from)
def new_tab(self, *args):
self._new_tab(args)
def new_tab_with_cwd(self, *args):
w = self.active_window
cwd_from = w.child.pid if w is not None else None
self._new_tab(args, cwd_from=cwd_from)
def _new_window(self, args, cwd_from=None):
tab = self.active_tab
if tab is not None:
if args:
return tab.new_special_window(self.args_to_special_window(args, cwd_from=cwd_from))
else:
return tab.new_window(cwd_from=cwd_from)
def new_window(self, *args):
self._new_window(args)
def new_window_with_cwd(self, *args):
w = self.active_window
if w is None:
return self.new_window(*args)
cwd_from = w.child.pid if w is not None else None
self._new_window(args, cwd_from=cwd_from)
def move_tab_forward(self):
tm = self.active_tab_manager
if tm is not None:
tm.move_tab(1)
def move_tab_backward(self):
tm = self.active_tab_manager
if tm is not None:
tm.move_tab(-1)
def patch_colors(self, spec, configured=False):
if configured:
for k, v in spec.items():
if hasattr(self.opts, k):
setattr(self.opts, k, color_from_int(v))
for tm in self.all_tab_managers:
tm.tab_bar.patch_colors(spec)
patch_global_colors(spec, configured)