1
2
3
4
5 """ Control Point side service implementation. Classes contained by this module
6 contain methods for performing actions on the service and event notification.
7
8 If you're using the control point high level API with no modifications on the
9 global flags (located on module brisa), then you shouldn't need to create this
10 class manually.
11
12 The framework default response to a device arrival is to build it and its
13 services automatically and forward it to your control point on the
14 "new_device_event" subscribed callback. This callback will receive the device
15 already ready for all actions.
16
17 These device's services are already built and their UPnP actions are already
18 methods of the service object itself. For instance, a content directory service
19 object would already have a Browse() method for you to call directly, passing
20 parameters in the keywords format (key=value) e.g. cds.Browse(ObjectId=5, ...).
21 Also note that the keys should be on the UPnP specified format, like the
22 example above we used ObjectId, as it appears on the reference.
23
24 """
25 import brisa
26
27 from brisa.core import log
28 from brisa.core.network import url_fetch, parse_url, http_call
29 from brisa.core.threaded_call import run_async_call
30 from brisa.core.threaded_call import ThreadedCall
31 from brisa.upnp.soap import SOAPProxy
32 from brisa.upnp.base_service import BaseService, BaseStateVariable,\
33 format_rel_url
34 from brisa.upnp.control_point.action import Action, Argument
35 from brisa.upnp.base_service_builder import BaseServiceBuilder
36
37
39
40 - def __init__(self, service, name, send_events, multicast, data_type, values):
43
46
47
49
55
57 return Argument(arg_name, arg_direction, arg_state_var.name)
58
63
68
69
71 if base in url:
72 return False
73 return True
74
75
77
78 - def __init__(self, id, serv_type, url_base, scpd_url,
79 control_url='', event_url='', presentation_url='',
80 build_async=False, async_cb=None, build=False):
81 """
82 @param async_cb: callback in the form async_cb(success). Receives only
83 one parameter (success) that tells if the the service
84 has been successfully built or not.
85 """
86 BaseService.__init__(self, id, serv_type, url_base)
87 self.event_sid = ''
88 self.event_timeout = 0
89 self.scpd_url = scpd_url
90 self.control_url = control_url
91 self.event_sub_url = event_url
92 self.presentation_url = presentation_url
93 self._auto_renew_subs = None
94 self._soap_service = None
95
96 if not brisa.__skip_soap_service__:
97 self.generate_soap_service()
98
99 if brisa.__skip_service_xml__:
100 return
101
102 if build_async:
103 assert async_cb != None, 'you won\'t be notified when the '\
104 'service build finished.'
105 self._build_async(async_cb)
106 elif build:
107 self._build_sync()
108
110 """ Builds the Service synchronously.
111 """
112 if is_relative(self.scpd_url, self.url_base):
113 url = '%s%s' % (self.url_base, self.scpd_url)
114 else:
115 url = self.scpd_url
116 fd = url_fetch(url)
117 if not fd:
118 log.debug('Could not fetch SCPD URL %s' % self.scpd_url)
119 raise RuntimeError('Could not build Service %s', self)
120 ServiceBuilder(self, fd).build()
121
123 """ Builds the service asynchronously. Forwards True to the specified
124 callback 'cb' if the service was successfully built (otherwise,
125 forwards False).
126 """
127 if is_relative(self.scpd_url, self.url_base):
128 path = '%s%s' % (self.url_base, self.scpd_url)
129 else:
130 path = self.scpd_url
131 run_async_call(url_fetch,
132 success_callback=self._fetch_scpd_async_done,
133 error_callback=self._fetch_scpd_async_error,
134 delay=0, url=path, success_callback_cargo=cb,
135 error_callback_cargo=cb)
136
138 """ Called when the SCPD XML was sucessfully fetched. If so, build the
139 service by parsing the description.
140 """
141 if fd:
142 parsed_ok = ServiceBuilder(self, fd).build()
143 if cb:
144 cb(parsed_ok)
145
147 """ Called when the SCPD XML wasn't successfully fetched.
148 """
149 log.debug('Failed to fetch SCPD for service %s' % self.id)
150 if cb:
151 cb(False)
152
154 namespace = ('u', self.service_type)
155 ctrl_url = ''
156
157 if self.url_base in self.control_url:
158 ctrl_url = self.control_url
159 else:
160 ctrl_url = '%s%s' % (self.url_base,
161 format_rel_url(self.control_url))
162
163 self._soap_service = SOAPProxy(ctrl_url, namespace)
164
166 """ Subscribes for events on a specific variable (unicast eventing)
167 with a notifier callback.
168
169 @param var_name: variable name to subscribe on
170 @param callback: callback to receive notifications
171
172 @type var_name: string
173 @type callback: callable
174 """
175 if var_name in self._state_variables:
176 self._state_variables[var_name].subscribe(callback)
177
179 """ Returns a state variable of the service, if exists.
180
181 @param var_name: name of the state variable
182 @type var_name: string
183
184 @return: matching state variable or None
185 @rtype: StateVariable
186 """
187 return self._state_variables.get(var_name, None)
188
189 - def event_subscribe(self, event_host, callback, cargo, auto_renew=True,
190 renew_callback=None):
191 """ Subscribes for events.
192
193 @param event_host: 2-tuple (host, port) with the event listener server.
194 @param callback: callback
195 @param cargo: callback parameters
196 @param auto_renew: if True, the framework will automatically renew the
197 subscription before it expires. If False, the program need to call
198 event_renew method before the subscription timeout.
199 @param renew_callback: renew callback. It will be used when auto_renew
200 is True
201
202 @type event_host: tuple
203 @type callback: callable
204 @type auto_renew: boolean
205 @type renew_callback: callable
206 """
207 if auto_renew:
208 self._auto_renew_subs = AutoRenew(self, event_host,
209 renew_callback, cargo)
210 SubscribeRequest(self, event_host, callback, cargo)
211
213 """ Unsubscribes for events.
214
215 @param event_host: 2-tuple (host, port) with the event listener server.
216 @param callback: callback
217 @param cargo: callback parameters
218
219 @type event_host: tuple
220 @type callback: callable
221 """
222 if not self.event_sid:
223
224 return
225 UnsubscribeRequest(self, event_host, callback, cargo)
226
228 """ Renew subscription for events.
229
230 @param event_host: 2-tuple (host, port) with the event listener server.
231 @param callback: callback
232 @param cargo: callback parameters
233
234 @type event_host: tuple
235 @type callback: callable
236 """
237 if not self._auto_renew_subs:
238 RenewSubscribeRequest(self, event_host, callback, cargo)
239
241 log.debug('Receiving state variables notify')
242 for name, value in changed_vars.items():
243 self._state_variables[name].update(value)
244
245
247 """ Wrapper for an event subscription.
248 """
249
250 - def __init__(self, service, event_host, callback, cargo):
251 """ Constructor for the SubscribeRequest class.
252
253 @param service: service that is subscribing
254 @param event_host: 2-tuple (host, port) of the event listener server
255 @param callback: callback
256 @param cargo: callback parameters
257
258 @type service: Service
259 @type event_host: tuple
260 @type callback: callable
261 """
262 log.debug("subscribe")
263 self.callback = callback
264 self.cargo = cargo
265 self.service = service
266
267 addr = "%s%s" % (service.url_base, service.event_sub_url)
268 Paddr = parse_url(addr)
269
270 headers = {}
271 headers["Host"] = Paddr.hostname
272 headers["User-agent"] = 'BRisa UPnP Framework'
273 headers["TIMEOUT"] = 'Second-300'
274 headers["NT"] = 'upnp:event'
275 headers["CALLBACK"] = "<http://%s:%d/eventSub>" % event_host
276 headers["HOST"] = '%s:%d' % (Paddr.hostname, Paddr.port)
277
278 run_async_call(http_call, success_callback=self.response,
279 error_callback=self.error, delay=0,
280 method='SUBSCRIBE', url=addr,
281 headers=headers)
282
283 - def error(self, cargo, error):
284 """ Callback for receiving an error.
285
286 @param cargo: callback parameters passed at construction
287 @param error: exception raised
288
289 @type error: Exception
290
291 @rtype: boolean
292 """
293 log.debug("error %s", error)
294 self.service.event_sid = ""
295 self.service.event_timeout = 0
296 if self.callback:
297 self.callback(self.cargo, "", 0)
298 return True
299
300 - def response(self, http_response, cargo):
301 """ Callback for receiving the HTTP response on a successful HTTP call.
302
303 @param http_response: response object
304 @param cargo: callback parameters passed at construction
305
306 @type http_response: HTTPResponse
307
308 @rtype: boolean
309 """
310 log.debug("response")
311 compressed_headers = {}
312 sid = None
313 for k, v in dict(http_response.getheaders()).items():
314 if not v:
315 v = ""
316 compressed_headers[k.lower()] = v.lower().strip()
317 if 'sid' in compressed_headers:
318 sid = compressed_headers['sid']
319 timeout = 300
320 if 'timeout' in compressed_headers:
321 stimeout = compressed_headers['timeout']
322 if stimeout[0:7] == "second-":
323 try:
324 timeout = int(stimeout[7:])
325 except ValueError:
326 pass
327 self.service.event_sid = sid
328 self.service.event_timeout = timeout
329 if self.service._auto_renew_subs and sid:
330 self.service._auto_renew_subs.start_auto_renew()
331 if self.callback and sid:
332 self.callback(self.cargo, sid, timeout)
333
334 return True
335
336
338 """ Wrapper for an event unsubscription.
339 """
340
341 - def __init__(self, service, event_host, callback, cargo):
342 """ Constructor for the UnsubscribeRequest class.
343
344 @param service: service that is unsubscribing
345 @param event_host: 2-tuple (host, port) of the event listener server
346 @param callback: callback
347 @param cargo: callback parameters
348
349 @type service: Service
350 @type event_host: tuple
351 @type callback: callable
352 """
353 self.old_sid = service.event_sid
354 service.event_sid = ""
355 service.event_timeout = 0
356
357 self.callback = callback
358 self.cargo = cargo
359 self.service = service
360
361 addr = "%s%s" % (service.url_base, service.event_sub_url)
362 Paddr = parse_url(addr)
363
364 headers = {}
365 headers["Host"] = Paddr.hostname
366 headers["User-agent"] = 'BRISA-CP'
367 headers["HOST"] = '%s:%d' % (Paddr.hostname, Paddr.port)
368 headers["SID"] = self.old_sid
369
370 run_async_call(http_call, success_callback=self.response,
371 error_callback=self.error, delay=0,
372 method='UNSUBSCRIBE', url=addr, headers=headers)
373
374 - def error(self, cargo, error):
375 """ Callback for receiving an error.
376
377 @param cargo: callback parameters passed at construction
378 @param error: exception raised
379
380 @type error: Exception
381
382 @rtype: boolean
383 """
384 if self.callback:
385 self.callback(self.cargo, "")
386 return True
387
389 """ Callback for receiving the HTTP response on a successful HTTP call.
390
391 @param data: response object
392 @param cargo: callback parameters passed at construction
393
394 @type data: HTTPResponse
395
396 @rtype: boolean
397 """
398 if self.callback:
399 self.callback(self.cargo, self.old_sid)
400 return True
401
402
404 """ Wrapper for renew an event subscription.
405 """
406
407 - def __init__(self, service, event_host, callback, cargo):
408 """ Constructor for the RenewSubscribeRequest class.
409
410 @param service: service that is renewing the subscribe
411 @param event_host: 2-tuple (host, port) of the event listener server
412 @param callback: callback
413 @param cargo: callback parameters
414
415 @type service: Service
416 @type event_host: tuple
417 @type callback: callable
418 """
419 log.debug("renew subscribe")
420
421 if not service.event_sid or service.event_sid == "":
422 return
423
424 self.callback = callback
425 self.cargo = cargo
426 self.service = service
427
428 addr = "%s%s" % (service.url_base, service.event_sub_url)
429 Paddr = parse_url(addr)
430
431 headers = {}
432 headers["HOST"] = '%s:%d' % (Paddr.hostname, Paddr.port)
433 headers["SID"] = self.service.event_sid
434 headers["TIMEOUT"] = 'Second-300'
435
436 run_async_call(http_call, success_callback=self.response,
437 error_callback=self.error, delay=0,
438 method='SUBSCRIBE', url=addr,
439 headers=headers)
440
441 - def error(self, cargo, error):
442 """ Callback for receiving an error.
443
444 @param cargo: callback parameters passed at construction
445 @param error: exception raised
446
447 @type error: Exception
448
449 @rtype: boolean
450 """
451 log.debug("error", error)
452 self.service.event_sid = ""
453 self.service.event_timeout = 0
454 if self.callback:
455 self.callback(self.cargo, "", 0)
456 return True
457
458 - def response(self, http_response, cargo):
459 """ Callback for receiving the HTTP response on a successful HTTP call.
460
461 @param http_response: response object
462 @param cargo: callback parameters passed at construction
463
464 @type http_response: HTTPResponse
465
466 @rtype: boolean
467 """
468 log.debug("response")
469 compressed_headers = {}
470 for k, v in dict(http_response.getheaders()).items():
471 if not v:
472 v = ""
473 compressed_headers[k.lower()] = v.lower().strip()
474 if 'sid' in compressed_headers:
475 sid = compressed_headers['sid']
476 timeout = 300
477 if 'timeout' in compressed_headers:
478 stimeout = compressed_headers['timeout']
479 if stimeout[0:7] == "second-":
480 try:
481 timeout = int(stimeout[7:])
482 except ValueError:
483 pass
484 self.service.event_sid = sid
485 self.service.event_timeout = timeout
486 if self.callback and sid:
487 self.callback(self.cargo, sid, timeout)
488
489 return True
490
491
493
494 - def __init__(self, service, event_host, callback, cargo):
495 self.event_host = event_host
496 self.callback = callback
497 self.cargo = cargo
498 self.service = service
499
502
504 renew_delay = int(self.service.event_timeout) - 10
505 if renew_delay <= 0:
506 renew_delay = int(self.service.event_timeout) - 0.5
507 t_call = ThreadedCall(self._renew, delay=renew_delay)
508 t_call.start()
509
513
515 if timeout != 0:
516 self._auto_renew()
517 if self.callback:
518 self.callback(cargo, sid, timeout)
519