Source code for wishbone.config.configfile

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#  configfile.py
#
#  Copyright 2018 Jelle Smet <development@smetj.net>
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
#  MA 02110-1301, USA.
#
#

import yaml
from easydict import EasyDict
from jsonschema import validate
from .schema import SCHEMA

LOG_FILTER_TEMPLATE = """
    {{%- if data.level <= {loglevel} -%}}
        pass
    {{%- else -%}}
        nomatch
    {{%- endif -%}}"""

LOG_COLOR_TEMPLATE = """
    {%- if data.level == 0 -%}
        \x1B[0;35m
    {%- elif data.level == 1 -%}
        \x1B[1;35m
    {%- elif data.level == 2 -%}
        \x1B[0;31m
    {%- elif data.level == 3 -%}
        \x1B[1;31m
    {%- elif data.level == 4 -%}
        \x1B[1;33m
    {%- elif data.level == 5 -%}
        \x1B[1;30m
    {%- elif data.level == 6 -%}
        \x1B[1;37m
    {%- else -%}
        \x1B[1;37m
    {%- endif -%}
    {{strftime(epoch(), "YYYY-MM-DDTHH:mm:ss.SSSSZZ")}} {{data.identification}}[{{data.pid}}] {{data.txt_level}} {{data.module}}: {{data.message}}\x1B[0m"""


