# Source code for wishbone.module

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#  __init__.py
#
#  Copyright 2018 Jelle Smet <development@smetj.net>
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
#  MA 02110-1301, USA.
#
#


from wishbone.actor import Actor
from wishbone.moduletype import ModuleType
from wishbone.componentmanager import ComponentManager
from wishbone.error import ModuleInitFailure
from wishbone.event import extractBulkItemValues
from wishbone.event import Event as Wishbone_Event
from wishbone.protocol.encode.dummy import Dummy as DummyEncoder
from wishbone.error import TTLExpired
from wishbone.utils import GetProtocolHandler
from copy import deepcopy
from sys import exc_info
import traceback
from gevent.pool import Pool


class InputModule(Actor):
    '''
    Base class for Wishbone modules of type ``ModuleType.INPUT``.

    Provides decoder management (``setDecoder`` / ``getDecoder``) and, when
    the ``native_events`` module parameter is set, replaces
    ``self.generateEvent`` with ``_generateNativeEvent``.
    '''

    MODULE_TYPE = ModuleType.INPUT
    # Flipped to True by _moduleInitSetup() when a decoder is configured via
    # actorconfig (``self.config.protocol``). From then on setDecoder() leaves
    # the user-defined decoder untouched.
    actorconfig_defined_decoder = False

    def getDecoder(self):
        '''
        Returns a new instance of the ``handler()`` method of the decoder set
        by ``self.setDecoder()``.

        Each concurrent incoming data stream should use its own instance of
        the decoder otherwise they end up overwriting each other's content.

        This is a placeholder: ``setDecoder()`` or ``_moduleInitSetup()``
        replace it on the instance.

        Raises:
            Exception: When called before it has been overwritten.
        '''
        raise Exception("This function should be overwritten by setDecoder().")

    def setDecoder(self, name, *args, **kwargs):
        '''
        Sets the decoder with name <name> unless there's already a decoder
        defined via ``actorconfig.ActorConfig``.

        Args:
            name (str): The entrypoint name of the decoder to initialize
            *args: Accepted for signature compatibility; currently NOT passed
                on to the decoder.
            **kwargs: Passed to ``GetProtocolHandler`` together with the
                decoder class.
        '''
        if not self.actorconfig_defined_decoder:
            class_ = ComponentManager().getComponentByName(name)
            # getDecoder becomes the factory; self.decode is one instance
            # produced by it for the module's default use.
            self.getDecoder = GetProtocolHandler(class_, kwargs).getProtocol
            self.decode = self.getDecoder()

    def _generateNativeEvent(self, data=None, destination=None):
        '''
        Gets mapped to ``self.generateEvent`` for `input` type modules when
        the ``native_events`` module parameter is ``True`` (see
        ``_moduleInitSetup``).

        Args:
            data (dict): The dict representation of ``wishbone.event.Event``.
                Defaults to an empty dict.
            destination (str): The destination str to store ``data``. In this
                particular implementation it's ignored.

        Returns:
            wishbone.event.Event: ``Event`` instance of ``data``
        '''
        # None sentinel instead of a shared mutable ``{}`` default argument.
        if data is None:
            data = {}
        e = Wishbone_Event()
        e.slurp(data)
        return e

    def _moduleInitSetup(self):
        '''
        Does module type specific setup.

        Always installs the dummy decoder first. If a protocol is configured
        through actorconfig, it replaces the dummy and marks the decoder as
        user-defined so later ``setDecoder()`` calls are ignored. Also swaps
        in ``_generateNativeEvent`` when ``native_events`` is set.
        '''
        self.setDecoder("wishbone.protocol.decode.dummy")
        if self.config.protocol is not None:
            self.actorconfig_defined_decoder = True
            self.logging.debug("This 'Input' module has '%s' decoder configured." % (self.config.protocol))
            self.getDecoder = self.config.protocol
            self.decode = self.config.protocol()

        if self.kwargs.native_events:
            self.generateEvent = self._generateNativeEvent

    def _moduleInitValidation(self):
        '''
        Validates whether we have all the parameters this module type expects

        Args:
            None

        Returns:
            None

        Raises:
            ModuleInitFailure: Raised when ``native_events`` or
                ``destination`` is missing from the module parameters.
        '''
        for param in ["native_events", "destination"]:
            if param not in self.kwargs.keys():
                raise ModuleInitFailure("An 'Input' module should always have a '%s' parameter. This is a programming error." % (param))
