broadcast kitten: Show a "fake" cursor in all windows being broadcast too

Fixes #4225
This commit is contained in:
Kovid Goyal 2021-11-12 22:59:20 +05:30
parent 74a5d3a25e
commit 0830e66e76
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
4 changed files with 98 additions and 12 deletions

View File

@ -53,6 +53,9 @@ To update |kitty|, :doc:`follow the instructions <binary>`.
- A new option :opt:`bell_path` to specify the path to a sound file
to use as the bell sound
- broadcast kitten: Show a "fake" cursor in all windows being broadcast too
(:iss:`4225`)
- Fix a regression that caused :option:`kitty --title` to not work when
opening new OS windows using :option:`kitty --single-instance` (:iss:`3893`)

View File

@ -11,6 +11,7 @@ from kitty.cli_stub import BroadcastCLIOptions
from kitty.key_encoding import encode_key_event
from kitty.rc.base import MATCH_TAB_OPTION, MATCH_WINDOW_OPTION
from kitty.remote_control import create_basic_command, encode_send
from kitty.short_uuid import uuid4
from kitty.typing import KeyEventType, ScreenSize
from ..tui.handler import Handler
@ -19,17 +20,26 @@ from ..tui.loop import Loop
from ..tui.operations import RESTORE_CURSOR, SAVE_CURSOR, styled
def session_command(payload: Dict[str, Any], start: bool = True) -> bytes:
payload = payload.copy()
payload['data'] = 'session:' + ('start' if start else 'end')
send = create_basic_command('send-text', payload, no_response=True)
return encode_send(send)
class Broadcast(Handler):
def __init__(self, opts: BroadcastCLIOptions, initial_strings: List[str]) -> None:
self.opts = opts
self.initial_strings = initial_strings
self.payload = {'exclude_active': True, 'data': '', 'match': opts.match, 'match_tab': opts.match_tab}
self.payload = {'exclude_active': True, 'data': '', 'match': opts.match, 'match_tab': opts.match_tab, 'session_id': uuid4()}
self.line_edit = LineEdit()
self.session_started = False
if not opts.match and not opts.match_tab:
self.payload['all'] = True
def initialize(self) -> None:
self.write_broadcast_session()
self.print('Type the text to broadcast below, press', styled('Ctrl+Esc', fg='yellow'), 'to quit:')
for x in self.initial_strings:
self.write_broadcast_text(x)
@ -83,6 +93,10 @@ class Broadcast(Handler):
send = create_basic_command('send-text', payload, no_response=True)
self.write(encode_send(send))
def write_broadcast_session(self, start: bool = True) -> None:
self.session_started = start
self.write(session_command(self.payload, start))
OPTIONS = (MATCH_WINDOW_OPTION + '\n\n' + MATCH_TAB_OPTION.replace('--match -m', '--match-tab -t')).format
help_text = 'Broadcast typed text to all kitty windows. By default text is sent to all windows, unless one of the matching options is specified'
@ -105,7 +119,12 @@ def main(args: List[str]) -> Optional[Dict[str, Any]]:
sys.stdout.flush()
loop = Loop()
handler = Broadcast(opts, items)
try:
loop.loop(handler)
finally:
if handler.session_started:
sys.stdout.buffer.write(session_command(handler.payload, False))
sys.stdout.buffer.flush()
return None

View File

@ -430,6 +430,10 @@ def add_timer(
pass
def remove_timer(timer_id: int) -> None:
pass
def monitor_pid(pid: int) -> None:
pass

View File

@ -3,9 +3,12 @@
import base64
import sys
from typing import TYPE_CHECKING, List, Optional, Union
from functools import partial
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Union
from kitty.fast_data_types import KeyEvent as WindowSystemKeyEvent
from kitty.fast_data_types import (
KeyEvent as WindowSystemKeyEvent, add_timer, get_boss, remove_timer
)
from kitty.key_encoding import decode_key_event_as_window_system_key
from kitty.options.utils import parse_send_text_bytes
@ -19,6 +22,30 @@ if TYPE_CHECKING:
from kitty.cli_stub import SendTextRCOptions as CLIOptions
class Session:
id: str
window_ids: Set[int]
timer_id: int = 0
def __init__(self, id: str):
self.id = id
self.window_ids = set()
sessions_map: Dict[str, Session] = {}
TIMEOUT_FOR_SESSION = 3.0
def clear_unfocused_cursors(sid: str, *a: Any) -> None:
s = sessions_map.pop(sid, None)
if s is not None:
boss = get_boss()
for wid in s.window_ids:
qw = boss.window_id_map.get(wid)
if qw is not None:
qw.screen.render_unfocused_cursor = 0
class SendText(RemoteCommand):
'''
data+: The data being sent. Can be either: text: followed by text or base64: followed by standard base64 encoded bytes
@ -26,6 +53,7 @@ class SendText(RemoteCommand):
match_tab: A string indicating the tab to send text to
all: A boolean indicating all windows should be matched.
exclude_active: A boolean that prevents sending text to the active window
session_id: A string that identifies a "broadcast session"
'''
short_desc = 'Send arbitrary text to specified windows'
desc = (
@ -138,6 +166,7 @@ Do not send text to the active window, even if it is one of the matched windows.
windows += tuple(tab)
pdata: str = payload_get('data')
encoding, _, q = pdata.partition(':')
session = ''
if encoding == 'text':
data: Union[bytes, WindowSystemKeyEvent] = q.encode('utf-8')
elif encoding == 'base64':
@ -148,18 +177,49 @@ Do not send text to the active window, even if it is one of the matched windows.
if candidate is None:
raise ValueError(f'Could not decode window system key: {q}')
data = candidate
elif encoding == 'session':
session = q
else:
raise TypeError(f'Invalid encoding for send-text data: {encoding}')
exclude_active = payload_get('exclude_active')
for window in windows:
actual_windows = (w for w in windows if w is not None and (not exclude_active or w is not boss.active_window))
sid = payload_get('session_id', '')
def create_or_update_session() -> Session:
s = sessions_map.setdefault(sid, Session(sid))
if s.timer_id:
remove_timer(s.timer_id)
s.timer_id = 0
return s
if session == 'end':
s = create_or_update_session()
for w in actual_windows:
w.screen.render_unfocused_cursor = 0
s.window_ids.discard(w.id)
clear_unfocused_cursors(sid)
elif session == 'start':
s = create_or_update_session()
if window is not None:
if not exclude_active or window is not boss.active_window:
if isinstance(data, WindowSystemKeyEvent):
kdata = window.encoded_key(data)
if kdata:
window.write_to_child(kdata)
window.actions_on_close.append(partial(clear_unfocused_cursors, sid))
s.timer_id = add_timer(partial(clear_unfocused_cursors, sid), TIMEOUT_FOR_SESSION, False)
for w in actual_windows:
w.screen.render_unfocused_cursor = 1
s.window_ids.add(w.id)
else:
window.write_to_child(data)
if sid:
s = create_or_update_session()
s.timer_id = add_timer(partial(clear_unfocused_cursors, sid), TIMEOUT_FOR_SESSION, False)
for w in actual_windows:
if sid:
w.screen.render_unfocused_cursor = 1
s.window_ids.add(w.id)
if isinstance(data, WindowSystemKeyEvent):
kdata = w.encoded_key(data)
if kdata:
w.write_to_child(kdata)
else:
w.write_to_child(data)
return None