more typing work

This commit is contained in:
Kovid Goyal 2020-03-09 16:13:00 +05:30
parent c817ba9eae
commit ac2c21e046
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
14 changed files with 143 additions and 88 deletions

View File

@ -445,10 +445,10 @@ def parse_shortcut_node(env, sig, signode):
def render_conf(conf_name, all_options):
from kitty.conf.definition import merged_opts, Option
from kitty.conf.definition import merged_opts, Option, Group
ans = ['.. default-domain:: conf', '']
a = ans.append
current_group = None
current_group: Optional[Group] = None
all_options = list(all_options)
kitty_mod = 'kitty_mod'
@ -466,6 +466,7 @@ def render_conf(conf_name, all_options):
def handle_group_end(group):
if group.end_text:
assert current_group is not None
a(''), a(current_group.end_text)
def handle_group(new_group, new_group_is_shortcut=False):

View File

@ -13,6 +13,7 @@ import sys
from collections import namedtuple
from contextlib import contextmanager
from functools import partial
from typing import List
from kitty.constants import is_macos
from kitty.fast_data_types import (
@ -36,8 +37,8 @@ def debug(*a, **kw):
buf = io.StringIO()
kw['file'] = buf
print(*a, **kw)
text = buf.getvalue()
text = b'\x1bP@kitty-print|' + standard_b64encode(text.encode('utf-8')) + b'\x1b\\'
stext = buf.getvalue()
text = b'\x1bP@kitty-print|' + standard_b64encode(stext.encode('utf-8')) + b'\x1b\\'
fobj = getattr(debug, 'fobj', sys.stdout.buffer)
fobj.write(text)
if hasattr(fobj, 'flush'):
@ -158,7 +159,7 @@ class Loop:
if is_macos:
# On macOS PTY devices are not supported by the KqueueSelector and
# the PollSelector is broken, causes 100% CPU usage
self.asycio_loop = asyncio.SelectorEventLoop(selectors.SelectSelector())
self.asycio_loop = asyncio.SelectorEventLoop(selectors.SelectSelector()) # type: ignore
asyncio.set_event_loop(self.asycio_loop)
else:
self.asycio_loop = asyncio.get_event_loop()
@ -178,12 +179,12 @@ class Loop:
def _read_ready(self, handler, fd):
try:
data = os.read(fd, io.DEFAULT_BUFFER_SIZE)
bdata = os.read(fd, io.DEFAULT_BUFFER_SIZE)
except BlockingIOError:
return
if not data:
if not bdata:
raise EOFError('The input stream is closed')
data = self.decoder.decode(data)
data = self.decoder.decode(bdata)
if self.read_buf:
data = self.read_buf + data
self.read_buf = data
@ -295,7 +296,7 @@ class Loop:
if not written:
raise EOFError('The output stream is closed')
if written >= sum(sizes):
self.write_buf = []
self.write_buf: List[bytes] = []
self.asycio_loop.remove_writer(fd)
self.waiting_for_writes = False
else:

View File

@ -43,7 +43,6 @@ def draw_edges(os_window_id, tab_id, colors, width, geometry, base_width=0):
def load_borders_program():
compile_program(BORDERS_PROGRAM, *load_shaders('border'))
init_borders_program()
Borders.program_initialized = True
class Borders:
@ -67,7 +66,8 @@ class Borders:
has_background_image = os_window_has_background_image(self.os_window_id)
if not has_background_image:
for br in chain(current_layout.blank_rects, extra_blank_rects):
add_borders_rect(self.os_window_id, self.tab_id, *br, BorderColor.default_bg)
left, top, right, bottom = br
add_borders_rect(self.os_window_id, self.tab_id, left, top, right, bottom, BorderColor.default_bg)
bw, pw = border_width, padding_width
if bw + pw <= 0:
return

View File

@ -9,13 +9,15 @@ import re
from contextlib import suppress
from functools import partial
from gettext import gettext as _
from typing import Optional
from typing import Dict, Iterable, List, Optional, Union
from weakref import WeakValueDictionary
from .child import cached_process_data, cwd_of_process
from .cli import create_opts, parse_args
from .conf.utils import to_cmdline
from .config import initial_window_size_func, prepare_config_file_for_editing
from .config import (
SubSequenceMap, initial_window_size_func, prepare_config_file_for_editing
)
from .config_data import MINIMUM_FONT_SIZE
from .constants import (
appname, config_dir, is_macos, kitty_exe, supports_primary_selection
@ -115,12 +117,12 @@ class Boss:
def __init__(self, os_window_id, opts, args, cached_values, new_os_window_trigger):
set_layout_options(opts)
self.clipboard_buffers = {}
self.clipboard_buffers: Dict[str, str] = {}
self.update_check_process = None
self.window_id_map: WeakValueDictionary[int, Window] = WeakValueDictionary()
self.startup_colors = {k: opts[k] for k in opts if isinstance(opts[k], Color)}
self.startup_cursor_text_color = opts.cursor_text_color
self.pending_sequences = None
self.pending_sequences: Optional[SubSequenceMap] = None
self.cached_values = cached_values
self.os_window_map = {}
self.os_window_death_actions = {}
@ -464,7 +466,7 @@ class Boss:
new_size = calc_new_size(current_global_size)
if new_size != current_global_size:
global_font_size(new_size)
os_windows = tuple(self.os_window_map.keys())
os_windows = list(self.os_window_map.keys())
else:
os_windows = []
w = self.active_window
@ -553,6 +555,7 @@ class Boss:
def process_sequence(self, key, native_key, action, mods):
if not self.pending_sequences:
set_in_sequence_mode(False)
return
remaining = {}
matched_action = None
@ -755,11 +758,11 @@ class Boss:
overlay_window.action_on_removal = lambda *a: action_on_removal(wid, self)
return overlay_window
def kitten(self, kitten, *args):
def kitten(self, kitten: str, *args: str) -> None:
import shlex
cmdline = args[0] if args else ''
args = shlex.split(cmdline) if cmdline else []
self._run_kitten(kitten, args)
kargs = shlex.split(cmdline) if cmdline else []
self._run_kitten(kitten, kargs)
def on_kitten_finish(self, target_window_id, end_kitten, source_window):
output = self.get_output(source_window, num_lines=None)
@ -888,7 +891,7 @@ class Boss:
def paste_from_buffer(self, buffer_name):
if buffer_name == 'clipboard':
text = get_clipboard_string()
text: Optional[str] = get_clipboard_string()
elif buffer_name == 'primary':
text = get_primary_selection()
else:
@ -1000,23 +1003,24 @@ class Boss:
env, stdin = self.process_stdin_source(stdin=source, window=window)
self.run_background_process(cmd, cwd_from=cwd_from, stdin=stdin, env=env)
def args_to_special_window(self, args, cwd_from=None):
def args_to_special_window(self, args: Iterable[str], cwd_from=None):
args = list(args)
stdin = None
w = self.active_window
if args[0].startswith('@') and args[0] != '@':
stdin = data_for_at(w, args[0]) or None
if stdin is not None:
stdin = stdin.encode('utf-8')
q = data_for_at(w, args[0]) or None
if q is not None:
stdin = q.encode('utf-8')
del args[0]
cmd = []
for arg in args:
if arg == '@selection':
arg = data_for_at(w, arg)
if not arg:
q = data_for_at(w, arg)
if not q:
continue
arg = q
cmd.append(arg)
return SpecialWindow(cmd, stdin, cwd_from=cwd_from)
@ -1078,10 +1082,10 @@ class Boss:
cwd_from = w.child.pid_for_cwd if w is not None else None
self._new_window(args, cwd_from=cwd_from)
def launch(self, *args):
def launch(self, *args: str) -> None:
from kitty.launch import parse_launch_args, launch
opts, args = parse_launch_args(args)
launch(self, opts, args)
opts, args_ = parse_launch_args(args)
launch(self, opts, args_)
def move_tab_forward(self):
tm = self.active_tab_manager
@ -1093,19 +1097,19 @@ class Boss:
if tm is not None:
tm.move_tab(-1)
def disable_ligatures_in(self, where, strategy):
def disable_ligatures_in(self, where: Union[str, Iterable[Window]], strategy: int):
if isinstance(where, str):
windows = ()
windows: List[Window] = []
if where == 'active':
if self.active_window is not None:
windows = (self.active_window,)
windows = [self.active_window]
elif where == 'all':
windows = self.all_windows
elif where == 'tab':
if self.active_tab is not None:
windows = tuple(self.active_tab)
windows = list(self.active_tab)
else:
windows = where
windows = list(where)
for window in windows:
window.screen.disable_ligatures = strategy
window.refresh()
@ -1166,11 +1170,12 @@ class Boss:
self.show_error(_('Errors in kitty.conf'), msg)
def set_colors(self, *args):
from kitty.rc.base import parse_subcommand_cli, command_for_name
from kitty.rc.base import parse_subcommand_cli, command_for_name, PayloadGetter
from kitty.remote_control import parse_rc_args
c = command_for_name('set_colors')
opts, items = parse_subcommand_cli(c, ['set-colors'] + list(args))
payload = c.message_to_kitty(None, opts, items)
c.response_from_kitty(self, self.active_window, payload)
payload = c.message_to_kitty(parse_rc_args([])[0], opts, items)
c.response_from_kitty(self, self.active_window, PayloadGetter(c, payload if isinstance(payload, dict) else {}))
def _move_window_to(self, window=None, target_tab_id=None, target_os_window_id=None):
window = window or self.active_window
@ -1227,8 +1232,10 @@ class Boss:
''
]
fmt = ': {1}'
tab_id_map = {}
tab_id_map: Dict[int, Optional[Union[str, int]]] = {}
current_tab = self.active_tab
done_tab_id: Optional[Union[str, int]] = None
for i, tab in enumerate(self.all_tabs):
if tab is not current_tab:
tab_id_map[len(tab_id_map)] = tab.id
@ -1241,12 +1248,13 @@ class Boss:
lines.append(fmt.format(new_idx, 'New OS Window'))
def done(data, target_window_id, self):
done.tab_id = tab_id_map[int(data['groupdicts'][0]['index'])]
nonlocal done_tab_id
done_tab_id = tab_id_map[int(data['groupdicts'][0]['index'])]
def done2(target_window_id, self):
if not hasattr(done, 'tab_id'):
return
tab_id = done.tab_id
tab_id = done_tab_id
target_window = None
for w in self.all_windows:
if w.id == target_window_id:
@ -1273,8 +1281,11 @@ class Boss:
''
]
fmt = ': {1}'
os_window_id_map = {}
os_window_id_map: Dict[int, Optional[int]] = {}
current_os_window = getattr(self.active_tab, 'os_window_id', 0)
done_osw: Optional[int] = None
done_called = False
for i, osw in enumerate(self.os_window_map):
tm = self.os_window_map[osw]
if current_os_window != osw and tm.active_tab and tm.active_tab:
@ -1285,12 +1296,14 @@ class Boss:
lines.append(fmt.format(new_idx, 'New OS Window'))
def done(data, target_window_id, self):
done.os_window_id = os_window_id_map[int(data['groupdicts'][0]['index'])]
nonlocal done_called, done_osw
done_osw = os_window_id_map[int(data['groupdicts'][0]['index'])]
done_called = True
def done2(target_window_id, self):
if not hasattr(done, 'os_window_id'):
if not done_called:
return
os_window_id = done.os_window_id
os_window_id = done_osw
target_tab = self.active_tab
for w in self.all_windows:
if w.id == target_window_id:

