kitty/kittens/tui/images.py
2023-01-09 16:47:42 +05:30

606 lines
22 KiB
Python

#!/usr/bin/env python3
# License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
import codecs
import os
import sys
from base64 import standard_b64encode
from collections import defaultdict, deque
from contextlib import suppress
from enum import IntEnum
from itertools import count
from typing import Any, Callable, ClassVar, DefaultDict, Deque, Dict, Generic, Iterator, List, Optional, Sequence, Tuple, Type, TypeVar, Union, cast
from kitty.conf.utils import positive_float, positive_int
from kitty.fast_data_types import create_canvas
from kitty.typing import GRT_C, CompletedProcess, GRT_a, GRT_d, GRT_f, GRT_m, GRT_o, GRT_t, HandlerType
from kitty.utils import ScreenSize, fit_image, which
from .operations import cursor
def _detect_filesystem_encoding() -> str:
    """Return the filesystem encoding name, falling back to 'utf-8'.

    The fallback is used when the interpreter reports no encoding, or when
    the reported name cannot be resolved by the codecs registry.
    """
    try:
        candidate = sys.getfilesystemencoding() or 'utf-8'
        codecs.lookup(candidate)
    except Exception:
        return 'utf-8'
    return candidate


# Encoding applied to file paths before they are base64 encoded and sent to
# the terminal as a graphics command payload.
fsenc = _detect_filesystem_encoding()
class Dispose(IntEnum):
    """Disposal method recorded for a frame.

    Frame.__init__ selects a member by the lower-cased text of the 'dispose'
    field that identify() requests from ImageMagick, so the member names here
    are looked up by name and must not be renamed.
    """

    undefined = 0
    none = 1
    background = 2
    previous = 3
class Frame:
    """Metadata describing a single frame of an image.

    A Frame is built either from one record of the data produced by
    identify() (a dict of strings) or as a field-by-field copy of another
    Frame instance.
    """

    gap: int  # milliseconds
    canvas_width: int
    canvas_height: int
    width: int
    height: int
    index: int
    xdpi: float
    ydpi: float
    canvas_x: int
    canvas_y: int
    mode: str
    needs_blend: bool
    dimensions_swapped: bool
    dispose: Dispose
    path: str = ''

    def __init__(self, identify_data: Union['Frame', Dict[str, str]]):
        if isinstance(identify_data, Frame):
            # Copy constructor: duplicate every annotated attribute.
            for attr_name in Frame.__annotations__:
                setattr(self, attr_name, getattr(identify_data, attr_name))
            return

        info = identify_data
        # The reported gap is multiplied by ten to get the milliseconds kept
        # in self.gap; negative values are clamped to zero.
        self.gap = max(0, int(info['gap']) * 10)

        # The canvas field has the form '<w>x<h>+<x>+<y>'.
        canvas_size, canvas_offset = info['canvas'].split('+', 1)
        self.canvas_width, self.canvas_height = (positive_int(v) for v in canvas_size.split('x', 1))
        self.canvas_x, self.canvas_y = (int(v) for v in canvas_offset.split('+', 1))

        self.width, self.height = (positive_int(v) for v in info['size'].split('x', 1))
        self.xdpi, self.ydpi = (positive_float(v) for v in info['dpi'].split('x', 1))
        self.index = positive_int(info['index'])

        transparency = info['transparency'].lower()
        self.mode = 'rgba' if transparency in ('blend', 'true') else 'rgb'
        self.needs_blend = transparency == 'blend'
        self.dispose = getattr(Dispose, info['dispose'].lower())

        # Orientation values 5 through 8 mean width and height trade places.
        self.dimensions_swapped = info.get('orientation') in ('5', '6', '7', '8')
        if self.dimensions_swapped:
            self.canvas_width, self.canvas_height = self.canvas_height, self.canvas_width
            self.width, self.height = self.height, self.width

    def __repr__(self) -> str:
        geom = f'{self.width}x{self.height}'
        canvas = f'{self.canvas_width}x{self.canvas_height}:{self.canvas_x}+{self.canvas_y}'
        return f'Frame(index={self.index}, gap={self.gap}, geom={geom}, canvas={canvas}, dispose={self.dispose.name})'
