Flow Modules

class wishbone.module.acknowledge.Acknowledge(actor_config, ack_id=None)[source]

Forwards or drops events by acknowleding values.

This module stores the value of field ack_id from each incoming event. Subsequent events with the same ack_id field value will be dropped until it is removed by having it acknowledged.

The ack_id can be acknowledged by sending the event to the acknowledge queue.

The ack_id field value should be an unique value.

Typically, downstream modules’s successful and/or failed queues are sending events to the acknowledge queue.

Parameters:

- ack_id (data):
    A unique value identifying the event.

Queues:

- inbox
    |  Incoming events

- outbox
    |  Outgoing events

- acknowledge
    |  Acknowledge events

- dropped
    |  Where events go to when unacknowledged

Event variables:

- tmp.<name>.ack_id
    | The location of the acknowledgement ID when coming in through the
    | inbox queue.
class wishbone.module.count.Count(actor_config, conditions={})[source]

Pass or drop events based on the number of times an event value occurs.

Events pass through or get dropped after a certain key/value has appeared for a number of times within an optional time window.

When the time window expires, the occurance counter for that field is reset.

Conditions have following format:

Example 1:

{
"data": {
    "value": "abc",
    "occurrence": 10,
    "window": 60
    "action": "pass"
    }
}

This means if an event of which the <data> field has value “abc” occurs 10 times within a time window of 60 seconds since the first occurance happened, the 10th event will pass through. The first 9 events will get dropped.

Example 2:

{
"tmp.address": {
    "value": "127.0.0.1",
    "occurence": 10,
    "window": 60,
    "action": "drop"
    }
}

This means that events with field <tmp.address> and value <127.0.0.1> can pass through 10 times after which the events get dropped within a time window of 60 seconds

A window of 0 seconds disables the time window expiration for a key.

Events which do not have the requested key can pass through.

Parameters:

- conditions(dict)({})
   |  The conditions which should be met.

- expire(int)(0)
   |  The time window in which all occurances should happen.
   |  A value of 0 disables the expiration.

Queues:

- inbox
   |  Incoming events

- outbox
   |  Outgoing events

- dropped
   |  Events not meeting the counter requirements.
class wishbone.module.fanout.Fanout(actor_config)[source]

Forward each incoming message to all connected queues.

Forward each incoming message to all connected queues.

Parameters:

n/a

Queues:

- inbox:
   |  Incoming messages
class wishbone.module.fresh.Fresh(actor_config, timeout_payload='timeout', recovery_payload='recovery', timeout=60, repeat_interval=60)[source]

Generates a new event unless an event came through in the last x time.

This module forwards events without modifying them. If an event has been forwarded it resets the timeout counter back to <timeout>. If the timeout counter reaches zero because no messages have passed through, an event with <timeout_payload> is generated and submitted to the module’s <timeout> queue. When the a timeout_payload has been sent and the event stream recovers, a new event with <recovery_payload> is generated and submitted to the <timeout> queue.

Parameters:

- timeout_payload(int/float/str/obj/list/...)("timeout")
   |  The data a timeout event contains.

- recovery_payload(int/float/str/obj/list/...)("recovery")
    |  The data a recovery event contains

- timeout(int)(60)
   |  The max time in seconds allowed to not to receive events.

- repeat_interval(int)(60)

   |  The interval time to resend the <payload> event in case
   |  <timeout> has expired and

Queues:

- inbox
   |  Incoming events.

- outbox
   |  Outgoing events.

- timeout
   |  timeout and recovery events.
class wishbone.module.funnel.Funnel(actor_config)[source]

Funnel multiple incoming queues to one outgoing queue.

Funnel multiple incoming queues to one outgoing queue.

Parameters:

n/a

Queues:

- outbox:
   |  Outgoing messages
class wishbone.module.queueselect.QueueSelect(actor_config, templates=[], log_matching=False)[source]

Submits message to the queue defined by a rendered template.

Renders a list of templates against the event. Events are submitted to the queue a rendered template returns.

Typically this module is used to route messages using Jinja2 conditionals and logic.

A rule rule looks like this:

{ "name": "name of the rule",
  "queue": "{{ 'queue_1,queue_one' if data.one == 1 else 'queue_2,queue_two' }}"
  "payload":
    "queue_1": {
        "detail_1": "some value",
        "detail_2": "some other value",
    },
    "queue_2": {
        "detail_1": "some value",
        "detail_2": "some other value",
    }
  }
}

The <file> queue expects events containing the absolute path of a YAML file to read (or delete). Typically this queue receives events from wishbone.module.input.inotify.

Events of type “IN_CREATE”, “IN_CLOSE_WRITE”, “IN_DELETE”, “WISHBONE_INIT” are processed all others are ignored.

Parameters:

- templates(list)([])*
   |  A list consisting out of template dicts as explained above.

- log_matching(bool)(False)
   |  Whether to produce debug log messages for matches.
   | Can be verbose hence it's configurable.

Queues:

- inbox
   |  Incoming events

- outbox
   |  Outgoing events

- nomatch
   |  Events not matching at least 1 rule.

- file
   |  Read rules from YAML file or delete them.
class wishbone.module.roundrobin.RoundRobin(actor_config, randomize=False)[source]

Round-robins incoming events to all connected queues.

Create a “1 to n” relationship between queues. Events arriving in inbox are then submitted in a roundrobin (or randomized) fashion to the connected queues. The outbox queue is non existent.

Parameters:

- randomize(bool)(False)
    |  Randomizes the queue selection instead of going round-robin
    |  over all queues.

Queues:

- inbox
   |  Incoming events
class wishbone.module.switch.Switch(actor_config, outgoing='outbox')[source]

Switch outgoing queues while forwarding events.

Forwards events to the desired outgoing queue based on the value of <outgoing>.

The value of <outgoing> can be dynamically set in 2 ways:

  • Using a template function.
  • By sending an event to the <switch> queue with the value of <outgoing> stored under data.

Parameters:

- outgoing(str)("outbox")*
    |  The name of the queue to submit incoming events to.

Queues:

- inbox
   |  incoming events

- switch
   |  incoming events to alter outgoing queue.

- outbox*
   |  outgoing events

- <connected_queue_1>
   |  outgoing events

- <connected_queue_n>
   |  outgoing events