Port the hyperlinked_grep kitten to Go

This commit is contained in:
Kovid Goyal 2023-03-05 13:41:57 +05:30
parent a0d30f4dd8
commit 6660071d3a
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
5 changed files with 497 additions and 184 deletions

View File

@ -1,192 +1,10 @@
#!/usr/bin/env python
# License: GPLv3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
import argparse
import os
import re
import signal
import subprocess
import sys
from typing import Callable, List, cast
from urllib.parse import quote_from_bytes
from kitty.utils import get_hostname
def write_hyperlink(write: Callable[[bytes], None], url: bytes, line: bytes, frag: bytes = b'') -> None:
text = b'\033]8;;' + url
if frag:
text += b'#' + frag
text += b'\033\\' + line + b'\033]8;;\033\\'
write(text)
def parse_options(argv: List[str]) -> argparse.Namespace:
p = argparse.ArgumentParser(add_help=False)
p.add_argument('--context-separator', default='--')
p.add_argument('-c', '--count', action='store_true')
p.add_argument('--count-matches', action='store_true')
p.add_argument('--field-context-separator', default='-')
p.add_argument('--field-match-separator', default='-')
p.add_argument('--files', action='store_true')
p.add_argument('-l', '--files-with-matches', action='store_true')
p.add_argument('--files-without-match', action='store_true')
p.add_argument('-h', '--help', action='store_true')
p.add_argument('--json', action='store_true')
p.add_argument('-I', '--no-filename', action='store_true')
p.add_argument('--no-heading', action='store_true')
p.add_argument('-N', '--no-line-number', action='store_true')
p.add_argument('-0', '--null', action='store_true')
p.add_argument('--null-data', action='store_true')
p.add_argument('--path-separator', default=os.path.sep)
p.add_argument('--stats', action='store_true')
p.add_argument('--type-list', action='store_true')
p.add_argument('-V', '--version', action='store_true')
p.add_argument('--vimgrep', action='store_true')
p.add_argument(
'-p', '--pretty',
default=sys.stdout.isatty(),
action='store_true',
)
p.add_argument('--kitten', action='append', default=[])
args, _ = p.parse_known_args(argv)
return args
def main() -> None:
i = 1
args = parse_options(sys.argv[1:])
all_link_options = {'matching_lines', 'context_lines', 'file_headers'}
link_options = set()
delegate_to_rg = False
for raw in args.kitten:
p, _, s = raw.partition('=')
if p != 'hyperlink':
raise SystemExit(f'Unknown argument for --kitten: {raw}')
for option in s.split(','):
if option == 'all':
link_options.update(all_link_options)
delegate_to_rg = False
elif option == 'none':
delegate_to_rg = True
link_options.clear()
elif option not in all_link_options:
a = ', '.join(sorted(all_link_options))
raise SystemExit(f"hyperlink option must be one of all, none, {a}, not '{option}'")
else:
link_options.add(option)
delegate_to_rg = False
while i < len(sys.argv):
if sys.argv[i] == '--kitten':
del sys.argv[i:i+2]
elif sys.argv[i].startswith('--kitten='):
del sys.argv[i]
else:
i += 1
if not link_options: # Default to linking everything if no options given
link_options.update(all_link_options)
link_file_headers = 'file_headers' in link_options
link_context_lines = 'context_lines' in link_options
link_matching_lines = 'matching_lines' in link_options
if any((
args.context_separator != '--',
args.field_context_separator != '-',
args.field_match_separator != '-',
args.help,
args.json,
args.no_filename,
args.null,
args.null_data,
args.path_separator != os.path.sep,
args.type_list,
args.version,
not args.pretty,
)):
delegate_to_rg = True
if delegate_to_rg:
os.execlp('rg', 'rg', *sys.argv[1:])
cmdline = ['rg', '--pretty', '--with-filename'] + sys.argv[1:]
try:
p = subprocess.Popen(cmdline, stdout=subprocess.PIPE)
except FileNotFoundError:
raise SystemExit('Could not find the rg executable in your PATH. Is ripgrep installed?')
assert p.stdout is not None
write: Callable[[bytes], None] = cast(Callable[[bytes], None], sys.stdout.buffer.write)
sgr_pat = re.compile(br'\x1b\[.*?m')
osc_pat = re.compile(b'\x1b\\].*?\x1b\\\\')
num_pat = re.compile(br'^(\d+)([:-])')
path_with_count_pat = re.compile(br'(.*?)(:\d+)')
path_with_linenum_pat = re.compile(br'^(.*?):(\d+):')
stats_pat = re.compile(br'^\d+ matches$')
vimgrep_pat = re.compile(br'^(.*?):(\d+):(\d+):')
in_stats = False
in_result: bytes = b''
hostname = get_hostname().encode('utf-8')
def get_quoted_url(file_path: bytes) -> bytes:
return b'file://' + hostname + quote_from_bytes(os.path.abspath(file_path)).encode('utf-8')
try:
for line in p.stdout:
line = osc_pat.sub(b'', line) # remove any existing hyperlinks
clean_line = sgr_pat.sub(b'', line).rstrip() # remove SGR formatting
if not clean_line:
in_result = b''
write(b'\n')
elif in_stats:
write(line)
elif in_result:
if not args.no_line_number:
m = num_pat.match(clean_line)
if m is not None:
is_match_line = m.group(2) == b':'
if (is_match_line and link_matching_lines) or (not is_match_line and link_context_lines):
write_hyperlink(write, in_result, line, frag=m.group(1))
continue
write(line)
else:
if line.strip():
# The option priority should be consistent with ripgrep here.
if args.stats and not in_stats and stats_pat.match(clean_line):
in_stats = True
elif args.count or args.count_matches:
m = path_with_count_pat.match(clean_line)
if m is not None and link_file_headers:
write_hyperlink(write, get_quoted_url(m.group(1)), line)
continue
elif args.files or args.files_with_matches or args.files_without_match:
if link_file_headers:
write_hyperlink(write, get_quoted_url(clean_line), line)
continue
elif args.vimgrep or args.no_heading:
# When the vimgrep option is present, it will take precedence.
m = vimgrep_pat.match(clean_line) if args.vimgrep else path_with_linenum_pat.match(clean_line)
if m is not None and (link_file_headers or link_matching_lines):
write_hyperlink(write, get_quoted_url(m.group(1)), line, frag=m.group(2))
continue
else:
in_result = get_quoted_url(clean_line)
if link_file_headers:
write_hyperlink(write, in_result, line)
continue
write(line)
except KeyboardInterrupt:
p.send_signal(signal.SIGINT)
except (EOFError, BrokenPipeError):
pass
finally:
p.stdout.close()
raise SystemExit(p.wait())
if __name__ == '__main__':
main()
raise SystemExit('This should be run as kitten hyperlinked_grep')
elif __name__ == '__wrapper_of__':
cd = sys.cli_docs # type: ignore
cd['wrapper_of'] = 'rg'