class ImageData:
    """Format, overall dimensions, pixel mode and frames of one image."""

    def __init__(self, fmt: str, width: int, height: int, mode: str, frames: List[Frame]):
        self.fmt = fmt
        self.mode = mode
        self.width = width
        self.height = height
        # 24 is used for 'rgb' data, 32 for anything else.
        self.transmit_fmt: GRT_f = (32 if self.mode != 'rgb' else 24)
        self.frames = frames

    def __len__(self) -> int:
        """Return the number of frames."""
        return len(self.frames)

    def __iter__(self) -> Iterator[Frame]:
        """Yield the frames in order."""
        for frame in self.frames:
            yield frame

    def __repr__(self) -> str:
        frames = '\n '.join(repr(frame) for frame in self.frames)
        return f'Image(fmt={self.fmt}, mode={self.mode},\n {frames}\n)'
class OpenFailed(ValueError):
    """Raised when an image could not be opened; the image path is kept in ``path``."""

    def __init__(self, path: str, message: str):
        super().__init__(f'Failed to open image: {path} with error: {message}')
        self.path = path
class ConvertFailed(ValueError):
    """Raised when an image could not be converted; the image path is kept in ``path``."""

    def __init__(self, path: str, message: str):
        super().__init__(f'Failed to convert image: {path} with error: {message}')
        self.path = path
class NoImageMagick(Exception):
    """Raised when the ImageMagick executable could not be started."""
class OutdatedImageMagick(ValueError):
    """Raised when ImageMagick output does not have the expected form.

    The exception message is fixed; the specifics of what went wrong are
    stored in ``detailed_error``.
    """

    def __init__(self, detailed_error: str):
        summary = 'ImageMagick on this system is too old ImageMagick 7+ required which was first released in 2016'
        ValueError.__init__(self, summary)
        self.detailed_error = detailed_error
# The most recent command line handed to run_imagemagick(), kept so that
# later error messages can report it.
last_imagemagick_cmd: Sequence[str] = ()


def run_imagemagick(path: str, cmd: Sequence[str], keep_stdout: bool = True) -> 'CompletedProcess[bytes]':
    """Run an ImageMagick command and return the completed process.

    :param path: Path of the image being processed; only used to build the
        error raised on failure.
    :param cmd: Full argument vector to execute.
    :param keep_stdout: Capture stdout when true, discard it otherwise.
        stderr is always captured.
    :raises NoImageMagick: if the executable in ``cmd`` cannot be found.
    :raises OpenFailed: if the command exits with a non-zero status; the
        message carries the captured stderr.
    """
    global last_imagemagick_cmd
    import subprocess
    last_imagemagick_cmd = cmd
    try:
        p = subprocess.run(cmd, stdout=subprocess.PIPE if keep_stdout else subprocess.DEVNULL, stderr=subprocess.PIPE)
    except FileNotFoundError as err:
        raise NoImageMagick('ImageMagick is required to process images') from err
    if p.returncode != 0:
        # stderr may contain bytes that are not valid UTF-8 (for example file
        # names in another encoding). Decode leniently so the caller gets
        # OpenFailed rather than an unrelated UnicodeDecodeError.
        raise OpenFailed(path, p.stderr.decode('utf-8', 'replace'))
    return p
