diff --git a/docs/desktop-notifications.rst b/docs/desktop-notifications.rst index 1f7c3e199..646256b43 100644 --- a/docs/desktop-notifications.rst +++ b/docs/desktop-notifications.rst @@ -50,7 +50,8 @@ and the terminal emulator should hold off displaying it. A value of ``1`` means the notification is done, and should be displayed. You can specify the title or body multiple times and the terminal emulator will concatenate them, thereby allowing arbitrarily long text (terminal emulators are free to impose a sensible -limit to avoid Denial-of-Service attacks). +limit to avoid Denial-of-Service attacks). The size of the payload must be no +longer than ``2048`` bytes, *before being encoded*. Both the ``title`` and ``body`` payloads must be either UTF-8 encoded plain text with no embedded escape codes, or UTF-8 text that is Base64 encoded, in diff --git a/kitty/clipboard.py b/kitty/clipboard.py index af3e0ea91..d08f2aba4 100644 --- a/kitty/clipboard.py +++ b/kitty/clipboard.py @@ -2,7 +2,7 @@ # License: GPLv3 Copyright: 2022, Kovid Goyal import io -from enum import Enum, IntEnum, auto +from enum import Enum, IntEnum from gettext import gettext as _ from tempfile import SpooledTemporaryFile from typing import ( @@ -138,7 +138,8 @@ def develop() -> Tuple[Clipboard, Clipboard]: class ProtocolType(Enum): - osc_52 = auto() + osc_52 = 52 + osc_5522 = 5522 class ReadRequest(NamedTuple): @@ -147,6 +148,20 @@ class ReadRequest(NamedTuple): id: str = '' protocol_type: ProtocolType = ProtocolType.osc_52 + def encode_response(self, status: str = 'DATA', mime: str = '', payload: bytes = b'') -> bytes: + ans = f'{self.protocol_type.value};type=read:status={status}' + if status == 'OK' and self.is_primary_selection: + ans += ':loc=primary' + if self.id: + ans += f':id={self.id}' + if mime: + ans += f':mime={mime}' + a = ans.encode('ascii') + if payload: + import base64 + a += b';' + base64.standard_b64encode(payload) + return a + def encode_osc52(loc: str, response: str) -> str: from base64 import standard_b64encode @@ -240,6 +255,26 @@ class ClipboardRequestManager: self.currently_asking_permission_for: Optional[ReadRequest] = None self.in_flight_write_request: Optional[WriteRequest] = None + def parse_osc_5522(self, data: str) -> None: + from .notify import sanitize_id + metadata, _, payload = data.partition(';') + m: Dict[str, str] = {} + for record in metadata.split(':'): + try: + k, v = record.split('=', 1) + except Exception: + log_error('Malformed OSC 5522: metadata is not key=value pairs') + return + m[k] = v + typ = m.get('type', '') + if typ == 'read': + rr = ReadRequest( + is_primary_selection=m.get('loc', '') == 'primary', + mime_types=payload.split(), + protocol_type=ProtocolType.osc_5522, id=sanitize_id(m.get('id', '')) + ) + self.handle_read_request(rr) + def parse_osc_52(self, data: str, is_partial: bool = False) -> None: where, text = data.partition(';')[::2] if text == '?': @@ -286,11 +321,43 @@ class ClipboardRequestManager: def fulfill_read_request(self, rr: ReadRequest, allowed: bool = True) -> None: if rr.protocol_type is ProtocolType.osc_52: - self.fulfill_legacy_read_request(rr, allowed) + return self.fulfill_legacy_read_request(rr, allowed) + w = get_boss().window_id_map.get(self.window_id) + if w is None: + return + cp = get_boss().primary_selection if rr.is_primary_selection else get_boss().clipboard + if not cp.enabled: + w.screen.send_escape_code_to_child(OSC, rr.encode_response(status='EINVAL')) + return + w.screen.send_escape_code_to_child(OSC, rr.encode_response(status='OK')) + + current_mime = '' + + def write_chunks(data: bytes) -> None: + assert w is not None + mv = memoryview(data) + while mv: + w.screen.send_escape_code_to_child(OSC, rr.encode_response(payload=mv[:4096], mime=current_mime)) + mv = mv[4096:] + + for mime in rr.mime_types: + current_mime = mime + if mime == '.': + w.screen.send_escape_code_to_child( + OSC, rr.encode_response(payload=' '.join(cp.get_available_mime_types_for_paste()).encode('utf-8'), mime=current_mime)) + continue + try: + cp.get_mime(mime, write_chunks) + except Exception as e: + log_error(f'Failed to read requested mime type {mime} with error: {e}') + w.screen.send_escape_code_to_child(OSC, rr.encode_response(status='DONE')) def reject_read_request(self, rr: ReadRequest) -> None: if rr.protocol_type is ProtocolType.osc_52: - self.fulfill_legacy_read_request(rr, False) + return self.fulfill_legacy_read_request(rr, False) + w = get_boss().window_id_map.get(self.window_id) + if w is not None: + w.screen.send_escape_code_to_child(OSC, rr.encode_response(status='EPERM')) def fulfill_legacy_read_request(self, rr: ReadRequest, allowed: bool = True) -> None: cp = get_boss().primary_selection if rr.is_primary_selection else get_boss().clipboard diff --git a/kitty/notify.py b/kitty/notify.py index ad316c509..6ad8cea26 100644 --- a/kitty/notify.py +++ b/kitty/notify.py @@ -99,6 +99,10 @@ def sanitize_identifier_pat() -> 're.Pattern[str]': return re.compile(r'[^a-zA-Z0-9-_+.]+') +def sanitize_id(v: str) -> str: + return sanitize_identifier_pat().sub('', v) + + def parse_osc_99(raw: str) -> NotificationCommand: cmd = NotificationCommand() metadata, payload = raw.partition(';')[::2] @@ -114,7 +118,7 @@ def parse_osc_99(raw: str) -> NotificationCommand: if k == 'p': payload_type = v elif k == 'i': - cmd.identifier = sanitize_identifier_pat().sub('', v) + cmd.identifier = sanitize_id(v) elif k == 'e': payload_is_encoded = v == '1' elif k == 'd': diff --git a/kitty/window.py b/kitty/window.py index 9ade5ff79..d4acd5a3f 100644 --- a/kitty/window.py +++ b/kitty/window.py @@ -1170,7 +1170,9 @@ class Window: self.file_transmission_control.handle_serialized_command(data) def clipboard_control(self, data: str, is_partial: Optional[bool] = False) -> None: - if is_partial is not None: + if is_partial is None: + self.clipboard_request_manager.parse_osc_5522(data) + else: self.clipboard_request_manager.parse_osc_52(data, is_partial) def manipulate_title_stack(self, pop: bool, title: str, icon: Any) -> None: