1
2
3
4
5
6 """ Device-side event related classes.
7 """
8 import uuid
9
10 from xml.etree import ElementTree
11 from datetime import datetime
12
13 from brisa.core import log, reactor, webserver
14 log = log.getLogger('device-events')
15
16 from brisa.core.network import http_call
17 from brisa.core.threaded_call import run_async_call, ThreadedCall
18 from brisa.utils.looping_call import LoopingCall
19 from brisa.upnp import soap
20
21 import brisa
# Silence this module's logger when the package-level
# brisa.__enable_events_logging__ flag is false.
if not brisa.__enable_events_logging__:
    log.disabled = 1
24
25
27 """ Listen for event subscribe and unsubscribe at device services.
28 It also manages the list of the control points interested in service
29 eventing.
30 """
31
37
def render(self, uri, request, response):
    """ Event renderer method.

    Dispatches an eventing request according to its HTTP method:
    SUBSCRIBE without a SID header is handled as a new subscription,
    SUBSCRIBE with a SID header as a renewal, and UNSUBSCRIBE as a
    cancellation. Any other method is not handled here.

    @param uri: URI of the request
    @param request: request object (Cherrypy)
    @param response: response object (Cherrypy)

    @type uri: string

    @return: the value returned by the selected handler, or the current
             response.body when the method is not an eventing method

    @note: see Cherrypy documentation for further info about request and
           response attributes and methods.
    """
    # Normalize the headers once: lower-case names, stripped values, and
    # an empty string in place of any false value (e.g. None).
    compressed_headers = {}
    for name, value in request.headers.items():
        compressed_headers[name.lower()] = (value or "").strip()

    method = request.method.lower()

    if method == "subscribe":
        # The SID header is what tells a renewal from a new subscription.
        if 'sid' not in compressed_headers:
            return self.render_subscriber(request,
                                          response, compressed_headers)
        return self.render_renew(request, response, compressed_headers)

    if method == "unsubscribe":
        return self.render_unsubscribe(request,
                                       response, compressed_headers)

    return response.body
67
69 """ Renders the subscribe message for an event.
70
71 @param request: request object (Cherrypy)
72 @param response: response object (Cherrypy)
73
74 @note: see Cherrypy documentation for further info about request and
75 response attributes and methods.
76 """
77 log.debug('Receiving subscribe request')
78 request_status = self._validate_subscribe_request(request,
79 compressed_headers)
80
81 if request_status == 200:
82 timeout = int(compressed_headers['timeout'].split("-")[-1])
83 callback = compressed_headers['callback'][1:-1]
84 log.debug('Subscriber callback: %s' % callback)
85 subscriber = Subscriber(self.service, timeout,
86 callback, request.server_protocol)
87 response_body = self._get_subscribe_response(request,
88 response, subscriber)
89 self.subscribers.append(subscriber)
90
91 eventing_variables = {}
92 for var_name, state_var in self.service.get_variables().items():
93 eventing_variables[var_name] = state_var.get_value()
94 EventMessage(subscriber, eventing_variables, 1, "")
95
96
97 t_call = ThreadedCall(self._auto_remove_subscriber, None, None,
98 None, None, timeout * 1.1,
99 "uuid:" + str(subscriber.subscription_id))
100 reactor.add_after_stop_func(t_call.stop)
101 t_call.start()
102
103 log.debug('Subscribe success')
104 return response_body
105 else:
106 return self._build_error(request_status, request, response)
107
109 log.debug('Validating subscribe request')
110
111 if not 'callback' in compressed_headers:
112 log.error('There is not a callback at the request')
113 return 412
114
115 if (not 'nt' in compressed_headers) or \
116 (compressed_headers['nt'] != "upnp:event"):
117 return 412
118
119
120
121
122 return 200
123
125 log.debug('Building subscribe response')
126 response_obj.status = 200
127
128
129 response_obj.headers["DATE"] = ''
130 response_obj.headers["SERVER"] = 'BRisa UPnP Framework'
131 response_obj.headers["SID"] = 'uuid:' + str(subscriber.subscription_id)
132 response_obj.headers["CONTENT-LENGTH"] = '0'
133 response_obj.headers["TIMEOUT"] = 'Second-' \
134 + str(subscriber.subscription_duration)
135 response_obj.body = ['']
136
137 return response_obj.body
138
def render_renew(self, request, response, compressed_headers):
    """ Renders the subscribe renew message for an event.

    When the renewal is valid, the subscriber's timestamp is refreshed
    and its subscription duration is updated from the TIMEOUT header
    (the text after the last "-", e.g. "Second-1800"). If that header is
    absent or does not end in an integer, the current duration is kept
    instead of failing the request.

    @param request: request object (Cherrypy)
    @param response: response object (Cherrypy)
    @param compressed_headers: request headers keyed by lower-case name

    @type compressed_headers: dict

    @return: the subscribe response body on success, otherwise the error
             response built for the validation status

    @note: see Cherrypy documentation for further info about request and
           response attributes and methods.
    """
    log.debug('Receiving renew request')
    request_status, subs = self._validate_renew_request(request,
                                                        compressed_headers)

    if request_status != 200:
        return self._build_error(request_status, request, response)

    timeout = compressed_headers.get('timeout', '')
    try:
        subs.subscription_duration = int(timeout.split("-")[-1])
    except ValueError:
        # Missing header or non-numeric value: keep the duration already
        # granted rather than aborting the renewal.
        log.debug('Renew without a numeric timeout (%r), keeping %s'
                  % (timeout, subs.subscription_duration))
    subs.timestamp = datetime.now()

    return self._get_subscribe_response(request, response, subs)
