Package brisa :: Package upnp :: Package control_point :: Module service
[hide private]
[frames | no frames]

Source Code for Module brisa.upnp.control_point.service

  1  # Licensed under the MIT license 
  2  # http://opensource.org/licenses/mit-license.php or see LICENSE file. 
  3  # Copyright 2007-2008 Brisa Team <brisa-develop@garage.maemo.org> 
  4   
  5  """ Control Point side service implementation. Classes contained by this module 
  6  contain methods for performing actions on the service and event notification. 
  7   
  8  If you're using the control point high level API with no modifications on the 
  9  global flags (located on module brisa), then you shouldn't need to create this 
 10  class manually. 
 11   
 12  The framework default response to a device arrival is to build it and its 
 13  services automatically and forward it to your control point on the 
 14  "new_device_event" subscribed callback. This callback will receive the device 
 15  already ready for all actions. 
 16   
 17  The device's services are already built and their UPnP actions are already
 18  methods of the service object itself. For instance, a content directory service 
 19  object would already have a Browse() method for you to call directly, passing 
 20  parameters in the keywords format (key=value) e.g. cds.Browse(ObjectId=5, ...). 
 21  Also note that the keys should be in the UPnP specified format: in the
 22  example above we used ObjectId, exactly as it appears in the reference.
 23   
 24  """ 
 25  import brisa 
 26   
 27  from brisa.core import log 
 28  from brisa.core.network import url_fetch, parse_url, http_call 
 29  from brisa.core.threaded_call import run_async_call 
 30  from brisa.core.threaded_call import ThreadedCall 
 31  from brisa.upnp.soap import SOAPProxy 
 32  from brisa.upnp.base_service import BaseService, BaseStateVariable,\ 
 33                                      format_rel_url 
 34  from brisa.upnp.control_point.action import Action, Argument 
 35  from brisa.upnp.base_service_builder import BaseServiceBuilder 
 36   
 37   
class StateVariable(BaseStateVariable):
    """ Control point side state variable. Construction is delegated unchanged
    to BaseStateVariable.
    """

    def __init__(self, service, name, send_events, multicast, data_type, values):
        """ Forwards every argument, in the same order, to
        BaseStateVariable.__init__.

        @param service: service object the variable is created for
                        (ServiceBuilder passes its own self.service here)
        @param name: passed through unchanged
        @param send_events: passed through unchanged
        @param multicast: passed through unchanged
        @param data_type: passed through unchanged
        @param values: passed through unchanged
        """
        # NOTE(review): the meaning of the pass-through arguments is defined
        # by BaseStateVariable, which is not visible in this module.
        BaseStateVariable.__init__(self, service, name, send_events,
                                   multicast, data_type, values)
43
44 - def subscribe(self, callback):
46 47
class ServiceBuilder(BaseServiceBuilder):
    """ Service builder that produces the control point side objects
    (Argument, Action and StateVariable) and attaches every built action to
    the service as an attribute named after the action.
    """

    def build(self):
        """ Builds the service through BaseServiceBuilder.build().

        @return: whatever BaseServiceBuilder.build() returns, or False when
                 the build raised an exception
        """
        try:
            return BaseServiceBuilder.build(self)
        except Exception:
            # Deliberately best-effort: a failed build is reported as False.
            # A bare "except:" would also trap KeyboardInterrupt and
            # SystemExit, so only regular exceptions are swallowed, and the
            # failure is at least visible in the debug log.
            log.debug('Failed to build service from its description')
            return False

    def _create_argument(self, arg_name, arg_direction, arg_state_var):
        """ Creates an action argument.

        @param arg_name: argument name
        @param arg_direction: argument direction
        @param arg_state_var: related state variable object; only its name
                              is handed to the Argument
        """
        return Argument(arg_name, arg_direction, arg_state_var.name)

    def _create_action(self, name, args):
        """ Creates an action and exposes it as an attribute of the service,
        so that it can be reached as service.<name>.

        @param name: action name, also used as the attribute name
        @param args: arguments handed to the Action
        """
        action = Action(self.service, name, args)
        setattr(self.service, name, action)
        return action

    def _create_state_var(self, name, send_events, multicast,
                          data_type, values):
        """ Creates a control point side StateVariable bound to the service
        being built. All parameters are passed through unchanged.
        """
        return StateVariable(self.service, name, send_events,
                             multicast, data_type, values)
