forked from s3lph/matemat
161 lines
6.3 KiB
Python
161 lines
6.3 KiB
Python
|
|
from typing import Any, Dict, Iterable, List, Tuple, Union
|
|
|
|
import os
|
|
import sys
|
|
import logging
|
|
import importlib
|
|
from configparser import ConfigParser
|
|
|
|
# Module-wide configuration dictionary; parse_config_file() fills it in place.
config: Dict[str, Any] = {}
# Cached stock provider instance; stays None until get_stock_provider() creates it.
stock_provider = None
# Cached dispenser instance; stays None until get_dispenser() creates it.
dispenser = None
|
|
|
|
|
|
def parse_logging(symbolic_level: str, symbolic_target: str) -> Tuple[int, logging.Handler]:
    """
    Translate the LogLevel and LogTarget configuration values into logging objects.

    The level may be given as a number or as the name of a level known to the builtin logging module.
    The target is interpreted as follows:
    - `stderr`: log to the error output stream
    - `stdout`: log to the standard output stream
    - `none`: discard all log records
    - anything else: treated as the path of a log file (``~`` is expanded, the path is made absolute)

    :param symbolic_level: The value for the LogLevel key.
    :param symbolic_target: The value for the LogTarget key.
    :return: A tuple of the numeric log level and the logging.Handler built from the log target string.
    :raises ValueError: If the level is neither a number nor a level name known to the logging module.
    """
    try:
        # Numeric levels are used as they are
        numeric_level: int = int(symbolic_level)
    except ValueError:
        try:
            # For an unknown name, getLevelName returns a "Level ..." string, which int() rejects
            numeric_level = int(logging.getLevelName(symbolic_level))
        except ValueError:
            # LogLevel value was neither an int nor a known literal
            raise ValueError(f'Unknown log level: {symbolic_level}')

    # The three reserved target names are handled first
    if symbolic_target == 'stderr':
        return numeric_level, logging.StreamHandler(sys.stderr)
    if symbolic_target == 'stdout':
        return numeric_level, logging.StreamHandler(sys.stdout)
    if symbolic_target == 'none':
        return numeric_level, logging.NullHandler()

    # Everything else names a log file
    log_path = os.path.abspath(os.path.expanduser(symbolic_target))
    return numeric_level, logging.FileHandler(log_path)
|
|
|
|
|
|
def parse_config_file(paths: Union[str, Iterable[str]]) -> None:
    """
    Parse the configuration file(s) at the given path(s) into the module-level ``config`` dictionary.

    The dictionary is first reset to the built-in defaults and then overridden with the values found in
    the ``[Matemat]``, ``[Pagelets]``, ``[HttpHeaders]`` and ``[Interfaces]`` sections, where present.
    Nothing is returned; the ``config`` dictionary is updated in place.

    :param paths: The config file(s) to parse: a single path or an iterable of paths.  ``~`` is expanded.
    :raises TypeError: If ``paths`` is an iterable that contains a non-string element.
    :raises ValueError: If the Port value is not an integer or the LogLevel value is not a valid log level.
    """
    # Set up default values

    # Address to listen on
    config['listen'] = '::'
    # TCP port to listen on
    config['port'] = 80
    # Root directory of statically served content
    config['staticroot'] = '/var/matemat/static'
    # Root directory of Jinja2 templates
    config['templateroot'] = '/var/matemat/templates'
    # Root directory of themes
    config['themeroot'] = '/var/matemat/themes'
    # Active theme - "base" is the default theme that does not override anything
    config['theme'] = 'base'
    # Log level
    config['log_level'] = logging.INFO
    # Log target: An IO stream (stderr, stdout, ...) or a filename
    config['log_handler'] = logging.StreamHandler()
    # Variables passed to pagelets
    config['pagelet_variables'] = dict()
    # Statically configured headers
    config['headers'] = dict()
    # Pluggable interface config
    config['interfaces'] = dict()

    # Initialize the config parser
    parser: ConfigParser = ConfigParser()
    # Replace the original option transformation by a string constructor to preserve the case of config keys
    parser.optionxform = str
    # Normalize the input argument (turn a scalar into a list and expand ~ in paths)
    files: List[str] = list()
    if isinstance(paths, str):
        files.append(os.path.expanduser(paths))
    else:
        for path in paths:
            if not isinstance(path, str):
                raise TypeError(f'Not a string: {path}')
            files.append(os.path.expanduser(path))
    # Read the configuration files; files that cannot be opened are skipped by ConfigParser.read
    parser.read(files, 'utf-8')

    # Read values from the [Matemat] section, if present, falling back to default values
    if parser.has_section('Matemat'):
        section = parser['Matemat']
        config['listen'] = section.get('Address', config['listen'])
        config['port'] = int(section.get('Port', config['port']))
        # Expand ~ in the effective path, so that user-supplied values are expanded, not only the defaults
        config['staticroot'] = os.path.expanduser(section.get('StaticPath', config['staticroot']))
        config['log_level'], config['log_handler'] = parse_logging(
            section.get('LogLevel', config['log_level']),
            section.get('LogTarget', 'stderr'))
        config['templateroot'] = os.path.expanduser(section.get('TemplatePath', config['templateroot']))
        config['themeroot'] = os.path.expanduser(section.get('ThemePath', config['themeroot']))
        config['theme'] = section.get('Theme', config['theme'])

    # Read all values from the [Pagelets] section, if present. These values are passed to pagelet functions
    if parser.has_section('Pagelets'):
        config['pagelet_variables'].update(parser['Pagelets'].items())

    # Read all values from the [HttpHeaders] section, if present. These values are set as HTTP response headers
    if parser.has_section('HttpHeaders'):
        config['headers'].update(parser['HttpHeaders'].items())

    # Read all values from the [Interfaces] section, if present. These values name the classes that
    # implement the pluggable interfaces (keys such as StockProvider and Dispenser)
    if parser.has_section('Interfaces'):
        config['interfaces'].update(parser['Interfaces'].items())
