# Source code for musiccast2mqtt.musiccast_discovery

''' Discovery module for MusicCast devices.

Based on the UPnP protocol v2.0:
    http://www.upnp.org/specs/arch/UPnP-arch-DeviceArchitecture-v2.0.pdf

This module searches for MusicCast devices in the network by using the UPnP protocol.
A complete search process is made of 2 steps, each launched in a separate thread:

  1) a discovery step where a broadcast is sent over the network and simple responses are expected
     from discovered devices; those responses are put in a queue to be picked up by the
     description thread;

  2) a description step where each discovered device retrieved from the queue is polled to get more
     information about it; each discovered new MusicCast device is written to a dictionary to be
     retrieved by any other thread that might be interested.

The methods ``loop_start`` and ``loop_stop`` are used to start or stop the search process.
The method ``retrieve_new_devices`` can be called at any time to retrieve a dictionary with all the
devices that have been discovered since the last call.
Use the method ``get_new_device_event`` to get an event which will be set each time a new device is
discovered.  Use it or not.
The method ``update_online_devices`` lets the caller tell this module which devices are already
online, so that the description step can be skipped for those devices. It is a nice-to-have and
does not need to be used.  If it is not used, the devices returned by ``retrieve_new_devices``
might not actually be new, and the calling thread must check for that before doing anything
else with those *not really new* devices.

New devices do not go in any queue but in a dictionary, so even if the ``retrieve_new_devices``
method is not called for a while, it does not matter.

.. Reviewed 9 November 2018
'''

import socket
import httplib
from urlparse import urlparse
import xml.etree.ElementTree as ET
import logging

import threading
import Queue

import musiccast2mqtt as mcc

LOG = logging.getLogger(__name__)

# Exception types that are treated as "malformed input" when parsing responses.
_CATCH_ERRORS = (AttributeError, IndexError, KeyError, TypeError, ValueError)

# Address and port the M-SEARCH request is sent to (the SSDP multicast group).
_UPNP_BROADCAST = ('239.255.255.250', 1900)

# Template of the M-SEARCH request.  The header lines are separated by CRLF and the two
# trailing empty items produce the blank line that terminates the request.
_MSEARCH_MSG = '\r\n'.join((
    'M-SEARCH * HTTP/1.1',
    'HOST:{ip}:{port}',
    'MAN:"ssdp:discover"',
    'ST:{st}',
    'MX:{mx}',
    '',
    '',
))

# Search target (value of the ST header) designating UPnP media renderers.
_MEDIARENDERER_TARGET = 'urn:schemas-upnp-org:device:MediaRenderer:1'

# Every one of these strings has to be found in a device description for the device to be
# accepted as a Yamaha MusicCast device.
_YAMAHA_IDS = (
    '<manufacturer>Yamaha Corporation</manufacturer>',
    '<yamaha:X_device>',
    '<yamaha:X_yxcControlURL>/YamahaExtendedControl/v1/</yamaha:X_yxcControlURL>',
)

# XML helpers: the dictionaries _XML_TAGS and _XML_NAMESPACES and the function _xml_tag are
# meant to be used together.
_XML_TAGS = dict(
    friendlyName=('upnp:device', 'upnp:friendlyName'),
    serialNumber=('upnp:device', 'upnp:serialNumber'),
    UDN=('upnp:device', 'upnp:UDN'),
    modelDescription=('upnp:device', 'upnp:modelDescription'), # optional
    modelName=('upnp:device', 'upnp:modelName'), # optional
    modelURL=('upnp:device', 'upnp:modelURL'), # optional
    yamaha_URLBase=('yamaha:X_device', 'yamaha:X_URLBase'),
)
''' Dictionary of relevant tags inside the XML dataframe.
The keys of the dictionary are arbitrary names for the field being referred to; usually it is
the same as the tag name of the XML element being sought, but it does not have to be.
The values are lists of tag names that represent the path down the XML tree to reach the
element requested.'''
_XML_NAMESPACES = dict(
    upnp='urn:schemas-upnp-org:device-1-0',
    yamaha='urn:schemas-yamaha-com:device-1-0',
    dlna='urn:schemas-dlna-org:device-1-0',
)
''' This dictionary provides the full name-spaces needed to properly parse the tree with the
_XML_TAGS dictionary.  It simply avoids to write down the long name-spaces for every tag in the
_XML_TAGS dictionary.'''

