The legacy osc 52 protocol now works with the new clipboard requests manager class

This commit is contained in:
Kovid Goyal 2022-11-28 20:53:56 +05:30
parent a8725d6307
commit 3ee9f723f2
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
4 changed files with 152 additions and 4 deletions

View File

@ -4,6 +4,7 @@
import io import io
from enum import Enum, IntEnum, auto from enum import Enum, IntEnum, auto
from gettext import gettext as _ from gettext import gettext as _
from tempfile import SpooledTemporaryFile
from typing import ( from typing import (
IO, Callable, Dict, List, NamedTuple, Optional, Sequence, Tuple, Union, IO, Callable, Dict, List, NamedTuple, Optional, Sequence, Tuple, Union,
) )
@ -14,6 +15,7 @@ from .fast_data_types import (
GLFW_CLIPBOARD, GLFW_PRIMARY_SELECTION, OSC, get_boss, get_clipboard_mime, GLFW_CLIPBOARD, GLFW_PRIMARY_SELECTION, OSC, get_boss, get_clipboard_mime,
get_options, set_clipboard_data_types, get_options, set_clipboard_data_types,
) )
from .utils import log_error
DataType = Union[bytes, 'IO[bytes]'] DataType = Union[bytes, 'IO[bytes]']
@ -22,6 +24,11 @@ class ClipboardType(IntEnum):
clipboard = GLFW_CLIPBOARD clipboard = GLFW_CLIPBOARD
primary_selection = GLFW_PRIMARY_SELECTION primary_selection = GLFW_PRIMARY_SELECTION
@staticmethod
def from_osc52_where_field(where: str) -> 'ClipboardType':
where = where or 's0'
return ClipboardType.clipboard if 'c' in where or 's' in where else ClipboardType.primary_selection
class Clipboard: class Clipboard:
@ -147,29 +154,135 @@ def encode_osc52(loc: str, response: str) -> str:
loc, standard_b64encode(response.encode('utf-8')).decode('ascii')) loc, standard_b64encode(response.encode('utf-8')).decode('ascii'))
class MimePos(NamedTuple):
start: int
size: int
class WriteRequest:
def __init__(
self, clipboard_type: ClipboardType = ClipboardType.clipboard, protocol_type: ProtocolType = ProtocolType.osc_52,
rollover_size: int = 16 * 1024 * 1024, max_size: int = -1,
) -> None:
self.clipboard_type = clipboard_type
self.protocol_type = protocol_type
self.max_size_exceeded = False
self.tempfile = SpooledTemporaryFile(max_size=rollover_size)
self.mime_map: Dict[str, MimePos] = {}
self.currently_writing_mime = ''
self.current_leftover_bytes = memoryview(b'')
self.max_size = (get_options().clipboard_max_size * 1024 * 1024) if max_size < 0 else max_size
def close(self) -> None:
if not self.tempfile.closed:
self.tempfile.close()
def add_base64_data(self, data: Union[str, bytes], mime: str = 'text/plain') -> None:
if isinstance(data, str):
data = data.encode('ascii')
if self.currently_writing_mime and self.currently_writing_mime != mime:
self.flush_base64_data()
if not self.currently_writing_mime:
self.mime_map[mime] = MimePos(self.tempfile.tell(), -1)
self.currently_writing_mime = mime
if len(self.current_leftover_bytes) > 0:
extra = 4 - len(self.current_leftover_bytes)
if len(data) >= extra:
self.write_base64_data(memoryview(bytes(self.current_leftover_bytes) + data[:extra]))
data = memoryview(data)[extra:]
self.current_leftover_bytes = memoryview(b'')
else:
self.current_leftover_bytes = memoryview(bytes(self.current_leftover_bytes) + data)
else:
data = memoryview(data)
extra = len(data) % 4
if extra > 0:
self.current_leftover_bytes = data[-extra:]
data = data[:-extra]
if len(data) > 0:
self.write_base64_data(data)
else:
self.write_base64_data(data)
def flush_base64_data(self) -> None:
if self.currently_writing_mime:
b = self.current_leftover_bytes
padding = 4 - len(b)
if padding in (1, 2):
self.write_base64_data(memoryview(bytes(b) + b'=' * padding))
start = self.mime_map[self.currently_writing_mime][0]
self.mime_map[self.currently_writing_mime] = MimePos(start, self.tempfile.tell() - start)
self.currently_writing_mime = ''
self.current_leftover_bytes = memoryview(b'')
def write_base64_data(self, b: bytes) -> None:
from base64 import standard_b64decode
if not self.max_size_exceeded:
d = standard_b64decode(b)
self.tempfile.write(d)
if self.max_size > 0 and self.tempfile.tell() > (self.max_size * 1024 * 1024):
log_error(f'Clipboard write request has more data than allowed by clipboard_max_size ({self.max_size}), truncating')
self.max_size_exceeded = True
def data_for(self, mime: str = 'text/plain', offset: int = 0, size: int = -1) -> bytes:
start, full_size = self.mime_map[mime]
if size == -1:
size = full_size
self.tempfile.seek(start + offset)
return self.tempfile.read(size)
class ClipboardRequestManager: class ClipboardRequestManager:
def __init__(self, window_id: int) -> None: def __init__(self, window_id: int) -> None:
self.window_id = window_id self.window_id = window_id
self.currently_asking_permission_for: Optional[ReadRequest] = None self.currently_asking_permission_for: Optional[ReadRequest] = None
self.in_flight_write_request: Optional[WriteRequest] = None
def parse_osc_52(self, data: str, is_partial: bool = False) -> None: def parse_osc_52(self, data: str, is_partial: bool = False) -> None:
where, text = data.partition(';')[::2] where, text = data.partition(';')[::2]
ct = ClipboardType.clipboard if 's' in where or 'c' in where else ClipboardType.primary_selection
if text == '?': if text == '?':
rr = ReadRequest(clipboard_type=ct) rr = ReadRequest(clipboard_type=ClipboardType.from_osc52_where_field(where))
self.handle_read_request(rr) self.handle_read_request(rr)
else:
wr = self.in_flight_write_request
if wr is None:
wr = self.in_flight_write_request = WriteRequest(ClipboardType.from_osc52_where_field(where))
wr.add_base64_data(text)
if is_partial:
return
self.in_flight_write_request = None
self.handle_write_request(wr)
def handle_write_request(self, wr: WriteRequest) -> None:
wr.flush_base64_data()
q = 'write-clipboard' if wr.clipboard_type is ClipboardType.clipboard else 'write-primary'
allowed = q in get_options().clipboard_control
self.fulfill_write_request(wr, allowed)
def fulfill_write_request(self, wr: WriteRequest, allowed: bool = True) -> None:
if wr.protocol_type is ProtocolType.osc_52:
self.fulfill_legacy_write_request(wr, allowed)
def fulfill_legacy_write_request(self, wr: WriteRequest, allowed: bool = True) -> None:
cp = get_boss().primary_selection if wr.clipboard_type is ClipboardType.primary_selection else get_boss().clipboard
w = get_boss().window_id_map.get(self.window_id)
if w is not None and cp.enabled and allowed:
cp.set_text(wr.data_for('text/plain'))
def handle_read_request(self, rr: ReadRequest) -> None: def handle_read_request(self, rr: ReadRequest) -> None:
cc = get_options().clipboard_control cc = get_options().clipboard_control
if rr.clipboard_type is ClipboardType.primary_selection: if rr.clipboard_type is ClipboardType.primary_selection:
ask_for_permission = 'read-primary-ask' in cc ask_for_permission = 'read-primary-ask' in cc
allowed = 'read-primary' in cc
else: else:
ask_for_permission = 'read-clipboard-ask' in cc ask_for_permission = 'read-clipboard-ask' in cc
allowed = 'read-clipboard' in cc
if ask_for_permission: if ask_for_permission:
self.ask_to_read_clipboard(rr) self.ask_to_read_clipboard(rr)
else: else:
self.fulfill_read_request(rr) self.fulfill_read_request(rr, allowed=allowed)
def fulfill_read_request(self, rr: ReadRequest, allowed: bool = True) -> None: def fulfill_read_request(self, rr: ReadRequest, allowed: bool = True) -> None:
if rr.protocol_type is ProtocolType.osc_52: if rr.protocol_type is ProtocolType.osc_52:
@ -207,3 +320,8 @@ class ClipboardRequestManager:
self.currently_asking_permission_for = None self.currently_asking_permission_for = None
if rr is not None: if rr is not None:
self.fulfill_read_request(rr, confirmed) self.fulfill_read_request(rr, confirmed)
def close(self) -> None:
if self.in_flight_write_request is not None:
self.in_flight_write_request.close()
self.in_flight_write_request = None