160
162 if 'callback' in compressed_headers or 'nt' in compressed_headers:
163 log.error('Missing callback or nt')
164 return 400, None
165
166 subs = self._find_subscriber(compressed_headers['sid'])
167 if not subs:
168 log.error('Subscriber does not exist')
169 return 412, None
170
171
172
173
174 return 200, subs
175
177 subscriber = self._find_subscriber(sid)
178 if not subscriber:
179
180 return
181
182 time_delta = datetime.now() - subscriber.timestamp
183 if time_delta.seconds > subscriber.subscription_duration:
184 log.debug('Subscriber sid:%s timeout'
185 % str(subscriber.subscription_id))
186 self._remove_subscriber(subscriber)
187 else:
188 subscriber.timestamp = datetime.now()
189
190
191 t_call = ThreadedCall(self._auto_remove_subscriber, None, None,
192 None, None, subscriber.subscription_duration * 1.1,
193 sid)
194 reactor.add_after_stop_func(t_call.stop)
195 t_call.start()
196
198 """ Renders the unsubscribe message for an event.
199
200 @param request: request object (Cherrypy)
201 @param response: response object (Cherrypy)
202
203 @note: see Cherrypy documentation for further info about request and
204 response attributes and methods.
205 """
206 log.debug('Receiving unsubscribe request')
207 request_status, subs = self._validate_unsubscribe_request(request,
208 compressed_headers)
209
210 if request_status == 200:
211 self._remove_subscriber(subs)
212
213 response.status = 200
214 response.body = [""]
215 return response.body
216 else:
217 return self._build_error(request_status, request, response)
218
220
221 if not 'sid' in compressed_headers:
222 log.error('Missing sid')
223 return 412, None
224
225 if 'callback' in compressed_headers or 'nt' in compressed_headers:
226 log.error('Missing callback or nt')
227 return 400, None
228
229 subs = self._find_subscriber(compressed_headers['sid'])
230 if not subs:
231 log.error('Subscriber does not exist')
232 return 412, None
233
234
235 return 200, subs
236
238 log.error('Building error response')
239 response = soap.build_soap_error(status)
240
241 response_obj.status = 500
242
243 if self.encoding is not None:
244 mime_type = 'text/xml; charset="%s"' % self.encoding
245 else:
246 mime_type = "text/xml"
247 response_obj.headers["Content-type"] = mime_type
248 response_obj.headers["Content-length"] = str(len(response))
249 response_obj.headers["EXT"] = ''
250 response_obj.body = response
251 return response
252
258
260 subscriber.stop()
261 self.subscribers.remove(subscriber)
262
263
265
266 - def __init__(self, service, subscription_duration, delivery_url, http_version):
286
288 self.event_key += 1
289 if self.event_key > 4294967295:
290 self.event_key = 1
291
293 if name in self.eventing_variables.keys() and \
294 self.eventing_variables[name] != value:
295
296 self._send_variables()
297 self.eventing_variables[name] = value
298 return
299
300 self.eventing_variables[name] = value
301
303 if self.eventing_variables:
304 EventMessage(self, self.eventing_variables, 0, "")
305 self.eventing_variables = {}
306
314
315
317 """ Wrapper for an event message.
318 """
319
def __init__(self, subscriber, variables, event_delay, cargo):
    """ Constructor for the EventMessage class.

    Builds a NOTIFY request carrying the given variables and schedules
    it, through run_async_call, for delivery to the subscriber's
    delivery URL. The subscriber's current event key is sent as SEQ and
    then incremented. Nothing is sent when variables is empty.

    @param subscriber: subscriber that will receive the message
    @param variables: variables of the event
    @param event_delay: delay to wait before sending the event
    @param cargo: callback parameters

    @type subscriber: Subscriber
    @type variables: dict
    @type event_delay: float
    """
    log.debug("event message")

    # Stored before the early exit so the attribute always exists.
    self.cargo = cargo

    if not variables:
        log.error("There are no variables to send")
        return

    # Imported locally so the block works on both Python 2 and 3 without
    # touching the module import section.
    try:
        from urlparse import urlparse
    except ImportError:
        from urllib.parse import urlparse

    # HOST carries only the host[:port] part of the delivery URL; fall
    # back to the raw value when the URL has no network location.
    delivery_host = urlparse(subscriber.delivery_url).netloc

    headers = {}
    headers["HOST"] = delivery_host or subscriber.delivery_url
    headers["CONTENT-TYPE"] = 'text/xml'
    headers["NT"] = 'upnp:event'
    headers["NTS"] = 'upnp:propchange'
    headers["SID"] = "uuid:" + str(subscriber.subscription_id)
    # SEQ uses the key value from before the increment.
    headers["SEQ"] = str(subscriber.event_key)
    subscriber.event_key_increment()

    event_body = self._build_message_body(variables)

    headers["CONTENT-LENGTH"] = str(len(event_body))

    log.debug("Running http call")
    run_async_call(http_call, success_callback=self.response,
                   error_callback=self.error, delay=event_delay,
                   method='NOTIFY', url=subscriber.delivery_url,
                   body=event_body, headers=headers)
