kitty/kitty/remote_control.py
2018-01-10 13:23:04 +05:30

489 lines
16 KiB
Python

#!/usr/bin/env python
# vim:fileencoding=utf-8
# License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
import json
import re
import sys
import types
from functools import partial
from .cli import emph, parse_args
from .config import parse_send_text_bytes
from .constants import appname, version
from .tabs import SpecialWindow
from .utils import non_blocking_read, read_with_timeout
def cmd(short_desc, desc=None, options_spec=None, no_response=False):
    """Build a decorator that marks a ``cmd_*`` function as a remote command.

    The decorated function is returned as-is, with these attributes attached:

    * ``short_desc`` / ``desc``: help text (``desc`` falls back to
      ``short_desc`` when it is empty or None).
    * ``name``: the function's ``__name__`` minus its first four characters,
      with underscores replaced by dashes.
    * ``options_spec``: the option specification text, stored as given.
    * ``is_cmd``: marker attribute, always True.
    * ``impl``: zero-argument callable returning the module-level object whose
      name is the function's ``__name__`` minus its first four characters.
    * ``no_response``: stored as given.
    """

    def attach_metadata(func):
        def lookup_impl():
            # Resolved at call time, not decoration time: in this module the
            # implementation function is defined below its ``cmd_*`` partner.
            return globals()[func.__name__[4:]]

        func.is_cmd = True
        func.no_response = no_response
        func.options_spec = options_spec
        func.short_desc = short_desc
        func.desc = desc if desc else short_desc
        func.name = func.__name__[4:].replace('_', '-')
        func.impl = lookup_impl
        return func

    return attach_metadata
def parse_subcommand_cli(func, args):
    """Parse the command line of one sub-command.

    ``func`` is a function decorated with ``cmd``; ``args`` is the argument
    list whose first element (the command name) is skipped. Returns the
    ``(opts, items)`` pair produced by ``parse_args``.
    """
    # The option spec is handed over as a bound ``str.format`` so the
    # template is filled in by the parser rather than here.
    spec_formatter = (func.options_spec or '\n').format
    usage_name = '{} @ {}'.format(appname, func.name)
    parsed_opts, leftover = parse_args(args[1:], spec_formatter, '...', func.desc, usage_name)
    return parsed_opts, leftover
@cmd(
    'List all tabs/windows',
    'List all windows. The list is returned as JSON tree. The top-level is a list of'
    ' operating system {appname} windows. Each OS window has an |_ id| and a list'
    ' of |_ tabs|. Each tab has its own |_ id|, a |_ title| and a list of |_ windows|.'
    ' Each window has an |_ id|, |_ title|, |_ current working directory|, |_ process id (PID)| and'
    ' |_ command-line| of the process running in the window.\n\n'
    'You can use these criteria to select windows/tabs for the other commands.'.format(appname=appname)
)
def cmd_ls(global_opts, opts, args):
    """Client side of ``ls``: there is nothing to send, so no payload is built."""
def ls(boss, window):
    """Return everything ``boss.list_os_windows()`` yields as indented JSON text.

    ``window`` is accepted for signature uniformity with the other command
    implementations and is not used.
    """
    os_windows = [*boss.list_os_windows()]
    return json.dumps(os_windows, sort_keys=True, indent=2)