68 69
def is_relative(url, base):
    """ Tells whether url is relative with respect to base.

    The check is a plain substring test: url is considered relative exactly
    when base does not occur anywhere inside it.

    @param url: url to check
    @param base: base url

    @type url: string
    @type base: string

    @return: True if base is not contained in url, False otherwise
    @rtype: boolean
    """
    return base not in url
74 75
class Service(BaseService):
    """ Control point side service. Keeps the service URLs and the state of
    the event subscription (event_sid, event_timeout) and offers methods to
    build the service from its SCPD and to subscribe, renew and unsubscribe
    for events.
    """

    def __init__(self, id, serv_type, url_base, scpd_url,
                 control_url='', event_url='', presentation_url='',
                 build_async=False, async_cb=None, build=False):
        """ Constructor for the Service class.

        @param id: service id, forwarded to BaseService
        @param serv_type: service type, forwarded to BaseService
        @param url_base: base url, forwarded to BaseService
        @param scpd_url: url of the SCPD description
        @param control_url: control url
        @param event_url: event subscription url
        @param presentation_url: presentation url
        @param build_async: if True, the service is built asynchronously and
                            async_cb is notified when done
        @param async_cb: callback in the form async_cb(success). Receives only
                         one parameter (success) that tells if the service
                         has been successfully built or not.
        @param build: if True (and build_async is False), the service is
                      built synchronously
        """
        BaseService.__init__(self, id, serv_type, url_base)
        self.event_sid = ''
        self.event_timeout = 0
        self.scpd_url = scpd_url
        self.control_url = control_url
        self.event_sub_url = event_url
        self.presentation_url = presentation_url
        self._auto_renew_subs = None
        self._soap_service = None

        if not brisa.__skip_soap_service__:
            self.generate_soap_service()

        # Global flag: do not fetch/parse the service description at all.
        if brisa.__skip_service_xml__:
            return

        if build_async:
            assert async_cb is not None, 'you won\'t be notified when the '\
                                         'service build finished.'
            self._build_async(async_cb)
        elif build:
            self._build_sync()

    def _scpd_location(self):
        """ Returns the url to fetch the SCPD from: scpd_url prefixed with
        url_base when is_relative() says it is relative, scpd_url otherwise.
        """
        if is_relative(self.scpd_url, self.url_base):
            return '%s%s' % (self.url_base, self.scpd_url)
        return self.scpd_url

    def _build_sync(self):
        """ Builds the Service synchronously.

        @raise RuntimeError: if the SCPD could not be fetched
        """
        fd = url_fetch(self._scpd_location())
        if not fd:
            log.debug('Could not fetch SCPD URL %s' % self.scpd_url)
            # Format the message here: passing self as a second argument
            # would leave a raw, unformatted tuple as the exception text.
            raise RuntimeError('Could not build Service %s' % (self, ))
        ServiceBuilder(self, fd).build()

    def _build_async(self, cb):
        """ Builds the service asynchronously. Forwards the build result to
        the specified callback 'cb' (False if the SCPD could not be fetched
        or the build failed).
        """
        run_async_call(url_fetch,
                       success_callback=self._fetch_scpd_async_done,
                       error_callback=self._fetch_scpd_async_error,
                       delay=0, url=self._scpd_location(),
                       success_callback_cargo=cb,
                       error_callback_cargo=cb)

    def _fetch_scpd_async_done(self, fd=None, cb=None):
        """ Called when the SCPD fetch finished without raising. If a
        description was received, builds the service by parsing it.

        @param fd: fetched SCPD, may be empty/None
        @param cb: callback receiving the build result
        """
        # Default to failure so that an empty fetch result is reported to the
        # callback instead of hitting an unbound local variable.
        parsed_ok = False
        if fd:
            parsed_ok = ServiceBuilder(self, fd).build()
        if cb:
            cb(parsed_ok)

    def _fetch_scpd_async_error(self, cb=None, error=None):
        """ Called when the SCPD XML wasn't successfully fetched. Notifies
        the callback with False.
        """
        log.debug('Failed to fetch SCPD for service %s' % self.id)
        if cb:
            cb(False)

    def generate_soap_service(self):
        """ Creates the SOAP proxy used for control, stored in
        self._soap_service. The control url is used as is when it already
        contains url_base; otherwise it is appended to url_base.
        """
        namespace = ('u', self.service_type)

        if self.url_base in self.control_url:
            ctrl_url = self.control_url
        else:
            ctrl_url = '%s%s' % (self.url_base,
                                 format_rel_url(self.control_url))

        self._soap_service = SOAPProxy(ctrl_url, namespace)

    def subscribe_for_variable(self, var_name, callback):
        """ Subscribes for events on a specific variable (unicast eventing)
        with a notifier callback. Unknown variable names are ignored.

        @param var_name: variable name to subscribe on
        @param callback: callback to receive notifications

        @type var_name: string
        @type callback: callable
        """
        if var_name in self._state_variables:
            self._state_variables[var_name].subscribe(callback)

    def get_state_variable(self, var_name):
        """ Returns a state variable of the service, if exists.

        @param var_name: name of the state variable
        @type var_name: string

        @return: matching state variable or None
        @rtype: StateVariable
        """
        return self._state_variables.get(var_name, None)

    def event_subscribe(self, event_host, callback, cargo, auto_renew=True,
                        renew_callback=None):
        """ Subscribes for events.

        @param event_host: 2-tuple (host, port) with the event listener server.
        @param callback: callback
        @param cargo: callback parameters
        @param auto_renew: if True, the framework will automatically renew the
        subscription before it expires. If False, the program needs to call
        the event_renew method before the subscription timeout.
        @param renew_callback: renew callback. It will be used when auto_renew
                               is True

        @type event_host: tuple
        @type callback: callable
        @type auto_renew: boolean
        @type renew_callback: callable
        """
        if auto_renew:
            # Must exist before the request is issued: the subscribe response
            # starts the renew cycle through self._auto_renew_subs.
            self._auto_renew_subs = AutoRenew(self, event_host,
                                              renew_callback, cargo)
        SubscribeRequest(self, event_host, callback, cargo)

    def event_unsubscribe(self, event_host, callback, cargo):
        """ Unsubscribes for events. Does nothing when there is no current
        subscription id.

        @param event_host: 2-tuple (host, port) with the event listener server.
        @param callback: callback
        @param cargo: callback parameters

        @type event_host: tuple
        @type callback: callable
        """
        if not self.event_sid:
            # not registered
            return
        UnsubscribeRequest(self, event_host, callback, cargo)

    def event_renew(self, event_host, callback, cargo):
        """ Renew subscription for events. Nothing is sent when an automatic
        renewer is set for this service.

        @param event_host: 2-tuple (host, port) with the event listener server.
        @param callback: callback
        @param cargo: callback parameters

        @type event_host: tuple
        @type callback: callable
        """
        if not self._auto_renew_subs:
            RenewSubscribeRequest(self, event_host, callback, cargo)

    def _on_event(self, changed_vars):
        """ Updates the state variables named in an event notification.

        @param changed_vars: mapping of state variable name to its new value
        @type changed_vars: dict

        @raise KeyError: if a name is not a state variable of this service
        """
        log.debug('Receiving state variables notify')
        for name, value in changed_vars.items():
            self._state_variables[name].update(value)