View File

@ -2876,7 +2876,7 @@ also :opt:`clipboard_max_size`.
''' '''
) )
opt('clipboard_max_size', '64', opt('clipboard_max_size', '512',
option_type='positive_float', option_type='positive_float',
long_text=''' long_text='''
The maximum size (in MB) of data from programs running in kitty that will be The maximum size (in MB) of data from programs running in kitty that will be

View File

@ -1280,6 +1280,7 @@ class Window:
def destroy(self) -> None: def destroy(self) -> None:
self.call_watchers(self.watchers.on_close, {}) self.call_watchers(self.watchers.on_close, {})
self.destroyed = True self.destroyed = True
self.clipboard_request_manager.close()
del self.kitten_result_processors del self.kitten_result_processors
if hasattr(self, 'screen'): if hasattr(self, 'screen'):
if self.is_active and self.os_window_id == current_focused_os_window_id(): if self.is_active and self.os_window_id == current_focused_os_window_id():

29
kitty_tests/clipboard.py Normal file
View File

@ -0,0 +1,29 @@
#!/usr/bin/env python
# License: GPLv3 Copyright: 2022, Kovid Goyal <kovid at kovidgoyal.net>
from . import BaseTest
from kitty.clipboard import WriteRequest
class TestClipboard(BaseTest):
def test_clipboard_write_request(self):
wr = WriteRequest(max_size=64)
wr.add_base64_data('bGlnaHQgd29yaw')
self.ae(bytes(wr.current_leftover_bytes), b'aw')
wr.flush_base64_data()
self.ae(wr.data_for(), b'light work')
wr = WriteRequest(max_size=64)
wr.add_base64_data('bGlnaHQgd29yaw==')
self.ae(wr.data_for(), b'light work')
wr = WriteRequest(max_size=64)
wr.add_base64_data('bGlnaHQgd29')
for x in b'y', b'a', b'y', b'4', b'=':
wr.add_base64_data(x)
self.ae(wr.data_for(), b'light work.')
wr = WriteRequest(max_size=64)
for x in 'bGlnaHQgd29y':
wr.add_base64_data(x)
self.ae(wr.data_for(), b'light wor')