kitty/kitty/window.py
2022-09-23 22:18:03 +05:30

1694 lines
65 KiB
Python

#!/usr/bin/env python3
# License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
import json
import os
import re
import sys
import weakref
from collections import deque
from contextlib import suppress
from enum import Enum, IntEnum, auto
from functools import lru_cache, partial
from gettext import gettext as _
from itertools import chain
from time import monotonic
from typing import (
TYPE_CHECKING, Any, Callable, Deque, Dict, Iterable, List, NamedTuple, Optional,
Pattern, Sequence, Tuple, Union,
)
from .child import ProcessDesc
from .cli_stub import CLIOptions
from .clipboard import (
get_clipboard_string, get_primary_selection, set_clipboard_string,
set_primary_selection,
)
from .config import build_ansi_color_table
from .constants import (
appname, clear_handled_signals, config_dir, is_macos, wakeup_io_loop,
)
from .fast_data_types import (
BGIMAGE_PROGRAM, BLIT_PROGRAM, CELL_BG_PROGRAM, CELL_FG_PROGRAM, CELL_PROGRAM,
CELL_SPECIAL_PROGRAM, CURSOR_BEAM, CURSOR_BLOCK, CURSOR_UNDERLINE, DCS, DECORATION,
DECORATION_MASK, DIM, GLFW_MOD_CONTROL, GRAPHICS_ALPHA_MASK_PROGRAM,
GRAPHICS_PREMULT_PROGRAM, GRAPHICS_PROGRAM, MARK, MARK_MASK, NO_CURSOR_SHAPE,
NUM_UNDERLINE_STYLES, OSC, REVERSE, SCROLL_FULL, SCROLL_LINE, SCROLL_PAGE,
STRIKETHROUGH, TINT_PROGRAM, Color, KeyEvent, Screen, add_timer, add_window,
cell_size_for_window, click_mouse_cmd_output, click_mouse_url, compile_program,
current_os_window, encode_key_for_tty, get_boss, get_click_interval, get_options,
init_cell_program, mark_os_window_dirty, mouse_selection,
move_cursor_to_mouse_if_in_prompt, pt_to_px, set_titlebar_color, set_window_logo,
set_window_padding, set_window_render_data, update_ime_position_for_window,
update_window_title, update_window_visibility, wakeup_main_loop,
)
from .keys import keyboard_mode_name, mod_mask
from .notify import (
NotificationCommand, handle_notification_cmd, sanitize_identifier_pat,
)
from .options.types import Options
from .rgb import to_color
from .terminfo import get_capabilities
from .types import MouseEvent, OverlayType, WindowGeometry, ac, run_once
from .typing import BossType, ChildType, EdgeLiteral, TabType, TypedDict
from .utils import (
docs_url, kitty_ansi_sanitizer_pat, load_shaders, log_error, open_cmd, open_url,
parse_color_set, path_from_osc7_url, resolve_custom_file, resolved_shell,
sanitize_for_bracketed_paste, sanitize_title,
)
# A compiled window match query: either a single regex, or a
# (key regex, optional value regex) pair as built by compile_match_query()
# for non-simple queries.
MatchPatternType = Union[Pattern[str], Tuple[Pattern[str], Optional[Pattern[str]]]]
if TYPE_CHECKING:
from .file_transmission import FileTransmission
class CwdRequestType(Enum):
    """Which working directory a CwdRequest should resolve to."""

    current = auto()
    last_reported = auto()
    oldest = auto()
class CwdRequest:
    """A request to reuse the working directory of a window, tracked by window id.

    The window is looked up through the boss every time it is needed; when
    the id is not (or no longer) present the request resolves to ''.
    """

    def __init__(self, window: Optional['Window'] = None, request_type: CwdRequestType = CwdRequestType.current) -> None:
        # -1 means "no window", see __bool__
        self.window_id = window.id if window is not None else -1
        self.request_type = request_type

    def __bool__(self) -> bool:
        return self.window_id >= 0

    @property
    def window(self) -> Optional['Window']:
        boss = get_boss()
        return boss.window_id_map.get(self.window_id)

    @staticmethod
    def _reported_cwd(window: 'Window') -> str:
        # Path decoded from the last cwd URL recorded on the screen, or ''
        raw = window.screen.last_reported_cwd
        return path_from_osc7_url(raw) if raw else ''

    def _may_use_reported_cwd(self, window: 'Window') -> bool:
        # The reported cwd is only used for a non-remote child, and then only
        # when explicitly requested or when the window is at a prompt
        if window.child_is_remote:
            return False
        return self.request_type is CwdRequestType.last_reported or window.at_prompt

    def _cwd_from_child(self, window: 'Window') -> str:
        want_oldest = self.request_type is CwdRequestType.oldest
        return window.get_cwd_of_child(oldest=want_oldest) or ''

    @property
    def cwd_of_child(self) -> str:
        """The directory to use for this request, or '' if the window is gone."""
        window = self.window
        if not window:
            return ''
        reported_cwd = self._reported_cwd(window)
        if reported_cwd and self._may_use_reported_cwd(window):
            return reported_cwd
        return self._cwd_from_child(window)

    def modify_argv_for_launch_with_cwd(self, argv: List[str]) -> str:
        """Return the cwd to launch ``argv`` in, possibly rewriting ``argv`` in place.

        When a cwd has been reported, ``argv[0]`` is the configured shell and
        the window has an ssh kitten command line, ``argv`` is replaced by that
        command line with the reported cwd set in it, and '' is returned.
        """
        window = self.window
        if not window:
            return ''
        reported_cwd = self._reported_cwd(window)
        if not reported_cwd:
            return self._cwd_from_child(window)
        # First check if we are running ssh kitten, and trying to open the configured login shell
        if argv[0] == resolved_shell(get_options())[0]:
            ssh_kitten_cmdline = window.ssh_kitten_cmdline()
            if ssh_kitten_cmdline:
                from kittens.ssh.utils import set_cwd_in_cmdline
                argv[:] = ssh_kitten_cmdline
                set_cwd_in_cmdline(reported_cwd, argv)
                return ''
        if self._may_use_reported_cwd(window):
            return reported_cwd
        return self._cwd_from_child(window)
def process_title_from_child(title: str, is_base64: bool) -> str:
    """Sanitize a title, first decoding it from base64 when ``is_base64`` is set.

    A title that fails to decode is replaced by a fixed placeholder.
    """
    if not is_base64:
        return sanitize_title(title)
    from base64 import standard_b64decode
    try:
        decoded = standard_b64decode(title).decode('utf-8', 'replace')
    except Exception:
        decoded = 'undecodeable title'
    return sanitize_title(decoded)
@lru_cache(maxsize=64)
def compile_match_query(exp: str, is_simple: bool = True) -> MatchPatternType:
    """Compile a match expression (results are cached).

    Simple expressions compile to a single regex. Otherwise ``exp`` is split
    on the first '=' into a key regex and a value regex; the value regex is
    None when nothing follows the '=' (or there is no '=').
    """
    if is_simple:
        return re.compile(exp)
    key_exp, _sep, val_exp = exp.partition('=')
    key_pat = re.compile(key_exp)
    val_pat = re.compile(val_exp) if val_exp else None
    return key_pat, val_pat
class WindowDict(TypedDict):
    """Shape of the dictionary returned by Window.as_dict()."""
    id: int
    is_focused: bool
    title: str
    pid: Optional[int]
    cwd: str
    cmdline: List[str]
    env: Dict[str, str]
    foreground_processes: List[ProcessDesc]
    is_self: bool
    # Screen size in cells
    lines: int
    columns: int
class PipeData(TypedDict):
    # NOTE(review): no producer or consumer of this dict is visible in this
    # part of the file; field meanings are taken from their names only.
    input_line_number: int
    scrolled_by: int
    cursor_x: int
    cursor_y: int
    lines: int
    columns: int
    text: str
class ClipboardPending(NamedTuple):
    # Value stored in Window.clipboard_pending.
    # NOTE(review): presumably `where` names the destination clipboard and
    # `data` is the text accumulated so far -- confirm against the users of
    # Window.clipboard_pending.
    where: str
    data: str
    truncated: bool = False
class DynamicColor(IntEnum):
    """Colors that can be changed at runtime.

    Member names double as attribute names on the screen's color profile.
    """

    default_fg = 1
    default_bg = 2
    cursor_color = 3
    highlight_fg = 4
    highlight_bg = 5
class CommandOutput(IntEnum):
    """Selects which command's output cmd_output() should extract."""

    last_run = 0
    first_on_screen = 1
    last_visited = 2
    last_non_empty = 3
# Maps the numeric code passed to Window.set_dynamic_color() to the color it
# addresses.
DYNAMIC_COLOR_CODES = {
    10: DynamicColor.default_fg,
    11: DynamicColor.default_bg,
    12: DynamicColor.cursor_color,
    17: DynamicColor.highlight_bg,
    19: DynamicColor.highlight_fg,
}
# Every code is also registered at code + 100; set_dynamic_color() treats
# codes >= 100 as "set the color to None" rather than to a supplied value.
DYNAMIC_COLOR_CODES.update({k+100: v for k, v in DYNAMIC_COLOR_CODES.items()})
class Watcher:
    """Call signature of a watcher callback; this base implementation does nothing."""

    def __call__(self, boss: BossType, window: 'Window', data: Dict[str, Any]) -> None:
        pass
class Watchers:
    """The watcher callbacks attached to a window, grouped by event."""

    on_resize: List[Watcher]
    on_close: List[Watcher]
    on_focus_change: List[Watcher]

    def __init__(self) -> None:
        self.on_resize = []
        self.on_close = []
        self.on_focus_change = []

    @staticmethod
    def _merge(base: List[Watcher], other: List[Watcher]) -> None:
        # Append the entries of other that base does not already contain
        for watcher in other:
            if watcher not in base:
                base.append(watcher)

    def add(self, others: 'Watchers') -> None:
        """Merge the watchers from ``others`` into this set, skipping duplicates."""
        self._merge(self.on_resize, others.on_resize)
        self._merge(self.on_close, others.on_close)
        self._merge(self.on_focus_change, others.on_focus_change)

    def clear(self) -> None:
        """Empty all lists in place."""
        for watchers in (self.on_close, self.on_resize, self.on_focus_change):
            del watchers[:]

    def copy(self) -> 'Watchers':
        """Return a new Watchers whose lists are shallow copies of this one's."""
        duplicate = Watchers()
        duplicate.on_close = list(self.on_close)
        duplicate.on_resize = list(self.on_resize)
        duplicate.on_focus_change = list(self.on_focus_change)
        return duplicate

    @property
    def has_watchers(self) -> bool:
        if self.on_close or self.on_resize or self.on_focus_change:
            return True
        return False
