Source code for bonobo.nodes.throttle
import threading
import time
from bonobo.config import Configurable, ContextProcessor, Method, Option
class RateLimitBucket(threading.Thread):
daemon = True
@property
def stopped(self):
return self._stop_event.is_set()
def __init__(self, initial=1, period=1, amount=1):
super(RateLimitBucket, self).__init__()
self.semaphore = threading.BoundedSemaphore(initial)
self.amount = amount
self.period = period
self._stop_event = threading.Event()
def stop(self):
self._stop_event.set()
def run(self):
while not self.stopped:
time.sleep(self.period)
for _ in range(self.amount):
self.semaphore.release()
def wait(self):
return self.semaphore.acquire()
[docs]class RateLimited(Configurable):
handler = Method()
initial = Option(int, positional=True, default=1)
period = Option(int, positional=True, default=1)
amount = Option(int, positional=True, default=1)
@ContextProcessor
def bucket(self, context):
bucket = RateLimitBucket(self.initial, self.amount, self.period)
bucket.start()
yield bucket
bucket.stop()
bucket.join()
def __call__(self, bucket, *args, **kwargs):
bucket.wait()
return self.handler(*args, **kwargs)