more typing work

This commit is contained in:
Kovid Goyal 2020-03-14 11:42:05 +05:30
parent ea48332f46
commit d3f37eeba4
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
14 changed files with 216 additions and 194 deletions

View File

@ -587,11 +587,11 @@ class Boss:
key_action = get_shortcut(self.keymap, mods, key, native_key) key_action = get_shortcut(self.keymap, mods, key, native_key)
if key_action is None: if key_action is None:
sequences = get_shortcut(self.opts.sequence_map, mods, key, native_key) sequences = get_shortcut(self.opts.sequence_map, mods, key, native_key)
if sequences: if sequences and not isinstance(sequences, KeyAction):
self.pending_sequences = sequences self.pending_sequences = sequences
set_in_sequence_mode(True) set_in_sequence_mode(True)
return True return True
else: elif isinstance(key_action, KeyAction):
self.current_key_press_info = key, native_key, action, mods self.current_key_press_info = key, native_key, action, mods
return self.dispatch_action(key_action) return self.dispatch_action(key_action)
@ -1072,8 +1072,10 @@ class Boss:
elif dest in ('clipboard', 'primary'): elif dest in ('clipboard', 'primary'):
env, stdin = self.process_stdin_source(stdin=source, window=window) env, stdin = self.process_stdin_source(stdin=source, window=window)
if stdin: if stdin:
func = set_clipboard_string if dest == 'clipboard' else set_primary_selection if dest == 'clipboard':
func(stdin) set_clipboard_string(stdin)
else:
set_primary_selection(stdin)
else: else:
env, stdin = self.process_stdin_source(stdin=source, window=window) env, stdin = self.process_stdin_source(stdin=source, window=window)
self.run_background_process(cmd, cwd_from=cwd_from, stdin=stdin, env=env) self.run_background_process(cmd, cwd_from=cwd_from, stdin=stdin, env=env)

View File

@ -10,120 +10,121 @@
import sys import sys
from contextlib import suppress from contextlib import suppress
from typing import Any
CSI = '\033[' CSI = '\033['
OSC = '\033]' OSC = '\033]'
def write(x): def write(x: str) -> None:
sys.stdout.write(x) sys.stdout.write(x)
sys.stdout.flush() sys.stdout.flush()
def set_title(*args): def set_title(*args: Any) -> None:
pass pass
def set_icon(*args): def set_icon(*args: Any) -> None:
pass pass
def screen_bell(): def screen_bell() -> None:
pass pass
def screen_cursor_position(y, x): def screen_cursor_position(y: int, x: int) -> None:
write(CSI + '%s;%sH' % (y, x)) write(CSI + '%s;%sH' % (y, x))
def screen_cursor_forward(amt): def screen_cursor_forward(amt: int) -> None:
write(CSI + '%sC' % amt) write(CSI + '%sC' % amt)
def screen_cursor_back1(amt): def screen_cursor_back1(amt: int) -> None:
write(CSI + '%sD' % amt) write(CSI + '%sD' % amt)
def screen_designate_charset(which, to): def screen_designate_charset(which: int, to: int) -> None:
which = '()'[int(which)] w = '()'[int(which)]
to = chr(int(to)) t = chr(int(to))
write('\033' + which + to) write('\033' + w + t)
def select_graphic_rendition(*a): def select_graphic_rendition(*a: int) -> None:
write(CSI + '%sm' % ';'.join(map(str, a))) write(CSI + '%sm' % ';'.join(map(str, a)))
def screen_cursor_to_column(c): def screen_cursor_to_column(c: int) -> None:
write(CSI + '%dG' % c) write(CSI + '%dG' % c)
def screen_cursor_to_line(l): def screen_cursor_to_line(l: int) -> None:
write(CSI + '%dd' % l) write(CSI + '%dd' % l)
def screen_set_mode(x, private): def screen_set_mode(x: int, private: bool) -> None:
write(CSI + ('?' if private else '') + str(x) + 'h') write(CSI + ('?' if private else '') + str(x) + 'h')
def screen_reset_mode(x, private): def screen_reset_mode(x: int, private: bool) -> None:
write(CSI + ('?' if private else '') + str(x) + 'l') write(CSI + ('?' if private else '') + str(x) + 'l')
def screen_set_margins(t, b): def screen_set_margins(t: int, b: int) -> None:
write(CSI + '%d;%dr' % (t, b)) write(CSI + '%d;%dr' % (t, b))
def screen_indexn(n): def screen_indexn(n: int) -> None:
write(CSI + '%dS' % n) write(CSI + '%dS' % n)
def screen_erase_in_display(how, private): def screen_erase_in_display(how: int, private: bool) -> None:
write(CSI + ('?' if private else '') + str(how) + 'J') write(CSI + ('?' if private else '') + str(how) + 'J')
def screen_erase_in_line(how, private): def screen_erase_in_line(how: int, private: bool) -> None:
write(CSI + ('?' if private else '') + str(how) + 'K') write(CSI + ('?' if private else '') + str(how) + 'K')
def screen_delete_lines(num): def screen_delete_lines(num: int) -> None:
write(CSI + str(num) + 'M') write(CSI + str(num) + 'M')
def screen_cursor_up2(count): def screen_cursor_up2(count: int) -> None:
write(CSI + '%dA' % count) write(CSI + '%dA' % count)
def screen_cursor_down(count): def screen_cursor_down(count: int) -> None:
write(CSI + '%dB' % count) write(CSI + '%dB' % count)
def screen_carriage_return(): def screen_carriage_return() -> None:
write('\r') write('\r')
def screen_linefeed(): def screen_linefeed() -> None:
write('\n') write('\n')
def screen_backspace(): def screen_backspace() -> None:
write('\x08') write('\x08')
def screen_set_cursor(mode, secondary): def screen_set_cursor(mode: int, secondary: int) -> None:
write(CSI + '%d q' % secondary) write(CSI + '%d q' % secondary)
def screen_insert_lines(num): def screen_insert_lines(num: int) -> None:
write(CSI + '%dL' % num) write(CSI + '%dL' % num)
def draw(*a): def draw(*a: str) -> None:
write(' '.join(a)) write(' '.join(a))
def screen_manipulate_title_stack(op, which): def screen_manipulate_title_stack(op: int, which: int) -> None:
write(CSI + '%d;%dt' % (op, which)) write(CSI + '%d;%dt' % (op, which))
@ -136,7 +137,7 @@ def report_device_attributes(mode: int, char: int) -> None:
write(CSI + x + 'c') write(CSI + x + 'c')
def write_osc(code: int, string='') -> None: def write_osc(code: int, string: str = '') -> None:
if string: if string:
string = ';' + string string = ';' + string
write(OSC + str(code) + string + '\x07') write(OSC + str(code) + string + '\x07')
@ -145,18 +146,18 @@ def write_osc(code: int, string='') -> None:
set_dynamic_color = set_color_table_color = write_osc set_dynamic_color = set_color_table_color = write_osc
def replay(raw): def replay(raw: str) -> None:
for line in raw.splitlines(): for line in raw.splitlines():
if line.strip() and not line.startswith('#'): if line.strip() and not line.startswith('#'):
cmd, rest = line.partition(' ')[::2] cmd, rest = line.partition(' ')[::2]
if cmd in {'draw', 'set_title', 'set_icon', 'set_dynamic_color', 'set_color_table_color'}: if cmd in {'draw', 'set_title', 'set_icon', 'set_dynamic_color', 'set_color_table_color'}:
globals()[cmd](rest) globals()[cmd](rest)
else: else:
rest = map(int, rest.split()) if rest else () r = map(int, rest.split()) if rest else ()
globals()[cmd](*rest) globals()[cmd](*r)
def main(path): def main(path: str) -> None:
with open(path) as f: with open(path) as f:
raw = f.read() raw = f.read()
replay(raw) replay(raw)