244 245
class SubscribeRequest(object):
    """ Issues an event SUBSCRIBE request for a service and records the
    outcome (subscription id and timeout) on that service.
    """

    def __init__(self, service, event_host, callback, cargo):
        """ Sends the SUBSCRIBE request asynchronously.

        @param service: service that is subscribing
        @param event_host: 2-tuple (host, port) of the event listener server
        @param callback: called as callback(cargo, sid, timeout)
        @param cargo: callback parameters

        @type service: Service
        @type event_host: tuple
        @type callback: callable
        """
        log.debug("subscribe")
        self.service = service
        self.callback = callback
        self.cargo = cargo

        addr = "%s%s" % (service.url_base, service.event_sub_url)
        target = parse_url(addr)

        # NOTE(review): both "Host" and "HOST" keys are set; how http_call
        # treats the pair is not visible from this module.
        headers = {
            "Host": target.hostname,
            "User-agent": 'BRisa UPnP Framework',
            "TIMEOUT": 'Second-300',
            "NT": 'upnp:event',
            "CALLBACK": "<http://%s:%d/eventSub>" % event_host,
            "HOST": '%s:%d' % (target.hostname, target.port),
        }

        run_async_call(http_call, success_callback=self.response,
                       error_callback=self.error, delay=0,
                       method='SUBSCRIBE', url=addr,
                       headers=headers)

    def error(self, cargo, error):
        """ Error callback of the HTTP call: clears the subscription state of
        the service and notifies the callback with an empty sid and a zero
        timeout.

        @param cargo: not used; the cargo given at construction is forwarded
        @param error: exception raised

        @type error: Exception

        @rtype: boolean
        """
        log.debug("error %s", error)
        svc = self.service
        svc.event_sid = ""
        svc.event_timeout = 0
        if self.callback:
            self.callback(self.cargo, "", 0)
        return True

    def response(self, http_response, cargo):
        """ Success callback of the HTTP call: reads the SID and TIMEOUT
        headers, stores them on the service, starts the automatic renewal if
        one is configured and notifies the callback. The last two only
        happen when a sid was received.

        @param http_response: response object
        @param cargo: not used; the cargo given at construction is forwarded

        @type http_response: HTTPResponse

        @rtype: boolean
        """
        log.debug("response")
        # Header names and values are compared in lower case, stripped.
        normalized = dict((key.lower(), (value or "").lower().strip())
                          for key, value
                          in dict(http_response.getheaders()).items())

        sid = normalized.get('sid')

        # 300 seconds is kept whenever the header is absent or unparsable.
        timeout = 300
        raw_timeout = normalized.get('timeout')
        if raw_timeout is not None and raw_timeout.startswith("second-"):
            try:
                timeout = int(raw_timeout[7:])
            except ValueError:
                pass

        self.service.event_sid = sid
        self.service.event_timeout = timeout
        if self.service._auto_renew_subs and sid:
            self.service._auto_renew_subs.start_auto_renew()
        if self.callback and sid:
            self.callback(self.cargo, sid, timeout)

        return True
