These are some examples to show how simple it is to develop using BRisa.
This is the implementation of a Binary Light Device, whose specification you can find here.
The first thing you have to do is to choose a reactor. After that do the imports.
from brisa.core.reactors import install_default_reactor
reactor = install_default_reactor()
import os
from brisa.upnp.device import Device, Service
This is everything you need to implement the device. Now, you should implement your Service class doing the following:
class SwitchPower(Service):
def __init__(self):
Service.__init__(self, 'SwitchPower',
'urn:schemas-upnp-org:service:SwitchPower:1',
'',
os.getcwd() + '/SwitchPower-scpd.xml')
self.target = False
self.status = False
def soap_SetTarget(self, *args, **kwargs):
self.target = kwargs['newTargetValue']
print 'Light switched ', {'1': 'on', '0': 'off'}.get(self.target, None)
# Status here is the target because our device is "perfect" and
# no errors could occur (this is a simulation)
self.status = self.target
return {}
def soap_GetTarget(self, *args, **kwargs):
return {'RetTargetValue': self.target}
def soap_GetStatus(self, *args, **kwargs):
return {'ResultStatus': self.status}
The Device provides services to the Control Point. Once the services the Device will provide have been written, you can create the Device class:
class BinaryLight(object):
    def __init__(self):
        """Set the friendly name; the device itself is created later."""
        # No Device object exists until _create_device() runs.
        self.device = None
        self.server_name = 'Binary Light Device'
def _create_device(self):
    """Build the BinaryLight:1 ``Device`` and store it in ``self.device``."""
    project_page = 'https://garage.maemo.org/projects/brisa'
    # Descriptive metadata handed to Device as keyword arguments.
    description = {
        'manufacturer': 'Brisa Team. Embedded Laboratory '
                        'and INdT Brazil',
        'manufacturer_url': project_page,
        'model_name': 'Binary Light Device',
        'model_description': 'A UPnP Binary Light Device',
        'model_number': '1.0',
        'model_url': project_page,
    }
    self.device = Device('urn:schemas-upnp-org:device:BinaryLight:1',
                         self.server_name, **description)
This is the method where you’re going to specify the services
def _add_services(self):
    """Attach a fresh ``SwitchPower`` service to ``self.device``."""
    self.device.add_service(SwitchPower())
After doing this, create the start method and the main block. The start method is called from the main block; it initializes the Device, attaches the services and leaves it ready to use.
def start(self):
    """Create the device, add its services, start it and run the reactor."""
    self._create_device()
    self._add_services()
    self.device.start()
    # Register the device's stop method to be called after the reactor stops.
    reactor.add_after_stop_func(self.device.stop)
    reactor.main()
# Script entry point: build the binary light and start it.
if __name__ == '__main__':
    device = BinaryLight()
    device.start()
That's all it takes to implement the Binary Light Device.
You could follow these steps to implement any service/device, but remember that for now you’ll have to write the service’s xml yourself.
You can find this source code Here and the service’s xml Here.
This is an example of a simple Control Point. In this example we will implement a command-line Binary Light Control Point, but the same approach can help you create any Control Point. This Control Point has the following features:
Before everything you have to choose a reactor and then do the imports.
from brisa.core.reactors import install_default_reactor
reactor = install_default_reactor()
from brisa.core.network import parse_url
from brisa.core.threaded_call import run_async_function
from brisa.upnp.control_point.control_point import ControlPoint
With the imports done, let's implement the Control Point. To do that, you have to choose the service that you're going to use and create the callback functions that will be used.
# Pair whose second element is the SwitchPower service type; that element
# is the key used to look the service up on a device.
service = ('u', 'urn:schemas-upnp-org:service:SwitchPower:1')
# Device type passed to start_search() by the 'search' command.
binary_light_type = 'urn:schemas-upnp-org:device:BinaryLight:1'
def on_new_device(dev):
if not dev:
return
print 'Got new device:', dev.udn
print "Type 'list' to see the whole list"
def on_removed_device(udn):
    """Callback for 'removed_device_event': print the UDN that went away."""
    print 'Device is gone:', udn
def get_switch_service(device):
    """Return the entry of ``device.services`` keyed by the SwitchPower type.

    Raises whatever the lookup raises when the device has no such service.
    """
    switch_type = service[1]
    return device.services[switch_type]
Now you can create the events that will call the callbacks, and create the main method, that will initialize the Control Point.
def create_control_point():
    """Create a ``ControlPoint`` subscribed to the device callbacks."""
    control_point = ControlPoint()
    subscriptions = (('new_device_event', on_new_device),
                     ('removed_device_event', on_removed_device))
    for event_name, callback in subscriptions:
        control_point.subscribe(event_name, callback)
    return control_point
def main():
    """Create and start the control point, then run the reactor main loop."""
    c = create_control_point()
    c.start()
    # Hand the command loop to run_async_function so reactor.main() below can
    # be reached (presumably it runs off the main thread -- TODO confirm in
    # brisa.core.threaded_call).
    run_async_function(_handle_cmds, (c, ))
    # Register the control point's stop method to run after the reactor stops.
    reactor.add_after_stop_func(c.stop)
    reactor.main()
