Creating a module¶
Contents
The following example module evaluates whether an event containing an integer value is between a minimum and a maximum. Depending on whether the value is higher or lower the event will be routed to the appropriate queue.
#!/usr/bin/env python
from wishbone.module import FlowModule
class HigherLower(FlowModule):
'''
**Checks whether an integer is higher or lower than the defined value.**
Checks whether an event value is higher, lower or equal to the defined baseline.
Depending on the outcome, the event will be submitted to the appropirate queue.
Parameters::
- base(int)(100)
| The value to compare against.
- value(int)(100)
| The value to compare.
Queues::
- inbox
| Incoming messages
- higher
| Events with a higher value than ``value`` are submitted to this
| queue.
- lower
| Events with a lower value than ``value`` are submitted to this
| queue.
- equal
| Events with an equal value to ``value`` are submitted to this
| queue.
'''
def __init__(self, actor_config, base=100, value=100):
FlowModule.__init__(self, actor_config)
self.pool.createQueue("inbox")
self.pool.createQueue("higher")
self.pool.createQueue("lower")
self.pool.createQueue("equal")
self.registerConsumer(self.consume, "inbox")
def consume(self, event):
if not isinstance(event.data, int):
raise TypeError("Event data is not type integer")
if event.kwargs.value > event.kwargs.base:
self.submit(event, self.pool.queue.higher)
elif event.kwargs.value < event.kwargs.base:
self.submit(event, self.pool.queue.lower)
else:
self.submit(event, self.pool.queue.equal)
Document the module¶
The docstring (line 6-29) contains the module’s description. It’s encouraged to document your module in a similar fashion. The content of the docstring can be accessed on CLI using the wishbone show command.
$ wishbone show --docs wishbone_contrib.module.flow.higherlower
Base the correct class¶
A module should base (line 5) one of the four four modules types.
Since this example module is applying logic of some sort to its incoming events to decide which queue to submit the event to without actually modifying its payload we choose type flow module.
The first parameter of a Wishbone module must always be actor_config
which on its turn is used to initialize the base class (line 32).
The actor_config
parameter is a wishbone.actorconfig.ActorConfig
instance which configures the module’s behavior within the Wishbone framework.
Creating queues¶
All the module’s queues are stored in wishbone.pool
which is an
instance of wishbone.queue.QueuePool
.
wishbone.pool
is created by basing the module base class.
Besides for the default _failed
and _success
queues, it’s left up to the
developer to make sure the necessary queues are created.
Creating queues is done by invoking the
wishbone.queue.QueuePool.createQueue
(line 43-46). In the case of
this specific flow module we will create inbox
, higher
,
lower
, equal
.
Registering a function¶
The modules incoming events are to its inbox
queue. We need to register a
function which takes care of processing the events in the inbox
queue.
Once we have registered such a function, Wishbone will take care of draining
the queue and applying the registered function
to all its events.
Registering such a function is done by applying
wishbone.actor.Actor.registerConsumer()
(line 47). The function should have
1 parameter accepting the wishbone.event.Event
instances.
Handling dynamic parameter values¶
This is an important topic. Somehow the module needs to know where in the event it can find the integer value to work with.
There are 2 different approaches to this:
Define the field as a parameter¶
In this case we make the field where to find the integer configurable. The module’s parameters could look like this:
def __init__(self, actor_config, base=100, field="data"):
The consume
function then could look something like this:
if event.get(self.kwargs.field) > event.kwargs.base:
self.submit(event, self.pool.queue.higher)
elif event.get(self.kwargs.field) < event.kwargs.base:
self.submit(event, self.pool.queue.lower)
else:
self.submit(event, self.pool.queue.equal)
Define the value as a template value¶
This is the technique we use in this example
We can also pass a template as a parameter which fetches the desired value
from the event. Each time an event enters a registered version, then Wishbone
stores a rendered version of self.kwargs
(the modules parameters) under
event.kwargs
using the content of the event itself.
So let’s say that incoming events have following wishbone.event.Event
format:
{
"bulk": false,
"cloned": false,
"data": 99,
"errors": {},
"tags": [],
"timestamp": 1515239271.0001013,
"tmp": {},
"ttl": 254,
"uuid": "1bb5301c-36d6-4a6e-b039-c310eb9a4d85",
"uuid_previous": []
}
We can have the bootstrap file initialize the module as such:
evaluate:
module: wishbone_contrib.module.flow.higherlower
arguments:
base: 50
value: '{{data}}'
Wishbone resolves the (Jinja2) template {{data}}
then into the desired value and store it under event.kwargs.value
.
Hence we can do:
if event.kwargs.value > event.kwargs.base:
Submitting an event to a queue¶
After processing the event it must be submitted to the relevant queue so it can be forwarded to the next module.
Submitting an event to a queue should be done by using wishbone.actor.Actor.submit()
(line 55, 57, 59).
Dealing with errors¶
If an exception
occurs inside the registered function then Wishbone will
automatically submit the event to the module’s default _failed
queue.
Therefor it is important to allow errors to raise. On the contrary, when the
event has been handled without exceptions then it is also submitted to the
modules _success
queue.
Taking advantage of this behavior is useful to setup error handling constructions.
Provide an entrypoint¶
Wishbone uses Python’s setuptools entrypoint definitions to load modules. These are defined in the module’s setup.py file.
Example:
entry_points={
'wishbone_contrib.module.flow': [
'higherlower = higherlower:HigherLower'
]
}
This entrypoint definition allows Wishbone to import the module using
wishbone_contrib.module.flow.higherlower
in the boostrap file.