Source code for wishbone.module.acknowledge

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#  acknowledge.py
#
#  Copyright 2018 Jelle Smet <development@smetj.net>
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
#  MA 02110-1301, USA.
#
#

from wishbone.module import FlowModule
from gevent.lock import Semaphore
from random import SystemRandom
import string


class AckList(object):

    def __init__(self):

        self.ack_table = []
        self.lock = Semaphore()

    def ack(self, value):

        with self.lock:
            if value in self.ack_table:
                self.ack_table.remove(value)
                return True
            else:
                return False

    def unack(self, value):

        with self.lock:
            if value in self.ack_table:
                return False
            else:
                self.ack_table.append(value)
                return True


[docs]class Acknowledge(FlowModule): '''**Forwards or drops events by acknowleding values.** This module stores the value of field ``ack_id`` from each incoming event. Subsequent events with the same ``ack_id`` field value will be dropped until it is removed by having it acknowledged. The ``ack_id`` can be acknowledged by sending the event to the ``acknowledge`` queue. The ``ack_id`` field value should be an unique value. Typically, downstream modules's ``successful`` and/or ``failed`` queues are sending events to the ``acknowledge`` queue. Parameters:: - ack_id (data): A unique value identifying the event. Queues:: - inbox | Incoming events - outbox | Outgoing events - acknowledge | Acknowledge events - dropped | Where events go to when unacknowledged Event variables:: - tmp.<name>.ack_id | The location of the acknowledgement ID when coming in through the | inbox queue. ''' def __init__(self, actor_config, ack_id=None): FlowModule.__init__(self, actor_config) self.pool.createQueue("inbox") self.pool.createQueue("outbox") self.pool.createQueue("acknowledge") self.pool.createQueue("dropped") self.registerConsumer(self.consume, "inbox") self.registerConsumer(self.acknowledge, "acknowledge") self.ack_table = AckList() def consume(self, event): if self.kwargs.ack_id is None: ack_id = self.generateID() else: ack_id = event.kwargs.ack_id if event.has("tmp.%s.ack_id" % (self.name)): self.logging.warning("Event arriving to <inbox> with tmp.%s.ack_id already set. Perhaps that should have been the <acknowledge> queue instead." % (self.name)) else: event.set({"ack_id": ack_id}, "tmp.%s" % (self.name)) if self.ack_table.unack(ack_id): self.submit(event, "outbox") else: self.logging.debug("Event with still unacknowledged <ack_id> '%s' send to <dropped> queue." % (ack_id)) self.submit(event, "dropped") def acknowledge(self, event): if event.has("tmp.%s.ack_id" % (self.name)): ack_id = event.get('tmp.%s.ack_id' % (self.name)) if self.ack_table.ack(ack_id): self.logging.debug("Event acknowledged with <ack_id> '%s'." % (ack_id)) event.delete('tmp.%s.ack_id' % (self.name)) else: self.logging.debug("Event with <ack_id> '%s' received but was not previously acknowledged." % (ack_id)) else: self.logging.warning("Received event without 'tmp.%s.ack_id' therefor it is dropped" % (self.name)) def generateID(self): return ''.join(SystemRandom().choice(string.ascii_lowercase) for _ in range(4)) def postHook(self): self.logging.debug("The ack table has %s events unacknowledged." % (len(self.ack_table.ack_table)))