Source code for sardana.pool.poolmeasurementgroup

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""This module is part of the Python Pool library. It defines the base classes
for"""

__all__ = ["PoolMeasurementGroup", "MeasurementConfiguration",
           "ControllerConfiguration", "ChannelConfiguration",
           "SynchronizerConfiguration", "build_measurement_configuration"]

__docformat__ = 'restructuredtext'

import threading
import weakref


from taurus.core.tango.tangovalidator import TangoAttributeNameValidator

from sardana import State, ElementType, TYPE_EXP_CHANNEL_ELEMENTS
from sardana.sardanaevent import EventType
from sardana.pool.pooldefs import AcqMode, SynchParam, AcqSynch, \
    SynchDomain, AcqSynchType

from sardana.pool.poolgroupelement import PoolGroupElement
from sardana.pool.poolacquisition import PoolAcquisition
from sardana.pool.poolsynchronization import SynchronizationDescription
from sardana.pool.poolexternal import PoolExternalObject

from sardana.taurus.core.tango.sardana import PlotType, Normalization


# ----------------------------------------------
# Measurement Group Configuration information
# ----------------------------------------------
# dict <str, obj> with (at least) keys:
#    - 'timer' : the timer channel name / timer channel id
#    - 'monitor' : the monitor channel name / monitor channel id
#    - 'controllers' : dict<Controller, dict> where:
#        - key: ctrl
#        - value: dict<str, dict> with (at least) keys:
#                - 'timer' : the timer channel name / timer channel id
#                - 'monitor' : the monitor channel name / monitor channel id
#                - 'synchronization' : 'Gate'/'Software'
#                - 'channels' where value is a dict<str, obj> with (at least)
#                   keys:
#                    - 'id' : the channel name ( channel id )
#                    optional keys:
#                    - 'enabled' : True/False (default is True)
#                    any hints:
#                    - 'output' : True/False (default is True)
#                    - 'plot_type' : 'No'/'1D'/'2D' (default is 'No')
#                    - 'plot_axes' : list<str> 'where str is channel
#                                    name/'step#/'index#' (default is [])
#                    - 'label' : prefered label (default is channel name)
#                    - 'scale' : <float, float> with min/max (defaults to
#                                channel range if it is defined
#                    - 'plot_color' : int representing RGB
#    optional keys:
#    - 'label' : measurement group label (defaults to measurement group name)
#    - 'description' : measurement group description

# <MeasurementGroupConfiguration>
#  <timer>UxTimer</timer>
#  <monitor>CT1</monitor>
# </MeasurementGroupConfiguration>

# Example: 2 NI cards, where channel 1 of card 1 is wired to channel 1 of
# card 2 at configuration time we should set:

# ni0ctrl.setCtrlPar(0, 'synchronization', AcqSynch.SoftwareTrigger)
# ni0ctrl.setCtrlPar(0, 'timer', 1) # channel 1 is the timer
# ni0ctrl.setCtrlPar(0, 'monitor', 4) # channel 4 is the monitor
# ni1ctrl.setCtrlPar(0, 'synchronization', AcqSynch.HardwareTrigger)
# ni1ctrl.setCtrlPar(0, 'master', 0)

# when we count for 1.5 seconds:
# ni1ctrl.Load(1.5)
# ni0ctrl.Load(1.5)
# ni1ctrl.Start()
# ni0ctrl.Start()

"""

