Package brisa :: Package core :: Package reactors :: Module _select
[hide private]
[frames] | no frames]

Source Code for Module brisa.core.reactors._select

  1  # Licensed under the MIT license 
  2  # http://opensource.org/licenses/mit-license.php or see LICENSE file. 
  3  # Copyright 2007-2008 Brisa Team <brisa-develop@garage.maemo.org> 
  4   
  5  """ Default select-based reactor. 
  6  """ 
  7   
  8  __all__ = ('SelectReactor', ) 
  9   
 10  import os 
 11  import time 
 12  import random 
 13  import select 
 14  import socket 
 15  import signal 
 16   
 17  from errno import EINTR, EBADF 
 18   
 19  from brisa.core import log 
 20  from brisa.core.ireactor import * 
 21   
 22   
23 -class Timer(object):
24 """ Timer class. 25 """ 26
27 - def __init__(self, callback, timeout_rel, timeout_abs, threshold):
28 """ Constructor for the Timer class 29 30 @param callback: function to be called 31 @param timeout_rel: seconds from now to sleep before the call 32 @param timeout_abs: seconds since epoch when the call is scheduled 33 @param threshold: lower acceptable bound for timeout_abs precision 34 """ 35 self.callback = callback 36 self.timeout_rel = timeout_rel 37 self.timeout_abs = timeout_abs 38 self.threshold = threshold
39
40 - def __call__(self):
41 """ Performs the callback. 42 """ 43 self.callback()
44
45 - def update_abs_timeout(self):
46 """ Updates absolute timeout based on the time now and the relative 47 timeout specified. 48 """ 49 self.timeout_abs = self.timeout_rel + time.time()
50
51 - def __str__(self):
52 """ String representation of the class. 53 """ 54 return '<Timer callback=%s, timeout_rel=%s, timeout_abs=%s' \ 55 ', threshold=%s>' % (str(self.callback), str(self.timeout_rel), 56 str(self.timeout_abs), str(self.threshold))
57 58
59 -class SelectReactor(ReactorInterface):
60 61 _timers = {} 62 _read_fds = {} 63 _write_fds = {} 64 _excpt_fds = {} 65 _stop_funcs = [] 66 _start_funcs = [] 67 68 state = REACTOR_STATE_STOPPED 69
70 - def __init__(self, *args, **kwargs):
71 ReactorInterface.__init__(self, *args, **kwargs) 72 p = os.pipe() 73 self._death_pipe_w = os.fdopen(p[1], 'w') 74 self._death_pipe_r = os.fdopen(p[0], 'r') 75 self.add_fd(self._death_pipe_r, lambda a,b: False, 76 EVENT_TYPE_READ) 77 signal.signal(signal.SIGTERM, self._main_sig_quit) 78 signal.signal(signal.SIGINT, self._main_sig_quit)
79
80 - def add_timer(self, interval, callback, threshold=0.01):
81 """ Adds a timer. 82 83 @param interval: interval to sleep between calls 84 @param callback: function to be called 85 @param threshold: lower bound for the time precision 86 87 @type interval: integer 88 @type callback: callable 89 @type threshold: float 90 91 @return: unique ID for the callback 92 @rtype: integer 93 """ 94 id = random.randint(-50000, 50000) 95 96 while id in self._timers: 97 id = random.randint(-50000, 50000) 98 99 self._timers[id] = Timer(callback, interval,interval + time.time(), 100 threshold) 101 return id
102
103 - def rem_timer(self, id):
104 """ Removes a timed callback given its id. 105 106 @param id: unique ID returned by add_timer() 107 @type id: integer 108 """ 109 try: 110 self._timers.pop(id) 111 except KeyError: 112 raise KeyError('No such timeout callback registered with id %d' % 113 id)
114
115 - def add_fd(self, fd, evt_callback, evt_type, data=None):
116 """ Adds a fd for watch. 117 118 @param fd: file descriptor 119 @param evt_callback: callback to be called 120 @param evt_type: event type to be watched on this fd. An OR combination 121 of EVENT_TYPE_* flags. 122 @param data: data to be forwarded to the callback 123 124 @type fd: file 125 @type evt_callback: callable 126 @type evt_type: integer 127 @type data: any 128 """ 129 if evt_type & EVENT_TYPE_READ: 130 log.debug('Added fd %s watch for READ events' % str(fd)) 131 self._read_fds[fd] = evt_callback 132 if evt_type & EVENT_TYPE_WRITE: 133 log.debug('Added fd %s watch for WRITE events' % str(fd)) 134 self._write_fds[fd] = evt_callback 135 if evt_type & EVENT_TYPE_EXCEPTION: 136 log.debug('Added fd %s watch for EXCEPTION events' % str(fd)) 137 self._excpt_fds[fd] = evt_callback 138 139 return fd
140
141 - def rem_fd(self, fd):
142 """ Removes a fd from being watched. 143 144 @param fd: file descriptor to be removed 145 146 @type fd: file 147 """ 148 for d in (self._read_fds, self._write_fds, self._excpt_fds): 149 d.pop(fd, None)
150
151 - def add_after_stop_func(self, func):
152 """ Registers a function to be called before entering the STOPPED 153 state. 154 155 @param func: function 156 @type func: callable 157 """ 158 if func not in self._stop_funcs: 159 self._stop_funcs.append(func)
160
161 - def rem_after_stop_func(self, func):
162 """ Removes a registered function. 163 164 @param func: function 165 @type func: callable 166 """ 167 if func in self._stop_funcs: 168 self._stop_funcs.remove(func)
169
170 - def add_before_start_func(self, func):
171 """ Registers a function to be called before entering the RUNNING 172 state. 173 174 @param func: function 175 @type func: callable 176 """ 177 if func not in self._start_funcs: 178 self._start_funcs.append(func)
179
180 - def rem_before_start_func(self, func):
181 """ Removes a registered function. 182 183 @param func: function 184 @type func: callable 185 """ 186 if func in self._start_funcs: 187 self._start_funcs.remove(func)
188
189 - def main(self):
190 """ Enters the RUNNING state by running the main loop until 191 main_quit() is called. 192 """ 193 if self.state != REACTOR_STATE_STOPPED: 194 raise ReactorAlreadyRunningException('main() called twice or '\ 195 'together with main_loop_iterate()') 196 197 self.state = REACTOR_STATE_RUNNING 198 log.info('Preparing main loop') 199 self._main_call_before_start_funcs() 200 log.info('Entering main loop') 201 while self.state == REACTOR_STATE_RUNNING: 202 try: 203 if not self.main_loop_iterate(): 204 break 205 except: 206 break 207 log.info('Preparing to exit main loop') 208 self._main_call_before_stop_funcs() 209 log.info('Exited main loop')
210
211 - def main_quit(self):
212 """ Terminates the main loop. 213 """ 214 self.state = REACTOR_STATE_STOPPED 215 self._death_pipe_w.close() 216 log.debug('Writing pipe of death')
217
218 - def main_loop_iterate(self):
219 """ Runs a single iteration of the main loop. Reactor enters the 220 RUNNING state while this method executes. 221 """ 222 if not self._main_select(): 223 self._main_trigger_timers() 224 return False 225 if not self._main_trigger_timers(): 226 return False 227 return True
228
229 - def is_running(self):
230 return bool(self.state)
231
232 - def _main_select(self):
233 """ Selects and process events. 234 235 @return: True if no exception was raised. False if an exception was 236 raised, meaning that we recommend another _main_select() 237 @rtype: boolean 238 """ 239 try: 240 revt, wevt, eevt = select.select(self._read_fds.keys(), 241 self._write_fds.keys(), 242 self._excpt_fds.keys(), 243 self._get_min_timeout()) 244 if not self._main_process_events(revt, wevt, eevt): 245 return False 246 # Fix problems with problematic file descriptors 247 except ValueError, v: 248 log.debug('Main loop ValueError: %s' % str(v)) 249 self._main_cleanup_fds() 250 except TypeError, t: 251 log.debug('Main loop TypeError %s' % str(t)) 252 self._main_cleanup_fds() 253 except (select.error, IOError), s: 254 if s.args[0] in (0, 2): 255 if not ((not self._read_fds) and (not self._write_fds)): 256 raise 257 elif s.args[0] == EINTR: 258 pass 259 elif s.args[0] == EBADF: 260 self._main_cleanup_fds() 261 except socket.error, s: 262 if s.args[0] == EBADF: 263 self._main_cleanup_fds() 264 except KeyboardInterrupt: 265 return False 266 return True
267
268 - def _main_process_events(self, revt, wevt, eevt):
269 for read in revt: 270 if read == self._death_pipe_w: 271 log.debug('Pipe of death read') 272 self._read_fds.pop(read) 273 return False 274 if read not in self._read_fds: 275 continue 276 try: 277 log.debug('Read event on %s, calling %s', read, 278 self._read_fds[read]) 279 if not self._read_fds[read](read, EVENT_TYPE_READ): 280 # Returned False, remove it 281 self._read_fds.pop(read) 282 except Exception, e: 283 log.debug('Exception %s raised when handling a READ'\ 284 ' event on file %s', e, read) 285 for write in wevt: 286 if write not in self._write_fds: 287 continue 288 try: 289 log.debug('Write event on %s, calling %s', write, 290 self._write_fds[write]) 291 if not self._write_fds[write](write, EVENT_TYPE_WRITE): 292 self._write_fds.pop(write) 293 except Exception, e: 294 log.debug('Exception %s raised when handling a WRITE'\ 295 ' event on file %s', e, write) 296 for excpt in eevt: 297 if excpt not in self._excpt_fds: 298 continue 299 try: 300 log.debug('Exception event on %s, calling %s', excpt, 301 self._excpt_fds[excpt]) 302 if not self._excpt_fds[excpt](excpt, EVENT_TYPE_EXCEPTION): 303 self._excpt_fds.pop(excpt) 304 except Exception, e: 305 log.debug('Exception %s raised when handling a EXCEPTION'\ 306 ' event on file %s', e, excpt) 307 return True
308
309 - def _main_trigger_timers(self):
310 """ Triggers the timers that are ready. 311 """ 312 for callback in self._timers.values(): 313 if callback.timeout_abs - callback.threshold < time.time(): 314 log.debug('Callback ready: %s' % str(callback)) 315 if self.is_running(): 316 try: 317 callback() 318 except KeyboardInterrupt, k: 319 # Ctrl-C would be ignored 320 return False 321 except: 322 log.error('Error while processing timer %s' % 323 str(callback)) 324 # Update the absolute timeout anyways 325 callback.update_abs_timeout() 326 return True
327
328 - def _main_cleanup_fds(self):
329 """ Cleans up problematic fds. 330 """ 331 log.debug('Problematic fd found. Cleaning up...') 332 333 for d in [self._read_fds, self._write_fds, self._excpt_fds]: 334 for s in d.keys(): 335 try: 336 select.select([s], [s], [s], 0) 337 except Exception, e: 338 log.debug('Removing problematic fd: %s' % str(s)) 339 d.pop(s)
340
341 - def _get_min_timeout(self):
342 """ Returns the minimum timeout among registered timers. 343 """ 344 min = 0 345 for callback in self._timers.values(): 346 if min == 0 or callback.timeout_rel < min: 347 min = callback.timeout_rel 348 return min
349
351 for cb in self._stop_funcs: 352 cb()
353
355 for cb in self._start_funcs: 356 cb()
357
358 - def _main_sig_quit(self, sig, frame):
359 self.main_quit()
360