A new kitten to easily query kitty for config values
This commit is contained in:
parent
dab555ea3b
commit
6a755bda45
@ -4,6 +4,14 @@ Changelog
|
||||
|kitty| is a feature full, cross-platform, *fast*, GPU based terminal emulator.
|
||||
To update |kitty|, :doc:`follow the instructions <binary>`.
|
||||
|
||||
0.19.2 [future]
|
||||
-------------------
|
||||
|
||||
- A new :doc:`kittens/query_terminal` kitten to easily query the running kitty
|
||||
via escape codes to detect its version, and the values of
|
||||
configuration options that enable or disable terminal features.
|
||||
|
||||
|
||||
0.19.1 [2020-10-06]
|
||||
-------------------
|
||||
|
||||
|
||||
11
docs/kittens/query_terminal.rst
Normal file
11
docs/kittens/query_terminal.rst
Normal file
@ -0,0 +1,11 @@
|
||||
Query terminal
|
||||
=================
|
||||
|
||||
Used to query kitty from terminal programs about version, values of various
|
||||
runtime options controlling its features, etc.
|
||||
|
||||
|
||||
Command Line Interface
|
||||
-------------------------
|
||||
|
||||
.. include:: ../generated/cli-kitten-query_terminal.rst
|
||||
0
kittens/query_terminal/__init__.py
Normal file
0
kittens/query_terminal/__init__.py
Normal file
162
kittens/query_terminal/main.py
Normal file
162
kittens/query_terminal/main.py
Normal file
@ -0,0 +1,162 @@
|
||||
#!/usr/bin/env python
|
||||
# vim:fileencoding=utf-8
|
||||
# License: GPLv3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
|
||||
|
||||
import re
|
||||
import sys
|
||||
from binascii import unhexlify, hexlify
|
||||
from contextlib import suppress
|
||||
from typing import Dict, Iterable, List, Type
|
||||
|
||||
from kitty.cli import parse_args
|
||||
from kitty.cli_stub import QueryTerminalCLIOptions
|
||||
from kitty.constants import appname
|
||||
from kitty.utils import TTYIO
|
||||
from kitty.terminfo import names
|
||||
|
||||
|
||||
class Query:
|
||||
name: str = ''
|
||||
ans: str = ''
|
||||
query_name: str = ''
|
||||
help_text: str = ''
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.encoded_query_name = hexlify(self.query_name.encode('utf-8')).decode('ascii')
|
||||
self.pat = re.compile('\x1bP([01])\\+r{}(.*?)\x1b\\\\'.format(self.encoded_query_name).encode('ascii'))
|
||||
|
||||
def query_code(self) -> str:
|
||||
return "\x1bP+q{}\x1b\\".format(self.encoded_query_name)
|
||||
|
||||
def decode_response(self, res: bytes) -> str:
|
||||
return unhexlify(res).decode('utf-8')
|
||||
|
||||
def more_needed(self, buffer: bytes) -> bool:
|
||||
m = self.pat.search(buffer)
|
||||
if m is None:
|
||||
return True
|
||||
if m.group(1) == b'1':
|
||||
q = m.group(2)
|
||||
if q.startswith(b'='):
|
||||
with suppress(Exception):
|
||||
self.ans = self.decode_response(memoryview(q)[1:])
|
||||
return False
|
||||
|
||||
def output_line(self) -> str:
|
||||
return self.ans
|
||||
|
||||
|
||||
all_queries: Dict[str, Type[Query]] = {}
|
||||
|
||||
|
||||
def query(cls: Type[Query]) -> Type[Query]:
|
||||
all_queries[cls.name] = cls
|
||||
return cls
|
||||
|
||||
|
||||
@query
|
||||
class TerminalName(Query):
|
||||
name: str = 'name'
|
||||
query_name: str = 'TN'
|
||||
help_text: str = f'Terminal name ({names[0]})'
|
||||
|
||||
|
||||
@query
|
||||
class TerminalVersion(Query):
|
||||
name: str = 'version'
|
||||
query_name: str = 'kitty-query-version'
|
||||
help_text: str = 'Terminal version, for e.g.: 0.19.2'
|
||||
|
||||
|
||||
@query
|
||||
class AllowHyperlinks(Query):
|
||||
name: str = 'allow_hyperlinks'
|
||||
query_name: str = 'kitty-query-allow_hyperlinks'
|
||||
help_text: str = 'yes, no or ask'
|
||||
|
||||
|
||||
def do_queries(queries: Iterable, cli_opts: QueryTerminalCLIOptions) -> Dict[str, str]:
|
||||
actions = tuple(all_queries[x]() for x in queries)
|
||||
qstring = ''.join(a.query_code() for a in actions)
|
||||
received = b''
|
||||
|
||||
def more_needed(data: bytes) -> bool:
|
||||
nonlocal received
|
||||
received += data
|
||||
for a in actions:
|
||||
if a.more_needed(received):
|
||||
return True
|
||||
return False
|
||||
|
||||
with TTYIO() as ttyio:
|
||||
ttyio.send(qstring)
|
||||
ttyio.recv(more_needed, timeout=cli_opts.wait_for)
|
||||
|
||||
return {a.name: a.output_line() for a in actions}
|
||||
|
||||
|
||||
def options_spec() -> str:
|
||||
return '''\
|
||||
--wait-for
|
||||
type=float
|
||||
default=10
|
||||
The amount of time (in seconds) to wait for a response from the terminal, after
|
||||
querying it.
|
||||
'''
|
||||
|
||||
|
||||
help_text = '''\
|
||||
Query the terminal this kitten is run in for various
|
||||
capabilities. This sends escape codes to the terminal
|
||||
and based on its response prints out data about supported
|
||||
capabilities. Note that this is a blocking operation, since
|
||||
it has to wait for a response from the terminal. You can control
|
||||
the maximum wait time via the ``--wait-for`` option.
|
||||
|
||||
The output is lines of the form::
|
||||
|
||||
query: data
|
||||
|
||||
If a particular query is unsupported by the running kitty version,
|
||||
the data will be blank.
|
||||
|
||||
Note that when calling this from another program, be very
|
||||
careful not to perform any I/O on the terminal device
|
||||
until the kitten exits.
|
||||
|
||||
Available queries are::
|
||||
|
||||
{}
|
||||
'''.format(' ' + '\n '.join(
|
||||
f'{name}: {c.help_text}' for name, c in all_queries.items()))
|
||||
usage = '[query1 query2 ...]'
|
||||
|
||||
|
||||
def main(args: List[str] = sys.argv) -> None:
|
||||
cli_opts, items_ = parse_args(
|
||||
args[1:],
|
||||
options_spec,
|
||||
usage,
|
||||
help_text,
|
||||
'{} +kitten query_terminal'.format(appname),
|
||||
result_class=QueryTerminalCLIOptions
|
||||
)
|
||||
queries: List[str] = list(items_)
|
||||
if 'all' in queries or not queries:
|
||||
queries = sorted(all_queries)
|
||||
else:
|
||||
extra = frozenset(queries) - frozenset(all_queries)
|
||||
if extra:
|
||||
raise SystemExit(f'Unknown queries: {", ".join(extra)}')
|
||||
|
||||
for key, val in do_queries(queries, cli_opts).items():
|
||||
print(key + ':', val)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
elif __name__ == '__doc__':
|
||||
cd = sys.cli_docs # type: ignore
|
||||
cd['usage'] = usage
|
||||
cd['options'] = options_spec
|
||||
cd['help_text'] = help_text
|
||||
@ -13,6 +13,7 @@ class CLIOptions:
|
||||
LaunchCLIOptions = AskCLIOptions = ClipboardCLIOptions = DiffCLIOptions = CLIOptions
|
||||
HintsCLIOptions = IcatCLIOptions = PanelCLIOptions = ResizeCLIOptions = CLIOptions
|
||||
ErrorCLIOptions = UnicodeCLIOptions = RCOptions = RemoteFileCLIOptions = CLIOptions
|
||||
QueryTerminalCLIOptions = CLIOptions
|
||||
|
||||
|
||||
def generate_stub() -> None:
|
||||
@ -50,6 +51,9 @@ def generate_stub() -> None:
|
||||
from kittens.icat.main import options_spec
|
||||
do(options_spec(), 'IcatCLIOptions')
|
||||
|
||||
from kittens.query_terminal.main import options_spec
|
||||
do(options_spec(), 'QueryTerminalCLIOptions')
|
||||
|
||||
from kittens.panel.main import OPTIONS
|
||||
do(OPTIONS(), 'PanelCLIOptions')
|
||||
|
||||
|
||||
@ -4,7 +4,10 @@
|
||||
|
||||
import re
|
||||
from binascii import hexlify, unhexlify
|
||||
from typing import cast, Dict
|
||||
from typing import TYPE_CHECKING, Dict, Generator, Optional, cast
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .options_stub import Options
|
||||
|
||||
|
||||
def modify_key_bytes(keybytes: bytes, amt: int) -> bytes:
|
||||
@ -449,28 +452,42 @@ def key_as_bytes(name: str) -> bytes:
|
||||
return ans.encode('ascii')
|
||||
|
||||
|
||||
def get_capabilities(query_string: str) -> str:
|
||||
def get_capabilities(query_string: str, opts: 'Options') -> Generator[str, None, None]:
|
||||
from .fast_data_types import ERROR_PREFIX
|
||||
ans = []
|
||||
try:
|
||||
for q in query_string.split(';'):
|
||||
name = qname = unhexlify(q).decode('utf-8')
|
||||
if name in ('TN', 'name'):
|
||||
val = names[0]
|
||||
|
||||
def result(encoded_query_name: str, x: Optional[str] = None) -> str:
|
||||
if x is None:
|
||||
return '0+r' + encoded_query_name
|
||||
return '1+r' + encoded_query_name + '=' + hexlify(x.encode('utf-8')).decode('ascii')
|
||||
|
||||
for encoded_query_name in query_string.split(';'):
|
||||
name = qname = unhexlify(encoded_query_name).decode('utf-8')
|
||||
if name in ('TN', 'name'):
|
||||
yield result(encoded_query_name, names[0])
|
||||
elif name.startswith('kitty-query-'):
|
||||
name = name[len('kitty-query-'):]
|
||||
if name == 'version':
|
||||
from .constants import str_version
|
||||
yield result(encoded_query_name, str_version)
|
||||
elif name == 'allow_hyperlinks':
|
||||
yield result(encoded_query_name,
|
||||
'ask' if opts.allow_hyperlinks == 0b11 else ('yes' if opts.allow_hyperlinks else 'no'))
|
||||
else:
|
||||
from .utils import log_error
|
||||
log_error('Unknown kitty terminfo query:', name)
|
||||
yield result(encoded_query_name)
|
||||
else:
|
||||
try:
|
||||
val = queryable_capabilities[name]
|
||||
except KeyError:
|
||||
try:
|
||||
val = queryable_capabilities[name]
|
||||
except KeyError:
|
||||
try:
|
||||
qname = termcap_aliases[name]
|
||||
val = queryable_capabilities[qname]
|
||||
except Exception:
|
||||
from .utils import log_error
|
||||
log_error(ERROR_PREFIX, 'Unknown terminfo property:', name)
|
||||
raise
|
||||
if qname in string_capabilities and '%' not in val:
|
||||
val = key_as_bytes(qname).decode('ascii')
|
||||
ans.append(q + '=' + hexlify(str(val).encode('utf-8')).decode('ascii'))
|
||||
return '1+r' + ';'.join(ans)
|
||||
except Exception:
|
||||
return '0+r' + query_string
|
||||
qname = termcap_aliases[name]
|
||||
val = queryable_capabilities[qname]
|
||||
except Exception:
|
||||
from .utils import log_error
|
||||
log_error(ERROR_PREFIX, 'Unknown terminfo property:', name)
|
||||
yield result(encoded_query_name)
|
||||
continue
|
||||
if qname in string_capabilities and '%' not in val:
|
||||
val = key_as_bytes(qname).decode('ascii')
|
||||
yield result(encoded_query_name, val)
|
||||
|
||||
@ -704,7 +704,8 @@ class Window:
|
||||
self.refresh()
|
||||
|
||||
def request_capabilities(self, q: str) -> None:
|
||||
self.screen.send_escape_code_to_child(DCS, get_capabilities(q))
|
||||
for result in get_capabilities(q, self.opts):
|
||||
self.screen.send_escape_code_to_child(DCS, result)
|
||||
|
||||
def handle_remote_cmd(self, cmd: str) -> None:
|
||||
get_boss().handle_remote_cmd(cmd, self)
|
||||
|
||||
@ -31,8 +31,8 @@ class Callbacks:
|
||||
|
||||
def request_capabilities(self, q):
|
||||
from kitty.terminfo import get_capabilities
|
||||
c = get_capabilities(q)
|
||||
self.write(c.encode('ascii'))
|
||||
for c in get_capabilities(q, None):
|
||||
self.write(c.encode('ascii'))
|
||||
|
||||
def use_utf8(self, on):
|
||||
self.iutf8 = on
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user