#!/usr/bin/env python3
# vim:fileencoding=utf-8
# License: GPL v3 Copyright: 2018, Kovid Goyal

import codecs
import os
import sys
from base64 import standard_b64encode
from collections import defaultdict, deque
from contextlib import suppress
from enum import IntEnum
from itertools import count
from typing import (
    Any, Callable, ClassVar, DefaultDict, Deque, Dict, Generic, Iterator, List,
    Optional, Sequence, Tuple, Type, TypeVar, Union, cast
)

from kitty.conf.utils import positive_float, positive_int
from kitty.fast_data_types import create_canvas
from kitty.typing import (
    GRT_C, CompletedProcess, GRT_a, GRT_d, GRT_f, GRT_m, GRT_o, GRT_t,
    HandlerType
)
from kitty.utils import ScreenSize, find_exe, fit_image

from .operations import cursor

# Encoding used to turn file paths into bytes before sending them to the
# terminal. Fall back to UTF-8 if the reported encoding is missing or unknown.
try:
    fsenc = sys.getfilesystemencoding() or 'utf-8'
    codecs.lookup(fsenc)
except Exception:
    fsenc = 'utf-8'


class Dispose(IntEnum):
    '''Frame disposal methods; looked up by the lower-cased name ImageMagick reports.'''
    undefined = 0
    none = 1
    background = 2
    previous = 3


class Frame:
    '''
    Metadata for a single frame of an image.

    Built either from one record of ``identify`` output (a dict of strings) or
    as a copy of another Frame.
    '''
    gap: int  # milliseconds
    canvas_width: int
    canvas_height: int
    width: int
    height: int
    index: int
    xdpi: float
    ydpi: float
    canvas_x: int
    canvas_y: int
    mode: str
    needs_blend: bool
    dimensions_swapped: bool
    dispose: Dispose
    path: str = ''

    def __init__(self, identify_data: Union['Frame', Dict[str, str]]):
        if isinstance(identify_data, Frame):
            # Copy constructor: duplicate every annotated attribute
            for k in Frame.__annotations__:
                setattr(self, k, getattr(identify_data, k))
        else:
            # The reported gap is multiplied by 10 to get milliseconds
            self.gap = max(0, int(identify_data['gap']) * 10)
            # canvas looks like WxH+X+Y
            sz, pos = identify_data['canvas'].split('+', 1)
            self.canvas_width, self.canvas_height = map(positive_int, sz.split('x', 1))
            self.canvas_x, self.canvas_y = map(int, pos.split('+', 1))
            self.width, self.height = map(positive_int, identify_data['size'].split('x', 1))
            self.xdpi, self.ydpi = map(positive_float, identify_data['dpi'].split('x', 1))
            self.index = positive_int(identify_data['index'])
            q = identify_data['transparency'].lower()
            self.mode = 'rgba' if q in ('blend', 'true') else 'rgb'
            self.needs_blend = q == 'blend'
            self.dispose = getattr(Dispose, identify_data['dispose'].lower())
            # These orientation values cause width and height to be exchanged
            if identify_data.get('orientation') in ('5', '6', '7', '8'):
                self.canvas_width, self.canvas_height = self.canvas_height, self.canvas_width
                self.width, self.height = self.height, self.width
                self.dimensions_swapped = True
            else:
                self.dimensions_swapped = False

    def __repr__(self) -> str:
        canvas = f'{self.canvas_width}x{self.canvas_height}:{self.canvas_x}+{self.canvas_y}'
        geom = f'{self.width}x{self.height}'
        return f'Frame(index={self.index}, gap={self.gap}, geom={geom}, canvas={canvas}, dispose={self.dispose.name})'


class ImageData:
    '''An image: its format, overall size, pixel mode and list of frames.'''

    def __init__(self, fmt: str, width: int, height: int, mode: str, frames: List[Frame]):
        self.width, self.height, self.fmt, self.mode = width, height, fmt, mode
        # Bits per pixel value used for the graphics command format key
        self.transmit_fmt: GRT_f = (24 if self.mode == 'rgb' else 32)
        self.frames = frames

    def __len__(self) -> int:
        return len(self.frames)

    def __iter__(self) -> Iterator[Frame]:
        yield from self.frames

    def __repr__(self) -> str:
        frames = '\n  '.join(map(repr, self.frames))
        return f'Image(fmt={self.fmt}, mode={self.mode},\n  {frames}\n)'


