More typing work

This commit is contained in:
Kovid Goyal 2020-03-08 18:55:30 +05:30
parent 7057bc663e
commit 308d171dae
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
7 changed files with 216 additions and 61 deletions

View File

@ -9,8 +9,8 @@ import re
from contextlib import suppress from contextlib import suppress
from functools import partial from functools import partial
from gettext import gettext as _ from gettext import gettext as _
from typing import Dict, Optional
from weakref import WeakValueDictionary from weakref import WeakValueDictionary
from typing import Optional
from .child import cached_process_data, cwd_of_process from .child import cached_process_data, cwd_of_process
from .cli import create_opts, parse_args from .cli import create_opts, parse_args
@ -39,7 +39,7 @@ from .utils import (
log_error, open_url, parse_address_spec, remove_socket_file, safe_print, log_error, open_url, parse_address_spec, remove_socket_file, safe_print,
set_primary_selection, single_instance, startup_notification_handler set_primary_selection, single_instance, startup_notification_handler
) )
from .window import Window from .window import MatchPatternType, Window
def notification_activated(identifier: str) -> None: def notification_activated(identifier: str) -> None:
@ -117,7 +117,7 @@ class Boss:
set_layout_options(opts) set_layout_options(opts)
self.clipboard_buffers = {} self.clipboard_buffers = {}
self.update_check_process = None self.update_check_process = None
self.window_id_map = WeakValueDictionary() self.window_id_map: WeakValueDictionary[int, Window] = WeakValueDictionary()
self.startup_colors = {k: opts[k] for k in opts if isinstance(opts[k], Color)} self.startup_colors = {k: opts[k] for k in opts if isinstance(opts[k], Color)}
self.startup_cursor_text_color = opts.cursor_text_color self.startup_cursor_text_color = opts.cursor_text_color
self.pending_sequences = None self.pending_sequences = None
@ -210,7 +210,7 @@ class Boss:
if field == 'env': if field == 'env':
kp, vp = exp.partition('=')[::2] kp, vp = exp.partition('=')[::2]
if vp: if vp:
pat = tuple(map(re.compile, (kp, vp))) pat: MatchPatternType = re.compile(kp), re.compile(vp)
else: else:
pat = re.compile(kp), None pat = re.compile(kp), None
else: else:

View File

@ -805,7 +805,7 @@ def compare_opts(opts: OptionsStub) -> None:
compare_keymaps(final, initial) compare_keymaps(final, initial)
def create_opts(args, debug_config=False, accumulate_bad_lines=None): def create_opts(args: CLIOptions, debug_config=False, accumulate_bad_lines=None) -> OptionsStub:
from .config import load_config from .config import load_config
config = tuple(resolve_config(SYSTEM_CONF, defconf, args.config)) config = tuple(resolve_config(SYSTEM_CONF, defconf, args.config))
if debug_config: if debug_config:

View File

@ -24,7 +24,7 @@ def generate_stub() -> None:
nonlocal text nonlocal text
text += as_type_stub(*parse_option_spec(otext), class_name=cls, extra_fields=extra_fields) text += as_type_stub(*parse_option_spec(otext), class_name=cls, extra_fields=extra_fields)
do() do(extra_fields=('args: typing.Sequence[str]',))
from .launch import options_spec from .launch import options_spec
do(options_spec(), 'LaunchCLIOptions') do(options_spec(), 'LaunchCLIOptions')

View File

@ -5,17 +5,21 @@
import os import os
import re import re
import shlex import shlex
from collections import namedtuple
from typing import ( from typing import (
Any, Callable, Dict, FrozenSet, Iterable, List, Optional, Sequence, Tuple, Any, Callable, Dict, FrozenSet, Iterable, List, NamedTuple, Optional,
Type, Union Sequence, Tuple, Type, Union
) )
from ..rgb import Color, to_color as as_color from ..rgb import Color, to_color as as_color
from ..utils import log_error from ..utils import log_error
key_pat = re.compile(r'([a-zA-Z][a-zA-Z0-9_-]*)\s+(.+)$') key_pat = re.compile(r'([a-zA-Z][a-zA-Z0-9_-]*)\s+(.+)$')
BadLine = namedtuple('BadLine', 'number line exception')
class BadLine(NamedTuple):
number: int
line: str
exception: Exception
def to_color(x: str) -> Color: def to_color(x: str) -> Color:

View File