def call_watchers(windowref: Callable[[], Optional['Window']], which: str, data: Dict[str, Any]) -> None:
    """Schedule the window's ``which`` watchers to be called with ``data``.

    The call happens from a timer callback; nothing is called if
    ``windowref()`` returns None by then.
    """

    def callback(timer_id: Optional[int]) -> None:
        window = windowref()
        if window is None:
            return
        watchers: List[Watcher] = getattr(window.watchers, which)
        window.call_watchers(watchers, data)

    add_timer(callback, 0, False)
def pagerhist(screen: Screen, as_ansi: bool = False, add_wrap_markers: bool = False if False else True, upto_output_start: bool = False) -> str:
    """Return the pager history of ``screen`` as text.

    The text is passed through text_sanitizer() unless both ``as_ansi`` and
    ``add_wrap_markers`` are requested.
    """
    text = screen.historybuf.pagerhist_as_text(upto_output_start)
    keep_everything = as_ansi and add_wrap_markers
    if text and not keep_everything:
        text = text_sanitizer(as_ansi, add_wrap_markers)(text)
    return text
def as_text(
    screen: Screen,
    as_ansi: bool = False,
    add_history: bool = False,
    add_wrap_markers: bool = False,
    alternate_screen: bool = False,
    add_cursor: bool = False
) -> str:
    """Dump the contents of ``screen`` as a single string.

    History is only included when ``alternate_screen`` agrees with the line
    buffer the screen is currently using. With ``add_cursor`` the escape codes
    for cursor visibility, position and shape/blink are appended at the end.
    """
    lines: List[str] = []
    add_history = add_history and not (screen.is_using_alternate_linebuf() ^ alternate_screen)
    if alternate_screen:
        dump = screen.as_text_alternate
    elif add_history:
        dump = screen.as_text_non_visual
    else:
        dump = screen.as_text
    dump(lines.append, as_ansi, add_wrap_markers)

    cursor_codes: List[str] = []
    if add_cursor:
        cursor_codes.append('\x1b[?25' + ('h' if screen.cursor_visible else 'l'))
        cursor_codes.append(f'\x1b[{screen.cursor.y + 1};{screen.cursor.x + 1}H')
        shape = screen.cursor.shape
        if shape == NO_CURSOR_SHAPE:
            cursor_codes.append('\x1b[?12' + ('h' if screen.cursor.blink else 'l'))
        else:
            code = {CURSOR_BLOCK: 1, CURSOR_UNDERLINE: 3, CURSOR_BEAM: 5}[shape]
            # the non-blinking variant of each shape is the next code up
            if not screen.cursor.blink:
                code += 1
            cursor_codes.append(f'\x1b[{code} q')

    history: List[str] = []
    if add_history:
        pht = pagerhist(screen, as_ansi, add_wrap_markers)
        if pht:
            history.append(pht)
        screen.as_text_for_history_buf(history.append, as_ansi, add_wrap_markers)
        if history:
            # Terminate the history unless the first screen line continues it
            if not screen.linebuf.is_continued(0):
                history[-1] += '\n'
            if as_ansi:
                history[-1] += '\x1b[m'
    return ''.join(chain(history, lines, cursor_codes))
def multi_replace(src: str, **replacements: Any) -> str:
    """Replace every ``{NAME}`` placeholder (uppercase letters and underscores) in ``src``.

    A placeholder with a matching keyword argument becomes ``str(value)``;
    one without a match is replaced by its bare name, without the braces.
    """
    table = {name: str(value) for name, value in replacements.items()}

    def substitute(match: 're.Match[str]') -> str:
        name = match.group(1)
        return table.get(name, name)

    return re.sub(r'\{([A-Z_]+)\}', substitute, src)
class LoadShaderPrograms:
    """Callable that loads and compiles the blit, cell, graphics, bgimage and tint shader programs."""

    def __call__(self, semi_transparent: bool = False) -> None:
        compile_program(BLIT_PROGRAM, *load_shaders('blit'))

        cell_vertex, cell_fragment = load_shaders('cell')
        # Placeholder values that are the same for every cell program variant
        shared_substitutions = dict(
            REVERSE_SHIFT=REVERSE,
            STRIKE_SHIFT=STRIKETHROUGH,
            DIM_SHIFT=DIM,
            DECORATION_SHIFT=DECORATION,
            MARK_SHIFT=MARK,
            MARK_MASK=MARK_MASK,
            DECORATION_MASK=DECORATION_MASK,
            STRIKE_SPRITE_INDEX=NUM_UNDERLINE_STYLES + 1,
        )
        cell_variants = (
            ('SIMPLE', CELL_PROGRAM),
            ('BACKGROUND', CELL_BG_PROGRAM),
            ('SPECIAL', CELL_SPECIAL_PROGRAM),
            ('FOREGROUND', CELL_FG_PROGRAM),
        )
        for which, program in cell_variants:
            fragment = cell_fragment.replace('{WHICH_PROGRAM}', which)
            vertex = multi_replace(cell_vertex, WHICH_PROGRAM=which, **shared_substitutions)
            if semi_transparent:
                vertex = vertex.replace('#define NOT_TRANSPARENT', '#define TRANSPARENT')
                fragment = fragment.replace('#define NOT_TRANSPARENT', '#define TRANSPARENT')
            compile_program(program, vertex, fragment)

        graphics_vertex, graphics_fragment = load_shaders('graphics')
        graphics_variants = (
            ('SIMPLE', GRAPHICS_PROGRAM),
            ('PREMULT', GRAPHICS_PREMULT_PROGRAM),
            ('ALPHA_MASK', GRAPHICS_ALPHA_MASK_PROGRAM),
        )
        for which, program in graphics_variants:
            compile_program(program, graphics_vertex, graphics_fragment.replace('ALPHA_TYPE', which))

        compile_program(BGIMAGE_PROGRAM, *load_shaders('bgimage'))
        compile_program(TINT_PROGRAM, *load_shaders('tint'))
        init_cell_program()


load_shader_programs = LoadShaderPrograms()
def setup_colors(screen: Screen, opts: Options) -> None:
    """Load the ANSI color table and the configured colors from ``opts`` into ``screen``."""

    def as_int(color: Optional[Color]) -> int:
        # An unset color is passed as 0; set colors get the top byte filled
        if color is None:
            return 0
        return 0xff000000 | int(color)

    screen.color_profile.update_ansi_color_table(build_ansi_color_table(opts))
    configured = (
        opts.foreground, opts.background,
        opts.cursor, opts.cursor_text_color,
        opts.selection_foreground, opts.selection_background,
        opts.visual_bell_color,
    )
    screen.color_profile.set_configured_colors(*map(as_int, configured))
@run_once
def load_paste_filter() -> Callable[[str], str]:
    """Return ``filter_paste`` from paste-actions.py in the config directory.

    If the file is missing, or loading it fails for any reason, a function
    that returns its argument unchanged is used instead. Failures other than
    a missing file are reported.
    """
    import runpy
    import traceback

    def passthrough(text: str) -> str:
        return text

    script = os.path.join(config_dir, 'paste-actions.py')
    try:
        m = runpy.run_path(script)
        func: Callable[[str], str] = m['filter_paste']
    except FileNotFoundError:
        return passthrough
    except Exception as e:
        traceback.print_exc()
        log_error(f'Failed to load paste filter function with error: {e}')
        return passthrough
    return func
def text_sanitizer(as_ansi: bool, add_wrap_markers: bool) -> Callable[[str], str]:
    """Return a function that strips what the caller did not ask to keep.

    Without ``as_ansi`` the text matched by the ANSI sanitizer pattern is
    removed, and carriage returns are removed too unless ``add_wrap_markers``
    is set. With ``as_ansi`` only carriage returns are removed, whatever the
    value of ``add_wrap_markers``.
    """
    pat = kitty_ansi_sanitizer_pat()

    def remove_wrap_markers(line: str) -> str:
        return line.replace('\r', '')

    def remove_sgr(line: str) -> str:
        return str(pat.sub('', line))

    def remove_both(line: str) -> str:
        return str(pat.sub('', line.replace('\r', '')))

    if as_ansi:
        return remove_wrap_markers
    if add_wrap_markers:
        return remove_sgr
    return remove_both
def cmd_output(screen: Screen, which: CommandOutput = CommandOutput.last_run, as_ansi: bool = False, add_wrap_markers: bool = False) -> str:
    """Return the output of the command selected by ``which`` as one string.

    When the screen reports that the output continues into the pager history,
    that history is prepended.
    """
    lines: List[str] = []
    if screen.cmd_output(which, lines.append, as_ansi, add_wrap_markers):
        pht = pagerhist(screen, as_ansi, add_wrap_markers, True)
        if pht:
            lines.insert(0, pht)
    # In the first three chunks, drop everything up to and including the
    # first backslash of a chunk starting with the OSC 133;C marker
    for index, chunk in enumerate(lines[:3]):
        if chunk.startswith('\x1b]133;C'):
            lines[index] = chunk.partition('\\')[-1]
    return ''.join(lines)
def process_remote_print(msg: str) -> str:
    """Decode a base64 message and make ESC, BEL and NUL visible.

    Each of those characters is replaced by its backslash escape wrapped in
    green().
    """
    from base64 import standard_b64decode

    from .cli import green
    text = standard_b64decode(msg).decode('utf-8', 'replace')
    visible_forms = (('\x1b', r'\e'), ('\a', r'\a'), ('\0', r'\0'))
    for char, visible in visible_forms:
        text = text.replace(char, green(visible))
    return text
class EdgeWidths:
    """Per-edge width overrides; None means no override for that edge."""

    left: Optional[float]
    top: Optional[float]
    right: Optional[float]
    bottom: Optional[float]

    def __init__(self, serialized: Optional[Dict[str, Optional[float]]] = None):
        if serialized is None:
            self.left = self.top = self.right = self.bottom = None
            return
        # All four keys are required; a missing one raises KeyError
        for edge in ('left', 'right', 'top', 'bottom'):
            setattr(self, edge, serialized[edge])

    def serialize(self) -> Dict[str, Optional[float]]:
        """Return a dict that can be passed back to the constructor."""
        return dict(left=self.left, right=self.right, top=self.top, bottom=self.bottom)