335 336
class UnsubscribeRequest(object):
    """ Issues an event UNSUBSCRIBE request for a service.
    """

    def __init__(self, service, event_host, callback, cargo):
        """ Clears the subscription state of the service and sends the
        UNSUBSCRIBE request asynchronously, carrying the previous sid.

        @param service: service that is unsubscribing
        @param event_host: 2-tuple (host, port) of the event listener server
                           (not used by this request)
        @param callback: called as callback(cargo, sid)
        @param cargo: callback parameters

        @type service: Service
        @type event_host: tuple
        @type callback: callable
        """
        self.service = service
        self.callback = callback
        self.cargo = cargo

        # Remember the sid for the request, then forget it on the service
        # right away, before any answer arrives.
        self.old_sid = service.event_sid
        service.event_sid = ""
        service.event_timeout = 0

        addr = "%s%s" % (service.url_base, service.event_sub_url)
        target = parse_url(addr)

        # NOTE(review): both "Host" and "HOST" keys are set; how http_call
        # treats the pair is not visible from this module.
        headers = {
            "Host": target.hostname,
            "User-agent": 'BRISA-CP',
            "HOST": '%s:%d' % (target.hostname, target.port),
            "SID": self.old_sid,
        }

        run_async_call(http_call, success_callback=self.response,
                       error_callback=self.error, delay=0,
                       method='UNSUBSCRIBE', url=addr, headers=headers)

    def error(self, cargo, error):
        """ Error callback of the HTTP call: notifies the callback with an
        empty sid.

        @param cargo: not used; the cargo given at construction is forwarded
        @param error: exception raised

        @type error: Exception

        @rtype: boolean
        """
        notify = self.callback
        if notify:
            notify(self.cargo, "")
        return True

    def response(self, data, cargo):
        """ Success callback of the HTTP call: notifies the callback with the
        sid that has been unsubscribed.

        @param data: response object
        @param cargo: not used; the cargo given at construction is forwarded

        @type data: HTTPResponse

        @rtype: boolean
        """
        notify = self.callback
        if notify:
            notify(self.cargo, self.old_sid)
        return True
