#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# __init__.py
#
# Copyright 2018 Jelle Smet <development@smetj.net>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
#
from wishbone.actor import Actor
from wishbone.moduletype import ModuleType
from wishbone.componentmanager import ComponentManager
from wishbone.error import ModuleInitFailure
from wishbone.event import extractBulkItemValues
from wishbone.event import Event as Wishbone_Event
from wishbone.protocol.encode.dummy import Dummy as DummyEncoder
from wishbone.error import TTLExpired
from wishbone.utils import GetProtocolHandler
from copy import deepcopy
from sys import exc_info
import traceback
from gevent.pool import Pool
[docs]class OutputModule(Actor):
MODULE_TYPE = ModuleType.OUTPUT
[docs] def setEncoder(self, name, *args, **kwargs):
'''
Sets the encoder with name <name> unless there's already an encoder
defined via ``actorconfig.ActorConfig``.
Args:
name (str): The name of the encoder to initialize
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
Bool: True if the encoder is set, False when an encoder was already
set via ``actorconfig.ActorConfig``
'''
if self.encode is None:
self.encode = ComponentManager().getComponentByName(name)(*args, **kwargs).handler
self.logging.debug("Encoder '%s' has been set.")
return True
else:
self.logging.debug("Encoder '%s' has not been set. The user already defined one." % (name))
return False
encode = DummyEncoder().handler
[docs] def getDataToSubmit(self, event):
'''
Derives the data to submit from ``event`` taking into account
``native_events``, ``payload`` and ``selection`` module parameters.
Args:
event (```wishbone.event.Event```): The event to extract data from.
Returns:
dict/str/...: The data to submit.
'''
if self.kwargs.native_events:
return event.getNative()
elif event.kwargs.payload is None:
if event.isBulk():
return "\n".join([str(item) for item in extractBulkItemValues(event, self.kwargs.selection)])
else:
return event.get(
event.kwargs.selection
)
else:
return event.kwargs.payload
def _consumer(self, function, queue):
'''
Greenthread which applies <function> to each element from <queue>.
However, this version overrides ``Actor._consumer`` as it executes
parallel coroutine versions of ``functions`` spawned on a
``gevent.pool.Pool`` instance. The number of parallel instances is
defined by the ``parallel_streams`` value.
Args:
function (``function``): The function which has been registered to consume ``queue``.
queue (str): The name of the queue from which events have to be
consumed and processed by ``function``.
Returns:
None
'''
self.parallel_pool = Pool(self.kwargs.parallel_streams)
self._run.wait()
self.logging.debug("Function '%s' has been registered to consume queue '%s'" % (function.__name__, queue))
def execFunction(function, event):
try:
function(deepcopy(event))
except Exception as err:
if self.config.disable_exception_handling:
raise
exc_type, exc_value, exc_traceback = exc_info()
info = (traceback.extract_tb(exc_traceback)[-1][1], str(exc_type), str(exc_value))
event.set(info, "errors.%s" % (self.name))
self.logging.error("%s" % (err))
self.submit(event, "_failed")
else:
self.submit(event, "_success")
finally:
# Unset the current event uuid to the logger object
self.logging.setCurrentEventID(None)
while self.loop():
event = self.pool.getQueue(queue).get()
if not event.has("tmp.%s" % (self.name)):
event.set({}, "tmp.%s" % (self.name))
# Render kwargs relative to the event's content and make these accessible under event.kwargs
event.renderKwargs(self.kwargs_template)
# Validate TTL
try:
event.decrementTTL()
except TTLExpired as err:
self.logging.warning("Event with UUID %s dropped. Reason: %s" % (event.get("uuid"), err))
continue
# Set the current event uuid to the logger object
self.logging.setCurrentEventID(event.get("uuid"))
# Apply all the defined queue functions to the event
event = self._applyFunctions(queue, event)
# Apply consumer function
self.parallel_pool.spawn(execFunction, function, event)
def _moduleInitSetup(self):
'''
Does module type specific setup.
'''
if not hasattr(self, "encode") and self.config.protocol is None:
self.logging.debug("This 'Output' module has no encoder method set. Setting dummy decoder.")
self.setEncoder("wishbone.protocol.encode.dummy")
if self.config.protocol is not None:
self.logging.debug("This 'Output' module has '%s' encoder configured." % (self.config.protocol))
self.encode = self.config.protocol()
def _moduleInitValidation(self):
'''
Validates whether we have all the parameters this module type expects
Args:
None
Returns:
None
Raises:
ModuleInitFailure: Raised when one of the components isn't correct.
'''
for param in ["payload", "selection", "native_events", "parallel_streams"]:
if param not in self.kwargs.keys():
raise ModuleInitFailure("An 'output' module should always have a '%s' parameter. This is a programming error." % (param))
class FlowModule(Actor):
MODULE_TYPE = ModuleType.FLOW
def _moduleInitSetup(self):
'''
Does module type specific setup.
'''
pass
def _moduleInitValidation(self):
'''
Validates whether we have all the parameters this module type expects
Args:
None
Returns:
None
Raises:
ModuleInitFailure: Raised when one of the components isn't correct.
'''
pass
[docs]class ProcessModule(Actor):
MODULE_TYPE = ModuleType.PROCESS
def _moduleInitSetup(self):
'''
Does module type specific setup.
'''
pass
def _moduleInitValidation(self):
'''
Validates whether we have all the parameters this module type expects
Args:
None
Returns:
None
Raises:
ModuleInitFailure: Raised when one of the components isn't correct.
'''
pass