class GlobalWatchers:
    """Cached loader for the watchers configured globally (plus one optional extra).

    Calling the instance returns the Watchers built from the ``watcher``
    option. The result is cached and only rebuilt when the option value or
    the extra watcher changes.
    """

    def __init__(self) -> None:
        # Copy of the ``watcher`` option the cached result was built from
        self.options_spec: Optional[Dict[str, str]] = None
        self.ans = Watchers()
        self.extra = ''
        # Value of ``extra`` the cached result was built with
        self._loaded_extra = ''

    def __call__(self) -> Watchers:
        spec = get_options().watcher
        # The cache key is the raw option value plus the extra watcher. The
        # raw value must be what is stored: if the stored spec included the
        # extra entry it could never compare equal to the option again and
        # the modules would be reloaded on every call.
        if spec == self.options_spec and self.extra == self._loaded_extra:
            return self.ans
        from .launch import load_watch_modules
        effective = spec
        if self.extra:
            effective = spec.copy()
            effective[self.extra] = self.extra
        # Keep the previous result if nothing could be loaded
        self.ans = load_watch_modules(effective.keys()) or self.ans
        self.options_spec = spec.copy()
        self._loaded_extra = self.extra
        return self.ans

    def set_extra(self, extra: str) -> None:
        """Set an additional watcher to load alongside the configured ones."""
        self.extra = extra


global_watchers = GlobalWatchers()
class Window:
window_custom_type: str = ''
overlay_type = OverlayType.transient
def __init__(
    self,
    tab: TabType,
    child: ChildType,
    args: CLIOptions,
    override_title: Optional[str] = None,
    copy_colors_from: Optional['Window'] = None,
    watchers: Optional[Watchers] = None,
    allow_remote_control: bool = False,
    remote_control_passwords: Optional[Dict[str, Sequence[str]]] = None,
):
    # Create a window in `tab` for the process `child`: registers the window
    # via add_window() and creates its Screen.
    #
    # Raises Exception if add_window() returns a falsy id.

    # Caller supplied watchers are merged (in place) with the global ones,
    # otherwise the window gets its own copy of the global set
    if watchers:
        self.watchers = watchers
        self.watchers.add(global_watchers())
    else:
        self.watchers = global_watchers().copy()
    self.last_focused_at = 0.
    self.started_at = monotonic()
    # Chunks accumulated by append_remote_data()
    self.current_remote_data: List[str] = []
    self.current_mouse_event_button = 0
    self.current_clipboard_read_ask: Optional[bool] = None
    self.prev_osc99_cmd = NotificationCommand()
    self.actions_on_close: List[Callable[['Window'], None]] = []
    self.actions_on_focus_change: List[Callable[['Window', bool], None]] = []
    self.actions_on_removal: List[Callable[['Window'], None]] = []
    self.current_marker_spec: Optional[Tuple[str, Union[str, Tuple[Tuple[int, str], ...]]]] = None
    self.kitten_result_processors: List[Callable[['Window', Any], None]] = []
    self.pty_resized_once = False
    self.last_reported_pty_size = (-1, -1, -1, -1)
    self.needs_attention = False
    # The title attributes must be set before add_window() below, because
    # self.title is a property computed from them
    self.override_title = override_title
    self.default_title = os.path.basename(child.argv[0] or appname)
    self.child_title = self.default_title
    self.title_stack: Deque[str] = deque(maxlen=10)
    self.id: int = add_window(tab.os_window_id, tab.id, self.title)
    self.margin = EdgeWidths()
    self.padding = EdgeWidths()
    self.kitten_result: Optional[Dict[str, Any]] = None
    if not self.id:
        raise Exception(f'No tab with id: {tab.id} in OS Window: {tab.os_window_id} was found, or the window counter wrapped')
    self.tab_id = tab.id
    self.os_window_id = tab.os_window_id
    # Only a weak reference to the tab is kept
    self.tabref: Callable[[], Optional[TabType]] = weakref.ref(tab)
    self.clipboard_pending: Optional[ClipboardPending] = None
    self.destroyed = False
    self.geometry: WindowGeometry = WindowGeometry(0, 0, 0, 0, 0, 0)
    self.needs_layout = True
    self.is_visible_in_layout: bool = True
    self.child = child
    cell_width, cell_height = cell_size_for_window(self.os_window_id)
    opts = get_options()
    # NOTE(review): 24, 80 are presumably the initial lines x columns; the
    # screen is resized in set_geometry()
    self.screen: Screen = Screen(self, 24, 80, opts.scrollback_lines, cell_width, cell_height, self.id)
    if copy_colors_from is not None:
        self.screen.copy_colors_from(copy_colors_from.screen)
    else:
        setup_colors(self.screen, opts)
    self.remote_control_passwords = remote_control_passwords
    self.allow_remote_control = allow_remote_control
def remote_control_allowed(self, pcmd: Dict[str, Any], extra_data: Dict[str, Any]) -> bool:
    """Decide whether the remote control command ``pcmd`` may run from this window.

    Returns False when remote control is disabled for the window and True
    when it is enabled without passwords. With passwords configured, the
    password in ``pcmd`` selects the authorization items that are checked.

    Raises PermissionError if the command is rejected by the authorizer, or
    if the password is unknown and the password map has a '!' entry.
    """
    if not self.allow_remote_control:
        return False
    passwords = self.remote_control_passwords
    if not passwords:
        return True
    pw = pcmd.get('password', '')
    # '!' itself is never accepted as a supplied password
    auth_items = None if pw == '!' else passwords.get(pw)
    if auth_items is None:
        if '!' in passwords:
            raise PermissionError()
        return False
    from .remote_control import password_authorizer
    authorizer = password_authorizer(auth_items)
    if authorizer.is_cmd_allowed(pcmd, self, False, extra_data):
        return True
    raise PermissionError()
@property
def file_transmission_control(self) -> 'FileTransmission':
    """The FileTransmission object for this window, created on first access."""
    existing: Optional['FileTransmission'] = getattr(self, '_file_transmission', None)
    if existing is not None:
        return existing
    from .file_transmission import FileTransmission
    created = FileTransmission(self.id)
    self._file_transmission = created
    return created
def on_dpi_change(self, font_sz: float) -> None:
    """Recompute the effective padding; ``font_sz`` is not used here."""
    self.update_effective_padding()
def change_tab(self, tab: TabType) -> None:
    """Point this window at ``tab``: update the stored ids and the weak tab reference."""
    self.tab_id, self.os_window_id = tab.id, tab.os_window_id
    self.tabref = weakref.ref(tab)
def effective_margin(self, edge: EdgeLiteral, is_single_window: bool = False) -> int:
    """Return the margin for ``edge`` converted with pt_to_px().

    Precedence: the per-window override, then (for a single window) the
    single_window_margin_width option if it is greater than -0.1, then the
    window_margin_width option.
    """

    def px(pts: float) -> int:
        return pt_to_px(pts, self.os_window_id)

    override = getattr(self.margin, edge)
    if override is not None:
        return px(override)
    opts = get_options()
    if is_single_window:
        single = getattr(opts.single_window_margin_width, edge)
        if single > -0.1:
            return px(single)
    return px(getattr(opts.window_margin_width, edge))
def effective_padding(self, edge: EdgeLiteral) -> int:
    """Return the padding for ``edge`` converted with pt_to_px().

    The per-window override wins over the window_padding_width option.
    """
    width = getattr(self.padding, edge)
    if width is None:
        width = getattr(get_options().window_padding_width, edge)
    return pt_to_px(width, self.os_window_id)
def update_effective_padding(self) -> None:
    """Pass the current effective padding of all four edges to set_window_padding()."""
    left, top, right, bottom = (
        self.effective_padding(edge) for edge in ('left', 'top', 'right', 'bottom')
    )
    set_window_padding(self.os_window_id, self.tab_id, self.id, left, top, right, bottom)
def patch_edge_width(self, which: str, edge: EdgeLiteral, val: Optional[float]) -> None:
    """Set the per-window override of one edge.

    ``which`` selects the padding when it is 'padding' and the margin for
    any other value. Only a padding change is applied immediately.
    """
    is_padding = which == 'padding'
    target = self.padding if is_padding else self.margin
    setattr(target, edge, val)
    if is_padding:
        self.update_effective_padding()
def effective_border(self) -> int:
    """Return the window_border_width option as an integer.

    A value in 'pt' is converted with pt_to_px(), but a positive width never
    becomes less than 1. Any other unit is simply rounded.
    """
    width, unit = get_options().window_border_width
    if unit != 'pt':
        return int(round(width))
    minimum = 1 if width > 0 else 0
    return int(max(minimum, pt_to_px(width, self.os_window_id)))
def apply_options(self) -> None:
    """Re-apply padding, titlebar color and colors from the current options."""
    current_options = get_options()
    self.update_effective_padding()
    self.change_titlebar_color()
    setup_colors(self.screen, current_options)
@property
def title(self) -> str:
    """The override title if one is set (and non-empty), else the child's title."""
    override = self.override_title
    if override:
        return override
    return self.child_title
def __repr__(self) -> str:
    # Shows the effective title and the window id
    return f'Window(title={self.title}, id={self.id})'
def as_dict(self, is_focused: bool = False, is_self: bool = False) -> WindowDict:
    """Describe this window and its child process as a WindowDict."""
    child = self.child
    screen = self.screen
    return {
        'id': self.id,
        'is_focused': is_focused,
        'title': self.title,
        'pid': child.pid,
        'cwd': child.current_cwd or child.cwd,
        'cmdline': child.cmdline,
        'env': child.environ,
        'foreground_processes': child.foreground_processes,
        'is_self': is_self,
        'lines': screen.lines,
        'columns': screen.columns,
    }
def serialize_state(self) -> Dict[str, Any]:
    """Return the window's titles, remote control settings, child details and edge widths as a dict.

    'window_custom_type' and 'overlay_type' are only included when they
    differ from their defaults.
    """
    child = self.child
    state: Dict[str, Any] = dict(
        version=1,
        id=self.id,
        child_title=self.child_title,
        override_title=self.override_title,
        default_title=self.default_title,
        title_stack=list(self.title_stack),
        allow_remote_control=self.allow_remote_control,
        remote_control_passwords=self.remote_control_passwords,
        cwd=child.current_cwd or child.cwd,
        env=child.environ,
        cmdline=child.cmdline,
        margin=self.margin.serialize(),
        padding=self.padding.serialize(),
    )
    if self.window_custom_type:
        state['window_custom_type'] = self.window_custom_type
    if self.overlay_type is not OverlayType.transient:
        state['overlay_type'] = self.overlay_type.value
    return state
@property
def current_colors(self) -> Dict[str, Optional[int]]:
    """The screen's color profile as a dict."""
    return self.screen.color_profile.as_dict()
@property
def at_prompt(self) -> bool:
    """Whether the screen reports its cursor as being at a prompt."""
    return self.screen.cursor_at_prompt()
@property
def has_running_program(self) -> bool:
    """The negation of at_prompt."""
    return not self.at_prompt
