more typing work
This commit is contained in:
parent
eb12511646
commit
ea48332f46
331
kitty/boss.py
331
kitty/boss.py
@ -18,9 +18,9 @@ from weakref import WeakValueDictionary
|
||||
from .child import cached_process_data, cwd_of_process
|
||||
from .cli import create_opts, parse_args
|
||||
from .cli_stub import CLIOptions
|
||||
from .conf.utils import to_cmdline
|
||||
from .conf.utils import BadLine, to_cmdline
|
||||
from .config import (
|
||||
SubSequenceMap, common_opts_as_dict, initial_window_size_func,
|
||||
KeyAction, SubSequenceMap, common_opts_as_dict, initial_window_size_func,
|
||||
prepare_config_file_for_editing
|
||||
)
|
||||
from .config_data import MINIMUM_FONT_SIZE
|
||||
@ -298,7 +298,7 @@ class Boss:
|
||||
|
||||
def _new_os_window(self, args: Union[SpecialWindowInstance, Iterable[str]], cwd_from: Optional[int] = None) -> int:
|
||||
if isinstance(args, SpecialWindowInstance):
|
||||
sw = args
|
||||
sw: Optional[SpecialWindowInstance] = args
|
||||
else:
|
||||
sw = self.args_to_special_window(args, cwd_from) if args else None
|
||||
startup_session = next(create_sessions(self.opts, special_window=sw, cwd_from=cwd_from))
|
||||
@ -344,8 +344,8 @@ class Boss:
|
||||
response = {'ok': False, 'error': 'Remote control is disabled. Add allow_remote_control to your kitty.conf'}
|
||||
return response
|
||||
|
||||
def peer_message_received(self, msg):
|
||||
msg = msg.decode('utf-8')
|
||||
def peer_message_received(self, msg_bytes: bytes) -> Optional[bytes]:
|
||||
msg = msg_bytes.decode('utf-8')
|
||||
cmd_prefix = '\x1bP@kitty-cmd'
|
||||
if msg.startswith(cmd_prefix):
|
||||
cmd = msg[len(cmd_prefix):-2]
|
||||
@ -353,30 +353,30 @@ class Boss:
|
||||
if response is None:
|
||||
return None
|
||||
return (cmd_prefix + json.dumps(response) + '\x1b\\').encode('utf-8')
|
||||
data = json.loads(msg)
|
||||
if isinstance(data, dict) and data.get('cmd') == 'new_instance':
|
||||
from .cli_stub import CLIOptions
|
||||
startup_id = data.get('startup_id')
|
||||
args, rest = parse_args(data['args'][1:], result_class=CLIOptions)
|
||||
args.args = rest
|
||||
opts = create_opts(args)
|
||||
if not os.path.isabs(args.directory):
|
||||
args.directory = os.path.join(data['cwd'], args.directory)
|
||||
for session in create_sessions(opts, args, respect_cwd=True):
|
||||
os_window_id = self.add_os_window(session, wclass=args.cls, wname=args.name, opts_for_size=opts, startup_id=startup_id)
|
||||
if data.get('notify_on_os_window_death'):
|
||||
self.os_window_death_actions[os_window_id] = partial(self.notify_on_os_window_death, data['notify_on_os_window_death'])
|
||||
else:
|
||||
msg = json.loads(msg)
|
||||
if isinstance(msg, dict) and msg.get('cmd') == 'new_instance':
|
||||
from .cli_stub import CLIOptions
|
||||
startup_id = msg.get('startup_id')
|
||||
args, rest = parse_args(msg['args'][1:], result_class=CLIOptions)
|
||||
args.args = rest
|
||||
opts = create_opts(args)
|
||||
if not os.path.isabs(args.directory):
|
||||
args.directory = os.path.join(msg['cwd'], args.directory)
|
||||
for session in create_sessions(opts, args, respect_cwd=True):
|
||||
os_window_id = self.add_os_window(session, wclass=args.cls, wname=args.name, opts_for_size=opts, startup_id=startup_id)
|
||||
if msg.get('notify_on_os_window_death'):
|
||||
self.os_window_death_actions[os_window_id] = partial(self.notify_on_os_window_death, msg['notify_on_os_window_death'])
|
||||
else:
|
||||
log_error('Unknown message received from peer, ignoring')
|
||||
log_error('Unknown message received from peer, ignoring')
|
||||
return None
|
||||
|
||||
def handle_remote_cmd(self, cmd, window=None):
|
||||
def handle_remote_cmd(self, cmd: str, window: Optional[Window] = None) -> None:
|
||||
response = self._handle_remote_command(cmd, window)
|
||||
if response is not None:
|
||||
if window is not None:
|
||||
window.send_cmd_response(response)
|
||||
|
||||
def _cleanup_tab_after_window_removal(self, src_tab):
|
||||
def _cleanup_tab_after_window_removal(self, src_tab: Tab) -> None:
|
||||
if len(src_tab) < 1:
|
||||
tm = src_tab.tab_manager_ref()
|
||||
if tm is not None:
|
||||
@ -386,7 +386,7 @@ class Boss:
|
||||
if not self.shutting_down:
|
||||
mark_os_window_for_close(src_tab.os_window_id)
|
||||
|
||||
def on_child_death(self, window_id):
|
||||
def on_child_death(self, window_id: int) -> None:
|
||||
window = self.window_id_map.pop(window_id, None)
|
||||
if window is None:
|
||||
return
|
||||
@ -416,24 +416,24 @@ class Boss:
|
||||
traceback.print_exc()
|
||||
window.action_on_close = window.action_on_removal = None
|
||||
|
||||
def close_window(self, window=None):
|
||||
if window is None:
|
||||
window = self.active_window
|
||||
self.child_monitor.mark_for_close(window.id)
|
||||
def close_window(self, window: Optional[Window] = None) -> None:
|
||||
window = window or self.active_window
|
||||
if window:
|
||||
self.child_monitor.mark_for_close(window.id)
|
||||
|
||||
def close_tab(self, tab=None):
|
||||
if tab is None:
|
||||
tab = self.active_tab
|
||||
for window in tab:
|
||||
self.close_window(window)
|
||||
def close_tab(self, tab: Optional[Tab] = None) -> None:
|
||||
tab = tab or self.active_tab
|
||||
if tab:
|
||||
for window in tab:
|
||||
self.close_window(window)
|
||||
|
||||
def toggle_fullscreen(self):
|
||||
def toggle_fullscreen(self) -> None:
|
||||
toggle_fullscreen()
|
||||
|
||||
def toggle_maximized(self):
|
||||
def toggle_maximized(self) -> None:
|
||||
toggle_maximized()
|
||||
|
||||
def start(self):
|
||||
def start(self) -> None:
|
||||
if not getattr(self, 'io_thread_started', False):
|
||||
self.child_monitor.start()
|
||||
self.io_thread_started = True
|
||||
@ -442,12 +442,12 @@ class Boss:
|
||||
run_update_check(self.opts.update_check_interval * 60 * 60)
|
||||
self.update_check_started = True
|
||||
|
||||
def activate_tab_at(self, os_window_id, x):
|
||||
def activate_tab_at(self, os_window_id: int, x: int) -> int:
|
||||
tm = self.os_window_map.get(os_window_id)
|
||||
if tm is not None:
|
||||
tm.activate_tab_at(x)
|
||||
|
||||
def on_window_resize(self, os_window_id, w, h, dpi_changed):
|
||||
def on_window_resize(self, os_window_id: int, w: int, h: int, dpi_changed: bool) -> None:
|
||||
if dpi_changed:
|
||||
self.on_dpi_change(os_window_id)
|
||||
else:
|
||||
@ -455,7 +455,7 @@ class Boss:
|
||||
if tm is not None:
|
||||
tm.resize()
|
||||
|
||||
def clear_terminal(self, action, only_active):
|
||||
def clear_terminal(self, action: str, only_active: bool) -> None:
|
||||
if only_active:
|
||||
windows = []
|
||||
w = self.active_window
|
||||
@ -475,22 +475,22 @@ class Boss:
|
||||
else:
|
||||
w.screen.erase_in_display(how, False)
|
||||
|
||||
def increase_font_size(self): # legacy
|
||||
def increase_font_size(self) -> None: # legacy
|
||||
cfs = global_font_size()
|
||||
self.set_font_size(min(self.opts.font_size * 5, cfs + 2.0))
|
||||
|
||||
def decrease_font_size(self): # legacy
|
||||
def decrease_font_size(self) -> None: # legacy
|
||||
cfs = global_font_size()
|
||||
self.set_font_size(max(MINIMUM_FONT_SIZE, cfs - 2.0))
|
||||
|
||||
def restore_font_size(self): # legacy
|
||||
def restore_font_size(self) -> None: # legacy
|
||||
self.set_font_size(self.opts.font_size)
|
||||
|
||||
def set_font_size(self, new_size): # legacy
|
||||
def set_font_size(self, new_size: float) -> None: # legacy
|
||||
self.change_font_size(True, None, new_size)
|
||||
|
||||
def change_font_size(self, all_windows, increment_operation, amt):
|
||||
def calc_new_size(old_size):
|
||||
def change_font_size(self, all_windows: bool, increment_operation: Optional[str], amt: float) -> None:
|
||||
def calc_new_size(old_size: float) -> float:
|
||||
new_size = old_size
|
||||
if amt == 0:
|
||||
new_size = self.opts.font_size
|
||||
@ -524,14 +524,14 @@ class Boss:
|
||||
if final_windows:
|
||||
self._change_font_size(final_windows)
|
||||
|
||||
def _change_font_size(self, sz_map):
|
||||
def _change_font_size(self, sz_map: Dict[int, float]) -> None:
|
||||
for os_window_id, sz in sz_map.items():
|
||||
tm = self.os_window_map.get(os_window_id)
|
||||
if tm is not None:
|
||||
os_window_font_size(os_window_id, sz)
|
||||
tm.resize()
|
||||
|
||||
def on_dpi_change(self, os_window_id):
|
||||
def on_dpi_change(self, os_window_id: int) -> None:
|
||||
tm = self.os_window_map.get(os_window_id)
|
||||
if tm is not None:
|
||||
sz = os_window_font_size(os_window_id)
|
||||
@ -540,28 +540,29 @@ class Boss:
|
||||
tm.update_dpi_based_sizes()
|
||||
tm.resize()
|
||||
|
||||
def _set_os_window_background_opacity(self, os_window_id, opacity):
|
||||
def _set_os_window_background_opacity(self, os_window_id: int, opacity: float) -> None:
|
||||
change_background_opacity(os_window_id, max(0.1, min(opacity, 1.0)))
|
||||
|
||||
def set_background_opacity(self, opacity):
|
||||
def set_background_opacity(self, opacity: str) -> None:
|
||||
window = self.active_window
|
||||
if window is None or not opacity:
|
||||
return
|
||||
if not self.opts.dynamic_background_opacity:
|
||||
return self.show_error(
|
||||
self.show_error(
|
||||
_('Cannot change background opacity'),
|
||||
_('You must set the dynamic_background_opacity option in kitty.conf to be able to change background opacity'))
|
||||
return
|
||||
os_window_id = window.os_window_id
|
||||
if opacity[0] in '+-':
|
||||
old_opacity = background_opacity_of(os_window_id)
|
||||
if old_opacity is None:
|
||||
return
|
||||
opacity = old_opacity + float(opacity)
|
||||
fin_opacity = old_opacity + float(opacity)
|
||||
elif opacity == 'default':
|
||||
opacity = self.opts.background_opacity
|
||||
fin_opacity = self.opts.background_opacity
|
||||
else:
|
||||
opacity = float(opacity)
|
||||
self._set_os_window_background_opacity(os_window_id, opacity)
|
||||
fin_opacity = float(opacity)
|
||||
self._set_os_window_background_opacity(os_window_id, fin_opacity)
|
||||
|
||||
@property
|
||||
def active_tab_manager(self) -> Optional[TabManager]:
|
||||
@ -581,7 +582,7 @@ class Boss:
|
||||
if t is not None:
|
||||
return t.active_window
|
||||
|
||||
def dispatch_special_key(self, key, native_key, action, mods):
|
||||
def dispatch_special_key(self, key: int, native_key: int, action: int, mods: int) -> bool:
|
||||
# Handles shortcuts, return True if the key was consumed
|
||||
key_action = get_shortcut(self.keymap, mods, key, native_key)
|
||||
if key_action is None:
|
||||
@ -594,7 +595,7 @@ class Boss:
|
||||
self.current_key_press_info = key, native_key, action, mods
|
||||
return self.dispatch_action(key_action)
|
||||
|
||||
def process_sequence(self, key, native_key, action, mods):
|
||||
def process_sequence(self, key: int, native_key: int, action: Any, mods: int) -> None:
|
||||
if not self.pending_sequences:
|
||||
set_in_sequence_mode(False)
|
||||
return
|
||||
@ -617,7 +618,7 @@ class Boss:
|
||||
if matched_action is not None:
|
||||
self.dispatch_action(matched_action)
|
||||
|
||||
def start_resizing_window(self):
|
||||
def start_resizing_window(self) -> None:
|
||||
w = self.active_window
|
||||
if w is None:
|
||||
return
|
||||
@ -628,15 +629,16 @@ class Boss:
|
||||
if overlay_window is not None:
|
||||
overlay_window.allow_remote_control = True
|
||||
|
||||
def resize_layout_window(self, window, increment, is_horizontal, reset=False):
|
||||
def resize_layout_window(self, window: Window, increment: float, is_horizontal: bool, reset: bool = False) -> Union[bool, None, str]:
|
||||
tab = window.tabref()
|
||||
if tab is None or not increment:
|
||||
return False
|
||||
if reset:
|
||||
return tab.reset_window_sizes()
|
||||
tab.reset_window_sizes()
|
||||
return None
|
||||
return tab.resize_window_by(window.id, increment, is_horizontal)
|
||||
|
||||
def default_bg_changed_for(self, window_id):
|
||||
def default_bg_changed_for(self, window_id: int) -> None:
|
||||
w = self.window_id_map.get(window_id)
|
||||
if w is not None:
|
||||
tm = self.os_window_map.get(w.os_window_id)
|
||||
@ -647,7 +649,7 @@ class Boss:
|
||||
if t is not None:
|
||||
t.relayout_borders()
|
||||
|
||||
def dispatch_action(self, key_action):
|
||||
def dispatch_action(self, key_action: KeyAction) -> bool:
|
||||
if key_action is not None:
|
||||
f = getattr(self, key_action.func, None)
|
||||
if f is not None:
|
||||
@ -672,11 +674,11 @@ class Boss:
|
||||
return True
|
||||
return False
|
||||
|
||||
def combine(self, *actions):
|
||||
def combine(self, *actions: KeyAction) -> None:
|
||||
for key_action in actions:
|
||||
self.dispatch_action(key_action)
|
||||
|
||||
def on_focus(self, os_window_id, focused):
|
||||
def on_focus(self, os_window_id: int, focused: bool) -> None:
|
||||
tm = self.os_window_map.get(os_window_id)
|
||||
if tm is not None:
|
||||
w = tm.active_window
|
||||
@ -686,19 +688,19 @@ class Boss:
|
||||
cocoa_set_menubar_title(w.title or '')
|
||||
tm.mark_tab_bar_dirty()
|
||||
|
||||
def update_tab_bar_data(self, os_window_id):
|
||||
def update_tab_bar_data(self, os_window_id: int) -> None:
|
||||
tm = self.os_window_map.get(os_window_id)
|
||||
if tm is not None:
|
||||
tm.update_tab_bar_data()
|
||||
|
||||
def on_drop(self, os_window_id, strings):
|
||||
def on_drop(self, os_window_id: int, strings: Iterable[str]) -> None:
|
||||
tm = self.os_window_map.get(os_window_id)
|
||||
if tm is not None:
|
||||
w = tm.active_window
|
||||
if w is not None:
|
||||
w.paste('\n'.join(strings))
|
||||
|
||||
def on_os_window_closed(self, os_window_id, viewport_width, viewport_height):
|
||||
def on_os_window_closed(self, os_window_id: int, viewport_width: int, viewport_height: int) -> None:
|
||||
self.cached_values['window-size'] = viewport_width, viewport_height
|
||||
tm = self.os_window_map.pop(os_window_id, None)
|
||||
if tm is not None:
|
||||
@ -711,7 +713,7 @@ class Boss:
|
||||
if action is not None:
|
||||
action()
|
||||
|
||||
def notify_on_os_window_death(self, address):
|
||||
def notify_on_os_window_death(self, address: str) -> None:
|
||||
import socket
|
||||
s = socket.socket(family=socket.AF_UNIX)
|
||||
with suppress(Exception):
|
||||
@ -721,7 +723,7 @@ class Boss:
|
||||
s.shutdown(socket.SHUT_RDWR)
|
||||
s.close()
|
||||
|
||||
def display_scrollback(self, window, data, cmd):
|
||||
def display_scrollback(self, window: Window, data: Optional[bytes], cmd: Optional[List[str]]) -> None:
|
||||
tab = self.active_tab
|
||||
if tab is not None and window.overlay_for is None:
|
||||
tab.new_special_window(
|
||||
@ -729,14 +731,14 @@ class Boss:
|
||||
copy_colors_from=self.active_window
|
||||
)
|
||||
|
||||
def edit_config_file(self, *a):
|
||||
def edit_config_file(self, *a: Any) -> None:
|
||||
confpath = prepare_config_file_for_editing()
|
||||
# On macOS vim fails to handle SIGWINCH if it occurs early, so add a
|
||||
# small delay.
|
||||
cmd = [kitty_exe(), '+runpy', 'import os, sys, time; time.sleep(0.05); os.execvp(sys.argv[1], sys.argv[1:])'] + get_editor() + [confpath]
|
||||
self.new_os_window(*cmd)
|
||||
|
||||
def get_output(self, source_window, num_lines=1):
|
||||
def get_output(self, source_window: Window, num_lines: Optional[int] = 1) -> str:
|
||||
output = ''
|
||||
s = source_window.screen
|
||||
if num_lines is None:
|
||||
@ -745,7 +747,15 @@ class Boss:
|
||||
output += str(s.linebuf.line(i))
|
||||
return output
|
||||
|
||||
def _run_kitten(self, kitten, args=(), input_data=None, window=None, custom_callback=None, action_on_removal=None):
|
||||
def _run_kitten(
|
||||
self,
|
||||
kitten: str,
|
||||
args: Iterable[str] = (),
|
||||
input_data: Optional[Union[bytes, str]] = None,
|
||||
window: Optional[Window] = None,
|
||||
custom_callback: Optional[Callable] = None,
|
||||
action_on_removal: Optional[Callable] = None
|
||||
) -> Optional[Window]:
|
||||
orig_args, args = list(args), list(args)
|
||||
from kittens.runner import create_kitten_handler
|
||||
end_kitten = create_kitten_handler(kitten, orig_args)
|
||||
@ -757,7 +767,7 @@ class Boss:
|
||||
tab = w.tabref() if w else None
|
||||
if end_kitten.no_ui:
|
||||
end_kitten(None, getattr(w, 'id', None), self)
|
||||
return
|
||||
return None
|
||||
|
||||
if w is not None and tab is not None and w.overlay_for is None:
|
||||
args[0:0] = [config_dir, kitten]
|
||||
@ -774,7 +784,7 @@ class Boss:
|
||||
else:
|
||||
raise ValueError('Unknown type_of_input: {}'.format(type_of_input))
|
||||
else:
|
||||
data = input_data
|
||||
data = input_data if isinstance(input_data, bytes) else input_data.encode('utf-8')
|
||||
copts = common_opts_as_dict(self.opts)
|
||||
overlay_window = tab.new_special_window(
|
||||
SpecialWindow(
|
||||
@ -795,7 +805,11 @@ class Boss:
|
||||
wid = w.id
|
||||
overlay_window.action_on_close = partial(self.on_kitten_finish, wid, custom_callback or end_kitten)
|
||||
if action_on_removal is not None:
|
||||
overlay_window.action_on_removal = lambda *a: action_on_removal(wid, self)
|
||||
|
||||
def callback_wrapper(*a: Any) -> None:
|
||||
if action_on_removal is not None:
|
||||
action_on_removal(wid, self)
|
||||
overlay_window.action_on_removal = callback_wrapper
|
||||
return overlay_window
|
||||
|
||||
def kitten(self, kitten: str, *args: str) -> None:
|
||||
@ -804,23 +818,23 @@ class Boss:
|
||||
kargs = shlex.split(cmdline) if cmdline else []
|
||||
self._run_kitten(kitten, kargs)
|
||||
|
||||
def on_kitten_finish(self, target_window_id, end_kitten, source_window):
|
||||
def on_kitten_finish(self, target_window_id: str, end_kitten: Callable, source_window: Window) -> None:
|
||||
output = self.get_output(source_window, num_lines=None)
|
||||
from kittens.runner import deserialize
|
||||
data = deserialize(output)
|
||||
if data is not None:
|
||||
end_kitten(data, target_window_id, self)
|
||||
|
||||
def input_unicode_character(self):
|
||||
def input_unicode_character(self) -> None:
|
||||
self._run_kitten('unicode_input')
|
||||
|
||||
def set_tab_title(self):
|
||||
def set_tab_title(self) -> None:
|
||||
tab = self.active_tab
|
||||
if tab:
|
||||
args = ['--name=tab-title', '--message', _('Enter the new title for this tab below.'), 'do_set_tab_title', str(tab.id)]
|
||||
self._run_kitten('ask', args)
|
||||
|
||||
def do_set_tab_title(self, title, tab_id):
|
||||
def do_set_tab_title(self, title: str, tab_id: int) -> None:
|
||||
tm = self.active_tab_manager
|
||||
if tm is not None and title:
|
||||
tab_id = int(tab_id)
|
||||
@ -829,19 +843,19 @@ class Boss:
|
||||
tab.set_title(title)
|
||||
break
|
||||
|
||||
def show_error(self, title, msg):
|
||||
def show_error(self, title: str, msg: str) -> None:
|
||||
self._run_kitten('show_error', args=['--title', title], input_data=msg)
|
||||
|
||||
def create_marker(self):
|
||||
def create_marker(self) -> None:
|
||||
w = self.active_window
|
||||
if w:
|
||||
spec = None
|
||||
|
||||
def done(data, target_window_id, self):
|
||||
def done(data: Dict[str, Any], target_window_id: int, self: Boss) -> None:
|
||||
nonlocal spec
|
||||
spec = data['response']
|
||||
|
||||
def done2(target_window_id, self):
|
||||
def done2(target_window_id: int, self: Boss) -> None:
|
||||
w = self.window_id_map.get(target_window_id)
|
||||
if w is not None and spec:
|
||||
try:
|
||||
@ -855,7 +869,7 @@ class Boss:
|
||||
],
|
||||
custom_callback=done, action_on_removal=done2)
|
||||
|
||||
def kitty_shell(self, window_type):
|
||||
def kitty_shell(self, window_type: str) -> None:
|
||||
cmd = ['@', kitty_exe(), '@']
|
||||
if window_type == 'tab':
|
||||
self._new_tab(cmd)
|
||||
@ -870,21 +884,21 @@ class Boss:
|
||||
else:
|
||||
self._new_window(cmd)
|
||||
|
||||
def switch_focus_to(self, window_idx):
|
||||
def switch_focus_to(self, window_idx: int) -> None:
|
||||
tab = self.active_tab
|
||||
if tab:
|
||||
tab.set_active_window_idx(window_idx)
|
||||
|
||||
def open_url(self, url, program=None, cwd=None):
|
||||
def open_url(self, url: str, program: Optional[Union[str, List[str]]] = None, cwd: Optional[str] = None) -> None:
|
||||
if url:
|
||||
if isinstance(program, str):
|
||||
program = to_cmdline(program)
|
||||
open_url(url, program or self.opts.open_url_with, cwd=cwd)
|
||||
|
||||
def open_url_lines(self, lines, program=None):
|
||||
def open_url_lines(self, lines: Iterable[str], program: Optional[Union[str, List[str]]] = None) -> None:
|
||||
self.open_url(''.join(lines), program)
|
||||
|
||||
def destroy(self):
|
||||
def destroy(self) -> None:
|
||||
self.shutting_down = True
|
||||
self.child_monitor.shutdown_monitor()
|
||||
self.set_update_check_process()
|
||||
@ -895,21 +909,21 @@ class Boss:
|
||||
self.os_window_map = {}
|
||||
destroy_global_data()
|
||||
|
||||
def paste_to_active_window(self, text):
|
||||
def paste_to_active_window(self, text: str) -> None:
|
||||
if text:
|
||||
w = self.active_window
|
||||
if w is not None:
|
||||
w.paste(text)
|
||||
|
||||
def paste_from_clipboard(self):
|
||||
def paste_from_clipboard(self) -> None:
|
||||
text = get_clipboard_string()
|
||||
self.paste_to_active_window(text)
|
||||
|
||||
def paste_from_selection(self):
|
||||
def paste_from_selection(self) -> None:
|
||||
text = get_primary_selection() if supports_primary_selection else get_clipboard_string()
|
||||
self.paste_to_active_window(text)
|
||||
|
||||
def set_primary_selection(self):
|
||||
def set_primary_selection(self) -> None:
|
||||
w = self.active_window
|
||||
if w is not None and not w.destroyed:
|
||||
text = w.text_for_selection()
|
||||
@ -918,7 +932,7 @@ class Boss:
|
||||
if self.opts.copy_on_select:
|
||||
self.copy_to_buffer(self.opts.copy_on_select)
|
||||
|
||||
def copy_to_buffer(self, buffer_name):
|
||||
def copy_to_buffer(self, buffer_name: str) -> None:
|
||||
w = self.active_window
|
||||
if w is not None and not w.destroyed:
|
||||
text = w.text_for_selection()
|
||||
@ -930,7 +944,7 @@ class Boss:
|
||||
else:
|
||||
self.clipboard_buffers[buffer_name] = text
|
||||
|
||||
def paste_from_buffer(self, buffer_name):
|
||||
def paste_from_buffer(self, buffer_name: str) -> None:
|
||||
if buffer_name == 'clipboard':
|
||||
text: Optional[str] = get_clipboard_string()
|
||||
elif buffer_name == 'primary':
|
||||
@ -940,69 +954,84 @@ class Boss:
|
||||
if text:
|
||||
self.paste_to_active_window(text)
|
||||
|
||||
def goto_tab(self, tab_num):
|
||||
def goto_tab(self, tab_num: int) -> None:
|
||||
tm = self.active_tab_manager
|
||||
if tm is not None:
|
||||
tm.goto_tab(tab_num - 1)
|
||||
|
||||
def set_active_tab(self, tab):
|
||||
def set_active_tab(self, tab: Tab) -> bool:
|
||||
tm = self.active_tab_manager
|
||||
if tm is not None:
|
||||
return tm.set_active_tab(tab)
|
||||
return False
|
||||
|
||||
def next_tab(self):
|
||||
def next_tab(self) -> None:
|
||||
tm = self.active_tab_manager
|
||||
if tm is not None:
|
||||
tm.next_tab()
|
||||
|
||||
def previous_tab(self):
|
||||
def previous_tab(self) -> None:
|
||||
tm = self.active_tab_manager
|
||||
if tm is not None:
|
||||
tm.next_tab(-1)
|
||||
|
||||
prev_tab = previous_tab
|
||||
|
||||
def process_stdin_source(self, window=None, stdin=None):
|
||||
def process_stdin_source(self, window: Optional[Window] = None, stdin: Optional[str] = None) -> Tuple[Optional[Dict[str, str]], Optional[bytes]]:
|
||||
w = window or self.active_window
|
||||
if not w:
|
||||
return None, None
|
||||
env = None
|
||||
input_data = None
|
||||
if stdin:
|
||||
add_wrap_markers = stdin.endswith('_wrap')
|
||||
if add_wrap_markers:
|
||||
stdin = stdin[:-len('_wrap')]
|
||||
stdin = data_for_at(w, stdin, add_wrap_markers=add_wrap_markers)
|
||||
if stdin is not None:
|
||||
pipe_data = w.pipe_data(stdin, has_wrap_markers=add_wrap_markers) if w else {}
|
||||
pipe_data = w.pipe_data(stdin, has_wrap_markers=add_wrap_markers) if w else None
|
||||
if pipe_data:
|
||||
env = {
|
||||
'KITTY_PIPE_DATA':
|
||||
'{scrolled_by}:{cursor_x},{cursor_y}:{lines},{columns}'.format(**pipe_data)
|
||||
}
|
||||
stdin = stdin.encode('utf-8')
|
||||
return env, stdin
|
||||
input_data = stdin.encode('utf-8')
|
||||
return env, input_data
|
||||
|
||||
def data_for_at(self, which, window=None, add_wrap_markers=False) -> Optional[str]:
|
||||
def data_for_at(self, which: str, window: Optional[Window] = None, add_wrap_markers: bool = False) -> Optional[str]:
|
||||
window = window or self.active_window
|
||||
if not window:
|
||||
return None
|
||||
return data_for_at(window, which, add_wrap_markers=add_wrap_markers)
|
||||
|
||||
def special_window_for_cmd(self, cmd, window=None, stdin=None, cwd_from=None, as_overlay=False):
|
||||
def special_window_for_cmd(
|
||||
self, cmd: List[str],
|
||||
window: Optional[Window] = None,
|
||||
stdin: Optional[str] = None,
|
||||
cwd_from: Optional[int] = None,
|
||||
as_overlay: bool = False
|
||||
) -> SpecialWindowInstance:
|
||||
w = window or self.active_window
|
||||
env, stdin = self.process_stdin_source(w, stdin)
|
||||
env, input_data = self.process_stdin_source(w, stdin)
|
||||
cmdline = []
|
||||
for arg in cmd:
|
||||
if arg == '@selection' and w:
|
||||
arg = data_for_at(w, arg)
|
||||
if not arg:
|
||||
q = data_for_at(w, arg)
|
||||
if not q:
|
||||
continue
|
||||
arg = q
|
||||
cmdline.append(arg)
|
||||
overlay_for = w.id if w and as_overlay and w.overlay_for is None else None
|
||||
return SpecialWindow(cmd, stdin, cwd_from=cwd_from, overlay_for=overlay_for, env=env)
|
||||
return SpecialWindow(cmd, input_data, cwd_from=cwd_from, overlay_for=overlay_for, env=env)
|
||||
|
||||
def run_background_process(self, cmd, cwd=None, env=None, stdin=None, cwd_from=None):
|
||||
def run_background_process(
|
||||
self,
|
||||
cmd: List[str],
|
||||
cwd: Optional[str] = None,
|
||||
env: Optional[Dict[str, str]] = None,
|
||||
stdin: Optional[bytes] = None,
|
||||
cwd_from: Optional[int] = None
|
||||
) -> None:
|
||||
import subprocess
|
||||
if cwd_from:
|
||||
with suppress(Exception):
|
||||
@ -1021,12 +1050,12 @@ class Boss:
|
||||
else:
|
||||
subprocess.Popen(cmd, env=env, cwd=cwd)
|
||||
|
||||
def pipe(self, source, dest, exe, *args):
|
||||
def pipe(self, source: str, dest: str, exe: str, *args: str) -> Window:
|
||||
cmd = [exe] + list(args)
|
||||
window = self.active_window
|
||||
cwd_from = window.child.pid_for_cwd if window else None
|
||||
|
||||
def create_window():
|
||||
def create_window() -> SpecialWindowInstance:
|
||||
return self.special_window_for_cmd(
|
||||
cmd, stdin=source, as_overlay=dest == 'overlay', cwd_from=cwd_from)
|
||||
|
||||
@ -1049,7 +1078,7 @@ class Boss:
|
||||
env, stdin = self.process_stdin_source(stdin=source, window=window)
|
||||
self.run_background_process(cmd, cwd_from=cwd_from, stdin=stdin, env=env)
|
||||
|
||||
def args_to_special_window(self, args: Iterable[str], cwd_from=None):
|
||||
def args_to_special_window(self, args: Iterable[str], cwd_from: Optional[int] = None) -> SpecialWindowInstance:
|
||||
args = list(args)
|
||||
stdin = None
|
||||
w = self.active_window
|
||||
@ -1070,7 +1099,7 @@ class Boss:
|
||||
cmd.append(arg)
|
||||
return SpecialWindow(cmd, stdin, cwd_from=cwd_from)
|
||||
|
||||
def _new_tab(self, args, cwd_from=None, as_neighbor=False):
|
||||
def _new_tab(self, args: Union[SpecialWindowInstance, Iterable[str]], cwd_from: Optional[int] = None, as_neighbor: bool = False) -> Optional[Tab]:
|
||||
special_window = None
|
||||
if args:
|
||||
if isinstance(args, SpecialWindowInstance):
|
||||
@ -1081,26 +1110,26 @@ class Boss:
|
||||
if tm is not None:
|
||||
return tm.new_tab(special_window=special_window, cwd_from=cwd_from, as_neighbor=as_neighbor)
|
||||
|
||||
def _create_tab(self, args, cwd_from=None):
|
||||
def _create_tab(self, args: List[str], cwd_from: Optional[int] = None) -> None:
|
||||
as_neighbor = False
|
||||
if args and args[0].startswith('!'):
|
||||
as_neighbor = 'neighbor' in args[0][1:].split(',')
|
||||
args = args[1:]
|
||||
self._new_tab(args, as_neighbor=as_neighbor, cwd_from=cwd_from)
|
||||
|
||||
def new_tab(self, *args):
|
||||
self._create_tab(args)
|
||||
def new_tab(self, *args: str) -> None:
|
||||
self._create_tab(list(args))
|
||||
|
||||
def new_tab_with_cwd(self, *args):
|
||||
def new_tab_with_cwd(self, *args: str) -> None:
|
||||
w = self.active_window_for_cwd
|
||||
cwd_from = w.child.pid_for_cwd if w is not None else None
|
||||
self._create_tab(args, cwd_from=cwd_from)
|
||||
self._create_tab(list(args), cwd_from=cwd_from)
|
||||
|
||||
def new_tab_with_wd(self, wd):
|
||||
def new_tab_with_wd(self, wd: str) -> None:
|
||||
special_window = SpecialWindow(None, cwd=wd)
|
||||
self._new_tab(special_window)
|
||||
|
||||
def _new_window(self, args, cwd_from=None):
|
||||
def _new_window(self, args: List[str], cwd_from: Optional[int] = None) -> Optional[Window]:
|
||||
tab = self.active_tab
|
||||
if tab is not None:
|
||||
allow_remote_control = False
|
||||
@ -1118,32 +1147,32 @@ class Boss:
|
||||
else:
|
||||
return tab.new_window(cwd_from=cwd_from, location=location, allow_remote_control=allow_remote_control)
|
||||
|
||||
def new_window(self, *args):
|
||||
self._new_window(args)
|
||||
def new_window(self, *args: str) -> None:
|
||||
self._new_window(list(args))
|
||||
|
||||
def new_window_with_cwd(self, *args):
|
||||
def new_window_with_cwd(self, *args: str) -> None:
|
||||
w = self.active_window_for_cwd
|
||||
if w is None:
|
||||
return self.new_window(*args)
|
||||
cwd_from = w.child.pid_for_cwd
|
||||
self._new_window(args, cwd_from=cwd_from)
|
||||
self._new_window(list(args), cwd_from=cwd_from)
|
||||
|
||||
def launch(self, *args: str) -> None:
|
||||
from kitty.launch import parse_launch_args, launch
|
||||
opts, args_ = parse_launch_args(args)
|
||||
launch(self, opts, args_)
|
||||
|
||||
def move_tab_forward(self):
|
||||
def move_tab_forward(self) -> None:
|
||||
tm = self.active_tab_manager
|
||||
if tm is not None:
|
||||
tm.move_tab(1)
|
||||
|
||||
def move_tab_backward(self):
|
||||
def move_tab_backward(self) -> None:
|
||||
tm = self.active_tab_manager
|
||||
if tm is not None:
|
||||
tm.move_tab(-1)
|
||||
|
||||
def disable_ligatures_in(self, where: Union[str, Iterable[Window]], strategy: int):
|
||||
def disable_ligatures_in(self, where: Union[str, Iterable[Window]], strategy: int) -> None:
|
||||
if isinstance(where, str):
|
||||
windows: List[Window] = []
|
||||
if where == 'active':
|
||||
@ -1160,7 +1189,7 @@ class Boss:
|
||||
window.screen.disable_ligatures = strategy
|
||||
window.refresh()
|
||||
|
||||
def patch_colors(self, spec, cursor_text_color, configured=False):
|
||||
def patch_colors(self, spec: Dict[str, int], cursor_text_color: Union[bool, int, Color], configured: bool = False) -> None:
|
||||
if configured:
|
||||
for k, v in spec.items():
|
||||
if hasattr(self.opts, k):
|
||||
@ -1173,19 +1202,19 @@ class Boss:
|
||||
tm.tab_bar.patch_colors(spec)
|
||||
patch_global_colors(spec, configured)
|
||||
|
||||
def safe_delete_temp_file(self, path):
|
||||
def safe_delete_temp_file(self, path: str) -> None:
|
||||
if is_path_in_temp_dir(path):
|
||||
with suppress(FileNotFoundError):
|
||||
os.remove(path)
|
||||
|
||||
def set_update_check_process(self, process=None):
|
||||
def set_update_check_process(self, process: Optional['Popen'] = None) -> None:
|
||||
if self.update_check_process is not None:
|
||||
with suppress(Exception):
|
||||
if self.update_check_process.poll() is None:
|
||||
self.update_check_process.kill()
|
||||
self.update_check_process = process
|
||||
|
||||
def on_monitored_pid_death(self, pid, exit_status):
|
||||
def on_monitored_pid_death(self, pid: int, exit_status: int) -> None:
|
||||
update_check_process = self.update_check_process
|
||||
if update_check_process is not None and pid == update_check_process.pid:
|
||||
self.update_check_process = None
|
||||
@ -1201,22 +1230,24 @@ class Boss:
|
||||
except Exception as e:
|
||||
log_error('Failed to process update check data {!r}, with error: {}'.format(raw, e))
|
||||
|
||||
def dbus_notification_callback(self, activated, *args):
|
||||
def dbus_notification_callback(self, activated: bool, a: int, b: Union[int, str]) -> None:
|
||||
from .notify import dbus_notification_created, dbus_notification_activated
|
||||
if activated:
|
||||
dbus_notification_activated(*args)
|
||||
assert isinstance(b, str)
|
||||
dbus_notification_activated(a, b)
|
||||
else:
|
||||
dbus_notification_created(*args)
|
||||
assert isinstance(b, int)
|
||||
dbus_notification_created(a, b)
|
||||
|
||||
def show_bad_config_lines(self, bad_lines):
|
||||
def show_bad_config_lines(self, bad_lines: Iterable[BadLine]) -> None:
|
||||
|
||||
def format_bad_line(bad_line):
|
||||
def format_bad_line(bad_line: BadLine) -> str:
|
||||
return '{}:{} in line: {}\n'.format(bad_line.number, bad_line.exception, bad_line.line)
|
||||
|
||||
msg = '\n'.join(map(format_bad_line, bad_lines)).rstrip()
|
||||
self.show_error(_('Errors in kitty.conf'), msg)
|
||||
|
||||
def set_colors(self, *args):
|
||||
def set_colors(self, *args: str) -> None:
|
||||
from kitty.rc.base import parse_subcommand_cli, command_for_name, PayloadGetter
|
||||
from kitty.remote_control import parse_rc_args
|
||||
c = command_for_name('set_colors')
|
||||
@ -1224,7 +1255,12 @@ class Boss:
|
||||
payload = c.message_to_kitty(parse_rc_args([])[0], opts, items)
|
||||
c.response_from_kitty(self, self.active_window, PayloadGetter(c, payload if isinstance(payload, dict) else {}))
|
||||
|
||||
def _move_window_to(self, window=None, target_tab_id=None, target_os_window_id=None):
|
||||
def _move_window_to(
|
||||
self,
|
||||
window: Optional[Window] = None,
|
||||
target_tab_id: Optional[Union[str, int]] = None,
|
||||
target_os_window_id: Optional[Union[str, int]] = None
|
||||
) -> None:
|
||||
window = window or self.active_window
|
||||
if not window:
|
||||
return
|
||||
@ -1238,7 +1274,12 @@ class Boss:
|
||||
else:
|
||||
target_os_window_id = target_os_window_id or current_os_window()
|
||||
if target_tab_id == 'new':
|
||||
tm = self.os_window_map[target_os_window_id]
|
||||
if not isinstance(target_os_window_id, int):
|
||||
q = self.active_tab_manager
|
||||
assert q is not None
|
||||
tm = q
|
||||
else:
|
||||
tm = self.os_window_map[target_os_window_id]
|
||||
target_tab = tm.new_tab(empty_tab=True)
|
||||
else:
|
||||
for tab in self.all_tabs:
|
||||
@ -1257,7 +1298,7 @@ class Boss:
|
||||
self._cleanup_tab_after_window_removal(src_tab)
|
||||
target_tab.make_active()
|
||||
|
||||
def _move_tab_to(self, tab=None, target_os_window_id=None):
|
||||
def _move_tab_to(self, tab: Optional[Tab] = None, target_os_window_id: Optional[int] = None) -> None:
|
||||
tab = tab or self.active_tab
|
||||
if tab is None:
|
||||
return
|
||||
@ -1269,7 +1310,7 @@ class Boss:
|
||||
self._cleanup_tab_after_window_removal(tab)
|
||||
target_tab.make_active()
|
||||
|
||||
def detach_window(self, *args):
|
||||
def detach_window(self, *args: str) -> None:
|
||||
if not args or args[0] == 'new':
|
||||
return self._move_window_to(target_os_window_id='new')
|
||||
if args[0] == 'new-tab':
|
||||
@ -1294,11 +1335,11 @@ class Boss:
|
||||
tab_id_map[new_idx - 1] = None
|
||||
lines.append(fmt.format(new_idx, 'New OS Window'))
|
||||
|
||||
def done(data, target_window_id, self):
|
||||
def done(data: Dict[str, Any], target_window_id: int, self: Boss) -> None:
|
||||
nonlocal done_tab_id
|
||||
done_tab_id = tab_id_map[int(data['groupdicts'][0]['index'])]
|
||||
|
||||
def done2(target_window_id, self):
|
||||
def done2(target_window_id: int, self: Boss) -> None:
|
||||
if not hasattr(done, 'tab_id'):
|
||||
return
|
||||
tab_id = done_tab_id
|
||||
@ -1319,7 +1360,7 @@ class Boss:
|
||||
), input_data='\r\n'.join(lines).encode('utf-8'), custom_callback=done, action_on_removal=done2
|
||||
)
|
||||
|
||||
def detach_tab(self, *args):
|
||||
def detach_tab(self, *args: str) -> None:
|
||||
if not args or args[0] == 'new':
|
||||
return self._move_tab_to()
|
||||
|
||||
@ -1342,12 +1383,12 @@ class Boss:
|
||||
os_window_id_map[new_idx - 1] = None
|
||||
lines.append(fmt.format(new_idx, 'New OS Window'))
|
||||
|
||||
def done(data, target_window_id, self):
|
||||
def done(data: Dict[str, Any], target_window_id: int, self: Boss) -> None:
|
||||
nonlocal done_called, done_osw
|
||||
done_osw = os_window_id_map[int(data['groupdicts'][0]['index'])]
|
||||
done_called = True
|
||||
|
||||
def done2(target_window_id, self):
|
||||
def done2(target_window_id: int, self: Boss) -> None:
|
||||
if not done_called:
|
||||
return
|
||||
os_window_id = done_osw
|
||||
@ -1367,7 +1408,7 @@ class Boss:
|
||||
), input_data='\r\n'.join(lines).encode('utf-8'), custom_callback=done, action_on_removal=done2
|
||||
)
|
||||
|
||||
def set_background_image(self, path, os_windows, configured, layout):
|
||||
def set_background_image(self, path: Optional[str], os_windows: Tuple[int, ...], configured: bool, layout: Optional[str]) -> None:
|
||||
if layout:
|
||||
set_background_image(path, os_windows, configured, layout)
|
||||
else:
|
||||
|
||||
@ -930,6 +930,9 @@ class LineBuf:
|
||||
def is_continued(self, idx: int) -> bool:
|
||||
pass
|
||||
|
||||
def line(self, num: int) -> Line:
|
||||
pass
|
||||
|
||||
|
||||
class Cursor:
|
||||
x: int
|
||||
|
||||
@ -248,7 +248,10 @@ def launch(boss: Boss, opts: LaunchCLIOptions, args: List[str], target_tab: Opti
|
||||
env.update(penv)
|
||||
|
||||
if opts.type == 'background':
|
||||
boss.run_background_process(kw['cmd'], cwd=kw['cwd'], cwd_from=kw['cwd_from'], env=env or None, stdin=kw['stdin'])
|
||||
cmd = kw['cmd']
|
||||
if not cmd:
|
||||
raise ValueError('The cmd to run must be specified when running a background process')
|
||||
boss.run_background_process(cmd, cwd=kw['cwd'], cwd_from=kw['cwd_from'], env=env or None, stdin=kw['stdin'])
|
||||
elif opts.type in ('clipboard', 'primary'):
|
||||
if kw.get('stdin') is not None:
|
||||
func = set_clipboard_string if opts.type == 'clipboard' else set_primary_selection
|
||||
|
||||
@ -1 +0,0 @@
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
# vim:fileencoding=utf-8
|
||||
# License: GPLv3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
|
||||
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
from typing import TYPE_CHECKING, Optional, Union
|
||||
|
||||
from .base import (
|
||||
MATCH_WINDOW_OPTION, ArgsType, Boss, PayloadGetType,
|
||||
@ -55,7 +55,7 @@ If specified resize the window this command is run in, rather than the active wi
|
||||
|
||||
def response_from_kitty(self, boss: Boss, window: Optional[Window], payload_get: PayloadGetType) -> ResponseType:
|
||||
windows = self.windows_for_match_payload(boss, window, payload_get)
|
||||
resized = False
|
||||
resized: Union[bool, None, str] = False
|
||||
if windows and windows[0]:
|
||||
resized = boss.resize_layout_window(
|
||||
windows[0], increment=payload_get('increment'), is_horizontal=payload_get('axis') == 'horizontal',
|
||||
|
||||
@ -275,7 +275,7 @@ class Tab: # {{{
|
||||
self._set_current_layout(layout_name)
|
||||
self.relayout()
|
||||
|
||||
def resize_window_by(self, window_id: int, increment: int, is_horizontal: bool) -> Optional[str]:
|
||||
def resize_window_by(self, window_id: int, increment: float, is_horizontal: bool) -> Optional[str]:
|
||||
increment_as_percent = self.current_layout.bias_increment_for_cell(is_horizontal) * increment
|
||||
if self.current_layout.modify_size_of_window(self.windows, window_id, increment_as_percent, is_horizontal):
|
||||
self.relayout()
|
||||
|
||||
@ -42,8 +42,7 @@ if TYPE_CHECKING:
|
||||
from .tabs import Tab
|
||||
from .child import Child
|
||||
from typing import TypedDict
|
||||
from .rc.base import RemoteCommand
|
||||
Tab, Child, RemoteCommand
|
||||
Tab, Child
|
||||
else:
|
||||
TypedDict = dict
|
||||
|
||||
@ -470,7 +469,7 @@ class Window:
|
||||
def request_capabilities(self, q: str) -> None:
|
||||
self.screen.send_escape_code_to_child(DCS, get_capabilities(q))
|
||||
|
||||
def handle_remote_cmd(self, cmd: 'RemoteCommand') -> None:
|
||||
def handle_remote_cmd(self, cmd: str) -> None:
|
||||
get_boss().handle_remote_cmd(cmd, self)
|
||||
|
||||
def handle_remote_print(self, msg: bytes) -> None:
|
||||
@ -615,7 +614,10 @@ class Window:
|
||||
exe = shutil.which(cmd[0], path=env['PATH'])
|
||||
if exe:
|
||||
cmd[0] = exe
|
||||
get_boss().display_scrollback(self, data['text'], cmd)
|
||||
bdata: Union[str, bytes, None] = data['text']
|
||||
if isinstance(bdata, str):
|
||||
bdata = bdata.encode('utf-8')
|
||||
get_boss().display_scrollback(self, bdata, cmd)
|
||||
|
||||
def paste_bytes(self, text: Union[str, bytes]) -> None:
|
||||
# paste raw bytes without any processing
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user