Allow specifying watchers in session files and via a command line argument

This commit is contained in:
Kovid Goyal 2020-08-23 10:41:36 +05:30
parent f4ddaacb3c
commit 392c31f5fe
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
9 changed files with 117 additions and 26 deletions

View File

@ -21,6 +21,9 @@ To update |kitty|, :doc:`follow the instructions <binary>`.
- Allow tracking focus change events in watchers (:iss:`2918`) - Allow tracking focus change events in watchers (:iss:`2918`)
- Allow specifying watchers in session files and via a command line argument
(:iss:`2933`)
0.18.3 [2020-08-11] 0.18.3 [2020-08-11]
------------------- -------------------

View File

@ -353,6 +353,15 @@ For example:
focus focus
launch emacs launch emacs
# Add a watcher that will be called with various events that occur
# on all subsequent windows. See the documentation of the launch command
# for details on watchers.
watcher /some/python/file.py
launch mpd
launch irssi
# Remove the watcher for further windows
watcher clear
Mouse features Mouse features
------------------- -------------------

View File

@ -171,8 +171,12 @@ class Boss:
cocoa_set_notification_activated_callback(notification_activated) cocoa_set_notification_activated_callback(notification_activated)
def startup_first_child(self, os_window_id: Optional[int]) -> None: def startup_first_child(self, os_window_id: Optional[int]) -> None:
from kitty.launch import load_watch_modules
startup_sessions = create_sessions(self.opts, self.args, default_session=self.opts.startup_session) startup_sessions = create_sessions(self.opts, self.args, default_session=self.opts.startup_session)
watchers = load_watch_modules(self.args.watcher)
for startup_session in startup_sessions: for startup_session in startup_sessions:
if watchers is not None and watchers.has_watchers:
startup_session.add_watchers_to_all_windows(watchers)
self.add_os_window(startup_session, os_window_id=os_window_id) self.add_os_window(startup_session, os_window_id=os_window_id)
os_window_id = None os_window_id = None
if self.args.start_as != 'normal': if self.args.start_as != 'normal':

View File