def _xml_tag(root, key):
    ''' Returns the XML element designated by ``key`` in the dictionary _XML_TAGS.

    Starting from ``root``, every tag name of the path stored in ``_XML_TAGS[key]`` is looked
    up in turn with ``find``, the prefixes being resolved through _XML_NAMESPACES.

    Args:
        root: the element the path starts from
        key (string): one of the keys of _XML_TAGS

    Returns:
        The result of the last ``find`` call; it is None if the last tag of the path is absent.

    Raises:
        KeyError: if ``key`` is not in _XML_TAGS.
        AttributeError: if a tag other than the last one of the path is absent.
    '''
    node = root
    path = _XML_TAGS[key]
    for step in path:
        node = node.find(step, _XML_NAMESPACES)
    return node
#==OBSOLETE=========================================================================================
# XML_REPLACEMENT = '''<root xmlns="urn:schemas-upnp-org:device-1-0"\
# xmlns:yamaha="urn:schemas-yamaha-com:device-1-0"\
# xmlns:dlna="urn:schemas-dlna-org:device-1-0"'''
#===================================================================================================

_RESPONSE_HEADERS = dict(
    location='LOCATION', # URL to the UPnP description of the root device
    cache='CACHE-CONTROL',
    target='ST',
    usn='USN', # Unique Service Name. Format "uuid:<device-UUID>[::<other stuff>]
)
''' Dictionary that maps the headers of the search response with the attributes of the class.'''
class searchResponse(object):
    ''' Parsed form of one response to the SSDP search.

    Every key of the _RESPONSE_HEADERS dictionary becomes an attribute holding the value of the
    matching header, or an empty string if the header is not in the response.  On top of those:

    - ``sender`` is the second field of the ``response`` parameter;
    - ``status`` is the first line of the response;
    - ``device_id`` is made of the last 12 characters, in upper case, of the part of the USN
      located before the first ``::``.

    Args:
        response: the data as returned by the call to socket.recvfrom

    Raises:
        TypeError: if ``response`` can not be parsed.
    '''

    def __init__(self, response):
        # make sure every header attribute exists, even if the header never shows up
        for attr_name in _RESPONSE_HEADERS:
            setattr(self, attr_name, '')
        # reverse table: header name as found in the response -> attribute name
        attr_by_header = {header: attr for attr, header in _RESPONSE_HEADERS.items()}
        try:
            self.sender = response[1] # address of the socket sending the data
            header_lines = response[0].split('\r\n')
            self.status = header_lines[0] # the status is on the first line
            for header_line in header_lines[1:]:
                tokens = header_line.split(':', 1)
                attr_name = attr_by_header.get(tokens[0].upper())
                if attr_name is not None:
                    # a known header without any ':' is an error (IndexError)
                    setattr(self, attr_name, tokens[1].strip())
            # the device_id is the last 12 digits of the first part of the USN
            usn_head = self.usn.strip().split('::', 1)[0]
            self.device_id = usn_head[-12:].upper()
        except _CATCH_ERRORS:
            raise TypeError
