Source code for wishbone.actor

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#  actor.py
#
#  Copyright 2018 Jelle Smet <development@smetj.net>
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
#  MA 02110-1301, USA.
#
#

from gevent.monkey import patch_all; patch_all()
from wishbone.queue import QueuePool
from wishbone.logging import Logging
from wishbone.event import Event as Wishbone_Event
from wishbone.error import ModuleInitFailure, InvalidModule, TTLExpired
from wishbone.actorconfig import ActorConfig
from wishbone.function.template import TemplateFunction
from wishbone.function.module import ModuleFunction

from collections import namedtuple
from gevent import spawn, kill
from gevent import sleep, socket
from gevent.event import Event
from wishbone.error import QueueFull
from time import time
from sys import exc_info
import traceback
import inspect
import jinja2
from copy import deepcopy

from easydict import EasyDict
from pkg_resources import get_distribution


Greenlets = namedtuple('Greenlets', "consumer generic log metric")


[docs]class Actor(object): """A base class providing core Actor functionality. The Actor base class is responsible for providing the base functionality, setup and helper functions of a Wishbone module. Args: config (wishbone.actorconfig.ActorConfig): The ActorConfig object instance. Attributes: config (wishbone.actorconfig.ActorConfig): The ActorConfig object instance. name (str): The name of the instance, derived from `config`. description (str): The description of the actor based instance, derived from `config`. pool (wishbone.pool.QueuePool): The Actor's queue pool. Methods: logging (wishbone.logging.Logging) """ def __init__(self, config): self.config = config self.name = config.name self.description = self.__getDescription(config) self.pool = QueuePool(config.size) self.logging = Logging( name=config.name, q=self.pool.queue._logs, identification=self.config.identification ) self.__loop = True self.greenlets = Greenlets([], [], [], []) self.greenlets.metric.append(spawn(self.__metricProducer)) self._run = Event() self._run.clear() self.stopped = True # Setup the Jinja2 environment to render kwargs templates. ########################################################## self.env_template = jinja2.Environment( undefined=jinja2.StrictUndefined, trim_blocks=True, loader=jinja2.FileSystemLoader('/') ) # Add the template functions to the template globals #################################################### for key, value in self.config.template_functions.items(): self.env_template.globals.update({key: value.get}) # Store a copy of the raw/unmodified kwargs ########################################## self.kwargs_raw = self.__getRawKwargs() # Store a copy of kwargs with all templates replaced by a template instance ############################################################################ self.kwargs_template = self.__getTemplateKwargs(self.env_template, self.kwargs_raw) # Store a copy of the rendered kwargs as an EasyDict instance ############################################################# self.kwargs = self.__renderTemplateKwargs(self.kwargs_template) # Do some sanity checks ####################### self.__sanityChecks() # Validate and setup module # Methods are come from based class ################################### self._moduleInitValidation() self._moduleInitSetup() self.version = self.__getVersion()
[docs] def generateEvent(self, data={}, destination=None): ''' Generates a new event. This function can get overridden by ``wishbone.module.InputModule._generateNativeEvent``. The provided ``data`` will be traversed in search of valid templates which then will be rendered. Args: data (``data``): The payload to add to the event. destination (None): The destination key to write the data to Returns: wishbone.event.Event: An event containing ``data`` as a payload. ''' if destination in [None, "data"]: event = Wishbone_Event(data) event.renderField(destination, self.env_template) else: event = Wishbone_Event() event.set(data, destination) event.renderField(destination, self.env_template) return event
[docs] def loop(self): '''The global lock for this module. Returns: bool: True when module is in running mode. False if not. ''' return self.__loop
[docs] def postHook(self): ''' Is executed when module exits. ''' self.logging.debug("Module has no postHook() method set.")
[docs] def preHook(self): ''' Is executed when module starts. Can be overriden by the user. ''' self.logging.debug("Module has no preHook() method set.")
[docs] def registerConsumer(self, function, queue): ''' Registers <function> to process all events in <queue> Don't not trap errors here. When <function> fails then the event will be submitted to the "failed" queue, If <function> succeeds to the success queue. Registering ``function`` to consume ``queue`` will also apply all the registered module functions against the events consumed from it. Args: function (``function``): The function which processes events queue (str): The name of the queue from which ``function`` will process the events. Returns: None ''' self.greenlets.consumer.append(spawn(self._consumer, function, queue))
[docs] def renderEventKwargs(self, event, queue=None): ''' Renders kwargs using the content of ``event`` and stores the result under ``event.kwargs``. Args: event (``wishbone.event.Event``): An Event instance queue (str): The queue name so ``RenderKwargs`` can store the results in the correct queue context. Returns: ``wishbone.event.Event``: The provided event instance. ''' event.kwargs = self.__renderKwargs.render( queue_context=queue, event_content=event.dump() ) return event
[docs] def renderKwargs(self): ''' Renders kwargs without making use of event content. This is typically used when initiliazing a module and render the defined kwargs which do not need a event data for rendering. Returns: None ''' self.kwargs = self.__renderKwargs.render()
[docs] def start(self): ''' Starts the module. Returns: None ''' self.__postHook() if hasattr(self, "preHook"): self.logging.debug("preHook() found, executing") self.preHook() self.__validateAppliedFunctions() self._run.set() self.logging.debug( "Started with max queue size of %s events and metrics interval of %s seconds." % ( self.config.size, self.config.frequency ) ) self.stopped = False if not self.name.startswith("_"): self.logging.debug("Started version %s" % (self.version))
[docs] def sendToBackground(self, function, *args, **kwargs): ''' Executes a function and sends it to the background. Such a function should never exit until ``self.loop`` returns ``False``. This `method` wraps ``function`` again in a loop as long ``self.loop`` returns ``False`` so that ``function`` is restarted and an error is logged. Args: function (``function``): The function which has to be executed. *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. ''' def wrapIntoLoop(): while self.loop(): try: function(*args, **kwargs) # We want to break out of the loop if we get here because # it's the intention of function() to exit without errors. # Normally, background tasks run indefinately but in this # case the user opted not to for some reason so we should # obey that. break except Exception as err: if self.config.disable_exception_handling: raise self.logging.error("Backgrounded function '%s' of module instance '%s' caused an error. This needs attention. Restarting it in 2 seconds. Reason: %s" % ( function.__name__, self.name, err) ) sleep(2) self.greenlets.generic.append(spawn(wrapIntoLoop))
[docs] def stop(self): ''' Makes ``self.loop`` return ``False`` and handles shutdown of of the registered background jobs. ''' self.logging.info("Received stop. Initiating shutdown.") self.__loop = False for background_job in self.greenlets.metric: kill(background_job) for background_job in self.greenlets.generic: kill(background_job) for background_job in self.greenlets.consumer: kill(background_job) if hasattr(self, "postHook"): self.logging.debug("postHook() found, executing") self.postHook() self.logging.debug("Exit.") self.stopped = True
[docs] def submit(self, event, queue): ''' Submits <event> to the queue with name <queue>. Args: event (wishbone.event.Event): An event instance. queue (str): The name of the queue Returns: None ''' while self.loop(): try: getattr(self.pool.queue, queue).put(event) break except AttributeError: self.logging.error("No such queue %s. Event with uuid %s dropped." % (queue, event.get('uuid'))) break except QueueFull: self.logging.warning("Queue '%s' is full and stalls the event pipeline. You should probably look into this." % (queue)) sleep(0.1)
def _applyFunctions(self, queue, event): ''' Executes and applies all registered module functions against the event. Args: queue (str): The name of the queue to which the function was registered. event (wishbone.event.Event): The Wishbone Returns: wisbone.event.Event: The modified version of ``event`` ''' if queue in self.config.module_functions: for f in self.config.module_functions[queue]: try: event = f.do(event) except Exception as err: if self.config.disable_exception_handling: raise self.logging.error("Function '%s' is skipped as it is causing an error. Reason: '%s'" % (f, err)) return event def _consumer(self, function, queue): ''' Greenthread which applies <function> to each element from <queue> Args: function (``function``): The function which has been registered to consume ``queue``. queue (str): The name of the queue from which events have to be consumed and processed by ``function``. Returns: None ''' self._run.wait() self.logging.debug("Function '%s' has been registered to consume queue '%s'" % (function.__name__, queue)) while self.loop(): event = self.pool.getQueue(queue).get() if not event.has("tmp.%s" % (self.name)): event.set({}, "tmp.%s" % (self.name)) # Render kwargs relative to the event's content and make these accessible under event.kwargs event.renderKwargs(self.kwargs_template) # Validate TTL try: event.decrementTTL() except TTLExpired as err: self.logging.warning("Event with UUID %s dropped. Reason: %s" % (event.get("uuid"), err)) continue # Set the current event uuid to the logger object self.logging.setCurrentEventID(event.get("uuid")) # Apply all the defined queue functions to the event event = self._applyFunctions(queue, event) # Apply consumer function try: function(event) except Exception as err: if self.config.disable_exception_handling: raise exc_type, exc_value, exc_traceback = exc_info() info = (traceback.extract_tb(exc_traceback)[-1][1], str(exc_type), str(exc_value)) event.set(info, "errors.%s" % (self.name)) self.logging.error("%s" % (err)) self.submit(event, "_failed") else: self.submit(event, "_success") finally: # Unset the current event uuid to the logger object self.logging.setCurrentEventID(None) def __getDescription(self, config): ''' Gets the module description. Args: config (``wishbone.actorconfig.ActorConfig``): An ActorConfig instance Returns: str: The description of this actor instance. ''' if config.description is None: return self.__doc__.strip().split('\n')[0].strip('*') else: return config.description def __getRawKwargs(self): ''' Get the class paramaters of the class basing this class. Returns (dict): A dict of the the raw kwargs ''' kwargs = {} for key, value in list(inspect.getouterframes(inspect.currentframe())[2][0].f_locals.items()): if key == "self" or isinstance(value, ActorConfig): next else: kwargs[key] = value return kwargs def __getTemplateKwargs(self, template_env, kwargs): ''' Recurses through ``kwargs`` and returns a version of it in which all strings are replaced by jinja2 template instances. Args: template_env (Jinja2.Environment instance): The Jinja2 environment instance to derive templates from. kwargs (dict): The dict of keyword/arguments. ''' def recurse(data): if isinstance(data, str): try: if len(list(template_env.parse(data).find_all(jinja2.nodes.Name))) > 0: t = template_env.from_string(data) return t else: return data except Exception as err: return data elif isinstance(data, dict): for key, value in data.items(): data[key] = recurse(value) return data elif isinstance(data, list): for index, value in enumerate(data): data[index] = recurse(value) return data else: return data return recurse(deepcopy(kwargs)) def __getVersion(self): try: return get_distribution(self.__module__.split('.')[0]).version except Exception as err: return "unknown" def __metricProducer(self): ''' A greenthread collecting the queue metrics at the defined interval. ''' self._run.wait() hostname = socket.gethostname() while self.loop(): for queue in self.pool.listQueues(names=True): for metric, value in list(self.pool.getQueue(queue).stats().items()): event = Wishbone_Event({ "time": time(), "type": "wishbone", "source": hostname, "name": "module.%s.queue.%s.%s" % (self.name, queue, metric), "value": value, "unit": "", "tags": () }) self.submit(event, "_metrics") sleep(self.config.frequency) def __postHook(self): ''' Is always executed when the module starts. ''' self.logging.debug("Following template functions are available: %s" % ", ".join( self.config.template_functions.keys() ) ) def __renderTemplateKwargs(self, kwargs): def recurse(data): if isinstance(data, jinja2.environment.Template): try: return data.render() except Exception as err: return "#error: %s#" % (err) # return self.template_strings[data] elif isinstance(data, dict): result = {} for key, value in data.items(): result[key] = recurse(value) return EasyDict(result) elif isinstance(data, list): result = [] for value in data: result.append(recurse(value)) return result else: return data rendered_kwargs = EasyDict( recurse( kwargs ) ) return rendered_kwargs def __validateAppliedFunctions(self): ''' A validation routine which checks whether functions have been applied to queues without a registered consumer. The effect of that would be that the functions are never applied which is not what the user wanted. ''' queues_w_registered_consumers = [t.args[1] for t in self.greenlets.consumer] for queue in self.config.module_functions.keys(): if queue not in queues_w_registered_consumers: raise ModuleInitFailure("Failed to initialize module '%s'. You have functions defined on queue '%s' which doesn't have a registered consumer." % (self.name, queue)) def __sanityChecks(self): ''' Does following validations: - Validate if all template functions base ``TemplateFunction`` - Validate if all module functions base ``ModuleFunction`` - Validate if the module has attribute "MODULE_TYPE" indicating it's not an pre 3.0 module. Args: None Returns: None Raises: ModuleInitFailure: Raised when one of the components isn't correct. ''' # Validate template functions for n, f in self.config.template_functions.items(): if not isinstance(f, TemplateFunction): raise ModuleInitFailure("Template function '%s' does not base TemplateFunction." % (n)) # Validate module functions for name, functions in self.config.module_functions.items(): for function in functions: if not isinstance(function, ModuleFunction): raise ModuleInitFailure("Module function '%s' does not base ModuleFunction." % (name)) if not hasattr(self, "MODULE_TYPE"): raise InvalidModule("Module instance '%s' seems to be of an incompatible old type." % (self.name))