def matches(self, field: str, pat: MatchPatternType) -> bool:
    """Test this window against a compiled pattern for ``field``.

    ``field`` is one of 'env', 'id', 'window_id', 'pid', 'title', 'cwd' or
    'cmdline'; any other field never matches. For 'env', ``pat`` must be a
    (key regex, optional value regex) pair and matches if any environment
    variable of the child satisfies both. 'id', 'window_id' and 'pid'
    compare the pattern source text for exact equality; the remaining
    fields are regex searches.
    """
    if not pat:
        return False
    if field == 'env':
        assert isinstance(pat, tuple)
        key_pat, val_pat = pat
        return any(
            key_pat.search(key) is not None and (val_pat is None or val_pat.search(val) is not None)
            for key, val in self.child.environ.items()
        )
    assert not isinstance(pat, tuple)
    if field in ('id', 'window_id'):
        return pat.pattern == str(self.id)
    if field == 'pid':
        return pat.pattern == str(self.child.pid)
    if field == 'title':
        return pat.search(self.override_title or self.title) is not None
    # This has to be an equality test: ``field in 'cwd'`` is a substring
    # test on a str, which also accepts '', 'c', 'w', 'd', 'cw' and 'wd'
    if field == 'cwd':
        return pat.search(self.child.current_cwd or self.child.cwd) is not None
    if field == 'cmdline':
        return any(pat.search(arg) is not None for arg in self.child.cmdline)
    return False
def matches_query(self, field: str, query: str, active_tab: Optional[TabType] = None) -> bool:
    """Test this window against a textual ``query`` for ``field``.

    'num' and 'recent' take an integer and need ``active_tab``; 'state'
    takes one of a fixed set of state names; every other field is compiled
    with compile_match_query() and handed to matches().
    """
    if field in ('num', 'recent'):
        if active_tab is None:
            return False
        try:
            q = int(query)
        except Exception:
            return False
        # Any failure while asking the tab counts as "no match"
        with suppress(Exception):
            if field == 'num':
                return active_tab.get_nth_window(q) is self
            return active_tab.nth_active_window_id(q) == self.id
        return False

    if field == 'state':
        if query == 'needs_attention':
            return self.needs_attention
        if query in ('active', 'focused'):
            matched = active_tab is not None and self is active_tab.active_window
            if query == 'focused':
                matched = matched and current_os_window() == self.os_window_id
            return matched
        if query in ('parent_active', 'parent_focused'):
            matched = active_tab is not None and self.tabref() is active_tab
            if query == 'parent_focused':
                matched = matched and current_os_window() == self.os_window_id
            return matched
        return False

    is_simple = field != 'env'
    return self.matches(field, compile_match_query(query, is_simple))
def set_visible_in_layout(self, val: bool) -> None:
    """Record a visibility change; a window that becomes visible is also refreshed.

    Nothing happens if the visibility is unchanged.
    """
    visible = bool(val)
    if visible is self.is_visible_in_layout:
        return
    self.is_visible_in_layout = visible
    update_window_visibility(self.os_window_id, self.tab_id, self.id, visible)
    if visible:
        self.refresh()
def refresh(self) -> None:
    """Mark the screen dirty and wake up both the I/O loop and the main loop."""
    self.screen.mark_as_dirty()
    wakeup_io_loop()
    wakeup_main_loop()
def set_geometry(self, new_geometry: WindowGeometry) -> None:
    """Apply a new geometry: resize the screen and the pty as needed and update render data.

    Does nothing for a destroyed window.
    """
    if self.destroyed:
        return
    # Resize the screen when a layout is pending or the cell counts changed,
    # and let the on_resize watchers know
    if self.needs_layout or new_geometry.xnum != self.screen.columns or new_geometry.ynum != self.screen.lines:
        self.screen.resize(new_geometry.ynum, new_geometry.xnum)
        self.needs_layout = False
        call_watchers(weakref.ref(self), 'on_resize', {'old_geometry': self.geometry, 'new_geometry': new_geometry})
    # (lines, columns, width, height), with width and height taken from the
    # geometry edges and clamped to be non-negative
    current_pty_size = (
        self.screen.lines, self.screen.columns,
        max(0, new_geometry.right - new_geometry.left), max(0, new_geometry.bottom - new_geometry.top))
    update_ime_position = False
    if current_pty_size != self.last_reported_pty_size:
        get_boss().child_monitor.resize_pty(self.id, *current_pty_size)
        # The first pty resize marks the terminal as ready for the child
        if not self.pty_resized_once:
            self.pty_resized_once = True
            self.child.mark_terminal_ready()
            update_ime_position = True
        self.last_reported_pty_size = current_pty_size
    else:
        mark_os_window_dirty(self.os_window_id)
    self.geometry = g = new_geometry
    set_window_render_data(self.os_window_id, self.tab_id, self.id, self.screen, *g[:4])
    self.update_effective_padding()
    if update_ime_position:
        update_ime_position_for_window(self.id, True)
def contains(self, x: int, y: int) -> bool:
    """Whether the point (x, y) lies inside the window geometry, edges included."""
    geometry = self.geometry
    inside_horizontally = geometry.left <= x <= geometry.right
    return inside_horizontally and geometry.top <= y <= geometry.bottom
def close(self) -> None:
    """Ask the boss to mark this window for closing."""
    get_boss().mark_window_for_close(self)
@ac('misc', '''
Send the specified text to the active window
See :sc:`send_text <send_text>` for details.
''')
def send_text(self, *args: str) -> bool:
    # The last two arguments are a comma separated list of keyboard modes
    # and the text to send. Returns False when the text was written to the
    # child, and True when nothing was sent (mode mismatch or empty text).
    mode = keyboard_mode_name(self.screen)
    required_mode_, text = args[-2:]
    accepted_modes = {mode, 'all'}
    if accepted_modes.isdisjoint(required_mode_.split(',')) or not text:
        return True
    self.write_to_child(text)
    return False
@ac('debug', 'Show a dump of the current lines in the scrollback + screen with their line attributes')
def dump_lines_with_attrs(self) -> None:
    # Collect the dump from the screen and show it via the boss
    chunks: List[str] = []
    self.screen.dump_lines_with_attrs(chunks.append)
    get_boss().display_scrollback(self, ''.join(chunks), title='Dump of lines', report_cursor=False)
def write_to_child(self, data: Union[str, bytes]) -> None:
    """Queue ``data`` for writing to the child; str is encoded as UTF-8.

    Empty data is ignored. A failure to queue the write is logged.
    """
    if not data:
        return
    payload = data.encode('utf-8') if isinstance(data, str) else data
    queued = get_boss().child_monitor.needs_write(self.id, payload)
    if queued is not True:
        log_error(f'Failed to write to child {self.id} as it does not exist')
def title_updated(self) -> None:
    """Publish the current title and notify the tab, if it still exists."""
    update_window_title(self.os_window_id, self.tab_id, self.id, self.title)
    tab = self.tabref()
    if tab is None:
        return
    tab.title_changed(self)
def set_title(self, title: Optional[str]) -> None:
    """Set the override title; an empty or None title removes the override."""
    sanitized = sanitize_title(title) if title else title
    self.override_title = sanitized or None
    self.title_updated()
def desktop_notify(self, osc_code: int, raw_data: str) -> None:
    """Handle a desktop notification escape code.

    For OSC 777 only payloads starting with 'notify;' are accepted, and the
    prefix is removed before the payload is processed. The command returned
    for an OSC 99 notification is remembered for the next call.
    """
    prefix = 'notify;'
    if osc_code == 777:
        if not raw_data.startswith(prefix):
            log_error(f'Ignoring unknown OSC 777: {raw_data}')
            return  # unknown OSC 777
        raw_data = raw_data[len(prefix):]
    cmd = handle_notification_cmd(osc_code, raw_data, self.id, self.prev_osc99_cmd)
    if osc_code == 99 and cmd is not None:
        self.prev_osc99_cmd = cmd
# screen callbacks {{{
def use_utf8(self, on: bool) -> None:
    """Forward the flag for this window to the child monitor's set_iutf8_winid()."""
    get_boss().child_monitor.set_iutf8_winid(self.id, on)
def on_mouse_event(self, event: Dict[str, Any]) -> bool:
    """Dispatch a mouse event through the configured mouse map.

    ``event['mods']`` is masked in place with mod_mask. Returns False when
    no action is mapped to the event, otherwise the result of the action.
    """
    event['mods'] = event.get('mods', 0) & mod_mask
    mouse_event = MouseEvent(**event)
    self.current_mouse_event_button = mouse_event.button
    action = get_options().mousemap.get(mouse_event)
    if action is not None:
        return get_boss().combine(action, window_for_dispatch=self, dispatch_type='MouseEvent')
    return False
def open_url(self, url: str, hyperlink_id: int, cwd: Optional[str] = None) -> None:
    """Open ``url``, applying extra checks when it comes from a hyperlink.

    A non-zero ``hyperlink_id`` marks the URL as coming from a hyperlink.
    Such URLs are ignored when allow_hyperlinks is off, file URLs on another
    host are handed to handle_remote_file(), and the user is asked what to
    do when bit 0b10 of allow_hyperlinks is set.
    """
    opts = get_options()
    if hyperlink_id:
        if not opts.allow_hyperlinks:
            return
        from urllib.parse import unquote, urlparse, urlunparse
        try:
            purl = urlparse(url)
        except Exception:
            return
        # URLs without a scheme are treated like file URLs
        if (not purl.scheme or purl.scheme == 'file'):
            if purl.netloc:
                from socket import gethostname
                try:
                    hostname = gethostname()
                except Exception:
                    hostname = ''
                # Drop everything from the first ':' on (e.g. a port)
                remote_hostname = purl.netloc.partition(':')[0]
                if remote_hostname and remote_hostname != hostname and remote_hostname != 'localhost':
                    self.handle_remote_file(purl.netloc, unquote(purl.path))
                    return
                # The file is on this machine: strip the host part
                url = urlunparse(purl._replace(netloc=''))
        if opts.allow_hyperlinks & 0b10:
            from kittens.tui.operations import styled
            get_boss().choose(
                'What would you like to do with this URL:\n' + styled(unquote(url), fg='yellow'),
                partial(self.hyperlink_open_confirmed, url, cwd),
                'o:Open', 'c:Copy to clipboard', 'n;red:Nothing', default='o',
                window=self,
            )
            return
    get_boss().open_url(url, cwd=cwd)
def hyperlink_open_confirmed(self, url: str, cwd: Optional[str], q: str) -> None:
    """Act on the answer ``q`` to the hyperlink prompt: 'o' opens, 'c' copies, anything else is ignored."""
    if q == 'o':
        get_boss().open_url(url, cwd=cwd)
        return
    if q == 'c':
        set_clipboard_string(url)
