more typing work

This commit is contained in:
Kovid Goyal 2020-03-11 20:08:16 +05:30
parent 0e871a89aa
commit 2ebdf738ca
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
10 changed files with 320 additions and 244 deletions

View File

@ -1,6 +1,6 @@
class GlobalData: class GlobalData:
def __init__(self): def __init__(self) -> None:
self.title = '' self.title = ''
self.cmd = '' self.cmd = ''

View File

@ -8,7 +8,11 @@ from contextlib import suppress
from functools import lru_cache from functools import lru_cache
from hashlib import md5 from hashlib import md5
from mimetypes import guess_type from mimetypes import guess_type
from typing import Dict, List, Set from typing import TYPE_CHECKING, Dict, List, Set, Optional, Iterator, Tuple, Union
if TYPE_CHECKING:
from .highlight import DiffHighlight # noqa
path_name_map: Dict[str, str] = {} path_name_map: Dict[str, str] = {}
@ -17,77 +21,77 @@ class Segment:
__slots__ = ('start', 'end', 'start_code', 'end_code') __slots__ = ('start', 'end', 'start_code', 'end_code')
def __init__(self, start, start_code): def __init__(self, start: int, start_code: str):
self.start = start self.start = start
self.start_code = start_code self.start_code = start_code
self.end = None self.end: Optional[int] = None
self.end_code = None self.end_code: Optional[str] = None
def __repr__(self): def __repr__(self) -> str:
return 'Segment(start={!r}, start_code={!r}, end={!r}, end_code={!r})'.format( return 'Segment(start={!r}, start_code={!r}, end={!r}, end_code={!r})'.format(
self.start, self.start_code, self.end, self.end_code) self.start, self.start_code, self.end, self.end_code)
class Collection: class Collection:
def __init__(self): def __init__(self) -> None:
self.changes = {} self.changes: Dict[str, str] = {}
self.renames = {} self.renames: Dict[str, str] = {}
self.adds = set() self.adds: Set[str] = set()
self.removes = set() self.removes: Set[str] = set()
self.all_paths = [] self.all_paths: List[str] = []
self.type_map = {} self.type_map: Dict[str, str] = {}
self.added_count = self.removed_count = 0 self.added_count = self.removed_count = 0
def add_change(self, left_path, right_path): def add_change(self, left_path: str, right_path: str) -> None:
self.changes[left_path] = right_path self.changes[left_path] = right_path
self.all_paths.append(left_path) self.all_paths.append(left_path)
self.type_map[left_path] = 'diff' self.type_map[left_path] = 'diff'
def add_rename(self, left_path, right_path): def add_rename(self, left_path: str, right_path: str) -> None:
self.renames[left_path] = right_path self.renames[left_path] = right_path
self.all_paths.append(left_path) self.all_paths.append(left_path)
self.type_map[left_path] = 'rename' self.type_map[left_path] = 'rename'
def add_add(self, right_path): def add_add(self, right_path: str) -> None:
self.adds.add(right_path) self.adds.add(right_path)
self.all_paths.append(right_path) self.all_paths.append(right_path)
self.type_map[right_path] = 'add' self.type_map[right_path] = 'add'
if isinstance(data_for_path(right_path), str): if isinstance(data_for_path(right_path), str):
self.added_count += len(lines_for_path(right_path)) self.added_count += len(lines_for_path(right_path))
def add_removal(self, left_path): def add_removal(self, left_path: str) -> None:
self.removes.add(left_path) self.removes.add(left_path)
self.all_paths.append(left_path) self.all_paths.append(left_path)
self.type_map[left_path] = 'removal' self.type_map[left_path] = 'removal'
if isinstance(data_for_path(left_path), str): if isinstance(data_for_path(left_path), str):
self.removed_count += len(lines_for_path(left_path)) self.removed_count += len(lines_for_path(left_path))
def finalize(self): def finalize(self) -> None:
self.all_paths.sort(key=path_name_map.get) self.all_paths.sort(key=path_name_map.get)
def __iter__(self): def __iter__(self) -> Iterator[Tuple[str, str, Optional[str]]]:
for path in self.all_paths: for path in self.all_paths:
typ = self.type_map[path] typ = self.type_map[path]
if typ == 'diff': if typ == 'diff':
data = self.changes[path] data: Optional[str] = self.changes[path]
elif typ == 'rename': elif typ == 'rename':
data = self.renames[path] data = self.renames[path]
else: else:
data = None data = None
yield path, typ, data yield path, typ, data
def __len__(self): def __len__(self) -> int:
return len(self.all_paths) return len(self.all_paths)
def collect_files(collection, left, right): def collect_files(collection: Collection, left: str, right: str) -> None:
left_names: Set[str] = set() left_names: Set[str] = set()
right_names: Set[str] = set() right_names: Set[str] = set()
left_path_map: Dict[str, str] = {} left_path_map: Dict[str, str] = {}
right_path_map: Dict[str, str] = {} right_path_map: Dict[str, str] = {}
def walk(base, names, pmap): def walk(base: str, names: Set[str], pmap: Dict[str, str]) -> None:
for dirpath, dirnames, filenames in os.walk(base): for dirpath, dirnames, filenames in os.walk(base):
for filename in filenames: for filename in filenames:
path = os.path.abspath(os.path.join(dirpath, filename)) path = os.path.abspath(os.path.join(dirpath, filename))
@ -95,7 +99,8 @@ def collect_files(collection, left, right):
names.add(name) names.add(name)
pmap[name] = path pmap[name] = path
walk(left, left_names, left_path_map), walk(right, right_names, right_path_map) walk(left, left_names, left_path_map)
walk(right, right_names, right_path_map)
common_names = left_names & right_names common_names = left_names & right_names
changed_names = {n for n in common_names if data_for_path(left_path_map[n]) != data_for_path(right_path_map[n])} changed_names = {n for n in common_names if data_for_path(left_path_map[n]) != data_for_path(right_path_map[n])}
for n in changed_names: for n in changed_names:
@ -121,33 +126,33 @@ def collect_files(collection, left, right):
sanitize_pat = re.compile('[\x00-\x09\x0b-\x1f\x7f\x80-\x9f]') sanitize_pat = re.compile('[\x00-\x09\x0b-\x1f\x7f\x80-\x9f]')
def sanitize(text): def sanitize(text: str) -> str:
ntext = text.replace('\r\n', '\n') ntext = text.replace('\r\n', '\n')
return sanitize_pat.sub('', ntext) return sanitize_pat.sub('', ntext)
@lru_cache(maxsize=1024) @lru_cache(maxsize=1024)
def mime_type_for_path(path): def mime_type_for_path(path: str) -> str:
return guess_type(path)[0] or 'application/octet-stream' return guess_type(path)[0] or 'application/octet-stream'
@lru_cache(maxsize=1024) @lru_cache(maxsize=1024)
def raw_data_for_path(path): def raw_data_for_path(path: str) -> bytes:
with open(path, 'rb') as f: with open(path, 'rb') as f:
return f.read() return f.read()
def is_image(path): def is_image(path: Optional[str]) -> bool:
return mime_type_for_path(path).startswith('image/') if path else False return mime_type_for_path(path).startswith('image/') if path else False
@lru_cache(maxsize=1024) @lru_cache(maxsize=1024)
def data_for_path(path): def data_for_path(path: str) -> Union[str, bytes]:
ans = raw_data_for_path(path) raw_bytes = raw_data_for_path(path)
if not is_image(path) and not os.path.samefile(path, os.devnull): if not is_image(path) and not os.path.samefile(path, os.devnull):
with suppress(UnicodeDecodeError): with suppress(UnicodeDecodeError):
ans = ans.decode('utf-8') return raw_bytes.decode('utf-8')
return ans return raw_bytes
class LinesForPath: class LinesForPath:
@ -155,8 +160,10 @@ class LinesForPath:
replace_tab_by = ' ' * 4 replace_tab_by = ' ' * 4
@lru_cache(maxsize=1024) @lru_cache(maxsize=1024)
def __call__(self, path): def __call__(self, path: str) -> Tuple[str, ...]:
data = data_for_path(path).replace('\t', self.replace_tab_by) data = data_for_path(path)
assert isinstance(data, str)
data = data.replace('\t', self.replace_tab_by)
return tuple(sanitize(data).splitlines()) return tuple(sanitize(data).splitlines())
@ -164,11 +171,11 @@ lines_for_path = LinesForPath()
@lru_cache(maxsize=1024) @lru_cache(maxsize=1024)
def hash_for_path(path): def hash_for_path(path: str) -> bytes:
return md5(raw_data_for_path(path)).digest() return md5(raw_data_for_path(path)).digest()
def create_collection(left, right): def create_collection(left: str, right: str) -> Collection:
collection = Collection() collection = Collection()
if os.path.isdir(left): if os.path.isdir(left):
collect_files(collection, left, right) collect_files(collection, left, right)
@ -181,13 +188,13 @@ def create_collection(left, right):
return collection return collection
highlight_data: Dict[str, List] = {} highlight_data: Dict[str, 'DiffHighlight'] = {}
def set_highlight_data(data): def set_highlight_data(data: Dict[str, 'DiffHighlight']) -> None:
global highlight_data global highlight_data
highlight_data = data highlight_data = data
def highlights_for_path(path): def highlights_for_path(path: str) -> 'DiffHighlight':
return highlight_data.get(path, []) return highlight_data.get(path, [])

View File

