diff --git a/kittens/ask/main.py b/kittens/ask/main.py index db819313c..8829730a9 100644 --- a/kittens/ask/main.py +++ b/kittens/ask/main.py @@ -1,84 +1,15 @@ #!/usr/bin/env python3 # License: GPL v3 Copyright: 2018, Kovid Goyal -import os -import re import sys -from contextlib import suppress from typing import ( - TYPE_CHECKING, - Callable, - Dict, - Iterator, List, - NamedTuple, Optional, - Tuple, ) -from kitty.cli import parse_args -from kitty.cli_stub import AskCLIOptions -from kitty.constants import cache_dir -from kitty.fast_data_types import truncate_point_for_length, wcswidth -from kitty.types import run_once -from kitty.typing import BossType, KeyEventType, TypedDict -from kitty.utils import ScreenSize +from kitty.typing import BossType, TypedDict -from ..tui.handler import Handler, result_handler -from ..tui.loop import Loop, MouseEvent, debug -from ..tui.operations import MouseTracking, alternate_screen, styled - -if TYPE_CHECKING: - import readline - debug -else: - readline = None - - -def get_history_items() -> List[str]: - return list(map(readline.get_history_item, range(1, readline.get_current_history_length() + 1))) - - -def sort_key(item: str) -> Tuple[int, str]: - return len(item), item.lower() - - -class HistoryCompleter: - - def __init__(self, name: Optional[str] = None): - self.matches: List[str] = [] - self.history_path = None - if name: - ddir = os.path.join(cache_dir(), 'ask') - with suppress(FileExistsError): - os.makedirs(ddir) - self.history_path = os.path.join(ddir, name) - - def complete(self, text: str, state: int) -> Optional[str]: - response = None - if state == 0: - history_values = get_history_items() - if text: - self.matches = sorted( - (h for h in history_values if h and h.startswith(text)), key=sort_key) - else: - self.matches = [] - try: - response = self.matches[state] - except IndexError: - response = None - return response - - def __enter__(self) -> 'HistoryCompleter': - if self.history_path: - with suppress(Exception): - readline.read_history_file(self.history_path) - readline.set_completer(self.complete) - return self - - def __exit__(self, *a: object) -> None: - if self.history_path: - readline.write_history_file(self.history_path) +from ..tui.handler import result_handler def option_text() -> str: @@ -134,397 +65,8 @@ class Response(TypedDict): items: List[str] response: Optional[str] - -class Choice(NamedTuple): - text: str - idx: int - color: str - letter: str - - -class Range(NamedTuple): - start: int - end: int - y: int - - def has_point(self, x: int, y: int) -> bool: - return y == self.y and self.start <= x <= self.end - - -def truncate_at_space(text: str, width: int) -> Tuple[str, str]: - p = truncate_point_for_length(text, width) - if p < len(text): - i = text.rfind(' ', 0, p + 1) - if i > 0 and p - i < 12: - p = i + 1 - return text[:p], text[p:] - - -def extra_for(width: int, screen_width: int) -> int: - return max(0, screen_width - width) // 2 + 1 - - -class Password(Handler): - - def __init__(self, cli_opts: AskCLIOptions, prompt: str, is_password: bool = True, initial_text: str = '') -> None: - self.cli_opts = cli_opts - self.prompt = prompt - self.initial_text = initial_text - from kittens.tui.line_edit import LineEdit - self.line_edit = LineEdit(is_password=is_password) - - def initialize(self) -> None: - self.cmd.set_cursor_shape('beam') - if self.initial_text: - self.line_edit.on_text(self.initial_text, True) - self.draw_screen() - - @Handler.atomic_update - def draw_screen(self) -> None: - self.cmd.clear_screen() - if self.cli_opts.message: - for line in self.cli_opts.message.splitlines(): - self.print(line) - self.print() - self.line_edit.write(self.write, self.prompt) - - def on_text(self, text: str, in_bracketed_paste: bool = False) -> None: - self.line_edit.on_text(text, in_bracketed_paste) - self.draw_screen() - - def on_key(self, key_event: KeyEventType) -> None: - if self.line_edit.on_key(key_event): - self.draw_screen() - return - if key_event.matches('enter'): - self.quit_loop(0) - if key_event.matches('esc'): - self.quit_loop(1) - - def on_resize(self, screen_size: ScreenSize) -> None: - self.screen_size = screen_size - self.draw_screen() - - def on_interrupt(self) -> None: - self.quit_loop(1) - on_eot = on_interrupt - - @property - def response(self) -> str: - if self._tui_loop.return_code == 0: - return self.line_edit.current_input - return '' - - -class Choose(Handler): # {{{ - mouse_tracking = MouseTracking.buttons_only - - def __init__(self, cli_opts: AskCLIOptions) -> None: - self.prefix_style_pat = re.compile(r'(?:\x1b\[[^m]*?m)+') - self.cli_opts = cli_opts - self.choices: Dict[str, Choice] = {} - self.clickable_ranges: Dict[str, List[Range]] = {} - if cli_opts.type == 'yesno': - self.allowed = frozenset('yn') - else: - allowed = [] - for choice in cli_opts.choices: - letter, text = choice.split(':', maxsplit=1) - color = '' - if ';' in letter: - letter, color = letter.split(';', maxsplit=1) - letter = letter.lower() - idx = text.lower().index(letter) - allowed.append(letter) - self.choices[letter] = Choice(text, idx, color, letter) - self.allowed = frozenset(allowed) - self.response = '' - self.response_on_accept = cli_opts.default or '' - if cli_opts.type in ('yesno', 'choices') and self.response_on_accept not in self.allowed: - self.response_on_accept = 'y' if cli_opts.type == 'yesno' else tuple(self.choices.keys())[0] - self.message = cli_opts.message - self.hidden_text_start_pos = self.hidden_text_end_pos = -1 - self.hidden_text = '' - self.replacement_text = t = f'Press {styled(self.cli_opts.unhide_key, fg="green")} or click to show' - self.replacement_range = Range(-1, -1, -1) - if self.message and self.cli_opts.hidden_text_placeholder: - self.hidden_text_start_pos = self.message.find(self.cli_opts.hidden_text_placeholder) - if self.hidden_text_start_pos > -1: - self.hidden_text = sys.stdin.read().rstrip() - self.hidden_text_end_pos = self.hidden_text_start_pos + len(t) - suffix = self.message[self.hidden_text_start_pos + len(self.cli_opts.hidden_text_placeholder):] - self.message = self.message[:self.hidden_text_start_pos] + t + suffix - - def initialize(self) -> None: - self.cmd.set_cursor_visible(False) - self.draw_screen() - - def finalize(self) -> None: - self.cmd.set_cursor_visible(True) - - def draw_long_text(self, text: str) -> Iterator[str]: - if not text: - yield '' - return - width = self.screen_size.cols - 2 - m = self.prefix_style_pat.match(text) - prefix = m.group() if m else '' - while text: - t, text = truncate_at_space(text, width) - t = t.strip() - yield ' ' * extra_for(wcswidth(t), width) + styled(prefix + t, bold=True) - - @Handler.atomic_update - def draw_screen(self) -> None: - self.cmd.clear_screen() - msg_lines: List[str] = [] - if self.message: - for line in self.message.splitlines(): - msg_lines.extend(self.draw_long_text(line)) - y = self.screen_size.rows - len(msg_lines) - y = max(0, (y // 2) - 2) - self.print(end='\r\n'*y) - for line in msg_lines: - if self.replacement_text in line: - idx = line.find(self.replacement_text) - x = wcswidth(line[:idx]) - self.replacement_range = Range(x, x + wcswidth(self.replacement_text), y) - self.print(line) - y += 1 - if self.screen_size.rows > 2: - self.print() - y += 1 - if self.cli_opts.type == 'yesno': - self.draw_yesno(y) - else: - self.draw_choice(y) - - def draw_choice_boxes(self, y: int, *choices: Choice) -> None: - self.clickable_ranges.clear() - width = self.screen_size.cols - 2 - current_line_length = 0 - current_line: List[Tuple[str, str]] = [] - lines: List[List[Tuple[str, str]]] = [] - sep = ' ' - sep_sz = len(sep) + 2 # for the borders - - for choice in choices: - self.clickable_ranges[choice.letter] = [] - text = ' ' + choice.text[:choice.idx] - text += styled(choice.text[choice.idx], fg=choice.color or 'green') - text += choice.text[choice.idx + 1:] + ' ' - sz = wcswidth(text) - if sz + sep_sz + current_line_length > width: - lines.append(current_line) - current_line = [] - current_line_length = 0 - current_line.append((choice.letter, text)) - current_line_length += sz + sep_sz - if current_line: - lines.append(current_line) - - def top(text: str) -> str: - return '╭' + '─' * wcswidth(text) + '╮' - - def middle(text: str) -> str: - return f'│{text}│' - - def bottom(text: str) -> str: - return '╰' + '─' * wcswidth(text) + '╯' - - def highlight(text: str, only_edges: bool = False) -> str: - if only_edges: - return styled(text[0], fg='yellow') + text[1:-1] + styled(text[-1], fg='yellow') - return styled(text, fg='yellow') - - def print_line(add_borders: Callable[[str], str], *items: Tuple[str, str], is_last: bool = False) -> None: - nonlocal y - texts = [] - positions = [] - x = 0 - for (letter, text) in items: - positions.append((letter, x, wcswidth(text) + 2)) - text = add_borders(text) - if letter == self.response_on_accept: - text = highlight(text, only_edges=add_borders is middle) - text += sep - x += wcswidth(text) - texts.append(text) - line = ''.join(texts).rstrip() - offset = extra_for(wcswidth(line), width) - for (letter, x, sz) in positions: - x += offset - self.clickable_ranges[letter].append(Range(x, x + sz - 1, y)) - self.print(' ' * offset, line, sep='', end='' if is_last else '\r\n') - y += 1 - - self.cmd.set_line_wrapping(False) - for boxed_line in lines: - print_line(top, *boxed_line) - print_line(middle, *boxed_line) - print_line(bottom, *boxed_line, is_last=boxed_line is lines[-1]) - self.cmd.set_line_wrapping(True) - - def draw_choice(self, y: int) -> None: - if y + 3 <= self.screen_size.rows: - self.draw_choice_boxes(y, *self.choices.values()) - return - self.clickable_ranges.clear() - current_line = '' - current_ranges: Dict[str, int] = {} - width = self.screen_size.cols - 2 - - def commit_line(end: str = '\r\n') -> None: - nonlocal current_line, y - x = extra_for(wcswidth(current_line), width) - self.print(' ' * x + current_line, end=end) - for letter, sz in current_ranges.items(): - self.clickable_ranges[letter] = [Range(x, x + sz - 3, y)] - x += sz - current_ranges.clear() - y += 1 - current_line = '' - - for letter, choice in self.choices.items(): - text = choice.text[:choice.idx] - text += styled(choice.text[choice.idx], fg=choice.color or 'green', underline='straight' if letter == self.response_on_accept else None) - text += choice.text[choice.idx + 1:] - text += ' ' - sz = wcswidth(text) - if sz + wcswidth(current_line) >= width: - commit_line() - current_line += text - current_ranges[letter] = sz - if current_line: - commit_line(end='') - - def draw_yesno(self, y: int) -> None: - yes = styled('Y', fg='green') + 'es' - no = styled('N', fg='red') + 'o' - if y + 3 <= self.screen_size.rows: - self.draw_choice_boxes(y, Choice('Yes', 0, 'green', 'y'), Choice('No', 0, 'red', 'n')) - return - sep = ' ' * 3 - text = yes + sep + no - w = wcswidth(text) - x = extra_for(w, self.screen_size.cols - 2) - nx = x + wcswidth(yes) + len(sep) - self.clickable_ranges = {'y': [Range(x, x + wcswidth(yes) - 1, y)], 'n': [Range(nx, nx + wcswidth(no) - 1, y)]} - self.print(' ' * x + text, end='') - - def on_text(self, text: str, in_bracketed_paste: bool = False) -> None: - text = text.lower() - if text in self.allowed: - self.response = text - self.quit_loop(0) - elif self.cli_opts.type == 'yesno': - self.on_interrupt() - elif self.hidden_text and text == self.cli_opts.unhide_key: - self.unhide() - - def unhide(self) -> None: - if self.hidden_text and self.message: - self.message = self.message[:self.hidden_text_start_pos] + self.hidden_text + self.message[self.hidden_text_end_pos:] - self.hidden_text = '' - self.draw_screen() - - def on_key(self, key_event: KeyEventType) -> None: - if key_event.matches('esc'): - self.on_interrupt() - elif key_event.matches('enter'): - self.response = self.response_on_accept - self.quit_loop(0) - - def on_click(self, ev: MouseEvent) -> None: - for letter, ranges in self.clickable_ranges.items(): - for r in ranges: - if r.has_point(ev.cell_x, ev.cell_y): - self.response = letter - self.quit_loop(0) - return - if self.hidden_text and self.replacement_range.has_point(ev.cell_x, ev.cell_y): - self.unhide() - - def on_resize(self, screen_size: ScreenSize) -> None: - self.screen_size = screen_size - self.draw_screen() - - def on_interrupt(self) -> None: - self.quit_loop(1) - on_eot = on_interrupt -# }}} - - -@run_once -def init_readline() -> None: - import readline - with suppress(OSError): - readline.read_init_file() - if 'libedit' in readline.__doc__: - readline.parse_and_bind("bind ^I rl_complete") - else: - readline.parse_and_bind('tab: complete') - - def main(args: List[str]) -> Response: - # For some reason importing readline in a key handler in the main kitty process - # causes a crash of the python interpreter, probably because of some global - # lock - global readline - msg = 'Ask the user for input' - try: - cli_opts, items = parse_args(args[1:], option_text, '', msg, 'kitty +kitten ask', result_class=AskCLIOptions) - except SystemExit as e: - if e.code != 0: - print(e.args[0]) - input('Press Enter to quit') - raise SystemExit(e.code) - - if cli_opts.type in ('yesno', 'choices'): - loop = Loop() - handler = Choose(cli_opts) - loop.loop(handler) - return {'items': items, 'response': handler.response} - - prompt = cli_opts.prompt - if prompt[0] == prompt[-1] and prompt[0] in '\'"': - prompt = prompt[1:-1] - if cli_opts.type == 'password': - loop = Loop() - phandler = Password(cli_opts, prompt) - loop.loop(phandler) - return {'items': items, 'response': phandler.response} - - # we do this file descriptor dance to get readline to work even when STDOUT - # is redirected - orig_stdout = os.dup(sys.stdout.fileno()) - try: - with open(os.ctermid(), 'r') as tty: - os.dup2(tty.fileno(), sys.stdin.fileno()) - with open(os.ctermid(), 'w') as tty: - os.dup2(tty.fileno(), sys.stdout.fileno()) - import readline as rl - readline = rl - init_readline() - response = None - - with alternate_screen(), HistoryCompleter(cli_opts.name), suppress(KeyboardInterrupt, EOFError): - if cli_opts.message: - print(styled(cli_opts.message, bold=True)) - if cli_opts.default: - def prefill_text() -> None: - readline.insert_text(cli_opts.default or '') - readline.redisplay() - readline.set_pre_input_hook(prefill_text) - response = input(prompt) - readline.set_pre_input_hook() - else: - response = input(prompt) - sys.stdout.flush() - os.dup2(orig_stdout, sys.stdout.fileno()) - finally: - os.close(orig_stdout) - return {'items': items, 'response': response} + raise SystemExit('This must be run as kitten ask') @result_handler() @@ -535,13 +77,8 @@ def handle_result(args: List[str], data: Response, target_window_id: int, boss: if __name__ == '__main__': - ans = main(sys.argv) - if ans: - import json - print(json.dumps(ans)) + main(sys.argv) elif __name__ == '__doc__': - import sys - cd = sys.cli_docs # type: ignore cd['usage'] = '' cd['options'] = option_text diff --git a/tools/cmd/ask/get_line.go b/tools/cmd/ask/get_line.go new file mode 100644 index 000000000..41d4e827b --- /dev/null +++ b/tools/cmd/ask/get_line.go @@ -0,0 +1,93 @@ +// License: GPLv3 Copyright: 2023, Kovid Goyal, + +package ask + +import ( + "fmt" + "io" + "os" + "path/filepath" + "time" + + "kitty/tools/tui/loop" + "kitty/tools/tui/readline" + "kitty/tools/utils" +) + +var _ = fmt.Print + +func get_line(o *Options, items []string) (result map[string]any, err error) { + lp, err := loop.New(loop.NoAlternateScreen, loop.NoRestoreColors) + if err != nil { + return + } + cwd, _ := os.Getwd() + result = map[string]any{"items": items, "response": ""} + ropts := readline.RlInit{Prompt: o.Prompt} + if o.Name != "" { + base := filepath.Join(utils.CacheDir(), "ask") + ropts.HistoryPath = filepath.Join(base, o.Name+".history.json") + os.MkdirAll(base, 0o755) + } + rl := readline.New(lp, ropts) + if o.Default != "" { + rl.SetText(o.Default) + } + lp.OnInitialize = func() (string, error) { + rl.Start() + return "", nil + } + lp.OnFinalize = func() string { rl.End(); return "" } + + lp.OnResumeFromStop = func() error { + rl.Start() + return nil + } + + lp.OnResize = rl.OnResize + + lp.OnKeyEvent = func(event *loop.KeyEvent) error { + if event.MatchesPressOrRepeat("ctrl+c") { + return fmt.Errorf("Canceled by user") + } + err := rl.OnKeyEvent(event) + if err != nil { + if err == io.EOF { + lp.Quit(0) + return nil + } + if err == readline.ErrAcceptInput { + hi := readline.HistoryItem{Timestamp: time.Now(), Cmd: rl.AllText(), ExitCode: 0, Cwd: cwd} + rl.AddHistoryItem(hi) + result["response"] = rl.AllText() + lp.Quit(0) + return nil + } + return err + } + if event.Handled { + rl.Redraw() + return nil + } + return nil + } + + lp.OnText = func(text string, from_key_event, in_bracketed_paste bool) error { + err := rl.OnText(text, from_key_event, in_bracketed_paste) + if err == nil { + rl.Redraw() + } + return err + } + + err = lp.Run() + rl.Shutdown() + if err != nil { + return nil, err + } + ds := lp.DeathSignalName() + if ds != "" { + return nil, fmt.Errorf("Killed by signal: %s", ds) + } + return +} diff --git a/tools/cmd/ask/main.go b/tools/cmd/ask/main.go index dee148ec6..2861b79d4 100644 --- a/tools/cmd/ask/main.go +++ b/tools/cmd/ask/main.go @@ -34,7 +34,7 @@ func main(_ *cli.Command, o *Options, args []string) (rc int, err error) { } case "password": show_message(o.Message) - pw, err := tui.ReadPassword(o.Prompt, true) + pw, err := tui.ReadPassword(o.Prompt, false) if err != nil { if errors.Is(err, tui.Canceled) { pw = "" @@ -43,6 +43,12 @@ func main(_ *cli.Command, o *Options, args []string) (rc int, err error) { } } result = map[string]any{"items": args, "response": pw} + case "line": + show_message(o.Message) + result, err = get_line(o, args) + if err != nil { + return 1, err + } default: return 1, fmt.Errorf("Unknown type: %s", o.Type) }