@ -544,6 +544,15 @@ def parse_cmdline(oc: Options, disabled: OptionSpecSeq, ans: Any, args: Optional
return leftover_args return leftover_args
WATCHER_DEFINITION = '''
--watcher -w
type=list
Path to a python file. Appropriately named functions in this file will be called
for various events, such as when the window is resized, focused or closed. See the section
on watchers in the launch command documentation :doc:`launch`. Relative paths are
resolved relative to the kitty config directory.'''
def options_spec() -> str: def options_spec() -> str:
if not hasattr(options_spec, 'ans'): if not hasattr(options_spec, 'ans'):
OPTIONS = ''' OPTIONS = '''
@ -593,6 +602,11 @@ Path to a file containing the startup :italic:`session` (tabs, windows, layout,
Use - to read from STDIN. See the README file for details and an example. Use - to read from STDIN. See the README file for details and an example.
{watcher}
Note that this watcher will be added only to all initially crated windows, not new windows
created after startup.
--hold --hold
type=bool-set type=bool-set
Remain open after child process exits. Note that this only affects the first Remain open after child process exits. Note that this only affects the first
@ -695,8 +709,8 @@ type=bool-set
! !
''' '''
setattr(options_spec, 'ans', OPTIONS.format( setattr(options_spec, 'ans', OPTIONS.format(
appname=appname, config_help=CONFIG_HELP.format(appname=appname, conf_name=appname) appname=appname, config_help=CONFIG_HELP.format(appname=appname, conf_name=appname),
watcher=WATCHER_DEFINITION
)) ))
ans: str = getattr(options_spec, 'ans') ans: str = getattr(options_spec, 'ans')
return ans return ans

View File

@ -254,6 +254,8 @@ def complete_kitty_cli_arg(ans: Completions, opt: Optional[OptionDict], prefix:
complete_files_and_dirs(ans, prefix, files_group_name='Config files', predicate=is_conf_file) complete_files_and_dirs(ans, prefix, files_group_name='Config files', predicate=is_conf_file)
elif dest == 'session': elif dest == 'session':
complete_files_and_dirs(ans, prefix, files_group_name='Session files') complete_files_and_dirs(ans, prefix, files_group_name='Session files')
elif dest == 'watcher':
complete_files_and_dirs(ans, prefix, files_group_name='Watcher files')
elif dest == 'directory': elif dest == 'directory':
complete_files_and_dirs(ans, prefix, files_group_name='Directories', predicate=os.path.isdir) complete_files_and_dirs(ans, prefix, files_group_name='Directories', predicate=os.path.isdir)
elif dest == 'start_as': elif dest == 'start_as':

View File

@ -8,7 +8,7 @@ from typing import Any, Dict, List, Optional, Sequence, Tuple
from .boss import Boss from .boss import Boss
from .child import Child from .child import Child
from .cli import parse_args from .cli import parse_args, WATCHER_DEFINITION
from .cli_stub import LaunchCLIOptions from .cli_stub import LaunchCLIOptions
from .constants import resolve_custom_file from .constants import resolve_custom_file
from .fast_data_types import set_clipboard_string from .fast_data_types import set_clipboard_string
@ -152,13 +152,7 @@ Set the WM_NAME property on X11 for the newly created OS Window when using
:option:`launch --type`=os-window. Defaults to :option:`launch --os-window-class`. :option:`launch --type`=os-window. Defaults to :option:`launch --os-window-class`.
--watcher -w ''' + WATCHER_DEFINITION
type=list
Path to a python file. Appropriately named functions in this file will be called
for various events, such as when the window is resized or closed. See the section
on watchers in the launch command documentation :doc:`launch`. Relative paths are
resolved relative to the kitty config directory.
'''
def parse_launch_args(args: Optional[Sequence[str]] = None) -> Tuple[LaunchCLIOptions, List[str]]: def parse_launch_args(args: Optional[Sequence[str]] = None) -> Tuple[LaunchCLIOptions, List[str]]:
@ -202,12 +196,12 @@ def tab_for_window(boss: Boss, opts: LaunchCLIOptions, target_tab: Optional[Tab]
return tab return tab
def load_watch_modules(opts: LaunchCLIOptions) -> Optional[Watchers]: def load_watch_modules(watchers: Sequence[str]) -> Optional[Watchers]:
if not opts.watcher: if not watchers:
return None return None
import runpy import runpy
ans = Watchers() ans = Watchers()
for path in opts.watcher: for path in watchers:
path = resolve_custom_file(path) path = resolve_custom_file(path)
m = runpy.run_path(path, run_name='__kitty_watcher__') m = runpy.run_path(path, run_name='__kitty_watcher__')
w = m.get('on_close') w = m.get('on_close')
@ -307,7 +301,7 @@ def launch(boss: Boss, opts: LaunchCLIOptions, args: List[str], target_tab: Opti
else: else:
tab = tab_for_window(boss, opts, target_tab) tab = tab_for_window(boss, opts, target_tab)
if tab is not None: if tab is not None:
watchers = load_watch_modules(opts) watchers = load_watch_modules(opts.watcher)
new_window: Window = tab.new_window(env=env or None, watchers=watchers or None, **kw) new_window: Window = tab.new_window(env=env or None, watchers=watchers or None, **kw)
if opts.keep_focus and active: if opts.keep_focus and active:
boss.set_active_window(active, switch_os_window_if_needed=True) boss.set_active_window(active, switch_os_window_if_needed=True)

View File

@ -14,6 +14,7 @@ from .options_stub import Options
from .os_window_size import WindowSize, WindowSizeData, WindowSizes from .os_window_size import WindowSize, WindowSizeData, WindowSizes
from .typing import SpecialWindowInstance from .typing import SpecialWindowInstance
from .utils import log_error, resolved_shell from .utils import log_error, resolved_shell
from .window import Watchers
def get_os_window_sizing_data(opts: Options, session: Optional['Session'] = None) -> WindowSizeData: def get_os_window_sizing_data(opts: Options, session: Optional['Session'] = None) -> WindowSizeData:
@ -26,14 +27,15 @@ def get_os_window_sizing_data(opts: Options, session: Optional['Session'] = None
class Tab: class Tab:
def __init__(self, opts: Options, name: str): def __init__(self, opts: Options, name: str, watchers: Watchers):
self.windows: List[Union[List[str], 'SpecialWindowInstance']] = [] self.windows: List['SpecialWindowInstance'] = []
self.name = name.strip() self.name = name.strip()
self.active_window_idx = 0 self.active_window_idx = 0
self.enabled_layouts = opts.enabled_layouts self.enabled_layouts = opts.enabled_layouts
self.layout = (self.enabled_layouts or ['tall'])[0] self.layout = (self.enabled_layouts or ['tall'])[0]
self.cwd: Optional[str] = None self.cwd: Optional[str] = None
self.next_title: Optional[str] = None self.next_title: Optional[str] = None
self.watchers: Watchers = watchers.copy()
class Session: class Session:
@ -44,11 +46,25 @@ class Session:
self.default_title = default_title self.default_title = default_title
self.os_window_size: Optional[WindowSizes] = None self.os_window_size: Optional[WindowSizes] = None
self.os_window_class: Optional[str] = None self.os_window_class: Optional[str] = None
self.watchers = Watchers()
def add_watchers_to_all_windows(self, watchers: Watchers) -> None:
def add(w: 'SpecialWindowInstance') -> 'SpecialWindowInstance':
if w.watchers is None:
return w._replace(watchers=watchers)
wt = w.watchers.copy()
wt.add(watchers)
return w._replace(watchers=wt)
for tab in self.tabs:
tab.windows = [add(w) for w in tab.windows]
def add_tab(self, opts: Options, name: str = '') -> None: def add_tab(self, opts: Options, name: str = '') -> None:
if self.tabs and not self.tabs[-1].windows: if self.tabs and not self.tabs[-1].windows:
del self.tabs[-1] del self.tabs[-1]
self.tabs.append(Tab(opts, name)) if self.tabs:
self.tabs[-1].watchers = self.watchers.copy()
self.tabs.append(Tab(opts, name, self.watchers))
def set_next_title(self, title: str) -> None: def set_next_title(self, title: str) -> None:
self.tabs[-1].next_title = title.strip() self.tabs[-1].next_title = title.strip()
@ -65,10 +81,16 @@ class Session:
cmd = None cmd = None
from .tabs import SpecialWindow from .tabs import SpecialWindow
t = self.tabs[-1] t = self.tabs[-1]
t.windows.append(SpecialWindow(cmd, cwd=t.cwd, override_title=t.next_title or self.default_title)) watchers: Optional[Watchers] = None
if self.watchers.has_watchers:
watchers = self.watchers.copy()
sw = SpecialWindow(cmd, cwd=t.cwd, override_title=t.next_title or self.default_title, watchers=watchers)
t.windows.append(sw)
t.next_title = None t.next_title = None
def add_special_window(self, sw: 'SpecialWindowInstance') -> None: def add_special_window(self, sw: 'SpecialWindowInstance') -> None:
if self.watchers.has_watchers:
sw = sw._replace(watchers=self.watchers.copy())
self.tabs[-1].windows.append(sw) self.tabs[-1].windows.append(sw)
def focus(self) -> None: def focus(self) -> None:
@ -87,9 +109,13 @@ class Session:
def parse_session(raw: str, opts: Options, default_title: Optional[str] = None) -> Generator[Session, None, None]: def parse_session(raw: str, opts: Options, default_title: Optional[str] = None) -> Generator[Session, None, None]:
def finalize_session(ans: Session) -> Session: def finalize_session(ans: Session) -> Session:
from .tabs import SpecialWindow
for t in ans.tabs: for t in ans.tabs:
if not t.windows: if not t.windows:
t.windows.append(resolved_shell(opts)) w: Optional[Watchers] = None
if t.watchers.has_watchers:
w = t.watchers.copy()
t.windows.append(SpecialWindow(cmd=resolved_shell(opts), watchers=w))
return ans return ans
ans = Session(default_title) ans = Session(default_title)
@ -107,7 +133,9 @@ def parse_session(raw: str, opts: Options, default_title: Optional[str] = None)
ans.add_tab(opts, rest) ans.add_tab(opts, rest)
elif cmd == 'new_os_window': elif cmd == 'new_os_window':
yield finalize_session(ans) yield finalize_session(ans)
wt = ans.watchers
ans = Session(default_title) ans = Session(default_title)
ans.watchers = wt.copy()
ans.add_tab(opts, rest) ans.add_tab(opts, rest)
elif cmd == 'layout': elif cmd == 'layout':
ans.set_layout(rest) ans.set_layout(rest)
@ -127,6 +155,14 @@ def parse_session(raw: str, opts: Options, default_title: Optional[str] = None)
ans.os_window_size = WindowSizes(WindowSize(*w), WindowSize(*h)) ans.os_window_size = WindowSizes(WindowSize(*w), WindowSize(*h))
elif cmd == 'os_window_class': elif cmd == 'os_window_class':
ans.os_window_class = rest ans.os_window_class = rest
elif cmd == 'watcher':
from .launch import load_watch_modules
if rest == 'clear':
ans.watchers = Watchers()
else:
watchers = load_watch_modules((rest,))
if watchers is not None:
ans.watchers.add(watchers)
else: else:
raise ValueError('Unknown command in session file: {}'.format(cmd)) raise ValueError('Unknown command in session file: {}'.format(cmd))
yield finalize_session(ans) yield finalize_session(ans)

View File

@ -48,6 +48,7 @@ class SpecialWindowInstance(NamedTuple):
cwd: Optional[str] cwd: Optional[str]
overlay_for: Optional[int] overlay_for: Optional[int]
env: Optional[Dict[str, str]] env: Optional[Dict[str, str]]
watchers: Optional[Watchers]
def SpecialWindow( def SpecialWindow(
@ -57,9 +58,10 @@ def SpecialWindow(
cwd_from: Optional[int] = None, cwd_from: Optional[int] = None,
cwd: Optional[str] = None, cwd: Optional[str] = None,
overlay_for: Optional[int] = None, overlay_for: Optional[int] = None,
env: Optional[Dict[str, str]] = None env: Optional[Dict[str, str]] = None,
watchers: Optional[Watchers] = None
) -> SpecialWindowInstance: ) -> SpecialWindowInstance:
return SpecialWindowInstance(cmd, stdin, override_title, cwd_from, cwd, overlay_for, env) return SpecialWindowInstance(cmd, stdin, override_title, cwd_from, cwd, overlay_for, env, watchers)
def add_active_id_to_history(items: Deque[int], item_id: int, maxlen: int = 64) -> None: def add_active_id_to_history(items: Deque[int], item_id: int, maxlen: int = 64) -> None:
@ -333,14 +335,14 @@ class Tab: # {{{
special_window: SpecialWindowInstance, special_window: SpecialWindowInstance,
location: Optional[str] = None, location: Optional[str] = None,
copy_colors_from: Optional[Window] = None, copy_colors_from: Optional[Window] = None,
allow_remote_control: bool = False allow_remote_control: bool = False,
) -> Window: ) -> Window:
return self.new_window( return self.new_window(
use_shell=False, cmd=special_window.cmd, stdin=special_window.stdin, use_shell=False, cmd=special_window.cmd, stdin=special_window.stdin,
override_title=special_window.override_title, override_title=special_window.override_title,
cwd_from=special_window.cwd_from, cwd=special_window.cwd, overlay_for=special_window.overlay_for, cwd_from=special_window.cwd_from, cwd=special_window.cwd, overlay_for=special_window.overlay_for,
env=special_window.env, location=location, copy_colors_from=copy_colors_from, env=special_window.env, location=location, copy_colors_from=copy_colors_from,
allow_remote_control=allow_remote_control allow_remote_control=allow_remote_control, watchers=special_window.watchers
) )
def close_window(self) -> None: def close_window(self) -> None:

View File

@ -87,10 +87,37 @@ class Watcher:
class Watchers: class Watchers:
on_resize: List[Watcher]
on_close: List[Watcher]
on_focus_change: List[Watcher]
def __init__(self) -> None: def __init__(self) -> None:
self.on_resize: List[Watcher] = [] self.on_resize = []
self.on_close: List[Watcher] = [] self.on_close = []
self.on_focus_change: List[Watcher] = [] self.on_focus_change = []
def add(self, others: 'Watchers') -> None:
def merge(base: List[Watcher], other: List[Watcher]) -> None:
for x in other:
if x not in base:
base.append(x)
merge(self.on_resize, others.on_resize)
merge(self.on_close, others.on_close)
merge(self.on_focus_change, others.on_focus_change)
def clear(self) -> None:
del self.on_close[:], self.on_resize[:], self.on_focus_change[:]
def copy(self) -> 'Watchers':
ans = Watchers()
ans.on_close = self.on_close[:]
ans.on_resize = self.on_resize[:]
ans.on_focus_change = self.on_focus_change
return ans
@property
def has_watchers(self) -> bool:
return bool(self.on_close or self.on_resize or self.on_focus_change)
def call_watchers(windowref: Callable[[], Optional['Window']], which: str, data: Dict[str, Any]) -> None: def call_watchers(windowref: Callable[[], Optional['Window']], which: str, data: Dict[str, Any]) -> None: