Package brisa :: Package core :: Module network_listeners
[hide private]
[frames] | no frames]

Source Code for Module brisa.core.network_listeners

  1  # Licensed under the MIT license 
  2  # http://opensource.org/licenses/mit-license.php or see LICENSE file. 
  3  # Copyright 2007-2008 Brisa Team <brisa-develop@garage.maemo.org> 
  4   
  5  """ Provides a simple API and observer model for listening over UDP. 
  6  """ 
  7   
  8  import socket 
  9  from struct import pack 
 10   
 11  from brisa import __enable_offline_mode__ 
 12  from brisa.core import log, reactor 
 13  from brisa.core.network import get_active_ifaces 
 14  from brisa.core.ireactor import EVENT_TYPE_READ 
 15  from brisa.core.threaded_call import run_async_function 
 16   
 17   
 18  if not __enable_offline_mode__: 
 19      if not get_active_ifaces(): 
 20          raise RuntimeError('Network is down.') 
 21   
 22   
23 -class CannotListenError(Exception):
24 """ Exception denoting an error when binding interfaces for listening. 25 """ 26
27 - def __init__(self, interface, port, addr='', reason=''):
28 """ Constructor for the CannotListenError class 29 30 @param interface: interface where the error occured 31 @param port: port at the error ocurred when binding 32 @param addr: address at the error ocurred when binding 33 @param reason: reason why the error happened 34 35 @type interface: string 36 @type port: integer 37 @type addr: string 38 @type reason: string 39 """ 40 if addr: 41 self.message = "Couldn't listen on %s:%s: %d. " % (interface, 42 addr, port) 43 else: 44 self.message = "Couldn't listen on %s: %d. " % (interface, 45 port) 46 if reason: 47 self.message += reason
48 49
50 -class SubscriptionError(Exception):
51 """ Exception denoting an observer subscription error. Gets raised when the 52 observer does not implement the INetworkObserver interface. 53 """ 54 message = "Couldn't subscribe listener because it does not implement \ 55 data_received. Implement brisa.core.network_listeners.INetworkObserver."
56 57
58 -class INetworkObserver(object):
59 """ Interface for network observers. Prototypes the data_received method. 60 """ 61
62 - def data_received(self, data, addr=None):
63 """ Receives data when subscribed to some network listener. 64 65 @param data: raw data 66 @param addr: can receive a 2-tuple (host, port) 67 68 @type data: string 69 @type addr: None or tuple 70 """ 71 raise Exception("Classes implementing INetworkObserver must implement \ 72 data_received() method")
73 74
75 -class NetworkListener(object):
76 """ Network listener abstract class. Forwards data to multiple subscribed 77 observers and can have a single callback to get data forwarded to. 78 79 Methods that MUST be implemented by an inheriting class: 80 - run() : main loop that receives data. In order to forward data to 81 observers and the data callback run() method MUST call 82 self.forward_data(data, addr). Note that addr is optional. 83 - close() : closes the connection 84 """ 85
86 - def __init__(self, observers=[], data_callback=None):
87 """ Constructor for the NetworkListener class 88 89 @param observers: initial subscribers for data forwarding 90 @param data_callback: callback that gets data forwarded to 91 92 @type observers: list 93 @type data_callback: callable 94 """ 95 self.listening = False 96 self.data_callback = data_callback 97 self.observers = observers
98
99 - def forward_data(self, data, addr=''):
100 """ Forwards data to the subscribed observers and to the data callback. 101 102 @param data: raw data to be forwarded 103 @param addr: can be a 2-tuple (host, port) 104 105 @type data: string 106 @type addr: None or tuple 107 """ 108 for listener in self.observers: 109 if addr: 110 run_async_function(listener.data_received, (data, addr)) 111 else: 112 run_async_function(listener.data_received, (data, ())) 113 114 if self.data_callback: 115 if addr: 116 run_async_function(self.data_callback, (data, addr)) 117 else: 118 run_async_function(self.data_callback, (data, ()))
119
120 - def subscribe(self, observer):
121 """ Subscribes an observer for data forwarding. 122 123 @param observer: observer instance 124 @type observer: INetworkObserver 125 """ 126 if hasattr(observer, data_received) and observer not in self.observers: 127 self.observers.append(observer) 128 else: 129 raise SubscriptionError()
130
131 - def is_listening(self):
132 """ Returns whether this network listener is listening (already started 133 with start()). 134 """ 135 return self.listening
136
137 - def is_running(self):
138 """ Same as is_listening(). 139 """ 140 return self.is_listening()
141
142 - def start(self):
143 self.listening = True
144
145 - def stop(self):
146 self.listening = False
147
148 - def _cleanup(self):
149 """ Removes references to other classes, in order to make GC easier 150 """ 151 self.data_callback = None 152 self.observers = []
153
154 - def destroy(self):
155 pass
156 157
158 -class UDPListener(NetworkListener):
159 """ Listens UDP in a given address and port (and in the given interface, if 160 provided). 161 """ 162 BUFFER_SIZE = 1500 163
164 - def __init__(self, addr, port, interface='', observers=[], 165 data_callback=None, shared_socket=None):
166 """ Constructor for the UDPListener class. 167 168 @param addr: address to listen on 169 @param port: port to listen on 170 @param interface: interface to listen on 171 @param observers: list of initial subscribers 172 @param data_callback: callback to get data forwarded to 173 @param shared_socket: socket to be reused by this network listener 174 175 @type addr: string 176 @type port: integer 177 @type interface: string 178 @type observers: list of INetworkObserver 179 @type data_callback: callable 180 @type shared_socket: socket.socket 181 """ 182 NetworkListener.__init__(self, observers, data_callback) 183 self.addr = addr 184 self.port = port 185 self.interface = interface 186 self.socket = None 187 self.fd_id = None 188 self._create_socket(shared_socket)
189
190 - def start(self):
191 self.fd_id = reactor.add_fd(self.socket, self._receive_datagram, 192 EVENT_TYPE_READ) 193 NetworkListener.start(self)
194
195 - def stop(self):
196 NetworkListener.stop(self) 197 reactor.rem_fd(self.fd_id)
198
199 - def destroy(self):
200 """ Closes the socket, renders the object unusable. 201 """ 202 self.socket.close() 203 self._cleanup()
204
205 - def _create_socket(self, shared):
206 """ Creates the socket if a shared socket has not been passed to the 207 constructor. 208 209 @param shared: socket to be reused 210 @type shared: socket.socket 211 """ 212 if shared: 213 self.socket = shared 214 else: 215 self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 216 socket.IPPROTO_UDP) 217 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 218 try: 219 self.socket.bind((self.interface, self.port)) 220 self.mreq = pack('4sl', socket.inet_aton(self.addr), 221 socket.INADDR_ANY) 222 self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, 223 self.mreq) 224 except Exception, e: 225 log.critical('Cannot create socket: %s' % str(e)) 226 raise CannotListenError(self.interface, self.port, self.addr, 227 "Couldn't bind address")
228
229 - def _receive_datagram(self, sock, cond):
230 """ Implements the UDPListener listening actions. 231 """ 232 if not self.is_listening(): 233 return 234 235 try: 236 (data, addr) = self.socket.recvfrom(self.BUFFER_SIZE) 237 self.forward_data(data, addr) 238 except Exception, e: 239 log.debug('Error when reading on UDP socket: %s', e) 240 241 return True
242