#!/usr/bin/env python
#
# -*- coding: utf-8 -*-
#
# queueselect.py
#
# Copyright 2018 Jelle Smet <development@smetj.net>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
#
from wishbone.module import ProcessModule
from wishbone.utils.structured_data_file import StructuredDataFile
from wishbone.error import InvalidData
class QueueSelect(ProcessModule):
    '''**Submits message to the queue defined by a rendered template.**

    Renders a list of templates against the event. Events are submitted to
    the queue(s) a rendered template returns.

    Typically this module is used to route messages using Jinja2 conditionals
    and logic.

    A rule looks like this:

    ::

        { "name": "name of the rule",
          "queue": "{{ 'queue_1,queue_one' if data.one == 1 else 'queue_2,queue_two' }}",
          "payload": {
            "queue_1": {
              "detail_1": "some value",
              "detail_2": "some other value"
            },
            "queue_2": {
              "detail_1": "some value",
              "detail_2": "some other value"
            }
          }
        }

    The rendered ``queue`` value is a comma separated list of queue names; a
    clone of the event is submitted to every listed queue which exists.

    The <file> queue expects events containing the absolute path of a YAML
    file to read (or delete) in ``data.path`` and the inotify event type in
    ``data.inotify_type``. Typically this queue receives events from
    wishbone.module.input.inotify.

    Events of type "IN_CREATE", "IN_CLOSE_WRITE", "IN_DELETE", "WISHBONE_INIT"
    are processed, all others are ignored.


    Parameters::

        - templates(list)([])*
           |  A list consisting out of template dicts as explained above.

        - log_matching(bool)(False)
           |  Whether to produce debug log messages for matches.
           |  Can be verbose hence it's configurable.


    Queues::

        - inbox
           |  Incoming events

        - outbox
           |  Outgoing events

        - nomatch
           |  Events for which a template selected a non-existing queue.

        - file
           |  Read rules from YAML file or delete them.
    '''

    # inotify event types handled on the <file> queue; anything else is
    # logged and dropped by handleFileTemplate().
    INOTIFY_TYPES = [
        "IN_CREATE",
        "IN_CLOSE_WRITE",
        "IN_DELETE",
        "WISHBONE_INIT"
    ]

    def __init__(self, actor_config, templates=[], log_matching=False):
        # NOTE(review): <templates> and <log_matching> are never stored here
        # but are read back later through event.kwargs.templates and
        # self.kwargs.log_matching -- presumably the base class captures the
        # constructor arguments. Confirm before touching this signature or
        # its (mutable) default; the list is not mutated in this class.
        ProcessModule.__init__(self, actor_config)

        self.pool.createQueue("inbox")
        self.pool.createQueue("outbox")
        self.pool.createQueue("file")
        self.pool.createQueue("nomatch")

        self.registerConsumer(self.consume, "inbox")
        self.registerConsumer(self.handleFileTemplate, "file")

        # Cache of rule files loaded through the <file> queue (YAML only).
        self.template_loader = StructuredDataFile(
            expect_json=False,
            expect_yaml=True
        )

    def consume(self, event):
        '''Routes <event> according to the inline and the file based templates.

        Inline templates are read from ``event.kwargs.templates``; file based
        templates come from the cache filled by handleFileTemplate() and have
        their ``queue`` expression rendered against the event here.
        '''

        for template in event.kwargs.templates:
            self.handleQueueSelect(
                template_name=template.name,
                queue_list=template.queue,
                payload=template.get("payload", {}),
                event=event)

        for file_name, file_content in self.template_loader.dump().items():
            try:
                queue_name = event.render(file_content["queue"])
            except InvalidData as err:
                # Both placeholders need a value; the original passed only
                # <err>, which raised TypeError inside the error handler.
                self.logging.error("Failed to render template '%s'. Reason: %s" % (file_name, err))
            else:
                # A rule file without "payload" is treated like an inline
                # template without one instead of raising KeyError.
                # NOTE(review): the lookup key is the complete rendered
                # string, so a comma separated result such as
                # 'queue_1,queue_one' only matches a payload entry with that
                # exact key -- confirm this is intended.
                self.handleQueueSelect(
                    template_name=file_name,
                    queue_list=queue_name,
                    payload=file_content.get("payload", {}).get(queue_name, {}),
                    event=event)

    def handleQueueSelect(self, template_name, queue_list, payload, event):
        '''Handles submitting <event> into the queues listed in <queue_list>.

        :param str template_name: Name of the template, used for logging only.
        :param str queue_list: Comma separated list of queue names.
        :param payload: Value stored under ``payload`` in the data attached
            to each submitted clone.
        :param event: The event to route.

        For every existing queue a clone of <event> is submitted carrying the
        routing details under ``tmp.<module name>``. For every non-existing
        queue the original <event> is submitted to ``nomatch``.
        '''

        for queue_name in (queue.strip() for queue in queue_list.split(',')):
            if self.pool.hasQueue(queue_name):
                if self.kwargs.log_matching:
                    self.logging.debug("Template '{template_name}' selected queue '{queue_name}' to route event '{event_id}' to.".format(
                        template_name=template_name,
                        queue_name=queue_name,
                        event_id=event.get('uuid')
                    ))

                # Construct and set the payload
                queue_payload = {
                    "original_event_id": event.get('uuid'),
                    "queue": queue_name,
                    "payload": payload
                }
                e = event.clone()
                e.set(queue_payload, "tmp.%s" % (self.name))

                # Submit a clone of the event to the required queue
                self.submit(e, queue_name)
            else:
                if self.kwargs.log_matching:
                    self.logging.debug("Template '{template_name}' selected non-existing queue '{queue_name}' to route event '{event_id}' to.".format(
                        template_name=template_name,
                        queue_name=queue_name,
                        event_id=event.get('uuid')
                    ))
                self.submit(event, "nomatch")

    def handleFileTemplate(self, event):
        '''Loads or deletes the template file defined in data.path.

        The action depends on ``data.inotify_type``: "IN_DELETE" removes the
        file from the template cache, every other type listed in
        INOTIFY_TYPES (re)loads it. Events with any other type are logged
        with a warning and dropped.
        '''

        inotify_type = event.get("data.inotify_type")
        path = event.get("data.path")

        if inotify_type in self.INOTIFY_TYPES:
            if inotify_type == "IN_DELETE":
                self.template_loader.delete(path)
                self.logging.debug("Removed template file '{path}' from cache.".format(path=path))
            else:
                self.template_loader.load(path)
                self.logging.debug("Loaded template file '{path}'".format(path=path))
        else:
            self.logging.warning("No support for inotify type '{inotify_type}'. Dropped.".format(
                inotify_type=inotify_type
            ))