Now implement the functions that perform the actions requested through the command handler, as well as the command handler itself.
def _exit(c):
    """Stops the _handle_cmds loop by clearing its module-level flag.

    ``c`` is accepted only so the signature matches the other commands; it
    is not used.
    """
    global running_handle_cmds
    running_handle_cmds = False
def _help(c):
"""Prints the available commands that are used in '_handle_cmds' method.
"""
print 'Available commands: '
for x in ['exit', 'help', 'search', 'set_light <dev number>',
'get_status', 'get_target', 'turn <on/off>', 'stop',
'list']:
print '\t%s' % x
def _search(c):
    """ Start searching for devices of type ``binary_light_type`` (not every
    upnp:rootdevice), passing 600 as the first argument of start_search --
    per the original note, the search is repeated every 600 seconds
    (UPnP default).
    """
    c.start_search(600, binary_light_type)
def _set_light(c, command):
    """Select the device whose number (as shown by 'list') is ``command``.

    On success the device is stored in ``c.current_server``. When the number
    is not an integer, is negative or is out of range, a hint is printed and
    ``c.current_server`` is reset to None.
    """
    try:
        # list() keeps the result indexable even if values() is a view.
        devices = list(c.get_devices().values())
        index = int(command)
        if index < 0:
            # 'list' numbers devices from 0; do not let negative numbers
            # silently pick a device from the end.
            raise IndexError(command)
        c.current_server = devices[index]
    except Exception:
        # Deliberately broad (best effort), but no longer a bare except, so
        # KeyboardInterrupt/SystemExit still reach the command loop.
        print('BinaryLight number not found. Please run list and '
              'check again')
        c.current_server = None
def _get_status(c):
"""Gets the binary light status and print if it's on or off.
"""
try:
service = get_switch_service(c.current_server)
status_response = service.GetStatus()
if status_response['ResultStatus'] == '1':
print 'Binary light status is on'
else:
print 'Binary light status is off'
except Exception, e:
if not hasattr(c, 'current_server') or not c.current_server:
print 'BinaryLight device not set.Please use set_light <n>'
else:
print 'Error in get_status():', e
def _get_target(c):
"""Gets the binary light target and print if it's on or off.
"""
try:
service = get_switch_service(c.current_server)
status_response = service.GetTarget()
if status_response['RetTargetValue'] == '1':
print 'Binary light target is on'
else:
print 'Binary light target is off'
except Exception, e:
if not hasattr(c, 'current_server') or not c.current_server:
print 'BinaryLight device not set.Please use set_light <n>'
else:
print 'Error in get_target():', e
def _turn(c, command):
"""Turns the binary device on or off
"""
try:
cmd = {'on': '1', 'off': '0'}.get(command, '')
print cmd
if not cmd:
print 'Wrong usage. Please try turn on or turn off.'
service = get_switch_service(c.current_server)
service.SetTarget(newTargetValue=cmd)
print 'Turning binary light', command
except Exception, e:
if not hasattr(c, 'current_server') or not c.current_server:
print 'BinaryLight device not set.Please use set_light <n>'
else:
print 'Error in set_status():', e
def _stop(c):
    """Stop searching by calling ``stop_search`` on the control point ``c``.
    """
    c.stop_search()
def _list_devices(c):
"""Lists the devices that are in network.
"""
k = 0
for d in c.get_devices().values():
print 'Device no.:', k
print 'UDN:', d.udn
print 'Name:', d.friendly_name
print 'Device type:', d.device_type
print 'Services:', d.services.keys() # Only print services name
print 'Embedded devices:', [dev.friendly_name for dev in \
d.devices.values()] # Only print embedded devices names
print
k += 1
# Controls the loop in _handle_cmds; _exit sets it to False.
running_handle_cmds = True
# Dispatch table: first word typed at the prompt -> function to call.
# Every function receives the control point as its first argument; the
# 'turn' and 'set_light' functions also take one extra argument.
commands = {'exit': _exit,
            'help': _help,
            'search': _search,
            'stop': _stop,
            'list': _list_devices,
            'turn': _turn,
            'set_light': _set_light,
            'get_status': _get_status,
            'get_target': _get_target}
def _handle_cmds(c):
while running_handle_cmds:
try:
input = raw_input('>>> ').strip()
if len(input.split(" ")) > 0:
try:
if len(input.split(" ")) > 1:
commands[input.split(" ")[0]](c, input.split(" ")[1])
else:
commands[input.split(" ")[0]](c)
except KeyError, IndexError:
print 'Invalid command, try help'
except KeyboardInterrupt, EOFError:
c.stop()
break
# Stops the main loop
reactor.main_quit()
# Script entry point: run the command-line control point.
if __name__ == '__main__':
    main()