# Option-spec fragment declaring the ``--match``/``-m`` option for commands
# that operate on windows. It is prepended to (or used as) the
# ``options_spec`` of the window-targeting commands below.
MATCH_WINDOW_OPTION = '''\
--match -m
The window to match. Match specifications are of the form:
|_ field:regexp|. Where field can be one of: id, title, pid, cwd, cmdline.
You can use the |_ ls| command to get a list of windows. Note that for
numeric fields such as id and pid the expression is interpreted as a number,
not a regular expression.
'''
# Option-spec fragment declaring the ``--match``/``-m`` option for commands
# that operate on tabs. It is prepended to (or used as) the ``options_spec``
# of the tab-targeting commands below.
MATCH_TAB_OPTION = '''\
--match -m
The tab to match. Match specifications are of the form:
|_ field:regexp|. Where field can be one of: id, title, pid, cwd, cmdline.
You can use the |_ ls| command to get a list of tabs. Note that for
numeric fields such as id and pid the expression is interpreted as a number,
not a regular expression. When using title or id, first a matching tab is
looked for and if not found a matching window is looked for, and the tab
for that window is used.
'''
@cmd(
    'Send arbitrary text to specified windows',
    'Send arbitrary text to specified windows. The text follows Python'
    ' escaping rules. So you can use escapes like |_ \\x1b| to send control codes'
    ' and |_ \\u21fa| to send unicode characters. If you use the |_ --match| option'
    ' the text will be sent to all matched windows. By default, text is sent to'
    ' only the currently active window.',
    options_spec=MATCH_WINDOW_OPTION + '''\n
--stdin
type=bool-set
Read the text to be sent from |_ stdin|. Note that in this case the text is sent as is,
not interpreted for escapes. If stdin is a terminal, you can press Ctrl-D to end reading.
--from-file
Path to a file whose contents you wish to send. Note that in this case the file contents
are sent as is, not interpreted for escapes.
''',
    no_response=True
)
def cmd_send_text(global_opts, opts, args):
    """Build the stream of payloads for the ``send-text`` command.

    Returns a generator that yields one payload dict per chunk of at most
    ``limit`` characters. The *same* dict object is yielded every time with
    its ``'text'`` (and ``'is_binary'``) entries updated, so each payload must
    be consumed before the generator is advanced.

    Sources are drained in this order: stdin (with ``--stdin``), the file
    named by ``--from-file``, then the command-line arguments joined by
    spaces. Only the command-line text is marked ``is_binary=False``.
    """
    limit = 1024
    ret = {'match': opts.match, 'is_binary': False}

    def pipe():
        # Stream stdin until EOF or a Ctrl-D (\x04) character is seen.
        ret['is_binary'] = True
        import codecs
        import select
        # A read can end in the middle of a multi-byte UTF-8 sequence, so the
        # bytes are decoded incrementally instead of chunk by chunk.
        decoder = codecs.getincrementaldecoder('utf-8')()
        with non_blocking_read() as fd:
            keep_going = True
            while keep_going:
                # select() returns three lists; only the readable list says
                # whether there is anything to read.
                readable = select.select([fd], [], [])[0]
                if not readable:
                    break
                raw = sys.stdin.buffer.read()
                if not raw:
                    if raw is not None:
                        # Real EOF: raise if the input stopped part-way
                        # through a character rather than dropping the tail.
                        decoder.decode(b'', final=True)
                    break
                data = decoder.decode(raw)
                if '\x04' in data:
                    data = data[:data.index('\x04')]
                    keep_going = False
                while data:
                    ret['text'] = data[:limit]
                    yield ret
                    data = data[limit:]

    def chunks(text):
        # Command-line text: the receiving side interprets escapes in it.
        ret['is_binary'] = False
        while text:
            ret['text'] = text[:limit]
            yield ret
            text = text[limit:]

    def file_pipe(path):
        # File contents are sent verbatim, ``limit`` characters at a time.
        ret['is_binary'] = True
        with open(path, encoding='utf-8') as f:
            while True:
                data = f.read(limit)
                if not data:
                    break
                ret['text'] = data
                yield ret

    sources = []
    if opts.stdin:
        sources.append(pipe())
    if opts.from_file:
        sources.append(file_pipe(opts.from_file))
    text = ' '.join(args)
    sources.append(chunks(text))

    def chain():
        for src in sources:
            yield from src

    return chain()