class OpenFailed(ValueError):
    '''Raised when ImageMagick exits with an error while processing an image.'''

    def __init__(self, path: str, message: str):
        ValueError.__init__(
            self, 'Failed to open image: {} with error: {}'.format(path, message)
        )
        self.path = path


class ConvertFailed(ValueError):
    '''Raised when the converted output is missing or has an unexpected size.'''

    def __init__(self, path: str, message: str):
        ValueError.__init__(
            self, 'Failed to convert image: {} with error: {}'.format(path, message)
        )
        self.path = path


class NoImageMagick(Exception):
    '''Raised when the ImageMagick executable cannot be run.'''
    pass


class OutdatedImageMagick(ValueError):
    '''Raised when ImageMagick produces output file names that cannot be parsed.'''

    def __init__(self, detailed_error: str):
        super().__init__('ImageMagick on this system is too old ImageMagick 7+ required which was first released in 2016')
        self.detailed_error = detailed_error


# The most recent command passed to run_imagemagick(), kept for error reports
last_imagemagick_cmd: Sequence[str] = ()


def run_imagemagick(path: str, cmd: Sequence[str], keep_stdout: bool = True) -> CompletedProcess:
    '''
    Run the ImageMagick command ``cmd`` and return the completed process.

    :param path: The image being processed, used only for error reporting.
    :param cmd: The full command line to execute.
    :param keep_stdout: Capture stdout if True, otherwise discard it.
    :raises NoImageMagick: If the executable is not found.
    :raises OpenFailed: If the command exits with a non-zero status.
    '''
    global last_imagemagick_cmd
    import subprocess
    last_imagemagick_cmd = cmd
    try:
        p = subprocess.run(cmd, stdout=subprocess.PIPE if keep_stdout else subprocess.DEVNULL, stderr=subprocess.PIPE)
    except FileNotFoundError as err:
        raise NoImageMagick('ImageMagick is required to process images') from err
    if p.returncode != 0:
        # Decode leniently so that undecodable bytes in the error output do
        # not replace the real failure with a UnicodeDecodeError
        raise OpenFailed(path, p.stderr.decode('utf-8', 'replace'))
    return p


def identify(path: str) -> ImageData:
    '''
    Query ImageMagick for the metadata of every frame in the image at ``path``.

    :raises OpenFailed: If ImageMagick fails or reports no frames.
    :raises NoImageMagick: If ImageMagick cannot be run.
    '''
    import json
    # One JSON object is emitted per frame. The trailing comma separates the
    # objects so that the combined output, once the final comma is stripped
    # and brackets are added, is a valid JSON array even for multi-frame images.
    q = (
        '{"fmt":"%m","canvas":"%g","transparency":"%A","gap":"%T","index":"%p","size":"%wx%h",'
        '"dpi":"%xx%y","dispose":"%D","orientation":"%[EXIF:Orientation]"},'
    )
    exe = find_exe('magick')
    if exe:
        cmd = [exe, 'identify']
    else:
        cmd = ['identify']
    p = run_imagemagick(path, cmd + ['-format', q, '--', path])
    data = json.loads(b'[' + p.stdout.rstrip(b',') + b']')
    if not data:
        raise OpenFailed(path, 'ImageMagick did not report any frames')
    first = data[0]
    frames = list(map(Frame, data))
    image_fmt = first['fmt'].lower()
    if image_fmt == 'gif' and not any(f.gap > 0 for f in frames):
        # Some broken GIF images have all zero gaps, browsers with their usual
        # idiot ideas render these with a default 100ms gap https://bugzilla.mozilla.org/show_bug.cgi?id=125137
        # Browsers actually force a 100ms gap at any zero gap frame, but that
        # just means it is impossible to deliberately use zero gap frames for
        # sophisticated blending, so we don't do that.
        for f in frames:
            f.gap = 100
    # The image as a whole needs an alpha channel if any frame has one
    mode = 'rgba' if any(f.mode == 'rgba' for f in frames) else 'rgb'
    return ImageData(image_fmt, frames[0].canvas_width, frames[0].canvas_height, mode, frames)


class RenderedImage(ImageData):
    '''An ImageData that starts with no frames; render_image() fills them in.'''

    def __init__(self, fmt: str, width: int, height: int, mode: str):
        super().__init__(fmt, width, height, mode, [])