"""


def _to_fqdn(name, logger=None):
    full_name = name
    # try to use Taurus 4 to retrieve FQDN
    try:
        from taurus.core.tango.tangovalidator import TangoDeviceNameValidator
        full_name, _, _ = TangoDeviceNameValidator().getNames(name)
    # if Taurus3 in use just continue
    except ImportError:
        pass
    if full_name is None:
        full_name = name
    if full_name != name and logger:
        msg = ("PQDN full name is deprecated in favor of FQDN full name."
               " Re-apply configuration in order to upgrade.")
        logger.warning(msg)
    return full_name


[docs]class ConfigurationItem(object): """Container of configuration attributes related to a given element. Wrap an element to pretend its API. Manage the element's configuration. Hold an information whether the element is enabled. By default it is enabled. .. note:: The ConfigurationItem class has been included in Sardana on a provisional basis. Backwards incompatible changes (up to and including removal of the class) may occur if deemed necessary by the core developers. """ def __init__(self, element, attrs=None): """Construct a wrapper around the element :param element: element to wrap :type element: obj :param: attrs: configuration attributes and their values :type attrs: dict """ self._element = weakref.ref(element) self.enabled = True if attrs is not None: self.__dict__.update(attrs) def __getattr__(self, item): return getattr(self.element, item)
[docs] def get_element(self): """Returns the element associated with this item""" return self._element()
[docs] def set_element(self, element): """Sets the element for this item""" self._element = weakref.ref(element)
element = property(get_element)
[docs]class ControllerConfiguration(ConfigurationItem): """Container of configuration attributes related to a given controller. Inherit behavior from :class:`~sardana.pool.poolmeasurementgroup.ConfigurationItem` and additionally hold information about its enabled/disabled channels. By default it is disabled. .. note:: The ControllerConfiguration class has been included in Sardana on a provisional basis. Backwards incompatible changes (up to and including removal of the class) may occur if deemed necessary by the core developers. """ def __init__(self, element, attrs=None): ConfigurationItem.__init__(self, element, attrs) self.enabled = False self._channels = [] self._channels_enabled = [] self._channels_disabled = []
[docs] def add_channel(self, channel_item): """Aggregate a channel configuration item.""" self._channels.append(channel_item) if channel_item.enabled: self.enabled = True if self._channels_enabled is None: self._channels_enabled = [] self._channels_enabled.append(channel_item) else: if self._channels_disabled is None: self._channels_disabled = [] self._channels_disabled.append(channel_item)
[docs] def remove_channel(self, channel_item): """Remove a channel configuration item.""" self._channels.remove(channel_item) if channel_item.enabled: self._channels_enabled.remove(channel_item) if len(self._channels_enabled) == 0: self.enabled = False else: self._channels_disabled.remove(channel_item)
[docs] def update_state(self): """Update internal state based on the aggregated channels.""" self.enabled = False self._channels_enabled = [] self._channels_disabled = [] for channel_item in self._channels: if channel_item.enabled: self.enabled = True self._channels_enabled.append(channel_item) else: self._channels_disabled.append(channel_item)
[docs] def get_channels(self, enabled=None): """Return aggregated channels. :param enabled: which channels to return - True - only enabled - False - only disabled - None - all :type enabled: bool or None """ if enabled is None: return list(self._channels) elif enabled: return list(self._channels_enabled) else: return list(self._channels_disabled)
[docs] def validate(self): pass
[docs]class TimerableControllerConfiguration(ControllerConfiguration): """Container of configuration attributes related to a given timerable controller. Inherit behavior from :class:`~sardana.pool.poolmeasurementgroup.ControllerConfiguration` and additionally validate *timer* and *monitor* configuration. .. note:: The TimerableControllerConfiguration class has been included in Sardana on a provisional basis. Backwards incompatible changes (up to and including removal of the class) may occur if deemed necessary by the core developers. """
[docs] def update_timer(self): self._update_master("timer")
[docs] def update_monitor(self): self._update_master("monitor")
def _update_master(self, role): master = getattr(self, role, None) if master is None: idx = float("+inf") for channel in self._channels_enabled: if channel.index > idx: continue master = channel idx = channel.index else: found = False for channel in self._channels: if channel.full_name == master: master = channel found = True break if not found: master = None setattr(self, role, master)
[docs] def validate(self): # validate if the timer and monitor are disabled if the # controller is enabled if self.enabled \ and not self.timer.enabled \ and not self.monitor.enabled: err_msg = 'channel {0} used as timer and channel ' \ '{1} used as monitor are disabled. One of them ' \ 'must be enabled.'.format(self.timer.name, self.monitor.name) raise ValueError(err_msg)
[docs]class ExternalControllerConfiguration(ControllerConfiguration): """Container of configuration attributes related to a given external controller. Inherit behavior from :class:`~sardana.pool.poolmeasurementgroup.ControllerConfiguration`. .. note:: The ExternalControllerConfiguration class has been included in Sardana on a provisional basis. Backwards incompatible changes (up to and including removal of the class) may occur if deemed necessary by the core developers. """ def __init__(self, element, attrs=None): ControllerConfiguration.__init__(self, self, attrs) self.full_name = element
[docs]class ChannelConfiguration(ConfigurationItem): """Container of configuration attributes related to a given experimental channel. Inherit behavior from :class:`~sardana.pool.poolmeasurementgroup.ConfigurationItem`. .. note:: The ChannelConfiguration class has been included in Sardana on a provisional basis. Backwards incompatible changes (up to and including removal of the class) may occur if deemed necessary by the core developers. """
[docs]class SynchronizerConfiguration(ConfigurationItem): """Container of configuration attributes related to a given synchronizer element. Inherit behavior from :class:`~sardana.pool.poolmeasurementgroup.ConfigurationItem`. By default it is disabled. .. note:: The ChannelConfiguration class has been included in Sardana on a provisional basis. Backwards incompatible changes (up to and including removal of the class) may occur if deemed necessary by the core developers. """ def __init__(self, element, attrs=None): ConfigurationItem.__init__(self, element, attrs) self.enabled = False
[docs]def build_measurement_configuration(user_elements): """Create a minimal measurement configuration data structure from the user_elements list. .. highlight:: none Minimal configuration data structure:: dict <str, dict> with keys: - 'controllers' : where value is a dict<str, dict> where: - key: controller's full name - value: dict<str, dict> with keys: - 'channels' where value is a dict<str, obj> where: - key: channel's full name - value: dict<str, obj> with keys: - 'index' : where value is the channel's index <int> .. highlight:: default .. note:: The build_measurement_configuration function has been included in Sardana on a provisional basis. Backwards incompatible changes (up to and including removal of the function) may occur if deemed necessary by the core developers. """ user_config = {} external_user_elements = [] user_config["controllers"] = controllers = {} for index, element in enumerate(user_elements): elem_type = element.get_type() if elem_type == ElementType.External: external_user_elements.append((index, element)) continue ctrl = element.controller ctrl_data = controllers.get(ctrl.full_name) if ctrl_data is None: controllers[ctrl.full_name] = ctrl_data = {} ctrl_data['channels'] = channels = {} else: channels = ctrl_data['channels'] channels[element.full_name] = channel_data = {} channel_data['index'] = index if len(external_user_elements) > 0: controllers['__tango__'] = ctrl_data = {} ctrl_data['channels'] = channels = {} for index, element in external_user_elements: channels[element.full_name] = channel_data = {} channel_data['index'] = index return user_config
[docs]class MeasurementConfiguration(object): """Configuration of a measurement. Accepts import and export from/to a serializable data structure (based on dictionaries/lists and strings). Provides getter methods that facilitate extracting of information e.g. controllers of different types, master timers/monitors, etc. .. note:: The build_measurement_configuration function has been included in Sardana on a provisional basis. Backwards incompatible changes (up to and including removal of the function) may occur if deemed necessary by the core developers. """ DFT_DESC = 'General purpose measurement configuration' def __init__(self, parent=None): """Initialize measurement configuration object :param parent: (optional) object that this measurement configuration refers to (usually :class:`~sardana.pool.poolmeasurementgroup.PoolMeasurementGroup)` """ self._parent = None if parent is not None: self._parent = weakref.proxy(parent) self._config = None self._use_fqdn = True # Structure to store the controllers and their channels self._timerable_ctrls = {} self._zerod_ctrls = [] self._synch_ctrls = {} self._other_ctrls = [] self._master_timer_sw = None self._master_monitor_sw = None self._master_timer_sw_start = None self._master_monitor_sw_start = None self._label = None self._description = None self._user_config = {} self._channel_acq_synch = {} self._ctrl_acq_synch = {} self.changed = False
[docs] def get_acq_synch_by_channel(self, channel): """Return acquisition synchronization configured for this element. :param channel: channel to look for its acquisition synchronization :type channel: :class:`~sardana.pool.poolbasechannel.PoolBaseChannel` or :class:`~sardana.pool.poolmeasurementgroup.ChannelConfiguration` :return: acquisition synchronization :rtype: :obj:`~sardana.pool.pooldefs.AcqSynch` """ if isinstance(channel, ChannelConfiguration): channel = channel.element return self._channel_acq_synch[channel]
[docs] def get_acq_synch_by_controller(self, controller): """Return acquisition synchronization configured for this controller. :param controller: controller to look for its acquisition synchronization :type controller: :class:`~sardana.pool.poolcontroller.PoolController` or :class:`~sardana.pool.poolmeasurementgroup.ControllerConfiguration` :return: acquisition synchronization :rtype: :obj:`~sardana.pool.pooldefs.AcqSynch` """ if isinstance(controller, ConfigurationItem): controller = controller.element return self._ctrl_acq_synch[controller]
def _filter_ctrls(self, ctrls, enabled): if enabled is None: return ctrls filtered_ctrls = [] for ctrl in ctrls: if ctrl.enabled == enabled: filtered_ctrls.append(ctrl) return filtered_ctrls
[docs] def get_timerable_ctrls(self, acq_synch=None, enabled=None): """Return timerable controllers. Allow to filter controllers based on acquisition synchronization or whether these are enabled/disabled. :param acq_synch: (optional) filter controller based on acquisition synchronization :type acq_synch: :class:`~sardana.pool.pooldefs.AcqSynch` :param enabled: (optional) filter controllers whether these are enabled/disabled: - :obj:`True` - enabled only - :obj:`False` - disabled only - :obj:`None` - all :type enabled: :obj:`bool` or :obj:`None` :return: timerable controllers that fulfils the filtering criteria :rtype: list<:class:`~sardana.pool.poolmeasurementgroup.ControllerConfiguration`> # noqa """ timerable_ctrls = [] if acq_synch is None: for ctrls in list(self._timerable_ctrls.values()): timerable_ctrls += ctrls elif isinstance(acq_synch, list): acq_synch_list = acq_synch for acq_synch in acq_synch_list: timerable_ctrls += self._timerable_ctrls[acq_synch] else: timerable_ctrls = list(self._timerable_ctrls[acq_synch]) return self._filter_ctrls(timerable_ctrls, enabled)
[docs] def get_zerod_ctrls(self, enabled=None): """Return 0D controllers. Allow to filter controllers whether these are enabled/disabled. :param enabled: (optional) filter controllers whether these are enabled/disabled: - :obj:`True` - enabled only - :obj:`False` - disabled only - :obj:`None` - all :type enabled: :obj:`bool` or :obj:`None` :return: 0D controllers that fulfils the filtering criteria :rtype: list<:class:`~sardana.pool.poolmeasurementgroup.ControllerConfiguration`> # noqa """ return self._filter_ctrls(self._zerod_ctrls, enabled)
[docs] def get_synch_ctrls(self, enabled=None): """Return synchronizer (currently only trigger/gate) controllers. Allow to filter controllers whether these are enabled/disabled. :param enabled: (optional) filter controllers whether these are enabled/disabled: - :obj:`True` - enabled only - :obj:`False` - disabled only - :obj:`None` - all :type enabled: :obj:`bool` or :obj:`None` :return: synchronizer controllers that fulfils the filtering criteria :rtype: list<:class:`~sardana.pool.poolmeasurementgroup.ControllerConfiguration`> # noqa """ return self._filter_ctrls(self._synch_ctrls, enabled)
[docs] def get_master_timer_software(self): """Return master timer in software acquisition. :return: master timer in software acquisition :rtype: :class:`~sardana.pool.poolmeasurementgroup.ChannelConfiguration` # noqa """ return self._master_timer_sw
[docs] def get_master_monitor_software(self): """Return master monitor in software acquisition. :return: master monitor in software acquisition :rtype: :class:`~sardana.pool.poolmeasurementgroup.ChannelConfiguration` # noqa """ return self._master_monitor_sw
[docs] def get_master_timer_software_start(self): """Return master timer in software start acquisition. :return: master timer in software start acquisition :rtype: :class:`~sardana.pool.poolmeasurementgroup.ChannelConfiguration` # noqa """ return self._master_monitor_sw_start
[docs] def get_master_monitor_software_start(self): """Return master monitor in software start acquisition. :return: master monitor in software start acquisition :rtype: :class:`~sardana.pool.poolmeasurementgroup.ChannelConfiguration` # noqa """ return self._master_timer_sw_start
[docs] def get_configuration_for_user(self): """Return measurement configuration serializable data structure.""" return self._user_config
[docs] def set_configuration_from_user(self, cfg, to_fqdn=True): """Load measurement configuration from serializable data structure.""" pool = self._parent.pool label = cfg.get('label', self._parent.name) description = cfg.get('description', self.DFT_DESC) timerable_ctrls = {AcqSynch.HardwareGate: [], AcqSynch.HardwareStart: [], AcqSynch.HardwareTrigger: [], AcqSynch.SoftwareStart: [], AcqSynch.SoftwareTrigger: [], AcqSynch.SoftwareGate: []} zerod_ctrls = [] synch_ctrls = [] other_ctrls = [] master_timer_sw = None master_monitor_sw = None master_timer_sw_start = None master_monitor_sw_start = None master_timer_idx_sw = float("+inf") master_monitor_idx_sw = float("+inf") master_timer_idx_sw_start = float("+inf") master_monitor_idx_sw_start = float("+inf") user_elem_ids = {} channel_acq_synch = {} ctrl_acq_synch = {} user_config = {} user_config['controllers'] = {} user_config['label'] = label user_config['description'] = description for ctrl_name, ctrl_data in list(cfg['controllers'].items()): # backwards compatibility for measurement groups created before # implementing feature-372: # https://sourceforge.net/p/sardana/tickets/372/ # WARNING: this is one direction backwards compatibility - it just # reads channels from the units, but does not write channels to the # units back if 'units' in ctrl_data: ctrl_data = ctrl_data['units']['0'] # discard controllers which don't have items (garbage) ch_count = len(ctrl_data['channels']) if ch_count == 0: continue external = ctrl_name.startswith('__') if external: ctrl = ctrl_name else: if to_fqdn: ctrl_name = _to_fqdn(ctrl_name, logger=self._parent) ctrl = pool.get_element_by_full_name(ctrl_name) assert ctrl.get_type() == ElementType.Controller user_config['controllers'][ctrl_name] = user_config_ctrl = {} ctrl_conf = {} synchronizer = ctrl_data.get('synchronizer', 'software') conf_synch = None if synchronizer is None or synchronizer == 'software': ctrl_conf['synchronizer'] = 'software' user_config_ctrl['synchronizer'] = 'software' else: if to_fqdn: synchronizer = _to_fqdn(synchronizer, logger=self._parent) user_config_ctrl['synchronizer'] = synchronizer pool_synch = pool.get_element_by_full_name(synchronizer) pool_synch_ctrl = pool_synch.controller conf_synch_ctrl = None conf_synch = None for conf_ctrl_created in synch_ctrls: if pool_synch_ctrl == conf_ctrl_created.element: conf_synch_ctrl = conf_ctrl_created for conf_synch_created in \ conf_ctrl_created.get_channels(): if pool_synch == conf_synch_created.element: conf_synch = conf_synch_created break break if conf_synch_ctrl is None: conf_synch_ctrl = ControllerConfiguration(pool_synch_ctrl) synch_ctrls.append(conf_synch_ctrl) if conf_synch is None: conf_synch = SynchronizerConfiguration(pool_synch) conf_synch_ctrl.add_channel(conf_synch) ctrl_conf['synchronizer'] = conf_synch try: synchronization = ctrl_data['synchronization'] except KeyError: # backwards compatibility for configurations before SEP6 try: synchronization = ctrl_data['trigger_type'] msg = ("trigger_type configuration parameter is deprecated" " in favor of synchronization. Re-apply " "configuration in order to upgrade.") self._parent.warning(msg) except KeyError: synchronization = AcqSynchType.Trigger ctrl_conf['synchronization'] = synchronization user_config_ctrl['synchronization'] = synchronization acq_synch = None if external: ctrl_item = ExternalControllerConfiguration(ctrl) elif ctrl.is_timerable(): is_software = synchronizer == 'software' acq_synch = AcqSynch.from_synch_type(is_software, synchronization) ctrl_acq_synch[ctrl] = acq_synch ctrl_conf["timer"] = ctrl_data.get("timer") ctrl_conf["monitor"] = ctrl_data.get("monitor") ctrl_item = TimerableControllerConfiguration(ctrl, ctrl_conf) else: ctrl_item = ControllerConfiguration(ctrl, ctrl_conf) ctrl_enabled = False if 'channels' in ctrl_data: user_config_ctrl['channels'] = user_config_channel = {} for ch_name, ch_data in list(ctrl_data['channels'].items()): if external: validator = TangoAttributeNameValidator() full_name = ch_data.get('full_name', ch_name) params = validator.getUriGroups(full_name) params['pool'] = pool channel = PoolExternalObject(**params) else: if to_fqdn: ch_name = _to_fqdn(ch_name, logger=self._parent) ch_data['full_name'] = ch_name channel = pool.get_element_by_full_name(ch_name) ch_data = self._fill_channel_data(channel, ch_data) user_config_channel[ch_name] = ch_data ch_item = ChannelConfiguration(channel, ch_data) ch_item.controller = ctrl_item ctrl_item.add_channel(ch_item) if ch_item.enabled: if external: id_ = channel.full_name else: id_ = channel.id user_elem_ids[ch_item.index] = id_ if ch_item.enabled: ctrl_enabled = True if acq_synch is not None: channel_acq_synch[channel] = acq_synch if not external and ctrl.is_timerable(): ctrl_item.update_timer() ctrl_item.update_monitor() msg_error = '' if ctrl_item.timer is None: timer_name = ctrl_data['timer'] ch_timer = pool.get_element_by_full_name(timer_name) msg_error += 'Channel {0} is not present but used as ' \ 'timer. '.format(ch_timer.name) if ctrl_item.monitor is None: monitor_name = ctrl_data['monitor'] ch_monitor = pool.get_element_by_full_name(monitor_name) msg_error += 'Channel {0} is not present but used as ' \ 'monitor.'.format(ch_monitor.name) if len(msg_error) > 0: raise ValueError(msg_error) if ctrl_item.enabled: user_config_ctrl['timer'] = ctrl_item.timer.full_name user_config_ctrl['monitor'] = ctrl_item.monitor.full_name else: # TODO: Remove timer and monitor configuration # timer and monitor configuration are deprecated. # This is because these may be different for # SoftwareTrigger, SoftwareGate or SoftwareStart # synchronization type. To be decided how we will # specify the new configuration e.g. chosen on the # server side based on the channel's order or # specified explicitly on the client side (expconf). # For backwards compatibility (some clients may rely on # them) we set them. user_config_ctrl['timer'] = ctrl_data['timer'] user_config_ctrl['monitor'] = ctrl_data['monitor'] # Update synchronizer state if ctrl_enabled and conf_synch is not None: conf_synch.enabled = True ctrl_item.validate() if external: other_ctrls.append(ctrl_item) elif ctrl.is_timerable(): timerable_ctrls[acq_synch].append(ctrl_item) # Find master timer/monitor the system take the channel with # less index if not ctrl_item.enabled: # Skip controllers disabled pass elif acq_synch in (AcqSynch.SoftwareTrigger, AcqSynch.SoftwareGate): if ctrl_item.timer.index < master_timer_idx_sw: master_timer_sw = ctrl_item.timer master_timer_idx_sw = ctrl_item.timer.index if ctrl_item.monitor.index < master_monitor_idx_sw: master_monitor_sw = ctrl_item.monitor master_monitor_idx_sw = ctrl_item.monitor.index elif acq_synch == AcqSynch.SoftwareStart: if ctrl_item.timer.index < master_timer_idx_sw_start: master_timer_sw_start = ctrl_item.timer master_timer_idx_sw_start = ctrl_item.timer.index if ctrl_item.monitor.index < master_monitor_idx_sw_start: master_monitor_sw_start = ctrl_item.monitor master_monitor_idx_sw_start = ctrl_item.monitor.index elif ctrl.get_ctrl_types()[0] == ElementType.ZeroDExpChannel: zerod_ctrls.append(ctrl_item) # Update synchronizer controller states for conf_synch_ctrl in synch_ctrls: conf_synch_ctrl.update_state() # Fill user configuration with measurement group's timer & monitor # This is a backwards compatibility cause the measurement group's # timer & monitor are not used if master_timer_sw is not None: user_config['timer'] = master_timer_sw.full_name elif master_timer_sw_start is not None: user_config['timer'] = master_timer_sw_start.full_name else: # Measurement Group with all channel synchronized by hardware if 'timer' in cfg: user_config['timer'] = cfg['timer'] else: # for backwards compatibility use a random monitor user_config['timer'] = user_config_ctrl['timer'] if master_monitor_sw is not None: user_config['monitor'] = master_monitor_sw.full_name elif master_monitor_sw_start is not None: user_config['monitor'] = master_monitor_sw_start.full_name else: # Measurement Group with all channel synchronized by hardware if 'monitor' in cfg: user_config['monitor'] = cfg['monitor'] else: # for backwards compatibility use a random monitor user_config['monitor'] = user_config_ctrl['monitor'] # Update internals values self._label = label self._description = description self._timerable_ctrls = timerable_ctrls self._zerod_ctrls = zerod_ctrls self._synch_ctrls = synch_ctrls self._other_ctrls = other_ctrls self._master_timer_sw = master_timer_sw self._master_monitor_sw = master_monitor_sw self._master_timer_sw_start = master_timer_sw_start self._master_monitor_sw_start = master_monitor_sw_start self._user_config = user_config self._channel_acq_synch = channel_acq_synch self._ctrl_acq_synch = ctrl_acq_synch # sorted ids may not be consecutive (if a channel is disabled) indexes = sorted(user_elem_ids.keys()) user_elem_ids_list = [user_elem_ids[idx] for idx in indexes] for conf_synch_ctrl in synch_ctrls: for conf_synch in conf_synch_ctrl.get_channels(enabled=True): user_elem_ids_list.append(conf_synch.id) self._parent.set_user_element_ids(user_elem_ids_list) self.changed = True
def _fill_channel_data(self, channel, channel_data): """Fill channel default values for the given channel dictionary""" name = channel.name ctype = channel.get_type() full_name = channel.full_name # choose ndim ndim = None if ctype == ElementType.CTExpChannel: ndim = 0 elif ctype == ElementType.PseudoCounter: ndim = 0 elif ctype == ElementType.ZeroDExpChannel: ndim = 0 elif ctype == ElementType.OneDExpChannel: ndim = 1 elif ctype == ElementType.TwoDExpChannel: ndim = 2 elif ctype == ElementType.External: config = channel.get_config() if config is not None: ndim = int(config.data_format) elif ctype == ElementType.IORegister: ndim = 0 if ctype != ElementType.External and channel.is_referable(): value_ref_enabled = channel_data.get('value_ref_enabled', False) channel_data['value_ref_enabled'] = value_ref_enabled # Definitively should be initialized by measurement group # index MUST be here already (asserting this in the following line) channel_data['index'] = channel_data['index'] channel_data['name'] = channel_data.get('name', name) channel_data['full_name'] = channel_data.get('full_name', full_name) channel_data['source'] = channel.get_source() channel_data['enabled'] = channel_data.get('enabled', True) channel_data['label'] = channel_data.get('label', channel_data['name']) channel_data['ndim'] = ndim # Probably should be initialized by measurement group channel_data['output'] = channel_data.get('output', True) # Perhaps should NOT be initialized by measurement group channel_data['plot_type'] = channel_data.get('plot_type', PlotType.No) channel_data['plot_axes'] = channel_data.get('plot_axes', []) channel_data['conditioning'] = channel_data.get('conditioning', '') channel_data['normalization'] = channel_data.get('normalization', Normalization.No) # TODO: think of filling other keys: data_type, data_units, nexus_path # shape here instead of feeling them on the Taurus extension level if ctype != ElementType.External: ctrl_name = channel.controller.full_name channel_data['_controller_name'] = channel_data.get( '_controller_name', ctrl_name) return channel_data
[docs]class PoolMeasurementGroup(PoolGroupElement): def __init__(self, **kwargs): self._state_lock = threading.Lock() self._monitor_count = None self._nb_starts = 1 self._pending_starts = 0 self._acquisition_mode = AcqMode.Timer self._config = MeasurementConfiguration(self) self._config_dirty = True self._moveable = None self._moveable_obj = None # by default software synchronizer initial domain is set to Position self._sw_synch_initial_domain = SynchDomain.Position self._synchronization = SynchronizationDescription() kwargs['elem_type'] = ElementType.MeasurementGroup PoolGroupElement.__init__(self, **kwargs) configuration = kwargs.get("configuration") if configuration is None: user_elements = self.get_user_elements() configuration = build_measurement_configuration(user_elements) self.set_configuration_from_user(configuration) def _create_action_cache(self): acq_name = "%s.Acquisition" % self._name return PoolAcquisition(self, acq_name) def _calculate_states(self, state_info=None): state, status = PoolGroupElement._calculate_states(self, state_info) # check if software synchronizer is occupied synch_soft = self.acquisition._synch._synch_soft acq_sw = self.acquisition._sw_acq acq_0d = self.acquisition._0d_acq if state in (State.On, State.Unknown) \ and (synch_soft.is_started() or acq_sw._is_started() or acq_0d._is_started()): state = State.Moving status += "/nSoftware synchronization is in progress" return state, status
[docs] def on_element_changed(self, evt_src, evt_type, evt_value): name = evt_type.name if name == 'state': with self._state_lock: state, status = self._calculate_states() self.set_state(state, propagate=2) self.set_status("\n".join(status))
[docs] def get_pool_controllers(self): return self.get_acquisition().get_pool_controllers()
[docs] def get_pool_controller_by_name(self, name): name = name.lower() for ctrl in self.get_pool_controllers(): if ctrl.name.lower() == name or ctrl.full_name.lower() == name: return ctrl
[docs] def add_user_element(self, element, index=None): '''Override the base behavior, so the TriggerGate elements are silently skipped if used multiple times in the group''' user_elements = self._user_elements if element in user_elements: # skipping TriggerGate element if already present if element.get_type() is ElementType.TriggerGate: return return PoolGroupElement.add_user_element(self, element, index)
# ------------------------------------------------------------------------- # configuration # ------------------------------------------------------------------------- def _is_managed_element(self, element): element_type = element.get_type() return (element_type in TYPE_EXP_CHANNEL_ELEMENTS or element_type is ElementType.TriggerGate) @property def configuration(self): return self._config # TODO: Check if it needed
[docs] def set_configuration(self, config=None, propagate=1, to_fqdn=True): self._config._use_fqdn = to_fqdn self._config.configuration = config self._config_dirty = True if not propagate: return self.fire_event(EventType("configuration", priority=propagate), config)
[docs] def set_configuration_from_user(self, cfg, propagate=1, to_fqdn=True): self._config.set_configuration_from_user(cfg, to_fqdn) self._config_dirty = True if not propagate: return self.fire_event(EventType("configuration", priority=propagate), self._config.get_configuration_for_user())
[docs] def get_user_configuration(self): return self._config.get_configuration_for_user()
[docs] def get_timer(self): # TODO: Adapt to the new future MeasurementConfiguration API return self._config._master_timer
timer = property(get_timer) # ------------------------------------------------------------------------- # integration time # -------------------------------------------------------------------------
[docs] def get_integration_time(self): integration_time = self._synchronization.active_time if isinstance(integration_time, float): return integration_time elif len(integration_time) == 0: raise Exception("The synchronization group has not been" " initialized") elif len(integration_time) > 1: raise Exception("There are more than one synchronization groups")
[docs] def set_integration_time(self, integration_time, propagate=1): total_time = integration_time + self.latency_time synch = [{SynchParam.Delay: {SynchDomain.Time: 0}, SynchParam.Active: {SynchDomain.Time: integration_time}, SynchParam.Total: {SynchDomain.Time: total_time}, SynchParam.Repeats: 1}] self.set_synchronization(synch) if not propagate: return self.fire_event(EventType("integration_time", priority=propagate), integration_time)
integration_time = property(get_integration_time, set_integration_time, doc="the current integration time") # ------------------------------------------------------------------------- # monitor count # -------------------------------------------------------------------------
[docs] def get_monitor_count(self): return self._monitor_count
[docs] def set_monitor_count(self, monitor_count, propagate=1): self._monitor_count = monitor_count if not propagate: return self.fire_event(EventType("monitor_count", priority=propagate), monitor_count)
monitor_count = property(get_monitor_count, set_monitor_count, doc="the current monitor count") # ------------------------------------------------------------------------- # acquisition mode # -------------------------------------------------------------------------
[docs] def get_acquisition_mode(self): return self._acquisition_mode
[docs] def set_acquisition_mode(self, acquisition_mode, propagate=1): self._acquisition_mode = acquisition_mode self._config_dirty = True # acquisition mode goes to configuration if not propagate: return self.fire_event(EventType("acquisition_mode", priority=propagate), acquisition_mode)
acquisition_mode = property(get_acquisition_mode, set_acquisition_mode, doc="the current acquisition mode") # ------------------------------------------------------------------------- # synchronization # -------------------------------------------------------------------------
[docs] def get_synchronization(self): return self._synchronization
[docs] def set_synchronization(self, synchronization, propagate=1): self._synchronization = SynchronizationDescription(synchronization) self._config_dirty = True # acquisition mode goes to configuration if not propagate: return self.fire_event(EventType("synchronization", priority=propagate), synchronization)
synchronization = property(get_synchronization, set_synchronization, doc="the current acquisition mode") # ------------------------------------------------------------------------- # moveable # -------------------------------------------------------------------------
[docs] def get_moveable(self): return self._moveable
[docs] def set_moveable(self, moveable, propagate=1, to_fqdn=True): self._moveable = moveable if self._moveable is not None: if to_fqdn: moveable = _to_fqdn(moveable, logger=self) self._moveable_obj = self.pool.get_element_by_full_name(moveable) self.fire_event(EventType("moveable", priority=propagate), moveable)
moveable = property(get_moveable, set_moveable, doc="moveable source used in synchronization") # ------------------------------------------------------------------------- # latency time # -------------------------------------------------------------------------
[docs] def get_latency_time(self): latency_time = 0 pool_ctrls = self.get_pool_controllers() for pool_ctrl in pool_ctrls: if not pool_ctrl.is_timerable(): continue candidate = pool_ctrl.get_ctrl_par("latency_time") if candidate > latency_time: latency_time = candidate return latency_time
latency_time = property(get_latency_time, doc="latency time between two consecutive " "acquisitions") # ------------------------------------------------------------------------- # software synchronizer initial domain # -------------------------------------------------------------------------
[docs] def get_sw_synch_initial_domain(self): return self._sw_synch_initial_domain
[docs] def set_sw_synch_initial_domain(self, domain): self._sw_synch_initial_domain = domain
sw_synch_initial_domain = property( get_sw_synch_initial_domain, set_sw_synch_initial_domain, doc="software synchronizer initial domain (SynchDomain.Time " "or SynchDomain.Position)" ) # ------------------------------------------------------------------------- # number of starts # -------------------------------------------------------------------------
[docs] def get_nb_starts(self): return self._nb_starts
[docs] def set_nb_starts(self, nb_starts, propagate=1): self._nb_starts = nb_starts if not propagate: return self.fire_event(EventType("nb_starts", priority=propagate), nb_starts)
nb_starts = property(get_nb_starts, set_nb_starts, doc="current number of starts") # ------------------------------------------------------------------------- # acquisition # -------------------------------------------------------------------------
[docs] def prepare(self, multiple=1): """Prepare for measurement. Delegate measurement preparation to the acquisition action. ..todo:: remove multiple argument """ if len(self.get_user_elements()) == 0: # All channels were disabled raise RuntimeError('all channels in measurement group ' 'are disabled') if self._acquisition_mode == AcqMode.Timer: role = 'timer' elif self._acquisition_mode == AcqMode.Monitor: role = 'monitor' else: raise RuntimeError('acquisition mode must be either Timer ' 'or Monitor') for ctrl in self._config.get_timerable_ctrls(enabled=True): master = getattr(ctrl, role, None) if master is None: msg = ('controller {0} does not have {1} ' 'configured.').format(ctrl.name, role) raise RuntimeError(msg) if not master.enabled: msg = 'channel {0} used as {1} must be enabled'.format( master.name, role) raise RuntimeError(msg) value = self._get_value() self._pending_starts = self.nb_starts kwargs = {'head': self, 'multiple': multiple} self.acquisition.prepare(self.configuration, self.acquisition_mode, value, self._synchronization, self._moveable_obj, self.sw_synch_initial_domain, self.nb_starts, **kwargs)
[docs] def start_acquisition(self, value=None, multiple=1): """Start measurement. Delegate start measurement to the acquisition action. Provide backwards compatibility for starts without previous prepare. ..todo:: remove value and multiple arguments. """ if self._pending_starts == 0: msg = "starting acquisition without prior preparing is " \ "deprecated since version Jan18." self.warning(msg) self.debug("Preparing with number_of_starts equal to 1") nb_starts = self.nb_starts self.set_nb_starts(1, propagate=0) try: self.prepare(multiple) finally: self.set_nb_starts(nb_starts, propagate=0) self._aborted = False self._pending_starts -= 1 if not self._simulation_mode: self.acquisition.run()
def _get_value(self): if self._acquisition_mode is AcqMode.Timer: value = self.get_integration_time() elif self.acquisition_mode is AcqMode.Monitor: value = self._monitor_count return value
[docs] def set_acquisition(self, acq_cache): self.set_action_cache(acq_cache)
[docs] def get_acquisition(self): return self.get_action_cache()
acquisition = property(get_acquisition, doc="acquisition object")
[docs] def stop(self): self._pending_starts = 0 self.acquisition._synch._synch_soft.stop() PoolGroupElement.stop(self)
[docs] def abort(self): self._pending_starts = 0 PoolGroupElement.abort(self)