Add type information to the Graphics Command infrastructure

This commit is contained in:
Kovid Goyal 2020-03-10 20:14:04 +05:30
parent fc0adfd965
commit 01142cdc8c
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
5 changed files with 253 additions and 103 deletions

View File

@ -12,7 +12,7 @@ from base64 import standard_b64encode
from functools import lru_cache from functools import lru_cache
from math import ceil from math import ceil
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from typing import Dict, List, NamedTuple, Optional, Union from typing import TYPE_CHECKING, Dict, List, NamedTuple, Optional, Union
from kitty.cli import parse_args from kitty.cli import parse_args
from kitty.cli_stub import IcatCLIOptions from kitty.cli_stub import IcatCLIOptions
@ -22,9 +22,12 @@ from kitty.utils import (
) )
from ..tui.images import ( from ..tui.images import (
ConvertFailed, NoImageMagick, OpenFailed, convert, fsenc, identify ConvertFailed, NoImageMagick, OpenFailed, convert, fsenc, identify, GraphicsCommand
) )
from ..tui.operations import clear_images_on_screen, serialize_gr_command from ..tui.operations import clear_images_on_screen
if TYPE_CHECKING:
from ..tui.images import GRT_f, GRT_t # noqa
OPTIONS = '''\ OPTIONS = '''\
--align --align
@ -128,8 +131,8 @@ def options_spec() -> str:
return OPTIONS.format(appname='{}-icat'.format(appname)) return OPTIONS.format(appname='{}-icat'.format(appname))
def write_gr_cmd(cmd, payload=None): def write_gr_cmd(cmd: GraphicsCommand, payload: Optional[bytes] = None) -> None:
sys.stdout.buffer.write(serialize_gr_command(cmd, payload)) sys.stdout.buffer.write(cmd.serialize(payload or b''))
sys.stdout.flush() sys.stdout.flush()
@ -144,7 +147,7 @@ def calculate_in_cell_x_offset(width, cell_width, align):
return (cell_width - extra_pixels) // 2 return (cell_width - extra_pixels) // 2
def set_cursor(cmd, width, height, align): def set_cursor(cmd: GraphicsCommand, width, height, align):
ss = get_screen_size() ss = get_screen_size()
cw = int(ss.width / ss.cols) cw = int(ss.width / ss.cols)
num_of_cells_needed = int(ceil(width / cw)) num_of_cells_needed = int(ceil(width / cw))
@ -152,9 +155,9 @@ def set_cursor(cmd, width, height, align):
w, h = fit_image(width, height, ss.width, height) w, h = fit_image(width, height, ss.width, height)
ch = int(ss.height / ss.rows) ch = int(ss.height / ss.rows)
num_of_rows_needed = int(ceil(height / ch)) num_of_rows_needed = int(ceil(height / ch))
cmd['c'], cmd['r'] = ss.cols, num_of_rows_needed cmd.c, cmd.r = ss.cols, num_of_rows_needed
else: else:
cmd['X'] = calculate_in_cell_x_offset(width, cw, align) cmd.X = calculate_in_cell_x_offset(width, cw, align)
extra_cells = 0 extra_cells = 0
if align == 'center': if align == 'center':
extra_cells = (ss.cols - num_of_cells_needed) // 2 extra_cells = (ss.cols - num_of_cells_needed) // 2
@ -164,12 +167,12 @@ def set_cursor(cmd, width, height, align):
sys.stdout.buffer.write(b' ' * extra_cells) sys.stdout.buffer.write(b' ' * extra_cells)
def set_cursor_for_place(place, cmd, width, height, align): def set_cursor_for_place(place, cmd: GraphicsCommand, width, height, align):
x = place.left + 1 x = place.left + 1
ss = get_screen_size() ss = get_screen_size()
cw = int(ss.width / ss.cols) cw = int(ss.width / ss.cols)
num_of_cells_needed = int(ceil(width / cw)) num_of_cells_needed = int(ceil(width / cw))
cmd['X'] = calculate_in_cell_x_offset(width, cw, align) cmd.X = calculate_in_cell_x_offset(width, cw, align)
extra_cells = 0 extra_cells = 0
if align == 'center': if align == 'center':
extra_cells = (place.width - num_of_cells_needed) // 2 extra_cells = (place.width - num_of_cells_needed) // 2
@ -178,27 +181,31 @@ def set_cursor_for_place(place, cmd, width, height, align):
sys.stdout.buffer.write('\033[{};{}H'.format(place.top + 1, x + extra_cells).encode('ascii')) sys.stdout.buffer.write('\033[{};{}H'.format(place.top + 1, x + extra_cells).encode('ascii'))
def write_chunked(cmd, data): def write_chunked(cmd: GraphicsCommand, data: bytes) -> None:
if cmd['f'] != 100: if cmd.f != 100:
data = zlib.compress(data) data = zlib.compress(data)
cmd['o'] = 'z' cmd.o = 'z'
data = standard_b64encode(data) data = standard_b64encode(data)
while data: while data:
chunk, data = data[:4096], data[4096:] chunk, data = data[:4096], data[4096:]
m = 1 if data else 0 cmd.m = 1 if data else 0
cmd['m'] = m
write_gr_cmd(cmd, chunk) write_gr_cmd(cmd, chunk)
cmd.clear() cmd.clear()
def show(outfile, width, height, zindex, fmt, transmit_mode='t', align='center', place=None): def show(outfile, width: int, height: int, zindex: int, fmt: 'GRT_f', transmit_mode: 'GRT_t' = 't', align: str = 'center', place=None):
cmd = {'a': 'T', 'f': fmt, 's': width, 'v': height, 'z': zindex} cmd = GraphicsCommand()
cmd.a = 'T'
cmd.f = fmt
cmd.s = width
cmd.v = height
cmd.z = zindex
if place: if place:
set_cursor_for_place(place, cmd, width, height, align) set_cursor_for_place(place, cmd, width, height, align)
else: else:
set_cursor(cmd, width, height, align) set_cursor(cmd, width, height, align)
if can_transfer_with_files: if can_transfer_with_files:
cmd['t'] = transmit_mode cmd.t = transmit_mode
write_gr_cmd(cmd, standard_b64encode(os.path.abspath(outfile).encode(fsenc))) write_gr_cmd(cmd, standard_b64encode(os.path.abspath(outfile).encode(fsenc)))
else: else:
with open(outfile, 'rb') as f: with open(outfile, 'rb') as f:
@ -206,7 +213,7 @@ def show(outfile, width, height, zindex, fmt, transmit_mode='t', align='center',
if transmit_mode == 't': if transmit_mode == 't':
os.unlink(outfile) os.unlink(outfile)
if fmt == 100: if fmt == 100:
cmd['S'] = len(data) cmd.S = len(data)
write_chunked(cmd, data) write_chunked(cmd, data)
@ -228,8 +235,8 @@ def process(path, args, is_tempfile):
file_removed = False file_removed = False
if m.fmt == 'png' and not needs_scaling: if m.fmt == 'png' and not needs_scaling:
outfile = path outfile = path
transmit_mode = 't' if is_tempfile else 'f' transmit_mode: 'GRT_t' = 't' if is_tempfile else 'f'
fmt = 100 fmt: 'GRT_f' = 100
width, height = m.width, m.height width, height = m.width, m.height
file_removed = transmit_mode == 't' file_removed = transmit_mode == 't'
else: else:
@ -250,7 +257,7 @@ def scan(d):
yield os.path.join(dirpath, f), mt yield os.path.join(dirpath, f), mt
def detect_support(wait_for=10, silent=False): def detect_support(wait_for: int = 10, silent: bool = False) -> bool:
global can_transfer_with_files global can_transfer_with_files
if not silent: if not silent:
print('Checking for graphics ({}s max. wait)...'.format(wait_for), end='\r') print('Checking for graphics ({}s max. wait)...'.format(wait_for), end='\r')
@ -275,8 +282,13 @@ def detect_support(wait_for=10, silent=False):
with NamedTemporaryFile() as f: with NamedTemporaryFile() as f:
f.write(b'abcd'), f.flush() f.write(b'abcd'), f.flush()
write_gr_cmd(dict(a='q', s=1, v=1, i=1), standard_b64encode(b'abcd')) gc = GraphicsCommand()
write_gr_cmd(dict(a='q', s=1, v=1, i=2, t='f'), standard_b64encode(f.name.encode(fsenc))) gc.a = 'q'
gc.s = gc.v = gc.i = 1
write_gr_cmd(gc, standard_b64encode(b'abcd'))
gc.t = 'f'
gc.i = 2
write_gr_cmd(gc, standard_b64encode(f.name.encode(fsenc)))
with TTYIO() as io: with TTYIO() as io:
io.recv(more_needed, timeout=float(wait_for)) io.recv(more_needed, timeout=float(wait_for))
finally: finally:

View File

@ -3,16 +3,21 @@
# License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net> # License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
from typing import Callable, Optional, Type from typing import TYPE_CHECKING, Callable, Optional, Type
from .operations import commander from .operations import commander
if TYPE_CHECKING:
from kitty.utils import ScreenSize
ScreenSize
class Handler: class Handler:
image_manager_class: Optional[Type['ImageManagerBase']] = None image_manager_class: Optional[Type['ImageManagerBase']] = None
def _initialize(self, screen_size, term_manager, schedule_write, tui_loop, debug, image_manager=None): def _initialize(self, screen_size: 'ScreenSize', term_manager, schedule_write, tui_loop, debug, image_manager=None):
self.screen_size = screen_size self.screen_size = screen_size
self._term_manager = term_manager self._term_manager = term_manager
self._tui_loop = tui_loop self._tui_loop = tui_loop

View File

@ -9,9 +9,12 @@ from base64 import standard_b64encode
from collections import defaultdict, deque from collections import defaultdict, deque
from contextlib import suppress from contextlib import suppress
from itertools import count from itertools import count
from typing import Any, DefaultDict, Deque, Dict, Tuple from typing import (
TYPE_CHECKING, Any, DefaultDict, Deque, Dict, List, Optional, Sequence,
Tuple, Union
)
from kitty.utils import fit_image from kitty.utils import ScreenSize, fit_image
from .handler import ImageManagerBase from .handler import ImageManagerBase
from .operations import cursor from .operations import cursor
@ -23,16 +26,33 @@ except Exception:
fsenc = 'utf-8' fsenc = 'utf-8'
try:
from typing import TypedDict, Literal
GRT_a = Literal['t', 'T', 'q', 'p', 'd']
GRT_f = Literal[24, 32, 100]
GRT_t = Literal['d', 'f', 't', 's']
GRT_o = Literal['z']
GRT_m = Literal[0, 1]
GRT_d = Literal['a', 'A', 'c', 'C', 'i', 'I', 'p', 'P', 'q', 'Q', 'x', 'X', 'y', 'Y', 'z', 'Z']
except ImportError:
TypedDict = dict
if TYPE_CHECKING:
import subprocess
from .handler import Handler
class ImageData: class ImageData:
def __init__(self, fmt, width, height, mode): def __init__(self, fmt: str, width: int, height: int, mode: str):
self.width, self.height, self.fmt, self.mode = width, height, fmt, mode self.width, self.height, self.fmt, self.mode = width, height, fmt, mode
self.transmit_fmt = str(24 if self.mode == 'rgb' else 32) self.transmit_fmt: 'GRT_f' = (24 if self.mode == 'rgb' else 32)
class OpenFailed(ValueError): class OpenFailed(ValueError):
def __init__(self, path, message): def __init__(self, path: str, message: str):
ValueError.__init__( ValueError.__init__(
self, 'Failed to open image: {} with error: {}'.format(path, message) self, 'Failed to open image: {} with error: {}'.format(path, message)
) )
@ -41,7 +61,7 @@ class OpenFailed(ValueError):
class ConvertFailed(ValueError): class ConvertFailed(ValueError):
def __init__(self, path, message): def __init__(self, path: str, message: str):
ValueError.__init__( ValueError.__init__(
self, 'Failed to convert image: {} with error: {}'.format(path, message) self, 'Failed to convert image: {} with error: {}'.format(path, message)
) )
@ -52,7 +72,7 @@ class NoImageMagick(Exception):
pass pass
def run_imagemagick(path, cmd, keep_stdout=True): def run_imagemagick(path: str, cmd: Sequence[str], keep_stdout: bool = True) -> 'subprocess.CompletedProcess[bytes]':
import subprocess import subprocess
try: try:
p = subprocess.run(cmd, stdout=subprocess.PIPE if keep_stdout else subprocess.DEVNULL, stderr=subprocess.PIPE) p = subprocess.run(cmd, stdout=subprocess.PIPE if keep_stdout else subprocess.DEVNULL, stderr=subprocess.PIPE)
@ -63,14 +83,19 @@ def run_imagemagick(path, cmd, keep_stdout=True):
return p return p
def identify(path): def identify(path: str) -> ImageData:
p = run_imagemagick(path, ['identify', '-format', '%m %w %h %A', '--', path]) p = run_imagemagick(path, ['identify', '-format', '%m %w %h %A', '--', path])
parts: Tuple[str, ...] = tuple(filter(None, p.stdout.decode('utf-8').split())) parts: Tuple[str, ...] = tuple(filter(None, p.stdout.decode('utf-8').split()))
mode = 'rgb' if parts[3].lower() == 'false' else 'rgba' mode = 'rgb' if parts[3].lower() == 'false' else 'rgba'
return ImageData(parts[0].lower(), int(parts[1]), int(parts[2]), mode) return ImageData(parts[0].lower(), int(parts[1]), int(parts[2]), mode)
def convert(path, m, available_width, available_height, scale_up, tdir=None): def convert(
path: str, m: ImageData,
available_width: int, available_height: int,
scale_up: bool,
tdir: Optional[str] = None
) -> Tuple[str, int, int]:
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
width, height = m.width, m.height width, height = m.width, m.height
cmd = ['convert', '-background', 'none', '--', path] cmd = ['convert', '-background', 'none', '--', path]
@ -102,58 +127,121 @@ def convert(path, m, available_width, available_height, scale_up, tdir=None):
return outfile.name, width, height return outfile.name, width, height
def can_display_images(): def can_display_images() -> bool:
import shutil import shutil
ans = getattr(can_display_images, 'ans', None) ans: Optional[bool] = getattr(can_display_images, 'ans', None)
if ans is None: if ans is None:
ans = shutil.which('convert') is not None ans = shutil.which('convert') is not None
setattr(can_display_images, 'ans', ans) setattr(can_display_images, 'ans', ans)
return ans return ans
ImageKey = Tuple[str, int, int]
SentImageKey = Tuple[int, int, int]
class GraphicsCommand:
a: 'GRT_a' = 't' # action
f: 'GRT_f' = 32 # image data format
t: 'GRT_t' = 'd' # transmission medium
s: int = 0 # sent image width
v: int = 0 # sent image height
S: int = 0 # size of data to read from file
O: int = 0 # offset of data to read from file
i: int = 0 # image id
o: Optional['GRT_o'] = None # type of compression
m: 'GRT_m' = 0 # 0 or 1 whether there is more chunked data
x: int = 0 # left edge of image area to display
y: int = 0 # top edge of image area to display
w: int = 0 # image width to display
h: int = 0 # image height to display
X: int = 0 # X-offset within cell
Y: int = 0 # Y-offset within cell
c: int = 0 # number of cols to display image over
r: int = 0 # number of rows to display image over
z: int = 0 # z-index
d: 'GRT_d' = 'a' # what to delete
def serialize(self, payload: bytes = b'') -> bytes:
items = []
for k in GraphicsCommand.__annotations__:
val: Union[str, None, int] = getattr(self, k)
defval: Union[str, None, int] = getattr(GraphicsCommand, k)
if val != defval and val is not None:
items.append('{}={}'.format(k, val))
ans: List[bytes] = []
w = ans.append
w(b'\033_G')
w(','.join(items).encode('ascii'))
if payload:
w(b';')
w(payload)
w(b'\033\\')
return b''.join(ans)
def clear(self) -> None:
for k in GraphicsCommand.__annotations__:
defval: Union[str, None, int] = getattr(GraphicsCommand, k)
setattr(self, k, defval)
class Placement(TypedDict):
cmd: GraphicsCommand
x: int
y: int
class ImageManager(ImageManagerBase): class ImageManager(ImageManagerBase):
def __init__(self, handler): def __init__(self, handler: 'Handler'):
self.image_id_counter = count() self.image_id_counter = count()
self.handler = handler self.handler = handler
self.filesystem_ok = None self.filesystem_ok: Optional[bool] = None
self.image_data = {} self.image_data: Dict[str, ImageData] = {}
self.failed_images = {} self.failed_images: Dict[str, Exception] = {}
self.converted_images = {} self.converted_images: Dict[ImageKey, ImageKey] = {}
self.sent_images = {} self.sent_images: Dict[ImageKey, int] = {}
self.image_id_to_image_data = {} self.image_id_to_image_data: Dict[int, ImageData] = {}
self.image_id_to_converted_data = {} self.image_id_to_converted_data: Dict[int, ImageKey] = {}
self.transmission_status = {} self.transmission_status: Dict[int, Union[str, int]] = {}
self.placements_in_flight: DefaultDict[int, Deque[Dict[str, Any]]] = defaultdict(deque) self.placements_in_flight: DefaultDict[int, Deque[Placement]] = defaultdict(deque)
@property @property
def next_image_id(self): def next_image_id(self) -> int:
return next(self.image_id_counter) + 2 return next(self.image_id_counter) + 2
@property @property
def screen_size(self): def screen_size(self) -> ScreenSize:
return self.handler.screen_size return self.handler.screen_size
def __enter__(self): def __enter__(self) -> None:
import tempfile import tempfile
self.tdir = tempfile.mkdtemp(prefix='kitten-images-') self.tdir = tempfile.mkdtemp(prefix='kitten-images-')
with tempfile.NamedTemporaryFile(dir=self.tdir, delete=False) as f: with tempfile.NamedTemporaryFile(dir=self.tdir, delete=False) as f:
f.write(b'abcd') f.write(b'abcd')
self.handler.cmd.gr_command(dict(a='q', s=1, v=1, i=1, t='f'), standard_b64encode(f.name.encode(fsenc))) gc = GraphicsCommand()
gc.a = 'q'
gc.s = gc.v = gc.i = 1
gc.t = 'f'
self.handler.cmd.gr_command(gc, standard_b64encode(f.name.encode(fsenc)))
def __exit__(self, *a): def __exit__(self, *a: Any) -> None:
import shutil import shutil
shutil.rmtree(self.tdir, ignore_errors=True) shutil.rmtree(self.tdir, ignore_errors=True)
self.handler.cmd.clear_images_on_screen(delete_data=True) self.handler.cmd.clear_images_on_screen(delete_data=True)
self.delete_all_sent_images() self.delete_all_sent_images()
del self.handler del self.handler
def delete_all_sent_images(self): def delete_all_sent_images(self) -> None:
gc = GraphicsCommand()
gc.a = 'd'
for img_id in self.transmission_status: for img_id in self.transmission_status:
self.handler.cmd.gr_command({'a': 'd', 'i': img_id}) gc.i = img_id
self.handler.cmd.gr_command(gc)
self.transmission_status.clear() self.transmission_status.clear()
def handle_response(self, apc): def handle_response(self, apc: str) -> None:
cdata, payload = apc[1:].partition(';')[::2] cdata, payload = apc[1:].partition(';')[::2]
control = {} control = {}
for x in cdata.split(','): for x in cdata.split(','):
@ -180,7 +268,7 @@ class ImageManager(ImageManagerBase):
if not in_flight: if not in_flight:
self.placements_in_flight.pop(image_id, None) self.placements_in_flight.pop(image_id, None)
def resend_image(self, image_id, pl): def resend_image(self, image_id: int, pl: Placement) -> None:
image_data = self.image_id_to_image_data[image_id] image_data = self.image_id_to_image_data[image_id]
skey = self.image_id_to_converted_data[image_id] skey = self.image_id_to_converted_data[image_id]
self.transmit_image(image_data, image_id, *skey) self.transmit_image(image_data, image_id, *skey)
@ -188,7 +276,7 @@ class ImageManager(ImageManagerBase):
self.handler.cmd.set_cursor_position(pl['x'], pl['y']) self.handler.cmd.set_cursor_position(pl['x'], pl['y'])
self.handler.cmd.gr_command(pl['cmd']) self.handler.cmd.gr_command(pl['cmd'])
def send_image(self, path, max_cols=None, max_rows=None, scale_up=False): def send_image(self, path: str, max_cols: Optional[int] = None, max_rows: Optional[int] = None, scale_up: bool = False) -> SentImageKey:
path = os.path.abspath(path) path = os.path.abspath(path)
if path in self.failed_images: if path in self.failed_images:
raise self.failed_images[path] raise self.failed_images[path]
@ -226,41 +314,50 @@ class ImageManager(ImageManagerBase):
self.image_id_to_image_data[image_id] = m self.image_id_to_image_data[image_id] = m
return image_id, skey[1], skey[2] return image_id, skey[1], skey[2]
def hide_image(self, image_id): def hide_image(self, image_id: int) -> None:
self.handler.cmd.gr_command({'a': 'd', 'i': image_id}) gc = GraphicsCommand()
gc.a = 'd'
gc.i = image_id
self.handler.cmd.gr_command(gc)
def show_image(self, image_id, x, y, src_rect=None): def show_image(self, image_id: int, x: int, y: int, src_rect: Optional[Tuple[int, int, int, int]] = None) -> None:
cmd = {'a': 'p', 'i': image_id} gc = GraphicsCommand()
gc.a = 'p'
gc.i = image_id
if src_rect is not None: if src_rect is not None:
cmd['x'], cmd['y'], cmd['w'], cmd['h'] = map(int, src_rect) gc.x, gc.y, gc.w, gc.h = map(int, src_rect)
self.placements_in_flight[image_id].append({'cmd': cmd, 'x': x, 'y': y}) self.placements_in_flight[image_id].append({'cmd': gc, 'x': x, 'y': y})
with cursor(self.handler.write): with cursor(self.handler.write):
self.handler.cmd.set_cursor_position(x, y) self.handler.cmd.set_cursor_position(x, y)
self.handler.cmd.gr_command(cmd) self.handler.cmd.gr_command(gc)
def convert_image(self, path, available_width, available_height, image_data, scale_up=False): def convert_image(self, path: str, available_width: int, available_height: int, image_data: ImageData, scale_up: bool = False) -> ImageKey:
rgba_path, width, height = convert(path, image_data, available_width, available_height, scale_up, tdir=self.tdir) rgba_path, width, height = convert(path, image_data, available_width, available_height, scale_up, tdir=self.tdir)
return rgba_path, width, height return rgba_path, width, height
def transmit_image(self, image_data, image_id, rgba_path, width, height): def transmit_image(self, image_data: ImageData, image_id: int, rgba_path: str, width: int, height: int) -> int:
self.transmission_status[image_id] = 0 self.transmission_status[image_id] = 0
cmd = {'a': 't', 'f': image_data.transmit_fmt, 's': width, 'v': height, 'i': image_id} gc = GraphicsCommand()
gc.a = 't'
gc.f = image_data.transmit_fmt
gc.s = width
gc.v = height
gc.i = image_id
if self.filesystem_ok: if self.filesystem_ok:
cmd['t'] = 'f' gc.t = 'f'
self.handler.cmd.gr_command( self.handler.cmd.gr_command(
cmd, standard_b64encode(rgba_path.encode(fsenc))) gc, standard_b64encode(rgba_path.encode(fsenc)))
else: else:
import zlib import zlib
with open(rgba_path, 'rb') as f: with open(rgba_path, 'rb') as f:
data = f.read() data = f.read()
cmd['S'] = len(data) gc.S = len(data)
data = zlib.compress(data) data = zlib.compress(data)
cmd['o'] = 'z' gc.o = 'z'
data = standard_b64encode(data) data = standard_b64encode(data)
while data: while data:
chunk, data = data[:4096], data[4096:] chunk, data = data[:4096], data[4096:]
m = 1 if data else 0 gc.m = 1 if data else 0
cmd['m'] = m self.handler.cmd.gr_command(gc, chunk)
self.handler.cmd.gr_command(cmd, chunk) gc.clear()
cmd.clear()
return image_id return image_id

View File

@ -5,10 +5,15 @@
import sys import sys
from contextlib import contextmanager from contextlib import contextmanager
from functools import wraps from functools import wraps
from typing import List, Optional, Union from typing import TYPE_CHECKING, Dict, Optional, Tuple, Union
from kitty.rgb import Color, color_as_sharp, to_color from kitty.rgb import Color, color_as_sharp, to_color
if TYPE_CHECKING:
from kitty.utils import ScreenSize
from .images import GraphicsCommand
ScreenSize, GraphicsCommand
S7C1T = '\033 F' S7C1T = '\033 F'
SAVE_CURSOR = '\0337' SAVE_CURSOR = '\0337'
RESTORE_CURSOR = '\0338' RESTORE_CURSOR = '\0338'
@ -37,9 +42,9 @@ MODES = dict(
) )
def set_mode(which: str, private=True) -> str: def set_mode(which: str, private: bool = True) -> str:
num, private = MODES[which] num, private_ = MODES[which]
return '\033[{}{}h'.format(private, num) return '\033[{}{}h'.format(private_, num)
def reset_mode(which: str) -> str: def reset_mode(which: str) -> str:
@ -75,18 +80,18 @@ def set_cursor_visible(yes_or_no: bool) -> str:
return set_mode('DECTCEM') if yes_or_no else reset_mode('DECTCEM') return set_mode('DECTCEM') if yes_or_no else reset_mode('DECTCEM')
def set_cursor_position(x, y) -> str: # (0, 0) is top left def set_cursor_position(x: int, y: int) -> str: # (0, 0) is top left
return '\033[{};{}H'.format(y + 1, x + 1) return '\033[{};{}H'.format(y + 1, x + 1)
def set_cursor_shape(shape='block', blink=True) -> str: def set_cursor_shape(shape: str = 'block', blink: bool = True) -> str:
val = {'block': 1, 'underline': 3, 'bar': 5}.get(shape, 1) val = {'block': 1, 'underline': 3, 'bar': 5}.get(shape, 1)
if not blink: if not blink:
val += 1 val += 1
return '\033[{} q'.format(val) return '\033[{} q'.format(val)
def set_scrolling_region(screen_size=None, top=None, bottom=None) -> str: def set_scrolling_region(screen_size: Optional['ScreenSize'] = None, top: Optional[int] = None, bottom: Optional[int] = None) -> str:
if screen_size is None: if screen_size is None:
return '\033[r' return '\033[r'
if top is None: if top is None:
@ -100,7 +105,7 @@ def set_scrolling_region(screen_size=None, top=None, bottom=None) -> str:
return '\033[{};{}r'.format(top + 1, bottom + 1) return '\033[{};{}r'.format(top + 1, bottom + 1)
def scroll_screen(amt=1) -> str: def scroll_screen(amt: int = 1) -> str:
return '\033[' + str(abs(amt)) + ('T' if amt < 0 else 'S') return '\033[' + str(abs(amt)) + ('T' if amt < 0 else 'S')
@ -111,7 +116,10 @@ UNDERLINE_STYLES = {name: i + 1 for i, name in enumerate(
'straight double curly'.split())} 'straight double curly'.split())}
def color_code(color, intense=False, base=30): ColorSpec = Union[int, str, Tuple[int, int, int]]
def color_code(color: ColorSpec, intense: bool = False, base: int = 30) -> str:
if isinstance(color, str): if isinstance(color, str):
e = str((base + 60 if intense else base) + STANDARD_COLORS[color]) e = str((base + 60 if intense else base) + STANDARD_COLORS[color])
elif isinstance(color, int): elif isinstance(color, int):
@ -121,20 +129,37 @@ def color_code(color, intense=False, base=30):
return e return e
def sgr(*parts) -> str: def sgr(*parts: str) -> str:
return '\033[{}m'.format(';'.join(parts)) return '\033[{}m'.format(';'.join(parts))
def colored(text, color, intense=False, reset_to=None, reset_to_intense=False) -> str: def colored(
text: str,
color: ColorSpec,
intense: bool = False,
reset_to: Optional[ColorSpec] = None,
reset_to_intense: bool = False
) -> str:
e = color_code(color, intense) e = color_code(color, intense)
return '\033[{}m{}\033[{}m'.format(e, text, 39 if reset_to is None else color_code(reset_to, reset_to_intense)) return '\033[{}m{}\033[{}m'.format(e, text, 39 if reset_to is None else color_code(reset_to, reset_to_intense))
def faint(text) -> str: def faint(text: str) -> str:
return colored(text, 'black', True) return colored(text, 'black', True)
def styled(text: str, fg=None, bg=None, fg_intense=False, bg_intense=False, italic=None, bold=None, underline=None, underline_color=None, reverse=None) -> str: def styled(
text: str,
fg: Optional[ColorSpec] = None,
bg: Optional[ColorSpec] = None,
fg_intense: bool = False,
bg_intense: bool = False,
italic: Optional[bool] = None,
bold: Optional[bool] = None,
underline: Optional[str] = None,
underline_color: Optional[ColorSpec] = None,
reverse: Optional[bool] = None
) -> str:
start, end = [], [] start, end = [], []
if fg is not None: if fg is not None:
start.append(color_code(fg, fg_intense)) start.append(color_code(fg, fg_intense))
@ -167,24 +192,28 @@ def styled(text: str, fg=None, bg=None, fg_intense=False, bg_intense=False, ital
return '\033[{}m{}\033[{}m'.format(';'.join(start), text, ';'.join(end)) return '\033[{}m{}\033[{}m'.format(';'.join(start), text, ';'.join(end))
def serialize_gr_command(cmd, payload=None) -> bytes: def serialize_gr_command(cmd: Dict[str, Union[int, str]], payload: Optional[bytes] = None) -> bytes:
cmd = ','.join('{}={}'.format(k, v) for k, v in cmd.items()) from .images import GraphicsCommand
ans: List[bytes] = [] gc = GraphicsCommand()
w = ans.append for k, v in cmd.items():
w(b'\033_G'), w(cmd.encode('ascii')) setattr(gc, k, v)
if payload: return gc.serialize(payload or b'')
w(b';')
w(payload)
w(b'\033\\')
return b''.join(ans)
def gr_command(cmd, payload=None) -> str: def gr_command(cmd: Union[Dict, 'GraphicsCommand'], payload: Optional[bytes] = None) -> str:
return serialize_gr_command(cmd, payload).decode('ascii') if isinstance(cmd, dict):
raw = serialize_gr_command(cmd, payload)
else:
raw = cmd.serialize(payload or b'')
return raw.decode('ascii')
def clear_images_on_screen(delete_data=False) -> str: def clear_images_on_screen(delete_data: bool = False) -> str:
return serialize_gr_command({'a': 'd', 'd': 'A' if delete_data else 'a'}).decode('ascii') from .images import GraphicsCommand
gc = GraphicsCommand()
gc.a = 'd'
gc.d = 'A' if delete_data else 'a'
return gc.serialize().decode('ascii')
def init_state(alternate_screen=True): def init_state(alternate_screen=True):

View File

@ -4,8 +4,15 @@
import re import re
from typing import TYPE_CHECKING, List, Generator, Any, Type
def mark(text, args, Mark, extra_cli_args, *a): if TYPE_CHECKING:
from kitty.cli_stub import HintsCLIOptions
from kittens.hints.main import Mark as MarkClass
HintsCLIOptions, MarkClass
def mark(text: str, args: 'HintsCLIOptions', Mark: Type['MarkClass'], extra_cli_args: List[str], *a: Any) -> Generator['MarkClass', None, None]:
for idx, m in enumerate(re.finditer(args.regex, text)): for idx, m in enumerate(re.finditer(args.regex, text)):
start, end = m.span() start, end = m.span()
mark_text = text[start:end].replace('\n', '').replace('\0', '') mark_text = text[start:end].replace('\n', '').replace('\0', '')