Process Modules

class wishbone.module.modify.Modify(actor_config, expressions=[])

Modify and manipulate data structures.

This module modifies the data of an event using a sequential list of expressions.

Expressions are dictionaries containing exactly one item. The key is the name of the expression (a string) and the value is the list of parameters accepted by that expression.

For example:

{"set": ["hi", "data.one"]}

Sets the key “data.one” to the value “hi”.

In the YAML-formatted bootstrap file that would look like:

module: wishbone.function.modify
arguments:
  expressions:
    - set: [ "hi", "data.one" ]
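
To make the expression format and the dotted key notation concrete, here is a minimal Python sketch of how a sequential list of single-item expressions could be applied to a nested dictionary. It is an illustration under stated assumptions, not Wishbone's actual implementation: the helper names `_walk` and `apply_expressions` are hypothetical, and only `set` and `uppercase` are covered.

```python
def _walk(event, dotted_key):
    """Return the parent dict and final key name for a dotted key like "data.one"."""
    *parents, last = dotted_key.split(".")
    node = event
    for part in parents:
        # Create intermediate dictionaries on the way down (an assumption of this sketch).
        node = node.setdefault(part, {})
    return node, last


def apply_expressions(event, expressions):
    """Apply expressions in order; each one is a dict with exactly one item."""
    for expression in expressions:
        (name, params), = expression.items()
        if name == "set":
            value, key = params
            node, last = _walk(event, key)
            node[last] = value
        elif name == "uppercase":
            (key,) = params
            node, last = _walk(event, key)
            node[last] = node[last].upper()
        else:
            raise ValueError("expression not covered by this sketch: " + name)
    return event


event = apply_expressions({}, [
    {"set": ["hi", "data.one"]},
    {"uppercase": ["data.one"]},
])
print(event)  # {'data': {'one': 'HI'}}
```

Because the expressions are applied in order, the second expression sees the value written by the first.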

Valid expressions are:

  • add_item:

    add_item: [<item>, <key>]
    

    Adds <item> to the list stored under <key>.

  • copy:

    copy: [<source_key>, <destination_key>, <default_value>]
    

    Copies <source_key> to <destination_key> and overwrites <destination_key> when it exists. If <source_key> does not exist, <default_value> is taken instead.

  • del_item:

    del_item: [<key>, <item>]
    

    Deletes the first occurrence of <item> from the list stored under <key>.

  • delete:

    delete: [<key>]
    

    Deletes <key> from the event.

  • extract:

    extract: [<destination>, <regex>, <source>]
    

    Uses Python's re module to extract named groups from <source> using <regex> and adds the resulting matches to <destination>.

    The following example would extract the words “one” and “two” from “data.test” and add them to “data.extract”:

    expression:

    extract: ["data.extract", '(?P<first>.*?);(?P<second>.*)', "data.test"]
    

    result:

    {"data": {"test": "one;two", "extract": {"first": "one", "second": "two"}}}
    
  • join:

    join: [<array>, <join>, <destination>]
    

    Joins the array stored under <array> into a string using the <join> value as separator and stores the result in <destination>.

  • lowercase:

    lowercase: [<key>]
    

    Turns the string stored under <key> to lowercase.

  • merge:

    merge: [<object_one>, <object_two>, <destination>]
    

    Merges the two arrays <object_one> and <object_two> and stores the result in <destination>.

  • replace:

    replace: [<regex>, <value>, <key>]
    

    Replaces every occurrence of <regex> in the value stored under <key> with <value>.

  • set:

    set: [<value>, <key>]
    

    Sets the event key <key> to <value>.

  • uppercase:

    uppercase: [<key>]
    

    Turns the string stored under <key> to uppercase.

  • template:

    template: [<destination_key>, <template>, <source_key>]
    

    Uses the dictionary stored in <source_key> to complete <template> and stores the result in <destination_key>. The templating language is Python's built-in string formatting (str.format).

  • time:

    time: [<destination_key>, <format>]
    

    Modifies the <timestamp> value according to the <format> specification and stores it into <destination_key>. See http://crsmithdev.com/arrow/#format for the format.
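
Several of the expressions above rest on standard Python facilities: extract on named groups of the re module, replace on regular expression substitution, and template on str.format. The sketch below shows those building blocks on plain values. It is an assumption about the general behaviour rather than Wishbone's own code; in particular, the use of `re.match` (as opposed to `re.search`) and all variable names are illustrative.

```python
import re

# extract: named groups become the keys of the resulting dictionary.
match = re.match(r"(?P<first>.*?);(?P<second>.*)", "one;two")
extracted = match.groupdict()
print(extracted)  # {'first': 'one', 'second': 'two'}

# replace: every occurrence of the regex is substituted with the value.
replaced = re.sub(r"\d+", "N", "a1 b22 c333")
print(replaced)  # aN bN cN

# template: a dictionary completes a str.format style template.
source = {"name": "wishbone", "count": 3}
rendered = "{name} handled {count} events".format(**source)
print(rendered)  # wishbone handled 3 events
```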

Parameters:

- expressions(list)([])
   |  A list of expressions to apply.

Queues:

  • inbox:
    Incoming messages
  • outbox:
    Outgoing modified messages

class wishbone.module.pack.Pack(actor_config, bucket_size=100, bucket_age=10, aggregation_key='default')

Packs multiple events into a bulk event.

Aggregates multiple events into a bulk event, usually prior to submitting them to an output module.

The buffer is flushed when any of the following occurs:

  • The age of the bucket exceeds <bucket_age>.
  • The size of the bucket reaches <bucket_size>.
  • Any event arrives in queue <flush>.
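
The size and age conditions above can be sketched as a small bucket class. This is a minimal illustration, not the module's implementation: the class name `Bucket`, its methods, and the injectable `clock` argument are hypothetical. The third trigger, an event arriving in the <flush> queue, would simply call `flush()` unconditionally.

```python
import time


class Bucket:
    """Collects events and reports when it should be flushed (illustrative only)."""

    def __init__(self, bucket_size=100, bucket_age=10, clock=time.monotonic):
        self.bucket_size = bucket_size
        self.bucket_age = bucket_age
        self.clock = clock
        self.events = []
        self.created = None  # time at which the first event was added

    def add(self, event):
        if not self.events:
            # The age is measured from the first event added to the bucket.
            self.created = self.clock()
        self.events.append(event)

    def should_flush(self):
        if not self.events:
            return False
        full = len(self.events) >= self.bucket_size
        expired = self.clock() - self.created > self.bucket_age
        return full or expired

    def flush(self):
        """Return the collected events as one bulk list and start a new bucket."""
        bulk, self.events, self.created = self.events, [], None
        return bulk


bucket = Bucket(bucket_size=3, bucket_age=10)
for n in range(3):
    bucket.add({"data": n})
print(bucket.should_flush())  # True: the size limit was reached
bulk = bucket.flush()
print(bulk)  # [{'data': 0}, {'data': 1}, {'data': 2}]
```

Passing the clock in as an argument keeps the age check easy to exercise without waiting for real time to pass.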

Parameters:

- bucket_size(int)(100)
   |  The maximum number of events per bucket.

- bucket_age(int)(10)
   |  The maximum age in seconds before a bucket is closed and
   |  forwarded.  The age is measured from the moment the first
   |  event was added to the bucket.

- aggregation_key(str)("default")
   |  Groups events with key <aggregation_key> into the same buckets.

Queues:

- inbox
   |  Incoming events

- outbox
   |  Outgoing bulk events

- flush
   |  Flushes the buffer on incoming events, regardless of whether the
   |  bucket is full (bucket_size) or expired (bucket_age).

flushIncomingMessage(event)

Called on each incoming message of the <flush> queue. Flushes the buffer.

class wishbone.module.template.Template(actor_config, filename=None, destination='data', templates={})

Renders Jinja2 templates.

Renders Jinja2 templates using the event content as context. Templates can be read from a file or directly from the bootstrap file.

Parameters:

- filename(str)(None)*
   |  The absolute template filename to load.
   |  Can be a dynamic lookup. Assign None
   |  (default) to disable.

- destination(str)(data)*
   |  The key under which to store the rendered file template.


- templates(dict)({})*
   |  A dict of templates to render. The dict keys are the fields to
   |  assign the rendered string to.

Queues:

- inbox
   |  Incoming events

- outbox
   |  Outgoing events

class wishbone.module.unpack.Unpack(actor_config)

Unpacks bulk events into single events.

Creates single events from all the events stored in a bulk event.

Parameters:

None

Queues:

- inbox
   |  Incoming messages

- outbox
   |  Outgoing messages

- dropped
   |  Dropped messages