The utils and constants modules are now fully typechecked

This commit is contained in:
Kovid Goyal 2020-03-04 10:26:41 +05:30
parent 8ad62106e0
commit f0b29e15c3
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
11 changed files with 182 additions and 125 deletions

View File

@ -5,6 +5,7 @@
import sys import sys
from contextlib import contextmanager from contextlib import contextmanager
from functools import wraps from functools import wraps
from typing import List
from kitty.rgb import Color, color_as_sharp, to_color from kitty.rgb import Color, color_as_sharp, to_color
@ -168,7 +169,7 @@ def styled(text, fg=None, bg=None, fg_intense=False, bg_intense=False, italic=No
def serialize_gr_command(cmd, payload=None): def serialize_gr_command(cmd, payload=None):
cmd = ','.join('{}={}'.format(k, v) for k, v in cmd.items()) cmd = ','.join('{}={}'.format(k, v) for k, v in cmd.items())
ans = [] ans: List[bytes] = []
w = ans.append w = ans.append
w(b'\033_G'), w(cmd.encode('ascii')) w(b'\033_G'), w(cmd.encode('ascii'))
if payload: if payload:

View File

@ -17,8 +17,7 @@ from .conf.utils import to_cmdline
from .config import initial_window_size_func, prepare_config_file_for_editing from .config import initial_window_size_func, prepare_config_file_for_editing
from .config_data import MINIMUM_FONT_SIZE from .config_data import MINIMUM_FONT_SIZE
from .constants import ( from .constants import (
appname, config_dir, is_macos, kitty_exe, set_boss, appname, config_dir, is_macos, kitty_exe, supports_primary_selection
supports_primary_selection
) )
from .fast_data_types import ( from .fast_data_types import (
ChildMonitor, background_opacity_of, change_background_opacity, ChildMonitor, background_opacity_of, change_background_opacity,
@ -26,7 +25,7 @@ from .fast_data_types import (
current_os_window, destroy_global_data, focus_os_window, current_os_window, destroy_global_data, focus_os_window,
get_clipboard_string, global_font_size, mark_os_window_for_close, get_clipboard_string, global_font_size, mark_os_window_for_close,
os_window_font_size, patch_global_colors, safe_pipe, set_background_image, os_window_font_size, patch_global_colors, safe_pipe, set_background_image,
set_clipboard_string, set_in_sequence_mode, thread_write, set_boss, set_clipboard_string, set_in_sequence_mode, thread_write,
toggle_fullscreen, toggle_maximized toggle_fullscreen, toggle_maximized
) )
from .keys import get_shortcut, shortcut_matches from .keys import get_shortcut, shortcut_matches

View File

@ -6,7 +6,7 @@ import json
import os import os
import sys import sys
from contextlib import suppress from contextlib import suppress
from typing import Optional, BinaryIO from typing import Any, BinaryIO, Callable, Dict, List, Optional
from .cli import ( from .cli import (
Namespace, get_defaults_from_seq, parse_args, parse_option_spec Namespace, get_defaults_from_seq, parse_args, parse_option_spec
@ -21,7 +21,6 @@ from .launch import (
from .tabs import SpecialWindow from .tabs import SpecialWindow
from .utils import natsort_ints from .utils import natsort_ints
no_response = object() no_response = object()
@ -43,7 +42,8 @@ class UnknownLayout(ValueError):
hide_traceback = True hide_traceback = True
cmap = {} CommandFunction = Callable[[Namespace, Namespace, List[str]], Optional[Dict[str, Any]]]
cmap: Dict[str, CommandFunction] = {}
def cmd( def cmd(

View File

@ -8,6 +8,8 @@ import sys
import errno import errno
from collections import namedtuple from collections import namedtuple
from contextlib import suppress from contextlib import suppress
from functools import lru_cache
from typing import Set
appname = 'kitty' appname = 'kitty'
version = (0, 16, 0) version = (0, 16, 0)
@ -21,13 +23,12 @@ ScreenGeometry = namedtuple('ScreenGeometry', 'xstart ystart xnum ynum dx dy')
WindowGeometry = namedtuple('WindowGeometry', 'left top right bottom xnum ynum') WindowGeometry = namedtuple('WindowGeometry', 'left top right bottom xnum ynum')
@lru_cache(maxsize=2)
def kitty_exe(): def kitty_exe():
ans = getattr(kitty_exe, 'ans', None)
if ans is None:
rpath = sys._xoptions.get('bundle_exe_dir') rpath = sys._xoptions.get('bundle_exe_dir')
if not rpath: if not rpath:
items = filter(None, os.environ.get('PATH', '').split(os.pathsep)) items = filter(None, os.environ.get('PATH', '').split(os.pathsep))
seen = set() seen: Set[str] = set()
for candidate in items: for candidate in items:
if candidate not in seen: if candidate not in seen:
seen.add(candidate) seen.add(candidate)
@ -36,8 +37,7 @@ def kitty_exe():
break break
else: else:
rpath = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'launcher') rpath = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'launcher')
ans = kitty_exe.ans = os.path.join(rpath, 'kitty') return os.path.join(rpath, 'kitty')
return ans
def _get_config_dir(): def _get_config_dir():
@ -89,7 +89,8 @@ del _get_config_dir
defconf = os.path.join(config_dir, 'kitty.conf') defconf = os.path.join(config_dir, 'kitty.conf')
def _get_cache_dir(): @lru_cache(maxsize=2)
def cache_dir():
if 'KITTY_CACHE_DIRECTORY' in os.environ: if 'KITTY_CACHE_DIRECTORY' in os.environ:
candidate = os.path.abspath(os.environ['KITTY_CACHE_DIRECTORY']) candidate = os.path.abspath(os.environ['KITTY_CACHE_DIRECTORY'])
elif is_macos: elif is_macos:
@ -101,25 +102,11 @@ def _get_cache_dir():
return candidate return candidate
def cache_dir():
ans = getattr(cache_dir, 'ans', None)
if ans is None:
ans = cache_dir.ans = _get_cache_dir()
return ans
def get_boss():
return get_boss.boss
def set_boss(m):
from .fast_data_types import set_boss as set_c_boss
get_boss.boss = m
set_c_boss(m)
def wakeup(): def wakeup():
get_boss.boss.child_monitor.wakeup() from .fast_data_types import get_boss
b = get_boss()
if b is not None:
b.child_monitor.wakeup()
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@ -173,7 +160,7 @@ def is_wayland(opts=None):
if is_macos: if is_macos:
return False return False
if opts is None: if opts is None:
return is_wayland.ans return getattr(is_wayland, 'ans')
if opts.linux_display_server == 'auto': if opts.linux_display_server == 'auto':
ans = detect_if_wayland_ok() ans = detect_if_wayland_ok()
else: else:

View File

@ -1,10 +1,11 @@
from typing import ( from typing import Any, Callable, Dict, List, NewType, Optional, Tuple, Union
Any, Callable, List, Dict, NewType, Optional, Tuple, Union
)
from kitty.boss import Boss
from kitty.cli import Namespace from kitty.cli import Namespace
# Constants {{{ # Constants {{{
ERROR_PREFIX: str
GLSL_VERSION: int
GLFW_IBEAM_CURSOR: int GLFW_IBEAM_CURSOR: int
GLFW_KEY_UNKNOWN: int GLFW_KEY_UNKNOWN: int
GLFW_KEY_SPACE: int GLFW_KEY_SPACE: int
@ -322,6 +323,42 @@ BORDERS_PROGRAM: int
# }}} # }}}
def log_error_string(s: str) -> None:
pass
def set_primary_selection(x: bytes) -> None:
pass
def get_primary_selection() -> Optional[bytes]:
pass
def redirect_std_streams(devnull: str) -> None:
pass
StartupCtx = NewType('StartupCtx', int)
Display = NewType('Display', int)
def init_x11_startup_notification(display: Display, window_id: int, startup_id: Optional[str] = None) -> StartupCtx:
pass
def end_x11_startup_notification(ctx: StartupCtx) -> None:
pass
def x11_display() -> Optional[Display]:
pass
def user_cache_dir() -> str:
pass
def process_group_map() -> Tuple[Tuple[int, int], ...]: def process_group_map() -> Tuple[Tuple[int, int], ...]:
pass pass
@ -490,6 +527,14 @@ def set_background_image(
pass pass
def set_boss(boss: Boss) -> None:
pass
def get_boss() -> Optional[Boss]:
pass
def safe_pipe(nonblock: bool = True) -> Tuple[int, int]: def safe_pipe(nonblock: bool = True) -> Tuple[int, int]:
pass pass
@ -732,3 +777,6 @@ class ChildMonitor:
talk_fd: int = -1, listen_fd: int = -1 talk_fd: int = -1, listen_fd: int = -1
): ):
pass pass
def wakeup(self) -> None:
pass

View File

@ -23,8 +23,7 @@ if is_macos:
else: else:
from .fast_data_types import dbus_send_notification from .fast_data_types import dbus_send_notification, get_boss
from .constants import get_boss
alloc_map: Dict[int, str] = {} alloc_map: Dict[int, str] = {}
identifier_map: Dict[str, int] = {} identifier_map: Dict[str, int] = {}

View File

@ -905,6 +905,14 @@ PYWRAP1(set_boss) {
Py_RETURN_NONE; Py_RETURN_NONE;
} }
PYWRAP0(get_boss) {
if (global_state.boss) {
Py_INCREF(global_state.boss);
return global_state.boss;
}
Py_RETURN_NONE;
}
PYWRAP1(patch_global_colors) { PYWRAP1(patch_global_colors) {
PyObject *spec; PyObject *spec;
int configured; int configured;
@ -1051,6 +1059,7 @@ static PyMethodDef module_methods[] = {
MW(set_background_image, METH_VARARGS), MW(set_background_image, METH_VARARGS),
MW(os_window_font_size, METH_VARARGS), MW(os_window_font_size, METH_VARARGS),
MW(set_boss, METH_O), MW(set_boss, METH_O),
MW(get_boss, METH_NOARGS),
MW(patch_global_colors, METH_VARARGS), MW(patch_global_colors, METH_VARARGS),
MW(create_mock_window, METH_VARARGS), MW(create_mock_window, METH_VARARGS),
MW(destroy_global_data, METH_NOARGS), MW(destroy_global_data, METH_NOARGS),

View File

@ -9,11 +9,11 @@ from functools import partial
from .borders import Borders from .borders import Borders
from .child import Child from .child import Child
from .constants import appname, get_boss, is_macos, is_wayland from .constants import appname, is_macos, is_wayland
from .fast_data_types import ( from .fast_data_types import (
add_tab, attach_window, detach_window, mark_tab_bar_dirty, next_window_id, add_tab, attach_window, detach_window, get_boss, mark_tab_bar_dirty,
pt_to_px, remove_tab, remove_window, ring_bell, set_active_tab, swap_tabs, next_window_id, pt_to_px, remove_tab, remove_window, ring_bell,
x11_window_id set_active_tab, swap_tabs, x11_window_id
) )
from .layout import create_layout_object_for, evict_cached_layouts from .layout import create_layout_object_for, evict_cached_layouts
from .tab_bar import TabBar, TabBarData from .tab_bar import TabBar, TabBarData

View File

@ -6,12 +6,12 @@ import os
import subprocess import subprocess
import time import time
from collections import namedtuple from collections import namedtuple
from urllib.request import urlopen
from contextlib import suppress from contextlib import suppress
from urllib.request import urlopen
from .config import atomic_save from .config import atomic_save
from .constants import cache_dir, get_boss, kitty_exe, version from .constants import cache_dir, kitty_exe, version
from .fast_data_types import add_timer, monitor_pid from .fast_data_types import add_timer, get_boss, monitor_pid
from .notify import notify from .notify import notify
from .utils import log_error, open_url from .utils import log_error, open_url

View File

@ -10,13 +10,18 @@ import os
import re import re
import string import string
import sys import sys
from collections import namedtuple
from contextlib import suppress from contextlib import suppress
from functools import lru_cache
from time import monotonic from time import monotonic
from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast
from .constants import ( from .constants import (
appname, is_macos, is_wayland, shell_path, supports_primary_selection appname, is_macos, is_wayland, shell_path, supports_primary_selection
) )
from .rgb import Color, to_color from .rgb import Color, to_color
if TYPE_CHECKING:
from .cli import Namespace # noqa
BASE = os.path.dirname(os.path.abspath(__file__)) BASE = os.path.dirname(os.path.abspath(__file__))
@ -77,31 +82,36 @@ def parse_color_set(raw):
continue continue
def screen_size_function(fd=None): ScreenSize = namedtuple('ScreenSize', 'rows cols width height cell_width cell_height')
ans = getattr(screen_size_function, 'ans', None)
if ans is None:
from collections import namedtuple class ScreenSizeGetter:
changed = True
Size = ScreenSize
ans: Optional[ScreenSize] = None
def __init__(self, fd: Optional[int]):
if fd is None:
fd = sys.stdout.fileno()
self.fd = fd
def __call__(self) -> ScreenSize:
if self.changed:
import array import array
import fcntl import fcntl
import termios import termios
Size = namedtuple('Size', 'rows cols width height cell_width cell_height')
if fd is None:
fd = sys.stdout
def screen_size():
if screen_size.changed:
buf = array.array('H', [0, 0, 0, 0]) buf = array.array('H', [0, 0, 0, 0])
fcntl.ioctl(fd, termios.TIOCGWINSZ, buf) fcntl.ioctl(self.fd, termios.TIOCGWINSZ, cast(bytearray, buf))
rows, cols, width, height = tuple(buf) rows, cols, width, height = tuple(buf)
cell_width, cell_height = width // (cols or 1), height // (rows or 1) cell_width, cell_height = width // (cols or 1), height // (rows or 1)
screen_size.ans = Size(rows, cols, width, height, cell_width, cell_height) self.ans = ScreenSize(rows, cols, width, height, cell_width, cell_height)
screen_size.changed = False self.changed = False
return screen_size.ans return cast(ScreenSize, self.ans)
screen_size.changed = True
screen_size.Size = Size
ans = screen_size_function.ans = screen_size
return ans
@lru_cache(maxsize=64)
def screen_size_function(fd=None):
return ScreenSizeGetter(fd)
def fit_image(width, height, pwidth, pheight): def fit_image(width, height, pwidth, pheight):
@ -311,7 +321,11 @@ def single_instance_unix(name):
return True return True
def single_instance(group_id=None): class SingleInstance:
socket: Optional[Any] = None
def __call__(self, group_id: Optional[str] = None):
import socket import socket
name = '{}-ipc-{}'.format(appname, os.geteuid()) name = '{}-ipc-{}'.format(appname, os.geteuid())
if group_id: if group_id:
@ -337,6 +351,9 @@ def single_instance(group_id=None):
return True return True
single_instance = SingleInstance()
def parse_address_spec(spec): def parse_address_spec(spec):
import socket import socket
protocol, rest = spec.split(':', 1) protocol, rest = spec.split(':', 1)
@ -414,9 +431,8 @@ def exe_exists(exe):
return False return False
def get_editor(): @lru_cache(maxsize=2)
ans = getattr(get_editor, 'ans', False) def get_editor() -> List[str]:
if ans is False:
import shlex import shlex
for ans in (os.environ.get('VISUAL'), os.environ.get('EDITOR'), 'vim', for ans in (os.environ.get('VISUAL'), os.environ.get('EDITOR'), 'vim',
'nvim', 'vi', 'emacs', 'kak', 'micro', 'nano', 'vis'): 'nvim', 'vi', 'emacs', 'kak', 'micro', 'nano', 'vis'):
@ -424,9 +440,7 @@ def get_editor():
break break
else: else:
ans = 'vim' ans = 'vim'
ans = shlex.split(ans) return shlex.split(ans)
get_editor.ans = ans
return ans
def is_path_in_temp_dir(path): def is_path_in_temp_dir(path):
@ -454,7 +468,7 @@ def func_name(f):
return str(f) return str(f)
def resolved_shell(opts=None): def resolved_shell(opts: Optional['Namespace'] = None) -> List[str]:
ans = getattr(opts, 'shell', '.') ans = getattr(opts, 'shell', '.')
if ans == '.': if ans == '.':
ans = [shell_path] ans = [shell_path]
@ -464,10 +478,12 @@ def resolved_shell(opts=None):
return ans return ans
def read_shell_environment(opts=None): def read_shell_environment(opts: Optional['Namespace'] = None) -> Dict[str, str]:
if not hasattr(read_shell_environment, 'ans'): ans = getattr(read_shell_environment, 'ans', None)
if ans is None:
from .child import openpty, remove_blocking from .child import openpty, remove_blocking
ans = read_shell_environment.ans = {} ans = {}
setattr(read_shell_environment, 'ans', ans)
import subprocess import subprocess
shell = resolved_shell(opts) shell = resolved_shell(opts)
master, slave = openpty() master, slave = openpty()
@ -484,7 +500,7 @@ def read_shell_environment(opts=None):
start_time = monotonic() start_time = monotonic()
while monotonic() - start_time < 1.5: while monotonic() - start_time < 1.5:
try: try:
ret = p.wait(0.01) ret: Optional[int] = p.wait(0.01)
except TimeoutExpired: except TimeoutExpired:
ret = None ret = None
with suppress(Exception): with suppress(Exception):
@ -503,11 +519,11 @@ def read_shell_environment(opts=None):
if not x: if not x:
break break
raw += x raw += x
raw = raw.decode('utf-8', 'replace') draw = raw.decode('utf-8', 'replace')
for line in raw.splitlines(): for line in draw.splitlines():
k, v = line.partition('=')[::2] k, v = line.partition('=')[::2]
if k and v: if k and v:
ans[k] = v ans[k] = v
else: else:
log_error('Failed to run shell to read its environment') log_error('Failed to run shell to read its environment')
return read_shell_environment.ans return ans

View File

@ -11,16 +11,14 @@ from enum import IntEnum
from itertools import chain from itertools import chain
from .config import build_ansi_color_table from .config import build_ansi_color_table
from .constants import ( from .constants import ScreenGeometry, WindowGeometry, appname, wakeup
ScreenGeometry, WindowGeometry, appname, get_boss, wakeup
)
from .fast_data_types import ( from .fast_data_types import (
BGIMAGE_PROGRAM, BLIT_PROGRAM, CELL_BG_PROGRAM, CELL_FG_PROGRAM, BGIMAGE_PROGRAM, BLIT_PROGRAM, CELL_BG_PROGRAM, CELL_FG_PROGRAM,
CELL_PROGRAM, CELL_SPECIAL_PROGRAM, CSI, DCS, DECORATION, DIM, CELL_PROGRAM, CELL_SPECIAL_PROGRAM, CSI, DCS, DECORATION, DIM,
GRAPHICS_ALPHA_MASK_PROGRAM, GRAPHICS_PREMULT_PROGRAM, GRAPHICS_PROGRAM, GRAPHICS_ALPHA_MASK_PROGRAM, GRAPHICS_PREMULT_PROGRAM, GRAPHICS_PROGRAM,
MARK, MARK_MASK, OSC, REVERSE, SCROLL_FULL, SCROLL_LINE, SCROLL_PAGE, MARK, MARK_MASK, OSC, REVERSE, SCROLL_FULL, SCROLL_LINE, SCROLL_PAGE,
STRIKETHROUGH, TINT_PROGRAM, Screen, add_window, cell_size_for_window, STRIKETHROUGH, TINT_PROGRAM, Screen, add_window, cell_size_for_window,
compile_program, get_clipboard_string, init_cell_program, compile_program, get_boss, get_clipboard_string, init_cell_program,
set_clipboard_string, set_titlebar_color, set_window_render_data, set_clipboard_string, set_titlebar_color, set_window_render_data,
update_window_title, update_window_visibility, viewport_for_window update_window_title, update_window_visibility, viewport_for_window
) )