@ -1,4 +1,6 @@
from typing import Any, Callable, Dict, List, NewType, Optional, Tuple, Union from typing import (
Any, AnyStr, Callable, Dict, List, NewType, Optional, Tuple, Union
)
from kitty.boss import Boss from kitty.boss import Boss
from kitty.options_stub import Options from kitty.options_stub import Options
@ -577,7 +579,7 @@ def set_in_sequence_mode(yes: bool) -> None:
pass pass
def set_clipboard_string(data: bytes) -> None: def set_clipboard_string(data: AnyStr) -> None:
pass pass
@ -594,7 +596,7 @@ def set_boss(boss: Boss) -> None:
pass pass
def get_boss() -> Optional[Boss]: def get_boss() -> Boss: # this can return None but we ignore that for convenience
pass pass
@ -607,7 +609,23 @@ def patch_global_colors(spec: Dict[str, int], configured: bool) -> None:
class ColorProfile: class ColorProfile:
pass
default_bg: int
def as_dict(self) -> Dict:
pass
def as_color(self, val: int) -> Tuple[int, int, int]:
pass
def set_color(self, num: int, val: int) -> None:
pass
def reset_color_table(self) -> None:
pass
def reset_color(self, num: int) -> None:
pass
def patch_color_profiles( def patch_color_profiles(
@ -623,6 +641,18 @@ def os_window_font_size(
pass pass
def cocoa_set_notification_activated_callback(identifier: Callable[[str], None]) -> None:
pass
def cocoa_set_new_window_trigger(mods: int, key: int) -> bool:
pass
def cocoa_get_lang() -> Optional[str]:
pass
def mark_os_window_for_close(os_window_id: int, yes: bool = True) -> bool: def mark_os_window_for_close(os_window_id: int, yes: bool = True) -> bool:
pass pass
@ -639,6 +669,14 @@ def focus_os_window(os_window_id: int, also_raise: bool = True) -> bool:
pass pass
def start_profiler(path: str) -> None:
pass
def stop_profiler() -> None:
pass
def destroy_global_data() -> None: def destroy_global_data() -> None:
pass pass
@ -828,8 +866,95 @@ def create_test_font_group(sz: float, dpix: float,
pass pass
class HistoryBuf:
def as_text(self, callback: Callable[[str], None], as_ansi: bool, insert_wrap_markers: bool) -> None:
pass
def pagerhist_as_text(self, callback: Callable[[str], None]) -> None:
pass
class LineBuf:
def is_continued(self, idx: int) -> bool:
pass
class Cursor:
x: int
y: int
class Screen: class Screen:
pass
color_profile: ColorProfile
columns: int
lines: int
focus_tracking_enabled: bool
historybuf: HistoryBuf
linebuf: LineBuf
in_bracketed_paste_mode: bool
scrolled_by: int
cursor: Cursor
def __init__(
self,
callbacks: Any = None,
lines: int = 80, columns: int = 24, scrollback: int = 0,
cell_width: int = 10, cell_height: int = 20,
window_id: int = 0,
test_child: Any = None
):
pass
def copy_colors_from(self, other: 'Screen') -> None:
pass
def mark_as_dirty(self) -> None:
pass
def resize(self, width: int, height: int) -> None:
pass
def send_escape_code_to_child(self, code: int, text: str) -> None:
pass
def reset_callbacks(self) -> None:
pass
def text_for_selection(self) -> Tuple[str, ...]:
pass
def is_rectangle_select(self) -> bool:
pass
def is_using_alternate_linebuf(self) -> bool:
pass
def is_main_linebuf(self) -> bool:
pass
def scroll(self, amt: int, upwards: bool) -> bool:
pass
def scroll_to_next_mark(self, mark: int = 0, backwards: bool = True) -> bool:
pass
def clear_selection(self) -> None:
pass
def set_marker(self, marker: Optional[Callable] = None) -> None:
pass
def paste_bytes(self, data: bytes) -> None:
pass
paste = paste_bytes
def as_text(self, callback: Callable[[str], None], as_ansi: bool, insert_wrap_markers: bool) -> None:
pass
as_text_non_visual = as_text
as_text_alternate = as_text
def set_tab_bar_render_data( def set_tab_bar_render_data(
@ -866,3 +991,15 @@ class ChildMonitor:
def wakeup(self) -> None: def wakeup(self) -> None:
pass pass
def main_loop(self) -> None:
pass
def resize_pty(self, window_id: int, rows: int, cols: int, x_pixels: int, y_pixels: int) -> None:
pass
def needs_write(self, child_id: int, data: bytes) -> bool:
pass
def set_iutf8_winid(self, win_id: int, on: bool) -> bool:
pass

View File

@ -7,12 +7,14 @@ import os
import shutil import shutil
import sys import sys
from contextlib import contextmanager, suppress from contextlib import contextmanager, suppress
from typing import Generator, List, Mapping, Optional, Tuple
from .borders import load_borders_program from .borders import load_borders_program
from .boss import Boss from .boss import Boss
from .child import set_default_env from .child import set_default_env
from .cli import create_opts, parse_args from .cli import create_opts, parse_args
from .cli_stub import CLIOptions from .cli_stub import CLIOptions
from .conf.utils import BadLine
from .config import cached_values_for, initial_window_size_func from .config import cached_values_for, initial_window_size_func
from .constants import ( from .constants import (
appname, beam_cursor_data_file, config_dir, glfw_path, is_macos, appname, beam_cursor_data_file, config_dir, glfw_path, is_macos,
@ -25,6 +27,7 @@ from .fast_data_types import (
) )
from .fonts.box_drawing import set_scale from .fonts.box_drawing import set_scale
from .fonts.render import set_font_family from .fonts.render import set_font_family
from .options_stub import Options as OptionsStub
from .utils import ( from .utils import (
detach, log_error, read_shell_environment, single_instance, detach, log_error, read_shell_environment, single_instance,
startup_notification_handler, unix_socket_paths startup_notification_handler, unix_socket_paths
@ -66,13 +69,15 @@ def talk_to_instance(args):
data['notify_on_os_window_death'] = address data['notify_on_os_window_death'] = address
notify_socket.listen() notify_socket.listen()
data = json.dumps(data, ensure_ascii=False).encode('utf-8') sdata = json.dumps(data, ensure_ascii=False).encode('utf-8')
single_instance.socket.sendall(data) assert single_instance.socket is not None
single_instance.socket.sendall(sdata)
with suppress(OSError): with suppress(OSError):
single_instance.socket.shutdown(socket.SHUT_RDWR) single_instance.socket.shutdown(socket.SHUT_RDWR)
single_instance.socket.close() single_instance.socket.close()
if args.wait_for_single_instance_window_close: if args.wait_for_single_instance_window_close:
assert notify_socket is not None
conn = notify_socket.accept()[0] conn = notify_socket.accept()[0]
conn.recv(1) conn.recv(1)
with suppress(OSError): with suppress(OSError):
@ -96,7 +101,7 @@ def init_glfw(opts, debug_keyboard=False):
return glfw_module return glfw_module
def get_new_os_window_trigger(opts): def get_new_os_window_trigger(opts: OptionsStub) -> Optional[Tuple[int, bool, int]]:
new_os_window_trigger = None new_os_window_trigger = None
if is_macos: if is_macos:
new_os_window_shortcuts = [] new_os_window_shortcuts = []
@ -113,7 +118,7 @@ def get_new_os_window_trigger(opts):
return new_os_window_trigger return new_os_window_trigger
def _run_app(opts, args, bad_lines=()): def _run_app(opts: OptionsStub, args, bad_lines=()):
new_os_window_trigger = get_new_os_window_trigger(opts) new_os_window_trigger = get_new_os_window_trigger(opts)
if is_macos and opts.macos_custom_beam_cursor: if is_macos and opts.macos_custom_beam_cursor:
set_custom_ibeam_cursor() set_custom_ibeam_cursor()
@ -145,7 +150,7 @@ class AppRunner:
self.first_window_callback = lambda window_handle: None self.first_window_callback = lambda window_handle: None
self.initial_window_size_func = initial_window_size_func self.initial_window_size_func = initial_window_size_func
def __call__(self, opts, args, bad_lines=()): def __call__(self, opts: OptionsStub, args: CLIOptions, bad_lines=()) -> None:
set_scale(opts.box_drawing_scale) set_scale(opts.box_drawing_scale)
set_options(opts, is_wayland(), args.debug_gl, args.debug_font_fallback) set_options(opts, is_wayland(), args.debug_gl, args.debug_font_fallback)
set_font_family(opts, debug_font_matching=args.debug_font_fallback) set_font_family(opts, debug_font_matching=args.debug_font_fallback)
@ -158,7 +163,7 @@ class AppRunner:
run_app = AppRunner() run_app = AppRunner()
def ensure_macos_locale(): def ensure_macos_locale() -> None:
# Ensure the LANG env var is set. See # Ensure the LANG env var is set. See
# https://github.com/kovidgoyal/kitty/issues/90 # https://github.com/kovidgoyal/kitty/issues/90
from .fast_data_types import cocoa_get_lang from .fast_data_types import cocoa_get_lang
@ -169,15 +174,16 @@ def ensure_macos_locale():
@contextmanager @contextmanager
def setup_profiling(args): def setup_profiling(args) -> Generator[None, None, None]:
try: try:
from .fast_data_types import start_profiler, stop_profiler from .fast_data_types import start_profiler, stop_profiler
do_profile = True
except ImportError: except ImportError:
start_profiler = stop_profiler = None do_profile = False
if start_profiler is not None: if do_profile:
start_profiler('/tmp/kitty-profile.log') start_profiler('/tmp/kitty-profile.log')
yield yield
if stop_profiler is not None: if do_profile:
import subprocess import subprocess
stop_profiler() stop_profiler()
exe = kitty_exe() exe = kitty_exe()
@ -206,7 +212,7 @@ def macos_cmdline(argv_args):
return ans return ans
def get_editor_from_env(shell_env): def get_editor_from_env(shell_env: Mapping[str, str]) -> Optional[str]:
for var in ('VISUAL', 'EDITOR'): for var in ('VISUAL', 'EDITOR'):
editor = shell_env.get(var) editor = shell_env.get(var)
if editor: if editor:
@ -214,8 +220,9 @@ def get_editor_from_env(shell_env):
import shlex import shlex
editor_cmd = shlex.split(editor) editor_cmd = shlex.split(editor)
if not os.path.isabs(editor_cmd[0]): if not os.path.isabs(editor_cmd[0]):
editor_cmd[0] = shutil.which(editor_cmd[0], path=shell_env['PATH']) q = shutil.which(editor_cmd[0], path=shell_env['PATH'])
if editor_cmd[0]: if q:
editor_cmd[0] = q
editor = ' '.join(map(shlex.quote, editor_cmd)) editor = ' '.join(map(shlex.quote, editor_cmd))
else: else:
editor = None editor = None
@ -282,30 +289,30 @@ def _main():
cwd_ok = False cwd_ok = False
if not cwd_ok: if not cwd_ok:
os.chdir(os.path.expanduser('~')) os.chdir(os.path.expanduser('~'))
args, rest = parse_args(result_class=CLIOptions, args=args) cli_opts, rest = parse_args(args=args, result_class=CLIOptions)
args.args = rest cli_opts.args = rest
if args.debug_config: if cli_opts.debug_config:
create_opts(args, debug_config=True) create_opts(cli_opts, debug_config=True)
return return
if getattr(args, 'detach', False): if cli_opts.detach:
detach() detach()
if args.replay_commands: if cli_opts.replay_commands:
from kitty.client import main from kitty.client import main as client_main
main(args.replay_commands) client_main(cli_opts.replay_commands)
return return
if args.single_instance: if cli_opts.single_instance:
is_first = single_instance(args.instance_group) is_first = single_instance(cli_opts.instance_group)
if not is_first: if not is_first:
talk_to_instance(args) talk_to_instance(cli_opts)
return return
bad_lines = [] bad_lines: List[BadLine] = []
opts = create_opts(args, accumulate_bad_lines=bad_lines) opts = create_opts(cli_opts, accumulate_bad_lines=bad_lines)
init_glfw(opts, args.debug_keyboard) init_glfw(opts, cli_opts.debug_keyboard)
setup_environment(opts, args) setup_environment(opts, cli_opts)
try: try:
with setup_profiling(args): with setup_profiling(cli_opts):
# Avoid needing to launch threads to reap zombies # Avoid needing to launch threads to reap zombies
run_app(opts, args, bad_lines) run_app(opts, cli_opts, bad_lines)
finally: finally:
glfw_terminate() glfw_terminate()

View File

@ -9,7 +9,8 @@ import weakref
from collections import deque from collections import deque
from enum import IntEnum from enum import IntEnum
from itertools import chain from itertools import chain
from typing import List from re import Pattern
from typing import Deque, Dict, List, Optional, Tuple, Union
from .config import build_ansi_color_table from .config import build_ansi_color_table
from .constants import ScreenGeometry, WindowGeometry, appname, wakeup from .constants import ScreenGeometry, WindowGeometry, appname, wakeup
@ -32,6 +33,8 @@ from .utils import (
set_primary_selection set_primary_selection
) )
MatchPatternType = Union[Pattern, Tuple[Pattern, Optional[Pattern]]]
class DynamicColor(IntEnum): class DynamicColor(IntEnum):
default_fg, default_bg, cursor_color, highlight_fg, highlight_bg = range(1, 6) default_fg, default_bg, cursor_color, highlight_fg, highlight_bg = range(1, 6)
@ -123,7 +126,8 @@ def text_sanitizer(as_ansi, add_wrap_markers):
pat = getattr(text_sanitizer, 'pat', None) pat = getattr(text_sanitizer, 'pat', None)
if pat is None: if pat is None:
import re import re
pat = text_sanitizer.pat = re.compile(r'\033\[.+?m') pat = re.compile(r'\033\[.+?m')
setattr(text_sanitizer, 'pat', pat)
ansi, wrap_markers = not as_ansi, not add_wrap_markers ansi, wrap_markers = not as_ansi, not add_wrap_markers
@ -153,7 +157,7 @@ class Window:
self.overlay_for = None self.overlay_for = None
self.default_title = os.path.basename(child.argv[0] or appname) self.default_title = os.path.basename(child.argv[0] or appname)
self.child_title = self.default_title self.child_title = self.default_title
self.title_stack = deque(maxlen=10) self.title_stack: Deque[str] = deque(maxlen=10)
self.allow_remote_control = child.allow_remote_control self.allow_remote_control = child.allow_remote_control
self.id = add_window(tab.os_window_id, tab.id, self.title) self.id = add_window(tab.os_window_id, tab.id, self.title)
if not self.id: if not self.id:
@ -200,14 +204,23 @@ class Window:
) )
@property @property
def current_colors(self): def current_colors(self) -> Dict:
return self.screen.color_profile.as_dict() return self.screen.color_profile.as_dict()
def matches(self, field, pat): def matches(self, field: str, pat: MatchPatternType) -> bool:
if field == 'env':
assert isinstance(pat, tuple)
key_pat, val_pat = pat
for key, val in self.child.environ.items():
if key_pat.search(key) is not None and (
val_pat is None or val_pat.search(val) is not None):
return True
assert isinstance(pat, Pattern)
if field == 'id': if field == 'id':
return pat.pattern == str(self.id) return True if pat.pattern == str(self.id) else False
if field == 'pid': if field == 'pid':
return pat.pattern == str(self.child.pid) return True if pat.pattern == str(self.child.pid) else False
if field == 'title': if field == 'title':
return pat.search(self.override_title or self.title) is not None return pat.search(self.override_title or self.title) is not None
if field in 'cwd': if field in 'cwd':
@ -217,12 +230,6 @@ class Window:
if pat.search(x) is not None: if pat.search(x) is not None:
return True return True
return False return False
if field == 'env':
key_pat, val_pat = pat
for key, val in self.child.environ.items():
if key_pat.search(key) is not None and (
val_pat is None or val_pat.search(val) is not None):
return True
return False return False
def set_visible_in_layout(self, window_idx, val): def set_visible_in_layout(self, window_idx, val):
@ -488,15 +495,15 @@ class Window:
lines = self.screen.text_for_selection() lines = self.screen.text_for_selection()
if self.opts.strip_trailing_spaces == 'always' or ( if self.opts.strip_trailing_spaces == 'always' or (
self.opts.strip_trailing_spaces == 'smart' and not self.screen.is_rectangle_select()): self.opts.strip_trailing_spaces == 'smart' and not self.screen.is_rectangle_select()):
lines = ((l.rstrip() or '\n') for l in lines) return ''.join((l.rstrip() or '\n') for l in lines)
return ''.join(lines) return ''.join(lines)
def destroy(self): def destroy(self):
self.destroyed = True self.destroyed = True
if self.screen is not None: if hasattr(self, 'screen'):
# Remove cycles so that screen is de-allocated immediately # Remove cycles so that screen is de-allocated immediately
self.screen.reset_callbacks() self.screen.reset_callbacks()
self.screen = None del self.screen
def as_text(self, as_ansi=False, add_history=False, add_wrap_markers=False, alternate_screen=False) -> str: def as_text(self, as_ansi=False, add_history=False, add_wrap_markers=False, alternate_screen=False) -> str:
lines: List[str] = [] lines: List[str] = []