358
def _build_message_body(self, variables):
    """ Builds the XML property set used as body of the event message.

    Each variable becomes an <e:property> element with one child named
    after the variable. Values are serialized as text: None as an empty
    value, booleans as '1' or '0', strings unchanged, and numbers through
    str(). Values of any other type are reported through log.error and
    sent as str(value) instead of raising.

    @param variables: state variable names mapped to their values
    @type variables: dict

    @return: XML declaration followed by the UTF-8 encoded property set
    @rtype: string
    """
    log.debug("Building message body")
    property_set = ElementTree.Element("e:propertyset")
    property_set.set('xmlns:e', "urn:schemas-upnp-org:event-1-0")

    # Resolve the text and integer types once; the Python 2 only names
    # are used when they exist.
    try:
        text_types = basestring
    except NameError:
        text_types = str
    try:
        integer_types = (int, long)
    except NameError:
        integer_types = (int,)
    number_types = integer_types + (float,)

    for var_name, var_value in variables.items():
        prop = ElementTree.SubElement(property_set, "e:property")

        if var_value is None:
            var_val = ''
        elif isinstance(var_value, bool):
            # Checked before the numeric case: bool is a subclass of int.
            var_val = '1' if var_value else '0'
        elif isinstance(var_value, text_types):
            # Text is handed over unencoded; tostring() below performs
            # the UTF-8 encoding of the whole document.
            var_val = var_value
        elif isinstance(var_value, number_types):
            var_val = str(var_value)
        else:
            log.error("Unknown state variable type")
            var_val = str(var_value)

        ElementTree.SubElement(prop, var_name).text = var_val

    preamble = '<?xml version="1.0" encoding="utf-8"?>'
    body = ElementTree.tostring(property_set, 'utf-8')
    if not isinstance(body, str):
        # tostring() returned bytes; the preamble must match that type.
        preamble = preamble.encode('utf-8')
    return preamble + body
397
def error(self, cargo, error):
    """ Callback for receiving an error.

    Logs the error at debug level and reports it as handled.

    @param cargo: callback parameters passed at construction
    @param error: exception raised

    @type error: Exception

    @return: always True
    @rtype: boolean
    """
    # The placeholder is required: an argument without one makes the
    # logging formatter fail and the error itself is never logged.
    log.debug("error: %s", error)
    return True
410
def response(self, http_response, cargo):
    """ Success callback of the HTTP call that delivers the event.

    The response is only noted in the debug log; its content is not
    inspected.

    @param http_response: response object
    @param cargo: callback parameters passed at construction

    @type http_response: HTTPResponse

    @return: always True
    @rtype: boolean
    """
    handled = True
    log.debug("response")
    return handled
423