def handle_remote_file(self, netloc: str, remote_path: str) -> None:
    """Run the remote_file kitten for ``remote_path`` on the host in ``netloc``.

    The SSH connection data is taken from the command line of a running ssh
    kitten when possible, otherwise derived from the foreground command
    line. An error is shown if no connection data can be found.
    """
    from kittens.remote_file.main import is_ssh_kitten_sentinel
    from kittens.ssh.main import get_connection_data

    from .utils import SSHConnectionData
    args = self.ssh_kitten_cmdline()
    conn_data: Union[None, List[str], SSHConnectionData] = None
    if args:
        processes = self.child.foreground_processes
        # Use the foreground process with the highest pid. The list can be
        # empty (for example if the process just exited), in which case we
        # fall through to the generic lookup below instead of raising.
        ssh_cmdline = (max(processes, key=lambda p: p['pid'])['cmdline'] if processes else None) or ['']
        # Likewise only use this command line if it has the '--' separator
        # the slice below relies on
        if 'ControlPath=' in ' '.join(ssh_cmdline) and '--' in ssh_cmdline:
            idx = ssh_cmdline.index('--')
            conn_data = [is_ssh_kitten_sentinel] + list(ssh_cmdline[:idx + 2])
    if conn_data is None:
        args = self.child.foreground_cmdline
        conn_data = get_connection_data(args, self.child.foreground_cwd or self.child.current_cwd or '')
        if conn_data is None:
            get_boss().show_error('Could not handle remote file', f'No SSH connection data found in: {args}')
            return
    get_boss().run_kitten(
        'remote_file', '--hostname', netloc.partition(':')[0], '--path', remote_path,
        '--ssh-connection-data', json.dumps(conn_data)
    )
def send_signal_for_key(self, key_num: int) -> bool:
    """Ask the child to send the signal for ``key_num``.

    Returns the child's result, or False (after logging) on an OSError.
    """
    try:
        sent = self.child.send_signal_for_key(key_num)
    except OSError as err:
        log_error(f'Failed to send signal for key to child with err: {err}')
        sent = False
    return sent
def focus_changed(self, focused: bool) -> None:
    """React to this window gaining or losing focus.

    Notifies the on_focus_change watchers, the registered focus actions and
    the screen. Gaining focus also clears needs_attention, and the tab
    borders are laid out again if it was set. Does nothing for a destroyed
    window.
    """
    if self.destroyed:
        return
    call_watchers(weakref.ref(self), 'on_focus_change', {'focused': focused})
    for action in self.actions_on_focus_change:
        # A failing action must not prevent the remaining ones from running
        try:
            action(self, focused)
        except Exception:
            import traceback
            traceback.print_exc()
    self.screen.focus_changed(focused)
    if not focused:
        if self.os_window_id == current_os_window():
            # Cancel IME composition after loses focus
            update_ime_position_for_window(self.id, False, -1)
        return
    self.last_focused_at = monotonic()
    update_ime_position_for_window(self.id, False, 1)
    had_attention, self.needs_attention = self.needs_attention, False
    if had_attention:
        tab = self.tabref()
        if tab is not None:
            tab.relayout_borders()
def title_changed(self, new_title: Optional[str], is_base64: bool = False) -> None:
    """Record a new title from the child; an empty title resets to the default.

    The change is only published when no override title is set.
    """
    raw_title = new_title or self.default_title
    self.child_title = process_title_from_child(raw_title, is_base64)
    if self.override_title is not None:
        return
    self.title_updated()
def icon_changed(self, new_icon: object) -> None:
    """Currently a no-op; ``new_icon`` is ignored."""
    pass  # TODO: Implement this
@property
def is_active(self) -> bool:
    """Whether this window is the boss's active window."""
    return get_boss().active_window is self
@property
def has_activity_since_last_focus(self) -> bool:
    """Whether the screen reports activity since it last had focus."""
    return self.screen.has_activity_since_last_focus()
def on_activity_since_last_focus(self) -> None:
    """Tell the boss about the activity, but only if tab_activity_symbol is configured."""
    if get_options().tab_activity_symbol:
        get_boss().on_activity_since_last_focus(self)
def on_bell(self) -> None:
    """Handle a bell in this window.

    Runs the configured command_on_bell (unless it is unset or 'none') with
    KITTY_CHILD_CMDLINE added to the environment. A window that is not the
    active one is then flagged as needing attention and its tab is notified.
    """
    command = get_options().command_on_bell
    if command and command != ['none']:
        import shlex
        import subprocess
        env = self.child.foreground_environ
        env['KITTY_CHILD_CMDLINE'] = ' '.join(shlex.quote(arg) for arg in self.child.cmdline)
        subprocess.Popen(command, env=env, cwd=self.child.foreground_cwd, preexec_fn=clear_handled_signals)
    if self.is_active:
        return
    newly_flagged = not self.needs_attention
    self.needs_attention = True
    tab = self.tabref()
    if tab is None:
        return
    if newly_flagged:
        tab.relayout_borders()
    tab.on_bell(self)
def change_titlebar_color(self) -> None:
    """Apply the platform's titlebar color option to the OS window.

    For a positive option value, a low byte of 1 selects the screen's
    current default background; otherwise the value shifted right by 8 bits
    is used as the color. Zero and negative values are passed on negated via
    the alternate call form of set_titlebar_color().
    """
    opts = get_options()
    val = opts.macos_titlebar_color if is_macos else opts.wayland_titlebar_color
    if val <= 0:
        set_titlebar_color(self.os_window_id, 0, True, -val)
        return
    color = self.screen.color_profile.default_bg if (val & 0xff) == 1 else val >> 8
    set_titlebar_color(self.os_window_id, color)
def change_colors(self, changes: Dict[DynamicColor, Optional[str]]) -> None:
    """Apply ``changes`` to the screen's color profile.

    A value of None, or one that cannot be parsed as a color, sets the color
    to 0. The boss is informed when the default background is among the
    changed colors.
    """

    def item(raw: Optional[str]) -> int:
        if raw is None:
            return 0
        parsed = to_color(raw)
        return 0 if parsed is None else 0xff000000 | int(parsed)

    default_bg_changed = False
    for which, raw_value in changes.items():
        setattr(self.screen.color_profile, which.name, item(raw_value))
        if which.name == 'default_bg':
            default_bg_changed = True
    if changes:
        self.screen.mark_as_dirty()
    if default_bg_changed:
        get_boss().default_bg_changed_for(self.id)
def color_profile_popped(self, bg_changed: bool) -> None:
    """Inform the boss that the default background changed, if ``bg_changed`` is set."""
    if bg_changed:
        get_boss().default_bg_changed_for(self.id)
def report_color(self, code: str, r: int, g: int, b: int) -> None:
    """Send an OSC color report for ``code`` to the child.

    Each channel is OR-ed with itself shifted left by 8 bits and written as
    at least four hex digits.
    """
    r, g, b = (channel | (channel << 8) for channel in (r, g, b))
    self.screen.send_escape_code_to_child(OSC, f'{code};rgb:{r:04x}/{g:04x}/{b:04x}')
def report_notification_activated(self, identifier: str) -> None:
    """Send an OSC 99 report carrying ``identifier`` to the child.

    Everything matched by sanitize_identifier_pat() is removed from the
    identifier first.
    """
    identifier = sanitize_identifier_pat().sub('', identifier)
    self.screen.send_escape_code_to_child(OSC, f'99;i={identifier};')
def set_dynamic_color(self, code: int, value: Union[str, bytes]) -> None:
    """Handle a request to query or change dynamic colors.

    ``value`` is a ';' separated list. The first entry applies to ``code``
    and each following entry to the next higher code. An entry of '?'
    reports the current color, anything else changes it; codes >= 100 set
    the color to None. Codes not in DYNAMIC_COLOR_CODES are skipped.
    """
    if isinstance(value, bytes):
        value = value.decode('utf-8')
    color_changes: Dict[DynamicColor, Optional[str]] = {}
    for current_code, val in enumerate(value.split(';'), code):
        which = DYNAMIC_COLOR_CODES.get(current_code)
        if which is None:
            continue
        if val == '?':
            col = getattr(self.screen.color_profile, which.name)
            self.report_color(str(current_code), col >> 16, (col >> 8) & 0xff, col & 0xff)
            continue
        color_changes[which] = None if current_code >= 100 else val
    if color_changes:
        self.change_colors(color_changes)
def set_color_table_color(self, code: int, value: str) -> None:
    """Handle color table requests: code 4 sets or queries entries, code 104 resets them.

    For code 104 an empty ``value`` resets the whole table, otherwise
    ``value`` is a ';' separated list of indices in 0..255; entries that are
    not integers or are out of range are ignored. Other codes do nothing.
    """
    cp = self.screen.color_profile
    if code == 4:
        changed = False
        for c, val in parse_color_set(value):
            if val is not None:
                changed = True
                cp.set_color(c, val)
                continue
            # color query
            qc = self.screen.color_profile.as_color((c << 8) | 1)
            assert qc is not None
            self.report_color(f'4;{c}', qc.red, qc.green, qc.blue)
        if changed:
            self.refresh()
        return
    if code != 104:
        return
    if value.strip():
        for x in value.split(';'):
            try:
                y = int(x)
            except Exception:
                continue
            if 0 <= y <= 255:
                cp.reset_color(y)
    else:
        cp.reset_color_table()
    self.refresh()
def request_capabilities(self, q: str) -> None:
    """Send each result of get_capabilities() for ``q`` to the child as a DCS escape code."""
    for result in get_capabilities(q, get_options()):
        self.screen.send_escape_code_to_child(DCS, result)
def handle_remote_cmd(self, cmd: str) -> None:
    # Remote control commands are handled by the boss, this window is
    # passed along with the command.
    boss = get_boss()
    boss.handle_remote_cmd(cmd, self)
def handle_remote_echo(self, msg: str) -> None:
    # msg is base64 encoded, the decoded bytes are written to the child
    import base64
    self.write_to_child(base64.standard_b64decode(msg))
def handle_remote_ssh(self, msg: str) -> None:
    # Write whatever the ssh kitten produces for this request to the child.
    # The second argument is built from the kitty pid and the window id.
    from kittens.ssh.main import get_ssh_data
    request_id = f'{os.getpid()}-{self.id}'
    for chunk in get_ssh_data(msg, request_id):
        self.write_to_child(chunk)
def handle_kitten_result(self, msg: str) -> None:
    # msg is base85 encoded JSON. The decoded value is stored on the window
    # and then handed to every registered processor. A failing processor
    # has its traceback printed and does not stop the remaining ones.
    from base64 import b85decode
    self.kitten_result = json.loads(b85decode(msg))
    for callback in self.kitten_result_processors:
        try:
            callback(self, self.kitten_result)
        except Exception:
            from traceback import print_exc
            print_exc()
def add_kitten_result_processor(self, callback: Callable[['Window', Any], None]) -> None:
    # Register a callback to be called with (window, result) whenever a
    # kitten result is handled by this window.
    processors = self.kitten_result_processors
    processors.append(callback)