def send_text(boss, window, payload):
    """Write the payload's text to the active window or to all matched windows.

    When ``payload['match']`` is set, every window yielded by
    ``boss.match_windows`` receives the data; otherwise only
    ``boss.active_window`` does. ``None`` entries are skipped. Text flagged
    ``is_binary`` is UTF-8 encoded verbatim; anything else goes through
    ``parse_send_text_bytes``.
    """
    targets = [boss.active_window]
    spec = payload['match']
    if spec:
        targets = tuple(boss.match_windows(spec))
    if payload['is_binary']:
        data = payload['text'].encode('utf-8')
    else:
        data = parse_send_text_bytes(payload['text'])
    for target in targets:
        if target is None:
            continue
        target.write_to_child(data)
@cmd(
    'Set the window title',
    'Set the title for the specified window(s). If you use the |_ --match| option'
    ' the title will be set for all matched windows. By default, only the window'
    ' in which the command is run is affected. If you do not specify a title, the'
    ' last title set by the child process running in the window will be used.',
    options_spec=MATCH_WINDOW_OPTION
)
def cmd_set_window_title(global_opts, opts, args):
    """Build the payload: the space-joined arguments as title plus the match spec."""
    title = ' '.join(args)
    return {'title': title, 'match': opts.match}
def set_window_title(boss, window, payload):
    """Set the title on the originating/active window or on all matched windows.

    Raises ValueError when a match expression is given and matches nothing.
    """
    targets = [window or boss.active_window]
    spec = payload['match']
    if spec:
        targets = tuple(boss.match_windows(spec))
        if not targets:
            raise ValueError('No matching windows for expression: {}'.format(spec))
    # Falsy entries (e.g. no active window) are skipped.
    for target in filter(None, targets):
        target.set_title(payload['title'])
@cmd(
    'Set the tab title',
    'Set the title for the specified tab(s). If you use the |_ --match| option'
    ' the title will be set for all matched tabs. By default, only the tab'
    ' in which the command is run is affected. If you do not specify a title, the'
    ' title of the currently active window in the tab is used.',
    options_spec=MATCH_TAB_OPTION
)
def cmd_set_tab_title(global_opts, opts, args):
    """Build the payload: the space-joined arguments as title plus the match spec."""
    title = ' '.join(args)
    return {'title': title, 'match': opts.match}
def set_tab_title(boss, window, payload):
    """Set the title on the matched tabs, or on the originating/active tab.

    With a match expression, every tab yielded by ``boss.match_tabs`` is
    retitled. Without one, the tab containing ``window`` is used, falling
    back to ``boss.active_tab`` when there is no originating window.

    Raises ValueError when a match expression is given and matches no tab.
    """
    match = payload['match']
    if match:
        tabs = tuple(boss.match_tabs(match))
        if not tabs:
            # The expression is matched against tabs, so say so in the error.
            raise ValueError('No matching tabs for expression: {}'.format(match))
    else:
        tabs = [boss.tab_for_window(window) if window else boss.active_tab]
    for tab in tabs:
        if tab:
            tab.set_title(payload['title'])
@cmd(
    'Close the specified window(s)',
    options_spec=MATCH_WINDOW_OPTION + '''\n
--self
type=bool-set
If specified close the window this command is run in, rather than the active window.
'''
)
def cmd_close_window(global_opts, opts, args):
    """Build the payload carrying the ``--match`` and ``--self`` option values."""
    payload = {'match': opts.match}
    payload['self'] = opts.self
    return payload
def close_window(boss, window, payload):
    """Close the matched windows, or the originating/active window.

    Without a match expression the originating ``window`` is closed only when
    ``payload['self']`` is set and a window is known; otherwise
    ``boss.active_window`` is closed.

    Raises ValueError when a match expression is given and matches nothing.
    """
    spec = payload['match']
    if not spec:
        use_own = window and payload['self']
        targets = [window if use_own else boss.active_window]
    else:
        targets = tuple(boss.match_windows(spec))
        if not targets:
            raise ValueError('No matching windows for expression: {}'.format(spec))
    for target in filter(None, targets):
        boss.close_window(target)