def identify(path: str) -> ImageData:
    """Query ImageMagick for the metadata of every frame in the image at ``path``.

    The identify command is asked to print one JSON object per frame; the
    objects are parsed into Frame instances and wrapped in an ImageData whose
    dimensions are the canvas size of the first frame.
    """
    import json
    frame_format = (
        '{"fmt":"%m","canvas":"%g","transparency":"%A","gap":"%T","index":"%p","size":"%wx%h",'
        '"dpi":"%xx%y","dispose":"%D","orientation":"%[EXIF:Orientation]"},'
    )
    magick = which('magick')
    cmd = [magick, 'identify'] if magick else ['identify']
    cmd += ['-format', frame_format, '--', path]
    proc = run_imagemagick(path, cmd)

    # Every record ends with a comma; drop the last one and wrap the records
    # in brackets to obtain a JSON array.
    records = json.loads(b'[' + proc.stdout.rstrip(b',') + b']')
    first_record = records[0]
    frames = [Frame(record) for record in records]
    image_fmt = first_record['fmt'].lower()

    if image_fmt == 'gif' and all(not frame.gap > 0 for frame in frames):
        # Some broken GIF images have all zero gaps, browsers with their usual
        # idiot ideas render these with a default 100ms gap https://bugzilla.mozilla.org/show_bug.cgi?id=125137
        # Browsers actually force a 100ms gap at any zero gap frame, but that
        # just means it is impossible to deliberately use zero gap frames for
        # sophisticated blending, so we don't do that.
        for frame in frames:
            frame.gap = 100

    # The image as a whole is 'rgba' as soon as one frame is.
    mode = 'rgba' if any(frame.mode == 'rgba' for frame in frames) else 'rgb'
    lead = frames[0]
    return ImageData(image_fmt, lead.canvas_width, lead.canvas_height, mode, frames)
class RenderedImage(ImageData):
    """An ImageData that starts with no frames; the caller fills ``frames`` in afterwards."""

    def __init__(self, fmt: str, width: int, height: int, mode: str):
        ImageData.__init__(self, fmt, width, height, mode, frames=[])