View File

@ -24,7 +24,7 @@ exec_kitty() {
is_wrapped_kitten() {
wrapped_kittens="clipboard icat unicode_input ssh"
wrapped_kittens="clipboard icat hyperlinked_grep unicode_input ssh"
[ -n "$1" ] && {
case " $wrapped_kittens " in
*" $1 "*) printf "%s" "$1" ;;

View File

@ -0,0 +1,425 @@
// License: GPLv3 Copyright: 2023, Kovid Goyal, <kovid at kovidgoyal.net>
package hyperlinked_grep
import (
"bufio"
"bytes"
"errors"
"fmt"
"net/url"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"unicode"
"kitty/tools/cli"
"kitty/tools/utils"
"golang.org/x/sys/unix"
)
var _ = fmt.Print
var RgExe = (&utils.Once[string]{Run: func() string {
ans := utils.Which("rg")
if ans != "" {
return ans
}
ans = utils.Which("rg", utils.DefaultExeSearchPaths()...)
if ans == "" {
ans = "rg"
}
return ans
}}).Get
func get_options_for_rg() (expecting_args map[string]bool, alias_map map[string]string, err error) {
var raw []byte
raw, err = exec.Command(RgExe(), "--help").Output()
if err != nil {
err = fmt.Errorf("Failed to execute rg: %w", err)
return
}
scanner := bufio.NewScanner(strings.NewReader(utils.UnsafeBytesToString(raw)))
options_started := false
expecting_args = make(map[string]bool, 64)
alias_map = make(map[string]string, 52)
for scanner.Scan() {
line := scanner.Text()
if options_started {
s := strings.TrimLeft(line, " ")
indent := len(line) - len(s)
if indent < 12 && indent > 0 {
s, _, expecting_arg := strings.Cut(s, "<")
single_letter_aliases := make([]string, 0, 1)
long_option_names := make([]string, 0, 1)
for _, x := range strings.Split(s, ",") {
x = strings.TrimSpace(x)
if strings.HasPrefix(x, "--") {
long_option_names = append(long_option_names, x[2:])
} else if strings.HasPrefix(x, "-") {
single_letter_aliases = append(single_letter_aliases, x[1:])
}
}
if len(long_option_names) == 0 {
err = fmt.Errorf("Failed to parse rg help output line: %s", line)
return
}
for _, x := range single_letter_aliases {
alias_map[x] = long_option_names[0]
}
for _, x := range long_option_names[1:] {
alias_map[x] = long_option_names[0]
}
expecting_args[long_option_names[0]] = expecting_arg
}
} else {
if strings.HasPrefix(line, "OPTIONS:") {
options_started = true
}
}
}
return
}
type kitten_options struct {
matching_lines, context_lines, file_headers bool
with_filename, heading, line_number bool
stats, count, count_matches bool
files, files_with_matches, files_without_match bool
vimgrep bool
}
func default_kitten_opts() *kitten_options {
return &kitten_options{
matching_lines: true, context_lines: true, file_headers: true,
with_filename: true, heading: true, line_number: true,
}
}
func parse_args(args ...string) (delegate_to_rg bool, sanitized_args []string, kitten_opts *kitten_options, err error) {
options_that_expect_args, alias_map, err := get_options_for_rg()
if err != nil {
return
}
options_that_expect_args["kitten"] = true
kitten_opts = default_kitten_opts()
sanitized_args = make([]string, 0, len(args))
expecting_option_arg := ""
context_separator := "--"
field_context_separator := "-"
field_match_separator := "-"
handle_option_arg := func(key, val string, with_equals bool) error {
if key != "kitten" {
if with_equals {
sanitized_args = append(sanitized_args, "--"+key+"="+val)
} else {
sanitized_args = append(sanitized_args, key, val)
}
}
switch key {
case "path-separator":
if val != string(os.PathSeparator) {
delegate_to_rg = true
}
case "context-separator":
context_separator = val
case "field-context-separator":
field_context_separator = val
case "field-match-separator":
field_match_separator = val
case "kitten":
k, v, found := strings.Cut(val, "=")
if !found || k != "hyperlink" {
return fmt.Errorf("Unknown --kitten option: %s", val)
}
for _, x := range strings.Split(v, ",") {
switch x {
case "none":
kitten_opts.context_lines = false
kitten_opts.file_headers = false
kitten_opts.matching_lines = false
case "all":
kitten_opts.context_lines = true
kitten_opts.file_headers = true
kitten_opts.matching_lines = true
case "matching_lines":
kitten_opts.matching_lines = true
case "file_headers":
kitten_opts.file_headers = true
case "context_lines":
kitten_opts.context_lines = true
default:
return fmt.Errorf("hyperlink option invalid: %s", x)
}
}
}
return nil
}
handle_bool_option := func(key string) {
switch key {
case "no-context-separator":
context_separator = ""
case "no-filename":
kitten_opts.with_filename = false
case "with-filename":
kitten_opts.with_filename = true
case "heading":
kitten_opts.heading = true
case "no-heading":
kitten_opts.heading = false
case "line-number":
kitten_opts.line_number = true
case "no-line-number":
kitten_opts.line_number = false
case "pretty":
kitten_opts.line_number = true
kitten_opts.heading = true
case "stats":
kitten_opts.stats = true
case "count":
kitten_opts.count = true
case "count-matches":
kitten_opts.count_matches = true
case "files":
kitten_opts.files = true
case "files-with-matches":
kitten_opts.files_with_matches = true
case "files-without-match":
kitten_opts.files_without_match = true
case "vimgrep":
kitten_opts.vimgrep = true
case "null", "null-data", "type-list", "version", "help":
delegate_to_rg = true
}
}
for i, x := range args {
if expecting_option_arg != "" {
if err = handle_option_arg(expecting_option_arg, x, false); err != nil {
return
}
expecting_option_arg = ""
} else {
if x == "--" {
sanitized_args = append(sanitized_args, args[i:]...)
break
}
if strings.HasPrefix(x, "--") {
a, b, found := strings.Cut(x, "=")
a = a[2:]
q := alias_map[a]
if q != "" {
a = q
}
if found {
if _, is_known_option := options_that_expect_args[a]; is_known_option {
if err = handle_option_arg(a, b, true); err != nil {
return
}
} else {
sanitized_args = append(sanitized_args, x)
}
} else {
if options_that_expect_args[a] {
expecting_option_arg = a
} else {
handle_bool_option(a)
sanitized_args = append(sanitized_args, x)
}
}
} else if strings.HasPrefix(x, "-") {
sanitized_args = append(sanitized_args, x)
for _, ch := range x[1 : len(x)-1] {
target := alias_map[string(ch)]
if target != "" {
handle_bool_option(target)
}
}
target := alias_map[string(rune(x[len(x)-1]))]
if target != "" {
if options_that_expect_args[target] {
expecting_option_arg = target
} else {
handle_bool_option(target)
}
}
} else {
sanitized_args = append(sanitized_args, x)
}
}
}
if !kitten_opts.with_filename || context_separator != "--" || field_context_separator != "-" || field_match_separator != "-" {
delegate_to_rg = true
}
return
}
type stdout_filter struct {
prefix []byte
process_line func(string)
}
func (self *stdout_filter) Write(p []byte) (n int, err error) {
n = len(p)
for len(p) > 0 {
idx := bytes.IndexByte(p, '\n')
if idx < 0 {
self.prefix = append(self.prefix, p...)
break
}
line := p[:idx]
if len(self.prefix) > 0 {
self.prefix = append(self.prefix, line...)
line = self.prefix
}
p = p[idx+1:]
self.process_line(utils.UnsafeBytesToString(line))
self.prefix = self.prefix[:0]
}
return
}
func main(_ *cli.Command, _ *Options, args []string) (rc int, err error) {
delegate_to_rg, sanitized_args, kitten_opts, err := parse_args(args...)
if delegate_to_rg {
sanitized_args = append([]string{"rg"}, sanitized_args...)
err = unix.Exec(RgExe(), sanitized_args, os.Environ())
if err != nil {
err = fmt.Errorf("Failed to execute rg: %w", err)
rc = 1
}
return
}
cmdline := append([]string{"--pretty", "--with-filename"}, sanitized_args...)
cmd := exec.Command(RgExe(), cmdline...)
cmd.Stdin = os.Stdin
cmd.Stderr = os.Stderr
buf := stdout_filter{prefix: make([]byte, 0, 8*1024)}
cmd.Stdout = &buf
sgr_pat := regexp.MustCompile("\x1b\\[.*?m")
osc_pat := regexp.MustCompile("\x1b\\].*?\x1b\\\\")
num_pat := regexp.MustCompile(`^(\d+)([:-])`)
path_with_count_pat := regexp.MustCompile(`^(.*?)(:\d+)`)
path_with_linenum_pat := regexp.MustCompile(`^(.*?):(\d+):`)
stats_pat := regexp.MustCompile(`^\d+ matches$`)
vimgrep_pat := regexp.MustCompile(`^(.*?):(\d+):(\d+):`)
in_stats := false
in_result := ""
hostname := utils.Hostname()
get_quoted_url := func(file_path string) string {
q, err := filepath.Abs(file_path)
if err == nil {
file_path = q
}
file_path = filepath.ToSlash(file_path)
file_path = strings.Join(utils.Map(url.PathEscape, strings.Split(file_path, "/")), "/")
return "file://" + hostname + file_path
}
write := func(items ...string) {
for _, x := range items {
os.Stdout.WriteString(x)
}
}
write_hyperlink := func(url, line, frag string) {
write("\033]8;;", url)
if frag != "" {
write("#", frag)
}
write("\033\\", line, "\n\033]8;;\033\\")
}
buf.process_line = func(line string) {
line = osc_pat.ReplaceAllLiteralString(line, "") // remove existing hyperlinks
clean_line := strings.TrimRightFunc(line, unicode.IsSpace)
clean_line = sgr_pat.ReplaceAllLiteralString(clean_line, "") // remove SGR formatting
if clean_line == "" {
in_result = ""
write("\n")
} else if in_stats {
write(line, "\n")
} else if in_result != "" {
if kitten_opts.line_number {
m := num_pat.FindStringSubmatch(clean_line)
if len(m) > 0 {
is_match_line := len(m) > 1 && m[2] == ":"
if (is_match_line && kitten_opts.matching_lines) || (!is_match_line && kitten_opts.context_lines) {
write_hyperlink(in_result, line, m[1])
return
}
}
}
write(line, "\n")
} else {
if strings.TrimSpace(line) != "" {
// The option priority should be consistent with ripgrep here.
if kitten_opts.stats && !in_stats && stats_pat.MatchString(clean_line) {
in_stats = true
} else if kitten_opts.count || kitten_opts.count_matches {
if m := path_with_count_pat.FindStringSubmatch(clean_line); len(m) > 0 && kitten_opts.file_headers {
write_hyperlink(get_quoted_url(m[1]), line, "")
return
}
} else if kitten_opts.files || kitten_opts.files_with_matches || kitten_opts.files_without_match {
if kitten_opts.file_headers {
write_hyperlink(get_quoted_url(clean_line), line, "")
return
}
} else if kitten_opts.vimgrep || !kitten_opts.heading {
var m []string
// When the vimgrep option is present, it will take precedence.
if kitten_opts.vimgrep {
m = vimgrep_pat.FindStringSubmatch(clean_line)
} else {
m = path_with_linenum_pat.FindStringSubmatch(clean_line)
}
if len(m) > 0 && (kitten_opts.file_headers || kitten_opts.matching_lines) {
write_hyperlink(get_quoted_url(m[1]), line, m[2])
return
}
} else {
in_result = get_quoted_url(clean_line)
if kitten_opts.file_headers {
write_hyperlink(in_result, line, "")
return
}
}
}
write(line, "\n")
}
}
err = cmd.Run()
var ee *exec.ExitError
if err != nil {
if errors.As(err, &ee) {
return ee.ExitCode(), nil
}
return 1, fmt.Errorf("Failed to execute rg: %w", err)
}
return
}
func specialize_command(hg *cli.Command) {
hg.Usage = "arguments for the rg command"
hg.ShortDescription = "Add hyperlinks to the output of ripgrep"
hg.HelpText = "The hyperlinked_grep kitten is a thin wrapper around the rg command. It automatically adds hyperlinks to the output of rg allowing the user to click on search results to have them open directly in their editor. For details on its usage, see :doc:`/kittens/hyperlinked_grep`."
hg.IgnoreAllArgs = true
hg.OnlyArgsAllowed = true
hg.ArgCompleter = cli.CompletionForWrapper("rg")
}
func EntryPoint(parent *cli.Command) {
create_cmd(parent, main)
}

View File

@ -0,0 +1,67 @@
// License: GPLv3 Copyright: 2023, Kovid Goyal, <kovid at kovidgoyal.net>
package hyperlinked_grep
import (
"fmt"
"testing"
)
var _ = fmt.Print
func TestRgArgParsing(t *testing.T) {
if RgExe() == "rg" {
t.Skip("Skipping as rg not found in PATH")
}
check_failure := func(args ...string) {
_, _, _, err := parse_args(args...)
if err == nil {
t.Fatalf("No error when parsing: %#v", args)
}
}
check_failure("--kitten", "xyz")
check_failure("--kitten", "xyz=1")
check_kitten_opts := func(matching, context, headers bool, args ...string) {
_, _, kitten_opts, err := parse_args(args...)
if err != nil {
t.Fatalf("error when parsing: %#v: %s", args, err)
}
if matching != kitten_opts.matching_lines {
t.Fatalf("Matching lines not correct for: %#v", args)
}
if context != kitten_opts.context_lines {
t.Fatalf("Context lines not correct for: %#v", args)
}
if headers != kitten_opts.file_headers {
t.Fatalf("File headers not correct for: %#v", args)
}
}
check_kitten_opts(true, true, true)
check_kitten_opts(false, false, false, "--kitten", "hyperlink=none")
check_kitten_opts(false, false, true, "--kitten", "hyperlink=none", "--count", "--kitten=hyperlink=file_headers")
check_kitten_opts(false, false, true, "--kitten", "hyperlink=none,file_headers")
check_kitten_opts = func(with_filename, heading, line_number bool, args ...string) {
_, _, kitten_opts, err := parse_args(args...)
if err != nil {
t.Fatalf("error when parsing: %#v: %s", args, err)
}
if with_filename != kitten_opts.with_filename {
t.Fatalf("with_filename not correct for: %#v", args)
}
if heading != kitten_opts.heading {
t.Fatalf("heading not correct for: %#v", args)
}
if line_number != kitten_opts.line_number {
t.Fatalf("line_number not correct for: %#v", args)
}
}
check_kitten_opts(true, true, true)
check_kitten_opts(true, false, true, "--no-heading")
check_kitten_opts(true, true, true, "--no-heading", "--pretty")
check_kitten_opts(true, true, true, "--no-heading", "--heading")
}

View File

@ -9,6 +9,7 @@ import (
"kitty/tools/cmd/at"
"kitty/tools/cmd/clipboard"
"kitty/tools/cmd/edit_in_kitty"
"kitty/tools/cmd/hyperlinked_grep"
"kitty/tools/cmd/icat"
"kitty/tools/cmd/pytest"
"kitty/tools/cmd/ssh"
@ -36,6 +37,8 @@ func KittyToolEntryPoints(root *cli.Command) {
ssh.EntryPoint(root)
// unicode_input
unicode_input.EntryPoint(root)
// hyperlinked_grep
hyperlinked_grep.EntryPoint(root)
// __pytest__
pytest.EntryPoint(root)
// __hold_till_enter__