Terminal end of OSC 5522 read requests
This commit is contained in:
parent
f0d61c2de9
commit
11724c8a5f
@ -50,7 +50,8 @@ and the terminal emulator should hold off displaying it. A value of ``1`` means
|
||||
the notification is done, and should be displayed. You can specify the title or
|
||||
body multiple times and the terminal emulator will concatenate them, thereby
|
||||
allowing arbitrarily long text (terminal emulators are free to impose a sensible
|
||||
limit to avoid Denial-of-Service attacks).
|
||||
limit to avoid Denial-of-Service attacks). The size of the payload must be no
|
||||
longer than ``2048`` bytes, *before being encoded*.
|
||||
|
||||
Both the ``title`` and ``body`` payloads must be either UTF-8 encoded plain
|
||||
text with no embedded escape codes, or UTF-8 text that is Base64 encoded, in
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
# License: GPLv3 Copyright: 2022, Kovid Goyal <kovid at kovidgoyal.net>
|
||||
|
||||
import io
|
||||
from enum import Enum, IntEnum, auto
|
||||
from enum import Enum, IntEnum
|
||||
from gettext import gettext as _
|
||||
from tempfile import SpooledTemporaryFile
|
||||
from typing import (
|
||||
@ -138,7 +138,8 @@ def develop() -> Tuple[Clipboard, Clipboard]:
|
||||
|
||||
|
||||
class ProtocolType(Enum):
|
||||
osc_52 = auto()
|
||||
osc_52 = 52
|
||||
osc_5522 = 5522
|
||||
|
||||
|
||||
class ReadRequest(NamedTuple):
|
||||
@ -147,6 +148,20 @@ class ReadRequest(NamedTuple):
|
||||
id: str = ''
|
||||
protocol_type: ProtocolType = ProtocolType.osc_52
|
||||
|
||||
def encode_response(self, status: str = 'DATA', mime: str = '', payload: bytes = b'') -> bytes:
|
||||
ans = f'{self.protocol_type.value};type=read:status={status}'
|
||||
if status == 'OK' and self.is_primary_selection:
|
||||
ans += ':loc=primary'
|
||||
if self.id:
|
||||
ans += f':id={self.id}'
|
||||
if mime:
|
||||
ans += f':mime={mime}'
|
||||
a = ans.encode('ascii')
|
||||
if payload:
|
||||
import base64
|
||||
a += b';' + base64.standard_b64encode(payload)
|
||||
return a
|
||||
|
||||
|
||||
def encode_osc52(loc: str, response: str) -> str:
|
||||
from base64 import standard_b64encode
|
||||
@ -240,6 +255,26 @@ class ClipboardRequestManager:
|
||||
self.currently_asking_permission_for: Optional[ReadRequest] = None
|
||||
self.in_flight_write_request: Optional[WriteRequest] = None
|
||||
|
||||
def parse_osc_5522(self, data: str) -> None:
|
||||
from .notify import sanitize_id
|
||||
metadata, _, payload = data.partition(';')
|
||||
m: Dict[str, str] = {}
|
||||
for record in metadata.split(':'):
|
||||
try:
|
||||
k, v = record.split('=', 1)
|
||||
except Exception:
|
||||
log_error('Malformed OSC 5522: metadata is not key=value pairs')
|
||||
return
|
||||
m[k] = v
|
||||
typ = m.get('type', '')
|
||||
if typ == 'read':
|
||||
rr = ReadRequest(
|
||||
is_primary_selection=m.get('loc', '') == 'primary',
|
||||
mime_types=payload.split(),
|
||||
protocol_type=ProtocolType.osc_5522, id=sanitize_id(m.get('id', ''))
|
||||
)
|
||||
self.handle_read_request(rr)
|
||||
|
||||
def parse_osc_52(self, data: str, is_partial: bool = False) -> None:
|
||||
where, text = data.partition(';')[::2]
|
||||
if text == '?':
|
||||
@ -286,11 +321,43 @@ class ClipboardRequestManager:
|
||||
|
||||
def fulfill_read_request(self, rr: ReadRequest, allowed: bool = True) -> None:
|
||||
if rr.protocol_type is ProtocolType.osc_52:
|
||||
self.fulfill_legacy_read_request(rr, allowed)
|
||||
return self.fulfill_legacy_read_request(rr, allowed)
|
||||
w = get_boss().window_id_map.get(self.window_id)
|
||||
if w is None:
|
||||
return
|
||||
cp = get_boss().primary_selection if rr.is_primary_selection else get_boss().clipboard
|
||||
if not cp.enabled:
|
||||
w.screen.send_escape_code_to_child(OSC, rr.encode_response(status='EINVAL'))
|
||||
return
|
||||
w.screen.send_escape_code_to_child(OSC, rr.encode_response(status='OK'))
|
||||
|
||||
current_mime = ''
|
||||
|
||||
def write_chunks(data: bytes) -> None:
|
||||
assert w is not None
|
||||
mv = memoryview(data)
|
||||
while mv:
|
||||
w.screen.send_escape_code_to_child(OSC, rr.encode_response(payload=mv[:4096], mime=current_mime))
|
||||
mv = mv[4096:]
|
||||
|
||||
for mime in rr.mime_types:
|
||||
current_mime = mime
|
||||
if mime == '.':
|
||||
w.screen.send_escape_code_to_child(
|
||||
OSC, rr.encode_response(payload=' '.join(cp.get_available_mime_types_for_paste()).encode('utf-8'), mime=current_mime))
|
||||
continue
|
||||
try:
|
||||
cp.get_mime(mime, write_chunks)
|
||||
except Exception as e:
|
||||
log_error(f'Failed to read requested mime type {mime} with error: {e}')
|
||||
w.screen.send_escape_code_to_child(OSC, rr.encode_response(status='DONE'))
|
||||
|
||||
def reject_read_request(self, rr: ReadRequest) -> None:
|
||||
if rr.protocol_type is ProtocolType.osc_52:
|
||||
self.fulfill_legacy_read_request(rr, False)
|
||||
return self.fulfill_legacy_read_request(rr, False)
|
||||
w = get_boss().window_id_map.get(self.window_id)
|
||||
if w is not None:
|
||||
w.screen.send_escape_code_to_child(OSC, rr.encode_response(status='EPERM'))
|
||||
|
||||
def fulfill_legacy_read_request(self, rr: ReadRequest, allowed: bool = True) -> None:
|
||||
cp = get_boss().primary_selection if rr.is_primary_selection else get_boss().clipboard
|
||||
|
||||
@ -99,6 +99,10 @@ def sanitize_identifier_pat() -> 're.Pattern[str]':
|
||||
return re.compile(r'[^a-zA-Z0-9-_+.]+')
|
||||
|
||||
|
||||
def sanitize_id(v: str) -> str:
|
||||
return sanitize_identifier_pat().sub('', v)
|
||||
|
||||
|
||||
def parse_osc_99(raw: str) -> NotificationCommand:
|
||||
cmd = NotificationCommand()
|
||||
metadata, payload = raw.partition(';')[::2]
|
||||
@ -114,7 +118,7 @@ def parse_osc_99(raw: str) -> NotificationCommand:
|
||||
if k == 'p':
|
||||
payload_type = v
|
||||
elif k == 'i':
|
||||
cmd.identifier = sanitize_identifier_pat().sub('', v)
|
||||
cmd.identifier = sanitize_id(v)
|
||||
elif k == 'e':
|
||||
payload_is_encoded = v == '1'
|
||||
elif k == 'd':
|
||||
|
||||
@ -1170,7 +1170,9 @@ class Window:
|
||||
self.file_transmission_control.handle_serialized_command(data)
|
||||
|
||||
def clipboard_control(self, data: str, is_partial: Optional[bool] = False) -> None:
|
||||
if is_partial is not None:
|
||||
if is_partial is None:
|
||||
self.clipboard_request_manager.parse_osc_5522(data)
|
||||
else:
|
||||
self.clipboard_request_manager.parse_osc_52(data, is_partial)
|
||||
|
||||
def manipulate_title_stack(self, pop: bool, title: str, icon: Any) -> None:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user