#!/usr/bin/env python
# matrixctl
# Copyright (c) 2020 Michael Sasser <Michael@MichaelSasser.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Read and parse the configuration file with this module."""
from __future__ import annotations
import logging
import os
import sys
import typing as t
from collections import ChainMap
from collections.abc import Iterable
from collections.abc import MutableMapping
from getpass import getuser
from pathlib import Path
from jinja2 import Template
from jinja2 import Undefined
from ruamel.yaml import YAML as RuamelYAML
from ruamel.yaml.error import YAMLError
from matrixctl import __version__
from matrixctl.errors import ConfigFileError
from matrixctl.structures import Config
from matrixctl.structures import ConfigServer
from matrixctl.structures import ConfigServerAPI
__author__: str = "Michael Sasser"
__email__: str = "Michael@MichaelSasser.org"
logger = logging.getLogger(__name__)
# Make sure the number of places of the source files line number does not
# change. Otherwise the debug output shifts.
[docs]def tree_printer(tree: t.Any, depth: int = 0) -> None:
"""Print the configuration file recursively.
Parameters
----------
tree : any
Initial a ``matrixctl.typehints.Config`` and partials of it
afterwards.
depth : int
The depth of the table
Returns
-------
None
"""
if not isinstance(tree, dict):
raise ConfigFileError(
"There is something wrong with your config file."
)
for key in tree:
if isinstance(tree[key], (str, int, float, bool)):
logger.debug(
"%s├─── %s: %s",
"│ " * depth,
key,
secrets_filter(tree, key),
)
elif isinstance(tree[key], (list, tuple)):
logger.debug(
"%s├─── %s: [%s]", "│ " * depth, key, ", ".join(tree[key])
)
else:
logger.debug("%s├─┬─ %s:", "│ " * depth, key)
tree_printer(tree[key], depth + 1)
logger.debug("%s┴", "│ " * depth)
[docs]def secrets_filter(tree: dict[str, str], key: str) -> t.Any:
"""Redact secrets when printing the configuration file.
Parameters
----------
tree : dict [str, str]
A patrial of ``tree`` from ``tree_printer``. (Can only be this type)
afterwards.
key : str
A ``dict`` key. (Can only be this type)
Returns
-------
None
"""
redact = {"token", "synapse_password"}
return (
f"<redacted length={len(tree[key])}>" if key in redact else tree[key]
)
[docs]class JinjaUndefined(Undefined): # type: ignore
"""Use this class as undefined argument in a Jinja2 Template.
The class replaces every undefined template with an enpty string.
"""
def __getattr__(self, _: str) -> t.Any:
"""Return en empty string."""
return ""
[docs]class YAML:
"""Use the YAML class to read and parse the configuration file(s)."""
DEFAULT_PATHS: list[Path] = [
Path("/etc/matrixctl/config"),
Path.home() / ".config/matrixctl/config",
]
JINJA_PREDEFINED: dict[str, str | int] = {
"home": str(Path.home()),
"user": getuser(),
"default_ssh_port": 22,
"default_api_concurrent_limit": 4,
}
__slots__ = ("__yaml", "server")
def __init__(
self, paths: Iterable[Path] | None = None, server: str | None = None
) -> None:
logger.debug("Loading Config file(s)")
self.server: str = server or "default"
self.__yaml: Config = self.get_server_config(
paths or self.get_paths_to_config(), self.server
)
if not self.__yaml: # dict is empty
logger.error(
"You need to create a configuration file for MatrixCtl. "
"Make sure to check out the docs: https://matrixctl.rtfd.io/en"
"/latest/getting_started/config_file.html"
)
# TODO: Remove the warning below before releasing v1.0.0.
if int(__version__[0]) < 1:
logger.error(
"Since MatixCtl v0.11.0 the configuration file uses the "
"yaml format. If you used MatrixCtl before, make sure to "
"update your config file to the yaml format."
)
logger.debug("Config loaded for Server: %s", self.server)
tree_printer(self.__yaml)
[docs] @staticmethod
def get_paths_to_config() -> tuple[Path, ...]:
"""Generate a tuple of path which may contain a configuration file.
.. Note::
This function preserves the order. The priority of the user
configuration in ``XDG_CONFIG_HOME`` is higher than the global
configuration in ``/etc/matrixctl/``. The priority of the
file extension ``yaml`` is greater than the priority of the file
extension ``yml``.
.. Warning::
The paths returned by this function might not exist.
Returns
-------
config_paths : tuple of pathlib.Path
A tuple of paths, which might contain a config file.
"""
env_config_home: str | None = os.environ.get("XDG_CONFIG_HOME")
paths: tuple[Path, ...] = (
Path("/etc/matrixctl/config.yml"),
Path("/etc/matrixctl/config.yaml"),
(
Path(env_config_home) / "matrixctl/config.yml"
if env_config_home is not None
else Path.home() / ".config/matrixctl/config.yml"
),
(
Path(env_config_home) / "matrixctl/config.yaml"
if env_config_home is not None
else Path.home() / ".config/matrixctl/config.yaml"
),
)
return tuple(sorted(paths, key=paths.index)) # unique, order preserved
[docs] @staticmethod
def read_from_file(yaml: RuamelYAML, path: Path) -> Config:
"""Read the config from a YAML file and render the Jinja2 tmplates.
.. Note::
- The Renderer does one pass. This means, you can only render
templated strings but not the templated string of another
templated string.
- If the file was empty or does not exist, an empty dict will be
returned.
Parameters
----------
yaml : ruamel.yaml.Yaml
The yaml object.
path : Path
The path where the config file is located.
Returns
-------
full_config : matrixctl.typehints.Config
The full (with server name) config file as dict.
"""
try:
# The user should be able to use any file and location
# skipcq: PTC-W6004
with open(path) as stream:
template: Template = Template(
stream.read(), undefined=JinjaUndefined
)
rendered = YAML.JINJA_PREDEFINED | yaml.load(template.render())
rendered["home"] = str(Path.home())
# Override default return type t.Any with Config
return t.cast(Config, yaml.load(template.render(rendered)))
except YAMLError:
logger.error(
(
"Please check your config file %s. MatrixCtl was "
"not able to read it."
),
str(path),
)
except FileNotFoundError:
logger.debug("The config file %s does not exist.", str(path))
except IsADirectoryError:
logger.error(
(
"The path to the configuration file you entered %s "
"seems to be a directory and not a "
"configuration file. Make sure the path is correct."
),
str(path),
)
return t.cast(Config, {})
[docs] @staticmethod
def apply_defaults(server: ConfigServer) -> ConfigServer:
"""Apply defaults to the configuration.
Parameters
----------
server : matrixctl.structures.ConfigServer
The configuration of a (home)server.
Returns
-------
server : matrixctl.structures.ConfigServer
The configuration of a (home)server with applied defaults.
"""
# Create api if it does not exist
try:
server["api"]["concurrent_limit"]
except KeyError:
server["api"] = t.cast(ConfigServerAPI, {})
# Create default for concurrent_limit
try:
server["api"]["concurrent_limit"]
except KeyError:
server["api"]["concurrent_limit"] = 4
return server
[docs] def get_server_config(
self,
paths: Iterable[Path],
server: str,
) -> Config:
"""Read and concentrate the config in one dict.
The ``servers: ...`` will be removed form the dict.
A new entry ``server`` will be created, which represents the selected
server.
Notes
-----
When all files were empty or don't exist, an empty dict will be
returned.
Parameters
----------
paths : Iterable of pathlib.Path
The paths to the configfiles.
server : str
The selected server. (Default: "default")
Returns
-------
server_config : matrixctl.typehints.Config
The config for the selected server.
"""
# RuamelYAML should not be part of the class.
yaml: RuamelYAML = RuamelYAML(typ="safe")
configs: t.Generator[Config, None, None] = (
YAML.read_from_file(yaml, path) for path in paths
)
try:
conf: Config = t.cast(
Config,
dict(
ChainMap(
*(
t.cast(MutableMapping[t.Any, t.Any], config)
for config in configs
if config
)
)
),
)
conf["server"] = self.apply_defaults(conf["servers"][server])
return conf
except KeyError:
logger.error(
"The server %s does not exist in your config file.", server
)
sys.exit(1)
except TypeError:
logger.error(
(
"The Path(s) to the configuration file you entered %s "
"seems to have syntax paroblems. Make sure you use the "
"correct YAML syntax."
),
paths,
)
sys.exit(1)
# TODO: doctest + fixture
[docs] def get(self, *keys: str) -> t.Any:
"""Get a value from a config entry safely.
**Usage**
Pass strings, describing the path in the ``self.__yaml`` dictionary.
Let's say, you are looking for the synapse path:
Examples
--------
.. code-block:: python
from matrixctl.handlers.yaml import YAML
yaml: YAML = YAML()
port: int = yaml.get("server", "ssh", "port")
print(port)
# Output: 22
Parameters
----------
*keys : str
A tuple of strings describing the values you are looking for.
Returns
-------
answer : any
The value of the entry you described.
"""
yaml_walker: t.Any = self.__yaml
try:
for key in keys:
yaml_walker = yaml_walker[key]
except KeyError:
tree: str = ".".join(keys[:-1]).replace(
"server", f"servers.{self.server}"
)
logger.error(
(
"Please check your config file. For this operation your "
"config file needs to have the entry %s in %s."
),
keys[-1],
tree,
)
sys.exit(1)
if not isinstance(yaml_walker, dict):
# print(yaml_walker)
return yaml_walker
# There is currently no scenario where a whole structure would be
# beneficial.
raise ConfigFileError(
"The key you have asked for seems to be incorrect. "
"Please make sure you ask for an single entry, "
"not a entire section."
)
def __repr__(self) -> str:
return repr(self.__yaml)
def __str__(self) -> str:
return str(self.__yaml)
# vim: set ft=python :