def render_image(
    path: str, output_prefix: str,
    m: ImageData,
    available_width: int, available_height: int,
    scale_up: bool,
    only_first_frame: bool = False
) -> RenderedImage:
    '''
    Convert the image at ``path`` to raw pixel files, one per rendered frame.

    Each frame is written to ``{output_prefix}-{index}.{m.mode}`` and that path
    is stored on the corresponding frame of the returned image.

    :param path: The source image.
    :param output_prefix: Prefix for output files; its directory is also used
        for the temporary directory ImageMagick writes into.
    :param m: Metadata for the source image, as returned by identify().
    :param available_width: Maximum width in pixels.
    :param available_height: Maximum height in pixels.
    :param scale_up: Enlarge images narrower than ``available_width``.
    :param only_first_frame: Render only the first frame of a multi-frame image.
    :raises OSError: If no ImageMagick convert executable can be found.
    :raises OutdatedImageMagick: If an output file name cannot be parsed.
    :raises ConvertFailed: If frames are missing or have an unexpected size.
    '''
    import tempfile
    has_multiple_frames = len(m) > 1
    get_multiple_frames = has_multiple_frames and not only_first_frame
    exe = find_exe('magick')
    if exe:
        cmd = [exe, 'convert']
    else:
        exe = find_exe('convert')
        if exe is None:
            raise OSError('Failed to find the ImageMagick convert executable, make sure it is present in PATH')
        cmd = [exe]
    cmd += ['-background', 'none', '--', path]
    if only_first_frame and has_multiple_frames:
        # Select just the first frame of the input
        cmd[-1] += '[0]'
    scaled = False
    width, height = m.width, m.height
    if scale_up:
        if width < available_width:
            r = available_width / width
            width, height = available_width, int(height * r)
            scaled = True
    if scaled or width > available_width or height > available_height:
        width, height = fit_image(width, height, available_width, available_height)
        resize_cmd = ['-resize', '{}x{}!'.format(width, height)]
        if get_multiple_frames:
            # we have to coalesce, resize and de-coalesce all frames
            resize_cmd = ['-coalesce'] + resize_cmd + ['-deconstruct']
        cmd += resize_cmd
    # The per-frame geometry is encoded into the output file name so that it
    # can be read back after the conversion
    cmd += ['-depth', '8', '-auto-orient', '-set', 'filename:f', '%w-%h-%g-%p']
    ans = RenderedImage(m.fmt, width, height, m.mode)
    if only_first_frame:
        ans.frames = [Frame(m.frames[0])]
    else:
        ans.frames = list(map(Frame, m.frames))
    bytes_per_pixel = 3 if m.mode == 'rgb' else 4

    def check_resize(frame: Frame) -> None:
        # ImageMagick sometimes generates RGBA images smaller than the specified
        # size. See https://github.com/kovidgoyal/kitty/issues/276 for examples
        sz = os.path.getsize(frame.path)
        expected_size = bytes_per_pixel * frame.width * frame.height
        if sz < expected_size:
            missing = expected_size - sz
            row_size = bytes_per_pixel * frame.width
            # Only a whole number of missing rows of this frame can be
            # compensated for by reducing the frame height
            if missing % row_size != 0:
                raise ConvertFailed(
                    path, 'ImageMagick failed to convert {} correctly,'
                    ' it generated {} < {} of data (w={}, h={}, bpp={})'.format(
                        path, sz, expected_size, frame.width, frame.height, bytes_per_pixel))
            frame.height -= missing // row_size
            if frame.index == 0:
                ans.height = frame.height
                ans.width = frame.width

    with tempfile.TemporaryDirectory(dir=os.path.dirname(output_prefix)) as tdir:
        output_template = os.path.join(tdir, f'im-%[filename:f].{m.mode}')
        if get_multiple_frames:
            cmd.append('+adjoin')
        run_imagemagick(path, cmd + [output_template])
        unseen = {x.index for x in m}
        for x in os.listdir(tdir):
            try:
                # File names look like im-W-H-CWxCH+X+Y-INDEX.mode
                parts = x.split('.', 1)[0].split('-')
                index = int(parts[-1])
                unseen.discard(index)
                f = ans.frames[index]
                f.width, f.height = map(positive_int, parts[1:3])
                sz, pos = parts[3].split('+', 1)
                f.canvas_width, f.canvas_height = map(positive_int, sz.split('x', 1))
                f.canvas_x, f.canvas_y = map(int, pos.split('+', 1))
            except Exception as err:
                raise OutdatedImageMagick(
                    f'Unexpected output filename: {x!r} produced by ImageMagick command: {last_imagemagick_cmd}'
                ) from err
            f.path = output_prefix + f'-{index}.{m.mode}'
            os.rename(os.path.join(tdir, x), f.path)
            check_resize(f)

    # Verify that the expected frames were produced before touching any output
    # file, so that a missing frame is reported as a conversion failure rather
    # than as a failure to open an empty path
    if get_multiple_frames:
        if unseen:
            raise ConvertFailed(path, f'Failed to render {len(unseen)} out of {len(m)} frames of animation')
    elif not ans.frames[0].path:
        raise ConvertFailed(path, 'Failed to render image')

    f = ans.frames[0]
    if f.width != ans.width or f.height != ans.height:
        # The first frame does not cover the whole image, rewrite its file in
        # place as a full sized canvas
        with open(f.path, 'r+b') as ff:
            data = ff.read()
            ff.seek(0)
            ff.truncate()
            cd = create_canvas(data, f.width, f.canvas_x, f.canvas_y, ans.width, ans.height, 3 if ans.mode == 'rgb' else 4)
            ff.write(cd)

    return ans