def render_image(
    path: str, output_prefix: str,
    m: ImageData,
    available_width: int, available_height: int,
    scale_up: bool,
    only_first_frame: bool = False,
    remove_alpha: str = '',
    flip: bool = False, flop: bool = False,
) -> RenderedImage:
    """Convert the image at ``path`` into raw pixel files using ImageMagick.

    One file per rendered frame is written as
    ``<output_prefix>-<frame index>.<m.mode>`` and recorded in the ``path``
    attribute of the matching frame of the returned RenderedImage.

    :param path: Source image.
    :param output_prefix: Prefix of the output files; the temporary working
        directory is created next to it.
    :param m: Metadata for the image as returned by identify().
    :param available_width: Maximum width in pixels.
    :param available_height: Maximum height in pixels.
    :param scale_up: Enlarge images narrower than ``available_width``.
    :param only_first_frame: Render only frame 0 of a multi-frame image.
    :param remove_alpha: When non-empty, passed to ImageMagick as the
        background used with ``-alpha remove``.
    :param flip: Add ``-flip`` to the command.
    :param flop: Add ``-flop`` to the command.
    :raises OSError: if no ImageMagick convert executable is found.
    :raises OutdatedImageMagick: if an output file name cannot be parsed.
    :raises ConvertFailed: if frames are missing or have an impossible size.
    """
    import tempfile
    has_multiple_frames = len(m) > 1
    get_multiple_frames = has_multiple_frames and not only_first_frame
    exe = which('magick')
    if exe:
        cmd = [exe, 'convert']
    else:
        exe = which('convert')
        if exe is None:
            raise OSError('Failed to find the ImageMagick convert executable, make sure it is present in PATH')
        cmd = [exe]
    if remove_alpha:
        cmd += ['-background', remove_alpha, '-alpha', 'remove']
    else:
        cmd += ['-background', 'none']
    if flip:
        cmd.append('-flip')
    if flop:
        cmd.append('-flop')
    cmd += ['--', path]
    if only_first_frame and has_multiple_frames:
        # Select just the first frame of the input file.
        cmd[-1] += '[0]'
    cmd.append('-auto-orient')
    scaled = False
    width, height = m.width, m.height
    if scale_up:
        if width < available_width:
            r = available_width / width
            width, height = available_width, int(height * r)
            scaled = True
    if scaled or width > available_width or height > available_height:
        width, height = fit_image(width, height, available_width, available_height)
        resize_cmd = ['-resize', f'{width}x{height}!']
        if get_multiple_frames:
            # we have to coalesce, resize and de-coalesce all frames
            resize_cmd = ['-coalesce'] + resize_cmd + ['-deconstruct']
        cmd += resize_cmd
    # The output file name encodes frame size, canvas geometry and frame
    # index so they can be recovered from the directory listing below.
    cmd += ['-depth', '8', '-set', 'filename:f', '%w-%h-%g-%p']
    ans = RenderedImage(m.fmt, width, height, m.mode)
    if only_first_frame:
        ans.frames = [Frame(m.frames[0])]
    else:
        ans.frames = list(map(Frame, m.frames))
    bytes_per_pixel = 3 if m.mode == 'rgb' else 4

    def check_resize(frame: Frame) -> None:
        # ImageMagick sometimes generates RGBA images smaller than the specified
        # size. See https://github.com/kovidgoyal/kitty/issues/276 for examples
        sz = os.path.getsize(frame.path)
        expected_size = bytes_per_pixel * frame.width * frame.height
        if sz < expected_size:
            missing = expected_size - sz
            # The shortfall must be a whole number of rows of THIS frame. A
            # frame may be narrower than the overall image, so the row size
            # is derived from frame.width rather than the target width.
            row_size = bytes_per_pixel * frame.width
            if missing % row_size != 0:
                raise ConvertFailed(
                    path, 'ImageMagick failed to convert {} correctly,'
                    ' it generated {} < {} of data (w={}, h={}, bpp={})'.format(
                        path, sz, expected_size, frame.width, frame.height, bytes_per_pixel))
            frame.height -= missing // row_size
            if frame.index == 0:
                ans.height = frame.height
                ans.width = frame.width

    with tempfile.TemporaryDirectory(dir=os.path.dirname(output_prefix)) as tdir:
        output_template = os.path.join(tdir, f'im-%[filename:f].{m.mode}')
        if get_multiple_frames:
            # Write every frame to its own file.
            cmd.append('+adjoin')
        run_imagemagick(path, cmd + [output_template])
        unseen = {x.index for x in m}
        for x in os.listdir(tdir):
            try:
                # Expected name: im-<w>-<h>-<cw>x<ch>+<cx>+<cy>-<index>.<mode>
                parts = x.split('.', 1)[0].split('-')
                index = int(parts[-1])
                unseen.discard(index)
                f = ans.frames[index]
                f.width, f.height = map(positive_int, parts[1:3])
                sz, pos = parts[3].split('+', 1)
                f.canvas_width, f.canvas_height = map(positive_int, sz.split('x', 1))
                f.canvas_x, f.canvas_y = map(int, pos.split('+', 1))
            except Exception as err:
                raise OutdatedImageMagick(
                    f'Unexpected output filename: {x!r} produced by ImageMagick command: {last_imagemagick_cmd}') from err
            f.path = output_prefix + f'-{index}.{m.mode}'
            os.rename(os.path.join(tdir, x), f.path)
            check_resize(f)
    f = ans.frames[0]
    # When the first frame does not cover the whole image, rebuild its file
    # via create_canvas() at the full image size. Skip this when frame 0 was
    # never written, so the checks below can report ConvertFailed instead of
    # failing on open('').
    if f.path and (f.width != ans.width or f.height != ans.height):
        with open(f.path, 'r+b') as ff:
            data = ff.read()
            ff.seek(0)
            ff.truncate()
            cd = create_canvas(data, f.width, f.canvas_x, f.canvas_y, ans.width, ans.height, 3 if ans.mode == 'rgb' else 4)
            ff.write(cd)
    if get_multiple_frames:
        if unseen:
            raise ConvertFailed(path, f'Failed to render {len(unseen)} out of {len(m)} frames of animation')
    elif not ans.frames[0].path:
        raise ConvertFailed(path, 'Failed to render image')
    return ans