def handle_overlay_ready(self, msg: str) -> None:
    # Move this window to the top of its group in the tab that contains
    # it. msg is not used. Nothing happens if no tab is found.
    tab = get_boss().tab_for_window(self)
    if tab is None:
        return
    tab.move_window_to_top_of_group(self)
def append_remote_data(self, msg: str) -> str:
    # Accumulate one chunk of a multi-part message sent by the child.
    #
    # An empty msg ends the transfer: the buffered chunks are joined,
    # returned and the buffer is cleared. Any other msg is expected to look
    # like "<num>:<payload>". The payload is buffered and '' is returned.
    # A num of '0', or a buffer that has grown past clipboard_max_size
    # (taken to be in MB), discards everything buffered so far.
    if not msg:
        cdata = ''.join(self.current_remote_data)
        self.current_remote_data = []
        return cdata
    num, sep, rest = msg.partition(':')
    if not sep:
        # The chunk comes from the child and cannot be trusted to be well
        # formed. Ignore it instead of failing with a ValueError.
        return ''
    max_size = get_options().clipboard_max_size * 1024 * 1024
    if num == '0' or sum(map(len, self.current_remote_data)) > max_size:
        self.current_remote_data = []
    self.current_remote_data.append(rest)
    return ''
def handle_remote_edit(self, msg: str) -> None:
    # Buffer the chunk. Once append_remote_data() returns the complete
    # data, hand it over to remote_edit().
    payload = self.append_remote_data(msg)
    if not payload:
        return
    from .launch import remote_edit
    remote_edit(payload, self)
def handle_remote_clone(self, msg: str) -> None:
    # Buffer the chunk. Once the data is complete, act on the allow_cloning
    # option: 'ask' asks the user for confirmation, 'yes', 'y' and 'true'
    # clone right away, every other value ignores the request.
    cdata = self.append_remote_data(msg)
    if not cdata:
        return
    policy = get_options().allow_cloning
    if policy == 'ask':
        on_answer = partial(self.handle_remote_clone_confirmation, cdata)
        get_boss().confirm(_(
            'A program running in this window wants to clone it into another window.'
            ' Allow it do so, once?'),
            on_answer, window=self,
        )
        return
    if policy in ('yes', 'y', 'true'):
        self.handle_remote_clone_confirmation(cdata, True)
def handle_remote_clone_confirmation(self, cdata: str, confirmed: bool) -> None:
    # Perform the clone, but only if it was confirmed
    if not confirmed:
        return
    from .launch import clone_and_launch
    clone_and_launch(cdata, self)
def handle_remote_askpass(self, msg: str) -> None:
    # msg names a shared memory segment. The JSON request is read from it
    # starting at offset 1, the user is asked via the boss and the answer is
    # written back into the same segment by send_answer().
    from .shm import SharedMemory
    with SharedMemory(name=msg, readonly=True) as shm:
        shm.seek(1)
        request = json.loads(shm.read_data_with_size())

    def send_answer(ans: Any) -> None:
        # The JSON answer goes in at offset 1. The byte at offset 0 is set
        # to 1 only after the answer has been written and flushed.
        encoded = json.dumps(ans)
        with SharedMemory(name=msg) as out:
            out.seek(1)
            out.write_data_with_size(encoded)
            out.flush()
            out.seek(0)
            out.write(b'\x01')

    message: str = request['message']
    kind = request['type']
    if kind == 'confirm':
        get_boss().confirm(
            message, send_answer, window=self,
            confirm_on_cancel=bool(request.get('confirm_on_cancel')),
            confirm_on_accept=bool(request.get('confirm_on_accept', True)))
    elif kind == 'choose':
        get_boss().choose(
            message, send_answer, *request['choices'], window=self,
            default=request.get('default', ''))
    elif kind == 'get_line':
        get_boss().get_line(
            message, send_answer, window=self,
            is_password=bool(request.get('is_password')),
            prompt=request.get('prompt', '> '))
    else:
        log_error(f'Ignoring ask request with unknown type: {request["type"]}')
def handle_remote_print(self, msg: str) -> None:
    # Print the processed message to stderr, without adding a newline,
    # and flush right away.
    print(process_remote_print(msg), end='', file=sys.stderr, flush=True)
def send_cmd_response(self, response: Any) -> None:
    # The response is JSON encoded, prefixed with @kitty-cmd and sent to
    # the child as a DCS escape code.
    payload = '@kitty-cmd' + json.dumps(response)
    self.screen.send_escape_code_to_child(DCS, payload)
def file_transmission(self, data: str) -> None:
    # Serialized file transmission commands are handled by the
    # file_transmission_control object of this window.
    handler = self.file_transmission_control
    handler.handle_serialized_command(data)
def clipboard_control(self, data: str, is_partial: bool = False) -> None:
    # Handle an OSC 52 clipboard request of the form "<where>;<text>".
    #
    # Partial requests are accumulated in self.clipboard_pending until the
    # final chunk, which has an empty <where>, arrives. Once the accumulated
    # data exceeds clipboard_max_size (in MB) it is dropped and the whole
    # request is ignored when it completes.
    #
    # A text of '?' is a read request which is answered via send_osc52(),
    # possibly after asking the user. Any other text is base64 encoded data
    # to be written. What is permitted is decided by the clipboard_control
    # option. <where> selects the clipboard ('s' or 'c') and/or the primary
    # selection ('p').
    where, text = data.partition(';')[::2]
    if is_partial:
        if self.clipboard_pending is None:
            self.clipboard_pending = ClipboardPending(where, text)
        else:
            self.clipboard_pending = self.clipboard_pending._replace(data=self.clipboard_pending.data + text)
        limit = get_options().clipboard_max_size
        if limit and len(self.clipboard_pending.data) > limit * 1024 * 1024:
            log_error('Discarding part of too large OSC 52 paste request')
            self.clipboard_pending = self.clipboard_pending._replace(data='', truncated=True)
        return

    if not where:
        pending = self.clipboard_pending
        if pending is None:
            where = 's0'
        else:
            # This is the last chunk of a partial request, the pending
            # state is consumed whether or not the request is usable
            self.clipboard_pending = None
            if pending.truncated:
                return
            text = pending.data + text
            where = pending.where
    cc = get_options().clipboard_control
    if text == '?':
        loc = response = ''
        if 's' in where or 'c' in where:
            if 'read-clipboard-ask' in cc:
                return self.ask_to_read_clipboard(False)
            loc = 'c'
            response = get_clipboard_string() if 'read-clipboard' in cc else ''
        elif 'p' in where:
            if 'read-primary-ask' in cc:
                return self.ask_to_read_clipboard(True)
            loc = 'p'
            response = get_primary_selection() if 'read-primary' in cc else ''
        # A location that names neither the clipboard nor the primary
        # selection cannot be answered. Such a request used to fail with an
        # UnboundLocalError, now it is ignored.
        if loc:
            self.send_osc52(loc, response or '')
    else:
        from base64 import standard_b64decode
        try:
            text = standard_b64decode(text).decode('utf-8')
        except Exception:
            # Data that cannot be decoded is treated as empty text
            text = ''
        if 's' in where or 'c' in where:
            if 'write-clipboard' in cc:
                set_clipboard_string(text)
        if 'p' in where:
            if 'write-primary' in cc:
                set_primary_selection(text)
    self.clipboard_pending = None
def send_osc52(self, loc: str, response: str) -> None:
    # Send the response for location loc to the child as an OSC 52 escape
    # code, with the text encoded as UTF-8 and then base64.
    from base64 import standard_b64encode
    encoded = standard_b64encode(response.encode('utf-8')).decode('ascii')
    self.screen.send_escape_code_to_child(OSC, '52;{};{}'.format(loc, encoded))
def ask_to_read_clipboard(self, primary: bool = False) -> None:
    # Ask the user whether the clipboard may be read. While a question is
    # already pending (the attribute is not None) no new one is asked, only
    # what was asked for is updated.
    already_asking = self.current_clipboard_read_ask is not None
    self.current_clipboard_read_ask = primary
    if already_asking:
        return
    get_boss().confirm(_(
        'A program running in this window wants to read from the system clipboard.'
        ' Allow it do so, once?'),
        self.handle_clipboard_confirmation, window=self,
    )
def handle_clipboard_confirmation(self, confirmed: bool) -> None:
    # Answer the pending clipboard read request. An empty response is sent
    # when the user declined. The pending state is always cleared.
    try:
        from_primary = self.current_clipboard_read_ask
        if not confirmed:
            response = ''
        elif from_primary:
            response = get_primary_selection()
        else:
            response = get_clipboard_string()
        self.send_osc52('p' if from_primary else 'c', response)
    finally:
        self.current_clipboard_read_ask = None
def manipulate_title_stack(self, pop: bool, title: str, icon: Any) -> None:
    # Push the current child title onto the title stack, or pop the most
    # recently pushed one and make it the child title again. Nothing happens
    # when title is false. icon is not used.
    if not title:
        return
    if not pop:
        if self.child_title:
            self.title_stack.append(self.child_title)
        return
    if self.title_stack:
        self.child_title = self.title_stack.pop()
        self.title_updated()
# }}}
# mouse actions {{{
@ac('mouse', '''
Handle a mouse click
Try to perform the specified actions one after the other till one of them is successful.
Supported actions are::
selection - check for a selection and if one exists abort processing
link - if a link exists under the mouse, click it
prompt - if the mouse click happens at a shell prompt move the cursor to the mouse location
For examples, see :ref:`conf-kitty-mouse.mousemap`
''')
def mouse_handle_click(self, *actions: str) -> None:
    # The actions are tried in order and processing stops at the first one
    # that succeeds. Unknown action names are skipped.
    for action in actions:
        if action == 'selection':
            if self.screen.has_selection():
                return
        elif action == 'link':
            if click_mouse_url(self.os_window_id, self.tab_id, self.id):
                return
        elif action == 'prompt':
            # Do not send move cursor events too soon after the window is
            # focused, this is because there are people that click on
            # windows and start typing immediately and the cursor event
            # can interfere with that. See https://github.com/kovidgoyal/kitty/issues/4128
            since_focus = monotonic() - self.last_focused_at
            if since_focus < 1.5 * get_click_interval():
                return
            if move_cursor_to_mouse_if_in_prompt(self.os_window_id, self.tab_id, self.id):
                self.screen.ignore_bells_for(1)
                return
@ac('mouse', 'Click the URL under the mouse')
def mouse_click_url(self) -> None:
    # Same as mouse_handle_click() with only the link action
    actions = ('link',)
    self.mouse_handle_click(*actions)
