Source code for musiccast2mqtt.musiccast_interface

'''Interface for MusicCast gateway

=== OLD System Docstring
Representation of the Audio-Video system, including non-MusicCast devices.

The Audio-Video system is represented by a tree structure made of the :class:`System`
as root, having a list of :class:`musiccastDevice` objects as branches.
Devices have then lists of :class:`Input` objects and lists of :class:`Zone` objects.

The initialisation process is separated in 3 steps:

#. Instantiate objects, load the static data from a JSON file into local attributes
   and propagate this initialisation step to the next level, i.e. devices and then
   inputs and zones.

#. Post initialisation step, where some attributes are created based on the whole
   tree structure being already initialised in step 1.  For example, finding and
   assigning the actual object represented by a string *id* in the JSON file, is done
   here.

#. Attempt to retrieve *live* data from all the MusicCast devices
   and initialise various parameters based on this data.  In case of failure,
   the retrieval of the information is delayed and the functionality of the
   device is not available until it goes back *online*.  Beware that in this
   case some *helper* dictionaries might still point to objects that are not valid,
   so always test if a device is *ready* before proceeding using MusicCast related
   attributes.


The execution of a command is triggered by a lambda function retrieved from the
ACTIONS dictionary.
These lambda functions are methods called from :class:`Zone` objects that
perform all the steps to execute these actions, including sending the actual
requests to the devices over HTTP (through the `musiccast_comm` module).

Note on replies:
The policy to send back status messages depends on the addressing used
by the incoming MQTT message: if it is addressed specifically to this
interface or to a specific MusicCast device, then a reply will always be sent
back (case called ``explicit``); if it is not, a reply is sent only if a command
is executed, otherwise it stays silent as the message is probably intended for
somebody else.

.. reviewed 13 Oct 2018.
   TODO: implement locations
'''

import json
import threading
import Queue
import logging

import musiccast2mqtt as mcc
from musiccast2mqtt.musiccast_device import musiccastDevice
from musiccast2mqtt.musiccast_discovery import musiccastDiscovery
from musiccast2mqtt.musiccast_listener import musiccastListener

from mqttgateway.app_properties import AppProperties

LOG = logging.getLogger(__name__)