def render_as_single_image(
    path: str, m: ImageData,
    available_width: int, available_height: int,
    scale_up: bool,
    tdir: Optional[str] = None
) -> Tuple[str, int, int]:
    '''
    Render only the first frame of the image at ``path`` into a new temporary file.

    :param tdir: Directory in which to create the output file, or None for the
        default temporary directory.
    :return: The output file path and the rendered width and height.
    '''
    import tempfile
    fd, output = tempfile.mkstemp(prefix='icat-', suffix=f'.{m.mode}', dir=tdir)
    os.close(fd)
    try:
        result = render_image(path, output, m, available_width, available_height, scale_up, only_first_frame=True)
        os.rename(result.frames[0].path, output)
    except Exception:
        # Do not leave the empty placeholder file behind when rendering fails
        with suppress(OSError):
            os.unlink(output)
        raise
    return output, result.width, result.height


def can_display_images() -> bool:
    '''
    Return True if an ImageMagick executable is present in PATH.

    Either ``magick`` or ``convert`` is accepted, since the rest of this module
    prefers ``magick`` and only falls back to ``convert``. The result is cached
    as an attribute on this function.
    '''
    import shutil
    ans: Optional[bool] = getattr(can_display_images, 'ans', None)
    if ans is None:
        ans = any(shutil.which(exe) is not None for exe in ('magick', 'convert'))
        setattr(can_display_images, 'ans', ans)
    return ans


ImageKey = Tuple[str, int, int]  # (path, width, height)
SentImageKey = Tuple[int, int, int]  # (image id, width, height)
T = TypeVar('T')


class Alias(Generic[T]):
    '''
    Descriptor that stores a GraphicsCommand field under a single letter key.

    The same Alias object is bound to several class attribute names, for
    example ``a = action = Alias(...)``. All of those names read and write the
    entry in ``instance._actual_values`` keyed by the most recently seen one
    letter name. Values equal to the default are not stored.
    '''

    currently_processing: ClassVar[str] = ''

    def __init__(self, defval: T) -> None:
        self.name = ''
        self.defval = defval

    def __get__(self, instance: Optional['GraphicsCommand'], cls: Optional[Type['GraphicsCommand']] = None) -> T:
        if instance is None:
            return self.defval
        return cast(T, instance._actual_values.get(self.name, self.defval))

    def __set__(self, instance: 'GraphicsCommand', val: T) -> None:
        if val == self.defval:
            instance._actual_values.pop(self.name, None)
        else:
            instance._actual_values[self.name] = val

    def __set_name__(self, owner: Type['GraphicsCommand'], name: str) -> None:
        # A one letter name starts a new group; the longer names that follow
        # it in the class body reuse that letter as their storage key
        if len(name) == 1:
            Alias.currently_processing = name
        self.name = Alias.currently_processing


