Implement kitty side write OSC 5522
This commit is contained in:
parent
fe75493c37
commit
fdd42d5f19
@ -2,12 +2,11 @@
|
||||
# License: GPLv3 Copyright: 2022, Kovid Goyal <kovid at kovidgoyal.net>
|
||||
|
||||
import io
|
||||
import os
|
||||
from enum import Enum, IntEnum
|
||||
from gettext import gettext as _
|
||||
from tempfile import SpooledTemporaryFile
|
||||
from typing import (
|
||||
IO, Callable, Dict, List, NamedTuple, Optional, Tuple, Union,
|
||||
)
|
||||
from tempfile import TemporaryFile
|
||||
from typing import IO, Callable, Dict, List, Mapping, NamedTuple, Optional, Tuple, Union
|
||||
|
||||
from .conf.utils import uniq
|
||||
from .constants import supports_primary_selection
|
||||
@ -17,7 +16,50 @@ from .fast_data_types import (
|
||||
)
|
||||
from .utils import log_error
|
||||
|
||||
DataType = Union[bytes, 'IO[bytes]']
|
||||
|
||||
class Tempfile:
|
||||
|
||||
def __init__(self, max_size: int) -> None:
|
||||
self.file: Union[io.BytesIO, IO[bytes]] = io.BytesIO()
|
||||
self.max_size = max_size
|
||||
|
||||
def rollover_if_needed(self, sz: int) -> None:
|
||||
if isinstance(self.file, io.BytesIO) and self.file.tell() + sz > self.max_size:
|
||||
before = self.file.getvalue()
|
||||
self.file = TemporaryFile()
|
||||
self.file.write(before)
|
||||
|
||||
def write(self, data: bytes) -> None:
|
||||
self.rollover_if_needed(len(data))
|
||||
self.file.write(data)
|
||||
|
||||
def tell(self) -> int:
|
||||
return self.file.tell()
|
||||
|
||||
def seek(self, pos: int) -> None:
|
||||
self.file.seek(pos, os.SEEK_SET)
|
||||
|
||||
def read(self, offset: int, size: int) -> bytes:
|
||||
self.file.seek(offset)
|
||||
return self.file.read(size)
|
||||
|
||||
def create_chunker(self, offset: int, size: int) -> Callable[[], Callable[[], bytes]]:
|
||||
def chunk_creator() -> Callable[[], bytes]:
|
||||
pos = offset
|
||||
limit = offset + size
|
||||
|
||||
def chunker() -> bytes:
|
||||
nonlocal pos, limit
|
||||
if pos >= limit:
|
||||
return b''
|
||||
ans = self.read(pos, min(io.DEFAULT_BUFFER_SIZE, limit - pos))
|
||||
pos = self.file.tell()
|
||||
return ans
|
||||
return chunker
|
||||
return chunk_creator
|
||||
|
||||
|
||||
DataType = Union[bytes, Callable[[], Callable[[], bytes]]]
|
||||
TARGETS_MIME = '.'
|
||||
|
||||
|
||||
@ -43,7 +85,7 @@ class Clipboard:
|
||||
x = x.encode('utf-8')
|
||||
self.set_mime({'text/plain': x})
|
||||
|
||||
def set_mime(self, data: Dict[str, DataType]) -> None:
|
||||
def set_mime(self, data: Mapping[str, DataType]) -> None:
|
||||
if self.enabled and isinstance(data, dict):
|
||||
self.data = data
|
||||
set_clipboard_data_types(self.clipboard_type, tuple(self.data))
|
||||
@ -64,10 +106,10 @@ class Clipboard:
|
||||
if isinstance(data, bytes):
|
||||
output(data)
|
||||
else:
|
||||
data.seek(0, 0)
|
||||
chunker = data()
|
||||
q = b' '
|
||||
while q:
|
||||
q = data.read(io.DEFAULT_BUFFER_SIZE)
|
||||
q = chunker()
|
||||
output(q)
|
||||
|
||||
def get_mime_data(self, mime: str) -> bytes:
|
||||
@ -100,12 +142,7 @@ class Clipboard:
|
||||
return ans
|
||||
return chunker
|
||||
|
||||
data.seek(0, 0)
|
||||
|
||||
def io_chunker() -> bytes:
|
||||
assert not isinstance(data, bytes)
|
||||
return data.read(io.DEFAULT_BUFFER_SIZE)
|
||||
return io_chunker
|
||||
return data()
|
||||
|
||||
|
||||
def set_clipboard_string(x: Union[str, bytes]) -> None:
|
||||
@ -178,21 +215,35 @@ class MimePos(NamedTuple):
|
||||
class WriteRequest:
|
||||
|
||||
def __init__(
|
||||
self, is_primary_selection: bool = False, protocol_type: ProtocolType = ProtocolType.osc_52,
|
||||
self, is_primary_selection: bool = False, protocol_type: ProtocolType = ProtocolType.osc_52, id: str = '',
|
||||
rollover_size: int = 16 * 1024 * 1024, max_size: int = -1,
|
||||
) -> None:
|
||||
self.id = id
|
||||
self.is_primary_selection = is_primary_selection
|
||||
self.protocol_type = protocol_type
|
||||
self.max_size_exceeded = False
|
||||
self.tempfile = SpooledTemporaryFile(max_size=rollover_size)
|
||||
self.tempfile = Tempfile(max_size=rollover_size)
|
||||
self.mime_map: Dict[str, MimePos] = {}
|
||||
self.currently_writing_mime = ''
|
||||
self.current_leftover_bytes = memoryview(b'')
|
||||
self.max_size = (get_options().clipboard_max_size * 1024 * 1024) if max_size < 0 else max_size
|
||||
self.commited = False
|
||||
|
||||
def close(self) -> None:
|
||||
if not self.tempfile.closed:
|
||||
self.tempfile.close()
|
||||
def encode_response(self, status: str = 'OK') -> bytes:
|
||||
ans = f'{self.protocol_type.value};type=write:status={status}'
|
||||
if self.id:
|
||||
ans += f':id={self.id}'
|
||||
a = ans.encode('ascii')
|
||||
return a
|
||||
|
||||
def commit(self) -> None:
|
||||
if self.commited:
|
||||
return
|
||||
self.commited = True
|
||||
cp = get_boss().primary_selection if self.is_primary_selection else get_boss().clipboard
|
||||
if cp.enabled:
|
||||
x = {mime: self.tempfile.create_chunker(pos.start, pos.size) for mime, pos in self.mime_map.items()}
|
||||
cp.set_mime(x)
|
||||
|
||||
def add_base64_data(self, data: Union[str, bytes], mime: str = 'text/plain') -> None:
|
||||
if isinstance(data, str):
|
||||
@ -245,8 +296,7 @@ class WriteRequest:
|
||||
start, full_size = self.mime_map[mime]
|
||||
if size == -1:
|
||||
size = full_size
|
||||
self.tempfile.seek(start + offset)
|
||||
return self.tempfile.read(size)
|
||||
return self.tempfile.read(start+offset, size)
|
||||
|
||||
|
||||
class ClipboardRequestManager:
|
||||
@ -258,6 +308,7 @@ class ClipboardRequestManager:
|
||||
|
||||
def parse_osc_5522(self, data: str) -> None:
|
||||
import base64
|
||||
|
||||
from .notify import sanitize_id
|
||||
metadata, _, epayload = data.partition(';')
|
||||
m: Dict[str, str] = {}
|
||||
@ -277,6 +328,42 @@ class ClipboardRequestManager:
|
||||
protocol_type=ProtocolType.osc_5522, id=sanitize_id(m.get('id', ''))
|
||||
)
|
||||
self.handle_read_request(rr)
|
||||
elif typ == 'write':
|
||||
wr = self.in_flight_write_request
|
||||
if wr is None:
|
||||
wr = self.in_flight_write_request = WriteRequest(
|
||||
is_primary_selection=m.get('loc', '') == 'primary',
|
||||
protocol_type=ProtocolType.osc_5522, id=sanitize_id(m.get('id', ''))
|
||||
)
|
||||
self.handle_write_request(wr)
|
||||
else:
|
||||
w = get_boss().window_id_map.get(self.window_id)
|
||||
if w is not None:
|
||||
w.screen.send_escape_code_to_child(OSC, wr.encode_response(status='EBUSY'))
|
||||
|
||||
elif typ == 'wdata':
|
||||
wr = self.in_flight_write_request
|
||||
w = get_boss().window_id_map.get(self.window_id)
|
||||
if wr is None:
|
||||
return
|
||||
mime = m.get('mime', '')
|
||||
if mime:
|
||||
try:
|
||||
wr.add_base64_data(payload, mime)
|
||||
except OSError:
|
||||
if w is not None:
|
||||
w.screen.send_escape_code_to_child(OSC, wr.encode_response(status='EIO'))
|
||||
self.in_flight_write_request = None
|
||||
raise
|
||||
except Exception:
|
||||
if w is not None:
|
||||
w.screen.send_escape_code_to_child(OSC, wr.encode_response(status='EINVAL'))
|
||||
self.in_flight_write_request = None
|
||||
raise
|
||||
else:
|
||||
wr.flush_base64_data()
|
||||
wr.commit()
|
||||
self.in_flight_write_request = None
|
||||
|
||||
def parse_osc_52(self, data: str, is_partial: bool = False) -> None:
|
||||
where, text = data.partition(';')[::2]
|
||||
@ -302,12 +389,19 @@ class ClipboardRequestManager:
|
||||
def fulfill_write_request(self, wr: WriteRequest, allowed: bool = True) -> None:
|
||||
if wr.protocol_type is ProtocolType.osc_52:
|
||||
self.fulfill_legacy_write_request(wr, allowed)
|
||||
return
|
||||
w = get_boss().window_id_map.get(self.window_id)
|
||||
cp = get_boss().primary_selection if wr.is_primary_selection else get_boss().clipboard
|
||||
if not allowed or not cp.enabled:
|
||||
self.in_flight_write_request = None
|
||||
if w is not None:
|
||||
w.screen.send_escape_code_to_child(OSC, wr.encode_response(status='EPERM' if not allowed else 'ENOCLIPBOARD'))
|
||||
|
||||
def fulfill_legacy_write_request(self, wr: WriteRequest, allowed: bool = True) -> None:
|
||||
cp = get_boss().primary_selection if wr.is_primary_selection else get_boss().clipboard
|
||||
w = get_boss().window_id_map.get(self.window_id)
|
||||
if w is not None and cp.enabled and allowed:
|
||||
cp.set_text(wr.data_for('text/plain'))
|
||||
wr.commit()
|
||||
|
||||
def handle_read_request(self, rr: ReadRequest) -> None:
|
||||
cc = get_options().clipboard_control
|
||||
@ -330,7 +424,7 @@ class ClipboardRequestManager:
|
||||
return
|
||||
cp = get_boss().primary_selection if rr.is_primary_selection else get_boss().clipboard
|
||||
if not cp.enabled:
|
||||
w.screen.send_escape_code_to_child(OSC, rr.encode_response(status='EINVAL'))
|
||||
w.screen.send_escape_code_to_child(OSC, rr.encode_response(status='ENOCLIPBOARD'))
|
||||
return
|
||||
if not allowed:
|
||||
w.screen.send_escape_code_to_child(OSC, rr.encode_response(status='EPERM'))
|
||||
@ -399,5 +493,4 @@ class ClipboardRequestManager:
|
||||
|
||||
def close(self) -> None:
|
||||
if self.in_flight_write_request is not None:
|
||||
self.in_flight_write_request.close()
|
||||
self.in_flight_write_request = None
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user