Rewrite clipboard kitten to only change termios settings while it is actually reading from terminal
This commit is contained in:
parent
f637bf2377
commit
3463931dad
@ -1,56 +1,20 @@
|
||||
#!/usr/bin/env python3
|
||||
# License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
|
||||
|
||||
import codecs
|
||||
import io
|
||||
import os
|
||||
import select
|
||||
import sys
|
||||
from typing import List, NoReturn, Optional
|
||||
|
||||
from kitty.cli import parse_args
|
||||
from kitty.cli_stub import ClipboardCLIOptions
|
||||
from kitty.fast_data_types import parse_input_from_terminal
|
||||
|
||||
from ..tui.handler import Handler
|
||||
from ..tui.loop import Loop
|
||||
|
||||
|
||||
class Clipboard(Handler):
|
||||
|
||||
def __init__(self, data_to_send: Optional[bytes], args: ClipboardCLIOptions):
|
||||
self.args = args
|
||||
self.clipboard_contents: Optional[str] = None
|
||||
self.data_to_send = data_to_send
|
||||
self.quit_on_write = False
|
||||
|
||||
def initialize(self) -> None:
|
||||
if self.data_to_send is not None:
|
||||
self.cmd.write_to_clipboard(self.data_to_send, self.args.use_primary)
|
||||
if not self.args.get_clipboard:
|
||||
if self.args.wait_for_completion:
|
||||
# ask kitty for the TN terminfo capability and
|
||||
# only quit after a response is received
|
||||
self.print('\x1bP+q544e\x1b\\', end='')
|
||||
self.print('Waiting for completion...')
|
||||
return
|
||||
self.quit_on_write = True
|
||||
return
|
||||
self.cmd.request_from_clipboard(self.args.use_primary)
|
||||
|
||||
def on_writing_finished(self) -> None:
|
||||
if self.quit_on_write:
|
||||
self.quit_loop(0)
|
||||
|
||||
def on_clipboard_response(self, text: str, from_primary: bool = False) -> None:
|
||||
self.clipboard_contents = text
|
||||
self.quit_loop(0)
|
||||
|
||||
def on_capability_response(self, name: str, val: str) -> None:
|
||||
self.quit_loop(0)
|
||||
|
||||
def on_interrupt(self) -> None:
|
||||
self.quit_loop(1)
|
||||
|
||||
def on_eot(self) -> None:
|
||||
self.quit_loop(1)
|
||||
|
||||
from ..tui.operations import (
|
||||
raw_mode, request_from_clipboard, write_to_clipboard
|
||||
)
|
||||
|
||||
OPTIONS = r'''
|
||||
--get-clipboard
|
||||
@ -84,6 +48,62 @@ popup, see :opt:`clipboard_control` for details.
|
||||
'''
|
||||
|
||||
usage = ''
|
||||
got_capability_response = False
|
||||
got_clipboard_response = False
|
||||
clipboard_contents = ''
|
||||
clipboard_from_primary = False
|
||||
|
||||
|
||||
def ignore(x: str) -> None:
|
||||
pass
|
||||
|
||||
|
||||
def on_text(x: str) -> None:
|
||||
if '\x03' in x:
|
||||
raise KeyboardInterrupt()
|
||||
if '\x04' in x:
|
||||
raise EOFError()
|
||||
|
||||
|
||||
def on_dcs(dcs: str) -> None:
|
||||
global got_capability_response
|
||||
if dcs.startswith('1+r'):
|
||||
got_capability_response = True
|
||||
|
||||
|
||||
def on_osc(osc: str) -> None:
|
||||
global clipboard_contents, clipboard_from_primary, got_clipboard_response
|
||||
idx = osc.find(';')
|
||||
if idx <= 0:
|
||||
return
|
||||
q = osc[:idx]
|
||||
if q == '52':
|
||||
got_clipboard_response = True
|
||||
widx = osc.find(';', idx + 1)
|
||||
if widx < idx:
|
||||
clipboard_from_primary = osc.find('p', idx + 1) > -1
|
||||
clipboard_contents = ''
|
||||
else:
|
||||
from base64 import standard_b64decode
|
||||
clipboard_from_primary = osc.find('p', idx+1, widx) > -1
|
||||
data = memoryview(osc.encode('ascii'))
|
||||
clipboard_contents = standard_b64decode(data[widx+1:]).decode('utf-8')
|
||||
|
||||
|
||||
def wait_loop(tty_fd: int) -> None:
|
||||
os.set_blocking(tty_fd, False)
|
||||
decoder = codecs.getincrementaldecoder('utf-8')('ignore')
|
||||
with raw_mode(tty_fd):
|
||||
buf = ''
|
||||
while not got_capability_response and not got_clipboard_response:
|
||||
rd = select.select([tty_fd], [], [])[0]
|
||||
if rd:
|
||||
raw = os.read(tty_fd, io.DEFAULT_BUFFER_SIZE)
|
||||
if not raw:
|
||||
raise EOFError()
|
||||
data = decoder.decode(raw)
|
||||
buf = (buf + data) if buf else data
|
||||
buf = parse_input_from_terminal(on_text, on_dcs, ignore, on_osc, ignore, ignore, buf, False)
|
||||
|
||||
|
||||
def main(args: List[str]) -> NoReturn:
|
||||
@ -93,17 +113,34 @@ def main(args: List[str]) -> NoReturn:
|
||||
data: Optional[bytes] = None
|
||||
if not sys.stdin.isatty():
|
||||
data = sys.stdin.buffer.read()
|
||||
try:
|
||||
sys.stdin = open(os.ctermid())
|
||||
except FileNotFoundError:
|
||||
raise SystemExit('Not connected to a controlling terminal device, no /dev/tty')
|
||||
loop = Loop()
|
||||
handler = Clipboard(data, cli_opts)
|
||||
loop.loop(handler)
|
||||
if loop.return_code == 0 and handler.clipboard_contents:
|
||||
sys.stdout.write(handler.clipboard_contents)
|
||||
sys.stdout.flush()
|
||||
raise SystemExit(loop.return_code)
|
||||
wait_for_capability_response = False
|
||||
data_to_write = []
|
||||
if data:
|
||||
data_to_write.append(write_to_clipboard(data, cli_opts.use_primary).encode('ascii'))
|
||||
if not cli_opts.get_clipboard and cli_opts.wait_for_completion:
|
||||
data_to_write.append(b'\x1bP+q544e\x1b\\')
|
||||
wait_for_capability_response = True
|
||||
if cli_opts.get_clipboard:
|
||||
data_to_write.append(request_from_clipboard(cli_opts.use_primary).encode('ascii'))
|
||||
wait_for_capability_response = True
|
||||
tty_fd = os.open(os.ctermid(), os.O_RDWR | os.O_CLOEXEC)
|
||||
retcode = 0
|
||||
with open(tty_fd, 'wb', closefd=True) as ttyf:
|
||||
for x in data_to_write:
|
||||
ttyf.write(x)
|
||||
ttyf.flush()
|
||||
if wait_for_capability_response:
|
||||
try:
|
||||
wait_loop(tty_fd)
|
||||
except KeyboardInterrupt:
|
||||
sys.excepthook = lambda *a: None
|
||||
raise
|
||||
except EOFError:
|
||||
retcode = 1
|
||||
if clipboard_contents:
|
||||
print(end=clipboard_contents)
|
||||
|
||||
raise SystemExit(retcode)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user