#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# default.py
#
# Copyright 2018 Jelle Smet <development@smetj.net>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
#
from wishbone.actorconfig import ActorConfig
from wishbone.error import ModuleInitFailure, NoSuchModule
from wishbone.error import QueueConnected
from wishbone.componentmanager import ComponentManager
from gevent import event, sleep, spawn
from gevent import pywsgi
from .graphcontent import GRAPHCONTENT
from .graphcontent import VisJSData
from types import SimpleNamespace
from wishbone.utils import GetProtocolHandler
class ModulePool():
def __init__(self):
self.module = SimpleNamespace()
def list(self):
'''Returns a generator returning all module instances.'''
for m in list(self.module.__dict__.keys()):
yield self.module.__dict__[m]
def getModule(self, name):
'''Returns a module instance'''
try:
return getattr(self.module, name)
except AttributeError:
raise NoSuchModule("Could not find module %s" % name)
def hasModule(self, name):
'''
Checks whether the module pool has this module.
Args:
name (str): The name of the module instance.
Returns
bool: True if module exists False if not.
'''
return name in self.module.__dict__.keys()
[docs]class Default(object):
'''
The default Wishbone router.
A Wishbone router is responsible for organising the event flow between
modules.
Args:
config (EasyDict): The router setup configuration.
size (int): The size of all queues.
frequency (int)(1): The frequency at which metrics are produced.
identification (wishbone): A string identifying this instance in logging.
'''
def __init__(self, config=None, size=100, frequency=10, identification="wishbone", graph=False, graph_include_sys=False):
self.component_manager = ComponentManager()
self.config = config
self.size = size
self.frequency = frequency
self.identification = identification
self.graph = graph
self.graph_include_sys = graph_include_sys
self.module_pool = ModulePool()
self.__block = event.Event()
self.__block.clear()
self.__connections = {
}
[docs] def block(self):
'''Blocks until stop() is called and the shutdown process ended.'''
self.__block.wait()
[docs] def connectQueue(self, source, destination):
'''Connects one queue to the other.
For convenience, the syntax of the queues is <modulename>.<queuename>
For example:
stdout.inbox
This type of router actually replaces the destination queue with the source
queue.
Args:
source (str): The source queue in <module.queue_name> syntax
destination (str): The destination queue in <module.queue_name> syntax
'''
(source_module, source_queue) = source.split('.')
(destination_module, destination_queue) = destination.split('.')
if not self.module_pool.hasModule(source_module):
raise NoSuchModule("Module instance %s does not exist." % (source_module))
if not self.module_pool.hasModule(destination_module):
raise NoSuchModule("Module instance %s does not exist." % (destination_module))
result = self.__isConnectedTo(source)
if result is not None:
raise QueueConnected("Queue %s is already connected to %s." % (source, result))
result = self.__isConnectedTo(destination)
if result is not None:
raise QueueConnected("Queue %s is already connected to %s." % (destination, result))
self.__connections[source] = destination
source_module_instance = self.module_pool.getModule(source_module)
if not source_module_instance.pool.hasQueue(source_queue):
source_module_instance.pool.createSystemQueue(source_queue)
source_module_instance.logging.debug("Module instance '%s' has no queue '%s' so auto created." % (source_module, source_queue))
destination_module_instance = self.module_pool.getModule(destination_module)
if not destination_module_instance.pool.hasQueue(destination_queue):
destination_module_instance.pool.createSystemQueue(destination_queue)
destination_module_instance.logging.debug("Module instance '%s' has no queue '%s' so auto created." % (destination_module, destination_queue))
setattr(
destination_module_instance.pool.queue,
destination_queue,
source_module_instance.pool.getQueue(
source_queue
)
)
source_module_instance.pool.getQueue(source_queue).disableFallThrough()
source_module_instance.logging.debug("Connected queue %s to %s" % (source, destination))
[docs] def getChildren(self, module):
'''
Returns all the connected child modules
Args:
module (str): The name of the module.
Returns:
list: A list of module names.
'''
children = []
def travel(m):
for connection in self.__connections:
if connection.split('.')[0] == m:
child = self.__connections[connection].split('.')[0]
if child in children:
continue
else:
children.append(child)
travel(child)
travel(module)
return children
[docs] def registerModule(self, module, actor_config, arguments={}):
'''Initializes the wishbone module ``module``.
Args:
module (str): A Wishbone module component name.
actor_config (ActorConfig): The module's actor configuration
arguments (dict): The parameters to initialize the module.
'''
try:
m = self.component_manager.getComponentByName(module)
setattr(self.module_pool.module, actor_config.name, m(actor_config, **arguments))
except Exception as err:
raise ModuleInitFailure("Problem loading module '%s'. Reason: %s" % (actor_config.name, err))
[docs] def stop(self):
'''Stops all running modules.'''
for module in self.module_pool.list():
if module.name not in list(self.getChildren("_logs")) + ["_logs"] and not module.stopped:
module.stop()
while not self.__logsEmpty():
sleep(0.1)
self.__running = False
self.__block.set()
[docs] def start(self):
'''Starts all registered modules.'''
if self.config is not None:
self.__initConfig()
if self.graph:
self.graph = GraphWebserver(self.config, self.module_pool, self.__block, self.graph_include_sys)
self.graph.start()
for module in self.module_pool.list():
module.start()
def __initConfig(self):
'''Setup all modules and routes.'''
protocols = {}
for name, instance in list(self.config.protocols.items()):
protocols[name] = {"class": self.component_manager.getComponentByName(instance.protocol), "arguments": instance.arguments}
template_functions = {}
for name, instance in list(self.config.template_functions.items()):
template_functions[name] = self.component_manager.getComponentByName(instance.function)(**instance.arguments)
module_functions = {}
for name, instance in list(self.config.module_functions.items()):
module_functions[name] = self.component_manager.getComponentByName(instance.function)(**instance.arguments)
for name, instance in list(self.config.modules.items()):
mod_func = {}
for queue, queue_functions in list(instance.functions.items()):
mod_func[queue] = []
for queue_function in queue_functions:
if queue_function in module_functions:
mod_func[queue].append(module_functions[queue_function])
if instance.protocol is None:
protocol = None
elif instance.protocol not in protocols:
raise ModuleInitFailure("Protocol %s referenced but not available." % (instance.protocol))
else:
protocol = GetProtocolHandler(protocols[instance.protocol]["class"], protocols[instance.protocol]["arguments"]).getProtocol
actor_config = ActorConfig(
name=name,
size=self.size,
frequency=self.frequency,
template_functions=template_functions,
description=instance.description,
module_functions=mod_func,
identification=self.identification,
protocol=protocol,
io_event=instance.event
)
self.registerModule(
instance.module,
actor_config,
instance.arguments
)
self.__setupConnections()
def __isConnectedTo(self, queue):
'''
Returns the module.queue ``queue`` is connected to.
Args:
queue (str): The name of the queue in ``module.queue`` format.
Returns
str/None: The name of the queue which is connected.
'''
inverse = {v: k for k, v in self.__connections.items()}
if queue in self.__connections:
return self.__connections[queue]
elif queue in inverse:
return inverse[queue]
else:
return None
def __logsEmpty(self):
'''Checks each module whether any logs have stayed behind.'''
for module in self.module_pool.list():
if not module.pool.queue._logs.size() == 0:
return False
else:
return True
def __setupConnections(self):
'''Setup all connections as defined by configuration_manager'''
for route in self.config.routingtable:
self.connectQueue("%s.%s" % (route.source_module, route.source_queue), "%s.%s" % (route.destination_module, route.destination_queue))
class GraphWebserver():
def __init__(self, config, module_pool, block, include_sys):
self.config = config
self.module_pool = module_pool
self.block = block
self.include_sys = include_sys
self.js_data = VisJSData()
for c in self.config["routingtable"]:
if not self.include_sys and any([
c.source_module.startswith('_'),
c.destination_module.startswith('_'),
c.source_queue.startswith('_'),
c.destination_queue.startswith('_')]):
continue
else:
self.js_data.addModule(instance_name=c.source_module,
module_name=self.config["modules"][c.source_module]["module"],
description=self.module_pool.getModule(c.source_module).description)
self.js_data.addModule(instance_name=c.destination_module,
module_name=self.config["modules"][c.destination_module]["module"],
description=self.module_pool.getModule(c.destination_module).description)
self.js_data.addQueue(c.source_module, c.source_queue)
self.js_data.addQueue(c.destination_module, c.destination_queue)
self.js_data.addEdge("%s.%s" % (c.source_module, c.source_queue), "%s.%s" % (c.destination_module, c.destination_queue))
def start(self):
print("#####################################################")
print("# #")
print("# Caution: Started webserver on port 8088 #")
print("# #")
print("#####################################################")
spawn(self.setupWebserver)
def stop(self):
pass
def loop(self):
return self.__block
def application(self, env, start_response):
if env['PATH_INFO'] == '/':
start_response('200 OK', [('Content-Type', 'text/html')])
return[str.encode(GRAPHCONTENT % (self.js_data.dumpString()[0], self.js_data.dumpString()[1]))]
else:
start_response('404 Not Found', [('Content-Type', 'text/html')])
return [b'<h1>Not Found</h1>']
def setupWebserver(self):
pywsgi.WSGIServer(('', 8088), self.application, log=None, error_log=None).serve_forever()