Builtin Modules

Input modules

wishbone.input.cron

class wishbone.module.cron.Cron(actor_config, cron='*/10 * * * *', payload='wishbone', field='@data')[source]

Generates an event at the defined time

Generates an event with the defined payload at the chosen time. Time is in crontab format.

Parameters:
  • cron (-) –
    The cron expression.
  • payload (-) –
    The content of <field>.
  • field (-) –
    The location to write <payload> to.

Queues:

  • outbox
    Outgoing messages

wishbone.input.dictgenerator

class wishbone.module.dictgenerator.DictGenerator(actor_config, keys=[], randomize_keys=True, num_values=False, num_values_min=0, num_values_max=1, min_elements=1, max_elements=1, interval=1)[source]

Generates random dictionaries.

This module allows you to generate a stream of dictionaries.

Parameters:
  • keys (-) –
    If provided, documents are created using the provided
    keys to which random values will be assigned.
  • randomize_keys (-) –
    Randomizes the keys. Otherwise keys are sequential
    numbers.
  • num_values (-) –
    If true, values will be numeric and randomized.
  • num_values_min (-) –
    The minimum of a value when they are numeric.
  • num_values_max (-) –
    The maximum of a value when they are numeric.
  • min_elements (-) –
    The minimum number of elements per dictionary.
  • max_elements (-) –
    The maximum number of elements per dictionary.
  • interval (-) –
    The time in seconds to sleep between each message.

Queues:

  • outbox
    Outgoing messages
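
To illustrate how the parameters above interact, here is a minimal plain-Python sketch that builds one random dictionary. It does not use Wishbone and is not the module's implementation; the function name `generate_dict`, the `rng` argument and the fixed word list are assumptions made for this example.

```python
import random

# Hypothetical word list; the real module may draw its words from elsewhere.
WORDS = ["alpha", "bravo", "charlie", "delta", "echo", "foxtrot"]


def generate_dict(keys=(), randomize_keys=True, num_values=False,
                  num_values_min=0, num_values_max=1,
                  min_elements=1, max_elements=1, rng=random):
    """Build one dictionary following the parameter descriptions above."""

    def value():
        if num_values:
            return rng.randint(num_values_min, num_values_max)
        return rng.choice(WORDS)

    if keys:
        # Provided keys are used as-is; only the values are random.
        return {key: value() for key in keys}

    size = rng.randint(min_elements, max_elements)
    result = {}
    index = 0
    while len(result) < size:
        # Random keys get the index appended so they never collide.
        key = f"{rng.choice(WORDS)}{index}" if randomize_keys else index
        result[key] = value()
        index += 1
    return result


print(generate_dict(min_elements=2, max_elements=4))
```

The <interval> parameter is left out here; in a running module it would only add a pause between two generated dictionaries.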

wishbone.input.testevent

class wishbone.module.testevent.TestEvent(actor_config, interval=1, message='test', numbered=False, additional_values={})[source]

Generates a test event at the chosen interval.

The data field of the test event contains <message>, which defaults to the string “test”.

Parameters:
  • interval (-) –
    The interval in seconds between each generated event.
    A value of 0 means as fast as possible.
  • message (-) –
    The content of the test message.
  • numbered (-) –
    When true, appends a sequential number to the end.
  • additional_values (-) –
    A dictionary of key/value to add to the event.

Queues:

  • outbox
    Contains the generated events.
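
The effect of <numbered> and <additional_values> can be sketched with a small generator. This is an illustration only: events are modelled as plain dictionaries, the name `generate_test_events` is hypothetical, and the underscore placed before the sequence number is an assumption, since the text above only says that a number is appended.

```python
import itertools


def generate_test_events(message="test", numbered=False, additional_values=None):
    """Yield an endless series of test events as plain dictionaries."""
    for number in itertools.count():
        # The "_" separator is an assumption made for this sketch.
        data = f"{message}_{number}" if numbered else message
        event = {"@data": data}
        # Extra key/value pairs are stored next to the data field.
        event.update(additional_values or {})
        yield event


for event in itertools.islice(generate_test_events(numbered=True), 3):
    print(event)
```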

Output modules

wishbone.output.null

class wishbone.module.null.Null(actor_config)[source]

Purges incoming events.

Parameters:

n/a

Queues:

  • inbox
    incoming events

wishbone.output.stdout

class wishbone.module.stdout.STDOUT(actor_config, selection='@data', counter=False, prefix='', pid=False, foreground_color='WHITE', background_color='RESET', color_style='NORMAL')[source]

Prints incoming events to STDOUT.

When <selection> is an empty string, the complete event including headers is printed to STDOUT.

You can optionally define the colors used.

Parameters:
  • selection (-) –
    The part of the event to submit externally.
    Use an empty string to refer to the complete event.
  • counter (-) –
    Puts an incremental number for each event in front
    of each event.
  • prefix (-) –
    Puts the prefix in front of each printed event.
  • pid (-) –
    Includes the pid of the process producing the output.
  • foreground_color (-) –
    The foreground color.
    Valid values: BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE
  • background_color (-) –
    The background color.
    Valid values: BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, RESET
  • color_style (-) –
    The coloring style to use
    Valid values: DIM, NORMAL, BRIGHT

Queues:

  • inbox
    Incoming events.

wishbone.output.syslog

class wishbone.module.wbsyslog.Syslog(actor_config, selection='@data', level=5, ident='sphinx-build')[source]

Writes log events to syslog.

Log events have the following format:

(6, 1367682301.430527, 'Router', 'Received SIGINT. Shutting down.')

The first value corresponds to the syslog severity level.

Parameters:
  • selection (-) –
    The part of the event to submit externally.
    Use an empty string to refer to the complete event.
  • level (-) –
    The loglevel.
  • ident (-) –
    The syslog id string.
    If not provided the script name is used.

Queues:

  • inbox
    incoming events

Flow modules

wishbone.function.acknowledge

class wishbone.module.acknowledge.Acknowledge(actor_config, ack_id=None)[source]

Lets events pass or not based on some event value present or not in a lookup table.

This module stores the <ack_id> value of passing events in a list and only lets events through whose <ack_id> value is not yet in the list.

<ack_id> can be removed from the list by sending the event into the <acknowledge> queue.

<ack_id> should be some unique identifier to make sure that any following modules are not processing events with the same data structure.

Typically, the <successful> and/or <failed> queues of downstream modules send events to the <acknowledge> queue.

Parameters:
  • ack_id (-) –
    A value stored somewhere in the event which then acts as the
    ack_id. It probably only makes sense to define an EventLookup
    value here.

Queues:

  • inbox
    Incoming events
  • outbox
    Outgoing events
  • acknowledge
    Acknowledge events
  • dropped
    Where events go to when unacknowledged
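
The lookup-table behaviour described above can be sketched as follows. This is a simplified model under stated assumptions, not the module's code: the class name `AckTable` and its method names are hypothetical, and queues are represented only by their names.

```python
class AckTable:
    """Tracks ack_id values that are still waiting for acknowledgement."""

    def __init__(self):
        self.pending = set()

    def consume(self, ack_id):
        """Return the queue an event with this ack_id would be sent to."""
        if ack_id in self.pending:
            # An event with the same ack_id is still unacknowledged.
            return "dropped"
        self.pending.add(ack_id)
        return "outbox"

    def acknowledge(self, ack_id):
        """Forget ack_id so the next event carrying it may pass again."""
        self.pending.discard(ack_id)


table = AckTable()
print(table.consume("job-1"))  # first occurrence passes
print(table.consume("job-1"))  # repeated while unacknowledged
table.acknowledge("job-1")
print(table.consume("job-1"))  # passes again after acknowledgement
```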

wishbone.function.deserialize

class wishbone.module.deserialize.Deserialize(actor_config, source='@data', destination='@data')[source]

Deserializes Bulk events or arrays.

When incoming data is a Bulk object the content will be forwarded as single events again.

When the incoming data is a single event and <source> is a list/array, a new event is created from each element of the array.

Parameters:
  • source (-) –
    The source of the array.
    (Ignored when incoming type is Bulk)
  • destination (-) –
    The destination key to store the array item.
    (Ignored when incoming type is Bulk)

Queues:

  • inbox
    Incoming messages
  • outbox
    Outgoing messges
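
The array case can be pictured with the sketch below, which models events as plain dictionaries. The function name `deserialize` is hypothetical, only top-level keys are handled, and carrying the other fields of the original event over into each new event is an assumption of this example.

```python
import copy


def deserialize(event, source="@data", destination="@data"):
    """Yield one new event per element of the list stored under source."""
    items = event[source]
    if not isinstance(items, list):
        raise TypeError(f"{source} does not contain a list")
    for item in items:
        # Each new event is an independent copy holding a single item.
        new_event = copy.deepcopy(event)
        new_event[destination] = item
        yield new_event


for single in deserialize({"@data": ["a", "b", "c"]}):
    print(single)
```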

wishbone.flow.fanout

class wishbone.module.fanout.Fanout(actor_config)[source]

Forward each incoming message to all connected queues.

Parameters:n/a

Queues:

  • inbox:
    Incoming messages

wishbone.flow.fresh

class wishbone.module.fresh.Fresh(actor_config, timeout_payload='timeout', recovery_payload='recovery', timeout=60, repeat_interval=60)[source]

Generates a new event unless an event came through in the last x time.

This module forwards events without modifying them. Each forwarded event resets the timeout counter back to <timeout>. If the timeout counter reaches zero because no messages have passed through, an event with <timeout_payload> is generated and submitted to the module’s <timeout> queue. When a <timeout_payload> event has been sent and the event stream recovers, a new event with <recovery_payload> is generated and submitted to the <timeout> queue.

Parameters:
  • timeout_payload (-) –
    The data a timeout event contains.
  • recovery_payload (-) –
    The data a recovery event contains
  • timeout (-) –
    The maximum time in seconds that may pass without receiving events.
  • repeat_interval (-) –
    The interval time to resend the <payload> event in case
    <timeout> has expired and

Queues:

  • inbox
    Incoming events.
  • outbox
    Outgoing events.
  • timeout
    timeout and recovery events.
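
The timeout and recovery logic can be sketched with an explicit clock, which keeps the example deterministic. The class `FreshMonitor` and its method names are hypothetical, the current time is passed in by the caller, and <repeat_interval> is deliberately left out.

```python
class FreshMonitor:
    """Tracks whether events keep arriving within `timeout` seconds."""

    def __init__(self, timeout=60, timeout_payload="timeout",
                 recovery_payload="recovery"):
        self.timeout = timeout
        self.timeout_payload = timeout_payload
        self.recovery_payload = recovery_payload
        self.last_seen = 0.0
        self.timed_out = False

    def event_passed(self, now):
        """Record a forwarded event; return the recovery payload if due."""
        self.last_seen = now
        if self.timed_out:
            self.timed_out = False
            return self.recovery_payload
        return None

    def check(self, now):
        """Return the timeout payload the first time the timer expires."""
        if not self.timed_out and now - self.last_seen >= self.timeout:
            self.timed_out = True
            return self.timeout_payload
        return None


monitor = FreshMonitor(timeout=10)
monitor.event_passed(now=0)
print(monitor.check(now=10))         # no event for 10 seconds
print(monitor.event_passed(now=12))  # the stream recovers
```

In a running module the payloads returned here would be wrapped in events and submitted to the <timeout> queue.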

wishbone.flow.funnel

class wishbone.module.funnel.Funnel(actor_config)[source]

Funnel multiple incoming queues to 1 outgoing queue.

Parameters:n/a

Queues:

  • outbox:
    Outgoing messages

wishbone.flow.roundrobin

class wishbone.module.roundrobin.RoundRobin(actor_config, randomize=False)[source]

Round-robins incoming events to all connected queues.

Creates a “1 to n” relationship between queues. Events arriving in inbox are submitted in a round-robin (or randomized) fashion to the connected queues. There is no outbox queue.

Parameters:
  • randomize (-) –
    Randomizes the queue selection instead of going round-robin
    over all queues.

Queues:

  • inbox
    Incoming events
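
The two selection strategies can be sketched with a generator that yields the name of the next destination queue. The function name `queue_selector` and the `rng` argument are hypothetical; the sketch only shows the order of selection, not how events are actually delivered.

```python
import itertools
import random


def queue_selector(queue_names, randomize=False, rng=random):
    """Yield the name of the queue that should receive the next event."""
    if randomize:
        while True:
            yield rng.choice(queue_names)
    else:
        # Cycle over the connected queues in a fixed order.
        yield from itertools.cycle(queue_names)


selector = queue_selector(["one", "two", "three"])
print([next(selector) for _ in range(5)])
```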

wishbone.flow.switch

class wishbone.module.switch.Switch(actor_config, outgoing='outbox')[source]

Switch outgoing queues while forwarding events.

Forwards events to the desired outgoing queue based on the value of <outgoing>.

The value of <outgoing> can be dynamically set in 2 ways:

  • Using a lookup value.
  • By sending an event to the <switch> queue with the value of <outgoing> stored under @data.

Parameters:
  • outgoing (-) –
    The name of the queue to submit incoming events to.

Queues:

  • inbox
    incoming events
  • switch
    incoming events to alter outgoing queue.
  • outbox
    outgoing events
  • <connected_queue_1>
    outgoing events
  • <connected_queue_n>
    outgoing events
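
The second way of changing <outgoing>, through the <switch> queue, can be sketched like this. The class `SwitchSketch` is hypothetical, queues are modelled as lists in a dictionary, and the lookup-value mechanism is not covered.

```python
class SwitchSketch:
    """Forwards events to whichever queue name is currently selected."""

    def __init__(self, outgoing="outbox"):
        self.outgoing = outgoing
        self.queues = {}

    def on_switch(self, event):
        # The new queue name is stored under @data of the control event.
        self.outgoing = event["@data"]

    def on_inbox(self, event):
        self.queues.setdefault(self.outgoing, []).append(event)


switch = SwitchSketch()
switch.on_inbox({"@data": "first"})
switch.on_switch({"@data": "backup"})
switch.on_inbox({"@data": "second"})
print(switch.queues)
```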

wishbone.flow.tippingbucket

class wishbone.module.tippingbucket.TippingBucket(actor_config, bucket_size=100, bucket_age=10, aggregation_key='default')[source]

Aggregates multiple events into bulk.

Aggregates multiple incoming events into bulk usually prior to submitting to an output module.

Flushing the buffer can be done in various ways:

  • The age of the bucket exceeds <bucket_age>.
  • The size of the bucket reaches <bucket_size>.
  • Any event arrives in queue <flush>.

Parameters:
  • bucket_size (-) –
    The maximum number of events per bucket.
  • bucket_age (-) –
    The maximum age in seconds before a bucket is closed and
    forwarded. This corresponds to the time since the first
    event was added to the bucket.
  • aggregation_key (-) –
    Groups events with key <aggregation_key> into the same buckets.

Queues:

  • inbox
    Incoming events
  • outbox
    Outgoing bulk events
  • flush
    Flushes the buffer on incoming events, regardless of whether the
    bucket is full (bucket_size) or expired (bucket_age).
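
The three flush conditions can be sketched with a single bucket and an explicit clock. The class `Bucket` and its method names are hypothetical; grouping by <aggregation_key> is omitted and could be modelled as a dictionary holding one such bucket per key.

```python
class Bucket:
    """Collects events and reports when they should be flushed as a bulk."""

    def __init__(self, bucket_size=100, bucket_age=10):
        self.bucket_size = bucket_size
        self.bucket_age = bucket_age
        self.events = []
        self.created = None  # time at which the first event was added

    def add(self, event, now):
        """Add an event; return the bulk if the bucket is now full."""
        if not self.events:
            self.created = now
        self.events.append(event)
        if len(self.events) >= self.bucket_size:
            return self.flush()
        return None

    def expire(self, now):
        """Return the bulk if the bucket is older than bucket_age."""
        if self.events and now - self.created > self.bucket_age:
            return self.flush()
        return None

    def flush(self):
        """Empty the bucket and return its content (the <flush> queue case)."""
        bulk, self.events = self.events, []
        self.created = None
        return bulk


bucket = Bucket(bucket_size=2)
bucket.add("a", now=0)
print(bucket.add("b", now=1))  # the size limit is reached
```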

wishbone.flow.ttl

class wishbone.module.ttl.TTL(actor_config, ttl=1)[source]

Allows messages to pass a maximum number of times.

When a message has traveled through this module more than <ttl> times it will be submitted to the <ttl_exceeded> queue.

Parameters:

  • ttl(int)(1)
    The maximum number of times an event is allowed
    to travel through.

Queues:

  • inbox
    Incoming events.
  • outbox
    Outgoing events.
  • ttl_exceeded
    Events which passed the module more than <ttl> times.
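
The counting behaviour can be sketched as a function that returns the name of the destination queue. The function name `route_by_ttl` and the key used to store the pass count (`@tmp.ttl_count`) are assumptions of this example; the text above does not say where the module keeps its counter.

```python
def route_by_ttl(event, ttl=1, counter_key="@tmp.ttl_count"):
    """Count one more pass for the event and return the queue to use."""
    # counter_key is a hypothetical location for the pass counter.
    passes = event.get(counter_key, 0) + 1
    event[counter_key] = passes
    return "outbox" if passes <= ttl else "ttl_exceeded"


event = {"@data": "hello"}
print([route_by_ttl(event, ttl=2) for _ in range(3)])
```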

Function modules

wishbone.function.modify

class wishbone.module.modify.Modify(actor_config, expressions=[])[source]

Modify and manipulate datastructures.

This module modifies the data of an event using a sequential list of expressions.

Expressions are dictionaries containing 1 item. The key is a string and the value is a list of parameters accepted by the expression.

For example:

{"set": ["hi", "@data.one"]}

Sets the value “hi” to key “@data.one”.

In the YAML formatted bootstrap file that would look like:

module: wishbone.function.modify
arguments:
  expressions:
    - set: [ "hi", "@data.one" ]

Valid expressions are:

  • add_item:

    add_item: [<item>, <key>]
    

    Adds <item> to the list stored under <key>.

  • copy:

    copy: [<source_key>, <destination_key>, <default_value>]
    

    Copies <source_key> to <destination_key> and overwrites <destination_key> when it exists. If <source_key> does not exist, <default_value> is taken instead.

  • del_item:

    del_item: [<key>, <item>]
    

    Deletes the first occurrence of <item> from the list stored under <key>.

  • delete:

    delete: [<key>]
    

    Deletes <key> from the event.

  • extract:

    extract: [<destination>, <regex>, <source>]
    

    Makes use of Python re module to extract named groups from <source> using <regex> and add the resulting matches to <destination>.

    The following example would extract the words “one” and “two” from “@data.test” and add them to @data.extract:

    expression:

    extract: ["@data.extract", '(?P<first>.*?);(?P<second>.*)', "@data.test"]
    

    result:

    {"@data": {"test": "one;two", "extract": {"first": "one", "second": "two"}}}
    
  • join:

    join: [<array>, <join>, <destination>]
    

    Joins an array into a string using the <join> value.

  • lowercase:

    lowercase: [<key>]
    

    Turns the string stored under <key> to lowercase.

  • merge:

    merge: [<object_one>, <object_two>, <destination>]
    

    Merges 2 arrays into <destination>

  • replace:

    replace: [<regex>, <value>, <key>]
    

    Replaces every occurrence of <regex> in the value stored in <key> with <value>.

  • set:

    set: [<value>, <key>]
    

    Sets <value> to the event <key>.

  • uppercase:

    uppercase: [<key>]
    

    Turns the string stored under <key> to uppercase.

  • template:

    template: [<destination_key>, <template>, <source_key>]
    

    Uses the dictionary stored in <source_key> to complete <template> and stores the result in key <destination_key>. The templating language used is Python’s built-in string formatting.

  • time:

    time: [<destination_key>, <format>]
    

    Modifies the <@timestamp> value according to the <format> specification and stores it into <destination_key>. See http://crsmithdev.com/arrow/#format for the format.
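
Several of the expressions above map onto standard-library Python calls. The snippet below shows those underlying calls on the “one;two” example without using Wishbone. Whether the module matches with `re.match` or `re.search` is not stated above, so the use of `re.match` is an assumption.

```python
import re

test = "one;two"

# extract: named groups become the keys of the destination dictionary.
match = re.match(r"(?P<first>.*?);(?P<second>.*)", test)
extracted = match.groupdict()

# replace: every occurrence of the regex is substituted.
replaced = re.sub(r";", " and ", test)

# template: str.format fills the placeholders from a dictionary.
rendered = "{first} then {second}".format(**extracted)

# join: an array is turned into a string using the join value.
joined = ",".join(["one", "two"])

print(extracted, replaced, rendered, joined)
```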

Parameters:
  • expressions (-) –
    A list of expressions to apply.

Queues:

  • inbox:
    Incoming messages
  • outbox:
    Outgoing modified messages
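
How a sequential list of one-item expressions is applied can be sketched with a tiny interpreter covering only set, copy and uppercase. The helper names `get_key`, `set_key` and `apply_expressions` are hypothetical, events are plain nested dictionaries, and dotted keys such as “@data.one” are assumed to address nested entries, as the extract example suggests.

```python
def get_key(event, key):
    """Return the value stored under a dotted key such as '@data.one'."""
    node = event
    for part in key.split("."):
        node = node[part]
    return node


def set_key(event, key, value):
    """Store value under a dotted key, creating parents when needed."""
    *parents, last = key.split(".")
    node = event
    for part in parents:
        node = node.setdefault(part, {})
    node[last] = value


def apply_expressions(event, expressions):
    """Apply the expressions to the event one after the other."""
    for expression in expressions:
        # Each expression is a dictionary holding exactly one item.
        (name, params), = expression.items()
        if name == "set":
            value, key = params
            set_key(event, key, value)
        elif name == "copy":
            source, destination, default = params
            try:
                value = get_key(event, source)
            except (KeyError, TypeError):
                value = default
            set_key(event, destination, value)
        elif name == "uppercase":
            key, = params
            set_key(event, key, get_key(event, key).upper())
        else:
            raise ValueError(f"unsupported expression: {name}")
    return event


print(apply_expressions({"@data": {}}, [
    {"set": ["hi", "@data.one"]},
    {"copy": ["@data.one", "@data.two", "n/a"]},
    {"uppercase": ["@data.two"]},
]))
```

Because the expressions run in order, the copy sees the value written by the preceding set.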

Encode modules

wishbone.encode.humanlogformat

class wishbone.module.humanlogformat.HumanLogFormat(actor_config, colorize=True, ident=None)[source]

Converts the internal log format into human readable form.

Logs are formatted from the internal Wishbone format into a more pleasing human readable format suited for STDOUT or a logfile.

Internal Wishbone format:

(6, 1367682301.430527, 3342, 'Router', 'Received SIGINT. Shutting down.')

Sample output format:

2013-08-04T19:54:43 pid-3342 informational dictgenerator: Initiated
2013-08-04T19:54:43 pid-3342 informational metrics_null: Started

Parameters:n/a

Queues:

  • inbox
    Incoming messages
  • outbox
    Outgoing messages
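
The conversion from the internal tuple to the sample output layout can be sketched as below. The function name `human_format` is hypothetical, the severity names follow the standard syslog levels, colorizing is left out, and the timestamp is rendered in UTC so that the result does not depend on the local timezone (the module itself may use local time).

```python
from datetime import datetime, timezone

# Standard syslog severity names, indexed by level.
SEVERITIES = ["emergency", "alert", "critical", "error",
              "warning", "notice", "informational", "debug"]


def human_format(log):
    """Turn an internal log tuple into a single human readable line."""
    level, timestamp, pid, source, message = log
    # UTC is an assumption of this sketch.
    stamp = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    return "{} pid-{} {} {}: {}".format(
        stamp.strftime("%Y-%m-%dT%H:%M:%S"), pid,
        SEVERITIES[level], source, message)


print(human_format((6, 1367682301.430527, 3342, "Router",
                    "Received SIGINT. Shutting down.")))
```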

wishbone.encode.json

class wishbone.module.jsonencode.JSONEncode(actor_config, source='@data', destination='@data')[source]

Converts Python dict data structures to JSON strings.

Encodes Python data structures to JSON.

Parameters:
  • source (-) –
    The data to convert.
  • destination (-) –
    The location to write the JSON string to.

Queues:

  • inbox
    Incoming messages
  • outbox
    Outgoing messages
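
The role of <source> and <destination> can be sketched with the standard `json` module. The function name `json_encode` is hypothetical and events are modelled as plain dictionaries with top-level keys only.

```python
import json


def json_encode(event, source="@data", destination="@data"):
    """Write the JSON form of event[source] to event[destination]."""
    event[destination] = json.dumps(event[source])
    return event


print(json_encode({"@data": {"one": 1}}))
```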

Decode modules

wishbone.decode.json

class wishbone.module.jsondecode.JSONDecode(actor_config, source='@data', destination='@data', str=True)[source]

Decodes JSON data to Python data objects.

Decodes the payload or complete events from JSON format.

Parameters:
  • source (-) –
    The source of the event to decode.
    Use an empty string to refer to the complete event.
  • destination (-) –
    The destination key to store the Python <dict>.
    Use an empty string to refer to the complete event.
  • unicode (-) –
    When True, converts strings to unicode; otherwise they are kept as regular strings.

Queues:

  • inbox
    Incoming messages
  • outbox
    Outgoing messages
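
Decoding is the mirror image of encoding and can be sketched with `json.loads`. The function name `json_decode` is hypothetical, events are plain dictionaries, and the special meaning of an empty string for <source> and <destination> is not modelled.

```python
import json


def json_decode(event, source="@data", destination="@data"):
    """Replace the JSON string in event[source] by the decoded object."""
    event[destination] = json.loads(event[source])
    return event


print(json_decode({"@data": '{"one": [1, 2]}'}))
```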