#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# configfile.py
#
# Copyright 2018 Jelle Smet <development@smetj.net>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
#
import yaml
from easydict import EasyDict
from jsonschema import validate
from .schema import SCHEMA
LOG_FILTER_TEMPLATE = """
{{%- if data.level <= {loglevel} -%}}
pass
{{%- else -%}}
nomatch
{{%- endif -%}}"""
LOG_COLOR_TEMPLATE = """
{%- if data.level == 0 -%}
\x1B[0;35m
{%- elif data.level == 1 -%}
\x1B[1;35m
{%- elif data.level == 2 -%}
\x1B[0;31m
{%- elif data.level == 3 -%}
\x1B[1;31m
{%- elif data.level == 4 -%}
\x1B[1;33m
{%- elif data.level == 5 -%}
\x1B[1;30m
{%- elif data.level == 6 -%}
\x1B[1;37m
{%- else -%}
\x1B[1;37m
{%- endif -%}
{{strftime(epoch(), "YYYY-MM-DDTHH:mm:ss.SSSSZZ")}} {{data.identification}}[{{data.pid}}] {{data.txt_level}} {{data.module}}: {{data.message}}\x1B[0m"""
[docs]class ConfigFile(object):
"""
Generates a wishbone.router configuration object used to initialize a
wishbone router object.
This config generator has some tailered functionality which makes it
suitable when bootstrapping from CLI.
It does following automatic configurations:
- Initializes a ``wishbone.module.flow.funnel`` module called ``_logs``
which is connected to the ``_logs`` queue of all modules except if
this module has already been connected in the bootstrap file.
- Initializes a ``wishbone.module.flow.funnel`` module called ``_metrics``
which is connected to the ``_metrics`` queue of all modules except if
this module has already been connected in the bootstrap file. The
``_metrics`` modules is by default not connected to any other
modules. The effect of this is that all metrics are dropped unless
the user connects a module for furhter processing the metrics.
- Adds a ``wishbone.module.flow.queueselect`` module called
``_logs_filter`` responsible for dropping logs which log level do
not correspond to the define ``--log-level``
- Adds either a ``wishbone.module.output.stdout`` called
``_logs_stdout`` module or ``wishbone.module.output.syslog`` module
called ``_logs_syslog`` and connects this instance to
``_logs.outbox``.
- Initializes the following template functions and makes them
available to each initialized module::
- strftime()
- env()
- epoch()
- version()
Args:
filename (str): The filename of the configuration to load.
logstyle (str): How logging should be setup. Possible options are: stdout and syslog.
loglevel (str): The loglevel for the router to use to use.
identification (str): A string which identifies the router instance
colorize_stdout (bool): When True, colors each stdout printed logline using proper coloring.
"""
def __init__(
self,
filename,
logstyle,
loglevel=6,
identification="wishbone",
colorize_stdout=True,
):
self.identification = identification
self.colorize_stdout = colorize_stdout
self.logstyle = logstyle
self.loglevel = loglevel
self.config = EasyDict(
{
"template_functions": EasyDict({}),
"modules": EasyDict({}),
"module_functions": EasyDict({}),
"protocols": EasyDict({}),
"routingtable": [],
}
)
self.__addLogFunnel()
self.__addLogFilter()
self.__addMetricFunnel()
self.load(filename)
[docs] def addModule(
self,
name,
module,
arguments={},
description="",
functions={},
protocol=None,
event=False,
):
"""
Adds a module to the configuration.
Args:
name (str): The module instance name
module (str): The module name
arguments (dict): The module variables
description (str): A description of the module instance
functions (dict): The module functions
protocol (str): The protocol to apply to the module
event (bool): Whether incoming or outgoing events need to be treated as full events.
"""
if name.startswith("_"):
raise Exception("Module instance names cannot start with _.")
self.__addModule(
name, module, arguments, description, functions, protocol, event
)
[docs] def addTemplateFunction(self, name, function, arguments={}):
"""Adds a template funtion to the configuration.
Args:
name (str): The name of the function instance
function (str): The entry point name of the function
arguments (dict): The arguments to initiate the function class.
"""
if name not in self.config["template_functions"]:
self.config["template_functions"][name] = EasyDict(
{"function": function, "arguments": arguments}
)
else:
raise Exception("Lookup instance name '%s' is already taken." % (name))
[docs] def addModuleFunction(self, name, function, arguments={}):
"""
Adds a module function to the configuration.
Args:
name (str): The name of the function instance
function (str): The entry point name of the function
arguments (dict): The arguments to initiate the function class.
"""
self.config["module_functions"][name] = EasyDict(
{"function": function, "arguments": arguments}
)
[docs] def addProtocol(self, name, protocol, arguments={}):
"""
Adds a protocol to the configuration
Args:
name (str): The name of the protocol instance
protcol (str): The entry point name of the protocol
arguments (dict): The arguments to initiate the protocol class.
"""
self.config["protocols"][name] = EasyDict(
{"protocol": protocol, "arguments": arguments}
)
[docs] def addConnection(
self, source_module, source_queue, destination_module, destination_queue
):
"""
Adds connections between module queues.
Args:
source_module (str): The source module instance name
source_queue (str): The source module instance queue name
destination_module (str): The destination instance name
destination_queue (str): The destination instance queue name
"""
connected = self.__queueConnected(source_module, source_queue)
if not connected:
self.config["routingtable"].append(
EasyDict(
{
"source_module": source_module,
"source_queue": source_queue,
"destination_module": destination_module,
"destination_queue": destination_queue,
}
)
)
else:
raise Exception(
"Cannot connect '%s.%s' to '%s.%s'. Reason: %s."
% (
source_module,
source_queue,
destination_module,
destination_queue,
connected,
)
)
[docs] def dump(self):
"""
Dumps the configuration as an ``EasyDict`` instance.
"""
return EasyDict(self.config)
[docs] def load(self, filename):
"""
Loads a YAML bootstrap file.
Args:
filename (str): The filename to load.
"""
config = self.__load(filename)
self.__validate(config)
self.__validateRoutingTable(config)
self.__addDefaultTemplateFunctions()
if "template_functions" in config:
for function in config["template_functions"]:
self.addTemplateFunction(
name=function, **config["template_functions"][function]
)
if "module_functions" in config:
for function in config["module_functions"]:
self.addModuleFunction(
name=function, **config["module_functions"][function]
)
if "protocols" in config:
for protocol in config["protocols"]:
self.addProtocol(
name=protocol,
protocol=config["protocols"][protocol].get("protocol", None),
arguments=config["protocols"][protocol].get("arguments", {}),
)
for module in config["modules"]:
self.addModule(name=module, **config["modules"][module])
for route in config["routingtable"]:
sm, sq, dm, dq = self.__splitRoute(route)
self.addConnection(sm, sq, dm, dq)
getattr(self, "_setupLogging%s" % (self.logstyle.upper()))()
def __addDefaultTemplateFunctions(self):
"""
Adds template functions which should be available for each module.
"""
self.addTemplateFunction("strftime", "wishbone.function.template.strftime")
self.addTemplateFunction("epoch", "wishbone.function.template.epoch")
self.addTemplateFunction("env", "wishbone.function.template.environment")
self.addTemplateFunction("version", "wishbone.function.template.version")
def __addModule(
self,
name,
module,
arguments={},
description="",
functions={},
protocol=None,
event=False,
):
if protocol is not None and protocol not in self.config.protocols:
raise Exception(
"No protocol module defined with name '%s' for module instance '%s'"
% (protocol, name)
)
for queue, fs in functions.items():
for function in fs:
if function not in self.config.module_functions.keys():
raise Exception(
"No function defined with name '%s' for module instance '%s'."
% (function, name)
)
if name not in self.config["modules"]:
self.config["modules"][name] = EasyDict(
{
"description": description,
"module": module,
"arguments": arguments,
"functions": functions,
"protocol": protocol,
"event": event,
}
)
self.addConnection(name, "_logs", "_logs", "_%s" % (name))
self.addConnection(name, "_metrics", "_metrics", "_%s" % (name))
else:
raise Exception("Module instance name '%s' is already taken." % (name))
def __queueConnected(self, module, queue):
for c in self.config["routingtable"]:
if (c["source_module"] == module and c["source_queue"] == queue) or (
c["destination_module"] == module and c["destination_queue"] == queue
):
return "Queue '%s.%s' is already connected to '%s.%s'" % (
c["source_module"],
c["source_queue"],
c["destination_module"],
c["destination_queue"],
)
return False
def __splitRoute(self, route):
(source, destination) = route.split("->")
(source_module, source_queue) = source.rstrip().lstrip().split(".")
(destination_module, destination_queue) = (
destination.rstrip().lstrip().split(".")
)
return source_module, source_queue, destination_module, destination_queue
def __load(self, filename):
"""Loads and returns the yaml bootstrap file."""
try:
with open(filename, "r") as f:
config = yaml.load(f)
except Exception as err:
raise Exception("Failed to load bootstrap file. Reason: %s" % (err))
else:
return config
def __validate(self, config):
try:
validate(config, SCHEMA)
except Exception as err:
raise Exception(
"Failed to validate configuration file. Reason: %s" % (err.message)
)
def __validateRoutingTable(self, config):
for route in config["routingtable"]:
(left, right) = route.split("->")
assert "." in left.lstrip().rstrip(), (
'routingtable rule "%s" does not have the right format. Missing a dot.'
% (route)
)
assert "." in right.lstrip().rstrip(), (
'routingtable rule "%s" does not have the right format. Missing a dot.'
% (route)
)
def __addLogFilter(self):
self.__addModule(
name="_logs_filter",
module="wishbone.module.flow.queueselect",
arguments={
"templates": [
{
"name": "log_filter",
"queue": LOG_FILTER_TEMPLATE.format(loglevel=self.loglevel),
}
]
},
description="Centralizes the logs of all modules.",
functions={},
protocol=None,
)
def __addLogFunnel(self):
self.__addModule(
name="_logs",
module="wishbone.module.flow.funnel",
arguments={},
description="Centralizes the logs of all modules.",
functions={},
protocol=None,
)
def __addMetricFunnel(self):
self.__addModule(
name="_metrics",
module="wishbone.module.flow.funnel",
arguments={},
description="Centralizes the metrics of all modules.",
functions={},
protocol=None,
)
def _setupLoggingSTDOUT(self):
if not self.__queueConnected("_logs", "outbox"):
self.__addModule(
name="_logs_format",
module="wishbone.module.process.template",
arguments={"templates": {"human_log": LOG_COLOR_TEMPLATE}},
description="Create a human readable log format.",
functions={},
protocol=None,
)
self.addConnection("_logs", "outbox", "_logs_filter", "inbox")
self.addConnection("_logs_filter", "pass", "_logs_format", "inbox")
self.__addModule(
name="_logs_stdout",
module="wishbone.module.output.stdout",
arguments={"colorize": self.colorize_stdout, "selection": "human_log"},
description="Prints all incoming logs to STDOUT.",
functions={},
protocol=None,
)
self.addConnection("_logs_format", "outbox", "_logs_stdout", "inbox")
def _setupLoggingSYSLOG(self):
if not self.__queueConnected("_logs", "outbox"):
self.__addModule(
name="_logs_syslog",
module="wishbone.module.output.syslog",
arguments={
"ident": self.identification,
"payload": "module({{data.module}}): {{data.message}}",
},
description="Writes all incoming messages to syslog.",
functions={},
protocol=None,
)
self.addConnection("_logs", "outbox", "_logs_filter", "inbox")
self.addConnection("_logs_filter", "pass", "_logs_syslog", "inbox")