more typing work

This commit is contained in:
Kovid Goyal 2020-03-13 21:41:07 +05:30
parent 626a96e20f
commit a348f64833
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
6 changed files with 123 additions and 63 deletions

View File

@ -9,11 +9,15 @@ import re
from contextlib import suppress
from functools import partial
from gettext import gettext as _
from typing import Dict, Iterable, List, Optional, Union
from typing import (
TYPE_CHECKING, Any, Callable, Dict, Generator, Iterable, List, Optional,
Tuple, Union
)
from weakref import WeakValueDictionary
from .child import cached_process_data, cwd_of_process
from .cli import create_opts, parse_args
from .cli_stub import CLIOptions
from .conf.utils import to_cmdline
from .config import (
SubSequenceMap, common_opts_as_dict, initial_window_size_func,
@ -34,9 +38,12 @@ from .fast_data_types import (
)
from .keys import get_shortcut, shortcut_matches
from .layout import set_layout_options
from .options_stub import Options
from .rgb import Color, color_from_int
from .session import create_sessions
from .tabs import SpecialWindow, SpecialWindowInstance, TabManager
from .session import Session, create_sessions
from .tabs import (
SpecialWindow, SpecialWindowInstance, Tab, TabDict, TabManager
)
from .utils import (
func_name, get_editor, get_primary_selection, is_path_in_temp_dir,
log_error, open_url, parse_address_spec, remove_socket_file, safe_print,
@ -44,6 +51,19 @@ from .utils import (
)
from .window import MatchPatternType, Window
if TYPE_CHECKING:
from typing import TypedDict
from .rc.base import RemoteCommand # noqa
from subprocess import Popen # noqa
else:
TypedDict = dict
class OSWindowDict(TypedDict):
id: int
is_focused: bool
tabs: List[TabDict]
def notification_activated(identifier: str) -> None:
if identifier == 'new-version':
@ -61,10 +81,13 @@ def listen_on(spec: str) -> int:
return s.fileno()
def data_for_at(w: Window, arg: str, add_wrap_markers: bool = False) -> Optional[str]:
def as_text(**kw) -> str:
def data_for_at(w: Optional[Window], arg: str, add_wrap_markers: bool = False) -> Optional[str]:
if not w:
return None
def as_text(**kw: bool) -> str:
kw['add_wrap_markers'] = add_wrap_markers
return w.as_text(**kw)
return w.as_text(**kw) if w else ''
if arg == '@selection':
return w.text_for_selection()
@ -89,12 +112,12 @@ def data_for_at(w: Window, arg: str, add_wrap_markers: bool = False) -> Optional
class DumpCommands: # {{{
def __init__(self, args):
self.draw_dump_buf = []
def __init__(self, args: CLIOptions):
self.draw_dump_buf: List[str] = []
if args.dump_bytes:
self.dump_bytes_to = open(args.dump_bytes, 'wb')
def __call__(self, *a):
def __call__(self, *a: Any) -> None:
if a:
if a[0] == 'draw':
if a[1] is None:
@ -116,17 +139,24 @@ class DumpCommands: # {{{
class Boss:
def __init__(self, os_window_id, opts, args, cached_values, new_os_window_trigger):
def __init__(
self,
os_window_id: Optional[int],
opts: Options,
args: CLIOptions,
cached_values: Dict[str, Any],
new_os_window_trigger: Optional[Tuple[int, bool, int]]
):
set_layout_options(opts)
self.clipboard_buffers: Dict[str, str] = {}
self.update_check_process = None
self.update_check_process: Optional['Popen'] = None
self.window_id_map: WeakValueDictionary[int, Window] = WeakValueDictionary()
self.startup_colors = {k: opts[k] for k in opts if isinstance(opts[k], Color)}
self.startup_cursor_text_color = opts.cursor_text_color
self.pending_sequences: Optional[SubSequenceMap] = None
self.cached_values = cached_values
self.os_window_map = {}
self.os_window_death_actions = {}
self.os_window_map: Dict[int, TabManager] = {}
self.os_window_death_actions: Dict[int, Callable[[], None]] = {}
self.cursor_blinking = True
self.shutting_down = False
talk_fd = getattr(single_instance, 'socket', None)
@ -157,7 +187,15 @@ class Boss:
from .fast_data_types import cocoa_set_notification_activated_callback
cocoa_set_notification_activated_callback(notification_activated)
def add_os_window(self, startup_session=None, os_window_id=None, wclass=None, wname=None, opts_for_size=None, startup_id=None):
def add_os_window(
self,
startup_session: Optional[Session] = None,
os_window_id: Optional[int] = None,
wclass: Optional[str] = None,
wname: Optional[str] = None,
opts_for_size: Optional[Options] = None,
startup_id: Optional[str] = None
) -> int:
if os_window_id is None:
opts_for_size = opts_for_size or getattr(startup_session, 'os_window_size', None) or self.opts
cls = wclass or self.args.cls or appname
@ -170,7 +208,7 @@ class Boss:
self.os_window_map[os_window_id] = tm
return os_window_id
def list_os_windows(self):
def list_os_windows(self) -> Generator[OSWindowDict, None, None]:
with cached_process_data():
active_tab, active_window = self.active_tab, self.active_window
active_tab_manager = self.active_tab_manager
@ -182,20 +220,20 @@ class Boss:
}
@property
def all_tab_managers(self):
def all_tab_managers(self) -> Generator[TabManager, None, None]:
yield from self.os_window_map.values()
@property
def all_tabs(self):
def all_tabs(self) -> Generator[Tab, None, None]:
for tm in self.all_tab_managers:
yield from tm
@property
def all_windows(self):
def all_windows(self) -> Generator[Window, None, None]:
for tab in self.all_tabs:
yield from tab
def match_windows(self, match):
def match_windows(self, match: str) -> Generator[Window, None, None]:
try:
field, exp = match.split(':', 1)
except ValueError:
@ -222,13 +260,13 @@ class Boss:
if window.matches(field, pat):
yield window
def tab_for_window(self, window):
def tab_for_window(self, window: Window) -> Optional[Tab]:
for tab in self.all_tabs:
for w in tab:
if w.id == window.id:
return tab
def match_tabs(self, match):
def match_tabs(self, match: str) -> Generator[Tab, None, None]:
try:
field, exp = match.split(':', 1)
except ValueError:
@ -242,11 +280,11 @@ class Boss:
found = True
if not found:
tabs = {self.tab_for_window(w) for w in self.match_windows(match)}
for tab in tabs:
if tab:
yield tab
for q in tabs:
if q:
yield q
def set_active_window(self, window, switch_os_window_if_needed=False):
def set_active_window(self, window: Window, switch_os_window_if_needed: bool = False) -> Optional[int]:
for os_window_id, tm in self.os_window_map.items():
for tab in tm:
for w in tab:
@ -258,7 +296,7 @@ class Boss:
focus_os_window(os_window_id, True)
return os_window_id
def _new_os_window(self, args, cwd_from=None):
def _new_os_window(self, args: Union[SpecialWindowInstance, Iterable[str]], cwd_from: Optional[int] = None) -> int:
if isinstance(args, SpecialWindowInstance):
sw = args
else:
@ -266,32 +304,34 @@ class Boss:
startup_session = next(create_sessions(self.opts, special_window=sw, cwd_from=cwd_from))
return self.add_os_window(startup_session)
def new_os_window(self, *args):
def new_os_window(self, *args: str) -> None:
self._new_os_window(args)
@property
def active_window_for_cwd(self):
def active_window_for_cwd(self) -> Optional[Window]:
w = self.active_window
if w is not None and w.overlay_for is not None and w.overlay_for in self.window_id_map:
w = self.window_id_map[w.overlay_for]
return w
def new_os_window_with_cwd(self, *args):
def new_os_window_with_cwd(self, *args: str) -> None:
w = self.active_window_for_cwd
cwd_from = w.child.pid_for_cwd if w is not None else None
self._new_os_window(args, cwd_from)
def new_os_window_with_wd(self, wd):
def new_os_window_with_wd(self, wd: str) -> None:
special_window = SpecialWindow(None, cwd=wd)
self._new_os_window(special_window)
def add_child(self, window):
def add_child(self, window: Window) -> None:
assert window.child.pid is not None and window.child.child_fd is not None
self.child_monitor.add_child(window.id, window.child.pid, window.child.child_fd, window.screen)
self.window_id_map[window.id] = window
def _handle_remote_command(self, cmd, window=None, from_peer=False):
def _handle_remote_command(self, cmd: str, window: Optional[Window] = None, from_peer: bool = False) -> Optional[Dict[str, Any]]:
from .remote_control import handle_cmd
response = None
window = window or None
if self.opts.allow_remote_control == 'y' or from_peer or getattr(window, 'allow_remote_control', False):
try:
response = handle_cmd(self, window, cmd)
@ -310,9 +350,9 @@ class Boss:
if msg.startswith(cmd_prefix):
cmd = msg[len(cmd_prefix):-2]
response = self._handle_remote_command(cmd, from_peer=True)
if response is not None:
response = (cmd_prefix + json.dumps(response) + '\x1b\\').encode('utf-8')
return response
if response is None:
return None
return (cmd_prefix + json.dumps(response) + '\x1b\\').encode('utf-8')
else:
msg = json.loads(msg)
if isinstance(msg, dict) and msg.get('cmd') == 'new_instance':
@ -422,7 +462,7 @@ class Boss:
if w is not None:
windows.append(w)
else:
windows = self.all_windows
windows = list(self.all_windows)
reset = action == 'reset'
how = 3 if action == 'scrollback' else 2
for w in windows:
@ -524,18 +564,19 @@ class Boss:
self._set_os_window_background_opacity(os_window_id, opacity)
@property
def active_tab_manager(self):
def active_tab_manager(self) -> Optional[TabManager]:
os_window_id = current_os_window()
return self.os_window_map.get(os_window_id)
if os_window_id is not None:
return self.os_window_map.get(os_window_id)
@property
def active_tab(self):
def active_tab(self) -> Optional[Tab]:
tm = self.active_tab_manager
if tm is not None:
return tm.active_tab
@property
def active_window(self):
def active_window(self) -> Optional[Window]:
t = self.active_tab
if t is not None:
return t.active_window
@ -713,7 +754,7 @@ class Boss:
tab = self.active_tab
else:
w = window
tab = w.tabref()
tab = w.tabref() if w else None
if end_kitten.no_ui:
end_kitten(None, getattr(w, 'id', None), self)
return
@ -723,7 +764,7 @@ class Boss:
if input_data is None:
type_of_input = end_kitten.type_of_input
if type_of_input in ('text', 'history', 'ansi', 'ansi-history', 'screen', 'screen-history', 'screen-ansi', 'screen-ansi-history'):
data = w.as_text(
data: Optional[bytes] = w.as_text(
as_ansi='ansi' in type_of_input,
add_history='history' in type_of_input,
add_wrap_markers='screen' in type_of_input
@ -734,8 +775,6 @@ class Boss:
raise ValueError('Unknown type_of_input: {}'.format(type_of_input))
else:
data = input_data
if isinstance(data, str):
data = data.encode('utf-8')
copts = common_opts_as_dict(self.opts)
overlay_window = tab.new_special_window(
SpecialWindow(
@ -743,7 +782,7 @@ class Boss:
stdin=data,
env={
'KITTY_COMMON_OPTS': json.dumps(copts),
'KITTY_CHILD_PID': w.child.pid,
'KITTY_CHILD_PID': str(w.child.pid),
'PYTHONWARNINGS': 'ignore',
'OVERLAID_WINDOW_LINES': str(w.screen.lines),
'OVERLAID_WINDOW_COLS': str(w.screen.columns),
@ -833,7 +872,8 @@ class Boss:
def switch_focus_to(self, window_idx):
tab = self.active_tab
tab.set_active_window_idx(window_idx)
if tab:
tab.set_active_window_idx(window_idx)
def open_url(self, url, program=None, cwd=None):
if url:
@ -925,6 +965,8 @@ class Boss:
def process_stdin_source(self, window=None, stdin=None):
w = window or self.active_window
if not w:
return None, None
env = None
if stdin:
add_wrap_markers = stdin.endswith('_wrap')
@ -942,19 +984,22 @@ class Boss:
return env, stdin
def data_for_at(self, which, window=None, add_wrap_markers=False) -> Optional[str]:
return data_for_at(window or self.active_window, which, add_wrap_markers=add_wrap_markers)
window = window or self.active_window
if not window:
return None
return data_for_at(window, which, add_wrap_markers=add_wrap_markers)
def special_window_for_cmd(self, cmd, window=None, stdin=None, cwd_from=None, as_overlay=False):
w = window or self.active_window
env, stdin = self.process_stdin_source(w, stdin)
cmdline = []
for arg in cmd:
if arg == '@selection':
if arg == '@selection' and w:
arg = data_for_at(w, arg)
if not arg:
continue
cmdline.append(arg)
overlay_for = w.id if as_overlay and w.overlay_for is None else None
overlay_for = w.id if w and as_overlay and w.overlay_for is None else None
return SpecialWindow(cmd, stdin, cwd_from=cwd_from, overlay_for=overlay_for, env=env)
def run_background_process(self, cmd, cwd=None, env=None, stdin=None, cwd_from=None):
@ -1080,7 +1125,7 @@ class Boss:
w = self.active_window_for_cwd
if w is None:
return self.new_window(*args)
cwd_from = w.child.pid_for_cwd if w is not None else None
cwd_from = w.child.pid_for_cwd
self._new_window(args, cwd_from=cwd_from)
def launch(self, *args: str) -> None:
@ -1105,7 +1150,7 @@ class Boss:
if self.active_window is not None:
windows = [self.active_window]
elif where == 'all':
windows = self.all_windows
windows = list(self.all_windows)
elif where == 'tab':
if self.active_tab is not None:
windows = list(self.active_tab)
@ -1141,11 +1186,12 @@ class Boss:
self.update_check_process = process
def on_monitored_pid_death(self, pid, exit_status):
update_check_process = getattr(self, 'update_check_process', None)
update_check_process = self.update_check_process
if update_check_process is not None and pid == update_check_process.pid:
self.update_check_process = None
from .update_check import process_current_release
try:
assert update_check_process.stdout is not None
raw = update_check_process.stdout.read().decode('utf-8')
except Exception as e:
log_error('Failed to read data from update check process, with error: {}'.format(e))

View File

@ -334,6 +334,7 @@ def as_type_stub(
ans.append(' {}: {}'.format(field_name, type_def))
ans.append(' def __iter__(self) -> typing.Iterator[str]: pass')
ans.append(' def __len__(self) -> int: pass')
ans.append(' def __getitem__(self, k: typing.Union[int, str]) -> typing.Any: pass')
ans.append(' def _replace(self, **kw: typing.Any) -> {}: pass'.format(class_name))
return '\n'.join(ans) + '\n\n\n'

View File

@ -1026,6 +1026,15 @@ class Screen:
as_text_non_visual = as_text
as_text_alternate = as_text
def scroll_until_cursor(self) -> None:
pass
def reset(self) -> None:
pass
def erase_in_display(self, how: int = 0, private: bool = False) -> None:
pass
def set_tab_bar_render_data(
os_window_id: int, xstart: float, ystart: float, dx: float, dy: float,

View File

@ -159,12 +159,15 @@ def get_env(opts: LaunchCLIOptions, active_child: Child) -> Dict[str, str]:
return env
def tab_for_window(boss: Boss, opts: LaunchCLIOptions, target_tab: Optional[Tab] = None) -> Tab:
def tab_for_window(boss: Boss, opts: LaunchCLIOptions, target_tab: Optional[Tab] = None) -> Optional[Tab]:
if opts.type == 'tab':
tm = boss.active_tab_manager
tab: Tab = tm.new_tab(empty_tab=True, location=opts.location)
if opts.tab_title:
tab.set_title(opts.tab_title)
if tm:
tab: Optional[Tab] = tm.new_tab(empty_tab=True, location=opts.location)
if opts.tab_title and tab:
tab.set_title(opts.tab_title)
else:
tab = None
elif opts.type == 'os-window':
oswid = boss.add_os_window()
tm = boss.os_window_map[oswid]
@ -252,8 +255,9 @@ def launch(boss: Boss, opts: LaunchCLIOptions, args: List[str], target_tab: Opti
func(kw['stdin'])
else:
tab = tab_for_window(boss, opts, target_tab)
new_window: Window = tab.new_window(env=env or None, **kw)
if opts.keep_focus and active:
boss.set_active_window(active, switch_os_window_if_needed=True)
return new_window
if tab:
new_window: Window = tab.new_window(env=env or None, **kw)
if opts.keep_focus and active:
boss.set_active_window(active, switch_os_window_if_needed=True)
return new_window
return None

View File

@ -29,7 +29,7 @@ if TYPE_CHECKING:
from .window import Window # noqa
def handle_cmd(boss: 'Boss', window: 'Window', serialized_cmd: str) -> Optional[Dict[str, Any]]:
def handle_cmd(boss: 'Boss', window: Optional['Window'], serialized_cmd: str) -> Optional[Dict[str, Any]]:
cmd = json.loads(serialized_cmd)
v = cmd['version']
no_response = cmd.get('no_response', False)

View File

@ -17,7 +17,7 @@ from .utils import log_error, open_url
CHANGELOG_URL = 'https://sw.kovidgoyal.net/kitty/changelog.html'
RELEASED_VERSION_URL = 'https://sw.kovidgoyal.net/kitty/current-version.txt'
CHECK_INTERVAL = 24 * 60 * 60
CHECK_INTERVAL = 24 * 60 * 60.
class Notification(NamedTuple):
@ -123,6 +123,6 @@ def update_check(timer_id: Optional[int] = None) -> bool:
return True
def run_update_check(interval: int = CHECK_INTERVAL) -> None:
def run_update_check(interval: float = CHECK_INTERVAL) -> None:
if update_check():
add_timer(update_check, interval)