From 3a344d86febd397be8d4087595640f4efe8e7944 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Sat, 7 Oct 2017 09:01:31 +0530 Subject: [PATCH] Add graphics protocol detection to icat.py --- kitty/icat.py | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/kitty/icat.py b/kitty/icat.py index bd85b80f6..6731389da 100755 --- a/kitty/icat.py +++ b/kitty/icat.py @@ -6,16 +6,21 @@ import argparse import fcntl import mimetypes import os +import re +import selectors import signal import struct import subprocess import sys import termios +import tty import zlib from base64 import standard_b64encode from collections import namedtuple from gettext import gettext as _ from math import ceil, floor +from tempfile import NamedTemporaryFile +from time import monotonic try: from kitty.constants import appname @@ -173,9 +178,56 @@ def scan(d): yield os.path.join(dirpath, f), mt +def detect_support(wait_for=10): + fd = sys.stdin.fileno() + old = termios.tcgetattr(fd) + oldfl = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, oldfl | os.O_NONBLOCK) + print('Checking for graphics ({}s max. wait)...'.format(wait_for), end='\r') + sys.stdout.flush() + tty.setraw(fd) + try: + received = b'' + start_time = monotonic() + responses = {} + + def parse_responses(): + for m in re.finditer(b'\033_Gi=([1|2]);(.+?)\033\\\\', received): + iid = m.group(1) + if iid in (b'1', b'2'): + iid = int(iid.decode('ascii')) + if iid not in responses: + responses[iid] = m.group(2) == b'OK' + + def read(): + nonlocal received + d = sys.stdin.buffer.read() + if not d: # EOF + responses[1] = responses[2] = False + return + received += d + parse_responses() + + with NamedTemporaryFile() as f: + f.write(b'abcd'), f.flush() + write_gr_cmd(dict(a='q', s=1, v=1, i=1), standard_b64encode(b'abcd')) + write_gr_cmd(dict(a='q', s=1, v=1, i=2, t='f'), standard_b64encode(f.name.encode(sys.getfilesystemencoding() or 'utf-8'))) + sel = selectors.DefaultSelector() + sel.register(sys.stdin, selectors.EVENT_READ, read) + while monotonic() - start_time < wait_for and 1 not in responses and 2 not in responses: + for key, mask in sel.select(0.1): + read() + finally: + sys.stdout.buffer.write(b'\033[J'), sys.stdout.flush() + termios.tcsetattr(fd, termios.TCSADRAIN, old) + fcntl.fcntl(fd, fcntl.F_SETFL, oldfl) + detect_support.has_files = bool(responses.get(2)) + return responses.get(1, False) + + def main(args=sys.argv): signal.signal(signal.SIGWINCH, lambda: screen_size(refresh=True)) - if not sys.stdout.isatty(): + if not sys.stdout.isatty() or not sys.stdin.isatty(): raise SystemExit( 'Must be run in a terminal, stdout is currently not a terminal' ) @@ -186,6 +238,8 @@ def main(args=sys.argv): args = option_parser().parse_args(args[1:]) if not args.items: raise SystemExit('You must specify at least one file to cat') + if not detect_support(): + raise SystemExit('This terminal emulator does not support the graphics protocol, use a terminal emulator such as kitty that does support it') errors = [] for item in args.items: try: