more typing work

This commit is contained in:
Kovid Goyal 2020-03-12 14:52:11 +05:30
parent b27f6d5957
commit b6692849d6
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
6 changed files with 59 additions and 51 deletions

View File

@ -35,11 +35,11 @@ If specified close the tab this command is run in, rather than the active tab.
def response_from_kitty(self, boss: 'Boss', window: 'Window', payload_get: PayloadGetType) -> ResponseType: def response_from_kitty(self, boss: 'Boss', window: 'Window', payload_get: PayloadGetType) -> ResponseType:
match = payload_get('match') match = payload_get('match')
if match: if match:
tabs = tuple(boss.match_tabs(match)) tabs = list(boss.match_tabs(match))
if not tabs: if not tabs:
raise MatchError(match, 'tabs') raise MatchError(match, 'tabs')
else: else:
tabs = tuple(boss.tab_for_window(window) if window and payload_get('self') else boss.active_tab) tabs = [boss.tab_for_window(window) if window and payload_get('self') else boss.active_tab]
for tab in tabs: for tab in tabs:
if window: if window:
if tab: if tab:

View File

@ -34,11 +34,11 @@ If specified close the window this command is run in, rather than the active win
def response_from_kitty(self, boss: 'Boss', window: 'Window', payload_get: PayloadGetType) -> ResponseType: def response_from_kitty(self, boss: 'Boss', window: 'Window', payload_get: PayloadGetType) -> ResponseType:
match = payload_get('match') match = payload_get('match')
if match: if match:
windows = tuple(boss.match_windows(match)) windows = list(boss.match_windows(match))
if not windows: if not windows:
raise MatchError(match) raise MatchError(match)
else: else:
windows = tuple(window if window and payload_get('self') else boss.active_window) windows = [window if window and payload_get('self') else boss.active_window]
for window in windows: for window in windows:
if window: if window:
boss.close_window(window) boss.close_window(window)

View File