@cmd(
    'Close the specified tab(s)',
    options_spec=MATCH_TAB_OPTION + '''\n
--self
type=bool-set
If specified close the tab this command is run in, rather than the active tab.
'''
)
def cmd_close_tab(global_opts, opts, args):
    """Build the payload carrying the ``--match`` and ``--self`` option values."""
    payload = {'match': opts.match}
    payload['self'] = opts.self
    return payload
def close_tab(boss, window, payload):
    """Close the matched tabs, or the originating/active tab.

    Without a match expression, the tab containing ``window`` is closed only
    when ``payload['self']`` is set and a window is known; otherwise
    ``boss.active_tab`` is closed.

    Raises ValueError when a match expression is given and matches no tab.
    """
    match = payload['match']
    if match:
        tabs = tuple(boss.match_tabs(match))
        if not tabs:
            # The expression is matched against tabs, so say so in the error.
            raise ValueError('No matching tabs for expression: {}'.format(match))
    else:
        tabs = [boss.tab_for_window(window) if window and payload['self'] else boss.active_tab]
    # Closing must not depend on an originating window being known: the
    # target tabs have already been resolved above.
    for tab in tabs:
        if tab:
            boss.close_tab(tab)
@cmd(
    'Open new window',
    'Open a new window in the specified tab. If you use the |_ --match| option'
    ' the first matching tab is used. Otherwise the currently active tab is used.'
    ' Prints out the id of the newly opened window. Any command line arguments'
    ' are assumed to be the command line used to run in the new window, if none'
    ' are provided, the default shell is run. For example:\n'
    '|_ kitty @ new-window --title Email mutt|',
    options_spec=MATCH_TAB_OPTION + '''\n
--title
The title for the new window. By default it will use the title set by the
program running in it.
--cwd
The initial working directory for the new window.
--keep-focus
type=bool-set
Keep the current window focused instead of switching to the newly opened window
--new-tab
type=bool-set
Open a new tab
--tab-title
When using --new-tab set the title of the tab.
'''
)
def cmd_new_window(global_opts, opts, args):
    """Build the payload from the parsed options; ``args`` is the command line to run."""
    return dict(
        match=opts.match,
        title=opts.title,
        cwd=opts.cwd,
        new_tab=opts.new_tab,
        tab_title=opts.tab_title,
        keep_focus=opts.keep_focus,
        args=args or [],
    )
def new_window(boss, window, payload):
    """Open a new window (optionally in a new tab) and return its id as a string.

    With ``payload['new_tab']`` a new tab is created via ``boss._new_tab`` and
    optionally titled; the id returned is that of ``boss.active_window``
    afterwards. Otherwise the window is opened in the first tab matching
    ``payload['match']``, or in ``boss.active_tab`` when no match expression
    is given. With ``payload['keep_focus']`` the previously active window is
    re-activated afterwards.

    Raises ValueError when a match expression is given and matches no tab.
    """
    w = SpecialWindow(cmd=payload['args'] or None, override_title=payload['title'], cwd=payload['cwd'])
    # Remembered before anything is opened so focus can be handed back.
    old_window = boss.active_window
    if payload['new_tab']:
        boss._new_tab(w)
        tab = boss.active_tab
        if payload['tab_title']:
            tab.set_title(payload['tab_title'])
        wid = boss.active_window.id
        if payload['keep_focus'] and old_window:
            boss.set_active_window(old_window)
        return str(wid)
    match = payload['match']
    if match:
        tabs = tuple(boss.match_tabs(match))
        if not tabs:
            # The expression is matched against tabs, so say so in the error.
            raise ValueError('No matching tabs for expression: {}'.format(match))
    else:
        tabs = [boss.active_tab]
    tab = tabs[0]
    w = tab.new_special_window(w)
    if payload['keep_focus'] and old_window:
        boss.set_active_window(old_window)
    return str(w.id)