View File

@ -10,7 +10,7 @@ from collections import namedtuple
from contextlib import contextmanager, suppress
from functools import partial
from typing import (
TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional,
Any, Callable, Dict, Iterable, List, Optional,
Sequence, Set, Tuple, Type, cast
)
@ -26,16 +26,25 @@ from .key_names import get_key_name_lookup, key_name_aliases
from .options_stub import Options as OptionsStub
from .utils import log_error
if TYPE_CHECKING:
from .options_stub import SequenceMap, KeyMap
SequenceMap, KeyMap
KeySpec = Tuple[int, bool, int]
KeyMap = Dict[KeySpec, 'KeyAction']
KeySequence = Tuple[KeySpec, ...]
SubSequenceMap = Dict[KeySequence, 'KeyAction']
SequenceMap = Dict[KeySpec, SubSequenceMap]
class InvalidMods(ValueError):
pass
def parse_shortcut(sc: str) -> Tuple[int, bool, Optional[int]]:
parts = sc.split('+')
mods = 0
if len(parts) > 1:
mods = parse_mods(parts[:-1], sc)
mods = parse_mods(parts[:-1], sc) or 0
if not mods:
raise InvalidMods('Invalid shortcut')
q = parts[-1].upper()
key: Optional[int] = getattr(defines, 'GLFW_KEY_' + key_name_aliases.get(q, q), None)
is_native = False
@ -370,7 +379,10 @@ def parse_key(val, key_definitions):
trigger: Optional[Tuple[int, bool, int]] = None
restl: List[Tuple[int, bool, int]] = []
for part in sc.split(sequence_sep):
try:
mods, is_native, key = parse_shortcut(part)
except InvalidMods:
return
if key is None:
if mods is not None:
log_error('Shortcut: {} has unknown key, ignoring'.format(sc))
@ -381,7 +393,10 @@ def parse_key(val, key_definitions):
restl.append((mods, is_native, key))
rest = tuple(restl)
else:
try:
mods, is_native, key = parse_shortcut(sc)
except InvalidMods:
return
if key is None:
if mods is not None:
log_error('Shortcut: {} has unknown key, ignoring'.format(sc))

