#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# configfile.py
#
# Copyright 2018 Jelle Smet <development@smetj.net>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
#
import yaml
from easydict import EasyDict
from jsonschema import validate
from .schema import SCHEMA
# Template text used as the ``queue`` selector of the ``_logs_filter`` module.
# It is passed through ``str.format(loglevel=...)`` before use, which is why
# the template braces are doubled: ``{{``/``}}`` collapse to single braces and
# ``{loglevel}`` is replaced by the configured log level.  The resulting
# template yields ``pass`` when ``data.level <= loglevel`` and ``nomatch``
# otherwise.
# NOTE(review): the ``{%- ... -%}`` syntax looks like Jinja2 with whitespace
# trimming -- confirm against the module that renders it.
LOG_FILTER_TEMPLATE = '''
{{%- if data.level <= {loglevel} -%}}
pass
{{%- else -%}}
nomatch
{{%- endif -%}}'''
# Template text used as the ``human_log`` template of the ``_logs_format``
# module.  Unlike LOG_FILTER_TEMPLATE it is used verbatim (no ``str.format``),
# hence the single braces.  It first selects an ANSI escape sequence
# (``\x1B[...m``) based on ``data.level`` (0 through 6, any other value takes
# the ``else`` branch), then emits a timestamp, the identification, pid,
# textual level, module name and message, and finally ``\x1B[0m`` to reset
# the terminal attributes.
LOG_COLOR_TEMPLATE = '''
{%- if data.level == 0 -%}
\x1B[0;35m
{%- elif data.level == 1 -%}
\x1B[1;35m
{%- elif data.level == 2 -%}
\x1B[0;31m
{%- elif data.level == 3 -%}
\x1B[1;31m
{%- elif data.level == 4 -%}
\x1B[1;33m
{%- elif data.level == 5 -%}
\x1B[1;30m
{%- elif data.level == 6 -%}
\x1B[1;37m
{%- else -%}
\x1B[1;37m
{%- endif -%}
{{strftime(epoch(), "YYYY-MM-DDTHH:mm:ss.SSSSZZ")}} {{data.identification}}[{{data.pid}}] {{data.txt_level}} {{data.module}}: {{data.message}}\x1B[0m'''
class ConfigFile(object):
    '''
    Generates a wishbone.router configuration object used to initialize a
    wishbone router object.

    This config generator has some tailored functionality which makes it
    suitable when bootstrapping from CLI.

    It does following automatic configurations:

        - Initializes a ``wishbone.module.flow.funnel`` module called ``_logs``
          which is connected to the ``_logs`` queue of all modules except if
          this module has already been connected in the bootstrap file.

        - Initializes a ``wishbone.module.flow.funnel`` module called
          ``_metrics`` which is connected to the ``_metrics`` queue of all
          modules except if this module has already been connected in the
          bootstrap file. The ``_metrics`` module is by default not connected
          to any other modules. The effect of this is that all metrics are
          dropped unless the user connects a module for further processing
          the metrics.

        - Adds a ``wishbone.module.flow.queueselect`` module called
          ``_logs_filter`` responsible for dropping logs which log level do
          not correspond to the defined ``--log-level``

        - Adds either a ``wishbone.module.output.stdout`` called
          ``_logs_stdout`` module or ``wishbone.module.output.syslog`` module
          called ``_logs_syslog`` and connects this instance to
          ``_logs.outbox``.

        - Initializes the following template functions and makes them
          available to each initialized module::

            - strftime()
            - env()
            - epoch()
            - version()

    Args:
        filename (str): The filename of the configuration to load.
        logstyle (str): How logging should be setup. Possible options are: stdout and syslog.
        loglevel (int): The loglevel for the router to use.
        identification (str): A string which identifies the router instance
        colorize_stdout (bool): When True, colors each stdout printed logline using proper coloring.
    '''

    def __init__(self, filename, logstyle, loglevel=6, identification="wishbone", colorize_stdout=True):
        self.identification = identification
        self.colorize_stdout = colorize_stdout
        self.logstyle = logstyle
        self.loglevel = loglevel
        self.config = EasyDict({
            "template_functions": EasyDict({}),
            "modules": EasyDict({}),
            "module_functions": EasyDict({}),
            "protocols": EasyDict({}),
            "routingtable": []
        })
        # The internal funnels/filter must exist before the bootstrap file is
        # loaded because every module added afterwards gets wired to them.
        self.__addLogFunnel()
        self.__addLogFilter()
        self.__addMetricFunnel()
        self.load(filename)

    def addModule(self, name, module, arguments=None, description="", functions=None, protocol=None, event=False):
        '''
        Adds a module to the configuration.

        Args:
            name (str): The module instance name
            module (str): The module name
            arguments (dict): The module variables
            description (str): A description of the module instance
            functions (dict): The module functions
            protocol (str): The protocol to apply to the module
            event (bool): Whether incoming or outgoing events need to be treated as full events.

        Raises:
            Exception: When ``name`` starts with an underscore (reserved for
                internally generated modules) or when the module cannot be
                added (unknown protocol/function, duplicate name).
        '''
        if name.startswith('_'):
            raise Exception("Module instance names cannot start with _.")
        self.__addModule(name, module, arguments, description, functions, protocol, event)

    def addTemplateFunction(self, name, function, arguments=None):
        '''
        Adds a template function to the configuration.

        Args:
            name (str): The name of the function instance
            function (str): The entry point name of the function
            arguments (dict): The arguments to initiate the function class.

        Raises:
            Exception: When a template function called ``name`` already exists.
        '''
        if name in self.config["template_functions"]:
            raise Exception("Lookup instance name '%s' is already taken." % (name))
        self.config["template_functions"][name] = EasyDict({
            "function": function,
            "arguments": {} if arguments is None else arguments
        })

    def addModuleFunction(self, name, function, arguments=None):
        '''
        Adds a module function to the configuration.

        An existing entry with the same name is overwritten.

        Args:
            name (str): The name of the function instance
            function (str): The entry point name of the function
            arguments (dict): The arguments to initiate the function class.
        '''
        self.config["module_functions"][name] = EasyDict({
            "function": function,
            "arguments": {} if arguments is None else arguments
        })

    def addProtocol(self, name, protocol, arguments=None):
        '''
        Adds a protocol to the configuration.

        An existing entry with the same name is overwritten.

        Args:
            name (str): The name of the protocol instance
            protocol (str): The entry point name of the protocol
            arguments (dict): The arguments to initiate the protocol class.
        '''
        self.config["protocols"][name] = EasyDict({
            "protocol": protocol,
            "arguments": {} if arguments is None else arguments
        })

    def addConnection(self, source_module, source_queue, destination_module, destination_queue):
        '''
        Adds connections between module queues.

        Args:
            source_module (str): The source module instance name
            source_queue (str): The source module instance queue name
            destination_module (str): The destination instance name
            destination_queue (str): The destination instance queue name

        Raises:
            Exception: When the source queue already takes part in another
                connection.
        '''
        # Only the source side is checked; __queueConnected returns either
        # False or a human readable reason string.
        connected = self.__queueConnected(source_module, source_queue)
        if connected:
            raise Exception("Cannot connect '%s.%s' to '%s.%s'. Reason: %s." % (source_module, source_queue, destination_module, destination_queue, connected))
        self.config["routingtable"].append(
            EasyDict({
                "source_module": source_module,
                "source_queue": source_queue,
                "destination_module": destination_module,
                "destination_queue": destination_queue
            })
        )

    def dump(self):
        '''
        Dumps the configuration as an ``EasyDict`` instance.
        '''
        return EasyDict(self.config)

    def load(self, filename):
        '''
        Loads a YAML bootstrap file.

        The file is parsed, validated and then merged into the configuration
        in this order: default template functions, template functions, module
        functions, protocols, modules, routing table.  Finally the logging
        chain matching ``self.logstyle`` is set up.

        Args:
            filename (str): The filename to load.
        '''
        config = self.__load(filename)
        self.__validate(config)
        self.__validateRoutingTable(config)
        self.__addDefaultTemplateFunctions()

        if "template_functions" in config:
            for function in config["template_functions"]:
                self.addTemplateFunction(name=function, **config["template_functions"][function])

        if "module_functions" in config:
            for function in config["module_functions"]:
                self.addModuleFunction(name=function, **config["module_functions"][function])

        # Protocols have to be registered before the modules, since
        # __addModule rejects references to unknown protocols.
        if "protocols" in config:
            for protocol in config["protocols"]:
                self.addProtocol(
                    name=protocol,
                    protocol=config["protocols"][protocol].get("protocol", None),
                    arguments=config["protocols"][protocol].get("arguments", {}),
                )

        for module in config["modules"]:
            self.addModule(name=module, **config["modules"][module])

        for route in config["routingtable"]:
            sm, sq, dm, dq = self.__splitRoute(route)
            self.addConnection(sm, sq, dm, dq)

        # Dispatches to _setupLoggingSTDOUT or _setupLoggingSYSLOG.
        getattr(self, "_setupLogging%s" % (self.logstyle.upper()))()

    def __addDefaultTemplateFunctions(self):
        '''
        Adds template functions which should be available for each module.
        '''
        self.addTemplateFunction("strftime", "wishbone.function.template.strftime")
        self.addTemplateFunction("epoch", "wishbone.function.template.epoch")
        self.addTemplateFunction("env", "wishbone.function.template.environment")
        self.addTemplateFunction("version", "wishbone.function.template.version")

    def __addModule(self, name, module, arguments=None, description="", functions=None, protocol=None, event=False):
        '''
        Registers a module instance and wires its ``_logs`` and ``_metrics``
        queues to the internal ``_logs`` and ``_metrics`` modules.

        Unlike ``addModule`` this accepts names starting with an underscore.

        Raises:
            Exception: When the protocol or one of the functions is unknown,
                or when the instance name is already taken.
        '''
        # None sentinels avoid sharing one mutable default between calls.
        arguments = {} if arguments is None else arguments
        functions = {} if functions is None else functions

        if protocol is not None and protocol not in self.config.protocols:
            raise Exception("No protocol module defined with name '%s' for module instance '%s'" % (protocol, name))

        for queue, fs in functions.items():
            for function in fs:
                if function not in self.config.module_functions:
                    raise Exception("No function defined with name '%s' for module instance '%s'." % (function, name))

        if name in self.config["modules"]:
            raise Exception("Module instance name '%s' is already taken." % (name))

        self.config["modules"][name] = EasyDict({
            'description': description,
            'module': module,
            'arguments': arguments,
            'functions': functions,
            'protocol': protocol,
            'event': event})
        self.addConnection(name, "_logs", "_logs", "_%s" % (name))
        self.addConnection(name, "_metrics", "_metrics", "_%s" % (name))

    def __queueConnected(self, module, queue):
        '''
        Returns a description of the existing connection when ``module.queue``
        is already used as source or destination, otherwise ``False``.
        '''
        for c in self.config["routingtable"]:
            is_source = c["source_module"] == module and c["source_queue"] == queue
            is_destination = c["destination_module"] == module and c["destination_queue"] == queue
            if is_source or is_destination:
                return "Queue '%s.%s' is already connected to '%s.%s'" % (c["source_module"], c["source_queue"], c["destination_module"], c["destination_queue"])
        return False

    def __splitRoute(self, route):
        '''
        Splits ``"module.queue -> module.queue"`` into its four components.
        '''
        (source, destination) = route.split('->')
        (source_module, source_queue) = source.strip().split('.')
        (destination_module, destination_queue) = destination.strip().split('.')
        return source_module, source_queue, destination_module, destination_queue

    def __load(self, filename):
        '''Loads and returns the yaml bootstrap file.'''
        try:
            with open(filename, 'r') as f:
                # NOTE(security): this used to be ``yaml.load(f)`` without a
                # Loader, which may construct arbitrary Python objects and
                # fails outright on PyYAML >= 6 (Loader is mandatory there).
                # The bootstrap file is validated against a JSON schema
                # afterwards, so plain YAML types are all that is needed.
                config = yaml.safe_load(f)
        except Exception as err:
            raise Exception("Failed to load bootstrap file. Reason: %s" % (err,))
        else:
            return config

    def __validate(self, config):
        '''
        Validates the loaded configuration against ``SCHEMA``.

        Raises:
            Exception: When validation fails for any reason.
        '''
        try:
            validate(config, SCHEMA)
        except Exception as err:
            # Not every exception carries a ``message`` attribute; fall back
            # to the exception itself so the real cause is never masked by
            # an AttributeError.
            reason = getattr(err, "message", err)
            raise Exception("Failed to validate configuration file. Reason: %s" % (reason,))

    def __validateRoutingTable(self, config):
        '''
        Checks that both sides of every routing rule contain a dot.

        Raises:
            AssertionError: When one side of a rule has no dot.  Raised
                explicitly (instead of via ``assert``) so the check is not
                stripped when Python runs with ``-O``.
        '''
        for route in config["routingtable"]:
            (left, right) = route.split("->")
            for side in (left, right):
                if "." not in side.strip():
                    raise AssertionError("routingtable rule \"%s\" does not have the right format. Missing a dot." % (route))

    def __addLogFilter(self):
        '''
        Adds the ``_logs_filter`` queueselect module which routes log events
        based on ``self.loglevel``.
        '''
        self.__addModule(
            name="_logs_filter",
            module="wishbone.module.flow.queueselect",
            arguments={
                "templates": [
                    {"name": "log_filter",
                     "queue": LOG_FILTER_TEMPLATE.format(loglevel=self.loglevel)
                     }
                ]
            },
            # NOTE(review): description equals the one of the ``_logs``
            # funnel; left untouched because it is part of the dumped config.
            description="Centralizes the logs of all modules.",
            functions={
            },
            protocol=None
        )

    def __addLogFunnel(self):
        '''
        Adds the ``_logs`` funnel module.
        '''
        self.__addModule(
            name="_logs",
            module="wishbone.module.flow.funnel",
            arguments={
            },
            description="Centralizes the logs of all modules.",
            functions={
            },
            protocol=None
        )

    def __addMetricFunnel(self):
        '''
        Adds the ``_metrics`` funnel module.
        '''
        self.__addModule(
            name="_metrics",
            module="wishbone.module.flow.funnel",
            arguments={
            },
            description="Centralizes the metrics of all modules.",
            functions={
            },
            protocol=None
        )

    def _setupLoggingSTDOUT(self):
        '''
        Builds the chain ``_logs -> _logs_filter -> _logs_format ->
        _logs_stdout`` unless ``_logs.outbox`` is already connected.
        '''
        if not self.__queueConnected("_logs", "outbox"):
            self.__addModule(
                name="_logs_format",
                module="wishbone.module.process.template",
                arguments={
                    "templates": {
                        "human_log": LOG_COLOR_TEMPLATE
                    }
                },
                description="Create a human readable log format.",
                functions={
                },
                protocol=None
            )
            self.addConnection("_logs", "outbox", "_logs_filter", "inbox")
            self.addConnection("_logs_filter", "pass", "_logs_format", "inbox")
            self.__addModule(
                name="_logs_stdout",
                module="wishbone.module.output.stdout",
                arguments={
                    "colorize": self.colorize_stdout,
                    "selection": "human_log"
                },
                description="Prints all incoming logs to STDOUT.",
                functions={
                },
                protocol=None
            )
            self.addConnection("_logs_format", "outbox", "_logs_stdout", "inbox")

    def _setupLoggingSYSLOG(self):
        '''
        Builds the chain ``_logs -> _logs_filter -> _logs_syslog`` unless
        ``_logs.outbox`` is already connected.
        '''
        if not self.__queueConnected("_logs", "outbox"):
            self.__addModule(
                name="_logs_syslog",
                module="wishbone.module.output.syslog",
                arguments={
                    "ident": self.identification,
                    "payload": "module({{data.module}}): {{data.message}}"
                },
                description="Writes all incoming messages to syslog.",
                functions={
                },
                protocol=None
            )
            self.addConnection("_logs", "outbox", "_logs_filter", "inbox")
            self.addConnection("_logs_filter", "pass", "_logs_syslog", "inbox")