Package brisa :: Package upnp :: Package device :: Module event
[hide private]
[frames] | no frames]

Source Code for Module brisa.upnp.device.event

  1  # Licensed under the MIT license 
  2  # http://opensource.org/licenses/mit-license.php or see LICENSE file. 
  3  # 
  4  # Copyright 2007-2008, Brisa Team <brisa-develop@garage.maemo.org> 
  5   
  6  """ Device-side event related classes. 
  7  """ 
  8  import uuid 
  9   
 10  from xml.etree import ElementTree 
 11  from datetime import datetime 
 12   
 13  from brisa.core import log, reactor, webserver 
 14  log = log.getLogger('device-events') 
 15   
 16  from brisa.core.network import http_call 
 17  from brisa.core.threaded_call import run_async_call, ThreadedCall 
 18  from brisa.utils.looping_call import LoopingCall 
 19  from brisa.upnp import soap 
 20   
 21  import brisa 
 22  if not brisa.__enable_events_logging__: 
 23      log.disabled = 1 
 24   
 25   
26 -class EventController(webserver.CustomResource):
27 """ Listen for event subscribe and unsubscribe at device services. 28 It also manager the list of the control points interested at service 29 eventing. 30 """ 31
32 - def __init__(self, service):
33 webserver.CustomResource.__init__(self, 'eventSub') 34 #FIXME - This list is not thread safe 35 self.subscribers = [] 36 self.service = service
37
38 - def render(self, uri, request, response):
39 """ Event renderer method. 40 41 @param uri: URI of the request 42 @param request: request object (Cherrypy) 43 @param response: response object (Cherrypy) 44 45 @type uri: string 46 47 @note: see Cherrypy documentation for further info about request and 48 response attributes and methods. 49 """ 50 compressed_headers = {} 51 for k, v in request.headers.items(): 52 if not v: 53 v = "" 54 compressed_headers[k.lower()] = v.strip() 55 56 if request.method.lower() == "subscribe": 57 if not 'sid' in compressed_headers: 58 return self.render_subscriber(request, 59 response, compressed_headers) 60 else: 61 return self.render_renew(request, response, compressed_headers) 62 elif request.method.lower() == "unsubscribe": 63 return self.render_unsubscribe(request, 64 response, compressed_headers) 65 66 return response.body
67
68 - def render_subscriber(self, request, response, compressed_headers):
69 """ Renders the subscribe message for an event. 70 71 @param request: request object (Cherrypy) 72 @param response: response object (Cherrypy) 73 74 @note: see Cherrypy documentation for further info about request and 75 response attributes and methods. 76 """ 77 log.debug('Receiving subscribe request') 78 request_status = self._validate_subscribe_request(request, 79 compressed_headers) 80 81 if request_status == 200: 82 timeout = int(compressed_headers['timeout'].split("-")[-1]) 83 callback = compressed_headers['callback'][1:-1] 84 log.debug('Subscriber callback: %s' % callback) 85 subscriber = Subscriber(self.service, timeout, 86 callback, request.server_protocol) 87 response_body = self._get_subscribe_response(request, 88 response, subscriber) 89 self.subscribers.append(subscriber) 90 91 eventing_variables = {} 92 for var_name, state_var in self.service.get_variables().items(): 93 eventing_variables[var_name] = state_var.get_value() 94 EventMessage(subscriber, eventing_variables, 1, "") 95 96 # Try to unsubscribe after the timeout 97 t_call = ThreadedCall(self._auto_remove_subscriber, None, None, 98 None, None, timeout * 1.1, 99 "uuid:" + str(subscriber.subscription_id)) 100 reactor.add_after_stop_func(t_call.stop) 101 t_call.start() 102 103 log.debug('Subscribe success') 104 return response_body 105 else: 106 return self._build_error(request_status, request, response)
107
108 - def _validate_subscribe_request(self, request, compressed_headers):
109 log.debug('Validating subscribe request') 110 #TODO: verify if the callback url is a valid one 111 if not 'callback' in compressed_headers: 112 log.error('There is not a callback at the request') 113 return 412 114 115 if (not 'nt' in compressed_headers) or \ 116 (compressed_headers['nt'] != "upnp:event"): 117 return 412 118 119 #TODO: Put the verification of error 5xx 120 121 # No errors 122 return 200
123
124 - def _get_subscribe_response(self, request, response_obj, subscriber):
125 log.debug('Building subscribe response') 126 response_obj.status = 200 127 128 #TODO: date 129 response_obj.headers["DATE"] = '' 130 response_obj.headers["SERVER"] = 'BRisa UPnP Framework' 131 response_obj.headers["SID"] = 'uuid:' + str(subscriber.subscription_id) 132 response_obj.headers["CONTENT-LENGTH"] = '0' 133 response_obj.headers["TIMEOUT"] = 'Second-' \ 134 + str(subscriber.subscription_duration) 135 response_obj.body = [''] 136 137 return response_obj.body
138
139 - def render_renew(self, request, response, compressed_headers):
140 """ Renders the subscribe renew message for an event. 141 142 @param request: request object (Cherrypy) 143 @param response: response object (Cherrypy) 144 145 @note: see Cherrypy documentation for further info about request and 146 response attributes and methods. 147 """ 148 log.debug('Receiving renew request') 149 request_status, subs = self._validate_renew_request(request, 150 compressed_headers) 151 152 if request_status == 200: 153 timeout = compressed_headers['timeout'] 154 subs.subscription_duration = int(timeout.split("-")[-1]) 155 subs.timestamp = datetime.now() 156 157 return self._get_subscribe_response(request, response, subs) 158 else: 159 return self._build_error(request_status, request, response)
160
161 - def _validate_renew_request(self, request, compressed_headers):
162 if 'callback' in compressed_headers or 'nt' in compressed_headers: 163 log.error('Missing callback or nt') 164 return 400, None 165 166 subs = self._find_subscriber(compressed_headers['sid']) 167 if not subs: 168 log.error('Subscriber does not exist') 169 return 412, None 170 171 #TODO: Put the verification of error 5xx 172 173 # No errors 174 return 200, subs
175
176 - def _auto_remove_subscriber(self, sid):
177 subscriber = self._find_subscriber(sid) 178 if not subscriber: 179 #Already unsubscribe 180 return 181 182 time_delta = datetime.now() - subscriber.timestamp 183 if time_delta.seconds > subscriber.subscription_duration: 184 log.debug('Subscriber sid:%s timeout' 185 % str(subscriber.subscription_id)) 186 self._remove_subscriber(subscriber) 187 else: 188 subscriber.timestamp = datetime.now() 189 190 # Try to unsubscribe after the timeout 191 t_call = ThreadedCall(self._auto_remove_subscriber, None, None, 192 None, None, subscriber.subscription_duration * 1.1, 193 sid) 194 reactor.add_after_stop_func(t_call.stop) 195 t_call.start()
196
197 - def render_unsubscribe(self, request, response, compressed_headers):
198 """ Renders the unsubscribe message for an event. 199 200 @param request: request object (Cherrypy) 201 @param response: response object (Cherrypy) 202 203 @note: see Cherrypy documentation for further info about request and 204 response attributes and methods. 205 """ 206 log.debug('Receiving unsubscribe request') 207 request_status, subs = self._validate_unsubscribe_request(request, 208 compressed_headers) 209 210 if request_status == 200: 211 self._remove_subscriber(subs) 212 213 response.status = 200 214 response.body = [""] 215 return response.body 216 else: 217 return self._build_error(request_status, request, response)
218
219 - def _validate_unsubscribe_request(self, request, compressed_headers):
220 221 if not 'sid' in compressed_headers: 222 log.error('Missing sid') 223 return 412, None 224 225 if 'callback' in compressed_headers or 'nt' in compressed_headers: 226 log.error('Missing callback or nt') 227 return 400, None 228 229 subs = self._find_subscriber(compressed_headers['sid']) 230 if not subs: 231 log.error('Subscriber does not exist') 232 return 412, None 233 234 # No errors 235 return 200, subs
236
237 - def _build_error(self, request, response_obj, status):
238 log.error('Building error response') 239 response = soap.build_soap_error(status) 240 241 response_obj.status = 500 242 243 if self.encoding is not None: 244 mime_type = 'text/xml; charset="%s"' % self.encoding 245 else: 246 mime_type = "text/xml" 247 response_obj.headers["Content-type"] = mime_type 248 response_obj.headers["Content-length"] = str(len(response)) 249 response_obj.headers["EXT"] = '' 250 response_obj.body = response 251 return response
252
253 - def _find_subscriber(self, sid):
254 for subscribe in self.subscribers: 255 if str(subscribe.subscription_id) == sid[5:]: 256 return subscribe 257 return None
258
259 - def _remove_subscriber(self, subscriber):
260 subscriber.stop() 261 self.subscribers.remove(subscriber)
262 263
264 -class Subscriber:
265
266 - def __init__(self, service, subscription_duration, delivery_url, http_version):
267 268 self.service = service 269 self.subscription_id = uuid.uuid4() 270 self.delivery_url = delivery_url 271 self.event_key = 0 272 self.subscription_duration = subscription_duration 273 self.http_version = http_version 274 self.timestamp = datetime.now() 275 276 self.eventing_variables = {} 277 for name, state_var in self.service.get_variables().items(): 278 state_var.subscribe_for_update(self._update_variable) 279 280 self.looping_call = LoopingCall(self._send_variables) 281 reactor.add_after_stop_func(self.looping_call.stop) 282 self.looping_call.start(10, False) 283 284 sid = str(self.subscription_id) 285 log.debug('Creating subscriber with subscription id: %s' % sid)
286
287 - def event_key_increment(self):
288 self.event_key += 1 289 if self.event_key > 4294967295: 290 self.event_key = 1
291
292 - def _update_variable(self, name, value):
293 if name in self.eventing_variables.keys() and \ 294 self.eventing_variables[name] != value: 295 296 self._send_variables() 297 self.eventing_variables[name] = value 298 return 299 300 self.eventing_variables[name] = value
301
302 - def _send_variables(self):
303 if self.eventing_variables: 304 EventMessage(self, self.eventing_variables, 0, "") 305 self.eventing_variables = {}
306
307 - def stop(self):
308 for name, state_var in self.service.get_variables().items(): 309 state_var.unsubscribe_for_update(self._update_variable) 310 311 # When called stop() manually, remove the before stop callback 312 reactor.rem_after_stop_func(self.looping_call.stop) 313 self.looping_call.stop()
314 315
316 -class EventMessage:
317 """ Wrapper for an event message. 318 """ 319
320 - def __init__(self, subscriber, variables, event_delay, cargo):
321 """ Constructor for the EventMessage class. 322 323 @param subscriber: subscriber that will receive the message 324 @param variables: variables of the event 325 @param event_delay: delay to wait before sending the event 326 @param cargo: callback parameters 327 328 @type subscriber: Subscriber 329 @type variables: dict 330 @type event_delay: float 331 """ 332 log.debug("event message") 333 334 if not variables: 335 log.error("There are no variables to send") 336 return 337 338 self.cargo = cargo 339 340 headers = {} 341 headers["HOST"] = subscriber.delivery_url 342 headers["CONTENT-TYPE"] = 'text/xml' 343 headers["NT"] = 'upnp:event' 344 headers["NTS"] = 'upnp:propchange' 345 headers["SID"] = "uuid:" + str(subscriber.subscription_id) 346 headers["SEQ"] = str(subscriber.event_key) 347 subscriber.event_key_increment() 348 349 event_body = self._build_message_body(variables) 350 351 headers["CONTENT-LENGTH"] = str(len(event_body)) 352 353 log.debug("Running http call") 354 run_async_call(http_call, success_callback=self.response, 355 error_callback=self.error, delay=event_delay, 356 method='NOTIFY', url=subscriber.delivery_url, 357 body=event_body, headers=headers)
358
359 - def _build_message_body(self, variables):
360 log.debug("Building message body") 361 property_set = ElementTree.Element("e:propertyset") 362 property_set.attrib.update({'xmlns:e': 363 "urn:schemas-upnp-org:event-1-0"}) 364 365 type_map = {str: 'xsd:string', 366 unicode: 'xsd:string', 367 int: 'xsd:int', 368 long: 'xsd:int', 369 float: 'xsd:float', 370 bool: 'xsd:boolean'} 371 372 for var_name, var_value in variables.items(): 373 property = ElementTree.SubElement(property_set, "e:property") 374 375 if var_value == None: 376 var_val = '' 377 else: 378 var_type = type_map[type(var_value)] 379 if var_type == 'xsd:string': 380 var_val = var_value 381 if type(var_value) == unicode: 382 var_val = var_val.encode('utf-8') 383 elif var_type == 'xsd:int' or var_type == 'xsd:float': 384 var_val = str(var_value) 385 elif var_type == 'xsd:boolean': 386 var_val = '1' if var_value else '0' 387 else: 388 #TODO - raise an error 389 log.error("Unknown state variable type") 390 pass 391 392 e = ElementTree.SubElement(property, var_name) 393 e.text = var_val 394 395 preamble = """<?xml version="1.0" encoding="utf-8"?>""" 396 return '%s%s' % (preamble, ElementTree.tostring(property_set, 'utf-8'))
397
398 - def error(self, cargo, error):
399 """ Callback for receiving an error. 400 401 @param cargo: callback parameters passed at construction 402 @param error: exception raised 403 404 @type error: Exception 405 406 @rtype: boolean 407 """ 408 log.debug("error", error) 409 return True
410
411 - def response(self, http_response, cargo):
412 """ Callback for receiving the HTTP response on a successful HTTP call. 413 414 @param http_response: response object 415 @param cargo: callback parameters passed at construction 416 417 @type http_response: HTTPResponse 418 419 @rtype: boolean 420 """ 421 log.debug("response") 422 return True
423