[docs]class ConfigFile(object): """ Generates a wishbone.router configuration object used to initialize a wishbone router object. This config generator has some tailered functionality which makes it suitable when bootstrapping from CLI. It does following automatic configurations: - Initializes a ``wishbone.module.flow.funnel`` module called ``_logs`` which is connected to the ``_logs`` queue of all modules except if this module has already been connected in the bootstrap file. - Initializes a ``wishbone.module.flow.funnel`` module called ``_metrics`` which is connected to the ``_metrics`` queue of all modules except if this module has already been connected in the bootstrap file. The ``_metrics`` modules is by default not connected to any other modules. The effect of this is that all metrics are dropped unless the user connects a module for furhter processing the metrics. - Adds a ``wishbone.module.flow.queueselect`` module called ``_logs_filter`` responsible for dropping logs which log level do not correspond to the define ``--log-level`` - Adds either a ``wishbone.module.output.stdout`` called ``_logs_stdout`` module or ``wishbone.module.output.syslog`` module called ``_logs_syslog`` and connects this instance to ``_logs.outbox``. - Initializes the following template functions and makes them available to each initialized module:: - strftime() - env() - epoch() - version() Args: filename (str): The filename of the configuration to load. logstyle (str): How logging should be setup. Possible options are: stdout and syslog. loglevel (str): The loglevel for the router to use to use. identification (str): A string which identifies the router instance colorize_stdout (bool): When True, colors each stdout printed logline using proper coloring. """ def __init__( self, filename, logstyle, loglevel=6, identification="wishbone", colorize_stdout=True, ): self.identification = identification self.colorize_stdout = colorize_stdout self.logstyle = logstyle self.loglevel = loglevel self.config = EasyDict( { "template_functions": EasyDict({}), "modules": EasyDict({}), "module_functions": EasyDict({}), "protocols": EasyDict({}), "routingtable": [], } ) self.__addLogFunnel() self.__addLogFilter() self.__addMetricFunnel() self.load(filename)
[docs] def addModule( self, name, module, arguments={}, description="", functions={}, protocol=None, event=False, ): """ Adds a module to the configuration. Args: name (str): The module instance name module (str): The module name arguments (dict): The module variables description (str): A description of the module instance functions (dict): The module functions protocol (str): The protocol to apply to the module event (bool): Whether incoming or outgoing events need to be treated as full events. """ if name.startswith("_"): raise Exception("Module instance names cannot start with _.") self.__addModule( name, module, arguments, description, functions, protocol, event )
[docs] def addTemplateFunction(self, name, function, arguments={}): """Adds a template funtion to the configuration. Args: name (str): The name of the function instance function (str): The entry point name of the function arguments (dict): The arguments to initiate the function class. """ if name not in self.config["template_functions"]: self.config["template_functions"][name] = EasyDict( {"function": function, "arguments": arguments} ) else: raise Exception("Lookup instance name '%s' is already taken." % (name))
[docs] def addModuleFunction(self, name, function, arguments={}): """ Adds a module function to the configuration. Args: name (str): The name of the function instance function (str): The entry point name of the function arguments (dict): The arguments to initiate the function class. """ self.config["module_functions"][name] = EasyDict( {"function": function, "arguments": arguments} )
[docs] def addProtocol(self, name, protocol, arguments={}): """ Adds a protocol to the configuration Args: name (str): The name of the protocol instance protcol (str): The entry point name of the protocol arguments (dict): The arguments to initiate the protocol class. """ self.config["protocols"][name] = EasyDict( {"protocol": protocol, "arguments": arguments} )
[docs] def addConnection( self, source_module, source_queue, destination_module, destination_queue ): """ Adds connections between module queues. Args: source_module (str): The source module instance name source_queue (str): The source module instance queue name destination_module (str): The destination instance name destination_queue (str): The destination instance queue name """ connected = self.__queueConnected(source_module, source_queue) if not connected: self.config["routingtable"].append( EasyDict( { "source_module": source_module, "source_queue": source_queue, "destination_module": destination_module, "destination_queue": destination_queue, } ) ) else: raise Exception( "Cannot connect '%s.%s' to '%s.%s'. Reason: %s." % ( source_module, source_queue, destination_module, destination_queue, connected, ) )
[docs] def dump(self): """ Dumps the configuration as an ``EasyDict`` instance. """ return EasyDict(self.config)
[docs] def load(self, filename): """ Loads a YAML bootstrap file. Args: filename (str): The filename to load. """ config = self.__load(filename) self.__validate(config) self.__validateRoutingTable(config) self.__addDefaultTemplateFunctions() if "template_functions" in config: for function in config["template_functions"]: self.addTemplateFunction( name=function, **config["template_functions"][function] ) if "module_functions" in config: for function in config["module_functions"]: self.addModuleFunction( name=function, **config["module_functions"][function] ) if "protocols" in config: for protocol in config["protocols"]: self.addProtocol( name=protocol, protocol=config["protocols"][protocol].get("protocol", None), arguments=config["protocols"][protocol].get("arguments", {}), ) for module in config["modules"]: self.addModule(name=module, **config["modules"][module]) for route in config["routingtable"]: sm, sq, dm, dq = self.__splitRoute(route) self.addConnection(sm, sq, dm, dq) getattr(self, "_setupLogging%s" % (self.logstyle.upper()))()
def __addDefaultTemplateFunctions(self): """ Adds template functions which should be available for each module. """ self.addTemplateFunction("strftime", "wishbone.function.template.strftime") self.addTemplateFunction("epoch", "wishbone.function.template.epoch") self.addTemplateFunction("env", "wishbone.function.template.environment") self.addTemplateFunction("version", "wishbone.function.template.version") def __addModule( self, name, module, arguments={}, description="", functions={}, protocol=None, event=False, ): if protocol is not None and protocol not in self.config.protocols: raise Exception( "No protocol module defined with name '%s' for module instance '%s'" % (protocol, name) ) for queue, fs in functions.items(): for function in fs: if function not in self.config.module_functions.keys(): raise Exception( "No function defined with name '%s' for module instance '%s'." % (function, name) ) if name not in self.config["modules"]: self.config["modules"][name] = EasyDict( { "description": description, "module": module, "arguments": arguments, "functions": functions, "protocol": protocol, "event": event, } ) self.addConnection(name, "_logs", "_logs", "_%s" % (name)) self.addConnection(name, "_metrics", "_metrics", "_%s" % (name)) else: raise Exception("Module instance name '%s' is already taken." % (name)) def __queueConnected(self, module, queue): for c in self.config["routingtable"]: if (c["source_module"] == module and c["source_queue"] == queue) or ( c["destination_module"] == module and c["destination_queue"] == queue ): return "Queue '%s.%s' is already connected to '%s.%s'" % ( c["source_module"], c["source_queue"], c["destination_module"], c["destination_queue"], ) return False def __splitRoute(self, route): (source, destination) = route.split("->") (source_module, source_queue) = source.rstrip().lstrip().split(".") (destination_module, destination_queue) = ( destination.rstrip().lstrip().split(".") ) return source_module, source_queue, destination_module, destination_queue def __load(self, filename): """Loads and returns the yaml bootstrap file.""" try: with open(filename, "r") as f: config = yaml.load(f) except Exception as err: raise Exception("Failed to load bootstrap file. Reason: %s" % (err)) else: return config def __validate(self, config): try: validate(config, SCHEMA) except Exception as err: raise Exception( "Failed to validate configuration file. Reason: %s" % (err.message) ) def __validateRoutingTable(self, config): for route in config["routingtable"]: (left, right) = route.split("->") assert "." in left.lstrip().rstrip(), ( 'routingtable rule "%s" does not have the right format. Missing a dot.' % (route) ) assert "." in right.lstrip().rstrip(), ( 'routingtable rule "%s" does not have the right format. Missing a dot.' % (route) ) def __addLogFilter(self): self.__addModule( name="_logs_filter", module="wishbone.module.flow.queueselect", arguments={ "templates": [ { "name": "log_filter", "queue": LOG_FILTER_TEMPLATE.format(loglevel=self.loglevel), } ] }, description="Centralizes the logs of all modules.", functions={}, protocol=None, ) def __addLogFunnel(self): self.__addModule( name="_logs", module="wishbone.module.flow.funnel", arguments={}, description="Centralizes the logs of all modules.", functions={}, protocol=None, ) def __addMetricFunnel(self): self.__addModule( name="_metrics", module="wishbone.module.flow.funnel", arguments={}, description="Centralizes the metrics of all modules.", functions={}, protocol=None, ) def _setupLoggingSTDOUT(self): if not self.__queueConnected("_logs", "outbox"): self.__addModule( name="_logs_format", module="wishbone.module.process.template", arguments={"templates": {"human_log": LOG_COLOR_TEMPLATE}}, description="Create a human readable log format.", functions={}, protocol=None, ) self.addConnection("_logs", "outbox", "_logs_filter", "inbox") self.addConnection("_logs_filter", "pass", "_logs_format", "inbox") self.__addModule( name="_logs_stdout", module="wishbone.module.output.stdout", arguments={"colorize": self.colorize_stdout, "selection": "human_log"}, description="Prints all incoming logs to STDOUT.", functions={}, protocol=None, ) self.addConnection("_logs_format", "outbox", "_logs_stdout", "inbox") def _setupLoggingSYSLOG(self): if not self.__queueConnected("_logs", "outbox"): self.__addModule( name="_logs_syslog", module="wishbone.module.output.syslog", arguments={ "ident": self.identification, "payload": "module({{data.module}}): {{data.message}}", }, description="Writes all incoming messages to syslog.", functions={}, protocol=None, ) self.addConnection("_logs", "outbox", "_logs_filter", "inbox") self.addConnection("_logs_filter", "pass", "_logs_syslog", "inbox")