kitty/kittens/icat/main.py
Kovid Goyal d0c50248ea
Graphics protocol: Only delete temp files if they have the string tty-graphics-protocol in their file paths.
This prevents deletion of arbitrary files in /tmp via the graphics
protocol.
2022-08-16 11:25:33 +05:30

625 lines
21 KiB
Python

#!/usr/bin/env python3
# License: GPL v3 Copyright: 2017, Kovid Goyal <kovid at kovidgoyal.net>
import contextlib
import os
import re
import signal
import socket
import sys
import zlib
from base64 import standard_b64encode
from math import ceil
from tempfile import NamedTemporaryFile
from typing import (
Dict, Generator, List, NamedTuple, Optional, Pattern, Tuple, Union
)
from kitty.cli import parse_args
from kitty.cli_stub import IcatCLIOptions
from kitty.constants import appname
from kitty.guess_mime_type import guess_type
from kitty.rgb import to_color
from kitty.types import run_once
from kitty.typing import GRT_f, GRT_t
from kitty.utils import (
TTYIO, ScreenSize, ScreenSizeGetter, screen_size_function
)
from ..tui.images import (
ConvertFailed, Dispose, GraphicsCommand, NoImageMagick, OpenFailed,
OutdatedImageMagick, RenderedImage, fsenc, identify,
render_as_single_image, render_image
)
from ..tui.operations import clear_images_on_screen, raw_mode
OPTIONS = '''\
--align
type=choices
choices=center,left,right
default=center
Horizontal alignment for the displayed image.
--place
Choose where on the screen to display the image. The image will be scaled to fit
into the specified rectangle. The syntax for specifying rectangles is
<:italic:`width`>x<:italic:`height`>@<:italic:`left`>x<:italic:`top`>.
All measurements are in cells (i.e. cursor positions) with the origin
:italic:`(0, 0)` at the top-left corner of the screen.
--scale-up
type=bool-set
When used in combination with :option:`--place` it will cause images that are
smaller than the specified area to be scaled up to use as much of the specified
area as possible.
--background
default=none
Specify a background color, this will cause transparent images to be composited
on top of the specified color.
--mirror
default=none
type=choices
choices=none,horizontal,vertical,both
Mirror the image about a horizontal or vertical axis or both.
--clear
type=bool-set
Remove all images currently displayed on the screen.
--transfer-mode
type=choices
choices=detect,file,stream
default=detect
Which mechanism to use to transfer images to the terminal. The default is to
auto-detect. :italic:`file` means to use a temporary file and :italic:`stream`
means to send the data via terminal escape codes. Note that if you use the
:italic:`file` transfer mode and you are connecting over a remote session then
image display will not work.
--detect-support
type=bool-set
Detect support for image display in the terminal. If not supported, will exit
with exit code 1, otherwise will exit with code 0 and print the supported
transfer mode to stderr, which can be used with the :option:`--transfer-mode`
option.
--detection-timeout
type=float
default=10
The amount of time (in seconds) to wait for a response form the terminal, when
detecting image display support.
--print-window-size
type=bool-set
Print out the window size as <:italic:`width`>x<:italic:`height`> (in pixels) and quit. This is a
convenience method to query the window size if using :code:`kitty +kitten icat`
from a scripting language that cannot make termios calls.
--stdin
type=choices
choices=detect,yes,no
default=detect
Read image data from STDIN. The default is to do it automatically, when STDIN is
not a terminal, but you can turn it off or on explicitly, if needed.
--silent
type=bool-set
Do not print out anything to STDOUT during operation.
--z-index -z
default=0
Z-index of the image. When negative, text will be displayed on top of the image.
Use a double minus for values under the threshold for drawing images under cell
background colors. For example, :code:`--1` evaluates as -1,073,741,825.
--loop -l
default=-1
type=int
Number of times to loop animations. Negative values loop forever. Zero means
only the first frame of the animation is displayed. Otherwise, the animation
is looped the specified number of times.
--hold
type=bool-set
Wait for a key press before exiting after displaying the images.
'''
screen_size: Optional[ScreenSizeGetter] = None
can_transfer_with_files = False
def get_screen_size_function() -> ScreenSizeGetter:
global screen_size
if screen_size is None:
screen_size = screen_size_function()
return screen_size
def get_screen_size() -> ScreenSize:
screen_size = get_screen_size_function()
return screen_size()
@run_once
def options_spec() -> str:
return OPTIONS.format(appname=f'{appname}-icat')
def write_gr_cmd(cmd: GraphicsCommand, payload: Optional[bytes] = None) -> None:
sys.stdout.buffer.write(cmd.serialize(payload or b''))
sys.stdout.flush()
def calculate_in_cell_x_offset(width: int, cell_width: int, align: str) -> int:
if align == 'left':
return 0
extra_pixels = width % cell_width
if not extra_pixels:
return 0
if align == 'right':
return cell_width - extra_pixels
return (cell_width - extra_pixels) // 2
def set_cursor(cmd: GraphicsCommand, width: int, height: int, align: str) -> None:
ss = get_screen_size()
cw = int(ss.width / ss.cols)
num_of_cells_needed = int(ceil(width / cw))
cmd.X = calculate_in_cell_x_offset(width, cw, align)
extra_cells = 0
if align == 'center':
extra_cells = (ss.cols - num_of_cells_needed) // 2
elif align == 'right':
extra_cells = (ss.cols - num_of_cells_needed)
if extra_cells:
sys.stdout.buffer.write(b' ' * extra_cells)
def set_cursor_for_place(place: 'Place', cmd: GraphicsCommand, width: int, height: int, align: str) -> None:
x = place.left + 1
ss = get_screen_size()
cw = int(ss.width / ss.cols)
num_of_cells_needed = int(ceil(width / cw))
cmd.X = calculate_in_cell_x_offset(width, cw, align)
extra_cells = 0
if align == 'center':
extra_cells = (place.width - num_of_cells_needed) // 2
elif align == 'right':
extra_cells = place.width - num_of_cells_needed
sys.stdout.buffer.write(f'\033[{place.top + 1};{x + extra_cells}H'.encode('ascii'))
def write_chunked(cmd: GraphicsCommand, data: bytes) -> None:
cmd = cmd.clone()
if cmd.f != 100:
data = zlib.compress(data)
cmd.o = 'z'
data = standard_b64encode(data)
ac = cmd.a
quiet = cmd.q
while data:
chunk, data = data[:4096], data[4096:]
cmd.m = 1 if data else 0
write_gr_cmd(cmd, chunk)
cmd.clear()
cmd.a = ac
cmd.q = quiet
def show(
outfile: str,
width: int, height: int, zindex: int,
fmt: 'GRT_f',
transmit_mode: 'GRT_t' = 't',
align: str = 'center',
place: Optional['Place'] = None,
use_number: int = 0
) -> None:
cmd = GraphicsCommand()
cmd.a = 'T'
cmd.f = fmt
cmd.s = width
cmd.v = height
cmd.z = zindex
if use_number:
cmd.I = use_number # noqa
cmd.q = 2
if place:
set_cursor_for_place(place, cmd, width, height, align)
else:
set_cursor(cmd, width, height, align)
if can_transfer_with_files:
cmd.t = transmit_mode
write_gr_cmd(cmd, standard_b64encode(os.path.abspath(outfile).encode(fsenc)))
else:
with open(outfile, 'rb') as f:
data = f.read()
if transmit_mode == 't':
os.unlink(outfile)
if fmt == 100:
cmd.S = len(data)
write_chunked(cmd, data)
def show_frames(frame_data: RenderedImage, use_number: int, loops: int) -> None:
transmit_cmd = GraphicsCommand()
transmit_cmd.a = 'f'
transmit_cmd.I = use_number # noqa
transmit_cmd.q = 2
if can_transfer_with_files:
transmit_cmd.t = 't'
transmit_cmd.f = 24 if frame_data.mode == 'rgb' else 32
def control(frame_number: int = 0, loops: Optional[int] = None, gap: Optional[int] = 0, animation_control: int = 0) -> None:
cmd = GraphicsCommand()
cmd.a = 'a'
cmd.I = use_number # noqa
cmd.r = frame_number
if loops is not None:
cmd.v = loops + 1
if gap is not None:
cmd.z = gap if gap > 0 else -1
if animation_control:
cmd.s = animation_control
write_gr_cmd(cmd)
anchor_frame = 0
for frame in frame_data.frames:
frame_number = frame.index + 1
if frame.dispose < Dispose.previous:
anchor_frame = frame_number
if frame_number == 1:
control(frame_number, gap=frame.gap, loops=None if loops < 1 else loops)
continue
if frame.dispose is Dispose.previous:
if anchor_frame != frame_number:
transmit_cmd.c = anchor_frame
else:
transmit_cmd.c = (frame_number - 1) if frame.needs_blend else 0
transmit_cmd.s = frame.width
transmit_cmd.v = frame.height
transmit_cmd.x = frame.canvas_x
transmit_cmd.y = frame.canvas_y
transmit_cmd.z = frame.gap if frame.gap > 0 else -1
if can_transfer_with_files:
write_gr_cmd(transmit_cmd, standard_b64encode(os.path.abspath(frame.path).encode(fsenc)))
else:
with open(frame.path, 'rb') as f:
data = f.read()
write_chunked(transmit_cmd, data)
if frame_number == 2:
control(animation_control=2)
control(animation_control=3)
def parse_z_index(val: str) -> int:
origin = 0
if val.startswith('--'):
val = val[1:]
origin = -1073741824
return origin + int(val)
class ParsedOpts:
place: Optional['Place'] = None
z_index: int = 0
remove_alpha: str = ''
flip: bool = False
flop: bool = False
def process(path: str, args: IcatCLIOptions, parsed_opts: ParsedOpts, is_tempfile: bool) -> bool:
m = identify(path)
ss = get_screen_size()
available_width = parsed_opts.place.width * (ss.width // ss.cols) if parsed_opts.place else ss.width
available_height = parsed_opts.place.height * (ss.height // ss.rows) if parsed_opts.place else 10 * m.height
needs_scaling = m.width > available_width or m.height > available_height
needs_scaling = needs_scaling or args.scale_up
needs_conversion = needs_scaling or bool(parsed_opts.remove_alpha) or parsed_opts.flip or parsed_opts.flop
file_removed = False
use_number = 0
if m.fmt == 'png' and not needs_conversion:
outfile = path
transmit_mode: 'GRT_t' = 't' if is_tempfile else 'f'
fmt: 'GRT_f' = 100
width, height = m.width, m.height
file_removed = transmit_mode == 't'
else:
fmt = 24 if m.mode == 'rgb' else 32
transmit_mode = 't'
if len(m) == 1 or args.loop == 0:
outfile, width, height = render_as_single_image(
path, m, available_width, available_height, args.scale_up,
remove_alpha=parsed_opts.remove_alpha, flip=parsed_opts.flip, flop=parsed_opts.flop)
else:
import struct
use_number = max(1, struct.unpack('@I', os.urandom(4))[0])
with NamedTemporaryFile(prefix='tty-graphics-protocol-') as f:
prefix = f.name
frame_data = render_image(
path, prefix, m, available_width, available_height, args.scale_up,
remove_alpha=parsed_opts.remove_alpha, flip=parsed_opts.flip, flop=parsed_opts.flop)
outfile, width, height = frame_data.frames[0].path, frame_data.width, frame_data.height
show(
outfile, width, height, parsed_opts.z_index, fmt, transmit_mode,
align=args.align, place=parsed_opts.place, use_number=use_number
)
if use_number:
show_frames(frame_data, use_number, args.loop)
if not can_transfer_with_files:
for fr in frame_data.frames:
with contextlib.suppress(FileNotFoundError):
os.unlink(fr.path)
if not args.place:
print() # ensure cursor is on a new line
return file_removed
def scan(d: str) -> Generator[Tuple[str, str], None, None]:
for dirpath, dirnames, filenames in os.walk(d):
for f in filenames:
mt = guess_type(f)
if mt and mt.startswith('image/'):
yield os.path.join(dirpath, f), mt
def detect_support(wait_for: float = 10, silent: bool = False) -> bool:
global can_transfer_with_files
if not silent:
print(f'Checking for graphics ({wait_for}s max. wait)...', end='\r')
sys.stdout.flush()
try:
received = b''
responses: Dict[int, bool] = {}
def parse_responses() -> None:
for m in re.finditer(b'\033_Gi=([1|2]);(.+?)\033\\\\', received):
iid = m.group(1)
if iid in (b'1', b'2'):
iid_ = int(iid.decode('ascii'))
if iid_ not in responses:
responses[iid_] = m.group(2) == b'OK'
def more_needed(data: bytes) -> bool:
nonlocal received
received += data
parse_responses()
return 1 not in responses or 2 not in responses
with NamedTemporaryFile(prefix='tty-graphics-protocol') as f:
f.write(b'abcd')
f.flush()
gc = GraphicsCommand()
gc.a = 'q'
gc.s = gc.v = gc.i = 1
write_gr_cmd(gc, standard_b64encode(b'abcd'))
gc.t = 'f'
gc.i = 2
write_gr_cmd(gc, standard_b64encode(f.name.encode(fsenc)))
with TTYIO() as io:
io.recv(more_needed, timeout=wait_for)
finally:
if not silent:
sys.stdout.buffer.write(b'\033[J'), sys.stdout.flush()
can_transfer_with_files = bool(responses.get(2))
return responses.get(1, False)
class Place(NamedTuple):
width: int
height: int
left: int
top: int
def parse_place(raw: str) -> Optional[Place]:
if raw:
area, pos = raw.split('@', 1)
w, h = map(int, area.split('x'))
l, t = map(int, pos.split('x'))
return Place(w, h, l, t)
return None
help_text = (
'A cat like utility to display images in the terminal.'
' You can specify multiple image files and/or directories.'
' Directories are scanned recursively for image files. If STDIN'
' is not a terminal, image data will be read from it as well.'
' You can also specify HTTP(S) or FTP URLs which will be'
' automatically downloaded and displayed.'
)
usage = 'image-file-or-url-or-directory ...'
@contextlib.contextmanager
def socket_timeout(seconds: int) -> Generator[None, None, None]:
old = socket.getdefaulttimeout()
socket.setdefaulttimeout(seconds)
try:
yield
finally:
socket.setdefaulttimeout(old)
def process_single_item(
item: Union[bytes, str],
args: IcatCLIOptions,
parsed_opts: ParsedOpts,
url_pat: Optional['Pattern[str]'] = None,
maybe_dir: bool = True
) -> None:
is_tempfile = False
file_removed = False
try:
if isinstance(item, bytes):
with NamedTemporaryFile(prefix='tty-graphics-protocol-', delete=False) as tf:
tf.write(item)
item = tf.name
is_tempfile = True
if url_pat is not None and url_pat.match(item) is not None:
from urllib.request import urlretrieve
with NamedTemporaryFile(prefix='tty-graphics-protocol-', delete=False) as tf:
try:
with socket_timeout(30):
urlretrieve(item, filename=tf.name)
except Exception as e:
raise SystemExit(f'Failed to download image at URL: {item} with error: {e}')
item = tf.name
is_tempfile = True
file_removed = process(item, args, parsed_opts, is_tempfile)
elif item.lower().startswith('file://'):
from urllib.parse import urlparse
from urllib.request import url2pathname
pitem = urlparse(item)
if os.sep == '\\':
item = pitem.netloc + pitem.path
else:
item = pitem.path
item = url2pathname(item)
file_removed = process(item, args, parsed_opts, is_tempfile)
else:
if maybe_dir and os.path.isdir(item):
for (x, mt) in scan(item):
process_single_item(x, args, parsed_opts, url_pat=None, maybe_dir=False)
else:
file_removed = process(item, args, parsed_opts, is_tempfile)
finally:
if is_tempfile and not file_removed:
os.remove(item)
def main(args: List[str] = sys.argv) -> None:
global can_transfer_with_files
cli_opts, items_ = parse_args(args[1:], options_spec, usage, help_text, f'{appname} +kitten icat', result_class=IcatCLIOptions)
items: List[Union[str, bytes]] = list(items_)
if cli_opts.print_window_size:
screen_size_function.cache_clear()
with open(os.ctermid()) as tty:
try:
fd = tty.fileno()
except AttributeError:
# use default value for fd if ctermid is not available
fd = None
ss = screen_size_function(fd)()
print(f'{ss.width}x{ss.height}', end='')
raise SystemExit(0)
if not sys.stdout.isatty():
sys.stdout = open(os.ctermid(), 'w')
stdin_data = None
if cli_opts.stdin == 'yes' or (cli_opts.stdin == 'detect' and sys.stdin is not None and not sys.stdin.isatty()):
stdin_data = sys.stdin.buffer.read()
if stdin_data:
items.insert(0, stdin_data)
sys.stdin.close()
sys.stdin = open(os.ctermid())
screen_size = get_screen_size_function()
signal.signal(signal.SIGWINCH, lambda signum, frame: setattr(screen_size, 'changed', True))
if screen_size().width == 0:
if cli_opts.detect_support:
raise SystemExit(1)
raise SystemExit(
'Terminal does not support reporting screen sizes via the TIOCGWINSZ ioctl'
)
parsed_opts = ParsedOpts()
if cli_opts.place:
try:
parsed_opts.place = parse_place(cli_opts.place)
except Exception:
raise SystemExit(f'Not a valid place specification: {cli_opts.place}')
try:
parsed_opts.z_index = parse_z_index(cli_opts.z_index)
except Exception:
raise SystemExit(f'Not a valid z-index specification: {cli_opts.z_index}')
if cli_opts.background != 'none':
ra = to_color(cli_opts.background)
if ra is None:
raise SystemExit(f'Not a valid color specification: {cli_opts.background}')
parsed_opts.remove_alpha = ra.as_sharp
parsed_opts.flip = cli_opts.mirror in ('both', 'vertical')
parsed_opts.flop = cli_opts.mirror in ('both', 'horizontal')
if cli_opts.detect_support:
if not detect_support(wait_for=cli_opts.detection_timeout, silent=True):
raise SystemExit(1)
print('file' if can_transfer_with_files else 'stream', end='', file=sys.stderr)
return
if cli_opts.transfer_mode == 'detect':
if not detect_support(wait_for=cli_opts.detection_timeout, silent=cli_opts.silent):
raise SystemExit('This terminal emulator does not support the graphics protocol, use a terminal emulator such as kitty that does support it')
else:
can_transfer_with_files = cli_opts.transfer_mode == 'file'
errors = []
if cli_opts.clear:
sys.stdout.write(clear_images_on_screen(delete_data=True))
if not items:
return
if not items:
raise SystemExit('You must specify at least one file to cat')
if parsed_opts.place:
if len(items) > 1 or (isinstance(items[0], str) and os.path.isdir(items[0])):
raise SystemExit(f'The --place option can only be used with a single image, not {items}')
sys.stdout.buffer.write(b'\0337') # save cursor
url_pat = re.compile(r'(?:https?|ftp)://', flags=re.I)
def hold_if_needed(exit_code_or_msg: Union[int, str]) -> None:
if cli_opts.hold:
if isinstance(exit_code_or_msg, str):
print(exit_code_or_msg, file=sys.stderr, flush=True)
exit_code_or_msg = 1
with open(os.ctermid()) as tty, raw_mode(tty.fileno()):
tty.buffer.read(1)
raise SystemExit(exit_code_or_msg)
for item in items:
try:
process_single_item(item, cli_opts, parsed_opts, url_pat)
except NoImageMagick as e:
hold_if_needed(str(e))
except OutdatedImageMagick as e:
print(e.detailed_error, file=sys.stderr)
hold_if_needed(str(e))
except ConvertFailed as e:
hold_if_needed(str(e))
except OpenFailed as e:
errors.append(e)
if parsed_opts.place:
sys.stdout.buffer.write(b'\0338') # restore cursor
if errors:
for err in errors:
print(err, file=sys.stderr)
hold_if_needed(1 if errors else 0)
raise SystemExit()
if __name__ == '__main__':
main()
elif __name__ == '__doc__':
cd = sys.cli_docs # type: ignore
cd['usage'] = usage
cd['options'] = options_spec
cd['help_text'] = help_text