def render_as_single_image(
    path: str, m: ImageData,
    available_width: int, available_height: int,
    scale_up: bool,
    tdir: Optional[str] = None,
    remove_alpha: str = '', flip: bool = False, flop: bool = False,
) -> Tuple[str, int, int]:
    """Render only the first frame of an image into a new temporary file.

    Returns a tuple of the output file path and the rendered width and
    height. The file is created in ``tdir`` (or the default temporary
    directory when ``tdir`` is None) with the image mode as its suffix.
    """
    import tempfile
    handle, output = tempfile.mkstemp(prefix='tty-graphics-protocol-', suffix=f'.{m.mode}', dir=tdir)
    # Only the reserved name is needed, not the open descriptor.
    os.close(handle)
    rendered = render_image(
        path, output, m, available_width, available_height, scale_up,
        only_first_frame=True, remove_alpha=remove_alpha, flip=flip, flop=flop)
    # Move the rendered frame file onto the reserved temporary name.
    first_frame_path = rendered.frames[0].path
    os.rename(first_frame_path, output)
    return output, rendered.width, rendered.height
def can_display_images() -> bool:
    """Report whether an ImageMagick executable usable for rendering is on PATH.

    Either ``magick`` or ``convert`` is accepted, matching the lookup order
    used by the rendering code in this module. The result is computed once
    and cached as an attribute on the function object.
    """
    ans: Optional[bool] = getattr(can_display_images, 'ans', None)
    if ans is None:
        ans = which('magick') is not None or which('convert') is not None
        setattr(can_display_images, 'ans', ans)
    return ans
# (path, width, height)
ImageKey = Tuple[str, int, int]
# (image id, width, height)
SentImageKey = Tuple[int, int, int]
T = TypeVar('T')


class Alias(Generic[T]):
    """Descriptor that lets several attribute names share one stored value.

    A single Alias object is bound to a one-character name and to any number
    of longer names in the same class body. All of them read and write the
    owner's ``_actual_values`` dict under the one-character key. Values equal
    to the default are not stored.
    """

    # The one-character name seen most recently by __set_name__; longer names
    # that follow it in the class body are aliases for it.
    currently_processing: ClassVar[str] = ''

    def __init__(self, defval: T) -> None:
        self.name = ''
        self.defval = defval

    def __get__(self, instance: Optional['GraphicsCommand'], cls: Optional[Type['GraphicsCommand']] = None) -> T:
        if instance is not None:
            return cast(T, instance._actual_values.get(self.name, self.defval))
        # Accessed on the class itself: report the default.
        return self.defval

    def __set__(self, instance: 'GraphicsCommand', val: T) -> None:
        stored = instance._actual_values
        if val == self.defval:
            # Defaults are represented by the absence of the key.
            stored.pop(self.name, None)
            return
        stored[self.name] = val

    def __set_name__(self, owner: Type['GraphicsCommand'], name: str) -> None:
        is_short_name = len(name) == 1
        if is_short_name:
            Alias.currently_processing = name
        self.name = Alias.currently_processing