class OutputModule(Actor):
    '''
    Base class for Wishbone modules of type ``ModuleType.OUTPUT``.

    Provides encoder management (``setEncoder``), derivation of the data to
    submit (``getDataToSubmit``) and a consumer which runs the registered
    function on a ``gevent.pool.Pool`` of ``parallel_streams`` greenlets.
    '''

    MODULE_TYPE = ModuleType.OUTPUT

    def setEncoder(self, name, *args, **kwargs):
        '''
        Sets the encoder with name <name> unless there's already an encoder
        defined via ``actorconfig.ActorConfig``.

        Args:
            name (str): The name of the encoder to initialize
            *args: Variable length argument list, passed to the encoder class.
            **kwargs: Arbitrary keyword arguments, passed to the encoder class.

        Returns:
            Bool: True if the encoder is set, False when an encoder was
            already set via ``actorconfig.ActorConfig``
        '''
        # NOTE(review): ``encode`` has a class-level default (DummyEncoder
        # handler, below), so this check only passes if something set
        # ``self.encode`` to None beforehand — confirm this is intended.
        if self.encode is None:
            self.encode = ComponentManager().getComponentByName(name)(*args, **kwargs).handler
            self.logging.debug("Encoder '%s' has been set." % (name))
            return True
        else:
            self.logging.debug("Encoder '%s' has not been set. The user already defined one." % (name))
            return False

    # Class-level default encoder: the dummy encoder's handler.
    encode = DummyEncoder().handler

    def getDataToSubmit(self, event):
        '''
        Derives the data to submit from ``event`` taking into account
        ``native_events``, ``payload`` and ``selection`` module parameters.

        Args:
            event (```wishbone.event.Event```): The event to extract data from.

        Returns:
            dict/str/...: The data to submit.
        '''
        if self.kwargs.native_events:
            return event.getNative()
        elif event.kwargs.payload is None:
            if event.isBulk():
                # Bulk events are flattened to one line per item.
                # NOTE(review): this uses the module-level self.kwargs.selection
                # while the non-bulk branch uses the event-rendered
                # event.kwargs.selection — confirm the difference is intended.
                return "\n".join([str(item) for item in extractBulkItemValues(event, self.kwargs.selection)])
            else:
                return event.get(event.kwargs.selection)
        else:
            return event.kwargs.payload

    def _consumer(self, function, queue):
        '''
        Greenthread which applies <function> to each element from <queue>.

        However, this version overrides ``Actor._consumer`` as it executes
        parallel coroutine versions of ``functions`` spawned on a
        ``gevent.pool.Pool`` instance. The number of parallel instances is
        defined by the ``parallel_streams`` value.

        Args:
            function (``function``): The function which has been registered
                to consume ``queue``.
            queue (str): The name of the queue from which events have to be
                consumed and processed by ``function``.

        Returns:
            None
        '''
        self.parallel_pool = Pool(self.kwargs.parallel_streams)
        self._run.wait()
        self.logging.debug("Function '%s' has been registered to consume queue '%s'" % (function.__name__, queue))

        def execFunction(function, event):
            # Runs inside a pool greenlet. ``function`` receives a deep copy,
            # so the event submitted to _failed/_success is the one as it was
            # before ``function`` ran (plus the error info on failure).
            try:
                function(deepcopy(event))
            except Exception as err:
                if self.config.disable_exception_handling:
                    raise
                exc_type, exc_value, exc_traceback = exc_info()
                # (line number of innermost frame, exception type, message)
                info = (traceback.extract_tb(exc_traceback)[-1][1], str(exc_type), str(exc_value))
                event.set(info, "errors.%s" % (self.name))
                self.logging.error("%s" % (err))
                self.submit(event, "_failed")
            else:
                self.submit(event, "_success")
            finally:
                # Unset the current event uuid to the logger object.
                # NOTE(review): with parallel greenlets this may clear an ID
                # set for a different event by the loop below — verify.
                self.logging.setCurrentEventID(None)

        while self.loop():
            event = self.pool.getQueue(queue).get()
            if not event.has("tmp.%s" % (self.name)):
                event.set({}, "tmp.%s" % (self.name))

            # Render kwargs relative to the event's content and make these accessible under event.kwargs
            event.renderKwargs(self.kwargs_template)

            # Validate TTL
            try:
                event.decrementTTL()
            except TTLExpired as err:
                self.logging.warning("Event with UUID %s dropped. Reason: %s" % (event.get("uuid"), err))
                continue

            # Set the current event uuid to the logger object
            self.logging.setCurrentEventID(event.get("uuid"))

            # Apply all the defined queue functions to the event
            event = self._applyFunctions(queue, event)

            # Apply consumer function
            self.parallel_pool.spawn(execFunction, function, event)

    def _moduleInitSetup(self):
        '''
        Does module type specific setup.

        If a protocol is configured through actorconfig
        (``self.config.protocol``), it is instantiated and used as
        ``self.encode``.
        '''
        # NOTE(review): ``encode`` is a class attribute, so hasattr() is
        # always True and this branch appears unreachable — confirm intent.
        if not hasattr(self, "encode") and self.config.protocol is None:
            self.logging.debug("This 'Output' module has no encoder method set. Setting dummy encoder.")
            self.setEncoder("wishbone.protocol.encode.dummy")

        if self.config.protocol is not None:
            self.logging.debug("This 'Output' module has '%s' encoder configured." % (self.config.protocol))
            self.encode = self.config.protocol()

    def _moduleInitValidation(self):
        '''
        Validates whether we have all the parameters this module type expects

        Args:
            None

        Returns:
            None

        Raises:
            ModuleInitFailure: Raised when one of ``payload``, ``selection``,
                ``native_events`` or ``parallel_streams`` is missing.
        '''
        for param in ["payload", "selection", "native_events", "parallel_streams"]:
            if param not in self.kwargs.keys():
                raise ModuleInitFailure("An 'output' module should always have a '%s' parameter. This is a programming error." % (param))
class FlowModule(Actor):
    '''
    Base class for Wishbone modules of type ``ModuleType.FLOW``.

    Flow modules have no type-specific setup or parameter requirements, so
    both module-type hooks are no-ops.
    '''

    MODULE_TYPE = ModuleType.FLOW

    def _moduleInitSetup(self):
        '''
        Module type specific setup hook; nothing to do for flow modules.
        '''
        return None

    def _moduleInitValidation(self):
        '''
        Module type specific parameter validation hook.

        Flow modules require no particular parameters, so nothing is checked
        and ``ModuleInitFailure`` is never raised here.

        Returns:
            None
        '''
        return None
class ProcessModule(Actor):
    '''
    Base class for Wishbone modules of type ``ModuleType.PROCESS``.

    Process modules have no type-specific setup or parameter requirements,
    so both module-type hooks are no-ops.
    '''

    MODULE_TYPE = ModuleType.PROCESS

    def _moduleInitSetup(self):
        '''
        Module type specific setup hook; nothing to do for process modules.
        '''
        return None

    def _moduleInitValidation(self):
        '''
        Module type specific parameter validation hook.

        Process modules require no particular parameters, so nothing is
        checked and ``ModuleInitFailure`` is never raised here.

        Returns:
            None
        '''
        return None