Add graphics protocol detection to icat.py
This commit is contained in:
parent
6a65f23f98
commit
3a344d86fe
@ -6,16 +6,21 @@ import argparse
|
||||
import fcntl
|
||||
import mimetypes
|
||||
import os
|
||||
import re
|
||||
import selectors
|
||||
import signal
|
||||
import struct
|
||||
import subprocess
|
||||
import sys
|
||||
import termios
|
||||
import tty
|
||||
import zlib
|
||||
from base64 import standard_b64encode
|
||||
from collections import namedtuple
|
||||
from gettext import gettext as _
|
||||
from math import ceil, floor
|
||||
from tempfile import NamedTemporaryFile
|
||||
from time import monotonic
|
||||
|
||||
try:
|
||||
from kitty.constants import appname
|
||||
@ -173,9 +178,56 @@ def scan(d):
|
||||
yield os.path.join(dirpath, f), mt
|
||||
|
||||
|
||||
def detect_support(wait_for=10):
|
||||
fd = sys.stdin.fileno()
|
||||
old = termios.tcgetattr(fd)
|
||||
oldfl = fcntl.fcntl(fd, fcntl.F_GETFL)
|
||||
fcntl.fcntl(fd, fcntl.F_SETFL, oldfl | os.O_NONBLOCK)
|
||||
print('Checking for graphics ({}s max. wait)...'.format(wait_for), end='\r')
|
||||
sys.stdout.flush()
|
||||
tty.setraw(fd)
|
||||
try:
|
||||
received = b''
|
||||
start_time = monotonic()
|
||||
responses = {}
|
||||
|
||||
def parse_responses():
|
||||
for m in re.finditer(b'\033_Gi=([1|2]);(.+?)\033\\\\', received):
|
||||
iid = m.group(1)
|
||||
if iid in (b'1', b'2'):
|
||||
iid = int(iid.decode('ascii'))
|
||||
if iid not in responses:
|
||||
responses[iid] = m.group(2) == b'OK'
|
||||
|
||||
def read():
|
||||
nonlocal received
|
||||
d = sys.stdin.buffer.read()
|
||||
if not d: # EOF
|
||||
responses[1] = responses[2] = False
|
||||
return
|
||||
received += d
|
||||
parse_responses()
|
||||
|
||||
with NamedTemporaryFile() as f:
|
||||
f.write(b'abcd'), f.flush()
|
||||
write_gr_cmd(dict(a='q', s=1, v=1, i=1), standard_b64encode(b'abcd'))
|
||||
write_gr_cmd(dict(a='q', s=1, v=1, i=2, t='f'), standard_b64encode(f.name.encode(sys.getfilesystemencoding() or 'utf-8')))
|
||||
sel = selectors.DefaultSelector()
|
||||
sel.register(sys.stdin, selectors.EVENT_READ, read)
|
||||
while monotonic() - start_time < wait_for and 1 not in responses and 2 not in responses:
|
||||
for key, mask in sel.select(0.1):
|
||||
read()
|
||||
finally:
|
||||
sys.stdout.buffer.write(b'\033[J'), sys.stdout.flush()
|
||||
termios.tcsetattr(fd, termios.TCSADRAIN, old)
|
||||
fcntl.fcntl(fd, fcntl.F_SETFL, oldfl)
|
||||
detect_support.has_files = bool(responses.get(2))
|
||||
return responses.get(1, False)
|
||||
|
||||
|
||||
def main(args=sys.argv):
|
||||
signal.signal(signal.SIGWINCH, lambda: screen_size(refresh=True))
|
||||
if not sys.stdout.isatty():
|
||||
if not sys.stdout.isatty() or not sys.stdin.isatty():
|
||||
raise SystemExit(
|
||||
'Must be run in a terminal, stdout is currently not a terminal'
|
||||
)
|
||||
@ -186,6 +238,8 @@ def main(args=sys.argv):
|
||||
args = option_parser().parse_args(args[1:])
|
||||
if not args.items:
|
||||
raise SystemExit('You must specify at least one file to cat')
|
||||
if not detect_support():
|
||||
raise SystemExit('This terminal emulator does not support the graphics protocol, use a terminal emulator such as kitty that does support it')
|
||||
errors = []
|
||||
for item in args.items:
|
||||
try:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user