class musiccastInterface(object):
    '''The Interface.

    Loads the optional system definition file, then prepares (without starting them)
    the device factory, the discovery loop, the event listener and the event and
    message processors.  Nothing runs until :meth:`loop_start` is called.

    Args:
        params (dictionary): includes all options from the dedicated section of the
            configuration file.  The options read here are **sysdefpath**, the location
            of the JSON file describing the system (``'.'`` is used if it is missing),
            and **listenport**, the port on which to listen for MusicCast events (the
            package default is used if it is missing or not convertible to an int).
        msglist_in: the incoming messages; used as a queue (``get`` and ``task_done``
            are called on it).
        msglist_out: the outgoing messages; only handed over to the devices created
            by the device factory.
    '''

    def __init__(self, params, msglist_in, msglist_out):
        # Load the message lists
        self._msgl_in = msglist_in
        self._msgl_out = msglist_out
        self._explicit = False # if the 'address' of the message is 'explicit'

        # Check the system definition file, if any.  First get the path where to find it.
        try:
            jsonpath = params['sysdefpath']
        except KeyError:
            LOG.info('The "sysdefpath" option is not defined in the configuration file. '
                     'Using ".".')
            jsonpath = '.'
        jsonfilepath = AppProperties().get_path(jsonpath, extension='.json')

        # Load the system definition data if any; any errors here and we discard everything.
        try:
            with open(jsonfilepath, 'r') as json_file:
                json_data = json.load(json_file)
        except (IOError, OSError):
            LOG.debug("Can't open %s.", jsonfilepath)
            json_data = None
        except ValueError: # py3 has a different exception name (a ValueError subclass)
            LOG.debug("Can't JSON-parse %s.", jsonfilepath)
            json_data = None
        # TODO: Check validity of json_data, transfer in more friendly structure, keep in object.
        if json_data is None:
            pass # dummy statement for further development
        else:
            pass # TODO: process json_data

        # Create the device dictionary {'device_id' : musiccastDevice object}
        self._devices_lock = threading.RLock()
        self._devices_shared = {}

        # Prepare the device factory
        self.device_factory_queue = Queue.Queue(maxsize=mcc.MAX_QUEUE_SIZE)
        self._device_factory_thread = threading.Thread(target=self._device_factory,
                                                       name='Device Factory')

        # Prepare the discovery loop
        self._discovery_trigger_event = threading.Event()
        self._discovery = musiccastDiscovery(refresh_event=self._discovery_trigger_event,
                                             device_factory_queue=self.device_factory_queue)

        # Load the port to listen to MusicCast events
        self.listenport, problem = self._parse_listenport(params, mcc.DEFAULT_LISTEN_PORT)
        if problem is not None:
            LOG.info(problem)

        # Prepare the event processor
        self._event_processor_thread = threading.Thread(target=self._event_processor,
                                                        name='Event Processor')

        # Prepare the event listener
        self._listener = musiccastListener(self.listenport)
        self._musiccast_events_queue = self._listener.get_musiccast_events_queue()

        # Prepare the message processor
        self._message_processor_thread = threading.Thread(target=self._message_processor,
                                                          name='Message Processor')

    @staticmethod
    def _parse_listenport(params, default):
        ''' Reads the **listenport** option, falling back on ``default``.

        Args:
            params (dictionary): the configuration options.
            default: the port to use if the option is missing or invalid.

        Returns:
            tuple: ``(port, problem)`` where ``problem`` is ``None`` if the option was
            read successfully, or a message to log explaining why ``default`` is used.
        '''
        try:
            raw_port = params['listenport']
        except KeyError:
            # str() is needed: str.join only accepts strings and the default may be an int
            return default, ''.join(('The <listenport> option is not defined in the',
                                     ' configuration. Using <', str(default), '>.'))
        try:
            return int(raw_port), None
        except (TypeError, ValueError):
            # int() raises ValueError for a non-numeric string, TypeError for e.g. None
            return default, ''.join(('The <listenport> option: <', str(raw_port),
                                     '> is not an int. Using <', str(default), '>.'))

    def _get_device_from_id(self, device_id, raises=False):
        ''' Returns the device object registered under ``device_id``.

        Args:
            device_id: the key of the device in the devices dictionary.
            raises (bool): if True, an unknown ``device_id`` raises an exception
                instead of returning None.

        Returns:
            the device object if found, None otherwise (when ``raises`` is False).

        Raises:
            ConfigError: if the device is not found and ``raises`` is True.
        '''
        with self._devices_lock:
            try:
                return self._devices_shared[device_id]
            except KeyError:
                if raises:
                    raise mcc.ConfigError(''.join(('Device <', device_id, '> not found.')))
                return None

    def _device_factory(self):
        ''' Waits on the queue for tasks relating to the devices dictionary.

        This is an endless loop, meant to run in its own thread, reading items from
        ``device_factory_queue``.  Each item is read through the following attributes
        (presumably a ``DeviceHandle`` object from the package's __init__, as the task
        constants are defined there - to be confirmed):

        - ``task``: one of ``DeviceHandle.CREATE`` or ``DeviceHandle.DELETE``,
        - ``device_id``: the key of the device in the devices dictionary,
        - ``host`` and ``api_port``: where the device can be reached (CREATE only).

        CREATE instantiates a :class:`musiccastDevice` if the device_id is not already
        registered.  DELETE removes the device from the dictionary and queues a
        DISABLE_DEVICE task on it.  Any other task is ignored.
        '''
        while True:
            item = self.device_factory_queue.get(block=True, timeout=None)
            self.device_factory_queue.task_done()
            with self._devices_lock:
                if item.task == mcc.DeviceHandle.CREATE:
                    if item.device_id not in self._devices_shared:
                        device = musiccastDevice(device_id=item.device_id,
                                                 host=item.host,
                                                 api_port=item.api_port,
                                                 listenport=self.listenport,
                                                 msgl_out=self._msgl_out,
                                                 device_factory_queue=self.device_factory_queue)
                        self._devices_shared[item.device_id] = device
                    else:
                        LOG.debug(''.join(('CREATE: Device <', item.device_id,
                                           '> already online.')))
                elif item.task == mcc.DeviceHandle.DELETE:
                    if item.device_id in self._devices_shared:
                        device = self._devices_shared.pop(item.device_id)
                        device.task_queue.put(mcc.DeviceTask(mcc.DeviceTask.DISABLE_DEVICE))
                    else:
                        LOG.debug(''.join(('DELETE: Device <', item.device_id,
                                           '> not found.')))
                # any other task is unrecognised and silently ignored

    def _event_processor(self):
        ''' Waits for a MusicCast event and dispatches it to the right device.

        This is an endless loop, meant to run in its own thread.  Each event read from
        the listener queue is a dictionary; its ``device_id`` key is removed and used
        to find the device, to which a PROCESS_EVENT task carrying the rest of the
        event is queued.  Events without ``device_id``, or with one that is not
        registered, are logged and dropped.

        TODO: check if more than one event could be received in a single call.
        '''
        LOG.debug('Event processor started.')
        while True:
            # the queue delivers JSON-style dictionaries representing MusicCast events
            event = self._musiccast_events_queue.get(block=True, timeout=None)
            self._musiccast_events_queue.task_done()
            # Find device within the event dictionary
            device_id = event.pop('device_id', None) # read and remove key
            if device_id is None: # log error and move on
                LOG.debug('Event has no device_id. Ignore.')
                continue
            device = self._get_device_from_id(device_id, raises=False)
            # multithread comment: if device is found, from now on it might still become
            # an 'orphan' at any time; if so, it does not matter, the task gets queued but
            # probably never picked up.
            if device is None:
                LOG.debug('Event has unrecognised device_id. Ignore.')
                continue
            # queue the item
            device.task_queue.put(mcc.DeviceTask(mcc.DeviceTask.PROCESS_EVENT, event=event))

    @staticmethod
    def _filter_topics(msg):
        ''' Returns True if the topics are valid, False otherwise.

        Args:
            msg: the incoming message; its ``iscmd``, ``sender``, ``function`` and
                ``gateway`` attributes are read.

        Raises:
            LogicError: if the message has neither a function nor a gateway.
        '''
        if not msg.iscmd: # ignore status messages for now?
            return False
        if msg.sender == mcc.APP_NAME: # ignore echoes
            return False
        # the following filters could be dealt by subscriptions
        if not msg.function and not msg.gateway:
            raise mcc.LogicError('No function or gateway in message.')
        if msg.gateway and msg.gateway != mcc.APP_NAME:
            return False
        if msg.function and msg.function != mcc.APP_FUNCTION:
            return False
        return True

    def _resolve_zone(self, msg):
        ''' Finds the zone to operate by resolving the "address" from the topic items.

        The resolution is designed to use both location and device fields from the
        topic, but location processing is not implemented yet: ``msg.location`` is
        reset to None on entry, so only the device field is effectively used.

        The algorithm is a *strict* one, meaning that if a field is provided, it needs
        to exist otherwise an exception is thrown.

        The device defines only the device, so the zone is taken from the ``zone``
        key of the message arguments; if it is absent or not found in the device, the
        first zone of the device is taken by default.

        The devices dictionary is held under its re-entrant lock during the resolution.

        Args:
            msg (:class:internalMsg): the incoming message to parse.

        Returns:
            :class:Zone: a valid Zone object

        Raises:
            LogicError: if the message has no location and no device, or if they are
                inconsistent.
            ConfigError: if the device is not found or has no zone.
        '''
        msg.location = None # TODO: implement location processing; ignore location for now
        self._explicit = msg.gateway or msg.device # defines if to send a reply or not
        if not msg.location and not msg.device:
            raise mcc.LogicError('No location or device in message.')

        zone_from_location = None
        zone_from_device = None
        device = None
        # to properly find the right zone and be consistent, we have to lock the dictionary
        with self._devices_lock:
            # sets zone_from_location based on the location, None if not found.
            if msg.location:
                zone_from_location = None # TODO: implement location processing
            if msg.device:
                device = self._get_device_from_id(msg.device, raises=True)
                # find the zone in the device from the arguments, None if not found
                zone_id = msg.arguments.get('zone', None)
                if zone_id is not None: # assume it is the zone mcid.
                    # TODO: implement rename and friendly name
                    zone_from_device = device.get_zone(zone_mcid=zone_id, raises=False)

            if msg.location and msg.device: # currently unreachable, see TODO above
                # check consistency. (1) devices found have to be the same
                if device != zone_from_location.device:
                    raise mcc.LogicError('Location and device point to different devices.')
                # (2) if zone_from_device is defined, the zones need to be the same
                if zone_from_device is not None and zone_from_device != zone_from_location:
                    raise mcc.LogicError('Location and device point to different zones.')
                zone_returned = zone_from_location
            elif msg.device:
                if zone_from_device is None:
                    if not device.zones:
                        # report it as a configuration problem that the caller handles,
                        # rather than letting an IndexError escape
                        raise mcc.ConfigError(''.join(('Device <', str(msg.device),
                                                       '> has no zone.')))
                    zone_from_device = device.zones[0] # take the first one by default
                zone_returned = zone_from_device
            else: # msg.location is not None
                zone_returned = zone_from_location
        return zone_returned

    def _message_processor(self):
        ''' Waits for a message, parses it and queues it on the right device.

        This is an endless loop, meant to run in its own thread.  Messages that do not
        pass the topic filter, or for which no zone can be resolved, are logged and
        dropped; the others are queued as PROCESS_MESSAGE tasks on the device owning
        the resolved zone.
        '''
        LOG.debug('Message processor started.')
        while True:
            msg = self._msgl_in.get(block=True, timeout=None)
            self._msgl_in.task_done()
            LOG.debug(''.join(('Processing message: ', msg.str())))
            # the filter raises on malformed messages; that must not stop this thread
            try:
                accepted = self._filter_topics(msg)
            except mcc.LogicError as err:
                LOG.debug('Message discarded: %s', err)
                continue
            if not accepted:
                LOG.debug('Topics do not match.')
                continue
            try:
                zone = self._resolve_zone(msg)
            except (mcc.ConfigError, mcc.LogicError):
                LOG.debug('Zone can not be resolved.')
                continue
            zone.device.task_queue.put(mcc.DeviceTask(mcc.DeviceTask.PROCESS_MESSAGE,
                                                      msg=msg, zone=zone))

    def loop_start(self):
        ''' Starts all the threads and loops needed.'''
        self._message_processor_thread.start() # Start the message processor thread
        self._device_factory_thread.start() # Start the device factory thread
        self._discovery.loop_start() # Start the discovery loop
        self._event_processor_thread.start() # Start the event processor thread
        self._listener.loop_start() # Start the event listener

    def loop_stop(self):
        ''' Stops all loops and threads started in `loop_start`.  Not implemented yet.'''
        # TODO: Implement loop_stop
        return
# Nothing is executed when this module is run directly; the guard is a no-op placeholder.
if __name__ == '__main__': pass