Source code for cmd_signal

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# 
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
# 
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
# 
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame.  If not, see <http://www.gnu.org/licenses/>

import pools,conf_strings,apipools

# Pool of signal descriptions. Entries are created by init_signal (keyed by
# signal_id), looked up by every other command and removed by deinit_signal.
signal_pool=pools.pool()
# Pool of PG module APIs. init_signal registers the API of the PG module, and
# the commands below query it with get_api(pg_model, "<function>@<pg_model>")
# to check that the module implements a function before calling it.
api_pool=apipools.api_pool()

def init_signal(signal_id, conf_string, function, hl, ll, freq, dc, pw, re, fe, sym, phase, delay, sync):
    """Create signal *signal_id* on the PG described by *conf_string*.

    The PG module named by the parsed *conf_string* is initialized under the
    id ``pgmodel_<signal_id>``, its API is added to the API pool and the
    signal description is stored in the signal pool.

    Returns ``(1, "ok")`` on success or ``(0, message)`` on failure.
    """
    pg_id = "pgmodel_%s" % (signal_id)

    try:
        pg_conf = conf_strings.parse(conf_string)
    except Exception as e:
        return 0, str(e)

    # Initialize the PG module itself
    status, answer = submod_execcmd("init@" + pg_conf.name, pg_id, conf_string)
    if status == 0:
        return 0, "error initializing %s %s <- %s" % (pg_conf.name, pg_id, answer)

    # Make the API of the PG module available through the API pool
    status, answer = api_pool.add_api_from_module(pg_conf.name)
    if status == 0:
        return 0, answer

    description = {
        # PG owning the signal
        "pg_id": pg_id,
        "pg_model": pg_conf.name,
        # Signal function ('sine', 'square', 'ramp', 'pulse')
        "function": function,
        # High and low levels in Volts
        "high_level": hl,
        "low_level": ll,
        # Frequency in Hertz
        "frequency": freq,
        # Duty cycle from 0 to 100
        "duty_cycle": dc,
        # Pulse width in seconds
        "pulse_width": pw,
        # Rising and falling edge durations in seconds
        "rising_edge": re,
        "falling_edge": fe,
        # Symmetry from 0 to 100
        "symmetry": sym,
        # Phase in degrees
        "phase": phase,
        # Delay in seconds
        "delay": delay,
        # Output sync flag (boolean: 1/0)
        "sync": sync,
    }
    signal_pool.new(signal_id, description)
    return 1, "ok"
def deinit_signal(signal_id):
    """Deinitialize signal *signal_id*.

    The PG behind the signal is deinitialized first, then the signal is
    removed from the pool. A signal that cannot be fetched from the pool is
    reported with status 1 and the lookup error text.
    """
    try:
        sig = signal_pool.get(signal_id)
    except Exception as e:
        # Status 1 here: nothing to deinitialize is not treated as a failure
        return 1, str(e)

    # Deinitialize the PG
    status, answer = submod_execcmd("deinit@" + sig["pg_model"], sig["pg_id"])
    if status == 0:
        details = (sig["pg_model"], sig["pg_id"], answer)
        return 0, "error deinitializing %s %s <- %s" % details

    # Forget the signal
    try:
        signal_pool.remove(signal_id)
    except Exception as e:
        return 0, str(e)
    return 1, "ok"
def config_signal(signal_id):
    """Configure signal *signal_id*.

    Configures the PG module first, then applies every parameter stored for
    the signal through ``configure``.
    """
    try:
        sig = signal_pool.get(signal_id)
    except Exception as e:
        return 0, str(e)

    status, answer = submod_execcmd("config@" + sig["pg_model"], sig["pg_id"])
    if status == 0:
        details = (sig["pg_model"], sig["pg_id"], answer)
        return 0, "error configuring %s %s <- %s" % details

    # The stored signal acts both as the state and as the values to apply
    return configure(sig, sig)
def inval_signal(signal_id):
    """Invalidate signal *signal_id* by invalidating its PG module."""
    try:
        sig = signal_pool.get(signal_id)
    except Exception as e:
        return 0, str(e)

    status, answer = submod_execcmd("inval@" + sig["pg_model"], sig["pg_id"])
    if status != 0:
        return 1, "ok"
    details = (sig["pg_model"], sig["pg_id"], answer)
    return 0, "error invalidating %s %s <- %s" % details