401 402
class RenewSubscribeRequest(object):
    """ Issues a renewal (SUBSCRIBE carrying the current SID) for an existing
    event subscription and records the outcome on the service.
    """

    def __init__(self, service, event_host, callback, cargo):
        """ Sends the renewal request asynchronously. Nothing is sent when
        the service has no subscription id.

        @param service: service that is renewing the subscribe
        @param event_host: 2-tuple (host, port) of the event listener server
                           (not used by this request)
        @param callback: called as callback(cargo, sid, timeout)
        @param cargo: callback parameters

        @type service: Service
        @type event_host: tuple
        @type callback: callable
        """
        log.debug("renew subscribe")

        # Set before the early return so the instance is always fully
        # initialized, even when no request is sent.
        self.callback = callback
        self.cargo = cargo
        self.service = service

        if not service.event_sid:
            # nothing to renew
            return

        addr = "%s%s" % (service.url_base, service.event_sub_url)
        target = parse_url(addr)

        headers = {}
        headers["HOST"] = '%s:%d' % (target.hostname, target.port)
        headers["SID"] = self.service.event_sid
        headers["TIMEOUT"] = 'Second-300'

        run_async_call(http_call, success_callback=self.response,
                       error_callback=self.error, delay=0,
                       method='SUBSCRIBE', url=addr,
                       headers=headers)

    def error(self, cargo, error):
        """ Error callback of the HTTP call: clears the subscription state of
        the service and notifies the callback with an empty sid and a zero
        timeout.

        @param cargo: not used; the cargo given at construction is forwarded
        @param error: exception raised

        @type error: Exception

        @rtype: boolean
        """
        # The placeholder is required: an argument without a matching
        # placeholder breaks the formatting of the log record.
        log.debug("error %s", error)
        self.service.event_sid = ""
        self.service.event_timeout = 0
        if self.callback:
            self.callback(self.cargo, "", 0)
        return True

    def response(self, http_response, cargo):
        """ Success callback of the HTTP call: reads the SID and TIMEOUT
        headers, stores them on the service and notifies the callback (only
        when a sid was received).

        @param http_response: response object
        @param cargo: not used; the cargo given at construction is forwarded

        @type http_response: HTTPResponse

        @rtype: boolean
        """
        log.debug("response")
        compressed_headers = {}
        for k, v in dict(http_response.getheaders()).items():
            if not v:
                v = ""
            compressed_headers[k.lower()] = v.lower().strip()

        # Default to the "no subscription" value used by error(); without a
        # default, a response lacking the SID header hits an unbound local.
        sid = compressed_headers.get('sid', "")

        # 300 seconds is kept whenever the header is absent or unparsable.
        timeout = 300
        stimeout = compressed_headers.get('timeout', "")
        if stimeout.startswith("second-"):
            try:
                timeout = int(stimeout[7:])
            except ValueError:
                pass

        self.service.event_sid = sid
        self.service.event_timeout = timeout
        if self.callback and sid:
            self.callback(self.cargo, sid, timeout)

        return True
490 491
class AutoRenew(object):
    """ Keeps an event subscription alive by scheduling a renewal shortly
    before the subscription timeout stored on the service expires, and
    rescheduling after every renewal that reports a non-zero timeout.
    """

    def __init__(self, service, event_host, callback, cargo):
        """ Constructor for the AutoRenew class.

        @param service: service whose subscription is renewed
        @param event_host: 2-tuple (host, port) of the event listener server
        @param callback: optional, called as callback(cargo, sid, timeout)
                         after each renewal outcome that is reported
        @param cargo: callback parameters

        @type service: Service
        @type event_host: tuple
        @type callback: callable
        """
        self.event_host = event_host
        self.callback = callback
        self.cargo = cargo
        self.service = service

    def start_auto_renew(self):
        """ Schedules the first renewal.
        """
        self._auto_renew()

    def _renew_delay(self):
        """ Returns the delay, in seconds, to wait before the next renewal:
        10 seconds before the timeout, or half a second before it when the
        timeout is 10 seconds or less. Never negative.
        """
        timeout = int(self.service.event_timeout)
        delay = timeout - 10
        if delay <= 0:
            delay = timeout - 0.5
        # A zero timeout would otherwise yield a negative delay.
        return max(delay, 0)

    def _auto_renew(self):
        """ Schedules a renewal after the delay given by _renew_delay().
        """
        t_call = ThreadedCall(self._renew, delay=self._renew_delay())
        t_call.start()

    def _renew(self):
        """ Issues the renewal request; its outcome is delivered to
        _renew_callback.
        """
        RenewSubscribeRequest(self.service, self.event_host,
                              self._renew_callback, self.cargo)

    def _renew_callback(self, cargo, sid, timeout):
        """ Receives the renewal outcome: schedules the next renewal unless
        the timeout is zero, then forwards the outcome to the user callback,
        if there is one.
        """
        if timeout != 0:
            self._auto_renew()
        if self.callback:
            self.callback(cargo, sid, timeout)
519