#!/usr/bin/env python3 # License: GPL v3 Copyright: 2016, Kovid Goyal import json import os import re import sys import weakref from collections import deque from contextlib import suppress from enum import Enum, IntEnum, auto from functools import lru_cache, partial from gettext import gettext as _ from itertools import chain from time import monotonic from typing import ( TYPE_CHECKING, Any, Callable, Deque, Dict, Iterable, List, NamedTuple, Optional, Pattern, Sequence, Tuple, Union ) from .child import ProcessDesc from .cli_stub import CLIOptions from .config import build_ansi_color_table from .constants import ( appname, clear_handled_signals, config_dir, is_macos, wakeup_io_loop ) from .fast_data_types import ( BGIMAGE_PROGRAM, BLIT_PROGRAM, CELL_BG_PROGRAM, CELL_FG_PROGRAM, CELL_PROGRAM, CELL_SPECIAL_PROGRAM, CURSOR_BEAM, CURSOR_BLOCK, CURSOR_UNDERLINE, DCS, DECORATION, DECORATION_MASK, DIM, GLFW_MOD_CONTROL, GRAPHICS_ALPHA_MASK_PROGRAM, GRAPHICS_PREMULT_PROGRAM, GRAPHICS_PROGRAM, MARK, MARK_MASK, NO_CURSOR_SHAPE, NUM_UNDERLINE_STYLES, OSC, REVERSE, SCROLL_FULL, SCROLL_LINE, SCROLL_PAGE, STRIKETHROUGH, TINT_PROGRAM, Color, KeyEvent, Screen, add_timer, add_window, cell_size_for_window, click_mouse_cmd_output, click_mouse_url, compile_program, current_os_window, encode_key_for_tty, get_boss, get_click_interval, get_clipboard_string, get_options, init_cell_program, mark_os_window_dirty, mouse_selection, move_cursor_to_mouse_if_in_prompt, pt_to_px, set_clipboard_string, set_titlebar_color, set_window_logo, set_window_padding, set_window_render_data, update_ime_position_for_window, update_window_title, update_window_visibility, wakeup_main_loop ) from .keys import keyboard_mode_name, mod_mask from .notify import NotificationCommand, handle_notification_cmd from .options.types import Options from .rgb import to_color from .terminfo import get_capabilities from .types import MouseEvent, WindowGeometry, ac, run_once from .typing import BossType, ChildType, EdgeLiteral, TabType, TypedDict from .utils import ( get_primary_selection, kitty_ansi_sanitizer_pat, load_shaders, log_error, open_cmd, open_url, parse_color_set, path_from_osc7_url, resolve_custom_file, resolved_shell, sanitize_title, set_primary_selection ) MatchPatternType = Union[Pattern[str], Tuple[Pattern[str], Optional[Pattern[str]]]] if TYPE_CHECKING: from .file_transmission import FileTransmission class CwdRequestType(Enum): current: int = auto() last_reported: int = auto() oldest: int = auto() class CwdRequest: def __init__(self, window: Optional['Window'] = None, request_type: CwdRequestType = CwdRequestType.current) -> None: self.window_id = -1 if window is None else window.id self.request_type = request_type def __bool__(self) -> bool: return self.window_id > -1 @property def window(self) -> Optional['Window']: return get_boss().window_id_map.get(self.window_id) @property def cwd_of_child(self) -> str: window = self.window if not window: return '' reported_cwd = path_from_osc7_url(window.screen.last_reported_cwd) if window.screen.last_reported_cwd else '' if reported_cwd and not window.child_is_remote and (self.request_type is CwdRequestType.last_reported or window.at_prompt): return reported_cwd return window.get_cwd_of_child(oldest=self.request_type is CwdRequestType.oldest) or '' def modify_argv_for_launch_with_cwd(self, argv: List[str]) -> str: window = self.window if not window: return '' reported_cwd = path_from_osc7_url(window.screen.last_reported_cwd) if window.screen.last_reported_cwd else '' if reported_cwd: # First check if we are running ssh kitten, and trying to open the configured login shell if argv[0] == resolved_shell(get_options())[0]: ssh_kitten_cmdline = window.ssh_kitten_cmdline() if ssh_kitten_cmdline: from kittens.ssh.utils import set_cwd_in_cmdline argv[:] = ssh_kitten_cmdline set_cwd_in_cmdline(reported_cwd, argv) return '' if not window.child_is_remote and (self.request_type is CwdRequestType.last_reported or window.at_prompt): return reported_cwd return window.get_cwd_of_child(oldest=self.request_type is CwdRequestType.oldest) or '' def process_title_from_child(title: str, is_base64: bool) -> str: if is_base64: from base64 import standard_b64decode try: title = standard_b64decode(title).decode('utf-8', 'replace') except Exception: title = 'undecodeable title' return sanitize_title(title) @lru_cache(maxsize=64) def compile_match_query(exp: str, is_simple: bool = True) -> MatchPatternType: if is_simple: pat: MatchPatternType = re.compile(exp) else: kp, vp = exp.partition('=')[::2] if vp: pat = re.compile(kp), re.compile(vp) else: pat = re.compile(kp), None return pat class WindowDict(TypedDict): id: int is_focused: bool title: str pid: Optional[int] cwd: str cmdline: List[str] env: Dict[str, str] foreground_processes: List[ProcessDesc] is_self: bool lines: int columns: int class PipeData(TypedDict): input_line_number: int scrolled_by: int cursor_x: int cursor_y: int lines: int columns: int text: str class ClipboardPending(NamedTuple): where: str data: str truncated: bool = False class DynamicColor(IntEnum): default_fg, default_bg, cursor_color, highlight_fg, highlight_bg = range(1, 6) class CommandOutput(IntEnum): last_run, first_on_screen, last_visited, last_non_empty = 0, 1, 2, 3 DYNAMIC_COLOR_CODES = { 10: DynamicColor.default_fg, 11: DynamicColor.default_bg, 12: DynamicColor.cursor_color, 17: DynamicColor.highlight_bg, 19: DynamicColor.highlight_fg, } DYNAMIC_COLOR_CODES.update({k+100: v for k, v in DYNAMIC_COLOR_CODES.items()}) class Watcher: def __call__(self, boss: BossType, window: 'Window', data: Dict[str, Any]) -> None: pass class Watchers: on_resize: List[Watcher] on_close: List[Watcher] on_focus_change: List[Watcher] def __init__(self) -> None: self.on_resize = [] self.on_close = [] self.on_focus_change = [] def add(self, others: 'Watchers') -> None: def merge(base: List[Watcher], other: List[Watcher]) -> None: for x in other: if x not in base: base.append(x) merge(self.on_resize, others.on_resize) merge(self.on_close, others.on_close) merge(self.on_focus_change, others.on_focus_change) def clear(self) -> None: del self.on_close[:], self.on_resize[:], self.on_focus_change[:] def copy(self) -> 'Watchers': ans = Watchers() ans.on_close = self.on_close[:] ans.on_resize = self.on_resize[:] ans.on_focus_change = self.on_focus_change[:] return ans @property def has_watchers(self) -> bool: return bool(self.on_close or self.on_resize or self.on_focus_change) def call_watchers(windowref: Callable[[], Optional['Window']], which: str, data: Dict[str, Any]) -> None: def callback(timer_id: Optional[int]) -> None: w = windowref() if w is not None: watchers: List[Watcher] = getattr(w.watchers, which) w.call_watchers(watchers, data) add_timer(callback, 0, False) def pagerhist(screen: Screen, as_ansi: bool = False, add_wrap_markers: bool = True, upto_output_start: bool = False) -> str: pht = screen.historybuf.pagerhist_as_text(upto_output_start) if pht and (not as_ansi or not add_wrap_markers): sanitizer = text_sanitizer(as_ansi, add_wrap_markers) pht = sanitizer(pht) return pht def as_text( screen: Screen, as_ansi: bool = False, add_history: bool = False, add_wrap_markers: bool = False, alternate_screen: bool = False, add_cursor: bool = False ) -> str: lines: List[str] = [] add_history = add_history and not (screen.is_using_alternate_linebuf() ^ alternate_screen) if alternate_screen: f = screen.as_text_alternate else: f = screen.as_text_non_visual if add_history else screen.as_text f(lines.append, as_ansi, add_wrap_markers) ctext = '' if add_cursor: ctext += '\x1b[?25' + ('h' if screen.cursor_visible else 'l') ctext += f'\x1b[{screen.cursor.y + 1};{screen.cursor.x + 1}H' shape = screen.cursor.shape if shape == NO_CURSOR_SHAPE: ctext += '\x1b[?12' + ('h' if screen.cursor.blink else 'l') else: code = {CURSOR_BLOCK: 1, CURSOR_UNDERLINE: 3, CURSOR_BEAM: 5}[shape] if not screen.cursor.blink: code += 1 ctext += f'\x1b[{code} q' if add_history: pht = pagerhist(screen, as_ansi, add_wrap_markers) h: List[str] = [pht] if pht else [] screen.historybuf.as_text(h.append, as_ansi, add_wrap_markers) if h: if not screen.linebuf.is_continued(0): h[-1] += '\n' if as_ansi: h[-1] += '\x1b[m' ans = ''.join(chain(h, lines)) if ctext: ans += ctext return ans ans = ''.join(lines) if ctext: ans += ctext return ans def multi_replace(src: str, **replacements: Any) -> str: r = {k: str(v) for k, v in replacements.items()} def sub(m: 're.Match[str]') -> str: return r.get(m.group(1), m.group(1)) return re.sub(r'\{([A-Z_]+)\}', sub, src) class LoadShaderPrograms: def __call__(self, semi_transparent: bool = False) -> None: compile_program(BLIT_PROGRAM, *load_shaders('blit')) v, f = load_shaders('cell') for which, p in { 'SIMPLE': CELL_PROGRAM, 'BACKGROUND': CELL_BG_PROGRAM, 'SPECIAL': CELL_SPECIAL_PROGRAM, 'FOREGROUND': CELL_FG_PROGRAM, }.items(): ff = f.replace('{WHICH_PROGRAM}', which) vv = multi_replace( v, WHICH_PROGRAM=which, REVERSE_SHIFT=REVERSE, STRIKE_SHIFT=STRIKETHROUGH, DIM_SHIFT=DIM, DECORATION_SHIFT=DECORATION, MARK_SHIFT=MARK, MARK_MASK=MARK_MASK, DECORATION_MASK=DECORATION_MASK, STRIKE_SPRITE_INDEX=NUM_UNDERLINE_STYLES + 1, ) if semi_transparent: vv = vv.replace('#define NOT_TRANSPARENT', '#define TRANSPARENT') ff = ff.replace('#define NOT_TRANSPARENT', '#define TRANSPARENT') compile_program(p, vv, ff) v, f = load_shaders('graphics') for which, p in { 'SIMPLE': GRAPHICS_PROGRAM, 'PREMULT': GRAPHICS_PREMULT_PROGRAM, 'ALPHA_MASK': GRAPHICS_ALPHA_MASK_PROGRAM, }.items(): ff = f.replace('ALPHA_TYPE', which) compile_program(p, v, ff) v, f = load_shaders('bgimage') compile_program(BGIMAGE_PROGRAM, v, f) v, f = load_shaders('tint') compile_program(TINT_PROGRAM, v, f) init_cell_program() load_shader_programs = LoadShaderPrograms() def setup_colors(screen: Screen, opts: Options) -> None: screen.color_profile.update_ansi_color_table(build_ansi_color_table(opts)) def s(c: Optional[Color]) -> int: return 0 if c is None else (0xff000000 | int(c)) screen.color_profile.set_configured_colors( s(opts.foreground), s(opts.background), s(opts.cursor), s(opts.cursor_text_color), s(opts.selection_foreground), s(opts.selection_background), s(opts.visual_bell_color) ) @run_once def load_paste_filter() -> Callable[[str], str]: import runpy import traceback try: m = runpy.run_path(os.path.join(config_dir, 'paste-actions.py')) func: Callable[[str], str] = m['filter_paste'] except Exception as e: if not isinstance(e, FileNotFoundError): traceback.print_exc() log_error(f'Failed to load paste filter function with error: {e}') def func(text: str) -> str: return text return func def text_sanitizer(as_ansi: bool, add_wrap_markers: bool) -> Callable[[str], str]: pat = kitty_ansi_sanitizer_pat() ansi, wrap_markers = not as_ansi, not add_wrap_markers def remove_wrap_markers(line: str) -> str: return line.replace('\r', '') def remove_sgr(line: str) -> str: return str(pat.sub('', line)) def remove_both(line: str) -> str: return str(pat.sub('', line.replace('\r', ''))) if ansi: return remove_both if wrap_markers else remove_sgr return remove_wrap_markers def cmd_output(screen: Screen, which: CommandOutput = CommandOutput.last_run, as_ansi: bool = False, add_wrap_markers: bool = False) -> str: lines: List[str] = [] search_in_pager_hist = screen.cmd_output(which, lines.append, as_ansi, add_wrap_markers) if search_in_pager_hist: pht = pagerhist(screen, as_ansi, add_wrap_markers, True) if pht: lines.insert(0, pht) for i in range(min(len(lines), 3)): x = lines[i] if x.startswith('\x1b]133;C'): lines[i] = x.partition('\\')[-1] return ''.join(lines) def process_remote_print(msg: str) -> str: from base64 import standard_b64decode from .cli import green text = standard_b64decode(msg).decode('utf-8', 'replace') return text.replace('\x1b', green(r'\e')).replace('\a', green(r'\a')).replace('\0', green(r'\0')) class EdgeWidths: left: Optional[float] top: Optional[float] right: Optional[float] bottom: Optional[float] def __init__(self, serialized: Optional[Dict[str, Optional[float]]] = None): if serialized is not None: self.left = serialized['left'] self.right = serialized['right'] self.top = serialized['top'] self.bottom = serialized['bottom'] else: self.left = self.top = self.right = self.bottom = None def serialize(self) -> Dict[str, Optional[float]]: return {'left': self.left, 'right': self.right, 'top': self.top, 'bottom': self.bottom} class GlobalWatchers: def __init__(self) -> None: self.options_spec: Optional[Dict[str, str]] = None self.ans = Watchers() self.extra = '' def __call__(self) -> Watchers: spec = get_options().watcher if spec == self.options_spec: return self.ans from .launch import load_watch_modules if self.extra: spec = spec.copy() spec[self.extra] = self.extra self.ans = load_watch_modules(spec.keys()) or self.ans self.options_spec = spec.copy() return self.ans def set_extra(self, extra: str) -> None: self.extra = extra global_watchers = GlobalWatchers() class Window: def __init__( self, tab: TabType, child: ChildType, args: CLIOptions, override_title: Optional[str] = None, copy_colors_from: Optional['Window'] = None, watchers: Optional[Watchers] = None ): if watchers: self.watchers = watchers self.watchers.add(global_watchers()) else: self.watchers = global_watchers().copy() self.last_focused_at = 0. self.started_at = monotonic() self.current_remote_data: List[str] = [] self.current_mouse_event_button = 0 self.current_clipboard_read_ask: Optional[bool] = None self.prev_osc99_cmd = NotificationCommand() self.actions_on_close: List[Callable[['Window'], None]] = [] self.actions_on_focus_change: List[Callable[['Window', bool], None]] = [] self.actions_on_removal: List[Callable[['Window'], None]] = [] self.current_marker_spec: Optional[Tuple[str, Union[str, Tuple[Tuple[int, str], ...]]]] = None self.kitten_result_processors: List[Callable[['Window', Any], None]] = [] self.pty_resized_once = False self.last_reported_pty_size = (-1, -1, -1, -1) self.needs_attention = False self.override_title = override_title self.default_title = os.path.basename(child.argv[0] or appname) self.child_title = self.default_title self.title_stack: Deque[str] = deque(maxlen=10) self.allow_remote_control = child.allow_remote_control self.id: int = add_window(tab.os_window_id, tab.id, self.title) self.margin = EdgeWidths() self.padding = EdgeWidths() self.kitten_result: Optional[Dict[str, Any]] = None if not self.id: raise Exception(f'No tab with id: {tab.id} in OS Window: {tab.os_window_id} was found, or the window counter wrapped') self.tab_id = tab.id self.os_window_id = tab.os_window_id self.tabref: Callable[[], Optional[TabType]] = weakref.ref(tab) self.clipboard_pending: Optional[ClipboardPending] = None self.destroyed = False self.geometry: WindowGeometry = WindowGeometry(0, 0, 0, 0, 0, 0) self.needs_layout = True self.is_visible_in_layout: bool = True self.child = child cell_width, cell_height = cell_size_for_window(self.os_window_id) opts = get_options() self.screen: Screen = Screen(self, 24, 80, opts.scrollback_lines, cell_width, cell_height, self.id) if copy_colors_from is not None: self.screen.copy_colors_from(copy_colors_from.screen) else: setup_colors(self.screen, opts) @property def file_transmission_control(self) -> 'FileTransmission': ans: Optional['FileTransmission'] = getattr(self, '_file_transmission', None) if ans is None: from .file_transmission import FileTransmission ans = self._file_transmission = FileTransmission(self.id) return ans def on_dpi_change(self, font_sz: float) -> None: self.update_effective_padding() def change_tab(self, tab: TabType) -> None: self.tab_id = tab.id self.os_window_id = tab.os_window_id self.tabref = weakref.ref(tab) def effective_margin(self, edge: EdgeLiteral, is_single_window: bool = False) -> int: q = getattr(self.margin, edge) if q is not None: return pt_to_px(q, self.os_window_id) opts = get_options() if is_single_window: q = getattr(opts.single_window_margin_width, edge) if q > -0.1: return pt_to_px(q, self.os_window_id) q = getattr(opts.window_margin_width, edge) return pt_to_px(q, self.os_window_id) def effective_padding(self, edge: EdgeLiteral) -> int: q = getattr(self.padding, edge) if q is not None: return pt_to_px(q, self.os_window_id) q = getattr(get_options().window_padding_width, edge) return pt_to_px(q, self.os_window_id) def update_effective_padding(self) -> None: set_window_padding( self.os_window_id, self.tab_id, self.id, self.effective_padding('left'), self.effective_padding('top'), self.effective_padding('right'), self.effective_padding('bottom')) def patch_edge_width(self, which: str, edge: EdgeLiteral, val: Optional[float]) -> None: q = self.padding if which == 'padding' else self.margin setattr(q, edge, val) if q is self.padding: self.update_effective_padding() def effective_border(self) -> int: val, unit = get_options().window_border_width if unit == 'pt': val = max(1 if val > 0 else 0, pt_to_px(val, self.os_window_id)) else: val = round(val) return int(val) def apply_options(self) -> None: opts = get_options() self.update_effective_padding() self.change_titlebar_color() setup_colors(self.screen, opts) @property def title(self) -> str: return self.override_title or self.child_title def __repr__(self) -> str: return f'Window(title={self.title}, id={self.id})' def as_dict(self, is_focused: bool = False, is_self: bool = False) -> WindowDict: return dict( id=self.id, is_focused=is_focused, title=self.title, pid=self.child.pid, cwd=self.child.current_cwd or self.child.cwd, cmdline=self.child.cmdline, env=self.child.environ, foreground_processes=self.child.foreground_processes, is_self=is_self, lines=self.screen.lines, columns=self.screen.columns, ) def serialize_state(self) -> Dict[str, Any]: return { 'version': 1, 'id': self.id, 'child_title': self.child_title, 'override_title': self.override_title, 'default_title': self.default_title, 'title_stack': list(self.title_stack), 'allow_remote_control': self.allow_remote_control, 'cwd': self.child.current_cwd or self.child.cwd, 'env': self.child.environ, 'cmdline': self.child.cmdline, 'margin': self.margin.serialize(), 'padding': self.padding.serialize(), } @property def current_colors(self) -> Dict[str, Optional[int]]: return self.screen.color_profile.as_dict() @property def at_prompt(self) -> bool: return self.screen.cursor_at_prompt() @property def has_running_program(self) -> bool: return not self.at_prompt def matches(self, field: str, pat: MatchPatternType) -> bool: if not pat: return False if field == 'env': assert isinstance(pat, tuple) key_pat, val_pat = pat for key, val in self.child.environ.items(): if key_pat.search(key) is not None and ( val_pat is None or val_pat.search(val) is not None): return True return False assert not isinstance(pat, tuple) if field in ('id', 'window_id'): return pat.pattern == str(self.id) if field == 'pid': return pat.pattern == str(self.child.pid) if field == 'title': return pat.search(self.override_title or self.title) is not None if field in 'cwd': return pat.search(self.child.current_cwd or self.child.cwd) is not None if field == 'cmdline': for x in self.child.cmdline: if pat.search(x) is not None: return True return False return False def matches_query(self, field: str, query: str, active_tab: Optional[TabType] = None) -> bool: if field in ('num', 'recent'): if active_tab is not None: try: q = int(query) except Exception: return False with suppress(Exception): if field == 'num': return active_tab.get_nth_window(q) is self return active_tab.nth_active_window_id(q) == self.id return False if field == 'state': if query == 'active': return active_tab is not None and self is active_tab.active_window if query == 'focused': return active_tab is not None and self is active_tab.active_window and current_os_window() == self.os_window_id if query == 'needs_attention': return self.needs_attention if query == 'parent_active': return active_tab is not None and self.tabref() is active_tab if query == 'parent_focused': return active_tab is not None and self.tabref() is active_tab and current_os_window() == self.os_window_id return False pat = compile_match_query(query, field != 'env') return self.matches(field, pat) def set_visible_in_layout(self, val: bool) -> None: val = bool(val) if val is not self.is_visible_in_layout: self.is_visible_in_layout = val update_window_visibility(self.os_window_id, self.tab_id, self.id, val) if val: self.refresh() def refresh(self) -> None: self.screen.mark_as_dirty() wakeup_io_loop() wakeup_main_loop() def set_geometry(self, new_geometry: WindowGeometry) -> None: if self.destroyed: return if self.needs_layout or new_geometry.xnum != self.screen.columns or new_geometry.ynum != self.screen.lines: self.screen.resize(new_geometry.ynum, new_geometry.xnum) self.needs_layout = False call_watchers(weakref.ref(self), 'on_resize', {'old_geometry': self.geometry, 'new_geometry': new_geometry}) current_pty_size = ( self.screen.lines, self.screen.columns, max(0, new_geometry.right - new_geometry.left), max(0, new_geometry.bottom - new_geometry.top)) update_ime_position = False if current_pty_size != self.last_reported_pty_size: get_boss().child_monitor.resize_pty(self.id, *current_pty_size) if not self.pty_resized_once: self.pty_resized_once = True self.child.mark_terminal_ready() update_ime_position = True self.last_reported_pty_size = current_pty_size else: mark_os_window_dirty(self.os_window_id) self.geometry = g = new_geometry set_window_render_data(self.os_window_id, self.tab_id, self.id, self.screen, *g[:4]) self.update_effective_padding() if update_ime_position: update_ime_position_for_window(self.id, True) def contains(self, x: int, y: int) -> bool: g = self.geometry return g.left <= x <= g.right and g.top <= y <= g.bottom def close(self) -> None: get_boss().mark_window_for_close(self) @ac('misc', ''' Send the specified text to the active window See :sc:`send_text ` for details. ''') def send_text(self, *args: str) -> bool: mode = keyboard_mode_name(self.screen) required_mode_, text = args[-2:] required_mode = frozenset(required_mode_.split(',')) if not required_mode & {mode, 'all'}: return True if not text: return True self.write_to_child(text) return False @ac('debug', 'Show a dump of the current lines in the scrollback + screen with their line attributes') def dump_lines_with_attrs(self) -> None: strings: List[str] = [] self.screen.dump_lines_with_attrs(strings.append) text = ''.join(strings) get_boss().display_scrollback(self, text, title='Dump of lines', report_cursor=False) def write_to_child(self, data: Union[str, bytes]) -> None: if data: if isinstance(data, str): data = data.encode('utf-8') if get_boss().child_monitor.needs_write(self.id, data) is not True: log_error(f'Failed to write to child {self.id} as it does not exist') def title_updated(self) -> None: update_window_title(self.os_window_id, self.tab_id, self.id, self.title) t = self.tabref() if t is not None: t.title_changed(self) def set_title(self, title: Optional[str]) -> None: if title: title = sanitize_title(title) self.override_title = title or None self.title_updated() def desktop_notify(self, osc_code: int, raw_data: str) -> None: if osc_code == 777: if not raw_data.startswith('notify;'): log_error(f'Ignoring unknown OSC 777: {raw_data}') return # unknown OSC 777 raw_data = raw_data[len('notify;'):] cmd = handle_notification_cmd(osc_code, raw_data, self.id, self.prev_osc99_cmd) if cmd is not None and osc_code == 99: self.prev_osc99_cmd = cmd # screen callbacks {{{ def use_utf8(self, on: bool) -> None: get_boss().child_monitor.set_iutf8_winid(self.id, on) def on_mouse_event(self, event: Dict[str, Any]) -> bool: event['mods'] = event.get('mods', 0) & mod_mask ev = MouseEvent(**event) self.current_mouse_event_button = ev.button action = get_options().mousemap.get(ev) if action is None: return False return get_boss().combine(action, window_for_dispatch=self, dispatch_type='MouseEvent') def open_url(self, url: str, hyperlink_id: int, cwd: Optional[str] = None) -> None: opts = get_options() if hyperlink_id: if not opts.allow_hyperlinks: return from urllib.parse import unquote, urlparse, urlunparse try: purl = urlparse(url) except Exception: return if (not purl.scheme or purl.scheme == 'file'): if purl.netloc: from socket import gethostname try: hostname = gethostname() except Exception: hostname = '' remote_hostname = purl.netloc.partition(':')[0] if remote_hostname and remote_hostname != hostname and remote_hostname != 'localhost': self.handle_remote_file(purl.netloc, unquote(purl.path)) return url = urlunparse(purl._replace(netloc='')) if opts.allow_hyperlinks & 0b10: from kittens.tui.operations import styled get_boss().choose( 'What would you like to do with this URL:\n' + styled(unquote(url), fg='yellow'), partial(self.hyperlink_open_confirmed, url, cwd), 'o:Open', 'c:Copy to clipboard', 'n;red:Nothing', default='o', window=self, ) return get_boss().open_url(url, cwd=cwd) def hyperlink_open_confirmed(self, url: str, cwd: Optional[str], q: str) -> None: if q == 'o': get_boss().open_url(url, cwd=cwd) elif q == 'c': set_clipboard_string(url) def handle_remote_file(self, netloc: str, remote_path: str) -> None: from kittens.remote_file.main import is_ssh_kitten_sentinel from kittens.ssh.main import get_connection_data from .utils import SSHConnectionData args = self.ssh_kitten_cmdline() conn_data: Union[None, List[str], SSHConnectionData] = None if args: ssh_cmdline = sorted(self.child.foreground_processes, key=lambda p: p['pid'])[-1]['cmdline'] or [''] if 'ControlPath=' in ' '.join(ssh_cmdline): idx = ssh_cmdline.index('--') conn_data = [is_ssh_kitten_sentinel] + list(ssh_cmdline[:idx + 2]) if conn_data is None: args = self.child.foreground_cmdline conn_data = get_connection_data(args, self.child.foreground_cwd or self.child.current_cwd or '') if conn_data is None: get_boss().show_error('Could not handle remote file', f'No SSH connection data found in: {args}') return get_boss().run_kitten( 'remote_file', '--hostname', netloc.partition(':')[0], '--path', remote_path, '--ssh-connection-data', json.dumps(conn_data) ) def focus_changed(self, focused: bool) -> None: if self.destroyed: return call_watchers(weakref.ref(self), 'on_focus_change', {'focused': focused}) for c in self.actions_on_focus_change: try: c(self, focused) except Exception: import traceback traceback.print_exc() self.screen.focus_changed(focused) if focused: self.last_focused_at = monotonic() update_ime_position_for_window(self.id, False, 1) changed = self.needs_attention self.needs_attention = False if changed: tab = self.tabref() if tab is not None: tab.relayout_borders() elif self.os_window_id == current_os_window(): # Cancel IME composition after loses focus update_ime_position_for_window(self.id, False, -1) def title_changed(self, new_title: Optional[str], is_base64: bool = False) -> None: self.child_title = process_title_from_child(new_title or self.default_title, is_base64) if self.override_title is None: self.title_updated() def icon_changed(self, new_icon: object) -> None: pass # TODO: Implement this @property def is_active(self) -> bool: return get_boss().active_window is self @property def has_activity_since_last_focus(self) -> bool: return self.screen.has_activity_since_last_focus() def on_activity_since_last_focus(self) -> None: if get_options().tab_activity_symbol: get_boss().on_activity_since_last_focus(self) def on_bell(self) -> None: cb = get_options().command_on_bell if cb and cb != ['none']: import shlex import subprocess env = self.child.foreground_environ env['KITTY_CHILD_CMDLINE'] = ' '.join(map(shlex.quote, self.child.cmdline)) subprocess.Popen(cb, env=env, cwd=self.child.foreground_cwd, preexec_fn=clear_handled_signals) if not self.is_active: changed = not self.needs_attention self.needs_attention = True tab = self.tabref() if tab is not None: if changed: tab.relayout_borders() tab.on_bell(self) def change_titlebar_color(self) -> None: opts = get_options() val = opts.macos_titlebar_color if is_macos else opts.wayland_titlebar_color if val > 0: if (val & 0xff) == 1: val = self.screen.color_profile.default_bg else: val = val >> 8 set_titlebar_color(self.os_window_id, val) else: set_titlebar_color(self.os_window_id, 0, True, -val) def change_colors(self, changes: Dict[DynamicColor, Optional[str]]) -> None: dirtied = default_bg_changed = False def item(raw: Optional[str]) -> int: if raw is None: return 0 v = to_color(raw) if v is None: return 0 return 0xff000000 | int(v) for which, val_ in changes.items(): val = item(val_) dirtied = True setattr(self.screen.color_profile, which.name, val) if which.name == 'default_bg': default_bg_changed = True if dirtied: self.screen.mark_as_dirty() if default_bg_changed: get_boss().default_bg_changed_for(self.id) def color_profile_popped(self, bg_changed: bool) -> None: if bg_changed: get_boss().default_bg_changed_for(self.id) def report_color(self, code: str, r: int, g: int, b: int) -> None: r |= r << 8 g |= g << 8 b |= b << 8 self.screen.send_escape_code_to_child(OSC, f'{code};rgb:{r:04x}/{g:04x}/{b:04x}') def report_notification_activated(self, identifier: str) -> None: self.screen.send_escape_code_to_child(OSC, f'99;i={identifier};') def set_dynamic_color(self, code: int, value: Union[str, bytes]) -> None: if isinstance(value, bytes): value = value.decode('utf-8') color_changes: Dict[DynamicColor, Optional[str]] = {} for val in value.split(';'): w = DYNAMIC_COLOR_CODES.get(code) if w is not None: if val == '?': col = getattr(self.screen.color_profile, w.name) self.report_color(str(code), col >> 16, (col >> 8) & 0xff, col & 0xff) else: q = None if code >= 100 else val color_changes[w] = q code += 1 if color_changes: self.change_colors(color_changes) def set_color_table_color(self, code: int, value: str) -> None: cp = self.screen.color_profile if code == 4: changed = False for c, val in parse_color_set(value): if val is None: # color query qc = self.screen.color_profile.as_color((c << 8) | 1) assert qc is not None self.report_color(f'4;{c}', qc.red, qc.green, qc.blue) else: changed = True cp.set_color(c, val) if changed: self.refresh() elif code == 104: if not value.strip(): cp.reset_color_table() else: for x in value.split(';'): try: y = int(x) except Exception: continue if 0 <= y <= 255: cp.reset_color(y) self.refresh() def request_capabilities(self, q: str) -> None: for result in get_capabilities(q, get_options()): self.screen.send_escape_code_to_child(DCS, result) def handle_remote_cmd(self, cmd: str) -> None: get_boss().handle_remote_cmd(cmd, self) def handle_remote_echo(self, msg: str) -> None: from base64 import standard_b64decode data = standard_b64decode(msg) self.write_to_child(data) def handle_remote_ssh(self, msg: str) -> None: from kittens.ssh.main import get_ssh_data for line in get_ssh_data(msg, f'{os.getpid()}-{self.id}'): self.write_to_child(line) def handle_kitten_result(self, msg: str) -> None: import base64 self.kitten_result = json.loads(base64.b85decode(msg)) for processor in self.kitten_result_processors: try: processor(self, self.kitten_result) except Exception: import traceback traceback.print_exc() def add_kitten_result_processor(self, callback: Callable[['Window', Any], None]) -> None: self.kitten_result_processors.append(callback) def handle_overlay_ready(self, msg: str) -> None: boss = get_boss() tab = boss.tab_for_window(self) if tab is not None: tab.move_window_to_top_of_group(self) def append_remote_data(self, msg: str) -> str: if not msg: cdata = ''.join(self.current_remote_data) self.current_remote_data = [] return cdata num, rest = msg.split(':', 1) max_size = get_options().clipboard_max_size * 1024 * 1024 if num == '0' or sum(map(len, self.current_remote_data)) > max_size: self.current_remote_data = [] self.current_remote_data.append(rest) return '' def handle_remote_edit(self, msg: str) -> None: cdata = self.append_remote_data(msg) if cdata: from .launch import remote_edit remote_edit(cdata, self) def handle_remote_clone(self, msg: str) -> None: cdata = self.append_remote_data(msg) if cdata: ac = get_options().allow_cloning if ac == 'ask': get_boss().confirm(_( 'A program running in this window wants to clone it into another window.' ' Allow it do so, once?'), partial(self.handle_remote_clone_confirmation, cdata), window=self, ) elif ac in ('yes', 'y', 'true'): self.handle_remote_clone_confirmation(cdata, True) def handle_remote_clone_confirmation(self, cdata: str, confirmed: bool) -> None: if confirmed: from .launch import clone_and_launch clone_and_launch(cdata, self) def handle_remote_askpass(self, msg: str) -> None: from .shm import SharedMemory with SharedMemory(name=msg, readonly=True) as shm: shm.seek(1) data = json.loads(shm.read_data_with_size()) def callback(ans: Any) -> None: data = json.dumps(ans) with SharedMemory(name=msg) as shm: shm.seek(1) shm.write_data_with_size(data) shm.flush() shm.seek(0) shm.write(b'\x01') message: str = data['message'] if data['type'] == 'confirm': get_boss().confirm( message, callback, window=self, confirm_on_cancel=bool(data.get('confirm_on_cancel')), confirm_on_accept=bool(data.get('confirm_on_accept', True))) elif data['type'] == 'choose': get_boss().choose( message, callback, *data['choices'], window=self, default=data.get('default', '')) elif data['type'] == 'get_line': get_boss().get_line( message, callback, window=self, is_password=bool(data.get('is_password')), prompt=data.get('prompt', '> ')) else: log_error(f'Ignoring ask request with unknown type: {data["type"]}') def handle_remote_print(self, msg: str) -> None: text = process_remote_print(msg) print(text, end='', file=sys.stderr) sys.stderr.flush() def send_cmd_response(self, response: Any) -> None: self.screen.send_escape_code_to_child(DCS, '@kitty-cmd' + json.dumps(response)) def file_transmission(self, data: str) -> None: self.file_transmission_control.handle_serialized_command(data) def clipboard_control(self, data: str, is_partial: bool = False) -> None: where, text = data.partition(';')[::2] if is_partial: if self.clipboard_pending is None: self.clipboard_pending = ClipboardPending(where, text) else: self.clipboard_pending = self.clipboard_pending._replace(data=self.clipboard_pending[1] + text) limit = get_options().clipboard_max_size if limit and len(self.clipboard_pending.data) > limit * 1024 * 1024: log_error('Discarding part of too large OSC 52 paste request') self.clipboard_pending = self.clipboard_pending._replace(data='', truncated=True) return if not where: if self.clipboard_pending is not None: text = self.clipboard_pending.data + text where = self.clipboard_pending.where try: if self.clipboard_pending.truncated: return finally: self.clipboard_pending = None else: where = 's0' cc = get_options().clipboard_control if text == '?': response = None if 's' in where or 'c' in where: if 'read-clipboard-ask' in cc: return self.ask_to_read_clipboard(False) response = get_clipboard_string() if 'read-clipboard' in cc else '' loc = 'c' elif 'p' in where: if 'read-primary-ask' in cc: return self.ask_to_read_clipboard(True) response = get_primary_selection() if 'read-primary' in cc else '' loc = 'p' response = response or '' self.send_osc52(loc, response or '') else: from base64 import standard_b64decode try: text = standard_b64decode(text).decode('utf-8') except Exception: text = '' if 's' in where or 'c' in where: if 'write-clipboard' in cc: set_clipboard_string(text) if 'p' in where: if 'write-primary' in cc: set_primary_selection(text) self.clipboard_pending = None def send_osc52(self, loc: str, response: str) -> None: from base64 import standard_b64encode self.screen.send_escape_code_to_child(OSC, '52;{};{}'.format( loc, standard_b64encode(response.encode('utf-8')).decode('ascii'))) def ask_to_read_clipboard(self, primary: bool = False) -> None: if self.current_clipboard_read_ask is not None: self.current_clipboard_read_ask = primary return self.current_clipboard_read_ask = primary get_boss().confirm(_( 'A program running in this window wants to read from the system clipboard.' ' Allow it do so, once?'), self.handle_clipboard_confirmation, window=self, ) def handle_clipboard_confirmation(self, confirmed: bool) -> None: try: loc = 'p' if self.current_clipboard_read_ask else 'c' response = '' if confirmed: response = get_primary_selection() if self.current_clipboard_read_ask else get_clipboard_string() self.send_osc52(loc, response) finally: self.current_clipboard_read_ask = None def manipulate_title_stack(self, pop: bool, title: str, icon: Any) -> None: if title: if pop: if self.title_stack: self.child_title = self.title_stack.pop() self.title_updated() else: if self.child_title: self.title_stack.append(self.child_title) # }}} # mouse actions {{{ @ac('mouse', ''' Handle a mouse click Try to perform the specified actions one after the other till one of them is successful. Supported actions are:: selection - check for a selection and if one exists abort processing link - if a link exists under the mouse, click it prompt - if the mouse click happens at a shell prompt move the cursor to the mouse location For examples, see :ref:`conf-kitty-mouse.mousemap` ''') def mouse_handle_click(self, *actions: str) -> None: for a in actions: if a == 'selection': if self.screen.has_selection(): break if a == 'link': if click_mouse_url(self.os_window_id, self.tab_id, self.id): break if a == 'prompt': # Do not send move cursor events too soon after the window is # focused, this is because there are people that click on # windows and start typing immediately and the cursor event # can interfere with that. See https://github.com/kovidgoyal/kitty/issues/4128 if monotonic() - self.last_focused_at < 1.5 * get_click_interval(): return if move_cursor_to_mouse_if_in_prompt(self.os_window_id, self.tab_id, self.id): self.screen.ignore_bells_for(1) break @ac('mouse', 'Click the URL under the mouse') def mouse_click_url(self) -> None: self.mouse_handle_click('link') @ac('mouse', 'Click the URL under the mouse only if the screen has no selection') def mouse_click_url_or_select(self) -> None: self.mouse_handle_click('selection', 'link') @ac('mouse', ''' Manipulate the selection based on the current mouse position For examples, see :ref:`conf-kitty-mouse.mousemap` ''') def mouse_selection(self, code: int) -> None: mouse_selection(self.os_window_id, self.tab_id, self.id, code, self.current_mouse_event_button) @ac('mouse', 'Paste the current primary selection') def paste_selection(self) -> None: txt = get_boss().current_primary_selection() if txt: self.paste_with_actions(txt) @ac('mouse', 'Paste the current primary selection or the clipboard if no selection is present') def paste_selection_or_clipboard(self) -> None: txt = get_boss().current_primary_selection_or_clipboard() if txt: self.paste_with_actions(txt) @ac('mouse', ''' Select clicked command output Requires :ref:`shell_integration` to work ''') def mouse_select_command_output(self) -> None: click_mouse_cmd_output(self.os_window_id, self.tab_id, self.id, True) @ac('mouse', ''' Show clicked command output in a pager like less Requires :ref:`shell_integration` to work ''') def mouse_show_command_output(self) -> None: if click_mouse_cmd_output(self.os_window_id, self.tab_id, self.id, False): self.show_cmd_output(CommandOutput.last_visited, 'Clicked command output') # }}} def text_for_selection(self, as_ansi: bool = False) -> str: sts = get_options().strip_trailing_spaces strip_trailing_spaces = sts == 'always' or (sts == 'smart' and not self.screen.is_rectangle_select()) lines = self.screen.text_for_selection(as_ansi, strip_trailing_spaces) return ''.join(lines) def call_watchers(self, which: Iterable[Watcher], data: Dict[str, Any]) -> None: boss = get_boss() for w in which: try: w(boss, self, data) except Exception: import traceback traceback.print_exc() def destroy(self) -> None: self.call_watchers(self.watchers.on_close, {}) self.destroyed = True del self.kitten_result_processors if hasattr(self, 'screen'): if self.is_active and self.os_window_id == current_os_window(): # Cancel IME composition when window is destroyed update_ime_position_for_window(self.id, False, -1) # Remove cycles so that screen is de-allocated immediately self.screen.reset_callbacks() del self.screen def as_text( self, as_ansi: bool = False, add_history: bool = False, add_wrap_markers: bool = False, alternate_screen: bool = False, add_cursor: bool = False ) -> str: return as_text(self.screen, as_ansi, add_history, add_wrap_markers, alternate_screen, add_cursor) def cmd_output(self, which: CommandOutput = CommandOutput.last_run, as_ansi: bool = False, add_wrap_markers: bool = False) -> str: return cmd_output(self.screen, which, as_ansi, add_wrap_markers) def get_cwd_of_child(self, oldest: bool = False) -> Optional[str]: return self.child.get_foreground_cwd(oldest) or self.child.current_cwd @property def cwd_of_child(self) -> Optional[str]: return self.get_cwd_of_child() @property def child_is_remote(self) -> bool: for p in self.child.foreground_processes: q = list(p['cmdline'] or ()) if q and q[0].lower() == 'ssh': return True return False def ssh_kitten_cmdline(self) -> List[str]: from kittens.ssh.utils import is_kitten_cmdline for p in self.child.foreground_processes: q = list(p['cmdline'] or ()) if is_kitten_cmdline(q): return q return [] def pipe_data(self, text: str, has_wrap_markers: bool = False) -> PipeData: text = text or '' if has_wrap_markers: text = text.replace('\r\n', '\n').replace('\r', '\n') lines = text.count('\n') input_line_number = (lines - (self.screen.lines - 1) - self.screen.scrolled_by) return { 'input_line_number': input_line_number, 'scrolled_by': self.screen.scrolled_by, 'cursor_x': self.screen.cursor.x + 1, 'cursor_y': self.screen.cursor.y + 1, 'lines': self.screen.lines, 'columns': self.screen.columns, 'text': text } def set_logo(self, path: str, position: str = '', alpha: float = -1) -> None: path = resolve_custom_file(path) if path else '' set_window_logo(self.os_window_id, self.tab_id, self.id, path, position or '', alpha) def paste_with_actions(self, text: str) -> None: if self.destroyed or not text: return opts = get_options() if 'filter' in opts.paste_actions: text = load_paste_filter()(text) if not text: return if 'quote-urls-at-prompt' in opts.paste_actions and self.at_prompt: prefixes = '|'.join(opts.url_prefixes) m = re.match(f'({prefixes}):(.+)', text) if m is not None: scheme, rest = m.group(1), m.group(2) if rest.startswith('//') or scheme in ('mailto', 'irc'): import shlex text = shlex.quote(text) btext = text.encode('utf-8') if 'confirm' in opts.paste_actions: msg = '' limit = 16 * 1024 if not self.screen.in_bracketed_paste_mode: msg = _('Pasting text into shells that do not support bracketed paste can be dangerous.') elif len(btext) > limit: msg = _('Pasting very large amounts of text ({} bytes) can be slow.').format(len(btext)) if msg: get_boss().confirm(msg + _(' Are you sure?'), partial(self.handle_paste_confirmation, btext), window=self) return self.paste_text(btext) def handle_paste_confirmation(self, btext: bytes, confirmed: bool) -> None: if confirmed: self.paste_text(btext) def paste_bytes(self, text: Union[str, bytes]) -> None: # paste raw bytes without any processing if isinstance(text, str): text = text.encode('utf-8') self.screen.paste_bytes(text) def paste_text(self, text: Union[str, bytes]) -> None: if text and not self.destroyed: if isinstance(text, str): text = text.encode('utf-8') if self.screen.in_bracketed_paste_mode: while True: new_text = text.replace(b'\033[201~', b'').replace(b'\x9b201~', b'') if len(text) == len(new_text): break text = new_text else: # Workaround for broken editors like nano that cannot handle # newlines in pasted text see https://github.com/kovidgoyal/kitty/issues/994 text = text.replace(b'\r\n', b'\n').replace(b'\n', b'\r') self.screen.paste(text) def clear_screen(self, reset: bool = False, scrollback: bool = False) -> None: self.screen.cursor.x = self.screen.cursor.y = 0 if reset: self.screen.reset() else: self.screen.erase_in_display(3 if scrollback else 2, False) # actions {{{ @ac('cp', 'Show scrollback in a pager like less') def show_scrollback(self) -> None: text = self.as_text(as_ansi=True, add_history=True, add_wrap_markers=True) data = self.pipe_data(text, has_wrap_markers=True) cursor_on_screen = self.screen.scrolled_by < self.screen.lines - self.screen.cursor.y get_boss().display_scrollback(self, data['text'], data['input_line_number'], report_cursor=cursor_on_screen) def show_cmd_output(self, which: CommandOutput, title: str = 'Command output', as_ansi: bool = True, add_wrap_markers: bool = True) -> None: text = self.cmd_output(which, as_ansi=as_ansi, add_wrap_markers=add_wrap_markers) text = text.replace('\r\n', '\n').replace('\r', '\n') get_boss().display_scrollback(self, text, title=title, report_cursor=False) @ac('cp', ''' Show output from the first shell command on screen in a pager like less Requires :ref:`shell_integration` to work ''') def show_first_command_output_on_screen(self) -> None: self.show_cmd_output(CommandOutput.first_on_screen, 'First command output on screen') @ac('cp', ''' Show output from the last shell command in a pager like less Requires :ref:`shell_integration` to work ''') def show_last_command_output(self) -> None: self.show_cmd_output(CommandOutput.last_run, 'Last command output') @ac('cp', ''' Show the first command output below the last scrolled position via scroll_to_prompt or the last mouse clicked command output in a pager like less Requires :ref:`shell_integration` to work ''') def show_last_visited_command_output(self) -> None: self.show_cmd_output(CommandOutput.last_visited, 'Last visited command output') @ac('cp', ''' Show the last non-empty output from a shell command in a pager like less Requires :ref:`shell_integration` to work ''') def show_last_non_empty_command_output(self) -> None: self.show_cmd_output(CommandOutput.last_non_empty, 'Last non-empty command output') @ac('cp', 'Paste the specified text into the current window') def paste(self, text: str) -> None: self.paste_with_actions(text) @ac('cp', 'Copy the selected text from the active window to the clipboard') def copy_to_clipboard(self) -> None: text = self.text_for_selection() if text: set_clipboard_string(text) @ac('cp', 'Copy the selected text from the active window to the clipboard with ANSI formatting codes') def copy_ansi_to_clipboard(self) -> None: text = self.text_for_selection(as_ansi=True) if text: set_clipboard_string(text) def encoded_key(self, key_event: KeyEvent) -> bytes: return encode_key_for_tty( key=key_event.key, shifted_key=key_event.shifted_key, alternate_key=key_event.alternate_key, mods=key_event.mods, action=key_event.action, text=key_event.text, key_encoding_flags=self.screen.current_key_encoding_flags(), cursor_key_mode=self.screen.cursor_key_mode, ).encode('ascii') @ac('cp', 'Copy the selected text from the active window to the clipboard, if no selection, send SIGINT (aka :kbd:`ctrl+c`)') def copy_or_interrupt(self) -> None: text = self.text_for_selection() if text: set_clipboard_string(text) else: self.scroll_end() self.write_to_child(self.encoded_key(KeyEvent(key=ord('c'), mods=GLFW_MOD_CONTROL))) @ac('cp', 'Copy the selected text from the active window to the clipboard and clear selection, if no selection, send SIGINT (aka :kbd:`ctrl+c`)') def copy_and_clear_or_interrupt(self) -> None: self.copy_or_interrupt() self.screen.clear_selection() @ac('cp', 'Pass the selected text from the active window to the specified program') def pass_selection_to_program(self, *args: str) -> None: cwd = self.cwd_of_child text = self.text_for_selection() if text: if args: open_cmd(args, text, cwd=cwd) else: open_url(text, cwd=cwd) @ac('cp', 'Clear the current selection') def clear_selection(self) -> None: self.screen.clear_selection() @ac('sc', 'Scroll up by one line') def scroll_line_up(self) -> None: if self.screen.is_main_linebuf(): self.screen.scroll(SCROLL_LINE, True) @ac('sc', 'Scroll down by one line') def scroll_line_down(self) -> None: if self.screen.is_main_linebuf(): self.screen.scroll(SCROLL_LINE, False) @ac('sc', 'Scroll up by one page') def scroll_page_up(self) -> None: if self.screen.is_main_linebuf(): self.screen.scroll(SCROLL_PAGE, True) @ac('sc', 'Scroll down by one page') def scroll_page_down(self) -> None: if self.screen.is_main_linebuf(): self.screen.scroll(SCROLL_PAGE, False) @ac('sc', 'Scroll to the top of the scrollback buffer') def scroll_home(self) -> None: if self.screen.is_main_linebuf(): self.screen.scroll(SCROLL_FULL, True) @ac('sc', 'Scroll to the bottom of the scrollback buffer') def scroll_end(self) -> None: if self.screen.is_main_linebuf(): self.screen.scroll(SCROLL_FULL, False) @ac('sc', ''' Scroll to the previous/next shell command prompt Allows easy jumping from one command to the next. Requires working :ref:`shell_integration`. Takes a single, optional, number as argument which is the number of prompts to jump, negative values jump up and positive values jump down. A value of zero will jump to the last prompt visited by this action. For example:: map ctrl+p scroll_to_prompt -1 # jump to previous map ctrl+n scroll_to_prompt 1 # jump to next map ctrl+o scroll_to_prompt 0 # jump to last visited ''') def scroll_to_prompt(self, num_of_prompts: int = -1) -> None: if self.screen.is_main_linebuf(): self.screen.scroll_to_prompt(num_of_prompts) @ac('sc', 'Scroll prompt to the top of the screen, filling screen with empty lines') def scroll_prompt_to_top(self, clear_scrollback: bool = False) -> None: if self.screen.is_main_linebuf(): self.screen.scroll_until_cursor_prompt() if clear_scrollback: self.screen.clear_scrollback() elif self.screen.scrolled_by > 0: self.screen.scroll(SCROLL_FULL, False) @ac('sc', 'Scroll prompt to the bottom of the screen, filling in extra lines from the scrollback buffer') def scroll_prompt_to_bottom(self) -> None: self.screen.scroll_prompt_to_bottom() @ac('mk', 'Toggle the current marker on/off') def toggle_marker(self, ftype: str, spec: Union[str, Tuple[Tuple[int, str], ...]], flags: int) -> None: from .marks import marker_from_spec key = ftype, spec if key == self.current_marker_spec: self.remove_marker() return self.screen.set_marker(marker_from_spec(ftype, spec, flags)) self.current_marker_spec = key def set_marker(self, spec: Union[str, Sequence[str]]) -> None: from .marks import marker_from_spec from .options.utils import parse_marker_spec, toggle_marker if isinstance(spec, str): func, (ftype, spec_, flags) = toggle_marker('toggle_marker', spec) else: ftype, spec_, flags = parse_marker_spec(spec[0], spec[1:]) key = ftype, spec_ self.screen.set_marker(marker_from_spec(ftype, spec_, flags)) self.current_marker_spec = key @ac('mk', 'Remove a previously created marker') def remove_marker(self) -> None: if self.current_marker_spec is not None: self.screen.set_marker() self.current_marker_spec = None @ac('mk', 'Scroll to the next or previous mark of the specified type') def scroll_to_mark(self, prev: bool = True, mark: int = 0) -> None: self.screen.scroll_to_next_mark(mark, prev) @ac('misc', ''' Send the specified SIGNAL to the foreground process in the active window For example:: map F1 signal_child SIGTERM ''') def signal_child(self, *signals: int) -> None: pid = self.child.pid_for_cwd if pid is not None: for sig in signals: os.kill(pid, sig) # }}}