@cmd(
    'Focus the specified window',
    options_spec=MATCH_WINDOW_OPTION
)
def cmd_focus_window(global_opts, opts, args):
    """Build the payload carrying only the ``--match`` expression."""
    payload = {'match': opts.match}
    return payload
def focus_window(boss, window, payload):
    """Make the first usable candidate window the active one.

    Candidates are the windows matching ``payload['match']`` when it is set,
    otherwise the originating window (or ``boss.active_window`` when there is
    none).

    Raises ValueError when a match expression is given and matches nothing.
    """
    candidates = [window or boss.active_window]
    spec = payload['match']
    if spec:
        candidates = tuple(boss.match_windows(spec))
        if not candidates:
            raise ValueError('No matching windows for expression: {}'.format(spec))
    # Only the first truthy candidate is focused; the rest are ignored.
    chosen = next((candidate for candidate in candidates if candidate), None)
    if chosen is not None:
        boss.set_active_window(chosen)
@cmd(
    'Get text from the specified window',
    options_spec=MATCH_WINDOW_OPTION + '''\n
--extent
default=screen
choices=screen, all, selection
What text to get. The default of screen means all text currently on the screen. all means
all the screen+scrollback and selection means currently selected text.
--ansi
type=bool-set
By default, only plain text is return. If you specify this flag, the text will
include the formatting escape codes for colors/bold/italic/etc. Note that when
getting the current selection, the result is always plain text.
--self
type=bool-set
If specified get text from the window this command is run in, rather than the active window.
'''
)
def cmd_get_text(global_opts, opts, args):
    """Build the payload by copying the relevant option values verbatim."""
    forwarded = ('match', 'extent', 'ansi', 'self')
    return {name: getattr(opts, name) for name in forwarded}
def get_text(boss, window, payload):
    """Return text from the first matched window, or the originating/active one.

    ``payload['extent']`` of ``'selection'`` returns the window's selected
    text. Any other extent reads the buffer, as ANSI when ``payload['ansi']``
    is set, and includes history only when the extent is ``'all'``.

    Raises ValueError when a match expression is given and matches nothing.
    """
    spec = payload['match']
    if not spec:
        source = window if window and payload['self'] else boss.active_window
    else:
        found = tuple(boss.match_windows(spec))
        if not found:
            raise ValueError('No matching windows for expression: {}'.format(spec))
        source = found[0]
    if payload['extent'] == 'selection':
        return source.text_for_selection()
    reader = source.buffer_as_ansi if payload['ansi'] else source.buffer_as_text
    return reader(add_history=payload['extent'] == 'all')
# Command registry: maps each command's dashed name to its ``cmd_*`` function.
# Commands are discovered by scanning the module globals for objects carrying
# the ``is_cmd`` marker set by the ``cmd`` decorator, so this line has to come
# after every command definition it should pick up.
cmap = {v.name: v for v in globals().values() if hasattr(v, 'is_cmd')}
def handle_cmd(boss, window, cmd):
    """Decode one JSON-encoded remote command and run its implementation.

    ``cmd`` is the JSON text of a dict with ``'cmd'``, ``'version'`` and an
    optional ``'payload'``. Returns an error response when the first two
    components of the client's version exceed this instance's, ``None`` for
    commands flagged ``no_response``, and otherwise ``{'ok': True}`` with the
    implementation's non-None result under ``'data'``.
    """
    request = json.loads(cmd)
    client_version = tuple(request['version'])
    if client_version[:2] > version[:2]:
        return {'ok': False, 'error': 'The kitty client you are using to send remote commands is newer than this kitty instance. This is not supported.'}
    command = cmap[request['cmd']]
    impl = command.impl()
    payload = request.get('payload')
    # Implementations of payload-less commands take only (boss, window).
    if payload is None:
        result = impl(boss, window)
    else:
        result = impl(boss, window, payload)
    if command.no_response:
        return None
    response = {'ok': True}
    if result is not None:
        response['data'] = result
    return response