And, finally to allow you to run it, add to the end
if __name__ == '__main__':
main()
You can find this code here.
This is very similar to creating a simple device. You will need to create your Qt GUI, create the device, create the service and use the QtReactor.
from brisa.core.reactors.qtreactor import QtReactor
reactor = QtReactor()
from PyQt4 import QtCore, QtGui
from brisa.core import config
from brisa.upnp.device import Device
from brisa.upnp.device.service import Service, StateVariable
The constructor consists mostly of GUI code, but these two lines (40-41) are important for creating the Device.
self.root_device = None
self.upnp_urn = 'urn:schemas-upnp-org:device:MyDevice:1'
In the _add_root_device method you are going to initialize the root_device setting it from None to a Device object.
def _add_root_device(self):
    """ Creates the root device object which will represent the device
    description.

    The friendly name is the current text of ``self.lineEdit``; the serial
    number is the BRisa version with dots removed, left-padded with zeros
    to 7 characters.
    """
    project_page = 'http://brisa.garage.maemo.org'
    version = config.manager.brisa_version
    serial_no = version.replace('.', '').rjust(7, '0')
    description = {
        'manufacturer': 'BRisa Team. Embedded '
                        'Laboratory and INdT Brazil',
        'manufacturer_url': project_page,
        'model_description': 'An Open Source UPnP generic '
                             'Device',
        'model_name': 'Generic Device Example',
        'model_number': config.manager.brisa_version,
        'model_url': project_page,
        'serial_number': serial_no,
    }
    friendly_name = str(self.lineEdit.text())
    self.root_device = Device(self.upnp_urn, friendly_name, **description)
In the _add_services method you’re going to create the Services that the Device will provide.
def _add_services(self):
    """ Creates the example service, gives it one state variable and
    inserts it into the root device.
    """
    # Creating the example Service
    service_name = 'MyService'
    service_type = 'urn:schemas-upnp-org:service:MyService:1'
    myservice = Service(service_name, service_type, '')
    # The first argument of StateVariable is its owning service, so pass
    # the service just created rather than this GUI object.
    var = StateVariable(myservice, "A_ARG_TYPE_Variable",
                        True, False, "string")
    myservice.add_state_variable(var)
    # Inserting a service into the root device
    self.root_device.add_service(myservice)
The _load method is going to initialize the device and add the services.
def _load(self):
    """Create the root device, then add the services to it."""
    # Order matters: _add_services() adds to self.root_device.
    self._add_root_device()
    self._add_services()
These are the callbacks to the GUI commands, and will start or stop the root device.
def start(self):
    """ Starts the root device.

    Any device that is already running is stopped first, then a new one is
    built and started.
    """
    self.stop()
    self._load()
    self.root_device.start()
    # Register the new device's stop method to run after the reactor stops.
    reactor.add_after_stop_func(self.root_device.stop)
def stop(self):
    """ Stops the root device, if there is one, and forgets it.
    """
    running = self.root_device
    if not running:
        return
    running.stop()
    self.root_device = None
Now you can implement the main method and call it.
def main():
    """Show the device window and run the reactor main loop."""
    qt_dev = QtDevice()
    qt_dev.show()
    reactor.main()


# Script entry point.
if __name__ == '__main__':
    main()
You can find this source code Here.
This is very similar to creating a simple control point. You will need to create your Qt GUI, create the control point, use the QtReactor and do the correct Qt imports.
from brisa.core.reactors.qtreactor import QtReactor
reactor = QtReactor()
from PyQt4 import QtCore, QtGui
from brisa.upnp.control_point import ControlPoint
Here you’re going to create the Control Point and the events (lines 16-21).
self.cp = ControlPoint()
self.cp.subscribe('new_device_event', self.on_new_device)
self.cp.subscribe('removed_device_event', self.on_removed_device)
self.devices = []
These methods are going to be the callback methods of the events that the Control Point will receive from the Device.
def on_new_device(self, dev):
    """Remember ``dev`` and refresh the list view, unless its UDN is known."""
    already_known = any(known.udn == dev.udn for known in self.devices)
    if already_known:
        return
    self.devices.append(dev)
    self.list_devices()
def on_removed_device(self, udn):
    """Drop every device whose UDN equals ``udn`` and refresh the list view.

    The list is rebuilt in place instead of calling ``remove`` while
    iterating over it, which skipped the element following each removal.
    """
    # Slice assignment keeps the same list object for any other holder.
    self.devices[:] = [dev for dev in self.devices if dev.udn != udn]
    self.list_devices()
These methods are the methods that are going to be the callbacks to the GUI events.
def list_devices(self):
    """ Clears the list view and adds one item per known device, using the
    device's friendly name converted with ``str``.
    """
    view = self.listView
    view.clear()
    for device in self.devices:
        label = str(device.friendly_name)
        view.addItem(label)
def search(self):
    """ Starts a search for 'upnp:rootdevice', passing 600 as the first
    argument of the control point's ``start_search``.
    """
    search_type = 'upnp:rootdevice'
    self.cp.start_search(600, search_type)
def stop_search(self):
    """ Stops search for new devices by calling ``stop_search`` on the
    control point held in ``self.cp``.
    """
    self.cp.stop_search()
Now you implement and call the main method.
def main():
    """Show the control point window and run the reactor main loop."""
    qt_cp = QtControlPoint()
    qt_cp.show()
    reactor.main()


# Script entry point.
if __name__ == '__main__':
    main()
You can find this source code Here.