View File

@ -31,7 +31,7 @@ mod_map = {'CTRL': 'CONTROL', 'CMD': 'SUPER', '⌘': 'SUPER',
def parse_mods(parts: Iterable[str], sc: str) -> Optional[int]: def parse_mods(parts: Iterable[str], sc: str) -> Optional[int]:
def map_mod(m): def map_mod(m: str) -> str:
return mod_map.get(m, m) return mod_map.get(m, m)
mods = 0 mods = 0

View File

@ -333,7 +333,7 @@ def log_error_string(s: str) -> None:
pass pass
def set_primary_selection(x: bytes) -> None: def set_primary_selection(x: Union[bytes, str]) -> None:
pass pass
@ -517,7 +517,7 @@ def cocoa_send_notification(
def create_os_window( def create_os_window(
get_window_size: Callable[[int, int, int, int, float, float], Tuple[int, get_window_size: Callable[[int, int, int, int, float, float], Tuple[int,
int]], int]],
pre_show_callback: Callable[[object], None], pre_show_callback: Callable[[int], None],
title: str, title: str,
wm_class_name: str, wm_class_name: str,
wm_class_class: str, wm_class_class: str,
@ -955,6 +955,9 @@ class Screen:
scrolled_by: int scrolled_by: int
cursor: Cursor cursor: Cursor
disable_ligatures: int disable_ligatures: int
extended_keyboard: bool
cursor_key_mode: bool
auto_repeat_enabled: bool
def __init__( def __init__(
self, self,

20
kitty/key_encoding.py generated
View File

@ -3,7 +3,7 @@
# License: GPL v3 Copyright: 2017, Kovid Goyal <kovid at kovidgoyal.net> # License: GPL v3 Copyright: 2017, Kovid Goyal <kovid at kovidgoyal.net>
import string import string
from typing import NamedTuple from typing import NamedTuple, Optional
from . import fast_data_types as defines from . import fast_data_types as defines
from .key_names import key_name_aliases from .key_names import key_name_aliases
@ -390,19 +390,19 @@ text_keys = (
) )
def text_match(key): def text_match(key: str) -> Optional[str]:
if key.upper() == 'SPACE': if key.upper() == 'SPACE':
return ' ' return ' '
if key not in text_keys: if key not in text_keys:
return return None
return key return key
def encode( def encode(
integer, integer: int,
chars=string.ascii_uppercase + string.ascii_lowercase + string.digits + chars: str = string.ascii_uppercase + string.ascii_lowercase + string.digits +
'.-:+=^!/*?&<>()[]{}@%$#' '.-:+=^!/*?&<>()[]{}@%$#'
): ) -> str:
ans = '' ans = ''
d = len(chars) d = len(chars)
while True: while True:
@ -413,11 +413,11 @@ def encode(
return ans return ans
def symbolic_name(glfw_name): def symbolic_name(glfw_name: str) -> str:
return glfw_name[9:].replace('_', ' ') return glfw_name[9:].replace('_', ' ')
def update_encoding(): def update_encoding() -> None:
import re import re
import subprocess import subprocess
keys = {a for a in dir(defines) if a.startswith('GLFW_KEY_')} keys = {a for a in dir(defines) if a.startswith('GLFW_KEY_')}
@ -489,14 +489,14 @@ globals().update(key_defs)
del key_name, enc del key_name, enc
def decode_key_event(text): def decode_key_event(text: str) -> KeyEvent:
typ = type_map[text[1]] typ = type_map[text[1]]
mods = mod_map[text[2]] mods = mod_map[text[2]]
key = key_rmap[text[3:5]] key = key_rmap[text[3:5]]
return KeyEvent(typ, mods, key) return KeyEvent(typ, mods, key)
def encode_key_event(key_event): def encode_key_event(key_event: KeyEvent) -> str:
typ = rtype_map[key_event.type] typ = rtype_map[key_event.type]
mods = rmod_map[key_event.mods] mods = rmod_map[key_event.mods]
key = ENCODING[key_event.key.replace('_', ' ')] key = ENCODING[key_event.key.replace('_', ' ')]

View File

@ -125,9 +125,9 @@ else:
f.argtypes = [ctypes.c_char_p, ctypes.c_int] f.argtypes = [ctypes.c_char_p, ctypes.c_int]
f.restype = ctypes.c_int f.restype = ctypes.c_int
def xkb_lookup(name, case_sensitive=False): def xkb_lookup(name: str, case_sensitive: bool = False) -> Optional[int]:
name = name.encode('utf-8') q = name.encode('utf-8')
return f(name, int(case_sensitive)) or None return f(q, int(case_sensitive)) or None
return xkb_lookup return xkb_lookup

View File

@ -3,14 +3,20 @@
# License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net> # License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
import string import string
from typing import Dict, Optional, Tuple, Union, overload from typing import (
TYPE_CHECKING, Any, Callable, Dict, Iterable, Optional, Tuple, Union
)
from . import fast_data_types as defines from . import fast_data_types as defines
from .config import KeyAction, KeyMap, SequenceMap, SubSequenceMap from .config import KeyAction, KeyMap, KeySpec, SequenceMap, SubSequenceMap
from .key_encoding import KEY_MAP from .key_encoding import KEY_MAP
from .terminfo import key_as_bytes, modify_key_bytes from .terminfo import key_as_bytes, modify_key_bytes
from .utils import base64_encode from .utils import base64_encode
if TYPE_CHECKING:
from .fast_data_types import Screen # noqa
from .window import Window # noqa
def modify_complex_key(name: Union[str, bytes], amt: int) -> bytes: def modify_complex_key(name: Union[str, bytes], amt: int) -> bytes:
q = name if isinstance(name, bytes) else key_as_bytes(name) q = name if isinstance(name, bytes) else key_as_bytes(name)
@ -62,7 +68,7 @@ ASCII_C0_SHIFTED = {
control_shift_keys = {getattr(defines, 'GLFW_KEY_' + k): v for k, v in ASCII_C0_SHIFTED.items()} control_shift_keys = {getattr(defines, 'GLFW_KEY_' + k): v for k, v in ASCII_C0_SHIFTED.items()}
def create_modifier_variants(keycode, terminfo_name_or_bytes, add_shifted_key=True): def create_modifier_variants(keycode: int, terminfo_name_or_bytes: Union[str, bytes], add_shifted_key: bool = True) -> None:
kn = terminfo_name_or_bytes kn = terminfo_name_or_bytes
smkx_key_map[keycode] = kn if isinstance(kn, bytes) else key_as_bytes(kn) smkx_key_map[keycode] = kn if isinstance(kn, bytes) else key_as_bytes(kn)
if add_shifted_key: if add_shifted_key:
@ -143,7 +149,7 @@ rmkx_key_map.update({
cursor_key_mode_map = {True: smkx_key_map, False: rmkx_key_map} cursor_key_mode_map = {True: smkx_key_map, False: rmkx_key_map}
def keyboard_mode_name(screen): def keyboard_mode_name(screen: 'Screen') -> str:
if screen.extended_keyboard: if screen.extended_keyboard:
return 'kitty' return 'kitty'
return 'application' if screen.cursor_key_mode else 'normal' return 'application' if screen.cursor_key_mode else 'normal'
@ -156,7 +162,7 @@ action_map = {
} }
def extended_key_event(key, mods, action): def extended_key_event(key: int, mods: int, action: int) -> bytes:
if key >= defines.GLFW_KEY_LAST or key == defines.GLFW_KEY_UNKNOWN or ( if key >= defines.GLFW_KEY_LAST or key == defines.GLFW_KEY_UNKNOWN or (
# Shifted printable key should be handled by on_text_input() # Shifted printable key should be handled by on_text_input()
mods <= defines.GLFW_MOD_SHIFT and defines.GLFW_KEY_SPACE <= key <= defines.GLFW_KEY_LAST_PRINTABLE mods <= defines.GLFW_MOD_SHIFT and defines.GLFW_KEY_SPACE <= key <= defines.GLFW_KEY_LAST_PRINTABLE
@ -187,13 +193,13 @@ def extended_key_event(key, mods, action):
).encode('ascii') ).encode('ascii')
def pmap(names, r): def pmap(names: str, r: Iterable[str]) -> Dict[int, bytes]:
names = names.split() snames = names.split()
r = [x.encode('ascii') for x in r] b = [x.encode('ascii') for x in r]
if len(names) != len(r): if len(snames) != len(b):
raise ValueError('Incorrect mapping for {}'.format(names)) raise ValueError('Incorrect mapping for {}'.format(names))
names = [getattr(defines, 'GLFW_KEY_' + n) for n in names] anames = [getattr(defines, 'GLFW_KEY_' + n) for n in snames]
return dict(zip(names, r)) return dict(zip(anames, b))
UN_SHIFTED_PRINTABLE = { UN_SHIFTED_PRINTABLE = {
@ -229,7 +235,7 @@ CTRL_ALT_KEYS = {getattr(defines, 'GLFW_KEY_' + k) for k in string.ascii_upperca
all_control_alt_keys = set(CTRL_ALT_KEYS) | set(control_alt_codes) all_control_alt_keys = set(CTRL_ALT_KEYS) | set(control_alt_codes)
def key_to_bytes(key, smkx, extended, mods, action): def key_to_bytes(key: int, smkx: bool, extended: bool, mods: int, action: int) -> bytes:
if extended: if extended:
return extended_key_event(key, mods, action) return extended_key_event(key, mods, action)
data = bytearray() data = bytearray()
@ -264,7 +270,7 @@ def key_to_bytes(key, smkx, extended, mods, action):
return bytes(data) return bytes(data)
def interpret_key_event(key, native_key, mods, window, action): def interpret_key_event(key: int, native_key: int, mods: int, window: 'Window', action: int) -> bytes:
screen = window.screen screen = window.screen
if ( if (
action == defines.GLFW_PRESS or action == defines.GLFW_PRESS or
@ -275,17 +281,7 @@ def interpret_key_event(key, native_key, mods, window, action):
return b'' return b''
@overload def get_shortcut(keymap: Union[KeyMap, SequenceMap], mods: int, key: int, native_key: int) -> Optional[Union[KeyAction, SubSequenceMap]]:
def get_shortcut(seqmap: SequenceMap, mods: int, key: int, native_key: int) -> Optional[SubSequenceMap]:
...
@overload
def get_shortcut(keymap: KeyMap, mods: int, key: int, native_key: int) -> Optional[KeyAction]:
...
def get_shortcut(keymap, mods, key, native_key):
mods &= 0b1111 mods &= 0b1111
ans = keymap.get((mods, False, key)) ans = keymap.get((mods, False, key))
if ans is None: if ans is None:
@ -293,13 +289,13 @@ def get_shortcut(keymap, mods, key, native_key):
return ans return ans
def shortcut_matches(s, mods, key, native_key): def shortcut_matches(s: KeySpec, mods: int, key: int, native_key: int) -> bool:
mods &= 0b1111 mods &= 0b1111
q = native_key if s[1] else key q = native_key if s[1] else key
return s[0] & 0b1111 == mods & 0b1111 and s[2] == q return bool(s[0] & 0b1111 == mods & 0b1111 and s[2] == q)
def generate_key_table_impl(w): def generate_key_table_impl(w: Callable) -> None:
w('// auto-generated from keys.py, do not edit!') w('// auto-generated from keys.py, do not edit!')
w('#pragma once') w('#pragma once')
w('#include <stddef.h>') w('#include <stddef.h>')
@ -311,7 +307,7 @@ def generate_key_table_impl(w):
w('static const uint8_t key_map[%d] = {' % number_of_keys) w('static const uint8_t key_map[%d] = {' % number_of_keys)
key_count = 0 key_count = 0
def key_name(k): def key_name(k: str) -> str:
return k[len('GLFW_KEY_'):] return k[len('GLFW_KEY_'):]
keys = {v: k for k, v in vars(defines).items() if k.startswith('GLFW_KEY_') and k not in {'GLFW_KEY_LAST', 'GLFW_KEY_LAST_PRINTABLE', 'GLFW_KEY_UNKNOWN'}} keys = {v: k for k, v in vars(defines).items() if k.startswith('GLFW_KEY_') and k not in {'GLFW_KEY_LAST', 'GLFW_KEY_LAST_PRINTABLE', 'GLFW_KEY_UNKNOWN'}}
@ -337,7 +333,7 @@ def generate_key_table_impl(w):
w('static inline const char*\nkey_lookup(uint8_t key, KeyboardMode mode, uint8_t mods, uint8_t action) {') w('static inline const char*\nkey_lookup(uint8_t key, KeyboardMode mode, uint8_t mods, uint8_t action) {')
i = 1 i = 1
def ind(*a): def ind(*a: Any) -> None:
w((' ' * i)[:-1], *a) w((' ' * i)[:-1], *a)
ind('switch(mode) {') ind('switch(mode) {')
mmap = [(False, False), (True, False), (False, True)] mmap = [(False, False), (True, False), (False, True)]
@ -396,7 +392,7 @@ def generate_key_table_impl(w):
w('}') w('}')
def generate_key_table(): def generate_key_table() -> None:
# To run this, use: ./kitty/launcher/kitty +runpy "from kitty.keys import *; generate_key_table()" # To run this, use: ./kitty/launcher/kitty +runpy "from kitty.keys import *; generate_key_table()"
import os import os
from functools import partial from functools import partial

View File

@ -253,9 +253,12 @@ def launch(boss: Boss, opts: LaunchCLIOptions, args: List[str], target_tab: Opti
raise ValueError('The cmd to run must be specified when running a background process') raise ValueError('The cmd to run must be specified when running a background process')
boss.run_background_process(cmd, cwd=kw['cwd'], cwd_from=kw['cwd_from'], env=env or None, stdin=kw['stdin']) boss.run_background_process(cmd, cwd=kw['cwd'], cwd_from=kw['cwd_from'], env=env or None, stdin=kw['stdin'])
elif opts.type in ('clipboard', 'primary'): elif opts.type in ('clipboard', 'primary'):
if kw.get('stdin') is not None: stdin = kw.get('stdin')
func = set_clipboard_string if opts.type == 'clipboard' else set_primary_selection if stdin is not None:
func(kw['stdin']) if opts.type == 'clipboard':
set_clipboard_string(stdin)
else:
set_primary_selection(stdin)
else: else:
tab = tab_for_window(boss, opts, target_tab) tab = tab_for_window(boss, opts, target_tab)
if tab: if tab:

View File

@ -15,7 +15,7 @@ pointer_to_uint = POINTER(c_uint)
MarkerFunc = Callable[[str, int, int, int], Generator[None, None, None]] MarkerFunc = Callable[[str, int, int, int], Generator[None, None, None]]
def get_output_variables(left_address, right_address, color_address): def get_output_variables(left_address: int, right_address: int, color_address: int) -> Tuple[c_uint, c_uint, c_uint]:
return ( return (
cast(c_void_p(left_address), pointer_to_uint).contents, cast(c_void_p(left_address), pointer_to_uint).contents,
cast(c_void_p(right_address), pointer_to_uint).contents, cast(c_void_p(right_address), pointer_to_uint).contents,

View File

@ -4,51 +4,64 @@
import shlex import shlex
import sys import sys
from collections import namedtuple from typing import (
TYPE_CHECKING, Generator, List, NamedTuple, Optional, Tuple, Union
)
from .config_data import to_layout_names from .config_data import to_layout_names
from .constants import kitty_exe from .constants import kitty_exe
from .layout import all_layouts from .layout import all_layouts
from .options_stub import Options
from .utils import log_error, resolved_shell from .utils import log_error, resolved_shell
WindowSizeOpts = namedtuple( if TYPE_CHECKING:
'WindowSizeOpts', 'initial_window_width initial_window_height window_margin_width window_padding_width remember_window_size') from .tabs import SpecialWindowInstance # noqa
from .cli_stub import CLIOptions # noqa
class WindowSizeOpts(NamedTuple):
initial_window_width: Tuple[int, str]
initial_window_height: Tuple[int, str]
window_margin_width: float
window_padding_width: float
remember_window_size: bool
class Tab: class Tab:
def __init__(self, opts, name): def __init__(self, opts: Options, name: str):
self.windows = [] self.windows: List[Union[List[str], 'SpecialWindowInstance']] = []
self.name = name.strip() self.name = name.strip()
self.active_window_idx = 0 self.active_window_idx = 0
self.enabled_layouts = opts.enabled_layouts self.enabled_layouts = opts.enabled_layouts
self.layout = (self.enabled_layouts or ['tall'])[0] self.layout = (self.enabled_layouts or ['tall'])[0]
self.cwd = None self.cwd: Optional[str] = None
self.next_title = None self.next_title: Optional[str] = None
class Session: class Session:
def __init__(self, default_title=None): def __init__(self, default_title: Optional[str] = None):
self.tabs = [] self.tabs: List[Tab] = []
self.active_tab_idx = 0 self.active_tab_idx = 0
self.default_title = default_title self.default_title = default_title
self.os_window_size = None self.os_window_size: Optional[WindowSizeOpts] = None
def add_tab(self, opts, name=''): def add_tab(self, opts: Options, name: str = '') -> None:
if self.tabs and not self.tabs[-1].windows: if self.tabs and not self.tabs[-1].windows:
del self.tabs[-1] del self.tabs[-1]
self.tabs.append(Tab(opts, name)) self.tabs.append(Tab(opts, name))
def set_next_title(self, title): def set_next_title(self, title: str) -> None:
self.tabs[-1].next_title = title.strip() self.tabs[-1].next_title = title.strip()
def set_layout(self, val): def set_layout(self, val: str) -> None:
if val not in all_layouts: if val not in all_layouts:
raise ValueError('{} is not a valid layout'.format(val)) raise ValueError('{} is not a valid layout'.format(val))
self.tabs[-1].layout = val self.tabs[-1].layout = val
def add_window(self, cmd): def add_window(self, cmd: Union[None, str, List[str]]) -> None:
if cmd: if cmd:
cmd = shlex.split(cmd) if isinstance(cmd, str) else cmd cmd = shlex.split(cmd) if isinstance(cmd, str) else cmd
else: else:
@ -58,25 +71,25 @@ class Session:
t.windows.append(SpecialWindow(cmd, cwd=t.cwd, override_title=t.next_title or self.default_title)) t.windows.append(SpecialWindow(cmd, cwd=t.cwd, override_title=t.next_title or self.default_title))
t.next_title = None t.next_title = None
def add_special_window(self, sw): def add_special_window(self, sw: 'SpecialWindowInstance') -> None:
self.tabs[-1].windows.append(sw) self.tabs[-1].windows.append(sw)
def focus(self): def focus(self) -> None:
self.active_tab_idx = max(0, len(self.tabs) - 1) self.active_tab_idx = max(0, len(self.tabs) - 1)
self.tabs[-1].active_window_idx = max(0, len(self.tabs[-1].windows) - 1) self.tabs[-1].active_window_idx = max(0, len(self.tabs[-1].windows) - 1)
def set_enabled_layouts(self, raw): def set_enabled_layouts(self, raw: str) -> None:
self.tabs[-1].enabled_layouts = to_layout_names(raw) self.tabs[-1].enabled_layouts = to_layout_names(raw)
if self.tabs[-1].layout not in self.tabs[-1].enabled_layouts: if self.tabs[-1].layout not in self.tabs[-1].enabled_layouts:
self.tabs[-1].layout = self.tabs[-1].enabled_layouts[0] self.tabs[-1].layout = self.tabs[-1].enabled_layouts[0]
def set_cwd(self, val): def set_cwd(self, val: str) -> None:
self.tabs[-1].cwd = val self.tabs[-1].cwd = val
def parse_session(raw, opts, default_title=None): def parse_session(raw: str, opts: Options, default_title: Optional[str] = None) -> Generator[Session, None, None]:
def finalize_session(ans): def finalize_session(ans: Session) -> Session:
for t in ans.tabs: for t in ans.tabs:
if not t.windows: if not t.windows:
t.windows.append(resolved_shell(opts)) t.windows.append(resolved_shell(opts))
@ -120,7 +133,14 @@ def parse_session(raw, opts, default_title=None):
yield finalize_session(ans) yield finalize_session(ans)
def create_sessions(opts, args=None, special_window=None, cwd_from=None, respect_cwd=False, default_session=None): def create_sessions(
opts: Options,
args: Optional['CLIOptions'] = None,
special_window: Optional['SpecialWindowInstance'] = None,
cwd_from: Optional[int] = None,
respect_cwd: bool = False,
default_session: Optional[str] = None
) -> Generator[Session, None, None]:
if args and args.session: if args and args.session:
if args.session == '-': if args.session == '-':
f = sys.stdin f = sys.stdin
@ -148,11 +168,8 @@ def create_sessions(opts, args=None, special_window=None, cwd_from=None, respect
if args and args.hold: if args and args.hold:
cmd = [kitty_exe(), '+hold'] + cmd cmd = [kitty_exe(), '+hold'] + cmd
from kitty.tabs import SpecialWindow from kitty.tabs import SpecialWindow
k = {'cwd_from': cwd_from} cwd: Optional[str] = args.directory if respect_cwd and args else None
if respect_cwd: title = getattr(args, 'title', None)
k['cwd'] = args.directory special_window = SpecialWindow(cmd, override_title=title, cwd_from=cwd_from, cwd=cwd)
if getattr(args, 'title', None):
k['override_title'] = args.title
special_window = SpecialWindow(cmd, **k)
ans.add_special_window(special_window) ans.add_special_window(special_window)
yield ans yield ans

View File

@ -19,7 +19,7 @@ def modify_key_bytes(keybytes: bytes, amt: int) -> bytes:
raise ValueError('Unknown key type in key: {!r}'.format(keybytes)) raise ValueError('Unknown key type in key: {!r}'.format(keybytes))
def encode_keystring(keybytes): def encode_keystring(keybytes: bytes) -> str:
return keybytes.decode('ascii').replace('\033', r'\E') return keybytes.decode('ascii').replace('\033', r'\E')
@ -420,7 +420,7 @@ if extra - no_termcap_for:
del extra del extra
def generate_terminfo(): def generate_terminfo() -> str:
# Use ./build-terminfo to update definition files # Use ./build-terminfo to update definition files
ans = ['|'.join(names)] ans = ['|'.join(names)]
ans.extend(sorted(bool_capabilities)) ans.extend(sorted(bool_capabilities))
@ -440,7 +440,7 @@ def key_as_bytes(name: str) -> bytes:
return ans.encode('ascii') return ans.encode('ascii')
def get_capabilities(query_string): def get_capabilities(query_string: str) -> str:
from .fast_data_types import ERROR_PREFIX from .fast_data_types import ERROR_PREFIX
ans = [] ans = []
try: try:

View File

@ -14,8 +14,8 @@ from contextlib import suppress
from functools import lru_cache from functools import lru_cache
from time import monotonic from time import monotonic
from typing import ( from typing import (
Any, Dict, Generator, Iterable, List, NamedTuple, Optional, Tuple, Union, TYPE_CHECKING, Any, Callable, Dict, Generator, Iterable, List,
cast NamedTuple, Optional, Tuple, Union, cast
) )
from .constants import ( from .constants import (
@ -24,10 +24,15 @@ from .constants import (
from .options_stub import Options from .options_stub import Options
from .rgb import Color, to_color from .rgb import Color, to_color
if TYPE_CHECKING:
from subprocess import Popen # noqa
from .fast_data_types import StartupCtx # noqa
from socket import socket as Socket, AddressFamily # noqa
BASE = os.path.dirname(os.path.abspath(__file__)) BASE = os.path.dirname(os.path.abspath(__file__))
def load_shaders(name): def load_shaders(name: str) -> Tuple[str, str]:
from .fast_data_types import GLSL_VERSION from .fast_data_types import GLSL_VERSION
with open(os.path.join(BASE, '{}_vertex.glsl'.format(name))) as f: with open(os.path.join(BASE, '{}_vertex.glsl'.format(name))) as f:
vert = f.read().replace('GLSL_VERSION', str(GLSL_VERSION), 1) vert = f.read().replace('GLSL_VERSION', str(GLSL_VERSION), 1)
@ -36,31 +41,31 @@ def load_shaders(name):
return vert, frag return vert, frag
def safe_print(*a, **k): def safe_print(*a: Any, **k: Any) -> None:
with suppress(Exception): with suppress(Exception):
print(*a, **k) print(*a, **k)
def log_error(*a, **k): def log_error(*a: Any, **k: str) -> None:
from .fast_data_types import log_error_string from .fast_data_types import log_error_string
with suppress(Exception): with suppress(Exception):
msg = k.get('sep', ' ').join(map(str, a)) + k.get('end', '') msg = k.get('sep', ' ').join(map(str, a)) + k.get('end', '')
log_error_string(msg.replace('\0', '')) log_error_string(msg.replace('\0', ''))
def ceil_int(x): def ceil_int(x: float) -> int:
return int(math.ceil(x)) return int(math.ceil(x))
def sanitize_title(x): def sanitize_title(x: str) -> str:
return re.sub(r'\s+', ' ', re.sub(r'[\0-\x19\x80-\x9f]', '', x)) return re.sub(r'\s+', ' ', re.sub(r'[\0-\x19\x80-\x9f]', '', x))
def color_as_int(val): def color_as_int(val: Tuple[int, int, int]) -> int:
return val[0] << 16 | val[1] << 8 | val[2] return val[0] << 16 | val[1] << 8 | val[2]
def color_from_int(val): def color_from_int(val: int) -> Color:
return Color((val >> 16) & 0xFF, (val >> 8) & 0xFF, val & 0xFF) return Color((val >> 16) & 0xFF, (val >> 8) & 0xFF, val & 0xFF)
@ -119,11 +124,11 @@ class ScreenSizeGetter:
@lru_cache(maxsize=64) @lru_cache(maxsize=64)
def screen_size_function(fd=None): def screen_size_function(fd: Optional[int] = None) -> ScreenSizeGetter:
return ScreenSizeGetter(fd) return ScreenSizeGetter(fd)
def fit_image(width, height, pwidth, pheight): def fit_image(width: int, height: int, pwidth: int, pheight: int) -> Tuple[int, int]:
from math import floor from math import floor
if height > pheight: if height > pheight:
corrf = pheight / float(height) corrf = pheight / float(height)
@ -138,27 +143,25 @@ def fit_image(width, height, pwidth, pheight):
return int(width), int(height) return int(width), int(height)
def set_primary_selection(text): def set_primary_selection(text: Union[str, bytes]) -> None:
if not supports_primary_selection: if not supports_primary_selection:
return # There is no primary selection return # There is no primary selection
if isinstance(text, bytes): from kitty.fast_data_types import set_primary_selection as s
text = text.decode('utf-8') s(text)
from kitty.fast_data_types import set_primary_selection
set_primary_selection(text)
def get_primary_selection(): def get_primary_selection() -> str:
if not supports_primary_selection: if not supports_primary_selection:
return '' # There is no primary selection return '' # There is no primary selection
from kitty.fast_data_types import get_primary_selection from kitty.fast_data_types import get_primary_selection as g
return (get_primary_selection() or b'').decode('utf-8', 'replace') return (g() or b'').decode('utf-8', 'replace')
def base64_encode( def base64_encode(
integer, integer: int,
chars=string.ascii_uppercase + string.ascii_lowercase + string.digits + chars: str = string.ascii_uppercase + string.ascii_lowercase + string.digits +
'+/' '+/'
): ) -> str:
ans = '' ans = ''
while True: while True:
integer, remainder = divmod(integer, 64) integer, remainder = divmod(integer, 64)
@ -168,7 +171,7 @@ def base64_encode(
return ans return ans
def command_for_open(program='default'): def command_for_open(program: Union[str, List[str]] = 'default') -> List[str]:
if isinstance(program, str): if isinstance(program, str):
from .conf.utils import to_cmdline from .conf.utils import to_cmdline
program = to_cmdline(program) program = to_cmdline(program)
@ -179,22 +182,22 @@ def command_for_open(program='default'):
return cmd return cmd
def open_cmd(cmd, arg=None, cwd=None): def open_cmd(cmd: Union[Iterable[str], List[str]], arg: Union[None, Iterable[str], str] = None, cwd: Optional[str] = None) -> 'Popen':
import subprocess import subprocess
if arg is not None: if arg is not None:
cmd = list(cmd) cmd = list(cmd)
if isinstance(arg, (list, tuple)): if isinstance(arg, str):
cmd.extend(arg)
else:
cmd.append(arg) cmd.append(arg)
return subprocess.Popen(cmd, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, cwd=cwd or None) else:
cmd.extend(arg)
return subprocess.Popen(tuple(cmd), stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, cwd=cwd or None)
def open_url(url, program='default', cwd=None): def open_url(url: str, program: Union[str, List[str]] = 'default', cwd: Optional[str] = None) -> 'Popen':
return open_cmd(command_for_open(program), url, cwd=cwd) return open_cmd(command_for_open(program), url, cwd=cwd)
def detach(fork=True, setsid=True, redirect=True): def detach(fork: bool = True, setsid: bool = True, redirect: bool = True) -> None:
if fork: if fork:
# Detach from the controlling process. # Detach from the controlling process.
if os.fork() != 0: if os.fork() != 0:
@ -206,36 +209,36 @@ def detach(fork=True, setsid=True, redirect=True):
redirect_std_streams(os.devnull) redirect_std_streams(os.devnull)
def adjust_line_height(cell_height, val): def adjust_line_height(cell_height: int, val: Union[int, float]) -> int:
if isinstance(val, int): if isinstance(val, int):
return cell_height + val return cell_height + val
return int(cell_height * val) return int(cell_height * val)
def init_startup_notification_x11(window_handle, startup_id=None): def init_startup_notification_x11(window_handle: int, startup_id: Optional[str] = None) -> Optional['StartupCtx']:
# https://specifications.freedesktop.org/startup-notification-spec/startup-notification-latest.txt # https://specifications.freedesktop.org/startup-notification-spec/startup-notification-latest.txt
from kitty.fast_data_types import init_x11_startup_notification from kitty.fast_data_types import init_x11_startup_notification
sid = startup_id or os.environ.pop('DESKTOP_STARTUP_ID', None) # ensure child processes don't get this env var sid = startup_id or os.environ.pop('DESKTOP_STARTUP_ID', None) # ensure child processes don't get this env var
if not sid: if not sid:
return return None
from .fast_data_types import x11_display from .fast_data_types import x11_display
display = x11_display() display = x11_display()
if not display: if not display:
return return None
return init_x11_startup_notification(display, window_handle, sid) return init_x11_startup_notification(display, window_handle, sid)
def end_startup_notification_x11(ctx): def end_startup_notification_x11(ctx: 'StartupCtx') -> None:
from kitty.fast_data_types import end_x11_startup_notification from kitty.fast_data_types import end_x11_startup_notification
end_x11_startup_notification(ctx) end_x11_startup_notification(ctx)
def init_startup_notification(window_handle, startup_id=None): def init_startup_notification(window_handle: Optional[int], startup_id: Optional[str] = None) -> Optional['StartupCtx']:
if is_macos or is_wayland(): if is_macos or is_wayland():
return return None
if window_handle is None: if window_handle is None:
log_error('Could not perform startup notification as window handle not present') log_error('Could not perform startup notification as window handle not present')
return return None
try: try:
return init_startup_notification_x11(window_handle, startup_id) return init_startup_notification_x11(window_handle, startup_id)
except Exception: except Exception:
@ -243,7 +246,7 @@ def init_startup_notification(window_handle, startup_id=None):
traceback.print_exc() traceback.print_exc()
def end_startup_notification(ctx): def end_startup_notification(ctx: Optional['StartupCtx']) -> None:
if not ctx: if not ctx:
return return
if is_macos or is_wayland(): if is_macos or is_wayland():
@ -257,15 +260,15 @@ def end_startup_notification(ctx):
class startup_notification_handler: class startup_notification_handler:
def __init__(self, do_notify=True, startup_id=None, extra_callback=None): def __init__(self, do_notify: bool = True, startup_id: Optional[str] = None, extra_callback: Optional[Callable] = None):
self.do_notify = do_notify self.do_notify = do_notify
self.startup_id = startup_id self.startup_id = startup_id
self.extra_callback = extra_callback self.extra_callback = extra_callback
self.ctx = None self.ctx: Optional['StartupCtx'] = None
def __enter__(self): def __enter__(self) -> Callable[[int], None]:
def pre_show_callback(window_handle): def pre_show_callback(window_handle: int) -> None:
if self.extra_callback is not None: if self.extra_callback is not None:
self.extra_callback(window_handle) self.extra_callback(window_handle)
if self.do_notify: if self.do_notify:
@ -273,12 +276,12 @@ class startup_notification_handler:
return pre_show_callback return pre_show_callback
def __exit__(self, *a): def __exit__(self, *a: Any) -> None:
if self.ctx is not None: if self.ctx is not None:
end_startup_notification(self.ctx) end_startup_notification(self.ctx)
def remove_socket_file(s, path=None): def remove_socket_file(s: 'Socket', path: Optional[str] = None) -> None:
with suppress(OSError): with suppress(OSError):
s.close() s.close()
if path: if path:
@ -286,7 +289,7 @@ def remove_socket_file(s, path=None):
os.unlink(path) os.unlink(path)
def unix_socket_paths(name, ext='.lock'): def unix_socket_paths(name: str, ext: str = '.lock') -> Generator[str, None, None]:
import tempfile import tempfile
home = os.path.expanduser('~') home = os.path.expanduser('~')
candidates = [tempfile.gettempdir(), home] candidates = [tempfile.gettempdir(), home]
@ -299,7 +302,7 @@ def unix_socket_paths(name, ext='.lock'):
yield os.path.join(loc, filename) yield os.path.join(loc, filename)
def single_instance_unix(name): def single_instance_unix(name: str) -> bool:
import socket import socket
for path in unix_socket_paths(name): for path in unix_socket_paths(name):
socket_path = path.rpartition('.')[0] + '.sock' socket_path = path.rpartition('.')[0] + '.sock'
@ -328,13 +331,14 @@ def single_instance_unix(name):
s.listen() s.listen()
s.set_inheritable(False) s.set_inheritable(False)
return True return True
return False
class SingleInstance: class SingleInstance:
socket: Optional[Any] = None socket: Optional['Socket'] = None
def __call__(self, group_id: Optional[str] = None): def __call__(self, group_id: Optional[str] = None) -> bool:
import socket import socket
name = '{}-ipc-{}'.format(appname, os.geteuid()) name = '{}-ipc-{}'.format(appname, os.geteuid())
if group_id: if group_id:
@ -350,11 +354,11 @@ class SingleInstance:
return single_instance_unix(name) return single_instance_unix(name)
if err.errno == errno.EADDRINUSE: if err.errno == errno.EADDRINUSE:
s.connect(addr) s.connect(addr)
single_instance.socket = s self.socket = s
return False return False
raise raise
s.listen() s.listen()
single_instance.socket = s # prevent garbage collection from closing the socket self.socket = s # prevent garbage collection from closing the socket
s.set_inheritable(False) s.set_inheritable(False)
atexit.register(remove_socket_file, s) atexit.register(remove_socket_file, s)
return True return True
@ -363,10 +367,11 @@ class SingleInstance:
single_instance = SingleInstance() single_instance = SingleInstance()
def parse_address_spec(spec): def parse_address_spec(spec: str) -> Tuple['AddressFamily', Union[Tuple[str, int], str], Optional[str]]:
import socket import socket
protocol, rest = spec.split(':', 1) protocol, rest = spec.split(':', 1)
socket_path = None socket_path = None
address: Union[str, Tuple[str, int]] = ''
if protocol == 'unix': if protocol == 'unix':
family = socket.AF_UNIX family = socket.AF_UNIX
address = rest address = rest
@ -395,12 +400,12 @@ def write_all(fd: int, data: Union[str, bytes]) -> None:
class TTYIO: class TTYIO:
def __enter__(self): def __enter__(self) -> 'TTYIO':
from .fast_data_types import open_tty from .fast_data_types import open_tty
self.tty_fd, self.original_termios = open_tty(True) self.tty_fd, self.original_termios = open_tty(True)
return self return self
def __exit__(self, *a): def __exit__(self, *a: Any) -> None:
from .fast_data_types import close_tty from .fast_data_types import close_tty
close_tty(self.tty_fd, self.original_termios) close_tty(self.tty_fd, self.original_termios)
@ -411,7 +416,7 @@ class TTYIO:
for chunk in data: for chunk in data:
write_all(self.tty_fd, chunk) write_all(self.tty_fd, chunk)
def recv(self, more_needed, timeout, sz=1): def recv(self, more_needed: Callable[[bytes], bool], timeout: float, sz: int = 1) -> None:
fd = self.tty_fd fd = self.tty_fd
start_time = monotonic() start_time = monotonic()
while timeout > monotonic() - start_time: while timeout > monotonic() - start_time:
@ -422,43 +427,38 @@ class TTYIO:
break break
def natsort_ints(iterable): def natsort_ints(iterable: Iterable[str]) -> List[str]:
def convert(text): def convert(text: str) -> Union[int, str]:
return int(text) if text.isdigit() else text return int(text) if text.isdigit() else text
def alphanum_key(key): def alphanum_key(key: str) -> Tuple[Union[int, str], ...]:
return tuple(map(convert, re.split(r'(\d+)', key))) return tuple(map(convert, re.split(r'(\d+)', key)))
return sorted(iterable, key=alphanum_key) return sorted(iterable, key=alphanum_key)
def exe_exists(exe):
for loc in os.environ.get('PATH', '').split(os.pathsep):
if loc and os.access(os.path.join(loc, exe), os.X_OK):
return os.path.join(loc, exe)
return False
@lru_cache(maxsize=2) @lru_cache(maxsize=2)
def get_editor() -> List[str]: def get_editor() -> List[str]:
import shlex import shlex
import shutil
for ans in (os.environ.get('VISUAL'), os.environ.get('EDITOR'), 'vim', for ans in (os.environ.get('VISUAL'), os.environ.get('EDITOR'), 'vim',
'nvim', 'vi', 'emacs', 'kak', 'micro', 'nano', 'vis'): 'nvim', 'vi', 'emacs', 'kak', 'micro', 'nano', 'vis'):
if ans and exe_exists(shlex.split(ans)[0]): if ans and shutil.which(shlex.split(ans)[0]):
break break
else: else:
ans = 'vim' ans = 'vim'
return shlex.split(ans) return shlex.split(ans)
def is_path_in_temp_dir(path): def is_path_in_temp_dir(path: str) -> bool:
if not path: if not path:
return False return False
def abspath(x): def abspath(x: Optional[str]) -> str:
if x: if x:
return os.path.abspath(os.path.realpath(x)) x = os.path.abspath(os.path.realpath(x))
return x or ''
import tempfile import tempfile
path = abspath(path) path = abspath(path)
@ -469,11 +469,11 @@ def is_path_in_temp_dir(path):
return False return False
def func_name(f): def func_name(f: Any) -> str:
if hasattr(f, '__name__'): if hasattr(f, '__name__'):
return f.__name__ return str(f.__name__)
if hasattr(f, 'func') and hasattr(f.func, '__name__'): if hasattr(f, 'func') and hasattr(f.func, '__name__'):
return f.func.__name__ return str(f.func.__name__)
return str(f) return str(f)

View File

@ -652,8 +652,8 @@ class Window:
set_clipboard_string(text) set_clipboard_string(text)
else: else:
mode = keyboard_mode_name(self.screen) mode = keyboard_mode_name(self.screen)
text = extended_key_event(defines.GLFW_KEY_C, defines.GLFW_MOD_CONTROL, defines.GLFW_PRESS) if mode == 'kitty' else b'\x03' data = extended_key_event(defines.GLFW_KEY_C, defines.GLFW_MOD_CONTROL, defines.GLFW_PRESS) if mode == 'kitty' else b'\x03'
self.write_to_child(text) self.write_to_child(data)
def copy_and_clear_or_interrupt(self) -> None: def copy_and_clear_or_interrupt(self) -> None:
self.copy_or_interrupt() self.copy_or_interrupt()

View File

@ -25,7 +25,7 @@ warn_unreachable = True
warn_no_return = False warn_no_return = False
warn_unused_configs = True warn_unused_configs = True
check_untyped_defs = True check_untyped_defs = True
# disallow_untyped_defs = True disallow_untyped_defs = True
[mypy-conf] [mypy-conf]
# ignored because on the CI server sphinx type stubs are available somehow, but # ignored because on the CI server sphinx type stubs are available somehow, but