Package brisa :: Package utils :: Module looping_call
[hide private]
[frames | no frames]

Source Code for Module brisa.utils.looping_call

 1  # Licensed under the MIT license 
 2  # http://opensource.org/licenses/mit-license.php or see LICENSE file. 
 3  # Copyright 2007-2008 Brisa Team <brisa-develop@garage.maemo.org> 
 4   
 5  """ Performs repeated function calls. 
 6  """ 
 7   
 8  from threading import Timer 
 9  from brisa.core import log, reactor 
10   
11   
class LoopingCall(object):
    """ Class that performs repeated function calls in an interval.

    The wrapped function is called once when the loop gets registered and is
    then scheduled on the reactor through reactor.add_timer() using the
    interval given to start().
    """

    msg_already_started = 'tried to start() LoopingCall when already started'
    msg_already_stopped = 'tried to stop() LoopingCall when already stopped'

    def __init__(self, f, *a, **kw):
        """ Constructor for the LoopingCall class.

        @param f: function to be called repeatedly
        @param a: positional arguments passed to f on every call
        @param kw: keyword arguments passed to f on every call

        @type f: callable
        """
        self._function = f
        self._args = a
        self._kwargs = kw
        self.interval = 0
        self.running = False
        # Handle returned by reactor.add_timer(); None while not registered.
        self._callback_handler = None
        # threading.Timer pending for a start(now=False); None otherwise.
        self._delayed_start = None

    def is_running(self):
        """ Returns True if the looping call is active, otherwise False.
        """
        return self.running

    def start(self, interval, now=True):
        """ Starts the function calls in the interval specified. If now is
        False, it waits the interval before starting doing calls.

        If the LoopingCall is already running, only a warning is logged.

        @param interval: interval between calls
        @param now: whether it will start calling now or not

        @type interval: float
        @type now: boolean

        @raise AssertionError: if interval is 0
        """
        if self.is_running():
            log.warning(self.msg_already_started)
            return

        # Validate before touching any state, so a rejected start() does not
        # leave the object flagged as running with nothing scheduled. The
        # exception is raised explicitly (instead of using an assert
        # statement) so the check is not stripped when running with -O; the
        # exception type is kept for backward compatibility.
        if interval == 0:
            raise AssertionError('(warning) starting LoopingCall with '
                                 'interval %f' % interval)

        self.interval = interval
        self.running = True

        if now:
            self._register()
        else:
            # Keep a reference to the timer so stop() is able to cancel it.
            self._delayed_start = Timer(interval, self._register)
            self._delayed_start.start()

    def stop(self):
        """ Stops the LoopingCall.

        If the LoopingCall is not running, only a warning is logged.
        """
        if not self.running:
            log.warning(self.msg_already_stopped)
            return

        self.running = False
        self._cancel_delayed_start()
        self._unregister()

    def destroy(self):
        """ Stops the LoopingCall if it is running and drops the references
        to the function and its arguments.
        """
        if self.is_running():
            self.stop()
        self._cleanup()

    def _cleanup(self):
        """ Replaces the stored function with a no-op and clears the stored
        arguments and the callback handler.
        """
        self._function = lambda: None
        self._args = []
        self._kwargs = {}
        self._callback_handler = None

    def _cancel_delayed_start(self):
        """ Cancels the pending delayed start timer, if there is one.
        """
        timer, self._delayed_start = self._delayed_start, None
        if timer is not None:
            # cancel() is harmless if the timer has already fired.
            timer.cancel()

    def _register(self):
        """ Performs the first call and schedules the following ones on the
        reactor.
        """
        self._delayed_start = None
        # Do not schedule anything if the loop was cancelled before this
        # point, or if the function itself stopped the loop during the call.
        if self._call() and self.running:
            self._callback_handler = reactor.add_timer(self.interval,
                                                       self._call)

    def _unregister(self):
        """ Removes the timer from the reactor, if one was registered.
        """
        handler, self._callback_handler = self._callback_handler, None
        # stop() may run before anything was registered on the reactor (e.g.
        # while a delayed start is still pending); there is nothing to remove
        # in that case.
        if handler is not None:
            reactor.rem_timer(handler)

    def _call(self):
        """ Calls the function once with the stored arguments.

        Returns False without calling the function when the LoopingCall is
        not running, True otherwise.
        NOTE(review): presumably the reactor uses this return value to decide
        whether to keep the timer alive - confirm against brisa.core.reactor.
        """
        if not self.running:
            log.debug('LoopingCall on function %s cancelled' %
                      str(self._function))
            return False

        self._function(*self._args, **self._kwargs)
        return True
91