class GraphicsCommand:
    """Builder for a graphics escape sequence.

    Every control key is exposed under its one-character name and under one
    or more descriptive aliases. Only values that differ from a key's
    default are stored and serialized.
    """

    # NOTE: Alias.__set_name__ depends on declaration order, so each
    # one-character name must stay first on its line.
    a = action = Alias(cast(GRT_a, 't'))
    q = quiet = Alias(0)
    f = format = Alias(32)
    t = transmission_type = Alias(cast(GRT_t, 'd'))
    s = data_width = animation_state = Alias(0)
    v = data_height = loop_count = Alias(0)
    S = data_size = Alias(0)
    O = data_offset = Alias(0)  # noqa
    i = image_id = Alias(0)
    I = image_number = Alias(0)  # noqa
    p = placement_id = Alias(0)
    o = compression = Alias(cast(Optional[GRT_o], None))
    m = more = Alias(cast(GRT_m, 0))
    x = left_edge = Alias(0)
    y = top_edge = Alias(0)
    w = width = Alias(0)
    h = height = Alias(0)
    X = cell_x_offset = blend_mode = Alias(0)
    Y = cell_y_offset = bgcolor = Alias(0)
    c = columns = other_frame_number = dest_frame = Alias(0)
    r = rows = frame_number = source_frame = Alias(0)
    z = z_index = gap = Alias(0)
    C = cursor_movement = compose_mode = Alias(cast(GRT_C, 0))
    d = delete_action = Alias(cast(GRT_d, 'a'))

    def __init__(self) -> None:
        self._actual_values: Dict[str, Any] = {}

    def __repr__(self) -> str:
        serialized = self.serialize().decode('ascii')
        return serialized.replace('\033', '^]')

    def clone(self) -> 'GraphicsCommand':
        """Return a new command holding a copy of the stored values."""
        duplicate = GraphicsCommand()
        duplicate._actual_values = self._actual_values.copy()
        return duplicate

    def serialize(self, payload: Union[bytes, str] = b'') -> bytes:
        """Return the escape sequence bytes for this command.

        The stored keys are written as comma separated ``key=value`` pairs.
        A non-empty payload is appended after a semicolon; a str payload is
        UTF-8 encoded and base64 encoded first, a bytes payload is used as is.
        """
        control = ','.join(f'{k}={val}' for k, val in self._actual_values.items()).encode('ascii')
        pieces: List[bytes] = [b'\033_G', control]
        if payload:
            if isinstance(payload, str):
                payload = standard_b64encode(payload.encode('utf-8'))
            pieces += [b';', payload]
        pieces.append(b'\033\\')
        return b''.join(pieces)

    def clear(self) -> None:
        """Forget every stored value, returning all keys to their defaults."""
        self._actual_values = {}

    def iter_transmission_chunks(self, data: Optional[bytes] = None, level: int = -1, compression_threshold: int = 1024) -> Iterator[bytes]:
        """Yield the escape sequences needed to transmit ``data``.

        With no data a single serialized command is yielded. Otherwise a
        clone of this command is used: the data is zlib compressed when
        ``level`` is truthy, the data is at least ``compression_threshold``
        bytes long and compression actually shrinks it. The result is base64
        encoded and emitted in pieces of at most 4096 bytes. Only the first
        piece carries the control keys; ``m`` is set on every piece but the
        last.
        """
        if data is None:
            yield self.serialize()
            return
        gc = self.clone()
        gc.S = len(data)
        if level and len(data) >= compression_threshold:
            import zlib
            compressed = zlib.compress(data, level)
            if len(compressed) < len(data):
                gc.o = 'z'
                data = compressed
                gc.S = len(data)
        encoded = standard_b64encode(data)
        total = len(encoded)
        for start in range(0, total, 4096):
            end = start + 4096
            gc.m = 1 if end < total else 0
            yield gc.serialize(encoded[start:end])
            # Later chunks repeat none of the control keys.
            gc.clear()
class Placement:
    """A graphics command together with the x and y position it was issued at."""

    cmd: GraphicsCommand
    x: int = 0
    y: int = 0

    def __init__(self, cmd: GraphicsCommand, x: int = 0, y: int = 0):
        self.cmd, self.x, self.y = cmd, x, y