|
|
|
|
|
|
def get_config() -> Dict[str, Any]:
    """
    Return the module-level configuration dictionary.

    :return: The ``config`` dictionary itself, not a copy.
    """
    return config
|
|
|
|
|
|
def get_app_config() -> Dict[str, Any]:
    """
    Return the variables that are passed to pagelets.

    :return: The dictionary stored under the ``pagelet_variables`` key of the configuration.
    :raises KeyError: If the configuration has no ``pagelet_variables`` entry.
    """
    pagelet_variables = config['pagelet_variables']
    return pagelet_variables
|
|
|
|
|
|
def get_stock_provider() -> 'StockProvider':
    """
    Return the stock provider instance, creating it on first use.

    The class is named by the ``StockProvider`` key of the ``interfaces`` configuration as a dotted
    ``module.ClassName`` path, defaulting to ``matemat.interfacing.stock.DatabaseStockProvider``.  It is
    imported, instantiated without arguments and kept in the module-level ``stock_provider`` variable, so
    later calls return the same object.

    :return: The cached stock provider instance.
    """
    global stock_provider
    # Reuse the instance created by an earlier call
    if stock_provider is not None:
        return stock_provider
    interfaces = config.get('interfaces', {})
    class_path = interfaces.get('StockProvider', 'matemat.interfacing.stock.DatabaseStockProvider')
    # Split "package.module.ClassName" at the last dot
    module_name, class_name = class_path.rsplit('.', 1)
    provider_class = getattr(importlib.import_module(module_name), class_name)
    stock_provider = provider_class()
    return stock_provider
|
|
|
|
|
|
def get_dispenser() -> 'Dispenser':
    """
    Return the dispenser instance, creating it on first use.

    The class is named by the ``Dispenser`` key of the ``interfaces`` configuration as a dotted
    ``module.ClassName`` path, defaulting to ``matemat.interfacing.dispenser.NullDispenser``.  It is
    imported, instantiated without arguments and kept in the module-level ``dispenser`` variable, so
    later calls return the same object.

    :return: The cached dispenser instance.
    """
    global dispenser
    # Reuse the instance created by an earlier call
    if dispenser is not None:
        return dispenser
    interfaces = config.get('interfaces', {})
    class_path = interfaces.get('Dispenser', 'matemat.interfacing.dispenser.NullDispenser')
    # Split "package.module.ClassName" at the last dot
    module_name, class_name = class_path.rsplit('.', 1)
    dispenser_class = getattr(importlib.import_module(module_name), class_name)
    dispenser = dispenser_class()
    return dispenser
|