kitty/kittens/icat/main.py
Kovid Goyal e36d41b46f
Add a --hold option to icat
Keeps it alive after display images
2020-09-18 11:48:31 +05:30

498 lines
16 KiB
Python
Executable File

#!/usr/bin/env python3
# vim:fileencoding=utf-8
# License: GPL v3 Copyright: 2017, Kovid Goyal <kovid at kovidgoyal.net>
import contextlib
import mimetypes
import os
import re
import socket
import signal
import sys
import zlib
from base64 import standard_b64encode
from functools import lru_cache
from math import ceil
from tempfile import NamedTemporaryFile
from typing import (
Dict, Generator, List, NamedTuple, Optional, Pattern, Tuple, Union
)
from kitty.cli import parse_args
from kitty.cli_stub import IcatCLIOptions
from kitty.constants import appname
from kitty.typing import GRT_f, GRT_t
from kitty.utils import (
TTYIO, ScreenSize, ScreenSizeGetter, fit_image, screen_size_function
)
from ..tui.images import (
ConvertFailed, GraphicsCommand, NoImageMagick, OpenFailed, convert, fsenc,
identify
)
from ..tui.operations import clear_images_on_screen, raw_mode
OPTIONS = '''\
--align
type=choices
choices=center,left,right
default=center
Horizontal alignment for the displayed image.
--place
Choose where on the screen to display the image. The image will
be scaled to fit into the specified rectangle. The syntax for
specifying rectangles is <:italic:`width`>x<:italic:`height`>@<:italic:`left`>x<:italic:`top`>.
All measurements are in cells (i.e. cursor positions) with the
origin :italic:`(0, 0)` at the top-left corner of the screen.
--scale-up
type=bool-set
When used in combination with :option:`--place` it will cause images that
are smaller than the specified area to be scaled up to use as much
of the specified area as possible.
--clear
type=bool-set
Remove all images currently displayed on the screen.
--transfer-mode
type=choices
choices=detect,file,stream
default=detect
Which mechanism to use to transfer images to the terminal. The default is to
auto-detect. :italic:`file` means to use a temporary file and :italic:`stream` means to
send the data via terminal escape codes. Note that if you use the :italic:`file`
transfer mode and you are connecting over a remote session then image display
will not work.
--detect-support
type=bool-set
Detect support for image display in the terminal. If not supported, will exit
with exit code 1, otherwise will exit with code 0 and print the supported
transfer mode to stderr, which can be used with the :option:`--transfer-mode` option.
--detection-timeout
type=float
default=10
The amount of time (in seconds) to wait for a response form the terminal, when
detecting image display support.
--print-window-size
type=bool-set
Print out the window size as :italic:`widthxheight` (in pixels) and quit. This is a
convenience method to query the window size if using kitty icat from a
scripting language that cannot make termios calls.
--stdin
type=choices
choices=detect,yes,no
default=detect
Read image data from stdin. The default is to do it automatically, when STDIN is not a terminal,
but you can turn it off or on explicitly, if needed.
--silent
type=bool-set
Do not print out anything to stdout during operation.
--z-index -z
default=0
Z-index of the image. When negative, text will be displayed on top of the image. Use
a double minus for values under the threshold for drawing images under cell background
colors. For example, --1 evaluates as -1,073,741,825.
--hold
type=bool-set
Wait for keypress before exiting after displaying the images.
'''
screen_size: Optional[ScreenSizeGetter] = None
can_transfer_with_files = False
def get_screen_size_function() -> ScreenSizeGetter:
global screen_size
if screen_size is None:
screen_size = screen_size_function()
return screen_size
def get_screen_size() -> ScreenSize:
screen_size = get_screen_size_function()
return screen_size()
@lru_cache(maxsize=2)
def options_spec() -> str:
return OPTIONS.format(appname='{}-icat'.format(appname))
def write_gr_cmd(cmd: GraphicsCommand, payload: Optional[bytes] = None) -> None:
sys.stdout.buffer.write(cmd.serialize(payload or b''))
sys.stdout.flush()
def calculate_in_cell_x_offset(width: int, cell_width: int, align: str) -> int:
if align == 'left':
return 0
extra_pixels = width % cell_width
if not extra_pixels:
return 0
if align == 'right':
return cell_width - extra_pixels
return (cell_width - extra_pixels) // 2
def set_cursor(cmd: GraphicsCommand, width: int, height: int, align: str) -> None:
ss = get_screen_size()
cw = int(ss.width / ss.cols)
num_of_cells_needed = int(ceil(width / cw))
if num_of_cells_needed > ss.cols:
w, h = fit_image(width, height, ss.width, height)
ch = int(ss.height / ss.rows)
num_of_rows_needed = int(ceil(height / ch))
cmd.c, cmd.r = ss.cols, num_of_rows_needed
else:
cmd.X = calculate_in_cell_x_offset(width, cw, align)
extra_cells = 0
if align == 'center':
extra_cells = (ss.cols - num_of_cells_needed) // 2
elif align == 'right':
extra_cells = (ss.cols - num_of_cells_needed)
if extra_cells:
sys.stdout.buffer.write(b' ' * extra_cells)
def set_cursor_for_place(place: 'Place', cmd: GraphicsCommand, width: int, height: int, align: str) -> None:
x = place.left + 1
ss = get_screen_size()
cw = int(ss.width / ss.cols)
num_of_cells_needed = int(ceil(width / cw))
cmd.X = calculate_in_cell_x_offset(width, cw, align)
extra_cells = 0
if align == 'center':
extra_cells = (place.width - num_of_cells_needed) // 2
elif align == 'right':
extra_cells = place.width - num_of_cells_needed
sys.stdout.buffer.write('\033[{};{}H'.format(place.top + 1, x + extra_cells).encode('ascii'))
def write_chunked(cmd: GraphicsCommand, data: bytes) -> None:
if cmd.f != 100:
data = zlib.compress(data)
cmd.o = 'z'
data = standard_b64encode(data)
while data:
chunk, data = data[:4096], data[4096:]
cmd.m = 1 if data else 0
write_gr_cmd(cmd, chunk)
cmd.clear()
def show(
outfile: str,
width: int, height: int, zindex: int,
fmt: 'GRT_f',
transmit_mode: 'GRT_t' = 't',
align: str = 'center',
place: Optional['Place'] = None
) -> None:
cmd = GraphicsCommand()
cmd.a = 'T'
cmd.f = fmt
cmd.s = width
cmd.v = height
cmd.z = zindex
if place:
set_cursor_for_place(place, cmd, width, height, align)
else:
set_cursor(cmd, width, height, align)
if can_transfer_with_files:
cmd.t = transmit_mode
write_gr_cmd(cmd, standard_b64encode(os.path.abspath(outfile).encode(fsenc)))
else:
with open(outfile, 'rb') as f:
data = f.read()
if transmit_mode == 't':
os.unlink(outfile)
if fmt == 100:
cmd.S = len(data)
write_chunked(cmd, data)
def parse_z_index(val: str) -> int:
origin = 0
if val.startswith('--'):
val = val[1:]
origin = -1073741824
return origin + int(val)
class ParsedOpts:
place: Optional['Place'] = None
z_index: int = 0
def process(path: str, args: IcatCLIOptions, parsed_opts: ParsedOpts, is_tempfile: bool) -> bool:
m = identify(path)
ss = get_screen_size()
available_width = parsed_opts.place.width * (ss.width // ss.cols) if parsed_opts.place else ss.width
available_height = parsed_opts.place.height * (ss.height // ss.rows) if parsed_opts.place else 10 * m.height
needs_scaling = m.width > available_width or m.height > available_height
needs_scaling = needs_scaling or args.scale_up
file_removed = False
if m.fmt == 'png' and not needs_scaling:
outfile = path
transmit_mode: 'GRT_t' = 't' if is_tempfile else 'f'
fmt: 'GRT_f' = 100
width, height = m.width, m.height
file_removed = transmit_mode == 't'
else:
fmt = 24 if m.mode == 'rgb' else 32
transmit_mode = 't'
outfile, width, height = convert(path, m, available_width, available_height, args.scale_up)
show(outfile, width, height, parsed_opts.z_index, fmt, transmit_mode, align=args.align, place=parsed_opts.place)
if not args.place:
print() # ensure cursor is on a new line
return file_removed
def scan(d: str) -> Generator[Tuple[str, str], None, None]:
for dirpath, dirnames, filenames in os.walk(d):
for f in filenames:
mt = mimetypes.guess_type(f)[0]
if mt and mt.startswith('image/'):
yield os.path.join(dirpath, f), mt
def detect_support(wait_for: float = 10, silent: bool = False) -> bool:
global can_transfer_with_files
if not silent:
print('Checking for graphics ({}s max. wait)...'.format(wait_for), end='\r')
sys.stdout.flush()
try:
received = b''
responses: Dict[int, bool] = {}
def parse_responses() -> None:
for m in re.finditer(b'\033_Gi=([1|2]);(.+?)\033\\\\', received):
iid = m.group(1)
if iid in (b'1', b'2'):
iid_ = int(iid.decode('ascii'))
if iid_ not in responses:
responses[iid_] = m.group(2) == b'OK'
def more_needed(data: bytes) -> bool:
nonlocal received
received += data
parse_responses()
return 1 not in responses or 2 not in responses
with NamedTemporaryFile() as f:
f.write(b'abcd'), f.flush()
gc = GraphicsCommand()
gc.a = 'q'
gc.s = gc.v = gc.i = 1
write_gr_cmd(gc, standard_b64encode(b'abcd'))
gc.t = 'f'
gc.i = 2
write_gr_cmd(gc, standard_b64encode(f.name.encode(fsenc)))
with TTYIO() as io:
io.recv(more_needed, timeout=wait_for)
finally:
if not silent:
sys.stdout.buffer.write(b'\033[J'), sys.stdout.flush()
can_transfer_with_files = bool(responses.get(2))
return responses.get(1, False)
class Place(NamedTuple):
width: int
height: int
left: int
top: int
def parse_place(raw: str) -> Optional[Place]:
if raw:
area, pos = raw.split('@', 1)
w, h = map(int, area.split('x'))
l, t = map(int, pos.split('x'))
return Place(w, h, l, t)
return None
help_text = (
'A cat like utility to display images in the terminal.'
' You can specify multiple image files and/or directories.'
' Directories are scanned recursively for image files. If STDIN'
' is not a terminal, image data will be read from it as well.'
' You can also specify HTTP(S) or FTP URLs which will be'
' automatically downloaded and displayed.'
)
usage = 'image-file-or-url-or-directory ...'
@contextlib.contextmanager
def socket_timeout(seconds: int) -> Generator[None, None, None]:
old = socket.getdefaulttimeout()
socket.setdefaulttimeout(seconds)
try:
yield
finally:
socket.setdefaulttimeout(old)
def process_single_item(
item: Union[bytes, str],
args: IcatCLIOptions,
parsed_opts: ParsedOpts,
url_pat: Optional[Pattern] = None,
maybe_dir: bool = True
) -> None:
is_tempfile = False
file_removed = False
try:
if isinstance(item, bytes):
tf = NamedTemporaryFile(prefix='stdin-image-data-', delete=False)
tf.write(item), tf.close()
item = tf.name
is_tempfile = True
if url_pat is not None and url_pat.match(item) is not None:
from urllib.request import urlretrieve
with NamedTemporaryFile(prefix='url-image-data-', delete=False) as tf:
try:
with socket_timeout(30):
urlretrieve(item, filename=tf.name)
except Exception as e:
raise SystemExit('Failed to download image at URL: {} with error: {}'.format(item, e))
item = tf.name
is_tempfile = True
file_removed = process(item, args, parsed_opts, is_tempfile)
elif item.lower().startswith('file://'):
from urllib.parse import urlparse
from urllib.request import url2pathname
pitem = urlparse(item)
if os.sep == '\\':
item = pitem.netloc + pitem.path
else:
item = pitem.path
item = url2pathname(item)
file_removed = process(item, args, parsed_opts, is_tempfile)
else:
if maybe_dir and os.path.isdir(item):
for (x, mt) in scan(item):
process_single_item(x, args, parsed_opts, url_pat=None, maybe_dir=False)
else:
file_removed = process(item, args, parsed_opts, is_tempfile)
finally:
if is_tempfile and not file_removed:
os.remove(item)
def main(args: List[str] = sys.argv) -> None:
global can_transfer_with_files
cli_opts, items_ = parse_args(args[1:], options_spec, usage, help_text, '{} +kitten icat'.format(appname), result_class=IcatCLIOptions)
items: List[Union[str, bytes]] = list(items_)
if cli_opts.print_window_size:
screen_size_function.cache_clear()
with open(os.ctermid()) as tty:
ss = screen_size_function(tty)()
print('{}x{}'.format(ss.width, ss.height), end='')
raise SystemExit(0)
if not sys.stdout.isatty():
sys.stdout = open(os.ctermid(), 'w')
stdin_data = None
if cli_opts.stdin == 'yes' or (not sys.stdin.isatty() and cli_opts.stdin == 'detect'):
stdin_data = sys.stdin.buffer.read()
if stdin_data:
items.insert(0, stdin_data)
sys.stdin.close()
sys.stdin = open(os.ctermid(), 'r')
screen_size = get_screen_size_function()
signal.signal(signal.SIGWINCH, lambda signum, frame: setattr(screen_size, 'changed', True))
if screen_size().width == 0:
if cli_opts.detect_support:
raise SystemExit(1)
raise SystemExit(
'Terminal does not support reporting screen sizes via the TIOCGWINSZ ioctl'
)
parsed_opts = ParsedOpts()
if cli_opts.place:
try:
parsed_opts.place = parse_place(cli_opts.place)
except Exception:
raise SystemExit('Not a valid place specification: {}'.format(cli_opts.place))
try:
parsed_opts.z_index = parse_z_index(cli_opts.z_index)
except Exception:
raise SystemExit('Not a valid z-index specification: {}'.format(cli_opts.z_index))
if cli_opts.detect_support:
if not detect_support(wait_for=cli_opts.detection_timeout, silent=True):
raise SystemExit(1)
print('file' if can_transfer_with_files else 'stream', end='', file=sys.stderr)
return
if cli_opts.transfer_mode == 'detect':
if not detect_support(wait_for=cli_opts.detection_timeout, silent=cli_opts.silent):
raise SystemExit('This terminal emulator does not support the graphics protocol, use a terminal emulator such as kitty that does support it')
else:
can_transfer_with_files = cli_opts.transfer_mode == 'file'
errors = []
if cli_opts.clear:
sys.stdout.write(clear_images_on_screen(delete_data=True))
if not items:
return
if not items:
raise SystemExit('You must specify at least one file to cat')
if parsed_opts.place:
if len(items) > 1 or (isinstance(items[0], str) and os.path.isdir(items[0])):
raise SystemExit(f'The --place option can only be used with a single image, not {items}')
sys.stdout.buffer.write(b'\0337') # save cursor
url_pat = re.compile(r'(?:https?|ftp)://', flags=re.I)
for item in items:
try:
process_single_item(item, cli_opts, parsed_opts, url_pat)
except NoImageMagick as e:
raise SystemExit(str(e))
except ConvertFailed as e:
raise SystemExit(str(e))
except OpenFailed as e:
errors.append(e)
if parsed_opts.place:
sys.stdout.buffer.write(b'\0338') # restore cursor
if errors:
for err in errors:
print(err, file=sys.stderr)
if cli_opts.hold:
with open(os.ctermid()) as tty:
with raw_mode(tty.fileno()):
tty.buffer.read(1)
raise SystemExit(1 if errors else 0)
if __name__ == '__main__':
main()
elif __name__ == '__doc__':
cd = sys.cli_docs # type: ignore
cd['usage'] = usage
cd['options'] = options_spec
cd['help_text'] = help_text