class GraphicsCommand:
    '''
    A set of key=value control fields that can be serialized as a graphics
    escape code. Every field has a one letter name plus one or more longer
    aliases that refer to the same stored value.
    '''
    a = action = Alias(cast(GRT_a, 't'))
    q = quiet = Alias(0)
    f = format = Alias(32)
    t = transmission_type = Alias(cast(GRT_t, 'd'))
    s = data_width = animation_state = Alias(0)
    v = data_height = loop_count = Alias(0)
    S = data_size = Alias(0)
    O = data_offset = Alias(0)  # noqa
    i = image_id = Alias(0)
    I = image_number = Alias(0)  # noqa
    p = placement_id = Alias(0)
    o = compression = Alias(cast(Optional[GRT_o], None))
    m = more = Alias(cast(GRT_m, 0))
    x = left_edge = Alias(0)
    y = top_edge = Alias(0)
    w = width = Alias(0)
    h = height = Alias(0)
    X = cell_x_offset = blend_mode = Alias(0)
    Y = cell_y_offset = bgcolor = Alias(0)
    c = columns = other_frame_number = dest_frame = Alias(0)
    r = rows = frame_number = source_frame = Alias(0)
    z = z_index = gap = Alias(0)
    C = cursor_movement = compose_mode = Alias(cast(GRT_C, 0))
    d = delete_action = Alias(cast(GRT_d, 'a'))

    def __init__(self) -> None:
        # Only fields that differ from their defaults are stored here
        self._actual_values: Dict[str, Any] = {}

    def __repr__(self) -> str:
        return self.serialize().decode('ascii').replace('\033', '^]')

    def clone(self) -> 'GraphicsCommand':
        '''Return a new command with a copy of this command's field values.'''
        ans = GraphicsCommand()
        ans._actual_values = self._actual_values.copy()
        return ans

    def serialize(self, payload: Union[bytes, str] = b'') -> bytes:
        '''
        Return the escape code for this command.

        :param payload: Optional payload. A str is encoded as UTF-8 and then
            base64 encoded; bytes are written as is, so they must already be
            in the form that is to be sent.
        '''
        items = ['{}={}'.format(k, val) for k, val in self._actual_values.items()]
        ans: List[bytes] = []
        w = ans.append
        w(b'\033_G')
        w(','.join(items).encode('ascii'))
        if payload:
            w(b';')
            if isinstance(payload, str):
                payload = standard_b64encode(payload.encode('utf-8'))
            w(payload)
        w(b'\033\\')
        return b''.join(ans)

    def clear(self) -> None:
        '''Reset all fields to their default values.'''
        self._actual_values = {}

    def iter_transmission_chunks(self, data: Optional[bytes] = None, level: int = -1, compression_threshold: int = 1024) -> Iterator[bytes]:
        '''
        Yield the escape codes needed to send this command along with ``data``.

        The data is base64 encoded and split into chunks of at most 4096
        bytes. Only the first chunk carries this command's fields.

        :param data: The raw payload, or None to yield just the bare command.
        :param level: zlib compression level; zero disables compression.
        :param compression_threshold: Minimum size of ``data`` for compression
            to be attempted. Compressed data is used only if it is smaller.
        '''
        if data is None:
            yield self.serialize()
            return
        # Work on a copy so that this command is left unchanged
        gc = self.clone()
        gc.S = len(data)
        if level and len(data) >= compression_threshold:
            import zlib
            compressed = zlib.compress(data, level)
            if len(compressed) < len(data):
                gc.o = 'z'
                data = compressed
                gc.S = len(data)
        data = standard_b64encode(data)
        while data:
            chunk, data = data[:4096], data[4096:]
            gc.m = 1 if data else 0
            yield gc.serialize(chunk)
            # Subsequent chunks only carry the "more" flag
            gc.clear()


class Placement:
    '''A placement command together with the cursor position it was issued at.'''
    cmd: GraphicsCommand
    x: int = 0
    y: int = 0

    def __init__(self, cmd: GraphicsCommand, x: int = 0, y: int = 0):
        self.cmd = cmd
        self.x = x
        self.y = y