@ -3,7 +3,7 @@
# License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net> # License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
import os import os
from typing import Any, Dict, Optional, Type from typing import Any, Dict, Iterable, Optional, Tuple, Type, Union
from kitty.conf.definition import config_lines from kitty.conf.definition import config_lines
from kitty.conf.utils import ( from kitty.conf.utils import (
@ -12,6 +12,7 @@ from kitty.conf.utils import (
) )
from kitty.constants import config_dir from kitty.constants import config_dir
from kitty.options_stub import DiffOptions from kitty.options_stub import DiffOptions
from kitty.cli_stub import DiffCLIOptions
from kitty.rgb import color_as_sgr from kitty.rgb import color_as_sgr
from .config_data import all_options, type_convert from .config_data import all_options, type_convert
@ -25,7 +26,7 @@ formats = {
} }
def set_formats(opts): def set_formats(opts: DiffOptions) -> None:
formats['text'] = '48' + color_as_sgr(opts.background) formats['text'] = '48' + color_as_sgr(opts.background)
formats['title'] = '38' + color_as_sgr(opts.title_fg) + ';48' + color_as_sgr(opts.title_bg) + ';1' formats['title'] = '38' + color_as_sgr(opts.title_fg) + ';48' + color_as_sgr(opts.title_bg) + ';1'
formats['margin'] = '38' + color_as_sgr(opts.margin_fg) + ';48' + color_as_sgr(opts.margin_bg) formats['margin'] = '38' + color_as_sgr(opts.margin_fg) + ';48' + color_as_sgr(opts.margin_bg)
@ -44,7 +45,7 @@ func_with_args, args_funcs = key_func()
@func_with_args('scroll_by') @func_with_args('scroll_by')
def parse_scroll_by(func, rest): def parse_scroll_by(func: str, rest: str) -> Tuple[str, int]:
try: try:
return func, int(rest) return func, int(rest)
except Exception: except Exception:
@ -52,7 +53,7 @@ def parse_scroll_by(func, rest):
@func_with_args('scroll_to') @func_with_args('scroll_to')
def parse_scroll_to(func, rest): def parse_scroll_to(func: str, rest: str) -> Tuple[str, str]:
rest = rest.lower() rest = rest.lower()
if rest not in {'start', 'end', 'next-change', 'prev-change', 'next-page', 'prev-page', 'next-match', 'prev-match'}: if rest not in {'start', 'end', 'next-change', 'prev-change', 'next-page', 'prev-page', 'next-match', 'prev-match'}:
rest = 'start' rest = 'start'
@ -60,7 +61,7 @@ def parse_scroll_to(func, rest):
@func_with_args('change_context') @func_with_args('change_context')
def parse_change_context(func, rest): def parse_change_context(func: str, rest: str) -> Tuple[str, Union[int, str]]:
rest = rest.lower() rest = rest.lower()
if rest in {'all', 'default'}: if rest in {'all', 'default'}:
return func, rest return func, rest
@ -72,23 +73,24 @@ def parse_change_context(func, rest):
@func_with_args('start_search') @func_with_args('start_search')
def parse_start_search(func, rest): def parse_start_search(func: str, rest: str) -> Tuple[str, Tuple[bool, bool]]:
rest = rest.lower().split() rest_ = rest.lower().split()
is_regex = rest and rest[0] == 'regex' is_regex = bool(rest_ and rest_[0] == 'regex')
is_backward = len(rest) > 1 and rest[1] == 'backward' is_backward = bool(len(rest_) > 1 and rest_[1] == 'backward')
return func, (is_regex, is_backward) return func, (is_regex, is_backward)
def special_handling(key, val, ans): def special_handling(key: str, val: str, ans: Dict) -> bool:
if key == 'map': if key == 'map':
x = parse_kittens_key(val, args_funcs) x = parse_kittens_key(val, args_funcs)
if x is not None: if x is not None:
action, *key_def = x action, key_def = x
ans['key_definitions'][tuple(key_def)] = action ans['key_definitions'][key_def] = action
return True return True
return False
def parse_config(lines, check_keys=True): def parse_config(lines: Iterable[str], check_keys: bool = True) -> Dict[str, Any]:
ans: Dict[str, Any] = {'key_definitions': {}} ans: Dict[str, Any] = {'key_definitions': {}}
parse_config_base( parse_config_base(
lines, lines,
@ -101,7 +103,7 @@ def parse_config(lines, check_keys=True):
return ans return ans
def merge_configs(defaults, vals): def merge_configs(defaults: Dict, vals: Dict) -> Dict:
ans = {} ans = {}
for k, v in defaults.items(): for k, v in defaults.items():
if isinstance(v, dict): if isinstance(v, dict):
@ -112,7 +114,7 @@ def merge_configs(defaults, vals):
return ans return ans
def parse_defaults(lines, check_keys=False): def parse_defaults(lines: Iterable[str], check_keys: bool = False) -> Dict[str, Any]:
return parse_config(lines, check_keys) return parse_config(lines, check_keys)
@ -121,7 +123,7 @@ Options: Type[DiffOptions] = x[0]
defaults = x[1] defaults = x[1]
def load_config(*paths, overrides=None): def load_config(*paths: str, overrides: Optional[Iterable[str]] = None) -> DiffOptions:
return _load_config(Options, defaults, parse_config, merge_configs, *paths, overrides=overrides) return _load_config(Options, defaults, parse_config, merge_configs, *paths, overrides=overrides)
@ -129,7 +131,7 @@ SYSTEM_CONF = '/etc/xdg/kitty/diff.conf'
defconf = os.path.join(config_dir, 'diff.conf') defconf = os.path.join(config_dir, 'diff.conf')
def init_config(args): def init_config(args: DiffCLIOptions) -> DiffOptions:
config = tuple(resolve_config(SYSTEM_CONF, defconf, args.config)) config = tuple(resolve_config(SYSTEM_CONF, defconf, args.config))
overrides = (a.replace('=', ' ', 1) for a in args.override or ()) overrides = (a.replace('=', ' ', 1) for a in args.override or ())
opts = load_config(*config, overrides=overrides) opts = load_config(*config, overrides=overrides)

View File

@ -5,7 +5,7 @@
import concurrent import concurrent
import os import os
import re import re
from typing import List, Optional from typing import IO, Dict, Iterable, List, Optional, Tuple, Union, cast
from pygments import highlight # type: ignore from pygments import highlight # type: ignore
from pygments.formatter import Formatter # type: ignore from pygments.formatter import Formatter # type: ignore
@ -14,7 +14,7 @@ from pygments.util import ClassNotFound # type: ignore
from kitty.rgb import color_as_sgr, parse_sharp from kitty.rgb import color_as_sgr, parse_sharp
from .collect import Segment, data_for_path, lines_for_path from .collect import Collection, Segment, data_for_path, lines_for_path
class StyleNotFound(Exception): class StyleNotFound(Exception):
@ -23,7 +23,7 @@ class StyleNotFound(Exception):
class DiffFormatter(Formatter): class DiffFormatter(Formatter):
def __init__(self, style='default'): def __init__(self, style: str = 'default') -> None:
try: try:
Formatter.__init__(self, style=style) Formatter.__init__(self, style=style)
initialized = True initialized = True
@ -32,26 +32,26 @@ class DiffFormatter(Formatter):
if not initialized: if not initialized:
raise StyleNotFound('pygments style "{}" not found'.format(style)) raise StyleNotFound('pygments style "{}" not found'.format(style))
self.styles = {} self.styles: Dict[str, Tuple[str, str]] = {}
for token, style in self.style: for token, token_style in self.style:
start = [] start = []
end = [] end = []
fstart = fend = '' fstart = fend = ''
# a style item is a tuple in the following form: # a style item is a tuple in the following form:
# colors are readily specified in hex: 'RRGGBB' # colors are readily specified in hex: 'RRGGBB'
col = style['color'] col = token_style['color']
if col: if col:
pc = parse_sharp(col) pc = parse_sharp(col)
if pc is not None: if pc is not None:
start.append('38' + color_as_sgr(pc)) start.append('38' + color_as_sgr(pc))
end.append('39') end.append('39')
if style['bold']: if token_style['bold']:
start.append('1') start.append('1')
end.append('22') end.append('22')
if style['italic']: if token_style['italic']:
start.append('3') start.append('3')
end.append('23') end.append('23')
if style['underline']: if token_style['underline']:
start.append('4') start.append('4')
end.append('24') end.append('24')
if start: if start:
@ -59,7 +59,7 @@ class DiffFormatter(Formatter):
fend = '\033[{}m'.format(';'.join(end)) fend = '\033[{}m'.format(';'.join(end))
self.styles[token] = fstart, fend self.styles[token] = fstart, fend
def format(self, tokensource, outfile): def format(self, tokensource: Iterable[Tuple[str, str]], outfile: IO[str]) -> None:
for ttype, value in tokensource: for ttype, value in tokensource:
not_found = True not_found = True
if value.rstrip('\n'): if value.rstrip('\n'):
@ -84,12 +84,12 @@ class DiffFormatter(Formatter):
formatter: Optional[DiffFormatter] = None formatter: Optional[DiffFormatter] = None
def initialize_highlighter(style='default'): def initialize_highlighter(style: str = 'default') -> None:
global formatter global formatter
formatter = DiffFormatter(style) formatter = DiffFormatter(style)
def highlight_data(code, filename, aliases=None): def highlight_data(code: str, filename: str, aliases: Optional[Dict[str, str]] = None) -> Optional[str]:
if aliases: if aliases:
base, ext = os.path.splitext(filename) base, ext = os.path.splitext(filename)
alias = aliases.get(ext[1:]) alias = aliases.get(ext[1:])
@ -98,15 +98,14 @@ def highlight_data(code, filename, aliases=None):
try: try:
lexer = get_lexer_for_filename(filename, stripnl=False) lexer = get_lexer_for_filename(filename, stripnl=False)
except ClassNotFound: except ClassNotFound:
pass return None
else: return cast(str, highlight(code, lexer, formatter))
return highlight(code, lexer, formatter)
split_pat = re.compile(r'(\033\[.*?m)') split_pat = re.compile(r'(\033\[.*?m)')
def highlight_line(line): def highlight_line(line: str) -> List[Segment]:
ans: List[Segment] = [] ans: List[Segment] = []
current: Optional[Segment] = None current: Optional[Segment] = None
pos = 0 pos = 0
@ -124,8 +123,11 @@ def highlight_line(line):
return ans return ans
def highlight_for_diff(path, aliases): DiffHighlight = List[List[Segment]]
ans = []
def highlight_for_diff(path: str, aliases: Dict[str, str]) -> DiffHighlight:
ans: DiffHighlight = []
lines = lines_for_path(path) lines = lines_for_path(path)
hd = highlight_data('\n'.join(lines), path, aliases) hd = highlight_data('\n'.join(lines), path, aliases)
if hd is not None: if hd is not None:
@ -134,9 +136,9 @@ def highlight_for_diff(path, aliases):
return ans return ans
def highlight_collection(collection, aliases=None): def highlight_collection(collection: Collection, aliases: Optional[Dict[str, str]] = None) -> Union[str, Dict[str, DiffHighlight]]:
jobs = {} jobs = {}
ans = {} ans: Dict[str, DiffHighlight] = {}
with concurrent.futures.ProcessPoolExecutor(max_workers=os.cpu_count()) as executor: with concurrent.futures.ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
for path, item_type, other_path in collection: for path, item_type, other_path in collection:
if item_type != 'rename': if item_type != 'rename':
@ -155,7 +157,7 @@ def highlight_collection(collection, aliases=None):
return ans return ans
def main(): def main() -> None:
from .config import defaults from .config import defaults
# kitty +runpy "from kittens.diff.highlight import main; main()" file # kitty +runpy "from kittens.diff.highlight import main; main()" file
import sys import sys

View File

@ -13,22 +13,27 @@ from collections import defaultdict
from contextlib import suppress from contextlib import suppress
from functools import partial from functools import partial
from gettext import gettext as _ from gettext import gettext as _
from typing import DefaultDict, List, Tuple from typing import DefaultDict, Dict, Iterable, List, Optional, Tuple, Union
from kitty.cli import CONFIG_HELP, parse_args from kitty.cli import CONFIG_HELP, parse_args
from kitty.cli_stub import DiffCLIOptions from kitty.cli_stub import DiffCLIOptions
from kitty.conf.utils import KittensKeyAction
from kitty.constants import appname from kitty.constants import appname
from kitty.fast_data_types import wcswidth from kitty.fast_data_types import wcswidth
from kitty.key_encoding import RELEASE, enter_key, key_defs as K from kitty.key_encoding import RELEASE, KeyEvent, enter_key, key_defs as K
from kitty.options_stub import DiffOptions
from kitty.utils import ScreenSize
from . import global_data
from .collect import ( from .collect import (
create_collection, data_for_path, lines_for_path, sanitize, Collection, create_collection, data_for_path, lines_for_path, sanitize,
set_highlight_data set_highlight_data
) )
from . import global_data
from .config import init_config from .config import init_config
from .patch import Differ, set_diff_command, worker_processes from .patch import Differ, Patch, set_diff_command, worker_processes
from .render import ImageSupportWarning, LineRef, Reference, render_diff from .render import (
ImagePlacement, ImageSupportWarning, Line, LineRef, Reference, render_diff
)
from .search import BadRegex, Search from .search import BadRegex, Search
from ..tui.handler import Handler from ..tui.handler import Handler
from ..tui.images import ImageManager from ..tui.images import ImageManager
@ -37,8 +42,9 @@ from ..tui.loop import Loop
from ..tui.operations import styled from ..tui.operations import styled
try: try:
from .highlight import initialize_highlighter, highlight_collection from .highlight import initialize_highlighter, highlight_collection, DiffHighlight
has_highlighter = True has_highlighter = True
DiffHighlight
except ImportError: except ImportError:
has_highlighter = False has_highlighter = False
@ -47,13 +53,14 @@ INITIALIZING, COLLECTED, DIFFED, COMMAND, MESSAGE = range(5)
ESCAPE = K['ESCAPE'] ESCAPE = K['ESCAPE']
def generate_diff(collection, context): def generate_diff(collection: Collection, context: int) -> Union[str, Dict[str, Patch]]:
d = Differ() d = Differ()
for path, item_type, changed_path in collection: for path, item_type, changed_path in collection:
if item_type == 'diff': if item_type == 'diff':
is_binary = isinstance(data_for_path(path), bytes) or isinstance(data_for_path(changed_path), bytes) is_binary = isinstance(data_for_path(path), bytes) or isinstance(data_for_path(changed_path), bytes)
if not is_binary: if not is_binary:
assert changed_path is not None
d.add_diff(path, changed_path) d.add_diff(path, changed_path)
return d(context) return d(context)
@ -63,35 +70,35 @@ class DiffHandler(Handler):
image_manager_class = ImageManager image_manager_class = ImageManager
def __init__(self, args, opts, left, right): def __init__(self, args: DiffCLIOptions, opts: DiffOptions, left: str, right: str) -> None:
self.state = INITIALIZING self.state = INITIALIZING
self.message = '' self.message = ''
self.current_search_is_regex = True self.current_search_is_regex = True
self.current_search = None self.current_search: Optional[Search] = None
self.line_edit = LineEdit() self.line_edit = LineEdit()
self.opts = opts self.opts = opts
self.left, self.right = left, right self.left, self.right = left, right
self.report_traceback_on_exit = None self.report_traceback_on_exit: Union[str, Dict[str, Patch], None] = None
self.args = args self.args = args
self.scroll_pos = self.max_scroll_pos = 0 self.scroll_pos = self.max_scroll_pos = 0
self.current_context_count = self.original_context_count = self.args.context self.current_context_count = self.original_context_count = self.args.context
if self.current_context_count < 0: if self.current_context_count < 0:
self.current_context_count = self.original_context_count = self.opts.num_context_lines self.current_context_count = self.original_context_count = self.opts.num_context_lines
self.highlighting_done = False self.highlighting_done = False
self.restore_position = None self.restore_position: Optional[Reference] = None
for key_def, action in self.opts.key_definitions.items(): for key_def, action in self.opts.key_definitions.items():
self.add_shortcut(action, *key_def) self.add_shortcut(action, *key_def)
def perform_action(self, action): def perform_action(self, action: KittensKeyAction) -> None:
func, args = action func, args = action
if func == 'quit': if func == 'quit':
self.quit_loop(0) self.quit_loop(0)
return return
if self.state <= DIFFED: if self.state <= DIFFED:
if func == 'scroll_by': if func == 'scroll_by':
return self.scroll_lines(*args) return self.scroll_lines(int(args[0]))
if func == 'scroll_to': if func == 'scroll_to':
where = args[0] where = str(args[0])
if 'change' in where: if 'change' in where:
return self.scroll_to_next_change(backwards='prev' in where) return self.scroll_to_next_change(backwards='prev' in where)
if 'match' in where: if 'match' in where:
@ -103,7 +110,7 @@ class DiffHandler(Handler):
return self.scroll_lines(amt) return self.scroll_lines(amt)
if func == 'change_context': if func == 'change_context':
new_ctx = self.current_context_count new_ctx = self.current_context_count
to = args[0] to = int(args[0])
if to == 'all': if to == 'all':
new_ctx = 100000 new_ctx = 100000
elif to == 'default': elif to == 'default':
@ -112,25 +119,25 @@ class DiffHandler(Handler):
new_ctx += to new_ctx += to
return self.change_context_count(new_ctx) return self.change_context_count(new_ctx)
if func == 'start_search': if func == 'start_search':
self.start_search(*args) self.start_search(bool(args[0]), bool(args[1]))
return return
def create_collection(self): def create_collection(self) -> None:
def collect_done(collection): def collect_done(collection: Collection) -> None:
self.collection = collection self.collection = collection
self.state = COLLECTED self.state = COLLECTED
self.generate_diff() self.generate_diff()
def collect(left, right): def collect(left: str, right: str) -> None:
collection = create_collection(left, right) collection = create_collection(left, right)
self.asyncio_loop.call_soon_threadsafe(collect_done, collection) self.asyncio_loop.call_soon_threadsafe(collect_done, collection)
self.asyncio_loop.run_in_executor(None, collect, self.left, self.right) self.asyncio_loop.run_in_executor(None, collect, self.left, self.right)
def generate_diff(self): def generate_diff(self) -> None:
def diff_done(diff_map): def diff_done(diff_map: Union[str, Dict[str, Patch]]) -> None:
if isinstance(diff_map, str): if isinstance(diff_map, str):
self.report_traceback_on_exit = diff_map self.report_traceback_on_exit = diff_map
self.quit_loop(1) self.quit_loop(1)
@ -155,15 +162,15 @@ class DiffHandler(Handler):
return return
self.syntax_highlight() self.syntax_highlight()
def diff(collection, current_context_count): def diff(collection: Collection, current_context_count: int) -> None:
diff_map = generate_diff(collection, current_context_count) diff_map = generate_diff(collection, current_context_count)
self.asyncio_loop.call_soon_threadsafe(diff_done, diff_map) self.asyncio_loop.call_soon_threadsafe(diff_done, diff_map)
self.asyncio_loop.run_in_executor(None, diff, self.collection, self.current_context_count) self.asyncio_loop.run_in_executor(None, diff, self.collection, self.current_context_count)
def syntax_highlight(self): def syntax_highlight(self) -> None:
def highlighting_done(hdata): def highlighting_done(hdata: Union[str, Dict[str, 'DiffHighlight']]) -> None:
if isinstance(hdata, str): if isinstance(hdata, str):
self.report_traceback_on_exit = hdata self.report_traceback_on_exit = hdata
self.quit_loop(1) self.quit_loop(1)
@ -172,21 +179,21 @@ class DiffHandler(Handler):
self.render_diff() self.render_diff()
self.draw_screen() self.draw_screen()
def highlight(*a): def highlight(collection: Collection, aliases: Optional[Dict[str, str]] = None) -> None:
result = highlight_collection(*a) result = highlight_collection(collection, aliases)
self.asyncio_loop.call_soon_threadsafe(highlighting_done, result) self.asyncio_loop.call_soon_threadsafe(highlighting_done, result)
self.asyncio_loop.run_in_executor(None, highlight, self.collection, self.opts.syntax_aliases) self.asyncio_loop.run_in_executor(None, highlight, self.collection, self.opts.syntax_aliases)
def calculate_statistics(self): def calculate_statistics(self) -> None:
self.added_count = self.collection.added_count self.added_count = self.collection.added_count
self.removed_count = self.collection.removed_count self.removed_count = self.collection.removed_count
for patch in self.diff_map.values(): for patch in self.diff_map.values():
self.added_count += patch.added_count self.added_count += patch.added_count
self.removed_count += patch.removed_count self.removed_count += patch.removed_count
def render_diff(self): def render_diff(self) -> None:
self.diff_lines = tuple(render_diff(self.collection, self.diff_map, self.args, self.screen_size.cols, self.image_manager)) self.diff_lines: Tuple[Line, ...] = tuple(render_diff(self.collection, self.diff_map, self.args, self.screen_size.cols, self.image_manager))
self.margin_size = render_diff.margin_size self.margin_size = render_diff.margin_size
self.ref_path_map: DefaultDict[str, List[Tuple[int, Reference]]] = defaultdict(list) self.ref_path_map: DefaultDict[str, List[Tuple[int, Reference]]] = defaultdict(list)
for i, l in enumerate(self.diff_lines): for i, l in enumerate(self.diff_lines):
@ -196,11 +203,11 @@ class DiffHandler(Handler):
self.current_search(self.diff_lines, self.margin_size, self.screen_size.cols) self.current_search(self.diff_lines, self.margin_size, self.screen_size.cols)
@property @property
def current_position(self): def current_position(self) -> Reference:
return self.diff_lines[min(len(self.diff_lines) - 1, self.scroll_pos)].ref return self.diff_lines[min(len(self.diff_lines) - 1, self.scroll_pos)].ref
@current_position.setter @current_position.setter
def current_position(self, ref): def current_position(self, ref: Reference) -> None:
num = None num = None
if isinstance(ref.extra, LineRef): if isinstance(ref.extra, LineRef):
sln = ref.extra.src_line_number sln = ref.extra.src_line_number
@ -220,10 +227,10 @@ class DiffHandler(Handler):
self.scroll_pos = max(0, min(num, self.max_scroll_pos)) self.scroll_pos = max(0, min(num, self.max_scroll_pos))
@property @property
def num_lines(self): def num_lines(self) -> int:
return self.screen_size.rows - 1 return self.screen_size.rows - 1
def scroll_to_next_change(self, backwards=False): def scroll_to_next_change(self, backwards: bool = False) -> None:
if backwards: if backwards:
r = range(self.scroll_pos - 1, -1, -1) r = range(self.scroll_pos - 1, -1, -1)
else: else:
@ -235,7 +242,7 @@ class DiffHandler(Handler):
return return
self.cmd.bell() self.cmd.bell()
def scroll_to_next_match(self, backwards=False, include_current=False): def scroll_to_next_match(self, backwards: bool = False, include_current: bool = False) -> None:
if self.current_search is not None: if self.current_search is not None:
offset = 0 if include_current else 1 offset = 0 if include_current else 1
if backwards: if backwards:
@ -248,10 +255,10 @@ class DiffHandler(Handler):
return return
self.cmd.bell() self.cmd.bell()
def set_scrolling_region(self): def set_scrolling_region(self) -> None:
self.cmd.set_scrolling_region(self.screen_size, 0, self.num_lines - 2) self.cmd.set_scrolling_region(self.screen_size, 0, self.num_lines - 2)
def scroll_lines(self, amt=1): def scroll_lines(self, amt: int = 1) -> None:
new_pos = max(0, min(self.scroll_pos + amt, self.max_scroll_pos)) new_pos = max(0, min(self.scroll_pos + amt, self.max_scroll_pos))
if new_pos == self.scroll_pos: if new_pos == self.scroll_pos:
self.cmd.bell() self.cmd.bell()
@ -271,7 +278,7 @@ class DiffHandler(Handler):
self.draw_lines(amt, self.num_lines - amt) self.draw_lines(amt, self.num_lines - amt)
self.draw_status_line() self.draw_status_line()
def init_terminal_state(self): def init_terminal_state(self) -> None:
self.cmd.set_line_wrapping(False) self.cmd.set_line_wrapping(False)
self.cmd.set_window_title(global_data.title) self.cmd.set_window_title(global_data.title)
self.cmd.set_default_colors( self.cmd.set_default_colors(
@ -280,21 +287,21 @@ class DiffHandler(Handler):
select_bg=self.opts.select_bg) select_bg=self.opts.select_bg)
self.cmd.set_cursor_shape('bar') self.cmd.set_cursor_shape('bar')
def finalize(self): def finalize(self) -> None:
self.cmd.set_default_colors() self.cmd.set_default_colors()
self.cmd.set_cursor_visible(True) self.cmd.set_cursor_visible(True)
self.cmd.set_scrolling_region() self.cmd.set_scrolling_region()
def initialize(self): def initialize(self) -> None:
self.init_terminal_state() self.init_terminal_state()
self.set_scrolling_region() self.set_scrolling_region()
self.draw_screen() self.draw_screen()
self.create_collection() self.create_collection()
def enforce_cursor_state(self): def enforce_cursor_state(self) -> None:
self.cmd.set_cursor_visible(self.state == COMMAND) self.cmd.set_cursor_visible(self.state == COMMAND)
def draw_lines(self, num, offset=0): def draw_lines(self, num: int, offset: int = 0) -> None:
offset += self.scroll_pos offset += self.scroll_pos
image_involved = False image_involved = False
limit = len(self.diff_lines) limit = len(self.diff_lines)
@ -315,7 +322,7 @@ class DiffHandler(Handler):
if image_involved: if image_involved:
self.place_images() self.place_images()
def place_images(self): def place_images(self) -> None:
self.cmd.clear_images_on_screen() self.cmd.clear_images_on_screen()
offset = self.scroll_pos offset = self.scroll_pos
limit = len(self.diff_lines) limit = len(self.diff_lines)
@ -338,7 +345,7 @@ class DiffHandler(Handler):
self.place_image(row, right_placement, False) self.place_image(row, right_placement, False)
in_image = True in_image = True
def place_image(self, row, placement, is_left): def place_image(self, row: int, placement: ImagePlacement, is_left: bool) -> None:
xpos = (0 if is_left else (self.screen_size.cols // 2)) + placement.image.margin_size xpos = (0 if is_left else (self.screen_size.cols // 2)) + placement.image.margin_size
image_height_in_rows = placement.image.rows image_height_in_rows = placement.image.rows
topmost_visible_row = placement.row topmost_visible_row = placement.row
@ -350,7 +357,7 @@ class DiffHandler(Handler):
self.image_manager.show_image(placement.image.image_id, xpos, row, src_rect=( self.image_manager.show_image(placement.image.image_id, xpos, row, src_rect=(
0, top, placement.image.width, height)) 0, top, placement.image.width, height))
def draw_screen(self): def draw_screen(self) -> None:
self.enforce_cursor_state() self.enforce_cursor_state()
if self.state < DIFFED: if self.state < DIFFED:
self.cmd.clear_screen() self.cmd.clear_screen()
@ -361,7 +368,7 @@ class DiffHandler(Handler):
self.draw_lines(self.num_lines) self.draw_lines(self.num_lines)
self.draw_status_line() self.draw_status_line()
def draw_status_line(self): def draw_status_line(self) -> None:
if self.state < DIFFED: if self.state < DIFFED:
return return
self.enforce_cursor_state() self.enforce_cursor_state()
@ -388,7 +395,7 @@ class DiffHandler(Handler):
text = '{}{}{}'.format(prefix, ' ' * filler, suffix) text = '{}{}{}'.format(prefix, ' ' * filler, suffix)
self.write(text) self.write(text)
def change_context_count(self, new_ctx): def change_context_count(self, new_ctx: int) -> None:
new_ctx = max(0, new_ctx) new_ctx = max(0, new_ctx)
if new_ctx != self.current_context_count: if new_ctx != self.current_context_count:
self.current_context_count = new_ctx self.current_context_count = new_ctx
@ -397,7 +404,7 @@ class DiffHandler(Handler):
self.restore_position = self.current_position self.restore_position = self.current_position
self.draw_screen() self.draw_screen()
def start_search(self, is_regex, is_backward): def start_search(self, is_regex: bool, is_backward: bool) -> None:
if self.state != DIFFED: if self.state != DIFFED:
self.cmd.bell() self.cmd.bell()
return return
@ -407,7 +414,7 @@ class DiffHandler(Handler):
self.current_search_is_regex = is_regex self.current_search_is_regex = is_regex
self.draw_status_line() self.draw_status_line()
def do_search(self): def do_search(self) -> None:
self.current_search = None self.current_search = None
query = self.line_edit.current_input query = self.line_edit.current_input
if len(query) < 2: if len(query) < 2:
@ -426,7 +433,7 @@ class DiffHandler(Handler):
self.message = sanitize(_('No matches found')) self.message = sanitize(_('No matches found'))
self.cmd.bell() self.cmd.bell()
def on_text(self, text, in_bracketed_paste=False): def on_text(self, text: str, in_bracketed_paste: bool = False) -> None:
if self.state is COMMAND: if self.state is COMMAND:
self.line_edit.on_text(text, in_bracketed_paste) self.line_edit.on_text(text, in_bracketed_paste)
self.draw_status_line() self.draw_status_line()
@ -439,7 +446,7 @@ class DiffHandler(Handler):
if action is not None: if action is not None:
return self.perform_action(action) return self.perform_action(action)
def on_key(self, key_event): def on_key(self, key_event: KeyEvent) -> None:
if self.state is MESSAGE: if self.state is MESSAGE:
if key_event.type is not RELEASE: if key_event.type is not RELEASE:
self.state = DIFFED self.state = DIFFED
@ -472,7 +479,7 @@ class DiffHandler(Handler):
if action is not None: if action is not None:
return self.perform_action(action) return self.perform_action(action)
def on_resize(self, screen_size): def on_resize(self, screen_size: ScreenSize) -> None:
self.screen_size = screen_size self.screen_size = screen_size
self.set_scrolling_region() self.set_scrolling_region()
if self.state > COLLECTED: if self.state > COLLECTED:
@ -480,10 +487,10 @@ class DiffHandler(Handler):
self.render_diff() self.render_diff()
self.draw_screen() self.draw_screen()
def on_interrupt(self): def on_interrupt(self) -> None:
self.quit_loop(1) self.quit_loop(1)
def on_eot(self): def on_eot(self) -> None:
self.quit_loop(1) self.quit_loop(1)
@ -510,10 +517,10 @@ Syntax: :italic:`name=value`. For example: :italic:`-o background=gray`
class ShowWarning: class ShowWarning:
def __init__(self): def __init__(self) -> None:
self.warnings = [] self.warnings: List[str] = []
def __call__(self, message, category, filename, lineno, file=None, line=None): def __call__(self, message: str, category: object, filename: str, lineno: int, file: object = None, line: object = None) -> None:
if category is ImageSupportWarning: if category is ImageSupportWarning:
showwarning.warnings.append(message) showwarning.warnings.append(message)
@ -523,13 +530,13 @@ help_text = 'Show a side-by-side diff of the specified files/directories. You ca
usage = 'file_or_directory_left file_or_directory_right' usage = 'file_or_directory_left file_or_directory_right'
def terminate_processes(processes): def terminate_processes(processes: Iterable[int]) -> None:
for pid in processes: for pid in processes:
with suppress(Exception): with suppress(Exception):
os.kill(pid, signal.SIGKILL) os.kill(pid, signal.SIGKILL)
def get_remote_file(path): def get_remote_file(path: str) -> str:
if path.startswith('ssh:'): if path.startswith('ssh:'):
parts = path.split(':', 2) parts = path.split(':', 2)
if len(parts) == 3: if len(parts) == 3:
@ -543,16 +550,16 @@ def get_remote_file(path):
return path return path
def main(args): def main(args: List[str]) -> None:
warnings.showwarning = showwarning warnings.showwarning = showwarning
args, items = parse_args(args[1:], OPTIONS, usage, help_text, 'kitty +kitten diff', result_class=DiffCLIOptions) cli_opts, items = parse_args(args[1:], OPTIONS, usage, help_text, 'kitty +kitten diff', result_class=DiffCLIOptions)
if len(items) != 2: if len(items) != 2:
raise SystemExit('You must specify exactly two files/directories to compare') raise SystemExit('You must specify exactly two files/directories to compare')
left, right = items left, right = items
global_data.title = _('{} vs. {}').format(left, right) global_data.title = _('{} vs. {}').format(left, right)
if os.path.isdir(left) != os.path.isdir(right): if os.path.isdir(left) != os.path.isdir(right):
raise SystemExit('The items to be diffed should both be either directories or files. Comparing a directory to a file is not valid.') raise SystemExit('The items to be diffed should both be either directories or files. Comparing a directory to a file is not valid.')
opts = init_config(args) opts = init_config(cli_opts)
set_diff_command(opts.diff_cmd) set_diff_command(opts.diff_cmd)
lines_for_path.replace_tab_by = opts.replace_tab_by lines_for_path.replace_tab_by = opts.replace_tab_by
left, right = map(get_remote_file, (left, right)) left, right = map(get_remote_file, (left, right))
@ -561,7 +568,7 @@ def main(args):
raise SystemExit('{} does not exist'.format(f)) raise SystemExit('{} does not exist'.format(f))
loop = Loop() loop = Loop()
handler = DiffHandler(args, opts, left, right) handler = DiffHandler(cli_opts, opts, left, right)
loop.loop(handler) loop.loop(handler)
for message in showwarning.warnings: for message in showwarning.warnings:
from kitty.utils import safe_print from kitty.utils import safe_print

View File

@ -7,7 +7,7 @@ import os
import shlex import shlex
import shutil import shutil
import subprocess import subprocess
from typing import List, Optional, Tuple from typing import Dict, Iterator, List, Optional, Sequence, Tuple, Union
from . import global_data from . import global_data
from .collect import lines_for_path from .collect import lines_for_path
@ -20,14 +20,14 @@ DIFF_DIFF = 'diff -p -U _CONTEXT_ --'
worker_processes: List[int] = [] worker_processes: List[int] = []
def find_differ(): def find_differ() -> Optional[str]:
if shutil.which('git') and subprocess.Popen(['git', '--help'], stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL).wait() == 0: if shutil.which('git') and subprocess.Popen(['git', '--help'], stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL).wait() == 0:
return GIT_DIFF return GIT_DIFF
if shutil.which('diff'): if shutil.which('diff'):
return DIFF_DIFF return DIFF_DIFF
def set_diff_command(opt): def set_diff_command(opt: str) -> None:
if opt == 'auto': if opt == 'auto':
cmd = find_differ() cmd = find_differ()
if cmd is None: if cmd is None:
@ -37,7 +37,7 @@ def set_diff_command(opt):
global_data.cmd = cmd global_data.cmd = cmd
def run_diff(file1: str, file2: str, context: int = 3): def run_diff(file1: str, file2: str, context: int = 3) -> Tuple[bool, Union[int, bool], str]:
# returns: ok, is_different, patch # returns: ok, is_different, patch
cmd = shlex.split(global_data.cmd.replace('_CONTEXT_', str(context))) cmd = shlex.split(global_data.cmd.replace('_CONTEXT_', str(context)))
# we resolve symlinks because git diff does not follow symlinks, while diff # we resolve symlinks because git diff does not follow symlinks, while diff
@ -61,49 +61,49 @@ class Chunk:
__slots__ = ('is_context', 'left_start', 'right_start', 'left_count', 'right_count', 'centers') __slots__ = ('is_context', 'left_start', 'right_start', 'left_count', 'right_count', 'centers')
def __init__(self, left_start, right_start, is_context=False): def __init__(self, left_start: int, right_start: int, is_context: bool = False) -> None:
self.is_context = is_context self.is_context = is_context
self.left_start = left_start self.left_start = left_start
self.right_start = right_start self.right_start = right_start
self.left_count = self.right_count = 0 self.left_count = self.right_count = 0
self.centers = None self.centers: Optional[Tuple[Tuple[int, int], ...]] = None
def add_line(self): def add_line(self) -> None:
self.right_count += 1 self.right_count += 1
def remove_line(self): def remove_line(self) -> None:
self.left_count += 1 self.left_count += 1
def context_line(self): def context_line(self) -> None:
self.left_count += 1 self.left_count += 1
self.right_count += 1 self.right_count += 1
def finalize(self): def finalize(self) -> None:
if not self.is_context and self.left_count == self.right_count: if not self.is_context and self.left_count == self.right_count:
self.centers = tuple( self.centers = tuple(
changed_center(left_lines[self.left_start + i], right_lines[self.right_start + i]) changed_center(left_lines[self.left_start + i], right_lines[self.right_start + i])
for i in range(self.left_count) for i in range(self.left_count)
) )
def __repr__(self): def __repr__(self) -> str:
return 'Chunk(is_context={}, left_start={}, left_count={}, right_start={}, right_count={})'.format( return 'Chunk(is_context={}, left_start={}, left_count={}, right_start={}, right_count={})'.format(
self.is_context, self.left_start, self.left_count, self.right_start, self.right_count) self.is_context, self.left_start, self.left_count, self.right_start, self.right_count)
class Hunk: class Hunk:
def __init__(self, title, left, right): def __init__(self, title: str, left: Tuple[int, int], right: Tuple[int, int]) -> None:
self.left_start, self.left_count = left self.left_start, self.left_count = left
self.right_start, self.right_count = right self.right_start, self.right_count = right
self.left_start -= 1 # 0-index self.left_start -= 1 # 0-index
self.right_start -= 1 # 0-index self.right_start -= 1 # 0-index
self.title = title self.title = title
self.added_count = self.removed_count = 0 self.added_count = self.removed_count = 0
self.chunks = [] self.chunks: List[Chunk] = []
self.current_chunk: Optional[Chunk] = None self.current_chunk: Optional[Chunk] = None
self.largest_line_number = max(self.left_start + self.left_count, self.right_start + self.right_count) self.largest_line_number = max(self.left_start + self.left_count, self.right_start + self.right_count)
def new_chunk(self, is_context=False): def new_chunk(self, is_context: bool = False) -> Chunk:
if self.chunks: if self.chunks:
c = self.chunks[-1] c = self.chunks[-1]
left_start = c.left_start + c.left_count left_start = c.left_start + c.left_count
@ -113,38 +113,38 @@ class Hunk:
right_start = self.right_start right_start = self.right_start
return Chunk(left_start, right_start, is_context) return Chunk(left_start, right_start, is_context)
def ensure_diff_chunk(self): def ensure_diff_chunk(self) -> None:
if self.current_chunk is None: if self.current_chunk is None:
self.current_chunk = self.new_chunk(is_context=False) self.current_chunk = self.new_chunk(is_context=False)
elif self.current_chunk.is_context: elif self.current_chunk.is_context:
self.chunks.append(self.current_chunk) self.chunks.append(self.current_chunk)
self.current_chunk = self.new_chunk(is_context=False) self.current_chunk = self.new_chunk(is_context=False)
def ensure_context_chunk(self): def ensure_context_chunk(self) -> None:
if self.current_chunk is None: if self.current_chunk is None:
self.current_chunk = self.new_chunk(is_context=True) self.current_chunk = self.new_chunk(is_context=True)
elif not self.current_chunk.is_context: elif not self.current_chunk.is_context:
self.chunks.append(self.current_chunk) self.chunks.append(self.current_chunk)
self.current_chunk = self.new_chunk(is_context=True) self.current_chunk = self.new_chunk(is_context=True)
def add_line(self): def add_line(self) -> None:
self.ensure_diff_chunk() self.ensure_diff_chunk()
if self.current_chunk is not None: if self.current_chunk is not None:
self.current_chunk.add_line() self.current_chunk.add_line()
self.added_count += 1 self.added_count += 1
def remove_line(self): def remove_line(self) -> None:
self.ensure_diff_chunk() self.ensure_diff_chunk()
if self.current_chunk is not None: if self.current_chunk is not None:
self.current_chunk.remove_line() self.current_chunk.remove_line()
self.removed_count += 1 self.removed_count += 1
def context_line(self): def context_line(self) -> None:
self.ensure_context_chunk() self.ensure_context_chunk()
if self.current_chunk is not None: if self.current_chunk is not None:
self.current_chunk.context_line() self.current_chunk.context_line()
def finalize(self): def finalize(self) -> None:
if self.current_chunk is not None: if self.current_chunk is not None:
self.chunks.append(self.current_chunk) self.chunks.append(self.current_chunk)
del self.current_chunk del self.current_chunk
@ -158,14 +158,14 @@ class Hunk:
c.finalize() c.finalize()
def parse_range(x): def parse_range(x: str) -> Tuple[int, int]:
parts = x[1:].split(',', 1) parts = x[1:].split(',', 1)
start = abs(int(parts[0])) start = abs(int(parts[0]))
count = 1 if len(parts) < 2 else int(parts[1]) count = 1 if len(parts) < 2 else int(parts[1])
return start, count return start, count
def parse_hunk_header(line): def parse_hunk_header(line: str) -> Hunk:
parts: Tuple[str, ...] = tuple(filter(None, line.split('@@', 2))) parts: Tuple[str, ...] = tuple(filter(None, line.split('@@', 2)))
linespec = parts[0].strip() linespec = parts[0].strip()
title = '' title = ''
@ -177,20 +177,20 @@ def parse_hunk_header(line):
class Patch: class Patch:
def __init__(self, all_hunks): def __init__(self, all_hunks: Sequence[Hunk]):
self.all_hunks = all_hunks self.all_hunks = all_hunks
self.largest_line_number = self.all_hunks[-1].largest_line_number if self.all_hunks else 0 self.largest_line_number = self.all_hunks[-1].largest_line_number if self.all_hunks else 0
self.added_count = sum(h.added_count for h in all_hunks) self.added_count = sum(h.added_count for h in all_hunks)
self.removed_count = sum(h.removed_count for h in all_hunks) self.removed_count = sum(h.removed_count for h in all_hunks)
def __iter__(self): def __iter__(self) -> Iterator[Hunk]:
return iter(self.all_hunks) return iter(self.all_hunks)
def __len__(self): def __len__(self) -> int:
return len(self.all_hunks) return len(self.all_hunks)
def parse_patch(raw): def parse_patch(raw: str) -> Patch:
all_hunks = [] all_hunks = []
current_hunk = None current_hunk = None
for line in raw.splitlines(): for line in raw.splitlines():
@ -218,19 +218,19 @@ class Differ:
diff_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None diff_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None
def __init__(self): def __init__(self) -> None:
self.jmap = {} self.jmap: Dict[str, str] = {}
self.jobs = [] self.jobs: List[str] = []
if Differ.diff_executor is None: if Differ.diff_executor is None:
Differ.diff_executor = self.diff_executor = concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) Differ.diff_executor = self.diff_executor = concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count())
def add_diff(self, file1, file2): def add_diff(self, file1: str, file2: str) -> None:
self.jmap[file1] = file2 self.jmap[file1] = file2
self.jobs.append(file1) self.jobs.append(file1)
def __call__(self, context=3): def __call__(self, context: int = 3) -> Union[str, Dict[str, Patch]]:
global left_lines, right_lines global left_lines, right_lines
ans = {} ans: Dict[str, Patch] = {}
executor = self.diff_executor executor = self.diff_executor
assert executor is not None assert executor is not None
jobs = {executor.submit(run_diff, key, self.jmap[key], context): key for key in self.jobs} jobs = {executor.submit(run_diff, key, self.jmap[key], context): key for key in self.jobs}

View File

@ -7,17 +7,20 @@ from functools import lru_cache
from gettext import gettext as _ from gettext import gettext as _
from itertools import repeat, zip_longest from itertools import repeat, zip_longest
from math import ceil from math import ceil
from typing import Callable, Iterable, List, Optional from typing import Callable, Dict, Generator, Iterable, List, Optional, Tuple
from kitty.fast_data_types import truncate_point_for_length, wcswidth from kitty.fast_data_types import truncate_point_for_length, wcswidth
from kitty.cli_stub import DiffCLIOptions
from kitty.utils import ScreenSize
from .collect import ( from .collect import (
Segment, data_for_path, highlights_for_path, is_image, lines_for_path, Collection, Segment, data_for_path, highlights_for_path, is_image,
path_name_map, sanitize lines_for_path, path_name_map, sanitize
) )
from .config import formats from .config import formats
from .diff_speedup import split_with_highlights as _split_with_highlights from .diff_speedup import split_with_highlights as _split_with_highlights
from ..tui.images import can_display_images from .patch import Chunk, Hunk, Patch
from ..tui.images import ImageManager, can_display_images
class ImageSupportWarning(Warning): class ImageSupportWarning(Warning):
@ -25,7 +28,7 @@ class ImageSupportWarning(Warning):
@lru_cache(maxsize=2) @lru_cache(maxsize=2)
def images_supported(): def images_supported() -> bool:
ans = can_display_images() ans = can_display_images()
if not ans: if not ans:
warnings.warn('ImageMagick not found images cannot be displayed', ImageSupportWarning) warnings.warn('ImageMagick not found images cannot be displayed', ImageSupportWarning)
@ -34,10 +37,10 @@ def images_supported():
class Ref: class Ref:
def __setattr__(self, name, value): def __setattr__(self, name: str, value: object) -> None:
raise AttributeError("can't set attribute") raise AttributeError("can't set attribute")
def __repr__(self): def __repr__(self) -> str:
return '{}({})'.format(self.__class__.__name__, ', '.join( return '{}({})'.format(self.__class__.__name__, ', '.join(
'{}={}'.format(n, getattr(self, n)) for n in self.__slots__ if n != '_hash')) '{}={}'.format(n, getattr(self, n)) for n in self.__slots__ if n != '_hash'))
@ -48,7 +51,7 @@ class LineRef(Ref):
src_line_number: int src_line_number: int
wrapped_line_idx: int wrapped_line_idx: int
def __init__(self, sln, wli=0): def __init__(self, sln: int, wli: int = 0) -> None:
object.__setattr__(self, 'src_line_number', sln) object.__setattr__(self, 'src_line_number', sln)
object.__setattr__(self, 'wrapped_line_idx', wli) object.__setattr__(self, 'wrapped_line_idx', wli)
@ -59,7 +62,7 @@ class Reference(Ref):
path: str path: str
extra: Optional[LineRef] extra: Optional[LineRef]
def __init__(self, path, extra=None): def __init__(self, path: str, extra: Optional[LineRef] = None) -> None:
object.__setattr__(self, 'path', path) object.__setattr__(self, 'path', path)
object.__setattr__(self, 'extra', extra) object.__setattr__(self, 'extra', extra)
@ -68,35 +71,41 @@ class Line:
__slots__ = ('text', 'ref', 'is_change_start', 'image_data') __slots__ = ('text', 'ref', 'is_change_start', 'image_data')
def __init__(self, text, ref, change_start=False, image_data=None): def __init__(
self,
text: str,
ref: Reference,
change_start: bool = False,
image_data: Optional[Tuple[Optional['ImagePlacement'], Optional['ImagePlacement']]] = None
) -> None:
self.text = text self.text = text
self.ref = ref self.ref = ref
self.is_change_start = change_start self.is_change_start = change_start
self.image_data = image_data self.image_data = image_data
def yield_lines_from(iterator, reference, is_change_start=True): def yield_lines_from(iterator: Iterable[str], reference: Reference, is_change_start: bool = True) -> Generator[Line, None, None]:
for text in iterator: for text in iterator:
yield Line(text, reference, is_change_start) yield Line(text, reference, is_change_start)
is_change_start = False is_change_start = False
def human_readable(size, sep=' '): def human_readable(size: int, sep: str = ' ') -> str:
""" Convert a size in bytes into a human readable form """ """ Convert a size in bytes into a human readable form """
divisor, suffix = 1, "B" divisor, suffix = 1, "B"
for i, candidate in enumerate(('B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB')): for i, candidate in enumerate(('B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB')):
if size < (1 << ((i + 1) * 10)): if size < (1 << ((i + 1) * 10)):
divisor, suffix = (1 << (i * 10)), candidate divisor, suffix = (1 << (i * 10)), candidate
break break
size = str(float(size)/divisor) s = str(float(size)/divisor)
if size.find(".") > -1: if s.find(".") > -1:
size = size[:size.find(".")+2] s = s[:s.find(".")+2]
if size.endswith('.0'): if s.endswith('.0'):
size = size[:-2] s = s[:-2]
return size + sep + suffix return s + sep + suffix
def fit_in(text, count): def fit_in(text: str, count: int) -> str:
p = truncate_point_for_length(text, count) p = truncate_point_for_length(text, count)
if p >= len(text): if p >= len(text):
return text return text
@ -105,14 +114,14 @@ def fit_in(text, count):
return text[:p] + '' return text[:p] + ''
def fill_in(text, sz): def fill_in(text: str, sz: int) -> str:
w = wcswidth(text) w = wcswidth(text)
if w < sz: if w < sz:
text += ' ' * (sz - w) text += ' ' * (sz - w)
return text return text
def place_in(text, sz): def place_in(text: str, sz: int) -> str:
return fill_in(fit_in(text, sz), sz) return fill_in(fit_in(text, sz), sz)
@ -137,39 +146,41 @@ hunk_format = format_func('hunk')
highlight_map = {'remove': ('removed_highlight', 'removed'), 'add': ('added_highlight', 'added')} highlight_map = {'remove': ('removed_highlight', 'removed'), 'add': ('added_highlight', 'added')}
def highlight_boundaries(ltype): def highlight_boundaries(ltype: str) -> Tuple[str, str]:
s, e = highlight_map[ltype] s, e = highlight_map[ltype]
start = '\x1b[' + formats[s] + 'm' start = '\x1b[' + formats[s] + 'm'
stop = '\x1b[' + formats[e] + 'm' stop = '\x1b[' + formats[e] + 'm'
return start, stop return start, stop
def title_lines(left_path, right_path, args, columns, margin_size): def title_lines(left_path: Optional[str], right_path: Optional[str], args: DiffCLIOptions, columns: int, margin_size: int) -> Generator[str, None, None]:
m = ' ' * margin_size m = ' ' * margin_size
left_name, right_name = map(path_name_map.get, (left_path, right_path)) left_name = path_name_map.get(left_path) if left_path else None
right_name = path_name_map.get(right_path) if right_path else None
if right_name and right_name != left_name: if right_name and right_name != left_name:
n1 = fit_in(m + sanitize(left_name), columns // 2 - margin_size) n1 = fit_in(m + sanitize(left_name or ''), columns // 2 - margin_size)
n1 = place_in(n1, columns // 2) n1 = place_in(n1, columns // 2)
n2 = fit_in(m + sanitize(right_name), columns // 2 - margin_size) n2 = fit_in(m + sanitize(right_name), columns // 2 - margin_size)
n2 = place_in(n2, columns // 2) n2 = place_in(n2, columns // 2)
name = n1 + n2 name = n1 + n2
else: else:
name = place_in(m + sanitize(left_name), columns) name = place_in(m + sanitize(left_name or ''), columns)
yield title_format(place_in(name, columns)) yield title_format(place_in(name, columns))
yield title_format('' * columns) yield title_format('' * columns)
def binary_lines(path, other_path, columns, margin_size): def binary_lines(path: Optional[str], other_path: Optional[str], columns: int, margin_size: int) -> Generator[str, None, None]:
template = _('Binary file: {}') template = _('Binary file: {}')
available_cols = columns // 2 - margin_size available_cols = columns // 2 - margin_size
def fl(path, fmt): def fl(path: str, fmt: Callable[[str], str]) -> str:
text = template.format(human_readable(len(data_for_path(path)))) text = template.format(human_readable(len(data_for_path(path))))
text = place_in(text, available_cols) text = place_in(text, available_cols)
return margin_format(' ' * margin_size) + fmt(text) return margin_format(' ' * margin_size) + fmt(text)
if path is None: if path is None:
filler = render_diff_line('', '', 'filler', margin_size, available_cols) filler = render_diff_line('', '', 'filler', margin_size, available_cols)
assert other_path is not None
yield filler + fl(other_path, added_format) yield filler + fl(other_path, added_format)
elif other_path is None: elif other_path is None:
filler = render_diff_line('', '', 'filler', margin_size, available_cols) filler = render_diff_line('', '', 'filler', margin_size, available_cols)
@ -178,7 +189,7 @@ def binary_lines(path, other_path, columns, margin_size):
yield fl(path, removed_format) + fl(other_path, added_format) yield fl(path, removed_format) + fl(other_path, added_format)
def split_to_size(line, width): def split_to_size(line: str, width: int) -> Generator[str, None, None]:
if not line: if not line:
yield line yield line
while line: while line:
@ -187,7 +198,7 @@ def split_to_size(line, width):
line = line[p:] line = line[p:]
def truncate_points(line, width): def truncate_points(line: str, width: int) -> Generator[int, None, None]:
pos = 0 pos = 0
sz = len(line) sz = len(line)
while True: while True:
@ -198,7 +209,7 @@ def truncate_points(line, width):
break break
def split_with_highlights(line, width, highlights, bg_highlight=None): def split_with_highlights(line: str, width: int, highlights: List, bg_highlight: Optional[Segment] = None) -> List:
truncate_pts = list(truncate_points(line, width)) truncate_pts = list(truncate_points(line, width))
return _split_with_highlights(line, truncate_pts, highlights, bg_highlight) return _split_with_highlights(line, truncate_pts, highlights, bg_highlight)
@ -209,7 +220,7 @@ text_bg_map = {'filler': filler_format, 'remove': removed_format, 'add': added_f
class DiffData: class DiffData:
def __init__(self, left_path, right_path, available_cols, margin_size): def __init__(self, left_path: str, right_path: str, available_cols: int, margin_size: int):
self.left_path, self.right_path = left_path, right_path self.left_path, self.right_path = left_path, right_path
self.available_cols = available_cols self.available_cols = available_cols
self.margin_size = margin_size self.margin_size = margin_size
@ -220,24 +231,28 @@ class DiffData:
self.left_hdata = highlights_for_path(left_path) self.left_hdata = highlights_for_path(left_path)
self.right_hdata = highlights_for_path(right_path) self.right_hdata = highlights_for_path(right_path)
def left_highlights_for_line(self, line_num): def left_highlights_for_line(self, line_num: int) -> List[Segment]:
if line_num < len(self.left_hdata): if line_num < len(self.left_hdata):
return self.left_hdata[line_num] return self.left_hdata[line_num]
return [] return []
def right_highlights_for_line(self, line_num): def right_highlights_for_line(self, line_num: int) -> List[Segment]:
if line_num < len(self.right_hdata): if line_num < len(self.right_hdata):
return self.right_hdata[line_num] return self.right_hdata[line_num]
return [] return []
def render_diff_line(number, text, ltype, margin_size, available_cols) -> str: def render_diff_line(number: Optional[str], text: str, ltype: str, margin_size: int, available_cols: int) -> str:
margin = margin_bg_map[ltype](place_in(number, margin_size)) margin = margin_bg_map[ltype](place_in(number or '', margin_size))
content = text_bg_map[ltype](fill_in(text or '', available_cols)) content = text_bg_map[ltype](fill_in(text or '', available_cols))
return margin + content return margin + content
def render_diff_pair(left_line_number, left, left_is_change, right_line_number, right, right_is_change, is_first, margin_size, available_cols): def render_diff_pair(
left_line_number: Optional[str], left: str, left_is_change: bool,
right_line_number: Optional[str], right: str, right_is_change: bool,
is_first: bool, margin_size: int, available_cols: int
) -> str:
ltype = 'filler' if left_line_number is None else ('remove' if left_is_change else 'context') ltype = 'filler' if left_line_number is None else ('remove' if left_is_change else 'context')
rtype = 'filler' if right_line_number is None else ('add' if right_is_change else 'context') rtype = 'filler' if right_line_number is None else ('add' if right_is_change else 'context')
return ( return (
@ -246,14 +261,22 @@ def render_diff_pair(left_line_number, left, left_is_change, right_line_number,
) )
def hunk_title(hunk_num, hunk, margin_size, available_cols): def hunk_title(hunk_num: int, hunk: Hunk, margin_size: int, available_cols: int) -> str:
m = hunk_margin_format(' ' * margin_size) m = hunk_margin_format(' ' * margin_size)
t = '@@ -{},{} +{},{} @@ {}'.format(hunk.left_start + 1, hunk.left_count, hunk.right_start + 1, hunk.right_count, hunk.title) t = '@@ -{},{} +{},{} @@ {}'.format(hunk.left_start + 1, hunk.left_count, hunk.right_start + 1, hunk.right_count, hunk.title)
return m + hunk_format(place_in(t, available_cols)) return m + hunk_format(place_in(t, available_cols))
def render_half_line(line_number, line, highlights, ltype, margin_size, available_cols, changed_center=None): def render_half_line(
bg_highlight = None line_number: int,
line: str,
highlights: List,
ltype: str,
margin_size: int,
available_cols: int,
changed_center: Optional[Tuple[int, int]] = None
) -> Generator[str, None, None]:
bg_highlight: Optional[Segment] = None
if changed_center is not None and changed_center[0]: if changed_center is not None and changed_center[0]:
prefix_count, suffix_count = changed_center prefix_count, suffix_count = changed_center
line_sz = len(line) line_sz = len(line)
@ -264,36 +287,36 @@ def render_half_line(line_number, line, highlights, ltype, margin_size, availabl
seg.end_code = stop seg.end_code = stop
bg_highlight = seg bg_highlight = seg
if highlights or bg_highlight: if highlights or bg_highlight:
lines = split_with_highlights(line, available_cols, highlights, bg_highlight) lines: Iterable[str] = split_with_highlights(line, available_cols, highlights, bg_highlight)
else: else:
lines = split_to_size(line, available_cols) lines = split_to_size(line, available_cols)
line_number = str(line_number + 1) lnum = str(line_number + 1)
for line in lines: for line in lines:
yield render_diff_line(line_number, line, ltype, margin_size, available_cols) yield render_diff_line(lnum, line, ltype, margin_size, available_cols)
line_number = '' lnum = ''
def lines_for_chunk(data, hunk_num, chunk, chunk_num): def lines_for_chunk(data: DiffData, hunk_num: int, chunk: Chunk, chunk_num: int) -> Generator[Line, None, None]:
if chunk.is_context: if chunk.is_context:
for i in range(chunk.left_count): for i in range(chunk.left_count):
left_line_number = line_ref = chunk.left_start + i left_line_number = line_ref = chunk.left_start + i
right_line_number = chunk.right_start + i right_line_number = chunk.right_start + i
highlights = data.left_highlights_for_line(left_line_number) highlights = data.left_highlights_for_line(left_line_number)
if highlights: if highlights:
lines = split_with_highlights(data.left_lines[left_line_number], data.available_cols, highlights) lines: Iterable[str] = split_with_highlights(data.left_lines[left_line_number], data.available_cols, highlights)
else: else:
lines = split_to_size(data.left_lines[left_line_number], data.available_cols) lines = split_to_size(data.left_lines[left_line_number], data.available_cols)
left_line_number = str(left_line_number + 1) left_line_number_s = str(left_line_number + 1)
right_line_number = str(right_line_number + 1) right_line_number_s = str(right_line_number + 1)
for wli, text in enumerate(lines): for wli, text in enumerate(lines):
line = render_diff_line(left_line_number, text, 'context', data.margin_size, data.available_cols) line = render_diff_line(left_line_number_s, text, 'context', data.margin_size, data.available_cols)
if right_line_number == left_line_number: if right_line_number_s == left_line_number_s:
r = line r = line
else: else:
r = render_diff_line(right_line_number, text, 'context', data.margin_size, data.available_cols) r = render_diff_line(right_line_number_s, text, 'context', data.margin_size, data.available_cols)
ref = Reference(data.left_path, LineRef(line_ref, wli)) ref = Reference(data.left_path, LineRef(line_ref, wli))
yield Line(line + r, ref) yield Line(line + r, ref)
left_line_number = right_line_number = '' left_line_number_s = right_line_number_s = ''
else: else:
common = min(chunk.left_count, chunk.right_count) common = min(chunk.left_count, chunk.right_count)
for i in range(max(chunk.left_count, chunk.right_count)): for i in range(max(chunk.left_count, chunk.right_count)):
@ -333,7 +356,7 @@ def lines_for_chunk(data, hunk_num, chunk, chunk_num):
yield Line(left_line + right_line, ref, i == 0 and wli == 0) yield Line(left_line + right_line, ref, i == 0 and wli == 0)
def lines_for_diff(left_path, right_path, hunks, args, columns, margin_size): def lines_for_diff(left_path: str, right_path: str, hunks: Iterable[Hunk], args: DiffCLIOptions, columns: int, margin_size: int) -> Generator[Line, None, None]:
available_cols = columns // 2 - margin_size available_cols = columns // 2 - margin_size
data = DiffData(left_path, right_path, available_cols, margin_size) data = DiffData(left_path, right_path, available_cols, margin_size)
@ -343,7 +366,7 @@ def lines_for_diff(left_path, right_path, hunks, args, columns, margin_size):
yield from lines_for_chunk(data, hunk_num, chunk, cnum) yield from lines_for_chunk(data, hunk_num, chunk, cnum)
def all_lines(path, args, columns, margin_size, is_add=True): def all_lines(path: str, args: DiffCLIOptions, columns: int, margin_size: int, is_add: bool = True) -> Generator[Line, None, None]:
available_cols = columns // 2 - margin_size available_cols = columns // 2 - margin_size
ltype = 'add' if is_add else 'remove' ltype = 'add' if is_add else 'remove'
lines = lines_for_path(path) lines = lines_for_path(path)
@ -351,7 +374,7 @@ def all_lines(path, args, columns, margin_size, is_add=True):
msg_written = False msg_written = False
hdata = highlights_for_path(path) hdata = highlights_for_path(path)
def highlights(num): def highlights(num: int) -> List[Segment]:
return hdata[num] if num < len(hdata) else [] return hdata[num] if num < len(hdata) else []
for line_number, line in enumerate(lines): for line_number, line in enumerate(lines):
@ -368,7 +391,7 @@ def all_lines(path, args, columns, margin_size, is_add=True):
yield Line(text, ref, line_number == 0 and i == 0) yield Line(text, ref, line_number == 0 and i == 0)
def rename_lines(path, other_path, args, columns, margin_size): def rename_lines(path: str, other_path: str, args: DiffCLIOptions, columns: int, margin_size: int) -> Generator[str, None, None]:
m = ' ' * margin_size m = ' ' * margin_size
for line in split_to_size(_('The file {0} was renamed to {1}').format( for line in split_to_size(_('The file {0} was renamed to {1}').format(
sanitize(path_name_map[path]), sanitize(path_name_map[other_path])), columns - margin_size): sanitize(path_name_map[path]), sanitize(path_name_map[other_path])), columns - margin_size):
@ -377,7 +400,7 @@ def rename_lines(path, other_path, args, columns, margin_size):
class Image: class Image:
def __init__(self, image_id, width, height, margin_size, screen_size): def __init__(self, image_id: int, width: int, height: int, margin_size: int, screen_size: ScreenSize) -> None:
self.image_id = image_id self.image_id = image_id
self.width, self.height = width, height self.width, self.height = width, height
self.rows = int(ceil(self.height / screen_size.cell_height)) self.rows = int(ceil(self.height / screen_size.cell_height))
@ -387,18 +410,23 @@ class Image:
class ImagePlacement: class ImagePlacement:
def __init__(self, image, row): def __init__(self, image: Image, row: int) -> None:
self.image = image self.image = image
self.row = row self.row = row
def render_image(path, is_left, available_cols, margin_size, image_manager): def render_image(
path: str,
is_left: bool,
available_cols: int, margin_size: int,
image_manager: ImageManager
) -> Generator[Tuple[str, Reference, Optional[ImagePlacement]], None, None]:
lnum = 0 lnum = 0
margin_fmt = removed_margin_format if is_left else added_margin_format margin_fmt = removed_margin_format if is_left else added_margin_format
m = margin_fmt(' ' * margin_size) m = margin_fmt(' ' * margin_size)
fmt = removed_format if is_left else added_format fmt = removed_format if is_left else added_format
def yield_split(text): def yield_split(text: str) -> Generator[Tuple[str, Reference, Optional[ImagePlacement]], None, None]:
nonlocal lnum nonlocal lnum
for i, line in enumerate(split_to_size(text, available_cols)): for i, line in enumerate(split_to_size(text, available_cols)):
yield m + fmt(place_in(line, available_cols)), Reference(path, LineRef(lnum, i)), None yield m + fmt(place_in(line, available_cols)), Reference(path, LineRef(lnum, i)), None
@ -420,10 +448,16 @@ def render_image(path, is_left, available_cols, margin_size, image_manager):
lnum += 1 lnum += 1
def image_lines(left_path, right_path, columns, margin_size, image_manager): def image_lines(
left_path: Optional[str],
right_path: Optional[str],
columns: int,
margin_size: int,
image_manager: ImageManager
) -> Generator[Line, None, None]:
available_cols = columns // 2 - margin_size available_cols = columns // 2 - margin_size
left_lines: Iterable[str] = iter(()) left_lines: Iterable[Tuple[str, Reference, Optional[ImagePlacement]]] = iter(())
right_lines: Iterable[str] = iter(()) right_lines: Iterable[Tuple[str, Reference, Optional[ImagePlacement]]] = iter(())
if left_path is not None: if left_path is not None:
left_lines = render_image(left_path, True, available_cols, margin_size, image_manager) left_lines = render_image(left_path, True, available_cols, margin_size, image_manager)
if right_path is not None: if right_path is not None:
@ -450,7 +484,14 @@ class RenderDiff:
margin_size: int = 0 margin_size: int = 0
def __call__(self, collection, diff_map, args, columns, image_manager): def __call__(
self,
collection: Collection,
diff_map: Dict[str, Patch],
args: DiffCLIOptions,
columns: int,
image_manager: ImageManager
) -> Generator[Line, None, None]:
largest_line_number = 0 largest_line_number = 0
for path, item_type, other_path in collection: for path, item_type, other_path in collection:
if item_type == 'diff': if item_type == 'diff':
@ -475,6 +516,7 @@ class RenderDiff:
else: else:
ans = yield_lines_from(binary_lines(path, other_path, columns, margin_size), item_ref) ans = yield_lines_from(binary_lines(path, other_path, columns, margin_size), item_ref)
else: else:
assert other_path is not None
ans = lines_for_diff(path, other_path, diff_map[path], args, columns, margin_size) ans = lines_for_diff(path, other_path, diff_map[path], args, columns, margin_size)
elif item_type == 'add': elif item_type == 'add':
if is_binary: if is_binary:
@ -493,6 +535,7 @@ class RenderDiff:
else: else:
ans = all_lines(path, args, columns, margin_size, is_add=False) ans = all_lines(path, args, columns, margin_size, is_add=False)
elif item_type == 'rename': elif item_type == 'rename':
assert other_path is not None
ans = yield_lines_from(rename_lines(path, other_path, args, columns, margin_size), item_ref) ans = yield_lines_from(rename_lines(path, other_path, args, columns, margin_size), item_ref)
else: else:
raise ValueError('Unsupported item type: {}'.format(item_type)) raise ValueError('Unsupported item type: {}'.format(item_type))

View File

@ -12,11 +12,11 @@ if TYPE_CHECKING:
from kitty.utils import ScreenSize from kitty.utils import ScreenSize
from .loop import TermManager, Loop, Debug, MouseEvent from .loop import TermManager, Loop, Debug, MouseEvent
from .images import ImageManager from .images import ImageManager
from kitty.config import KeyAction from kitty.conf.utils import KittensKeyAction
from kitty.boss import Boss from kitty.boss import Boss
from kitty.key_encoding import KeyEvent from kitty.key_encoding import KeyEvent
from types import TracebackType from types import TracebackType
ScreenSize, TermManager, Loop, Debug, KeyAction, KeyEvent, MouseEvent, TracebackType, Boss, ImageManager ScreenSize, TermManager, Loop, Debug, KeyEvent, MouseEvent, TracebackType, Boss, ImageManager
import asyncio import asyncio
@ -51,7 +51,7 @@ class Handler:
def asyncio_loop(self) -> 'asyncio.AbstractEventLoop': def asyncio_loop(self) -> 'asyncio.AbstractEventLoop':
return self._tui_loop.asycio_loop return self._tui_loop.asycio_loop
def add_shortcut(self, action: 'KeyAction', key: str, mods: Optional[int] = None, is_text: Optional[bool] = False) -> None: def add_shortcut(self, action: 'KittensKeyAction', key: str, mods: Optional[int] = None, is_text: Optional[bool] = False) -> None:
if not hasattr(self, '_text_shortcuts'): if not hasattr(self, '_text_shortcuts'):
self._text_shortcuts, self._key_shortcuts = {}, {} self._text_shortcuts, self._key_shortcuts = {}, {}
if is_text: if is_text:
@ -59,7 +59,7 @@ class Handler:
else: else:
self._key_shortcuts[(key, mods or 0)] = action self._key_shortcuts[(key, mods or 0)] = action
def shortcut_action(self, key_event_or_text: Union[str, 'KeyEvent']) -> Optional['KeyAction']: def shortcut_action(self, key_event_or_text: Union[str, 'KeyEvent']) -> Optional['KittensKeyAction']:
if isinstance(key_event_or_text, str): if isinstance(key_event_or_text, str):
return self._text_shortcuts.get(key_event_or_text) return self._text_shortcuts.get(key_event_or_text)
return self._key_shortcuts.get((key_event_or_text.key, key_event_or_text.mods)) return self._key_shortcuts.get((key_event_or_text.key, key_event_or_text.mods))

View File

@ -306,7 +306,10 @@ def parse_kittens_shortcut(sc: str) -> Tuple[Optional[int], str, bool]:
return mods, rkey, is_text return mods, rkey, is_text
def parse_kittens_func_args(action: str, args_funcs: Dict[str, Callable]) -> Tuple[str, Tuple[str, ...]]: KittensKeyAction = Tuple[str, Tuple[str, ...]]
def parse_kittens_func_args(action: str, args_funcs: Dict[str, Callable]) -> KittensKeyAction:
parts = action.strip().split(' ', 1) parts = action.strip().split(' ', 1)
func = parts[0] func = parts[0]
if len(parts) == 1: if len(parts) == 1:
@ -332,12 +335,15 @@ def parse_kittens_func_args(action: str, args_funcs: Dict[str, Callable]) -> Tup
return func, tuple(args) return func, tuple(args)
KittensKey = Tuple[str, Optional[int], bool]
def parse_kittens_key( def parse_kittens_key(
val: str, funcs_with_args: Dict[str, Callable] val: str, funcs_with_args: Dict[str, Callable]
) -> Optional[Tuple[Tuple[str, Tuple[str, ...]], str, Optional[int], bool]]: ) -> Optional[Tuple[KittensKeyAction, KittensKey]]:
sc, action = val.partition(' ')[::2] sc, action = val.partition(' ')[::2]
if not sc or not action: if not sc or not action:
return None return None
mods, key, is_text = parse_kittens_shortcut(sc) mods, key, is_text = parse_kittens_shortcut(sc)
ans = parse_kittens_func_args(action, funcs_with_args) ans = parse_kittens_func_args(action, funcs_with_args)
return ans, key, mods, is_text return ans, (key, mods, is_text)

View File

@ -29,7 +29,16 @@ def generate_stub():
) )
from kittens.diff.config_data import all_options from kittens.diff.config_data import all_options
text += as_type_stub(all_options, class_name='DiffOptions') text += as_type_stub(
all_options,
class_name='DiffOptions',
preamble_lines=(
'from kitty.conf.utils import KittensKey, KittensKeyAction',
),
extra_fields=(
('key_definitions', 'typing.Dict[KittensKey, KittensKeyAction]'),
)
)
save_type_stub(text, __file__) save_type_stub(text, __file__)