@ac('mouse', 'Click the URL under the mouse only if the screen has no selection')
def mouse_click_url_or_select(self) -> None:
    # The selection action comes first, so an existing selection stops
    # processing before the link action is reached.
    actions = ('selection', 'link')
    self.mouse_handle_click(*actions)
@ac('mouse', '''
Manipulate the selection based on the current mouse position
For examples, see :ref:`conf-kitty-mouse.mousemap`
''')
def mouse_selection(self, code: int) -> None:
    # Delegates to the function of the same name, passing the ids that
    # identify this window and the current mouse event button.
    os_window_id, tab_id, window_id = self.os_window_id, self.tab_id, self.id
    mouse_selection(os_window_id, tab_id, window_id, code, self.current_mouse_event_button)
@ac('mouse', 'Paste the current primary selection')
def paste_selection(self) -> None:
    # Nothing is pasted when the boss reports no primary selection
    selection = get_boss().current_primary_selection()
    if not selection:
        return
    self.paste_with_actions(selection)
@ac('mouse', 'Paste the current primary selection or the clipboard if no selection is present')
def paste_selection_or_clipboard(self) -> None:
    # Nothing is pasted when the boss has no text to offer
    content = get_boss().current_primary_selection_or_clipboard()
    if not content:
        return
    self.paste_with_actions(content)
@ac('mouse', '''
Select clicked command output
Requires :ref:`shell_integration` to work
''')
def mouse_select_command_output(self) -> None:
    # The final argument is True here and False when the output is only
    # looked up to be shown in a pager.
    os_window_id, tab_id, window_id = self.os_window_id, self.tab_id, self.id
    click_mouse_cmd_output(os_window_id, tab_id, window_id, True)
@ac('mouse', '''
Show clicked command output in a pager like less
Requires :ref:`shell_integration` to work
''')
def mouse_show_command_output(self) -> None:
    # The pager is only opened when click_mouse_cmd_output() reports success
    clicked = click_mouse_cmd_output(self.os_window_id, self.tab_id, self.id, False)
    if not clicked:
        return
    self.show_cmd_output(CommandOutput.last_visited, 'Clicked command output')
# }}}
def text_for_selection(self, as_ansi: bool = False) -> str:
    # Return the currently selected text as a single string. Trailing
    # spaces are stripped according to the strip_trailing_spaces option:
    # 'always' strips, 'smart' strips unless the selection is rectangular,
    # anything else does not strip.
    mode = get_options().strip_trailing_spaces
    if mode == 'always':
        strip = True
    elif mode == 'smart':
        strip = not self.screen.is_rectangle_select()
    else:
        strip = False
    return ''.join(self.screen.text_for_selection(as_ansi, strip))
def call_watchers(self, which: Iterable[Watcher], data: Dict[str, Any]) -> None:
    # Call every watcher with (boss, window, data). A watcher that raises
    # has its traceback printed and does not stop the remaining ones.
    boss = get_boss()
    for watcher in which:
        try:
            watcher(boss, self, data)
        except Exception:
            from traceback import print_exc
            print_exc()
def destroy(self) -> None:
    # Tear down this window: call the on_close watchers (with empty data),
    # flag the window as destroyed and drop the kitten result processors
    # and the screen.
    self.call_watchers(self.watchers.on_close, {})
    self.destroyed = True
    del self.kitten_result_processors
    # The screen attribute is deleted below, so it may already be missing
    if hasattr(self, 'screen'):
        if self.is_active and self.os_window_id == current_os_window():
            # Cancel IME composition when window is destroyed
            update_ime_position_for_window(self.id, False, -1)
        # Remove cycles so that screen is de-allocated immediately
        self.screen.reset_callbacks()
        del self.screen
def as_text(
    self,
    as_ansi: bool = False,
    add_history: bool = False,
    add_wrap_markers: bool = False,
    alternate_screen: bool = False,
    add_cursor: bool = False
) -> str:
    # Thin wrapper: calls the as_text() function with this window's screen
    # followed by all the options, unchanged and in the same order.
    options = (as_ansi, add_history, add_wrap_markers, alternate_screen, add_cursor)
    return as_text(self.screen, *options)
def cmd_output(self, which: CommandOutput = CommandOutput.last_run, as_ansi: bool = False, add_wrap_markers: bool = False) -> str:
    # Thin wrapper: calls the cmd_output() function with this window's screen
    text = cmd_output(self.screen, which, as_ansi, add_wrap_markers)
    return text
def get_cwd_of_child(self, oldest: bool = False) -> Optional[str]:
    # Prefer the cwd reported for the foreground process and fall back to
    # the child's current cwd when that is empty or None.
    child = self.child
    foreground_cwd = child.get_foreground_cwd(oldest)
    if foreground_cwd:
        return foreground_cwd
    return child.current_cwd
@property
def cwd_of_child(self) -> Optional[str]:
    # Property form of get_cwd_of_child() called with its defaults
    cwd = self.get_cwd_of_child()
    return cwd
@property
def child_is_remote(self) -> bool:
    # True if the first word of the command line of any foreground process
    # is ssh, compared case insensitively. Processes with an empty or
    # missing command line are skipped.
    cmdlines = (list(proc['cmdline'] or ()) for proc in self.child.foreground_processes)
    return any(argv and argv[0].lower() == 'ssh' for argv in cmdlines)
def ssh_kitten_cmdline(self) -> List[str]:
    # Return the command line of the first foreground process that
    # is_kitten_cmdline() accepts, or an empty list if there is none.
    from kittens.ssh.utils import is_kitten_cmdline
    cmdlines = (list(proc['cmdline'] or ()) for proc in self.child.foreground_processes)
    return next(filter(is_kitten_cmdline, cmdlines), [])
def pipe_data(self, text: str, has_wrap_markers: bool = False) -> PipeData:
    # Bundle text with the current screen geometry, scroll position and
    # 1-based cursor position. Line endings are normalized to \n only when
    # the text has wrap markers.
    screen = self.screen
    text = text or ''
    if has_wrap_markers:
        text = text.replace('\r\n', '\n').replace('\r', '\n')
    newline_count = text.count('\n')
    return {
        'input_line_number': newline_count - (screen.lines - 1) - screen.scrolled_by,
        'scrolled_by': screen.scrolled_by,
        'cursor_x': screen.cursor.x + 1,
        'cursor_y': screen.cursor.y + 1,
        'lines': screen.lines,
        'columns': screen.columns,
        'text': text
    }
def set_logo(self, path: str, position: str = '', alpha: float = -1) -> None:
    # A non-empty path is passed through resolve_custom_file() first, an
    # empty path is passed on as the empty string.
    resolved = resolve_custom_file(path) if path else ''
    os_window_id, tab_id, window_id = self.os_window_id, self.tab_id, self.id
    set_window_logo(os_window_id, tab_id, window_id, resolved, position or '', alpha)
def paste_with_actions(self, text: str) -> None:
    # Paste text into this window after applying the configured
    # paste_actions:
    #   filter               - pass the text through the paste filter and
    #                          paste nothing if it returns nothing
    #   quote-urls-at-prompt - shell quote the text if it is a URL and
    #                          the window is at a prompt
    #   confirm              - ask first when the child is not in bracketed
    #                          paste mode or the text is larger than 16KB
    # Does nothing for empty text or a destroyed window.
    if self.destroyed or not text:
        return
    opts = get_options()
    if 'filter' in opts.paste_actions:
        text = load_paste_filter()(text)
        if not text:
            return
    if 'quote-urls-at-prompt' in opts.paste_actions and self.at_prompt:
        # The prefixes are scheme names, not regular expressions. They have
        # to be escaped or a scheme such as svn+ssh would never match.
        prefixes = '|'.join(map(re.escape, opts.url_prefixes))
        m = re.match(f'({prefixes}):(.+)', text)
        if m is not None:
            scheme, rest = m.group(1), m.group(2)
            if rest.startswith('//') or scheme in ('mailto', 'irc'):
                import shlex
                text = shlex.quote(text)
    btext = text.encode('utf-8')
    if 'confirm' in opts.paste_actions:
        msg = ''
        limit = 16 * 1024
        if not self.screen.in_bracketed_paste_mode:
            msg = _('Pasting text into shells that do not support bracketed paste can be dangerous.')
        elif len(btext) > limit:
            msg = _('Pasting very large amounts of text ({} bytes) can be slow.').format(len(btext))
        if msg:
            # The paste happens later, in handle_paste_confirmation()
            get_boss().confirm(msg + _(' Are you sure?'), partial(self.handle_paste_confirmation, btext), window=self)
            return
    self.paste_text(btext)
def handle_paste_confirmation(self, btext: bytes, confirmed: bool) -> None:
    # Paste the text, but only if the paste was confirmed
    if not confirmed:
        return
    self.paste_text(btext)
def paste_bytes(self, text: Union[str, bytes]) -> None:
    # paste raw bytes without any processing, str is encoded as UTF-8
    raw = text.encode('utf-8') if isinstance(text, str) else text
    self.screen.paste_bytes(raw)
def paste_text(self, text: Union[str, bytes]) -> None:
    # Paste text into the child. Empty text and destroyed windows are
    # ignored. str is encoded as UTF-8 before any other processing.
    if not text or self.destroyed:
        return
    data = text.encode('utf-8') if isinstance(text, str) else text
    if self.screen.in_bracketed_paste_mode:
        data = sanitize_for_bracketed_paste(data)
    else:
        # Workaround for broken editors like nano that cannot handle
        # newlines in pasted text see https://github.com/kovidgoyal/kitty/issues/994
        data = data.replace(b'\r\n', b'\n').replace(b'\n', b'\r')
    self.screen.paste(data)
def clear_screen(self, reset: bool = False, scrollback: bool = False) -> None:
    # Move the cursor to the top left corner, then either reset the screen
    # or erase the display. Erase mode 3 is used when scrollback is true
    # and mode 2 otherwise.
    screen = self.screen
    cursor = screen.cursor
    cursor.x = 0
    cursor.y = 0
    if reset:
        screen.reset()
        return
    erase_mode = 3 if scrollback else 2
    screen.erase_in_display(erase_mode, False)
# actions {{{
@ac('cp', 'Show scrollback in a pager like less')
def show_scrollback(self) -> None:
    # The text includes the history, ANSI formatting and wrap markers.
    # The cursor is only reported when the comparison below says it is
    # still on screen at the current scroll position.
    screen = self.screen
    text = self.as_text(as_ansi=True, add_history=True, add_wrap_markers=True)
    data = self.pipe_data(text, has_wrap_markers=True)
    cursor_on_screen = screen.scrolled_by < screen.lines - screen.cursor.y
    boss = get_boss()
    boss.display_scrollback(self, data['text'], data['input_line_number'], report_cursor=cursor_on_screen)
