From 3ee9f723f21fb7c4bdc70ccdeef430dcff98e58d Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Mon, 28 Nov 2022 20:53:56 +0530 Subject: [PATCH] The legacy osc 52 protocol now works with the new clipboard requests manager class --- kitty/clipboard.py | 124 +++++++++++++++++++++++++++++++++++- kitty/options/definition.py | 2 +- kitty/window.py | 1 + kitty_tests/clipboard.py | 29 +++++++++ 4 files changed, 152 insertions(+), 4 deletions(-) create mode 100644 kitty_tests/clipboard.py diff --git a/kitty/clipboard.py b/kitty/clipboard.py index 7ba0dc7a4..d9a7a7d6a 100644 --- a/kitty/clipboard.py +++ b/kitty/clipboard.py @@ -4,6 +4,7 @@ import io from enum import Enum, IntEnum, auto from gettext import gettext as _ +from tempfile import SpooledTemporaryFile from typing import ( IO, Callable, Dict, List, NamedTuple, Optional, Sequence, Tuple, Union, ) @@ -14,6 +15,7 @@ from .fast_data_types import ( GLFW_CLIPBOARD, GLFW_PRIMARY_SELECTION, OSC, get_boss, get_clipboard_mime, get_options, set_clipboard_data_types, ) +from .utils import log_error DataType = Union[bytes, 'IO[bytes]'] @@ -22,6 +24,11 @@ class ClipboardType(IntEnum): clipboard = GLFW_CLIPBOARD primary_selection = GLFW_PRIMARY_SELECTION + @staticmethod + def from_osc52_where_field(where: str) -> 'ClipboardType': + where = where or 's0' + return ClipboardType.clipboard if 'c' in where or 's' in where else ClipboardType.primary_selection + class Clipboard: @@ -147,29 +154,135 @@ def encode_osc52(loc: str, response: str) -> str: loc, standard_b64encode(response.encode('utf-8')).decode('ascii')) +class MimePos(NamedTuple): + start: int + size: int + + +class WriteRequest: + + def __init__( + self, clipboard_type: ClipboardType = ClipboardType.clipboard, protocol_type: ProtocolType = ProtocolType.osc_52, + rollover_size: int = 16 * 1024 * 1024, max_size: int = -1, + ) -> None: + self.clipboard_type = clipboard_type + self.protocol_type = protocol_type + self.max_size_exceeded = False + self.tempfile = SpooledTemporaryFile(max_size=rollover_size) + self.mime_map: Dict[str, MimePos] = {} + self.currently_writing_mime = '' + self.current_leftover_bytes = memoryview(b'') + self.max_size = (get_options().clipboard_max_size * 1024 * 1024) if max_size < 0 else max_size + + def close(self) -> None: + if not self.tempfile.closed: + self.tempfile.close() + + def add_base64_data(self, data: Union[str, bytes], mime: str = 'text/plain') -> None: + if isinstance(data, str): + data = data.encode('ascii') + if self.currently_writing_mime and self.currently_writing_mime != mime: + self.flush_base64_data() + if not self.currently_writing_mime: + self.mime_map[mime] = MimePos(self.tempfile.tell(), -1) + self.currently_writing_mime = mime + if len(self.current_leftover_bytes) > 0: + extra = 4 - len(self.current_leftover_bytes) + if len(data) >= extra: + self.write_base64_data(memoryview(bytes(self.current_leftover_bytes) + data[:extra])) + data = memoryview(data)[extra:] + self.current_leftover_bytes = memoryview(b'') + else: + self.current_leftover_bytes = memoryview(bytes(self.current_leftover_bytes) + data) + else: + data = memoryview(data) + extra = len(data) % 4 + if extra > 0: + self.current_leftover_bytes = data[-extra:] + data = data[:-extra] + if len(data) > 0: + self.write_base64_data(data) + else: + self.write_base64_data(data) + + def flush_base64_data(self) -> None: + if self.currently_writing_mime: + b = self.current_leftover_bytes + padding = 4 - len(b) + if padding in (1, 2): + self.write_base64_data(memoryview(bytes(b) + b'=' * padding)) + start = self.mime_map[self.currently_writing_mime][0] + self.mime_map[self.currently_writing_mime] = MimePos(start, self.tempfile.tell() - start) + self.currently_writing_mime = '' + self.current_leftover_bytes = memoryview(b'') + + def write_base64_data(self, b: bytes) -> None: + from base64 import standard_b64decode + if not self.max_size_exceeded: + d = standard_b64decode(b) + self.tempfile.write(d) + if self.max_size > 0 and self.tempfile.tell() > (self.max_size * 1024 * 1024): + log_error(f'Clipboard write request has more data than allowed by clipboard_max_size ({self.max_size}), truncating') + self.max_size_exceeded = True + + def data_for(self, mime: str = 'text/plain', offset: int = 0, size: int = -1) -> bytes: + start, full_size = self.mime_map[mime] + if size == -1: + size = full_size + self.tempfile.seek(start + offset) + return self.tempfile.read(size) + + class ClipboardRequestManager: def __init__(self, window_id: int) -> None: self.window_id = window_id self.currently_asking_permission_for: Optional[ReadRequest] = None + self.in_flight_write_request: Optional[WriteRequest] = None def parse_osc_52(self, data: str, is_partial: bool = False) -> None: where, text = data.partition(';')[::2] - ct = ClipboardType.clipboard if 's' in where or 'c' in where else ClipboardType.primary_selection if text == '?': - rr = ReadRequest(clipboard_type=ct) + rr = ReadRequest(clipboard_type=ClipboardType.from_osc52_where_field(where)) self.handle_read_request(rr) + else: + wr = self.in_flight_write_request + if wr is None: + wr = self.in_flight_write_request = WriteRequest(ClipboardType.from_osc52_where_field(where)) + wr.add_base64_data(text) + if is_partial: + return + self.in_flight_write_request = None + self.handle_write_request(wr) + + def handle_write_request(self, wr: WriteRequest) -> None: + wr.flush_base64_data() + q = 'write-clipboard' if wr.clipboard_type is ClipboardType.clipboard else 'write-primary' + allowed = q in get_options().clipboard_control + self.fulfill_write_request(wr, allowed) + + def fulfill_write_request(self, wr: WriteRequest, allowed: bool = True) -> None: + if wr.protocol_type is ProtocolType.osc_52: + self.fulfill_legacy_write_request(wr, allowed) + + def fulfill_legacy_write_request(self, wr: WriteRequest, allowed: bool = True) -> None: + cp = get_boss().primary_selection if wr.clipboard_type is ClipboardType.primary_selection else get_boss().clipboard + w = get_boss().window_id_map.get(self.window_id) + if w is not None and cp.enabled and allowed: + cp.set_text(wr.data_for('text/plain')) def handle_read_request(self, rr: ReadRequest) -> None: cc = get_options().clipboard_control if rr.clipboard_type is ClipboardType.primary_selection: ask_for_permission = 'read-primary-ask' in cc + allowed = 'read-primary' in cc else: ask_for_permission = 'read-clipboard-ask' in cc + allowed = 'read-clipboard' in cc if ask_for_permission: self.ask_to_read_clipboard(rr) else: - self.fulfill_read_request(rr) + self.fulfill_read_request(rr, allowed=allowed) def fulfill_read_request(self, rr: ReadRequest, allowed: bool = True) -> None: if rr.protocol_type is ProtocolType.osc_52: @@ -207,3 +320,8 @@ class ClipboardRequestManager: self.currently_asking_permission_for = None if rr is not None: self.fulfill_read_request(rr, confirmed) + + def close(self) -> None: + if self.in_flight_write_request is not None: + self.in_flight_write_request.close() + self.in_flight_write_request = None diff --git a/kitty/options/definition.py b/kitty/options/definition.py index 1cb3a526e..bb3dc1c47 100644 --- a/kitty/options/definition.py +++ b/kitty/options/definition.py @@ -2876,7 +2876,7 @@ also :opt:`clipboard_max_size`. ''' ) -opt('clipboard_max_size', '64', +opt('clipboard_max_size', '512', option_type='positive_float', long_text=''' The maximum size (in MB) of data from programs running in kitty that will be diff --git a/kitty/window.py b/kitty/window.py index 349e2e290..9ade5ff79 100644 --- a/kitty/window.py +++ b/kitty/window.py @@ -1280,6 +1280,7 @@ class Window: def destroy(self) -> None: self.call_watchers(self.watchers.on_close, {}) self.destroyed = True + self.clipboard_request_manager.close() del self.kitten_result_processors if hasattr(self, 'screen'): if self.is_active and self.os_window_id == current_focused_os_window_id(): diff --git a/kitty_tests/clipboard.py b/kitty_tests/clipboard.py new file mode 100644 index 000000000..72562e036 --- /dev/null +++ b/kitty_tests/clipboard.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python +# License: GPLv3 Copyright: 2022, Kovid Goyal + + +from . import BaseTest + +from kitty.clipboard import WriteRequest + + +class TestClipboard(BaseTest): + + def test_clipboard_write_request(self): + wr = WriteRequest(max_size=64) + wr.add_base64_data('bGlnaHQgd29yaw') + self.ae(bytes(wr.current_leftover_bytes), b'aw') + wr.flush_base64_data() + self.ae(wr.data_for(), b'light work') + wr = WriteRequest(max_size=64) + wr.add_base64_data('bGlnaHQgd29yaw==') + self.ae(wr.data_for(), b'light work') + wr = WriteRequest(max_size=64) + wr.add_base64_data('bGlnaHQgd29') + for x in b'y', b'a', b'y', b'4', b'=': + wr.add_base64_data(x) + self.ae(wr.data_for(), b'light work.') + wr = WriteRequest(max_size=64) + for x in 'bGlnaHQgd29y': + wr.add_base64_data(x) + self.ae(wr.data_for(), b'light wor')