@ -9,7 +9,10 @@ import sys
import types import types
from contextlib import suppress from contextlib import suppress
from functools import partial from functools import partial
from typing import Any, Dict, List, Tuple, Union from typing import (
TYPE_CHECKING, Any, Dict, Generator, Iterable, List, Optional, Tuple,
Union, cast
)
from .cli import emph, parse_args from .cli import emph, parse_args
from .cli_stub import RCOptions from .cli_stub import RCOptions
@ -21,14 +24,18 @@ from .rc.base import (
) )
from .utils import TTYIO, parse_address_spec from .utils import TTYIO, parse_address_spec
if TYPE_CHECKING:
from .boss import Boss # noqa
from .window import Window # noqa
def handle_cmd(boss, window, cmd):
cmd = json.loads(cmd) def handle_cmd(boss: 'Boss', window: 'Window', serialized_cmd: str) -> Optional[Dict[str, Any]]:
cmd = json.loads(serialized_cmd)
v = cmd['version'] v = cmd['version']
no_response = cmd.get('no_response', False) no_response = cmd.get('no_response', False)
if tuple(v)[:2] > version[:2]: if tuple(v)[:2] > version[:2]:
if no_response: if no_response:
return return None
return {'ok': False, 'error': 'The kitty client you are using to send remote commands is newer than this kitty instance. This is not supported.'} return {'ok': False, 'error': 'The kitty client you are using to send remote commands is newer than this kitty instance. This is not supported.'}
c = command_for_name(cmd['cmd']) c = command_for_name(cmd['cmd'])
payload = cmd.get('payload') or {} payload = cmd.get('payload') or {}
@ -37,15 +44,16 @@ def handle_cmd(boss, window, cmd):
ans = c.response_from_kitty(boss, window, PayloadGetter(c, payload)) ans = c.response_from_kitty(boss, window, PayloadGetter(c, payload))
except Exception: except Exception:
if no_response: # don't report errors if --no-response was used if no_response: # don't report errors if --no-response was used
return return None
raise raise
if ans is no_response_sentinel: if ans is no_response_sentinel:
return return None
response: Dict[str, Any] = {'ok': True} response: Dict[str, Any] = {'ok': True}
if ans is not None: if ans is not None:
response['data'] = ans response['data'] = ans
if not c.no_response and not no_response: if not c.no_response and not no_response:
return response return response
return None
global_options_spec = partial('''\ global_options_spec = partial('''\
@ -57,29 +65,29 @@ will only work if this process is run within an existing kitty window.
'''.format, appname=appname) '''.format, appname=appname)
def encode_send(send): def encode_send(send: Any) -> bytes:
send = ('@kitty-cmd' + json.dumps(send)).encode('ascii') es = ('@kitty-cmd' + json.dumps(send)).encode('ascii')
return b'\x1bP' + send + b'\x1b\\' return b'\x1bP' + es + b'\x1b\\'
class SocketIO: class SocketIO:
def __init__(self, to): def __init__(self, to: str):
self.family, self.address = parse_address_spec(to)[:2] self.family, self.address = parse_address_spec(to)[:2]
def __enter__(self): def __enter__(self) -> None:
import socket import socket
self.socket = socket.socket(self.family) self.socket = socket.socket(self.family)
self.socket.setblocking(True) self.socket.setblocking(True)
self.socket.connect(self.address) self.socket.connect(self.address)
def __exit__(self, *a): def __exit__(self, *a: Any) -> None:
import socket import socket
with suppress(OSError): # on some OSes such as macOS the socket is already closed at this point with suppress(OSError): # on some OSes such as macOS the socket is already closed at this point
self.socket.shutdown(socket.SHUT_RDWR) self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close() self.socket.close()
def send(self, data): def send(self, data: Union[bytes, Iterable[Union[str, bytes]]]) -> None:
import socket import socket
with self.socket.makefile('wb') as out: with self.socket.makefile('wb') as out:
if isinstance(data, bytes): if isinstance(data, bytes):
@ -92,7 +100,7 @@ class SocketIO:
out.flush() out.flush()
self.socket.shutdown(socket.SHUT_WR) self.socket.shutdown(socket.SHUT_WR)
def recv(self, timeout): def simple_recv(self, timeout: float) -> bytes:
dcs = re.compile(br'\x1bP@kitty-cmd([^\x1b]+)\x1b\\') dcs = re.compile(br'\x1bP@kitty-cmd([^\x1b]+)\x1b\\')
self.socket.settimeout(timeout) self.socket.settimeout(timeout)
with self.socket.makefile('rb') as src: with self.socket.makefile('rb') as src:
@ -100,23 +108,24 @@ class SocketIO:
m = dcs.search(data) m = dcs.search(data)
if m is None: if m is None:
raise TimeoutError('Timed out while waiting to read cmd response') raise TimeoutError('Timed out while waiting to read cmd response')
return m.group(1) return bytes(m.group(1))
class RCIO(TTYIO): class RCIO(TTYIO):
def recv(self, timeout): def simple_recv(self, timeout: float) -> bytes:
ans: List[bytes] = [] ans: List[bytes] = []
read_command_response(self.tty_fd, timeout, ans) read_command_response(self.tty_fd, timeout, ans)
return b''.join(ans) return b''.join(ans)
def do_io(to, send, no_response): def do_io(to: Optional[str], send: Dict, no_response: bool) -> Dict[str, Any]:
payload = send.get('payload') payload = send.get('payload')
if not isinstance(payload, types.GeneratorType): if not isinstance(payload, types.GeneratorType):
send_data = encode_send(send) send_data: Union[bytes, Iterable[bytes]] = encode_send(send)
else: else:
def send_generator(): def send_generator() -> Generator[bytes, None, None]:
assert payload is not None
for chunk in payload: for chunk in payload:
send['payload'] = chunk send['payload'] = chunk
yield encode_send(send) yield encode_send(send)
@ -127,10 +136,9 @@ def do_io(to, send, no_response):
io.send(send_data) io.send(send_data)
if no_response: if no_response:
return {'ok': True} return {'ok': True}
received = io.recv(timeout=10) received = io.simple_recv(timeout=10)
response = json.loads(received.decode('ascii')) return cast(Dict[str, Any], json.loads(received.decode('ascii')))
return response
cli_msg = ( cli_msg = (
@ -150,13 +158,13 @@ def parse_rc_args(args: List[str]) -> Tuple[RCOptions, List[str]]:
return parse_args(args[1:], global_options_spec, 'command ...', msg, '{} @'.format(appname), result_class=RCOptions) return parse_args(args[1:], global_options_spec, 'command ...', msg, '{} @'.format(appname), result_class=RCOptions)
def main(args): def main(args: List[str]) -> None:
global_opts, items = parse_rc_args(args) global_opts, items = parse_rc_args(args)
global_opts.no_command_response = None global_opts.no_command_response = None
if not items: if not items:
from kitty.shell import main from kitty.shell import main as smain
main(global_opts) smain(global_opts)
return return
cmd = items[0] cmd = items[0]
try: try:

View File

@ -9,16 +9,17 @@ import sys
import traceback import traceback
from contextlib import suppress from contextlib import suppress
from functools import lru_cache from functools import lru_cache
from typing import Dict, Tuple from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple
from .cli import ( from .cli import (
OptionDict, emph, green, italic, parse_option_spec, print_help_for_seq, OptionDict, emph, green, italic, parse_option_spec, print_help_for_seq,
title title
) )
from .cli_stub import RCOptions
from .constants import cache_dir, is_macos, version from .constants import cache_dir, is_macos, version
from .rc.base import ( from .rc.base import (
all_command_names, command_for_name, display_subcommand_help, RemoteCommand, all_command_names, command_for_name,
parse_subcommand_cli display_subcommand_help, parse_subcommand_cli
) )
@ -28,7 +29,7 @@ def match_commands() -> Tuple[str, ...]:
return tuple(sorted(all_commands + ('exit', 'help', 'quit'))) return tuple(sorted(all_commands + ('exit', 'help', 'quit')))
def init_readline(readline): def init_readline(readline: Any) -> None:
try: try:
readline.read_init_file() readline.read_init_file()
except OSError: except OSError:
@ -40,7 +41,7 @@ def init_readline(readline):
readline.parse_and_bind('tab: complete') readline.parse_and_bind('tab: complete')
def cmd_names_matching(prefix): def cmd_names_matching(prefix: str) -> Generator[str, None, None]:
for cmd in match_commands(): for cmd in match_commands():
if not prefix or cmd.startswith(prefix): if not prefix or cmd.startswith(prefix):
yield cmd + ' ' yield cmd + ' '
@ -66,7 +67,7 @@ def options_for_cmd(cmd: str) -> Tuple[Tuple[str, ...], Dict[str, OptionDict]]:
return tuple(sorted(ans)), alias_map return tuple(sorted(ans)), alias_map
def options_matching(prefix, cmd, last_word, aliases, alias_map): def options_matching(prefix: str, cmd: str, last_word: str, aliases: Iterable[str], alias_map: Dict[str, OptionDict]) -> Generator[str, None, None]:
for alias in aliases: for alias in aliases:
if (not prefix or alias.startswith(prefix)) and alias.startswith('--'): if (not prefix or alias.startswith(prefix)) and alias.startswith('--'):
yield alias + ' ' yield alias + ' '
@ -74,14 +75,14 @@ def options_matching(prefix, cmd, last_word, aliases, alias_map):
class Completer: class Completer:
def __init__(self): def __init__(self) -> None:
self.matches = [] self.matches: List[str] = []
ddir = cache_dir() ddir = cache_dir()
with suppress(FileExistsError): with suppress(FileExistsError):
os.makedirs(ddir) os.makedirs(ddir)
self.history_path = os.path.join(ddir, 'shell.history') self.history_path = os.path.join(ddir, 'shell.history')
def complete(self, text, state): def complete(self, text: str, state: int) -> Optional[str]:
if state == 0: if state == 0:
line = readline.get_line_buffer() line = readline.get_line_buffer()
cmdline = shlex.split(line) cmdline = shlex.split(line)
@ -91,8 +92,9 @@ class Completer:
self.matches = list(options_matching(text, cmdline[0], cmdline[-1], *options_for_cmd(cmdline[0]))) self.matches = list(options_matching(text, cmdline[0], cmdline[-1], *options_for_cmd(cmdline[0])))
if state < len(self.matches): if state < len(self.matches):
return self.matches[state] return self.matches[state]
return None
def __enter__(self): def __enter__(self) -> 'Completer':
with suppress(Exception): with suppress(Exception):
readline.read_history_file(self.history_path) readline.read_history_file(self.history_path)
readline.set_completer(self.complete) readline.set_completer(self.complete)
@ -100,16 +102,16 @@ class Completer:
readline.set_completer_delims(delims.replace('-', '')) readline.set_completer_delims(delims.replace('-', ''))
return self return self
def __exit__(self, *a): def __exit__(self, *a: Any) -> None:
readline.write_history_file(self.history_path) readline.write_history_file(self.history_path)
def print_err(*a, **kw): def print_err(*a: Any, **kw: Any) -> None:
kw['file'] = sys.stderr kw['file'] = sys.stderr
print(*a, **kw) print(*a, **kw)
def print_help(which=None): def print_help(which: Optional[str] = None) -> None:
if which is None: if which is None:
print('Control kitty by sending it commands.') print('Control kitty by sending it commands.')
print() print()
@ -135,9 +137,9 @@ def print_help(which=None):
display_subcommand_help(func) display_subcommand_help(func)
def run_cmd(global_opts, cmd, func, opts, items): def run_cmd(global_opts: RCOptions, cmd: str, func: RemoteCommand, opts: Any, items: List[str]) -> None:
from .remote_control import do_io from .remote_control import do_io
payload = func(global_opts, opts, items) payload = func.message_to_kitty(global_opts, opts, items)
send = { send = {
'cmd': cmd, 'cmd': cmd,
'version': version, 'version': version,
@ -155,7 +157,7 @@ def run_cmd(global_opts, cmd, func, opts, items):
print(response['data']) print(response['data'])
def real_main(global_opts): def real_main(global_opts: RCOptions) -> None:
init_readline(readline) init_readline(readline)
print_help_for_seq.allow_pager = False print_help_for_seq.allow_pager = False
print('Welcome to the kitty shell!') print('Welcome to the kitty shell!')
@ -214,7 +216,7 @@ def real_main(global_opts):
continue continue
def main(global_opts): def main(global_opts: RCOptions) -> None:
try: try:
with Completer(): with Completer():
real_main(global_opts) real_main(global_opts)

View File

@ -14,7 +14,8 @@ from contextlib import suppress
from functools import lru_cache from functools import lru_cache
from time import monotonic from time import monotonic
from typing import ( from typing import (
Any, Dict, Generator, List, NamedTuple, Optional, Tuple, Union, cast Any, Dict, Generator, Iterable, List, NamedTuple, Optional, Tuple, Union,
cast
) )
from .constants import ( from .constants import (
@ -403,7 +404,7 @@ class TTYIO:
from .fast_data_types import close_tty from .fast_data_types import close_tty
close_tty(self.tty_fd, self.original_termios) close_tty(self.tty_fd, self.original_termios)
def send(self, data): def send(self, data: Union[str, bytes, Iterable[Union[str, bytes]]]) -> None:
if isinstance(data, (str, bytes)): if isinstance(data, (str, bytes)):
write_all(self.tty_fd, data) write_all(self.tty_fd, data)
else: else:

View File

@ -25,9 +25,6 @@ warn_unreachable = True
warn_no_return = False warn_no_return = False
warn_unused_configs = True warn_unused_configs = True
check_untyped_defs = True check_untyped_defs = True
# disallow_untyped_defs = True
[mypy-kitty.rc.*,kitty.conf.*,kitty.fonts.*,kittens.*,kitty.launch,kitty.child,kitty.cli,kitty.config,kitty.choose_entry,kitty.main]
disallow_untyped_defs = True disallow_untyped_defs = True
[mypy-conf] [mypy-conf]