View File

@ -28,7 +28,7 @@ mod_map = {'CTRL': 'CONTROL', 'CMD': 'SUPER', '⌘': 'SUPER',
'': 'ALT', 'OPTION': 'ALT', 'KITTY_MOD': 'KITTY'}
def parse_mods(parts: Iterable[str], sc: str) -> int:
def parse_mods(parts: Iterable[str], sc: str) -> Optional[int]:
def map_mod(m):
return mod_map.get(m, m)
@ -39,7 +39,7 @@ def parse_mods(parts: Iterable[str], sc: str) -> int:
mods |= getattr(defines, 'GLFW_MOD_' + map_mod(m.upper()))
except AttributeError:
log_error('Shortcut: {} has unknown modifier, ignoring'.format(sc))
return
return None
return mods

View File

@ -941,6 +941,7 @@ class Screen:
in_bracketed_paste_mode: bool
scrolled_by: int
cursor: Cursor
disable_ligatures: int
def __init__(
self,
@ -1063,6 +1064,18 @@ class ChildMonitor:
def set_iutf8_winid(self, win_id: int, on: bool) -> bool:
pass
def add_child(self, id: int, pid: int, fd: int, screen: Screen) -> None:
pass
def mark_for_close(self, window_id: int) -> None:
pass
def start(self) -> None:
pass
def shutdown_monitor(self) -> None:
pass
def set_iutf8_fd(fd: int, on: bool) -> bool:
pass
@ -1081,3 +1094,7 @@ def spawn(
ready_write_fd: int
) -> int:
pass
def key_to_bytes(glfw_key: int, smkx: bool, extended: bool, mods: int, action: int) -> bytes:
pass

View File

@ -3,9 +3,10 @@
# License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
import string
from typing import Dict, Tuple, Union
from typing import Dict, Optional, Tuple, Union, overload
from . import fast_data_types as defines
from .config import KeyAction, KeyMap, SequenceMap, SubSequenceMap
from .key_encoding import KEY_MAP
from .terminfo import key_as_bytes, modify_key_bytes
from .utils import base64_encode
@ -246,7 +247,8 @@ def key_to_bytes(key, smkx, extended, mods, action):
data.extend(m[key])
elif mods == ctrl_alt_mod and key in all_control_alt_keys:
if key in CTRL_ALT_KEYS:
data.append(0x1b), data.extend(control_codes[key])
data.append(0x1b)
data.extend(control_codes[key])
else:
data.extend(control_alt_codes[key])
elif mods == ctrl_alt_shift_mod and key in control_alt_shift_codes:
@ -257,6 +259,7 @@ def key_to_bytes(key, smkx, extended, mods, action):
if x is not None:
if mods == defines.GLFW_MOD_SHIFT:
x = SHIFTED_KEYS.get(key, x)
assert x is not None
data.extend(x)
return bytes(data)
@ -272,6 +275,16 @@ def interpret_key_event(key, native_key, mods, window, action):
return b''
@overload
def get_shortcut(seqmap: SequenceMap, mods: int, key: int, native_key: int) -> Optional[SubSequenceMap]:
...
@overload
def get_shortcut(keymap: KeyMap, mods: int, key: int, native_key: int) -> Optional[KeyAction]:
...
def get_shortcut(keymap, mods, key, native_key):
mods &= 0b1111
ans = keymap.get((mods, False, key))

View File

@ -23,7 +23,7 @@ if is_macos:
else:
from .fast_data_types import dbus_send_notification, get_boss
from .fast_data_types import dbus_send_notification
alloc_map: Dict[int, str] = {}
identifier_map: Dict[str, int] = {}
@ -37,7 +37,8 @@ else:
rmap = {v: k for k, v in identifier_map.items()}
identifier = rmap.get(notification_id)
if identifier is not None:
get_boss().notification_activated(identifier)
from .boss import notification_activated
notification_activated(identifier)
def notify(
title,

View File

@ -20,10 +20,7 @@ def generate_stub():
'font_features': 'typing.Dict[str, typing.Tuple[str, ...]]'
},
preamble_lines=(
'from kitty.config import KeyAction',
'KeySpec = typing.Tuple[int, bool, int]',
'KeyMap = typing.Dict[KeySpec, KeyAction]',
'SequenceMap = typing.Dict[KeySpec, KeyMap]',
'from kitty.config import KeyAction, KeyMap, SequenceMap, KeySpec',
),
extra_fields=(
('keymap', 'KeyMap'),

View File

@ -9,7 +9,7 @@ import sys
import types
from contextlib import suppress
from functools import partial
from typing import List
from typing import Any, Dict, List, Tuple, Union
from .cli import emph, parse_args
from .cli_stub import RCOptions
@ -41,7 +41,7 @@ def handle_cmd(boss, window, cmd):
raise
if ans is no_response_sentinel:
return
response = {'ok': True}
response: Dict[str, Any] = {'ok': True}
if ans is not None:
response['data'] = ans
if not c.no_response and not no_response:
@ -106,7 +106,7 @@ class SocketIO:
class RCIO(TTYIO):
def recv(self, timeout):
ans = []
ans: List[bytes] = []
read_command_response(self.tty_fd, timeout, ans)
return b''.join(ans)
@ -122,7 +122,7 @@ def do_io(to, send, no_response):
yield encode_send(send)
send_data = send_generator()
io = SocketIO(to) if to else RCIO()
io: Union[SocketIO, RCIO] = SocketIO(to) if to else RCIO()
with io:
io.send(send_data)
if no_response:
@ -140,7 +140,7 @@ cli_msg = (
).format(appname=appname)
def parse_rc_args(args: List[str]):
def parse_rc_args(args: List[str]) -> Tuple[RCOptions, List[str]]:
cmap = {name: command_for_name(name) for name in sorted(all_command_names())}
cmds = (' :green:`{}`\n {}'.format(cmd.name, cmd.short_desc) for c, cmd in cmap.items())
msg = cli_msg + (
@ -173,7 +173,7 @@ def main(args):
if payload is not None:
send['payload'] = payload
if global_opts.no_command_response is not None:
no_response = global_opts.no_command_response
no_response = global_opts.no_command_response # type: ignore
else:
no_response = c.no_response
send['no_response'] = no_response

View File

@ -164,17 +164,17 @@ def real_main(global_opts):
while True:
try:
try:
cmdline = input('🐱 ')
scmdline = input('🐱 ')
except UnicodeEncodeError:
cmdline = input('kitty> ')
scmdline = input('kitty> ')
except EOFError:
break
except KeyboardInterrupt:
print()
continue
if not cmdline:
if not scmdline:
continue
cmdline = shlex.split(cmdline)
cmdline = shlex.split(scmdline)
cmd = cmdline[0].lower()
try:

View File

@ -114,11 +114,8 @@ def update_check(timer_id: Optional[int] = None) -> bool:
log_error('Failed to run kitty for update check, with error: {}'.format(e))
return False
monitor_pid(p.pid)
boss = get_boss()
if boss is not None:
boss.set_update_check_process(p)
get_boss().set_update_check_process(p)
return True
return False
def run_update_check(interval: int = CHECK_INTERVAL) -> None:

24
test.py
View File

@ -6,7 +6,7 @@ import importlib
import os
import sys
import unittest
from typing import NoReturn
from typing import Callable, NoReturn, Set
base = os.path.dirname(os.path.abspath(__file__))
@ -39,9 +39,9 @@ def find_tests_in_dir(path, excludes=('main.py',)):
return unittest.TestSuite(suits)
def filter_tests(suite, test_ok):
def filter_tests(suite: unittest.TestSuite, test_ok: Callable[[unittest.TestCase], bool]) -> unittest.TestSuite:
ans = unittest.TestSuite()
added = set()
added: Set[unittest.TestCase] = set()
for test in itertests(suite):
if test_ok(test) and test not in added:
ans.addTest(test)
@ -49,20 +49,20 @@ def filter_tests(suite, test_ok):
return ans
def filter_tests_by_name(suite, *names):
names = {x if x.startswith('test_') else 'test_' + x for x in names}
def filter_tests_by_name(suite: unittest.TestSuite, *names: str) -> unittest.TestSuite:
names_ = {x if x.startswith('test_') else 'test_' + x for x in names}
def q(test):
return test._testMethodName in names
return test._testMethodName in names_
return filter_tests(suite, q)
def filter_tests_by_module(suite, *names):
names = frozenset(names)
def filter_tests_by_module(suite: unittest.TestSuite, *names: str) -> unittest.TestSuite:
names_ = frozenset(names)
def q(test):
m = test.__class__.__module__.rpartition('.')[-1]
return m in names
return m in names_
return filter_tests(suite, q)
@ -93,12 +93,12 @@ def run_tests():
run_cli(tests, args.verbosity)
def run_cli(suite, verbosity=4):
def run_cli(suite: unittest.TestSuite, verbosity: int = 4) -> None:
r = unittest.TextTestRunner
r.resultclass = unittest.TextTestResult
r.resultclass = unittest.TextTestResult # type: ignore
init_env()
runner = r(verbosity=verbosity)
runner.tb_locals = True
runner.tb_locals = True # type: ignore
result = runner.run(suite)
if not result.wasSuccessful():
raise SystemExit(1)