# Source code for wishbone.event

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#  event.py
#
#  Copyright 2018 Jelle Smet <development@smetj.net>
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
#  MA 02110-1301, USA.
#
#

import time
from wishbone.error import BulkFull, InvalidData, TTLExpired
from uuid import uuid4
from jinja2 import Template
from copy import deepcopy
from scalpl import Cut
from easydict import EasyDict


# Root-level fields of an event's data structure.  ``Event.__init__`` creates
# all of them, ``Event.slurp`` requires all of them to be present, and
# ``Event.delete`` refuses to remove any of them when addressed as a root key.
EVENT_RESERVED = ["timestamp", "data", "tmp", "errors", "uuid", "uuid_previous", "cloned", "bulk", "ttl", "tags"]


def extractBulkItemValues(event, selection):
    '''Yields one field out of every event stored in a bulk event.

    Each native event held by the bulk is turned back into an ``Event`` via
    ``Event().slurp()`` after which ``selection`` is looked up on it.  When
    ``event`` is not a bulk event nothing is yielded.

    Args:

        event (wishbone.event.Event): The wishbone event in bulk mode.
        selection (str): The field to extract from each event in the bulk.


    Yields:

        str/int/float/dict/list: The value of ``selection`` for each event.
    '''

    if not event.isBulk():
        return

    for native in event.data["data"]:
        item = Event().slurp(native)
        yield item.get(selection)


def extractBulkItems(event):
    '''Yields every event stored in a bulk event.

    Each native event held by the bulk is turned back into an ``Event`` via
    ``Event().slurp()``.  When ``event`` is not a bulk event nothing is
    yielded.

    Args:

        event (wishbone.event.Event): The wishbone event in bulk mode.

    Yields:

        `wishbone.event.Event`: An event rebuilt from each item in the bulk.
    '''

    if not event.isBulk():
        return

    for native in event.data["data"]:
        item = Event().slurp(native)
        yield item