# Option spec for ``kitty @`` itself. The template text is empty, so no
# global options are declared here. It is wrapped in a partial so that calling
# it returns the template with ``appname`` substituted.
# NOTE(review): presumably parse_args calls this to obtain the spec text -- confirm.
global_options_spec = partial('''\
'''.format, appname=appname)
def read_from_stdin(send, no_response):
    """Write one command to the terminal and, unless told not to, read the reply.

    The command dict is JSON-encoded, prefixed with ``@kitty-cmd`` and wrapped
    in an ``ESC P ... ESC \\`` sequence written to stdout, or to stderr when
    stdout is not a terminal. With ``no_response`` a synthetic
    ``{'ok': True}`` is returned immediately. Otherwise data is read via
    ``read_with_timeout`` until a reply in the same framing is seen, and the
    decoded JSON reply is returned.

    Raises SystemExit when no terminal is available for writing or reading,
    or when no reply was received.
    """
    encoded = '@kitty-cmd{}'.format(json.dumps(send)).encode('ascii')
    if sys.stdout.isatty():
        out = sys.stdout
    else:
        out = sys.stderr
        if not out.isatty():
            raise SystemExit('Neither stdout nor stderr is a terminal')
    out.buffer.write(b''.join((b'\x1bP', encoded, b'\x1b\\')))
    out.flush()
    if no_response:
        return {'ok': True}
    if not sys.stdin.isatty():
        raise SystemExit('stdin is not a terminal')

    reply_pattern = re.compile(br'\x1bP@kitty-cmd([^\x1b]+)\x1b\\')
    buffered = b''
    found = None

    def more_needed(data):
        # Accumulate everything read so far and look for a complete reply.
        nonlocal buffered, found
        buffered = buffered + data
        found = reply_pattern.search(buffered)
        return found is None

    read_with_timeout(more_needed)
    if found is None:
        raise SystemExit('Failed to receive response from ' + appname)
    return json.loads(found.group(1).decode('ascii'))
def main(args):
    """Entry point of the remote-control client.

    Parses the global options, looks up the requested command in ``cmap``,
    parses that command's own options and sends the resulting payload(s) to
    the terminal. A ``no_response`` command that returns a generator has
    each yielded payload sent as a separate message. For other commands the
    reply's ``'data'`` is printed, or the program exits with the reply's
    error (printing its traceback to stderr first, when present).
    """
    all_commands = tuple(sorted(cmap))
    listing = '\n'.join(
        ' |G {}|\n {}'.format(cmap[name].name, cmap[name].short_desc)
        for name in all_commands
    )
    msg = (
        'Control {appname} by sending it commands. Add'
        ' |_ allow_remote_control yes| to kitty.conf for this'
        ' to work.\n\n|T Commands|:\n{cmds}\n\n'
        'You can get help for each individual command by using:\n'
        '{appname} @ |_ command| -h'
    ).format(appname=appname, cmds=listing)
    global_opts, items = parse_args(args[1:], global_options_spec, 'command ...', msg, '{} @'.format(appname))
    if not items:
        raise SystemExit('You must specify a command')

    cmd_name = items[0]
    if cmd_name not in cmap:
        raise SystemExit('{} is not a known command. Known commands are: {}'.format(
            emph(cmd_name), ', '.join(all_commands)))
    func = cmap[cmd_name]

    opts, items = parse_subcommand_cli(func, items)
    payload = func(global_opts, opts, items)
    send = {'cmd': cmd_name, 'version': version}

    streaming = func.no_response and isinstance(payload, types.GeneratorType)
    if streaming:
        # One message per yielded payload; replies are not awaited.
        for item in payload:
            send['payload'] = item
            read_from_stdin(send, func.no_response)
        return

    if payload is not None:
        send['payload'] = payload
    response = read_from_stdin(send, func.no_response)
    if response.get('ok'):
        if 'data' in response:
            print(response['data'])
        return
    if response.get('tb'):
        print(response['tb'], file=sys.stderr)
    raise SystemExit(response['error'])