163 lines
5.6 KiB
Python
163 lines
5.6 KiB
Python
#!/usr/bin/env python
|
|
# License: GPLv3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
|
|
|
|
import sys
|
|
from base64 import standard_b64encode
|
|
from gettext import gettext as _
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
from kitty.cli import parse_args
|
|
from kitty.cli_stub import BroadcastCLIOptions
|
|
from kitty.key_encoding import encode_key_event
|
|
from kitty.rc.base import MATCH_TAB_OPTION, MATCH_WINDOW_OPTION
|
|
from kitty.remote_control import create_basic_command, encode_send
|
|
from kitty.short_uuid import uuid4
|
|
from kitty.typing import KeyEventType, ScreenSize
|
|
|
|
from ..tui.handler import Handler
|
|
from ..tui.line_edit import LineEdit
|
|
from ..tui.loop import Loop
|
|
from ..tui.operations import RESTORE_CURSOR, SAVE_CURSOR, styled
|
|
|
|
|
|
def session_command(payload: Dict[str, Any], start: bool = True) -> bytes:
|
|
payload = payload.copy()
|
|
payload['data'] = 'session:' + ('start' if start else 'end')
|
|
send = create_basic_command('send-text', payload, no_response=True)
|
|
return encode_send(send)
|
|
|
|
|
|
class Broadcast(Handler):
|
|
|
|
def __init__(self, opts: BroadcastCLIOptions, initial_strings: List[str]) -> None:
|
|
self.opts = opts
|
|
self.hide_input = False
|
|
self.initial_strings = initial_strings
|
|
self.payload = {'exclude_active': True, 'data': '', 'match': opts.match, 'match_tab': opts.match_tab, 'session_id': uuid4()}
|
|
self.line_edit = LineEdit()
|
|
self.session_started = False
|
|
if not opts.match and not opts.match_tab:
|
|
self.payload['all'] = True
|
|
|
|
def initialize(self) -> None:
|
|
self.write_broadcast_session()
|
|
self.print('Type the text to broadcast below, press', styled(self.opts.end_session, fg='yellow'), 'to quit:')
|
|
for x in self.initial_strings:
|
|
self.write_broadcast_text(x)
|
|
self.write(SAVE_CURSOR)
|
|
|
|
def commit_line(self) -> None:
|
|
self.write(RESTORE_CURSOR + SAVE_CURSOR)
|
|
self.cmd.clear_to_end_of_screen()
|
|
self.line_edit.write(self.write, screen_cols=self.screen_size.cols)
|
|
|
|
def on_resize(self, screen_size: ScreenSize) -> None:
|
|
super().on_resize(screen_size)
|
|
self.commit_line()
|
|
|
|
def on_text(self, text: str, in_bracketed_paste: bool = False) -> None:
|
|
self.write_broadcast_text(text)
|
|
if not self.hide_input:
|
|
self.line_edit.on_text(text, in_bracketed_paste)
|
|
self.commit_line()
|
|
|
|
def on_interrupt(self) -> None:
|
|
self.write_broadcast_text('\x03')
|
|
self.line_edit.clear()
|
|
self.commit_line()
|
|
|
|
def on_eot(self) -> None:
|
|
self.write_broadcast_text('\x04')
|
|
|
|
def on_key(self, key_event: KeyEventType) -> None:
|
|
if key_event.matches(self.opts.hide_input_toggle):
|
|
self.hide_input ^= True
|
|
self.cmd.set_cursor_visible(not self.hide_input)
|
|
if self.hide_input:
|
|
self.end_line()
|
|
self.print('Input hidden, press', styled(self.opts.hide_input_toggle, fg='yellow'), 'to unhide:')
|
|
self.end_line()
|
|
return
|
|
if key_event.matches(self.opts.end_session):
|
|
self.quit_loop(0)
|
|
return
|
|
if not self.hide_input and self.line_edit.on_key(key_event):
|
|
self.commit_line()
|
|
if key_event.matches('enter'):
|
|
self.write_broadcast_text('\r')
|
|
self.end_line()
|
|
return
|
|
|
|
ek = encode_key_event(key_event)
|
|
ek = standard_b64encode(ek.encode('utf-8')).decode('ascii')
|
|
self.write_broadcast_data('kitty-key:' + ek)
|
|
|
|
def end_line(self) -> None:
|
|
self.print('')
|
|
self.line_edit.clear()
|
|
self.write(SAVE_CURSOR)
|
|
|
|
def write_broadcast_text(self, text: str) -> None:
|
|
self.write_broadcast_data('base64:' + standard_b64encode(text.encode('utf-8')).decode('ascii'))
|
|
|
|
def write_broadcast_data(self, data: str) -> None:
|
|
payload = self.payload.copy()
|
|
payload['data'] = data
|
|
send = create_basic_command('send-text', payload, no_response=True)
|
|
self.write(encode_send(send))
|
|
|
|
def write_broadcast_session(self, start: bool = True) -> None:
|
|
self.session_started = start
|
|
self.write(session_command(self.payload, start))
|
|
|
|
|
|
OPTIONS = ('''
|
|
--hide-input-toggle
|
|
default=Ctrl+Alt+Esc
|
|
Key to press that will toggle hiding of the input in the broadcast window itself.
|
|
Useful while typing a password, prevents the password from being visible on the screen.
|
|
|
|
|
|
--end-session
|
|
default=Ctrl+Esc
|
|
Key to press to end the broadcast session.
|
|
|
|
|
|
''' + MATCH_WINDOW_OPTION + '\n\n' + MATCH_TAB_OPTION.replace('--match -m', '--match-tab -t')).format
|
|
help_text = 'Broadcast typed text to kitty windows. By default text is sent to all windows, unless one of the matching options is specified'
|
|
usage = '[initial text to send ...]'
|
|
|
|
|
|
def parse_broadcast_args(args: List[str]) -> Tuple[BroadcastCLIOptions, List[str]]:
|
|
return parse_args(args, OPTIONS, usage, help_text, 'kitty +kitten broadcast', result_class=BroadcastCLIOptions)
|
|
|
|
|
|
def main(args: List[str]) -> Optional[Dict[str, Any]]:
|
|
try:
|
|
opts, items = parse_broadcast_args(args[1:])
|
|
except SystemExit as e:
|
|
if e.code != 0:
|
|
print(e.args[0], file=sys.stderr)
|
|
input(_('Press Enter to quit'))
|
|
return None
|
|
|
|
sys.stdout.flush()
|
|
loop = Loop()
|
|
handler = Broadcast(opts, items)
|
|
try:
|
|
loop.loop(handler)
|
|
finally:
|
|
if handler.session_started:
|
|
sys.stdout.buffer.write(session_command(handler.payload, False))
|
|
sys.stdout.buffer.flush()
|
|
return None
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main(sys.argv)
|
|
elif __name__ == '__doc__':
|
|
cd = sys.cli_docs # type: ignore
|
|
cd['usage'] = usage
|
|
cd['options'] = OPTIONS
|
|
cd['help_text'] = help_text
|