# Source code for wishbone.module.pack

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#  pack.py
#
#  Copyright 2018 Jelle Smet <development@smetj.net>
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
#  MA 02110-1301, USA.
#
#

from wishbone.actor import Actor
from wishbone.event import Event
from wishbone.module import ProcessModule
from wishbone.error import BulkFull
from gevent import sleep


class Bucket(object):
    '''
    Collects events for one aggregation key into a single bulk event.

    The bulk event is handed to ``queue`` (via ``queue.put``) whenever the
    bucket is flushed, either explicitly through :meth:`flush` or by the
    :meth:`flushBucketTimer` countdown once ``age`` seconds have run out.

    Args:
        key: The aggregation key this bucket collects events for.
        size: Maximum number of events in the bulk event (``bulk_size``).
        age: Length of the expiry countdown in seconds.
        logging: Logger used for info/debug messages.
        queue: Destination for flushed bulk events.
        looplock (callable): Returns True while the timer loop should keep
            running.
    '''

    def __init__(self, key, size, age, logging, queue, looplock):

        self.key = key
        self.size = size
        self.age = age
        self.logging = logging
        self.queue = queue
        self.loop = looplock

        self.bucket = None
        self.createEmptyBucket()
        self.logging.info("Created new bucket with aggregation key '%s'." % (self.key))

    def createEmptyBucket(self):
        '''
        Replaces the current bulk event with a fresh, empty one and restarts
        the expiry countdown.
        '''
        self.bucket = Event(
            bulk=True,
            bulk_size=self.size
        )
        self.resetTimer()

    def flushBucketTimer(self):
        '''
        Flushes the buffer when <bucket_age> has expired.

        Loops for as long as ``self.loop()`` returns True, advancing the
        countdown by one second per iteration.
        '''

        while self.loop():
            sleep(1)
            self._tick()

    def _tick(self):
        '''
        Advances the expiry countdown by one second and acts on expiry.

        On expiry a non-empty bucket is flushed (which also restarts the
        countdown), while an empty bucket only restarts the countdown.
        '''
        self._timer -= 1
        # ``<= 0`` rather than ``== 0``: with a fractional or non-positive
        # age the countdown steps past exactly zero and would never expire.
        if self._timer <= 0:
            if len(self.bucket.data) > 0:
                self.logging.debug("Bucket age expired after %s s." % (self.age))
                self.flush()
            else:
                self.resetTimer()

    def flush(self):
        '''
        Flushes the buffer.

        Puts the current bulk event on the output queue and starts a new,
        empty bulk event.
        '''
        self.logging.debug("Flushed bucket '%s' of size '%s'" % (self.key, len(self.bucket.data)))
        self.queue.put(self.bucket)
        self.createEmptyBucket()

    def resetTimer(self):
        '''
        Resets the buffer expiry countdown to its configured value.
        '''

        self._timer = self.age


class Pack(ProcessModule):
    '''**Packs multiple events into a bulk event.**

    Aggregates multiple events into a bulk event usually prior to submitting
    to an output module.

    Flushing the buffer can be done in various ways:

        - The age of the bucket exceeds <bucket_age>.
        - The size of the bucket reaches <bucket_size>.
        - Any event arrives in queue <flush>.


    Parameters::

        - bucket_size(int)(100)
           |  The maximum amount of events per bucket.

        - bucket_age(int)(10)
           |  The maximum age in seconds before a bucket is closed and
           |  forwarded.  The countdown starts when the bucket is created
           |  and restarts after every flush (or after expiring while still
           |  empty), so it is not the time since the first event was added.

        - aggregation_key(str)("default")
           |  Groups events with key <aggregation_key> into the same buckets.


    Queues::

        - inbox
           |  Incoming events

        - outbox
           |  Outgoing bulk events

        - flush
           |  Flushes the buffer on incoming events regardless of whether
           |  the bulk is full (bucket_size) or expired (bucket_age).
    '''

    def __init__(self, actor_config, bucket_size=100, bucket_age=10, aggregation_key="default"):
        Actor.__init__(self, actor_config)

        self.pool.createQueue("inbox")
        self.pool.createQueue("outbox")
        self.pool.createQueue("flush")
        self.registerConsumer(self.consume, "inbox")
        self.registerConsumer(self.flushIncomingMessage, "flush")

        # Maps aggregation key -> Bucket.
        self.buckets = {}

    def consume(self, event):
        '''
        Adds <event> to the bucket of the configured aggregation key.

        When the bucket's bulk event is already full, the bucket is flushed
        first and the event is added to the fresh bulk that replaces it.
        '''
        bucket = self.getBucket(self.kwargs.aggregation_key)
        try:
            bucket.bucket.appendBulk(event)
        except BulkFull:
            self.logging.debug("Bucket full after %s events." % (self.kwargs.bucket_size))
            bucket.flush()
            bucket.bucket.appendBulk(event)

    def getBucket(self, key):
        '''
        Returns the bucket for <key>, creating it on first use.

        A newly created bucket has its expiry timer loop started via
        ``sendToBackground``.
        '''
        bucket = self.buckets.get(key)
        if bucket is None:
            bucket = Bucket(
                key,
                self.kwargs.bucket_size,
                self.kwargs.bucket_age,
                self.logging,
                self.pool.queue.outbox,
                self.loop)
            self.buckets[key] = bucket
            self.sendToBackground(bucket.flushBucketTimer)
        return bucket

    def flushIncomingMessage(self, event):
        '''
        Called on each incoming message of the <flush> queue.

        Flushes the buffers of all buckets.
        '''
        self.logging.debug("Recieved message in <flush> queue. Flushing all bulk buffers.")
        # Iterate over a snapshot of the Bucket objects: iterating the dict
        # itself yields only keys, and flushing puts onto the outbox queue,
        # which may let other consumers run and add buckets mid-loop.
        for bucket in list(self.buckets.values()):
            bucket.flush()