Flow Modules¶
-
class
wishbone.module.acknowledge.
Acknowledge
(actor_config, ack_id=None)[source]¶ Forwards or drops events by acknowleding values.
This module stores the value of field
ack_id
from each incoming event. Subsequent events with the sameack_id
field value will be dropped until it is removed by having it acknowledged.The
ack_id
can be acknowledged by sending the event to theacknowledge
queue.The
ack_id
field value should be an unique value.Typically, downstream modules’s
successful
and/orfailed
queues are sending events to theacknowledge
queue.Parameters:
- ack_id (data): A unique value identifying the event.
Queues:
- inbox | Incoming events - outbox | Outgoing events - acknowledge | Acknowledge events - dropped | Where events go to when unacknowledged
Event variables:
- tmp.<name>.ack_id | The location of the acknowledgement ID when coming in through the | inbox queue.
-
class
wishbone.module.count.
Count
(actor_config, conditions={})[source]¶ Pass or drop events based on the number of times an event value occurs.
Events pass through or get dropped after a certain key/value has appeared for a number of times within an optional time window.
When the time window expires, the occurance counter for that field is reset.
Conditions have following format:
Example 1:
{ "data": { "value": "abc", "occurrence": 10, "window": 60 "action": "pass" } }
This means if an event of which the <data> field has value “abc” occurs 10 times within a time window of 60 seconds since the first occurance happened, the 10th event will pass through. The first 9 events will get dropped.
Example 2:
{ "tmp.address": { "value": "127.0.0.1", "occurence": 10, "window": 60, "action": "drop" } }
This means that events with field <tmp.address> and value <127.0.0.1> can pass through 10 times after which the events get dropped within a time window of 60 seconds
A window of 0 seconds disables the time window expiration for a key.
Events which do not have the requested key can pass through.
Parameters:
- conditions(dict)({}) | The conditions which should be met. - expire(int)(0) | The time window in which all occurances should happen. | A value of 0 disables the expiration.
Queues:
- inbox | Incoming events - outbox | Outgoing events - dropped | Events not meeting the counter requirements.
-
class
wishbone.module.fanout.
Fanout
(actor_config)[source]¶ Forward each incoming message to all connected queues.
Forward each incoming message to all connected queues.
Parameters:
n/a
Queues:
- inbox: | Incoming messages
-
class
wishbone.module.fresh.
Fresh
(actor_config, timeout_payload='timeout', recovery_payload='recovery', timeout=60, repeat_interval=60)[source]¶ Generates a new event unless an event came through in the last x time.
This module forwards events without modifying them. If an event has been forwarded it resets the timeout counter back to <timeout>. If the timeout counter reaches zero because no messages have passed through, an event with <timeout_payload> is generated and submitted to the module’s <timeout> queue. When the a timeout_payload has been sent and the event stream recovers, a new event with <recovery_payload> is generated and submitted to the <timeout> queue.
Parameters:
- timeout_payload(int/float/str/obj/list/...)("timeout") | The data a timeout event contains. - recovery_payload(int/float/str/obj/list/...)("recovery") | The data a recovery event contains - timeout(int)(60) | The max time in seconds allowed to not to receive events. - repeat_interval(int)(60) | The interval time to resend the <payload> event in case | <timeout> has expired and
Queues:
- inbox | Incoming events. - outbox | Outgoing events. - timeout | timeout and recovery events.
-
class
wishbone.module.funnel.
Funnel
(actor_config)[source]¶ Funnel multiple incoming queues to one outgoing queue.
Funnel multiple incoming queues to one outgoing queue.
Parameters:
n/a
Queues:
- outbox: | Outgoing messages
-
class
wishbone.module.queueselect.
QueueSelect
(actor_config, templates=[], log_matching=False)[source]¶ Submits message to the queue defined by a rendered template.
Renders a list of templates against the event. Events are submitted to the queue a rendered template returns.
Typically this module is used to route messages using Jinja2 conditionals and logic.
A rule rule looks like this:
{ "name": "name of the rule", "queue": "{{ 'queue_1,queue_one' if data.one == 1 else 'queue_2,queue_two' }}" "payload": "queue_1": { "detail_1": "some value", "detail_2": "some other value", }, "queue_2": { "detail_1": "some value", "detail_2": "some other value", } } }
The <file> queue expects events containing the absolute path of a YAML file to read (or delete). Typically this queue receives events from wishbone.module.input.inotify.
Events of type “IN_CREATE”, “IN_CLOSE_WRITE”, “IN_DELETE”, “WISHBONE_INIT” are processed all others are ignored.
Parameters:
- templates(list)([])* | A list consisting out of template dicts as explained above. - log_matching(bool)(False) | Whether to produce debug log messages for matches. | Can be verbose hence it's configurable.
Queues:
- inbox | Incoming events - outbox | Outgoing events - nomatch | Events not matching at least 1 rule. - file | Read rules from YAML file or delete them.
-
class
wishbone.module.roundrobin.
RoundRobin
(actor_config, randomize=False)[source]¶ Round-robins incoming events to all connected queues.
Create a “1 to n” relationship between queues. Events arriving in inbox are then submitted in a roundrobin (or randomized) fashion to the connected queues. The outbox queue is non existent.
Parameters:
- randomize(bool)(False) | Randomizes the queue selection instead of going round-robin | over all queues.
Queues:
- inbox | Incoming events
-
class
wishbone.module.switch.
Switch
(actor_config, outgoing='outbox')[source]¶ Switch outgoing queues while forwarding events.
Forwards events to the desired outgoing queue based on the value of <outgoing>.
The value of <outgoing> can be dynamically set in 2 ways:
- Using a template function.
- By sending an event to the <switch> queue with the value of <outgoing> stored under data.
Parameters:
- outgoing(str)("outbox")* | The name of the queue to submit incoming events to.
Queues:
- inbox | incoming events - switch | incoming events to alter outgoing queue. - outbox* | outgoing events - <connected_queue_1> | outgoing events - <connected_queue_n> | outgoing events