diff --git a/kitty/boss.py b/kitty/boss.py index 58cefdd68..2ca5c1747 100644 --- a/kitty/boss.py +++ b/kitty/boss.py @@ -9,11 +9,15 @@ import re from contextlib import suppress from functools import partial from gettext import gettext as _ -from typing import Dict, Iterable, List, Optional, Union +from typing import ( + TYPE_CHECKING, Any, Callable, Dict, Generator, Iterable, List, Optional, + Tuple, Union +) from weakref import WeakValueDictionary from .child import cached_process_data, cwd_of_process from .cli import create_opts, parse_args +from .cli_stub import CLIOptions from .conf.utils import to_cmdline from .config import ( SubSequenceMap, common_opts_as_dict, initial_window_size_func, @@ -34,9 +38,12 @@ from .fast_data_types import ( ) from .keys import get_shortcut, shortcut_matches from .layout import set_layout_options +from .options_stub import Options from .rgb import Color, color_from_int -from .session import create_sessions -from .tabs import SpecialWindow, SpecialWindowInstance, TabManager +from .session import Session, create_sessions +from .tabs import ( + SpecialWindow, SpecialWindowInstance, Tab, TabDict, TabManager +) from .utils import ( func_name, get_editor, get_primary_selection, is_path_in_temp_dir, log_error, open_url, parse_address_spec, remove_socket_file, safe_print, @@ -44,6 +51,19 @@ from .utils import ( ) from .window import MatchPatternType, Window +if TYPE_CHECKING: + from typing import TypedDict + from .rc.base import RemoteCommand # noqa + from subprocess import Popen # noqa +else: + TypedDict = dict + + +class OSWindowDict(TypedDict): + id: int + is_focused: bool + tabs: List[TabDict] + def notification_activated(identifier: str) -> None: if identifier == 'new-version': @@ -61,10 +81,13 @@ def listen_on(spec: str) -> int: return s.fileno() -def data_for_at(w: Window, arg: str, add_wrap_markers: bool = False) -> Optional[str]: - def as_text(**kw) -> str: +def data_for_at(w: Optional[Window], arg: str, add_wrap_markers: bool = False) -> Optional[str]: + if not w: + return None + + def as_text(**kw: bool) -> str: kw['add_wrap_markers'] = add_wrap_markers - return w.as_text(**kw) + return w.as_text(**kw) if w else '' if arg == '@selection': return w.text_for_selection() @@ -89,12 +112,12 @@ def data_for_at(w: Window, arg: str, add_wrap_markers: bool = False) -> Optional class DumpCommands: # {{{ - def __init__(self, args): - self.draw_dump_buf = [] + def __init__(self, args: CLIOptions): + self.draw_dump_buf: List[str] = [] if args.dump_bytes: self.dump_bytes_to = open(args.dump_bytes, 'wb') - def __call__(self, *a): + def __call__(self, *a: Any) -> None: if a: if a[0] == 'draw': if a[1] is None: @@ -116,17 +139,24 @@ class DumpCommands: # {{{ class Boss: - def __init__(self, os_window_id, opts, args, cached_values, new_os_window_trigger): + def __init__( + self, + os_window_id: Optional[int], + opts: Options, + args: CLIOptions, + cached_values: Dict[str, Any], + new_os_window_trigger: Optional[Tuple[int, bool, int]] + ): set_layout_options(opts) self.clipboard_buffers: Dict[str, str] = {} - self.update_check_process = None + self.update_check_process: Optional['Popen'] = None self.window_id_map: WeakValueDictionary[int, Window] = WeakValueDictionary() self.startup_colors = {k: opts[k] for k in opts if isinstance(opts[k], Color)} self.startup_cursor_text_color = opts.cursor_text_color self.pending_sequences: Optional[SubSequenceMap] = None self.cached_values = cached_values - self.os_window_map = {} - self.os_window_death_actions = {} + self.os_window_map: Dict[int, TabManager] = {} + self.os_window_death_actions: Dict[int, Callable[[], None]] = {} self.cursor_blinking = True self.shutting_down = False talk_fd = getattr(single_instance, 'socket', None) @@ -157,7 +187,15 @@ class Boss: from .fast_data_types import cocoa_set_notification_activated_callback cocoa_set_notification_activated_callback(notification_activated) - def add_os_window(self, startup_session=None, os_window_id=None, wclass=None, wname=None, opts_for_size=None, startup_id=None): + def add_os_window( + self, + startup_session: Optional[Session] = None, + os_window_id: Optional[int] = None, + wclass: Optional[str] = None, + wname: Optional[str] = None, + opts_for_size: Optional[Options] = None, + startup_id: Optional[str] = None + ) -> int: if os_window_id is None: opts_for_size = opts_for_size or getattr(startup_session, 'os_window_size', None) or self.opts cls = wclass or self.args.cls or appname @@ -170,7 +208,7 @@ class Boss: self.os_window_map[os_window_id] = tm return os_window_id - def list_os_windows(self): + def list_os_windows(self) -> Generator[OSWindowDict, None, None]: with cached_process_data(): active_tab, active_window = self.active_tab, self.active_window active_tab_manager = self.active_tab_manager @@ -182,20 +220,20 @@ class Boss: } @property - def all_tab_managers(self): + def all_tab_managers(self) -> Generator[TabManager, None, None]: yield from self.os_window_map.values() @property - def all_tabs(self): + def all_tabs(self) -> Generator[Tab, None, None]: for tm in self.all_tab_managers: yield from tm @property - def all_windows(self): + def all_windows(self) -> Generator[Window, None, None]: for tab in self.all_tabs: yield from tab - def match_windows(self, match): + def match_windows(self, match: str) -> Generator[Window, None, None]: try: field, exp = match.split(':', 1) except ValueError: @@ -222,13 +260,13 @@ class Boss: if window.matches(field, pat): yield window - def tab_for_window(self, window): + def tab_for_window(self, window: Window) -> Optional[Tab]: for tab in self.all_tabs: for w in tab: if w.id == window.id: return tab - def match_tabs(self, match): + def match_tabs(self, match: str) -> Generator[Tab, None, None]: try: field, exp = match.split(':', 1) except ValueError: @@ -242,11 +280,11 @@ class Boss: found = True if not found: tabs = {self.tab_for_window(w) for w in self.match_windows(match)} - for tab in tabs: - if tab: - yield tab + for q in tabs: + if q: + yield q - def set_active_window(self, window, switch_os_window_if_needed=False): + def set_active_window(self, window: Window, switch_os_window_if_needed: bool = False) -> Optional[int]: for os_window_id, tm in self.os_window_map.items(): for tab in tm: for w in tab: @@ -258,7 +296,7 @@ class Boss: focus_os_window(os_window_id, True) return os_window_id - def _new_os_window(self, args, cwd_from=None): + def _new_os_window(self, args: Union[SpecialWindowInstance, Iterable[str]], cwd_from: Optional[int] = None) -> int: if isinstance(args, SpecialWindowInstance): sw = args else: @@ -266,32 +304,34 @@ class Boss: startup_session = next(create_sessions(self.opts, special_window=sw, cwd_from=cwd_from)) return self.add_os_window(startup_session) - def new_os_window(self, *args): + def new_os_window(self, *args: str) -> None: self._new_os_window(args) @property - def active_window_for_cwd(self): + def active_window_for_cwd(self) -> Optional[Window]: w = self.active_window if w is not None and w.overlay_for is not None and w.overlay_for in self.window_id_map: w = self.window_id_map[w.overlay_for] return w - def new_os_window_with_cwd(self, *args): + def new_os_window_with_cwd(self, *args: str) -> None: w = self.active_window_for_cwd cwd_from = w.child.pid_for_cwd if w is not None else None self._new_os_window(args, cwd_from) - def new_os_window_with_wd(self, wd): + def new_os_window_with_wd(self, wd: str) -> None: special_window = SpecialWindow(None, cwd=wd) self._new_os_window(special_window) - def add_child(self, window): + def add_child(self, window: Window) -> None: + assert window.child.pid is not None and window.child.child_fd is not None self.child_monitor.add_child(window.id, window.child.pid, window.child.child_fd, window.screen) self.window_id_map[window.id] = window - def _handle_remote_command(self, cmd, window=None, from_peer=False): + def _handle_remote_command(self, cmd: str, window: Optional[Window] = None, from_peer: bool = False) -> Optional[Dict[str, Any]]: from .remote_control import handle_cmd response = None + window = window or None if self.opts.allow_remote_control == 'y' or from_peer or getattr(window, 'allow_remote_control', False): try: response = handle_cmd(self, window, cmd) @@ -310,9 +350,9 @@ class Boss: if msg.startswith(cmd_prefix): cmd = msg[len(cmd_prefix):-2] response = self._handle_remote_command(cmd, from_peer=True) - if response is not None: - response = (cmd_prefix + json.dumps(response) + '\x1b\\').encode('utf-8') - return response + if response is None: + return None + return (cmd_prefix + json.dumps(response) + '\x1b\\').encode('utf-8') else: msg = json.loads(msg) if isinstance(msg, dict) and msg.get('cmd') == 'new_instance': @@ -422,7 +462,7 @@ class Boss: if w is not None: windows.append(w) else: - windows = self.all_windows + windows = list(self.all_windows) reset = action == 'reset' how = 3 if action == 'scrollback' else 2 for w in windows: @@ -524,18 +564,19 @@ class Boss: self._set_os_window_background_opacity(os_window_id, opacity) @property - def active_tab_manager(self): + def active_tab_manager(self) -> Optional[TabManager]: os_window_id = current_os_window() - return self.os_window_map.get(os_window_id) + if os_window_id is not None: + return self.os_window_map.get(os_window_id) @property - def active_tab(self): + def active_tab(self) -> Optional[Tab]: tm = self.active_tab_manager if tm is not None: return tm.active_tab @property - def active_window(self): + def active_window(self) -> Optional[Window]: t = self.active_tab if t is not None: return t.active_window @@ -713,7 +754,7 @@ class Boss: tab = self.active_tab else: w = window - tab = w.tabref() + tab = w.tabref() if w else None if end_kitten.no_ui: end_kitten(None, getattr(w, 'id', None), self) return @@ -723,7 +764,7 @@ class Boss: if input_data is None: type_of_input = end_kitten.type_of_input if type_of_input in ('text', 'history', 'ansi', 'ansi-history', 'screen', 'screen-history', 'screen-ansi', 'screen-ansi-history'): - data = w.as_text( + data: Optional[bytes] = w.as_text( as_ansi='ansi' in type_of_input, add_history='history' in type_of_input, add_wrap_markers='screen' in type_of_input @@ -734,8 +775,6 @@ class Boss: raise ValueError('Unknown type_of_input: {}'.format(type_of_input)) else: data = input_data - if isinstance(data, str): - data = data.encode('utf-8') copts = common_opts_as_dict(self.opts) overlay_window = tab.new_special_window( SpecialWindow( @@ -743,7 +782,7 @@ class Boss: stdin=data, env={ 'KITTY_COMMON_OPTS': json.dumps(copts), - 'KITTY_CHILD_PID': w.child.pid, + 'KITTY_CHILD_PID': str(w.child.pid), 'PYTHONWARNINGS': 'ignore', 'OVERLAID_WINDOW_LINES': str(w.screen.lines), 'OVERLAID_WINDOW_COLS': str(w.screen.columns), @@ -833,7 +872,8 @@ class Boss: def switch_focus_to(self, window_idx): tab = self.active_tab - tab.set_active_window_idx(window_idx) + if tab: + tab.set_active_window_idx(window_idx) def open_url(self, url, program=None, cwd=None): if url: @@ -925,6 +965,8 @@ class Boss: def process_stdin_source(self, window=None, stdin=None): w = window or self.active_window + if not w: + return None, None env = None if stdin: add_wrap_markers = stdin.endswith('_wrap') @@ -942,19 +984,22 @@ class Boss: return env, stdin def data_for_at(self, which, window=None, add_wrap_markers=False) -> Optional[str]: - return data_for_at(window or self.active_window, which, add_wrap_markers=add_wrap_markers) + window = window or self.active_window + if not window: + return None + return data_for_at(window, which, add_wrap_markers=add_wrap_markers) def special_window_for_cmd(self, cmd, window=None, stdin=None, cwd_from=None, as_overlay=False): w = window or self.active_window env, stdin = self.process_stdin_source(w, stdin) cmdline = [] for arg in cmd: - if arg == '@selection': + if arg == '@selection' and w: arg = data_for_at(w, arg) if not arg: continue cmdline.append(arg) - overlay_for = w.id if as_overlay and w.overlay_for is None else None + overlay_for = w.id if w and as_overlay and w.overlay_for is None else None return SpecialWindow(cmd, stdin, cwd_from=cwd_from, overlay_for=overlay_for, env=env) def run_background_process(self, cmd, cwd=None, env=None, stdin=None, cwd_from=None): @@ -1080,7 +1125,7 @@ class Boss: w = self.active_window_for_cwd if w is None: return self.new_window(*args) - cwd_from = w.child.pid_for_cwd if w is not None else None + cwd_from = w.child.pid_for_cwd self._new_window(args, cwd_from=cwd_from) def launch(self, *args: str) -> None: @@ -1105,7 +1150,7 @@ class Boss: if self.active_window is not None: windows = [self.active_window] elif where == 'all': - windows = self.all_windows + windows = list(self.all_windows) elif where == 'tab': if self.active_tab is not None: windows = list(self.active_tab) @@ -1141,11 +1186,12 @@ class Boss: self.update_check_process = process def on_monitored_pid_death(self, pid, exit_status): - update_check_process = getattr(self, 'update_check_process', None) + update_check_process = self.update_check_process if update_check_process is not None and pid == update_check_process.pid: self.update_check_process = None from .update_check import process_current_release try: + assert update_check_process.stdout is not None raw = update_check_process.stdout.read().decode('utf-8') except Exception as e: log_error('Failed to read data from update check process, with error: {}'.format(e)) diff --git a/kitty/conf/definition.py b/kitty/conf/definition.py index 4254bf43d..9bf226fb1 100644 --- a/kitty/conf/definition.py +++ b/kitty/conf/definition.py @@ -334,6 +334,7 @@ def as_type_stub( ans.append(' {}: {}'.format(field_name, type_def)) ans.append(' def __iter__(self) -> typing.Iterator[str]: pass') ans.append(' def __len__(self) -> int: pass') + ans.append(' def __getitem__(self, k: typing.Union[int, str]) -> typing.Any: pass') ans.append(' def _replace(self, **kw: typing.Any) -> {}: pass'.format(class_name)) return '\n'.join(ans) + '\n\n\n' diff --git a/kitty/fast_data_types.pyi b/kitty/fast_data_types.pyi index c8d6b8650..0847f2893 100644 --- a/kitty/fast_data_types.pyi +++ b/kitty/fast_data_types.pyi @@ -1026,6 +1026,15 @@ class Screen: as_text_non_visual = as_text as_text_alternate = as_text + def scroll_until_cursor(self) -> None: + pass + + def reset(self) -> None: + pass + + def erase_in_display(self, how: int = 0, private: bool = False) -> None: + pass + def set_tab_bar_render_data( os_window_id: int, xstart: float, ystart: float, dx: float, dy: float, diff --git a/kitty/launch.py b/kitty/launch.py index 6491d32f6..baa9ff797 100644 --- a/kitty/launch.py +++ b/kitty/launch.py @@ -159,12 +159,15 @@ def get_env(opts: LaunchCLIOptions, active_child: Child) -> Dict[str, str]: return env -def tab_for_window(boss: Boss, opts: LaunchCLIOptions, target_tab: Optional[Tab] = None) -> Tab: +def tab_for_window(boss: Boss, opts: LaunchCLIOptions, target_tab: Optional[Tab] = None) -> Optional[Tab]: if opts.type == 'tab': tm = boss.active_tab_manager - tab: Tab = tm.new_tab(empty_tab=True, location=opts.location) - if opts.tab_title: - tab.set_title(opts.tab_title) + if tm: + tab: Optional[Tab] = tm.new_tab(empty_tab=True, location=opts.location) + if opts.tab_title and tab: + tab.set_title(opts.tab_title) + else: + tab = None elif opts.type == 'os-window': oswid = boss.add_os_window() tm = boss.os_window_map[oswid] @@ -252,8 +255,9 @@ def launch(boss: Boss, opts: LaunchCLIOptions, args: List[str], target_tab: Opti func(kw['stdin']) else: tab = tab_for_window(boss, opts, target_tab) - new_window: Window = tab.new_window(env=env or None, **kw) - if opts.keep_focus and active: - boss.set_active_window(active, switch_os_window_if_needed=True) - return new_window + if tab: + new_window: Window = tab.new_window(env=env or None, **kw) + if opts.keep_focus and active: + boss.set_active_window(active, switch_os_window_if_needed=True) + return new_window return None diff --git a/kitty/remote_control.py b/kitty/remote_control.py index ec362b870..6c570ccbb 100644 --- a/kitty/remote_control.py +++ b/kitty/remote_control.py @@ -29,7 +29,7 @@ if TYPE_CHECKING: from .window import Window # noqa -def handle_cmd(boss: 'Boss', window: 'Window', serialized_cmd: str) -> Optional[Dict[str, Any]]: +def handle_cmd(boss: 'Boss', window: Optional['Window'], serialized_cmd: str) -> Optional[Dict[str, Any]]: cmd = json.loads(serialized_cmd) v = cmd['version'] no_response = cmd.get('no_response', False) diff --git a/kitty/update_check.py b/kitty/update_check.py index 5347bf99d..fe5d2b30c 100644 --- a/kitty/update_check.py +++ b/kitty/update_check.py @@ -17,7 +17,7 @@ from .utils import log_error, open_url CHANGELOG_URL = 'https://sw.kovidgoyal.net/kitty/changelog.html' RELEASED_VERSION_URL = 'https://sw.kovidgoyal.net/kitty/current-version.txt' -CHECK_INTERVAL = 24 * 60 * 60 +CHECK_INTERVAL = 24 * 60 * 60. class Notification(NamedTuple): @@ -123,6 +123,6 @@ def update_check(timer_id: Optional[int] = None) -> bool: return True -def run_update_check(interval: int = CHECK_INTERVAL) -> None: +def run_update_check(interval: float = CHECK_INTERVAL) -> None: if update_check(): add_timer(update_check, interval)