Get env conf working with tests

This commit is contained in:
Kovid Goyal 2022-02-26 09:48:36 +05:30
parent 846021296f
commit 53c8485a7a
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
7 changed files with 131 additions and 23 deletions

View File

@ -10,17 +10,50 @@ from kitty.conf.utils import (
) )
from kitty.constants import config_dir from kitty.constants import config_dir
from .options.types import Options as SSHOptions, defaults from .options.types import Options as SSHOptions, defaults, option_names
SYSTEM_CONF = '/etc/xdg/kitty/ssh.conf' SYSTEM_CONF = '/etc/xdg/kitty/ssh.conf'
defconf = os.path.join(config_dir, 'ssh.conf') defconf = os.path.join(config_dir, 'ssh.conf')
def options_for_host(hostname: str, per_host_opts: Dict[str, SSHOptions]) -> SSHOptions:
import fnmatch
matches = []
for pat, opts in per_host_opts.items():
if fnmatch.fnmatchcase(hostname, pat):
matches.append(opts)
if not matches:
return SSHOptions({})
base = matches[0]
rest = matches[1:]
if rest:
ans = SSHOptions(base._asdict())
for name in option_names:
for opts in rest:
val = getattr(opts, name)
if isinstance(val, dict):
getattr(ans, name).update(val)
else:
setattr(ans, name, val)
else:
ans = base
return ans
def load_config(*paths: str, overrides: Optional[Iterable[str]] = None) -> Dict[str, SSHOptions]: def load_config(*paths: str, overrides: Optional[Iterable[str]] = None) -> Dict[str, SSHOptions]:
from .options.parse import ( from .options.parse import (
create_result_dict, merge_result_dicts, parse_conf_item create_result_dict, merge_result_dicts, parse_conf_item
) )
from .options.utils import init_results_dict from .options.utils import get_per_hosts_dict, init_results_dict, first_seen_positions
def merge_dicts(base: Dict[str, Any], vals: Dict[str, Any]) -> Dict[str, Any]:
base_phd = get_per_hosts_dict(base)
vals_phd = get_per_hosts_dict(vals)
for hostname in base_phd:
vals_phd[hostname] = merge_result_dicts(base_phd[hostname], vals_phd.get(hostname, {}))
ans: Dict[str, Any] = vals_phd.pop(vals['hostname'])
ans['per_host_dicts'] = vals_phd
return ans
def parse_config(lines: Iterable[str]) -> Dict[str, Any]: def parse_config(lines: Iterable[str]) -> Dict[str, Any]:
ans: Dict[str, Any] = init_results_dict(create_result_dict()) ans: Dict[str, Any] = init_results_dict(create_result_dict())
@ -28,13 +61,18 @@ def load_config(*paths: str, overrides: Optional[Iterable[str]] = None) -> Dict[
return ans return ans
overrides = tuple(overrides) if overrides is not None else () overrides = tuple(overrides) if overrides is not None else ()
opts_dict, paths = _load_config(defaults, parse_config, merge_result_dicts, *paths, overrides=overrides) first_seen_positions.clear()
first_seen_positions['*'] = 0
opts_dict, paths = _load_config(
defaults, parse_config, merge_dicts, *paths, overrides=overrides, initialize_defaults=init_results_dict)
ans: Dict[str, SSHOptions] = {} ans: Dict[str, SSHOptions] = {}
for hostname, host_opts_dict in opts_dict['per_host_dicts'].items(): phd = get_per_hosts_dict(opts_dict)
opts = SSHOptions(host_opts_dict) for hostname in sorted(phd, key=first_seen_positions.__getitem__):
opts = SSHOptions(phd[hostname])
opts.config_paths = paths opts.config_paths = paths
opts.config_overrides = overrides opts.config_overrides = overrides
ans[hostname] = opts ans[hostname] = opts
first_seen_positions.clear()
return ans return ans

View File

@ -15,7 +15,7 @@ agr = definition.add_group
egr = definition.end_group egr = definition.end_group
opt = definition.add_option opt = definition.add_option
agr('global', 'Global') # {{{ agr('host', 'Host environment') # {{{
opt('hostname', '*', option_type='hostname', opt('hostname', '*', option_type='hostname',
long_text=''' long_text='''
@ -26,4 +26,23 @@ against is the hostname used by the remote computer, not the name you pass
to SSH to connect to it. to SSH to connect to it.
''' '''
) )
opt('+env', '',
option_type='env',
add_to_default=False,
long_text='''
Specify environment variables to set on the remote host. Note that
environment variables can refer to each other, so if you use::
env MYVAR1=a
env MYVAR2=$MYVAR1/$HOME/b
The value of MYVAR2 will be :code:`a/<path to home directory>/b`. Using
:code:`VAR=` will set it to the empty string and using just :code:`VAR`
will delete the variable from the child process' environment. The definitions
are processed alphabetically.
'''
)
egr() # }}} egr() # }}}

View File

@ -1,18 +1,23 @@
# generated by gen-config.py DO NOT edit # generated by gen-config.py DO NOT edit
import typing import typing
from kittens.ssh.options.utils import hostname from kittens.ssh.options.utils import env, hostname
from kitty.conf.utils import merge_dicts from kitty.conf.utils import merge_dicts
class Parser: class Parser:
def env(self, val: str, ans: typing.Dict[str, typing.Any]) -> None:
for k, v in env(val, ans["env"]):
ans["env"][k] = v
def hostname(self, val: str, ans: typing.Dict[str, typing.Any]) -> None: def hostname(self, val: str, ans: typing.Dict[str, typing.Any]) -> None:
hostname(val, ans) hostname(val, ans)
def create_result_dict() -> typing.Dict[str, typing.Any]: def create_result_dict() -> typing.Dict[str, typing.Any]:
return { return {
'env': {},
} }

View File

@ -4,11 +4,12 @@ import typing
option_names = ( # {{{ option_names = ( # {{{
'hostname',) # }}} 'env', 'hostname') # }}}
class Options: class Options:
hostname: str = '*' hostname: str = '*'
env: typing.Dict[str, str] = {}
config_paths: typing.Tuple[str, ...] = () config_paths: typing.Tuple[str, ...] = ()
config_overrides: typing.Tuple[str, ...] = () config_overrides: typing.Tuple[str, ...] = ()
@ -59,3 +60,4 @@ class Options:
defaults = Options() defaults = Options()
defaults.env = {}

View File

@ -1,30 +1,51 @@
#!/usr/bin/env python #!/usr/bin/env python
# License: GPLv3 Copyright: 2022, Kovid Goyal <kovid at kovidgoyal.net> # License: GPLv3 Copyright: 2022, Kovid Goyal <kovid at kovidgoyal.net>
from typing import Any, Dict, Optional from typing import Any, Dict, Optional, Iterable, Tuple
DELETE_ENV_VAR = '_delete_this_env_var_'
def env(val: str, current_val: Dict[str, str]) -> Iterable[Tuple[str, str]]:
val = val.strip()
if val:
if '=' in val:
key, v = val.split('=', 1)
key, v = key.strip(), v.strip()
if key:
yield key, v
else:
yield val, DELETE_ENV_VAR
def init_results_dict(ans: Dict[str, Any]) -> Dict[str, Any]: def init_results_dict(ans: Dict[str, Any]) -> Dict[str, Any]:
ans['current_hostname'] = '*' ans['hostname'] = '*'
ans['current_host_dict'] = chd = {'hostname': '*'} ans['per_host_dicts'] = {}
ans['per_host_dicts'] = {'*': chd}
return ans return ans
ignored_dict_keys = tuple(init_results_dict({})) def get_per_hosts_dict(results_dict: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
ans: Dict[str, Dict[str, Any]] = results_dict.get('per_host_dicts', {}).copy()
h = results_dict['hostname']
hd = {k: v for k, v in results_dict.items() if k != 'per_host_dicts'}
ans[h] = hd
return ans
first_seen_positions: Dict[str, int] = {}
def hostname(val: str, dict_with_parse_results: Optional[Dict[str, Any]] = None) -> str: def hostname(val: str, dict_with_parse_results: Optional[Dict[str, Any]] = None) -> str:
if dict_with_parse_results is not None: if dict_with_parse_results is not None:
ch = dict_with_parse_results['current_hostname'] ch = dict_with_parse_results['hostname']
if val != ch: if val != ch:
hd = dict_with_parse_results.copy() from .parse import create_result_dict
for k in ignored_dict_keys: phd = get_per_hosts_dict(dict_with_parse_results)
del hd[k]
phd = dict_with_parse_results['per_host_dicts']
phd[ch] = hd
dict_with_parse_results.clear() dict_with_parse_results.clear()
dict_with_parse_results.update(phd.pop(val, create_result_dict()))
dict_with_parse_results['per_host_dicts'] = phd dict_with_parse_results['per_host_dicts'] = phd
dict_with_parse_results['current_hostname'] = val dict_with_parse_results['hostname'] = val
dict_with_parse_results['current_host_dict'] = phd.setdefault(val, {'hostname': val}) if val not in first_seen_positions:
first_seen_positions[val] = len(first_seen_positions)
return val return val

View File

@ -253,9 +253,10 @@ def load_config(
parse_config: Callable[[Iterable[str]], Dict[str, Any]], parse_config: Callable[[Iterable[str]], Dict[str, Any]],
merge_configs: Callable[[Dict[str, Any], Dict[str, Any]], Dict[str, Any]], merge_configs: Callable[[Dict[str, Any], Dict[str, Any]], Dict[str, Any]],
*paths: str, *paths: str,
overrides: Optional[Iterable[str]] = None overrides: Optional[Iterable[str]] = None,
initialize_defaults: Callable[[Dict[str, Any]], Dict[str, Any]] = lambda x: x,
) -> Tuple[Dict[str, Any], Tuple[str, ...]]: ) -> Tuple[Dict[str, Any], Tuple[str, ...]]:
ans = defaults._asdict() ans = initialize_defaults(defaults._asdict())
found_paths = [] found_paths = []
for path in paths: for path in paths:
if not path: if not path:

View File

@ -7,6 +7,7 @@ import shlex
import shutil import shutil
import tempfile import tempfile
from kittens.ssh.config import load_config, options_for_host
from kittens.ssh.main import bootstrap_script, get_connection_data from kittens.ssh.main import bootstrap_script, get_connection_data
from kitty.constants import is_macos from kitty.constants import is_macos
from kitty.fast_data_types import CURSOR_BEAM from kitty.fast_data_types import CURSOR_BEAM
@ -43,6 +44,27 @@ print(' '.join(map(str, buf)))'''), lines=13, cols=77)
t('ssh un@ip -iident -p34', host='un@ip', port=34, identity_file='ident') t('ssh un@ip -iident -p34', host='un@ip', port=34, identity_file='ident')
t('ssh -p 33 main', port=33) t('ssh -p 33 main', port=33)
def test_ssh_config_parsing(self):
def parse(conf):
with tempfile.NamedTemporaryFile(suffix='test.conf') as cf:
cf.write(conf.encode('utf-8'))
cf.flush()
return load_config(cf.name)
def for_host(hostname, conf):
if isinstance(conf, str):
conf = parse(conf)
return options_for_host(hostname, conf)
self.ae(for_host('x', '').env, {})
self.ae(for_host('x', 'env a=b').env, {'a': 'b'})
pc = parse('env a=b\nhostname 2\nenv a=c\nenv b=b')
self.ae(set(pc.keys()), {'*', '2'})
self.ae(for_host('x', pc).env, {'a': 'b'})
self.ae(for_host('2', pc).env, {'a': 'c', 'b': 'b'})
self.ae(for_host('x', 'env a=').env, {'a': ''})
self.ae(for_host('x', 'env a').env, {'a': '_delete_this_env_var_'})
def test_ssh_bootstrap_script(self): def test_ssh_bootstrap_script(self):
# test handling of data in tty before tarfile is sent # test handling of data in tty before tarfile is sent
all_possible_sh = tuple(sh for sh in ('dash', 'zsh', 'bash', 'posh', 'sh') if shutil.which(sh)) all_possible_sh = tuple(sh for sh in ('dash', 'zsh', 'bash', 'posh', 'sh') if shutil.which(sh))