class ImageManager:
    '''
    Convert images, send them to the terminal and place them on screen,
    caching the results of each step.

    Must be used as a context manager: entering creates the temporary
    directory that converted images are stored in and probes whether the
    terminal can read files from it; exiting removes the directory and deletes
    the images that were sent.
    '''

    def __init__(self, handler: HandlerType):
        self.image_id_counter = count()
        self.handler = handler
        # None until the terminal answers the probe sent by __enter__
        self.filesystem_ok: Optional[bool] = None
        self.image_data: Dict[str, ImageData] = {}
        self.failed_images: Dict[str, Exception] = {}
        self.converted_images: Dict[ImageKey, ImageKey] = {}
        self.sent_images: Dict[ImageKey, int] = {}
        self.image_id_to_image_data: Dict[int, ImageData] = {}
        self.image_id_to_converted_data: Dict[int, ImageKey] = {}
        self.transmission_status: Dict[int, Union[str, int]] = {}
        self.placements_in_flight: DefaultDict[int, Deque[Placement]] = defaultdict(deque)
        # Optional hook called before an image is resent; it may modify the
        # placement and returns False to cancel the resend. It must be
        # initialised because resend_image() reads it unconditionally.
        self.update_image_placement_for_resend: Optional[Callable[[int, Placement], bool]] = None

    @property
    def next_image_id(self) -> int:
        # Ids start at 2 as id 1 is used for the filesystem probe
        return next(self.image_id_counter) + 2

    @property
    def screen_size(self) -> ScreenSize:
        return self.handler.screen_size

    def __enter__(self) -> None:
        import tempfile
        self.tdir = tempfile.mkdtemp(prefix='kitten-images-')
        with tempfile.NamedTemporaryFile(dir=self.tdir, delete=False) as f:
            f.write(b'abcd')
        # The probe file is closed, and therefore flushed, before the terminal
        # is asked to read it. The answer arrives via handle_response().
        gc = GraphicsCommand()
        gc.a = 'q'
        gc.s = gc.v = gc.i = 1
        gc.t = 'f'
        self.handler.cmd.gr_command(gc, standard_b64encode(f.name.encode(fsenc)))

    def __exit__(self, *a: Any) -> None:
        import shutil
        shutil.rmtree(self.tdir, ignore_errors=True)
        self.handler.cmd.clear_images_on_screen(delete_data=True)
        self.delete_all_sent_images()
        del self.handler

    def delete_all_sent_images(self) -> None:
        '''Send a delete command for every image id that was transmitted.'''
        gc = GraphicsCommand()
        gc.a = 'd'
        for img_id in self.transmission_status:
            gc.i = img_id
            self.handler.cmd.gr_command(gc)
        self.transmission_status.clear()

    def handle_response(self, apc: str) -> None:
        '''
        Process a response from the terminal to an earlier graphics command.

        :param apc: The response text: one leading character (skipped), then
            comma separated key=value pairs, then ``;`` and the payload.
        '''
        cdata, payload = apc[1:].partition(';')[::2]
        control = {}
        for x in cdata.split(','):
            k, v = x.partition('=')[::2]
            control[k] = v
        try:
            image_id = int(control.get('i', '0'))
        except Exception:
            image_id = 0
        if image_id == 1:
            # Answer to the filesystem probe sent by __enter__
            self.filesystem_ok = payload == 'OK'
            return
        if not image_id:
            return
        if not self.transmission_status.get(image_id):
            # First response for this image is the result of transmitting it
            self.transmission_status[image_id] = payload
        else:
            # Later responses correspond to placements, in the order sent
            in_flight = self.placements_in_flight[image_id]
            if in_flight:
                pl = in_flight.popleft()
                if payload.startswith('ENOENT:'):
                    # The terminal no longer has the image: best effort resend
                    with suppress(Exception):
                        self.resend_image(image_id, pl)
                if not in_flight:
                    self.placements_in_flight.pop(image_id, None)

    def resend_image(self, image_id: int, pl: Placement) -> None:
        '''Transmit the image with the given id again and repeat the placement ``pl``.'''
        if self.update_image_placement_for_resend is not None and not self.update_image_placement_for_resend(image_id, pl):
            return
        image_data = self.image_id_to_image_data[image_id]
        skey = self.image_id_to_converted_data[image_id]
        self.transmit_image(image_data, image_id, *skey)
        with cursor(self.handler.write):
            self.handler.cmd.set_cursor_position(pl.x, pl.y)
            self.handler.cmd.gr_command(pl.cmd)

    def send_image(self, path: str, max_cols: Optional[int] = None, max_rows: Optional[int] = None, scale_up: bool = False) -> SentImageKey:
        '''
        Convert the image at ``path`` if needed and transmit it to the terminal.

        Results are cached, so an image already sent at the same size is not
        sent again. A failure is remembered and raised again on later calls
        for the same path.

        :param max_cols: Maximum width in cells, defaults to the screen width.
        :param max_rows: Maximum height in cells, defaults to the screen height.
        :param scale_up: Enlarge images narrower than the available width.
        :return: The image id, width and height, or ``(0, 0, 0)`` if the
            converted image has zero width.
        '''
        path = os.path.abspath(path)
        if path in self.failed_images:
            raise self.failed_images[path]
        if path not in self.image_data:
            try:
                self.image_data[path] = identify(path)
            except Exception as e:
                self.failed_images[path] = e
                raise
        m = self.image_data[path]
        ss = self.screen_size
        if max_cols is None:
            max_cols = ss.cols
        if max_rows is None:
            max_rows = ss.rows
        available_width = max_cols * ss.cell_width
        available_height = max_rows * ss.cell_height
        key = path, available_width, available_height
        skey = self.converted_images.get(key)
        if skey is None:
            try:
                self.converted_images[key] = skey = self.convert_image(path, available_width, available_height, m, scale_up)
            except Exception as e:
                self.failed_images[path] = e
                raise
        final_width = skey[1]
        if final_width == 0:
            return 0, 0, 0
        image_id = self.sent_images.get(skey)
        if image_id is None:
            image_id = self.next_image_id
            self.transmit_image(m, image_id, *skey)
            self.sent_images[skey] = image_id
        self.image_id_to_converted_data[image_id] = skey
        self.image_id_to_image_data[image_id] = m
        return image_id, skey[1], skey[2]

    def hide_image(self, image_id: int) -> None:
        '''Send a delete command for the image with the given id.'''
        gc = GraphicsCommand()
        gc.a = 'd'
        gc.i = image_id
        self.handler.cmd.gr_command(gc)

    def show_image(self, image_id: int, x: int, y: int, src_rect: Optional[Tuple[int, int, int, int]] = None) -> None:
        '''
        Place a previously sent image with the cursor positioned at ``(x, y)``.

        :param src_rect: Optional ``(x, y, width, height)`` region of the
            image to display instead of the whole image.
        '''
        gc = GraphicsCommand()
        gc.a = 'p'
        gc.i = image_id
        if src_rect is not None:
            gc.x, gc.y, gc.w, gc.h = map(int, src_rect)
        # Remembered so that handle_response() can repeat the placement if the
        # terminal reports that the image is missing
        self.placements_in_flight[image_id].append(Placement(gc, x, y))
        with cursor(self.handler.write):
            self.handler.cmd.set_cursor_position(x, y)
            self.handler.cmd.gr_command(gc)

    def convert_image(self, path: str, available_width: int, available_height: int, image_data: ImageData, scale_up: bool = False) -> ImageKey:
        '''Render the first frame of the image into this manager's temporary directory.'''
        rgba_path, width, height = render_as_single_image(path, image_data, available_width, available_height, scale_up, tdir=self.tdir)
        return rgba_path, width, height

    def transmit_image(self, image_data: ImageData, image_id: int, rgba_path: str, width: int, height: int) -> int:
        '''
        Send the raw pixel data in ``rgba_path`` to the terminal as ``image_id``.

        If the filesystem probe succeeded only the file path is sent, otherwise
        the file contents are compressed and sent in chunks.

        :return: ``image_id``
        '''
        # Zero marks the transmission as not yet acknowledged
        self.transmission_status[image_id] = 0
        gc = GraphicsCommand()
        gc.a = 't'
        gc.f = image_data.transmit_fmt
        gc.s = width
        gc.v = height
        gc.i = image_id
        if self.filesystem_ok:
            gc.t = 'f'
            self.handler.cmd.gr_command(
                gc, standard_b64encode(rgba_path.encode(fsenc)))
        else:
            import zlib
            with open(rgba_path, 'rb') as f:
                data = f.read()
            gc.S = len(data)
            data = zlib.compress(data)
            gc.o = 'z'
            data = standard_b64encode(data)
            while data:
                chunk, data = data[:4096], data[4096:]
                gc.m = 1 if data else 0
                self.handler.cmd.gr_command(gc, chunk)
                # Subsequent chunks only carry the "more" flag
                gc.clear()
        return image_id