def show_cmd_output(self, which: CommandOutput, title: str = 'Command output', as_ansi: bool = True, add_wrap_markers: bool = True) -> None:
    # Fetch the requested command output, normalize its line endings to \n
    # and show it via the boss under the given title.
    raw = self.cmd_output(which, as_ansi=as_ansi, add_wrap_markers=add_wrap_markers)
    normalized = raw.replace('\r\n', '\n').replace('\r', '\n')
    get_boss().display_scrollback(self, normalized, title=title, report_cursor=False)
@ac('cp', '''
Show output from the first shell command on screen in a pager like less
Requires :ref:`shell_integration` to work
''')
def show_first_command_output_on_screen(self) -> None:
    # show_cmd_output() for CommandOutput.first_on_screen
    which, title = CommandOutput.first_on_screen, 'First command output on screen'
    self.show_cmd_output(which, title)
@ac('cp', '''
Show output from the last shell command in a pager like less
Requires :ref:`shell_integration` to work
''')
def show_last_command_output(self) -> None:
    # show_cmd_output() for CommandOutput.last_run
    which, title = CommandOutput.last_run, 'Last command output'
    self.show_cmd_output(which, title)
@ac('cp', '''
Show the first command output below the last scrolled position via scroll_to_prompt
or the last mouse clicked command output in a pager like less
Requires :ref:`shell_integration` to work
''')
def show_last_visited_command_output(self) -> None:
    # show_cmd_output() for CommandOutput.last_visited
    which, title = CommandOutput.last_visited, 'Last visited command output'
    self.show_cmd_output(which, title)
@ac('cp', '''
Show the last non-empty output from a shell command in a pager like less
Requires :ref:`shell_integration` to work
''')
def show_last_non_empty_command_output(self) -> None:
    # show_cmd_output() for CommandOutput.last_non_empty
    which, title = CommandOutput.last_non_empty, 'Last non-empty command output'
    self.show_cmd_output(which, title)
@ac('cp', 'Paste the specified text into the current window')
def paste(self, text: str) -> None:
    # The text goes through paste_with_actions(), so the configured
    # paste actions are applied to it.
    paste_fn = self.paste_with_actions
    paste_fn(text)
@ac('cp', 'Copy the selected text from the active window to the clipboard')
def copy_to_clipboard(self) -> None:
    # The clipboard is left untouched when nothing is selected
    selected = self.text_for_selection()
    if not selected:
        return
    set_clipboard_string(selected)
@ac('cp', 'Copy the selected text from the active window to the clipboard with ANSI formatting codes')
def copy_ansi_to_clipboard(self) -> None:
    # The clipboard is left untouched when nothing is selected
    selected = self.text_for_selection(as_ansi=True)
    if not selected:
        return
    set_clipboard_string(selected)
def encoded_key(self, key_event: KeyEvent) -> bytes:
    # Encode the key event for the tty using the key encoding flags and
    # cursor key mode currently in effect on the screen. The result is
    # returned as ASCII bytes.
    screen = self.screen
    encoded = encode_key_for_tty(
        key=key_event.key, shifted_key=key_event.shifted_key, alternate_key=key_event.alternate_key,
        mods=key_event.mods, action=key_event.action, text=key_event.text,
        key_encoding_flags=screen.current_key_encoding_flags(),
        cursor_key_mode=screen.cursor_key_mode,
    )
    return encoded.encode('ascii')
@ac('cp', 'Copy the selected text from the active window to the clipboard, if no selection, send SIGINT (aka :kbd:`ctrl+c`)')
def copy_or_interrupt(self) -> None:
    selected = self.text_for_selection()
    if selected:
        set_clipboard_string(selected)
        return
    # Nothing is selected: scroll to the end and write the encoded
    # ctrl+c key press to the child
    self.scroll_end()
    ctrl_c = KeyEvent(key=ord('c'), mods=GLFW_MOD_CONTROL)
    self.write_to_child(self.encoded_key(ctrl_c))
@ac('cp', 'Copy the selected text from the active window to the clipboard and clear selection, if no selection, send SIGINT (aka :kbd:`ctrl+c`)')
def copy_and_clear_or_interrupt(self) -> None:
    # copy_or_interrupt() followed by clearing the selection, which is
    # done unconditionally
    self.copy_or_interrupt()
    screen = self.screen
    screen.clear_selection()
@ac('cp', 'Pass the selected text from the active window to the specified program')
def pass_selection_to_program(self, *args: str) -> None:
    # With args, the selection is passed to open_cmd() along with them,
    # without args it is passed to open_url(). Both receive the cwd of
    # the child. Nothing happens when nothing is selected.
    cwd = self.cwd_of_child
    selected = self.text_for_selection()
    if not selected:
        return
    if args:
        open_cmd(args, selected, cwd=cwd)
    else:
        open_url(selected, cwd=cwd)
@ac('cp', 'Clear the current selection')
def clear_selection(self) -> None:
    # Delegates to the screen
    screen = self.screen
    screen.clear_selection()
@ac('sc', 'Scroll up by one line')
def scroll_line_up(self) -> None:
    # Does nothing unless the screen is using its main line buffer
    screen = self.screen
    if not screen.is_main_linebuf():
        return
    screen.scroll(SCROLL_LINE, True)
@ac('sc', 'Scroll down by one line')
def scroll_line_down(self) -> None:
    # Does nothing unless the screen is using its main line buffer
    screen = self.screen
    if not screen.is_main_linebuf():
        return
    screen.scroll(SCROLL_LINE, False)
@ac('sc', 'Scroll up by one page')
def scroll_page_up(self) -> None:
    # Does nothing unless the screen is using its main line buffer
    screen = self.screen
    if not screen.is_main_linebuf():
        return
    screen.scroll(SCROLL_PAGE, True)
@ac('sc', 'Scroll down by one page')
def scroll_page_down(self) -> None:
    # Does nothing unless the screen is using its main line buffer
    screen = self.screen
    if not screen.is_main_linebuf():
        return
    screen.scroll(SCROLL_PAGE, False)
@ac('sc', 'Scroll to the top of the scrollback buffer')
def scroll_home(self) -> None:
    # Does nothing unless the screen is using its main line buffer
    screen = self.screen
    if not screen.is_main_linebuf():
        return
    screen.scroll(SCROLL_FULL, True)
@ac('sc', 'Scroll to the bottom of the scrollback buffer')
def scroll_end(self) -> None:
    # Does nothing unless the screen is using its main line buffer
    screen = self.screen
    if not screen.is_main_linebuf():
        return
    screen.scroll(SCROLL_FULL, False)
@ac('sc', '''
Scroll to the previous/next shell command prompt
Allows easy jumping from one command to the next. Requires working
:ref:`shell_integration`. Takes a single, optional, number as argument which is
the number of prompts to jump, negative values jump up and positive values jump down.
A value of zero will jump to the last prompt visited by this action.
For example::
map ctrl+p scroll_to_prompt -1 # jump to previous
map ctrl+n scroll_to_prompt 1 # jump to next
map ctrl+o scroll_to_prompt 0 # jump to last visited
''')
def scroll_to_prompt(self, num_of_prompts: int = -1) -> None:
    # Does nothing unless the screen is using its main line buffer
    screen = self.screen
    if not screen.is_main_linebuf():
        return
    screen.scroll_to_prompt(num_of_prompts)
@ac('sc', 'Scroll prompt to the top of the screen, filling screen with empty lines')
def scroll_prompt_to_top(self, clear_scrollback: bool = False) -> None:
    # Does nothing unless the screen is using its main line buffer
    screen = self.screen
    if not screen.is_main_linebuf():
        return
    screen.scroll_until_cursor_prompt()
    if clear_scrollback:
        screen.clear_scrollback()
        return
    # The scrollback is kept: if the view is scrolled, do a full scroll
    # downwards
    if screen.scrolled_by > 0:
        screen.scroll(SCROLL_FULL, False)
@ac('sc', 'Scroll prompt to the bottom of the screen, filling in extra lines from the scrollback buffer')
def scroll_prompt_to_bottom(self) -> None:
    # Delegates to the screen, there is no main line buffer check here
    screen = self.screen
    screen.scroll_prompt_to_bottom()
@ac('mk', 'Toggle the current marker on/off')
def toggle_marker(self, ftype: str, spec: Union[str, Tuple[Tuple[int, str], ...]], flags: int) -> None:
    # A marker is identified by (ftype, spec), flags play no part in the
    # comparison. Asking for the marker that is already current removes it,
    # anything else replaces the current marker.
    from .marks import marker_from_spec
    requested = (ftype, spec)
    if requested == self.current_marker_spec:
        self.remove_marker()
    else:
        self.screen.set_marker(marker_from_spec(ftype, spec, flags))
        self.current_marker_spec = requested
def set_marker(self, spec: Union[str, Sequence[str]]) -> None:
    # Install a marker on the screen and remember it as the current one.
    # A str spec is parsed by toggle_marker() from options.utils. For a
    # sequence, the first item and the rest are passed to
    # parse_marker_spec() separately.
    from .marks import marker_from_spec
    from .options.utils import parse_marker_spec, toggle_marker
    if isinstance(spec, str):
        _func, parsed = toggle_marker('toggle_marker', spec)
    else:
        parsed = parse_marker_spec(spec[0], spec[1:])
    ftype, spec_, flags = parsed
    self.screen.set_marker(marker_from_spec(ftype, spec_, flags))
    self.current_marker_spec = (ftype, spec_)
@ac('mk', 'Remove a previously created marker')
def remove_marker(self) -> None:
    # Nothing to do when no marker is current. Otherwise set_marker() is
    # called on the screen without arguments.
    if self.current_marker_spec is None:
        return
    self.screen.set_marker()
    self.current_marker_spec = None
@ac('mk', 'Scroll to the next or previous mark of the specified type')
def scroll_to_mark(self, prev: bool = True, mark: int = 0) -> None:
    # Note the argument order: the screen takes the mark first, then prev
    screen = self.screen
    screen.scroll_to_next_mark(mark, prev)
@ac('misc', '''
Send the specified SIGNAL to the foreground process in the active window
For example::
map f1 signal_child SIGTERM
''')
def signal_child(self, *signals: int) -> None:
    # The signals are sent, in order, to the pid the child reports as
    # pid_for_cwd. Nothing is sent when that is None.
    pid = self.child.pid_for_cwd
    if pid is None:
        return
    for signum in signals:
        os.kill(pid, signum)
@ac('misc', '''
Display the specified kitty documentation, preferring a local copy, if found.
For example::
# show the config docs
map f1 show_kitty_doc conf
# show the ssh kitten docs
map f1 show_kitty_doc kittens/ssh
''')
def show_kitty_doc(self, which: str = '') -> None:
    # docs_url() turns the name into a URL which the boss then opens
    target = docs_url(which)
    boss = get_boss()
    boss.open_url(target)
# }}}