def set_function_signal(signal_id, function):
    """Set the function of *signal_id*.

    Valid values are: 'sine', 'square', 'ramp', 'pulse'.
    """
    new_values = {"function": function}
    return safe_configure(signal_id, new_values)
def set_high_level_signal(signal_id, high_level):
    """Set the high level of *signal_id*, in Volts."""
    new_values = {"high_level": high_level}
    return safe_configure(signal_id, new_values)
def set_low_level_signal(signal_id, low_level):
    """Set the low level of *signal_id*, in Volts."""
    new_values = {"low_level": low_level}
    return safe_configure(signal_id, new_values)
def set_frequency_signal(signal_id, frequency):
    """Set the frequency of *signal_id*, in Hertz."""
    new_values = {"frequency": frequency}
    return safe_configure(signal_id, new_values)
def set_duty_cycle_signal(signal_id, duty_cycle):
    """Set the duty cycle of *signal_id*. Ranges from 0 to 100."""
    new_values = {"duty_cycle": duty_cycle}
    return safe_configure(signal_id, new_values)
def set_pulse_width_signal(signal_id, pulse_width):
    """Set the pulse width of *signal_id*, in seconds."""
    new_values = {"pulse_width": pulse_width}
    return safe_configure(signal_id, new_values)
def set_edges_signal(signal_id, rising_edge, falling_edge):
    """Set the edge durations of *signal_id* with pulse function.

    *rising_edge* and *falling_edge* are durations in seconds. The
    parameters can also be 'MIN' and/or 'MAX'.
    """
    new_values = {
        "rising_edge": rising_edge,
        "falling_edge": falling_edge,
    }
    return safe_configure(signal_id, new_values)
def set_symmetry_signal(signal_id, symmetry):
    """Set the symmetry of *signal_id* with ramp function. Ranges from 0 to 100."""
    new_values = {"symmetry": symmetry}
    return safe_configure(signal_id, new_values)
def set_phase_signal(signal_id, phase):
    """Set the phase of *signal_id*. Ranges from -360 to +360 degrees."""
    new_values = {"phase": phase}
    return safe_configure(signal_id, new_values)
def set_delay_signal(signal_id, delay):
    """Set the delay of *signal_id*, in seconds."""
    new_values = {"delay": delay}
    return safe_configure(signal_id, new_values)
def set_sync_signal(signal_id, sync):
    """Activate or deactivate the sync output associated to *signal_id*.

    *sync* must be the string "1" (activate) or "0" (deactivate); any other
    value is rejected before the signal is touched.
    """
    if sync in ("0", "1"):
        return safe_configure(signal_id, {"sync": sync})
    return 0, "sync must be either 0 or 1"
def safe_configure(signal_id, values):
    """Apply *values* to signal *signal_id*, rolling back on failure.

    *values* maps signal parameter names to their new value. The stored
    signal is updated, then ``configure`` is called with just the changed
    values. If that fails, the previous values are restored.

    Returns the ``(retcode, message)`` pair of ``configure``, or
    ``(0, message)`` if the signal cannot be fetched from the pool.
    """
    try:
        signal = signal_pool.get(signal_id)
    except Exception as e:
        return 0, str(e)

    # Snapshot everything before modifying anything, so that a missing key
    # cannot leave the signal half-updated.
    old_values = dict((key, signal[key]) for key in values)
    for key in values:
        signal[key] = values[key]

    retcode, res = configure(signal, values)
    if retcode == 0:
        # Restore the previous state of the signal
        for key in old_values:
            signal[key] = old_values[key]
        return 0, res
    return retcode, res