class musiccastDiscovery(object):
    ''' Manages the periodic discovery process for new devices online.

    The discovery process for MusicCast devices (based on Yamaha documentation):

    - launches a SSDP search for Media Renderers;
    - reads the responses as they arrive;
    - puts all valid devices on the outgoing queue.

    Args:
        device_factory_queue (:class:`Queue`): queue on which a ``DeviceHandle`` is put for
            every MusicCast device found
        refresh_event (Event): triggers a new search before waiting for the end of cycle;
            an internal event is created if it is None
        cycle (int): maximum time between 2 searches, in seconds.  Loops only once if
            it is <= 0.
    '''

    def __init__(self, device_factory_queue, refresh_event=None, cycle=mcc.DISCOVERY_CYCLE):
        self._device_factory_queue = device_factory_queue
        self._cycle = cycle
        self._ssdp_responses = Queue.Queue(maxsize=10) # 10 is arbitrary and should be ok
        self._online_devices_shared = {} # dictionary {device_id: ip_address} of online devices
        self._online_devices_lock = threading.Lock()
        # use refresh_event to trigger a new search, or create it for the delay, if undefined
        if refresh_event is None:
            self._refresh_event = threading.Event()
        else:
            self._refresh_event = refresh_event
        self._loop_stop_event = threading.Event()

    def loop_start(self):
        ''' Starts the loop in a separate thread.'''
        loop_thread = threading.Thread(target=self._loop, name='UPnP Thread')
        self._refresh_event.clear()
        self._loop_stop_event.clear()
        loop_thread.start()
        LOG.debug('UPnP search started.')

    def loop_stop(self):
        ''' Stops the loop via the appropriate events.'''
        self._loop_stop_event.set()
        # trigger a refresh in order not to wait for the end of the delay
        self._refresh_event.set()

    def _loop(self):
        ''' The actual loop.

        It does a full search then waits either for a 'refresh' event or for the full cycle,
        then tests the 'loop_stop' event to decide whether to exit or to loop again.
        '''
        while True:
            self._search()
            if self._cycle <= 0:
                break # run only once if cycle value is <= 0. TODO: optimise
            self._refresh_event.wait(self._cycle)
            self._refresh_event.clear() # in any case, either time-out or event set.
            if self._loop_stop_event.is_set():
                break
        LOG.debug('UPnP search stopped.')

    def _search(self):
        ''' Runs one complete search and returns once it is finished.

        The discovery and the description stages each run in their own thread.  The
        description thread is started first so that it is already waiting on the queue when
        the first responses arrive.  Both threads are joined so that 2 searches never overlap
        on the shared responses queue.
        '''
        description_thread = threading.Thread(target=self._description,
                                              name='UPnP Description Thread')
        discovery_thread = threading.Thread(target=self._discovery,
                                            args=(_MEDIARENDERER_TARGET,),
                                            name='UPnP Discovery Thread')
        description_thread.start()
        discovery_thread.start()
        discovery_thread.join()
        description_thread.join()

    def _discovery(self, target, mx=3):
        ''' Launches the discovery process.

        Sends a search request (SSDP search) then listens to the socket for responses, which
        are put on the responses queue.  Terminates when the socket times out or fails.  The
        end marker ``mcc.END_QUEUE`` is always put on the queue before returning, so that the
        description stage never waits forever.

        Args:
            target (string): the search target, sent in the ST header
            mx (int): value of the MX header, in seconds; replaced by 3 if not within 1 to 5
        '''
        LOG.debug('_discovery stage started.')
        if mx < 1 or mx > 5:
            mx = 3
        timeout = mx * 2 # arbitrary; probably could be "= mx", or "= mx+1"
        msg = _MSEARCH_MSG.format(ip=_UPNP_BROADCAST[0], port=_UPNP_BROADCAST[1],
                                  st=target, mx=mx)
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Re-use socket
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 5) # Time To Live
            sock.settimeout(timeout)
            sock.sendto(msg, _UPNP_BROADCAST)
            # listen to responses until the timeout is reached
            while True:
                data = sock.recvfrom(mcc.SOCKET_BUFFER_SIZE)
                self._ssdp_responses.put(data)
        except socket.timeout:
            pass # normal way out: no more responses
        except socket.error as err:
            LOG.warning('SSDP search failed: %s', err)
        finally:
            if sock is not None:
                # No call to shutdown here: the socket is never connected and shutdown on an
                # unconnected datagram socket may fail with ENOTCONN (e.g. on Linux).
                sock.close()
            # send final event, whatever happened, to release the description stage
            self._ssdp_responses.put(mcc.END_QUEUE)
        LOG.debug('_discovery stage ended.')

    @staticmethod
    def _fetch_description(location):
        ''' Requests the document at ``location`` and returns its content.

        Args:
            location (string): URL as found in the LOCATION header of a search response

        Returns:
            The body of the HTTP response, or None if the URL is unusable or the request fails.
        '''
        try:
            url = urlparse(location)
        except _CATCH_ERRORS:
            return None
        if not url.netloc:
            return None # no LOCATION header, or not an absolute URL
        path = url.path or '/'
        if url.query:
            path = '?'.join((path, url.query))
        conn = None
        try:
            conn = httplib.HTTPConnection(url.netloc, timeout=2)
            conn.request(method='GET', url=path)
            return conn.getresponse().read()
        except (httplib.HTTPException, socket.error):
            # socket.error covers refused connections and time-outs
            return None
        finally:
            if conn is not None:
                conn.close()

    def _description(self):
        ''' Retrieves the description of any device found from the search.

        Reads the queue for new found devices, retrieves the address to call for the
        description, makes the 'GET' request accordingly, filters to keep only the right
        devices, parses the XML data and puts a new device object on the new device queue.
        Any response that can not be processed is skipped.  Returns when the end marker
        ``mcc.END_QUEUE`` is read from the queue.
        '''
        LOG.debug('_description stage started.')
        while True:
            # wait for SSDP responses to show up in the queue
            data = self._ssdp_responses.get(block=True, timeout=None) # blocking call
            self._ssdp_responses.task_done()
            if data == mcc.END_QUEUE:
                break # the discovery thread is done, so are we.
            # parse the data in a searchResponse object
            try:
                resp = searchResponse(data)
            except TypeError:
                continue
            # TODO: the check of resp.device_id against the already online devices
            # (self._online_devices_shared, under self._online_devices_lock) is removed for now.
            # use the location address to request the description of the device
            xml_data = self._fetch_description(resp.location)
            if xml_data is None:
                continue
            # look for specific strings to identify Yamaha devices; discard if not present
            if not all(str_id in xml_data for str_id in _YAMAHA_IDS):
                continue
            # parse the XML data to retrieve the right attributes
            try:
                root = ET.fromstring(xml_data)
                # the device_id is the last 12 digits of the UDN
                device_id = _xml_tag(root, 'UDN').text.strip()[-12:].upper()
                url = urlparse(_xml_tag(root, 'yamaha_URLBase').text.strip())
                host = url.hostname
                api_port = url.port # raises ValueError if the port is not valid
            except (ET.ParseError,) + _CATCH_ERRORS:
                continue # invalid XML, or a required tag is missing or malformed
            if not host:
                continue # a device without a host can not be reached
            LOG.debug('\tDevice found: id - %s; host - %s', device_id, host)
            # hand the new device over
            self._device_factory_queue.put(mcc.DeviceHandle(mcc.DeviceHandle.CREATE,
                                                            device_id=device_id,
                                                            host=host,
                                                            api_port=api_port))
        LOG.debug('_description stage ended.')
if __name__ == '__main__':
    # Manual test: runs the discovery with a cycle of 30 seconds and prints every device put
    # on the queue, until interrupted with Ctrl-C.
    device_q = Queue.Queue()
    mcDiscovery = musiccastDiscovery(device_factory_queue=device_q, cycle=30)
    mcDiscovery.loop_start() # non-blocking call
    try:
        while True:
            try:
                # a finite timeout keeps the main thread responsive to Ctrl-C, which an
                # endless blocking get() is not
                device = device_q.get(block=True, timeout=1)
            except Queue.Empty:
                continue
            device_q.task_done()
            print(device)
    except KeyboardInterrupt:
        pass
    finally:
        # the loop runs in a non-daemon thread: stop it or the process never exits
        mcDiscovery.loop_stop()