multithreading - Leaky bucket in python -
hi trying develop leakybucket unlimited bucket capacity in python. want thread-safe , cpu efficient, minimum number of threads. works now. there tiny errors.
i throttle bandwidth 500 kbps. third line seems break this. also, can tell me if right way implement leakybucket? thanks.
rate: 500.00 rate: 500.00 rate: 550.00 rate: 500.00 rate: 500.00 rate: 500.00 rate: 500.00 rate: 500.00
code here:
from collections import deque import threading, time class leakybucket: '''the leaky bucket throttling bit rate''' def __init__(self, node, bitspersec, measintv, lbtype): self.node = node self.bitspersec = bitspersec #the rate limit self.measintv = measintv #the measure interval, tokens become full @ beginning of each interval self.lbtype = lbtype #the type of bucket self.lasttime = 0 #the start time of last measure interval self.bitsdone = 0 #the bits have been transmitted self.bdlock = threading.lock() #the lock bits sent self.packdq = deque() #the packet q self.maxtoken = bitspersec*float(measintv) #the max token (bits) self.token = self.maxtoken #the current token self.condition = threading.condition() #sync lock def packin(self, msg): '''insert packet''' self.condition.acquire() self.packdq.append(msg) self.condition.notify() self.condition.release() def keeppoping(self): '''keep poping new pack''' self.lasttime = time.time() #record start time while true: timenow = time.time() if timenow - self.lasttime > self.measintv: #new intv, need reset token self.token = self.maxtoken self.lasttime = timenow self.condition.acquire() if self.packdq: # queue not empty pack = list(self.packdq)[0] packlen = len(pack[2])*8 if packlen > self.token: #no enough token? #self.packdq.popleft() self.condition.release() time.sleep(max(self.lasttime+self.measintv-time.time(),0)) #wait enough token else: #enough token, can send out packet self.packdq.popleft() self.condition.release() self.changebitsdone(packlen) self.token = self.token - packlen #consume token else: self.condition.wait() self.condition.release() def begin(self): '''begin leakybucket''' athread = threading.thread(target = self.keeppoping, args = []) athread.start() def getbitsdone(self): '''get , reset bitsdone, testing''' self.bdlock.acquire() rev = self.bitsdone self.bitsdone = 0 self.bdlock.release() return rev def changebitsdone(self,length): '''change bitsdone, testing''' self.bdlock.acquire() self.bitsdone += length self.bdlock.release() def measure(self, intv): '''measure throughput of leaky bucket''' while true: bitsdone = self.getbitsdone() rate = bitsdone / float(intv*1024) print 'rate: %.2f' % rate time.sleep(intv) def startmeasure(self, intv): '''start measure rate''' #print 'here' athread = threading.thread(target = self.measure, args = [intv]) athread.start() #=============================== def main(): pack = 1000*'a' msg = ('192.168.1.1', 16000, pack) print 'here' lb = leakybucket(none, 500*1024, 1, 'reg') lb.begin() lb.startmeasure(10) nummsg = 0 while nummsg < 10000: lb.packin(msg) #print 'pack in' nummsg += 1 if __name__ == '__main__': main()
Comments
Post a Comment