class Event(object):
    '''The Wishbone event object.

    A class object containing the event data being passed from one Wishbone
    module to the other.

    The keyformat used is the one handled by the ``Scalpl`` module
    (https://pypi.python.org/pypi/scalpl/).

    Args:

        data (dict/list/string/int/float): The data to assign to the ``data``
            field.  Ignored when ``bulk`` is True.
        ttl (int): The TTL value for the event.
        bulk (bool): Initialize the event as a bulk event
        bulk_size (int): The number of events the bulk can hold.

    Attributes:

        data (scalpl.Cut): The ``Cut`` wrapped dict containing the event data
            structure.
        bulk_size (int): The max allowed bulk size.
    '''

    def __init__(self, data=None, ttl=254, bulk=False, bulk_size=100):

        self.data = Cut({
            "cloned": False,
            "bulk": bulk,
            "data": None,
            "errors": {},
            "tags": [],
            "timestamp": time.time(),
            "tmp": {},
            "ttl": ttl,
            "uuid_previous": [],
        })

        # A bulk event stores a list of native events instead of user data.
        if bulk:
            self.data["data"] = []
        else:
            self.set(data)

        self.data["uuid"] = str(uuid4())
        self.bulk_size = bulk_size

    def appendBulk(self, event):
        '''Appends an event to this bulk event.

        The event is stored in its native (``dump()``) form.

        Args:

            event(wishbone.event.Event): The event to add to the bulk instance

        Raises:

            InvalidData: Either the event is not of type Bulk or ``event`` is
                         not an wishbone.event.Event instance.
            BulkFull: The bulk already holds ``bulk_size`` events.
        '''

        if not self.data["bulk"]:
            raise InvalidData("This instance is not initialized as a bulk event.")

        if not isinstance(event, Event):
            raise InvalidData("'event' should be of type wishbone.event.Event.")

        # ``>=`` rather than ``==`` so a bulk which somehow got past the limit
        # cannot keep growing.
        if len(self.data["data"]) >= self.bulk_size:
            raise BulkFull("The bulk event already contains '%s' events." % (self.bulk_size))

        self.data["data"].append(event.dump())

    def clone(self):
        '''Returns a cloned version of the event.

        The clone is a deep copy which gets a new ``uuid`` and ``timestamp``,
        has ``cloned`` set to True and has the uuid of the original appended
        to ``uuid_previous``.

        Returns:

            class: A ``wishbone.event.Event`` instance
        '''

        e = deepcopy(self)

        if "uuid_previous" in e.data:
            e.data["uuid_previous"].append(e.data["uuid"])
        else:
            e.data["uuid_previous"] = [e.data["uuid"]]

        e.data["uuid"] = str(uuid4())
        e.data["timestamp"] = time.time()
        e.data["cloned"] = True

        return e

    def copy(self, source, destination):
        '''Copies the source key to the destination key.

        The value is deep copied so source and destination do not share
        mutable state.

        Args:

            source (str): The name of the source key.
            destination (str): The name of the destination key.

        Raises:

            KeyError: ``source`` does not exist.
        '''

        self.set(deepcopy(self.get(source)), destination)

    def decrementTTL(self):
        '''Decrements the TTL value.

        Raises:

            TTLExpired: When TTL has reached 0.
        '''

        self.data["ttl"] -= 1

        if self.data["ttl"] <= 0:
            raise TTLExpired("Event TTL expired in transit.")

    def delete(self, key=None):
        '''Deletes a key.

        Args:

            key (str): The key to delete.  Must be a string; the ``None``
                default cannot be split and therefore fails.

        Raises:

            Exception: Deleting the root of a reserved keyword such as
                       ``data`` or ``tags``.
            KeyError: When a non-existing key is referred to.
        '''

        s = key.split('.')

        # Only the root of a reserved field is protected, nested keys such as
        # ``data.foo`` can be removed.
        if s[0] in EVENT_RESERVED and len(s) == 1:
            raise Exception("Cannot delete root of reserved keyword '%s'." % (key))

        del(self.data[key])

    def dump(self):
        '''Dumps the complete event.

        This complete event is also called a ``native event``.  The returned
        structure is a deep copy, changing it does not affect the event.

        Returns:

            dict: The content of the event.
        '''

        # Only the underlying data of the ``Cut`` wrapper is copied, not the
        # complete Event instance.
        native = deepcopy(self.data.data)
        native["timestamp"] = float(native["timestamp"])

        return native

    # Aliases of ``dump()``.
    getNative = dump
    raw = dump

    def get(self, key="data"):
        '''Returns the value of ``key``.

        ``key`` must be in ``Scalpl`` format.

        Args:

            key (str): The name of the key to read.  ``None``, ``""`` and
                ``"."`` return the complete (``Cut`` wrapped) event data.

        Returns:

            str/int/float/dict/list: The value of the key

        Raises:

            KeyError: The provided key does not exist.
        '''

        if key in (None, "", "."):
            return self.data

        try:
            return self.data[key]
        except Exception:
            # Every lookup failure is reported to the caller as a KeyError.
            raise KeyError(key)

    def has(self, key="data"):
        '''Returns a bool indicating the event has ``key``

        ``key`` must be in ``Scalpl`` format.

        Args:

            key (str): The name of the key to check

        Returns:

            bool: True if the key is there otherwise false
        '''

        try:
            self.data[key]
        except (KeyError, IndexError, TypeError):
            # A missing key, an out of range index or a path running through
            # a value which cannot be indexed all mean "not there".
            return False
        else:
            return True

    def isBulk(self):
        '''Tells whether event is ``bulk`` or not.

        Returns:

            bool: True if the event is bulk
        '''

        return self.data["bulk"]

    def merge(self, value, key="data"):
        '''Merges value into ``key``.

        A list is appended to the destination (``+=``), a dict is merged into
        the destination using ``update()``.  Value types should be mergeable
        otherwise an error is returned.

        Args:

            value (dict,list): The value to merge
            key (str): The key to merge into

        Raises:

            InvalidData: Types are not mergeable
        '''

        if not isinstance(value, (list, dict)):
            raise InvalidData("Source and destination are incompatible to merge")

        try:
            if isinstance(value, list):
                self.data[key] += value
            else:
                self.data[key].update(value)
        except Exception:
            # Missing destination or a destination type which does not
            # support the operation.
            raise InvalidData("Source and destination are incompatible to merge")

    def render(self, template, env_template=None):
        '''Returns a formatted string using the provided template and key

        The root fields of the event are available as variables inside the
        template.

        Args:

            template (str): A string representing the Jinja2 template.
            env_template (``jinja2.Environment``): Used to render template
                strings from.  If not set, then template rendering happens
                without environment.

        Returns:

            str: The rendered string

        Raises:

            InvalidData: An invalid jinja2 template has been provided
        '''

        try:
            if env_template is None:
                return Template(template).render(**self.data)
            else:
                return env_template.from_string(template).render(**self.data)
        except Exception as err:
            raise InvalidData("Failed to render template. Reason: %s" % (err))

    def renderField(self, field_name, env_template=None):
        '''Expects ``field_name`` to contain a template, renders it and
        replaces its content with the result.

        Dicts and lists are traversed recursively and every string in them
        containing both ``{{`` and ``}}`` is rendered.  Any other value is
        left untouched.  A string which fails to render is replaced by
        ``#error: <reason>#``.

        Args:

            field_name (str): A string defining the field to handle.
            env_template (``jinja2.Environment``): Used to render template
                strings from.  If not set, then template rendering happens
                without environment.

        Returns:

            None

        Raises:

            InvalidData: The field could not be read, rendered or written.
        '''

        try:
            raw_value = self.get(field_name)
            rendered_value = self.__renderDataStructure(raw_value, env_template)
            self.set(rendered_value, field_name)
        except Exception as err:
            raise InvalidData("Failed to render template. Reason: %s" % (err))

    def renderKwargs(self, template_kwargs):
        '''Renders all the templates found in ``template_kwargs`` and sets
        ``self.kwargs``, a version of the current module's kwargs rendered
        against this event's content.

        A template which fails to render is replaced by
        ``#error: <reason>#``.

        Args:

            template_kwargs (dict): A dict of the module's kwargs optionally
                containing Template instances.
        '''

        def recurse(data):

            if isinstance(data, Template):
                try:
                    return data.render(**self.dump())
                except Exception as err:
                    return "#error: %s#" % (err)
            elif isinstance(data, dict):
                return EasyDict({key: recurse(value) for key, value in data.items()})
            elif isinstance(data, list):
                return [recurse(value) for value in data]
            else:
                return data

        self.kwargs = EasyDict(recurse(template_kwargs))

    def set(self, value, key="data"):
        '''Sets the value of ``key``.

        ``key`` must be in ``Scalpl`` format.

        Args:

            value (str, int, float, dict, list): The value to assign.
            key (str): The key to store the value
        '''

        self.data[key] = value

    def slurp(self, data):
        '''Create an event object from a ``native event`` dict exported by
        ``dump()``

        The timestamp field will be reset to the time this method has been
        called.

        Args:

            data (dict): The dict object containing the complete event.

        Returns:

            wishbone.event.Event: A Wishbone event instance.

        Raises:

            InvalidData: ``data`` does not contain valid fields to build an
                         event
        '''

        # Field name and the type it must have (None: any type is accepted).
        schema = (
            ("timestamp", float),
            ("data", None),
            ("tmp", dict),
            ("errors", dict),
            ("uuid", str),
            ("uuid_previous", list),
            ("cloned", bool),
            ("bulk", bool),
            ("ttl", int),
            ("tags", list),
        )
        failure = "The incoming data could not be used to construct an event. Reason: '%s'."

        # Explicit checks instead of ``assert`` so the validation still
        # happens when Python runs with optimizations enabled (-O).
        if not isinstance(data, (dict, Cut)):
            raise InvalidData(failure % "event.slurp() expects a dict.")

        for field, expected_type in schema:
            if field not in data:
                raise InvalidData(failure % ("%s is missing" % (field)))
            if expected_type is not None and not isinstance(data[field], expected_type):
                raise InvalidData(failure % ("%s type '%s' is not valid." % (field, expected_type)))

        self.data = Cut(data)
        self.data["timestamp"] = time.time()

        return self

    def __renderDataStructure(self, datastructure, env_template):
        '''Returns ``datastructure`` with all template strings rendered.

        Dicts (returned as ``EasyDict``) and lists are traversed recursively.
        Only strings containing both ``{{`` and ``}}`` are rendered, using
        ``env_template`` when provided.  A string which fails to render is
        replaced by ``#error: <reason>#``.
        '''

        def recurse(data):

            if isinstance(data, str):
                if '{{' in data and '}}' in data:
                    try:
                        if env_template is None:
                            return Template(data).render(**self.dump())
                        else:
                            return env_template.from_string(data).render(**self.dump())
                    except Exception as err:
                        return "#error: %s#" % (err)
                else:
                    return data
            elif isinstance(data, dict):
                return EasyDict({key: recurse(value) for key, value in data.items()})
            elif isinstance(data, list):
                return [recurse(value) for value in data]
            else:
                return data

        return recurse(datastructure)