def configure(signal, values):
    """Send *values* to the PG of *signal* through its configure function.

    *signal* is the stored signal description; *values* holds the
    parameters to send (parameters absent from it are sent as "undef").
    The function is taken from *values* when given there, else from
    *signal*.

    Returns ``(1, "ok")`` on success or ``(0, message)`` on failure.
    """
    # Take function from values if possible, else from signal
    if "function" in values and values["function"] != "undef":
        function = values["function"]
    elif "function" in signal and signal["function"] != "undef":
        function = signal["function"]
    else:
        return 0, "function is required to configure"

    # Security: reject parameters that make no sense for the function
    if signal["duty_cycle"] != "undef" and function != "square":
        return 0, "duty_cycle only compatible with square functions"
    if signal["pulse_width"] != "undef" and function != "pulse":
        return 0, "pulse_width only compatible with pulse functions"
    if signal["delay"] != "undef" and function != "pulse":
        return 0, "delay only compatible with pulse functions"
    if signal["phase"] != "undef" and function == "pulse":
        return 0, "pulse signal incompatible with phase. use delay"

    # Check that the model implements the configure function
    func = "configure_%s@%s" % (function, signal["pg_model"])
    api = api_pool.get_api(signal["pg_model"], func)
    if api is None:
        return 0, "%s does not implement the function %s" % (signal["pg_model"], function)

    # Common parameters first, then the ones specific to the function,
    # in the order expected by the configure_<function> command.
    specific = {
        "sine": ("phase",),
        "square": ("phase", "duty_cycle"),
        "ramp": ("phase", "symmetry"),
        "pulse": ("delay", "pulse_width", "rising_edge", "falling_edge"),
    }
    names = ("frequency", "sync", "high_level", "low_level") + specific.get(function, ())
    parameters = [values.get(name, "undef") for name in names]

    retcode, res = submod_execcmd(func, signal["pg_id"], *parameters)
    if retcode == 0:
        # Report the function actually used, not the stored one
        return 0, "error configuring %s on %s %s <- %s" % (
            function, signal["pg_model"], signal["pg_id"], res)
    return 1, "ok"


def power(signal_id, op):
    """Turn the output of signal *signal_id* on or off.

    *op* is "on" or "off" (case-insensitive). It is normalized to lower
    case before building the ``power_<op>@<pg_model>`` command name.

    Returns ``(1, "ok")`` on success or ``(0, message)`` on failure.
    """
    try:
        signal = signal_pool.get(signal_id)
    except Exception as e:
        return 0, str(e)

    # Use the validated, normalized operation for the command name too
    operation = op.lower()
    if operation not in ("on", "off"):
        return 0, "invalid power operation"

    # Call the power_on/power_off function for the model
    func = "power_%s@%s" % (operation, signal["pg_model"])
    retcode, res = submod_execcmd(func, signal["pg_id"])
    if retcode == 0:
        return 0, "error turning %s <- %s" % (operation, res)
    return 1, "ok"
def power_on_signal(signal_id):
    """Turn on signal *signal_id*."""
    outcome = power(signal_id, "on")
    return outcome
def power_off_signal(signal_id):
    """Turn off signal *signal_id*."""
    outcome = power(signal_id, "off")
    return outcome
def get_error_queue_signal(signal_id):
    """Read the error queue of the PG behind signal *signal_id*.

    Returns ``(1, result)`` with the answer of the PG module's
    ``get_error_queue`` command, or ``(0, message)`` if the signal is
    unknown, the module does not implement the command, or the command
    fails.
    """
    try:
        signal = signal_pool.get(signal_id)
    except Exception as e:
        return 0, str(e)

    # Check that the model implements get_error_queue before calling it
    func = "get_error_queue@" + signal["pg_model"]
    api = api_pool.get_api(signal["pg_model"], func)
    if api is None:
        return 0, "%s does not implement the function get_error_queue" % (signal["pg_model"])

    retcode, res = submod_execcmd(func, signal["pg_id"])
    if retcode == 0:
        return 0, "error getting error queue <- %s" % (res)
    return 1, res
def reset_pg_signal(signal_id):
    """Reset the PG behind signal *signal_id*.

    Returns ``(1, result)`` with the answer of the PG module's ``reset``
    command, or ``(0, message)`` if the signal is unknown, the module does
    not implement the command, or the command fails.
    """
    try:
        signal = signal_pool.get(signal_id)
    except Exception as e:
        return 0, str(e)

    # Check that the model implements reset before calling it
    func = "reset@" + signal["pg_model"]
    api = api_pool.get_api(signal["pg_model"], func)
    if api is None:
        return 0, "%s does not implement the function reset" % (signal["pg_model"])

    retcode, res = submod_execcmd(func, signal["pg_id"])
    if retcode == 0:
        return 0, "error resetting <- %s" % (res)
    return 1, res