class ImageManager:
    """Convert, transmit and place images on behalf of a handler.

    Used as a context manager: entering creates a temporary directory and
    sends a query that probes whether files can be passed to the terminal by
    path; exiting removes the directory and deletes the images that were
    sent. Conversion results and transmitted image ids are cached so each
    image is converted and sent once per available size.
    """

    def __init__(self, handler: HandlerType):
        self.image_id_counter = count()
        self.handler = handler
        # None until the response to the probe sent by __enter__ arrives.
        self.filesystem_ok: Optional[bool] = None
        self.image_data: Dict[str, ImageData] = {}
        self.failed_images: Dict[str, Exception] = {}
        self.converted_images: Dict[ImageKey, ImageKey] = {}
        self.sent_images: Dict[ImageKey, int] = {}
        self.image_id_to_image_data: Dict[int, ImageData] = {}
        self.image_id_to_converted_data: Dict[int, ImageKey] = {}
        self.transmission_status: Dict[int, Union[str, int]] = {}
        self.placements_in_flight: DefaultDict[int, Deque[Placement]] = defaultdict(deque)
        # Optional hook consulted by resend_image(). It must be initialised:
        # resend_image() reads it unconditionally, and the AttributeError
        # from a missing attribute would be swallowed by handle_response(),
        # silently disabling resends.
        self.update_image_placement_for_resend: Optional[Callable[[int, Placement], bool]] = None

    @property
    def next_image_id(self) -> int:
        """Return a fresh image id; ids start at 2.

        Id 1 is used by the probe sent in __enter__ and id 0 is treated as
        "no id" by handle_response().
        """
        return next(self.image_id_counter) + 2

    @property
    def screen_size(self) -> ScreenSize:
        """Return the screen size reported by the handler."""
        return self.handler.screen_size

    def __enter__(self) -> None:
        """Create the working directory and send the file transmission probe."""
        import tempfile
        self.tdir = tempfile.mkdtemp(prefix='kitten-images-')
        # Four bytes of data, announced below as a 1x1 image.
        with tempfile.NamedTemporaryFile(dir=self.tdir, delete=False) as f:
            f.write(b'abcd')
        gc = GraphicsCommand()
        gc.a = 'q'
        gc.s = gc.v = gc.i = 1
        gc.t = 'f'
        self.handler.cmd.gr_command(gc, standard_b64encode(f.name.encode(fsenc)))

    def __exit__(self, *a: Any) -> None:
        """Remove the working directory and delete everything that was sent."""
        import shutil
        shutil.rmtree(self.tdir, ignore_errors=True)
        self.handler.cmd.clear_images_on_screen(delete_data=True)
        self.delete_all_sent_images()
        del self.handler

    def delete_all_sent_images(self) -> None:
        """Send a delete command for every known image id and forget them."""
        gc = GraphicsCommand()
        gc.a = 'd'
        for img_id in self.transmission_status:
            gc.i = img_id
            self.handler.cmd.gr_command(gc)
        self.transmission_status.clear()

    def handle_response(self, apc: str) -> None:
        """Process a graphics response received from the terminal.

        ``apc`` is the response text; its first character is skipped and the
        rest is split into ``key=value`` control data and a payload at the
        first semicolon.
        """
        cdata, payload = apc[1:].partition(';')[::2]
        control = {}
        for x in cdata.split(','):
            k, v = x.partition('=')[::2]
            control[k] = v
        try:
            image_id = int(control.get('i', '0'))
        except Exception:
            image_id = 0
        if image_id == 1:
            # Answer to the probe sent by __enter__.
            self.filesystem_ok = payload == 'OK'
            return
        if not image_id:
            return
        if not self.transmission_status.get(image_id):
            # First response for this id: the result of the transmission.
            self.transmission_status[image_id] = payload
        else:
            # Later responses are matched to placements in the order sent.
            in_flight = self.placements_in_flight[image_id]
            if in_flight:
                pl = in_flight.popleft()
                if payload.startswith('ENOENT:'):
                    # Best effort: a failed resend must not break the
                    # response handling.
                    with suppress(Exception):
                        self.resend_image(image_id, pl)
            if not in_flight:
                self.placements_in_flight.pop(image_id, None)

    def resend_image(self, image_id: int, pl: Placement) -> None:
        """Transmit an image again and repeat the placement ``pl``.

        When the update_image_placement_for_resend hook is set and returns a
        false value, nothing is sent.
        """
        if self.update_image_placement_for_resend is not None and not self.update_image_placement_for_resend(image_id, pl):
            return
        image_data = self.image_id_to_image_data[image_id]
        skey = self.image_id_to_converted_data[image_id]
        self.transmit_image(image_data, image_id, *skey)
        with cursor(self.handler.write):
            self.handler.cmd.set_cursor_position(pl.x, pl.y)
            self.handler.cmd.gr_command(pl.cmd)

    def send_image(self, path: str, max_cols: Optional[int] = None, max_rows: Optional[int] = None, scale_up: bool = False) -> SentImageKey:
        """Convert and transmit the image at ``path`` if not already done.

        :param path: Image file; made absolute before use.
        :param max_cols: Width limit in cells, defaults to the screen columns.
        :param max_rows: Height limit in cells, defaults to the screen rows.
        :param scale_up: Passed through to the conversion.
        :returns: ``(image_id, width, height)``, or ``(0, 0, 0)`` when the
            converted image has zero width.
        :raises Exception: whatever identification or conversion raised; the
            exception is remembered and raised again on later calls for the
            same path.
        """
        path = os.path.abspath(path)
        if path in self.failed_images:
            raise self.failed_images[path]
        if path not in self.image_data:
            try:
                self.image_data[path] = identify(path)
            except Exception as e:
                self.failed_images[path] = e
                raise
        m = self.image_data[path]
        ss = self.screen_size
        if max_cols is None:
            max_cols = ss.cols
        if max_rows is None:
            max_rows = ss.rows
        available_width = max_cols * ss.cell_width
        available_height = max_rows * ss.cell_height
        key = path, available_width, available_height
        skey = self.converted_images.get(key)
        if skey is None:
            try:
                self.converted_images[key] = skey = self.convert_image(path, available_width, available_height, m, scale_up)
            except Exception as e:
                self.failed_images[path] = e
                raise
        final_width, final_height = skey[1:]
        if final_width == 0:
            return 0, 0, 0
        image_id = self.sent_images.get(skey)
        if image_id is None:
            image_id = self.next_image_id
            self.transmit_image(m, image_id, *skey)
            self.sent_images[skey] = image_id
            self.image_id_to_converted_data[image_id] = skey
            self.image_id_to_image_data[image_id] = m
        return image_id, skey[1], skey[2]

    def hide_image(self, image_id: int) -> None:
        """Send a delete command for ``image_id``."""
        gc = GraphicsCommand()
        gc.a = 'd'
        gc.i = image_id
        self.handler.cmd.gr_command(gc)

    def show_image(self, image_id: int, x: int, y: int, src_rect: Optional[Tuple[int, int, int, int]] = None) -> None:
        """Place a transmitted image with the cursor moved to ``(x, y)``.

        ``src_rect``, when given, supplies the x, y, w and h control keys.
        The placement is remembered so its response can be matched later.
        """
        gc = GraphicsCommand()
        gc.a = 'p'
        gc.i = image_id
        if src_rect is not None:
            gc.x, gc.y, gc.w, gc.h = map(int, src_rect)
        self.placements_in_flight[image_id].append(Placement(gc, x, y))
        with cursor(self.handler.write):
            self.handler.cmd.set_cursor_position(x, y)
            self.handler.cmd.gr_command(gc)

    def convert_image(self, path: str, available_width: int, available_height: int, image_data: ImageData, scale_up: bool = False) -> ImageKey:
        """Render the image to a file in the working directory.

        Returns ``(output path, width, height)``.
        """
        rgba_path, width, height = render_as_single_image(path, image_data, available_width, available_height, scale_up, tdir=self.tdir)
        return rgba_path, width, height

    def transmit_image(self, image_data: ImageData, image_id: int, rgba_path: str, width: int, height: int) -> int:
        """Send the pixel data in ``rgba_path`` to the terminal as ``image_id``.

        When the probe reported success the file path is sent. Otherwise the
        file contents are zlib compressed, base64 encoded and sent in chunks
        of at most 4096 bytes. Returns ``image_id``.
        """
        self.transmission_status[image_id] = 0
        gc = GraphicsCommand()
        gc.a = 't'
        gc.f = image_data.transmit_fmt
        gc.s = width
        gc.v = height
        gc.i = image_id
        if self.filesystem_ok:
            gc.t = 'f'
            self.handler.cmd.gr_command(
                gc, standard_b64encode(rgba_path.encode(fsenc)))
        else:
            import zlib
            with open(rgba_path, 'rb') as f:
                data = f.read()
            gc.S = len(data)
            data = zlib.compress(data)
            gc.o = 'z'
            data = standard_b64encode(data)
            while data:
                chunk, data = data[:4096], data[4096:]
                gc.m = 1 if data else 0
                self.handler.cmd.gr_command(gc, chunk)
                # Only the first chunk carries the full set of control keys.
                gc.clear()
        return image_id