#!/usr/bin/env python2
# -*- coding: utf-8 -*-
#
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
#
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame. If not, see <http://www.gnu.org/licenses/>
import pools,conf_strings,apipools
# Pool of signal records keyed by signal_id; records are created in
# init_signal and removed in deinit_signal.
signal_pool=pools.pool()
# Pool of submodule APIs; filled in init_signal and queried before calling
# model-specific functions (configure_*, get_error_queue, reset).
api_pool=apipools.api_pool()
def init_signal(signal_id, conf_string, function, hl, ll, freq, dc, pw, re, fe, sym, phase, delay, sync):
    """Create a signal associated to the *conf_string* of a PG.

    The PG described by *conf_string* is initialized under the id
    ``pgmodel_<signal_id>``, the API of its module is added to the API
    pool, and the signal parameters are stored in the signal pool under
    *signal_id*.

    Returns ``(1, "ok")`` on success or ``(0, message)`` on failure.
    """
    # The id of the PG instance is derived from the signal id
    pg_id = "pgmodel_%s" % (signal_id)
    try:
        conf = conf_strings.parse(conf_string)
    except Exception as err:
        return 0, str(err)
    model = conf.name

    # Initialize the PG itself
    status, answer = submod_execcmd("init@" + model, pg_id, conf_string)
    if status == 0:
        return 0, "error initializing %s %s <- %s" % (model, pg_id, answer)

    # Register the API of the module in the api_pool, if it's not already there
    status, answer = api_pool.add_api_from_module(model)
    if status == 0:
        return 0, answer

    record = {
        # PG instance and model
        "pg_id": pg_id,
        "pg_model": model,
        # Signal function ('sine', 'square', 'ramp', 'pulse')
        "function": function,
        # High and low levels in Volts
        "high_level": hl,
        "low_level": ll,
        # Frequency in Hertz
        "frequency": freq,
        # Duty cycle from 0 to 100
        "duty_cycle": dc,
        # Pulse width in seconds
        "pulse_width": pw,
        # Rising and falling edge durations in seconds
        "rising_edge": re,
        "falling_edge": fe,
        # Symmetry from 0 to 100
        "symmetry": sym,
        # Phase in degrees
        "phase": phase,
        # Delay in seconds
        "delay": delay,
        # Output sync flag (boolean: 1/0)
        "sync": sync,
    }
    signal_pool.new(signal_id, record)
    return 1, "ok"
def deinit_signal(signal_id):
    """Deinitialize signal *signal_id*.

    Deinitializes the PG attached to the signal, then removes the signal
    from the pool. Returns ``(1, "ok")`` on success and ``(0, message)``
    when the PG deinitialization or the pool removal fails.
    """
    try:
        entry = signal_pool.get(signal_id)
    except Exception as err:
        # A failed lookup is reported with retcode 1, not 0.
        # NOTE(review): presumably deliberate, so that deinitializing an
        # unknown signal is harmless - confirm.
        return 1, str(err)

    model = entry["pg_model"]
    pg_id = entry["pg_id"]

    # Deinit the PG first
    status, answer = submod_execcmd("deinit@" + model, pg_id)
    if status == 0:
        return 0, "error deinitializing %s %s <- %s" % (model, pg_id, answer)

    # Then drop the signal from the pool
    try:
        signal_pool.remove(signal_id)
    except Exception as err:
        return 0, str(err)
    return 1, "ok"
def config_signal(signal_id):
    """Configure signal *signal_id*.

    Configures the PG attached to the signal, then sends every stored
    signal parameter to it. Returns ``(0, message)`` on failure, otherwise
    the result of the parameter configuration.
    """
    try:
        entry = signal_pool.get(signal_id)
    except Exception as err:
        return 0, str(err)

    model = entry["pg_model"]
    pg_id = entry["pg_id"]
    status, answer = submod_execcmd("config@" + model, pg_id)
    if status == 0:
        return 0, "error configuring %s %s <- %s" % (model, pg_id, answer)

    # The stored record is both the signal and the set of values to send
    return configure(entry, entry)
def inval_signal(signal_id):
    """Invalidate signal *signal_id*.

    Calls the ``inval`` function of the PG attached to the signal.
    Returns ``(1, "ok")`` on success or ``(0, message)`` on failure.
    """
    try:
        entry = signal_pool.get(signal_id)
    except Exception as err:
        return 0, str(err)

    model = entry["pg_model"]
    pg_id = entry["pg_id"]
    status, answer = submod_execcmd("inval@" + model, pg_id)
    if status == 0:
        return 0, "error invalidating %s %s <- %s" % (model, pg_id, answer)
    return 1, "ok"
def set_function_signal(signal_id, function):
    """Set function of *signal_id*. Valid values are: 'sine', 'square', 'ramp', 'pulse'"""
    changes = {"function": function}
    return safe_configure(signal_id, changes)
def set_high_level_signal(signal_id, high_level):
    """Set high level value of *signal_id* in Volts."""
    changes = {"high_level": high_level}
    return safe_configure(signal_id, changes)
def set_low_level_signal(signal_id, low_level):
    """Set low level value of *signal_id* in Volts."""
    changes = {"low_level": low_level}
    return safe_configure(signal_id, changes)
def set_frequency_signal(signal_id, frequency):
    """Set frequency of *signal_id* in Hertz."""
    changes = {"frequency": frequency}
    return safe_configure(signal_id, changes)
def set_duty_cycle_signal(signal_id, duty_cycle):
    """Set duty cycle of *signal_id*. Ranges from 0 to 100."""
    changes = {"duty_cycle": duty_cycle}
    return safe_configure(signal_id, changes)
def set_pulse_width_signal(signal_id, pulse_width):
    """Set width of pulse in seconds for *signal_id*."""
    changes = {"pulse_width": pulse_width}
    return safe_configure(signal_id, changes)
def set_edges_signal(signal_id, rising_edge, falling_edge):
    """Set rising and falling edge duration in seconds for *signal_id* with pulse function.

    The parameters can also be 'MIN' and/or 'MAX'.
    """
    changes = {
        "rising_edge": rising_edge,
        "falling_edge": falling_edge,
    }
    return safe_configure(signal_id, changes)
def set_symmetry_signal(signal_id, symmetry):
    """Set symmetry of *signal_id* with ramp function. Ranges from 0 to 100."""
    changes = {"symmetry": symmetry}
    return safe_configure(signal_id, changes)
def set_phase_signal(signal_id, phase):
    """Set phase of *signal_id*. Ranges from -360 to +360 degrees."""
    changes = {"phase": phase}
    return safe_configure(signal_id, changes)
def set_delay_signal(signal_id, delay):
    """Set delay of *signal_id* in seconds."""
    changes = {"delay": delay}
    return safe_configure(signal_id, changes)
def set_sync_signal(signal_id, sync):
    """Activate or deactivate output of sync associated to *signal_id*.

    *sync* can be 1 or 0, given as the string "1" or "0"; any other value
    is rejected with ``(0, message)`` before the signal is looked up.
    """
    if sync in ("0", "1"):
        return safe_configure(signal_id, {"sync": sync})
    return 0, "sync must be either 0 or 1"
def safe_configure(signal_id, values):
    """Store *values* in signal *signal_id* and send them to its PG.

    The new values are written into the stored signal before calling
    configure(). If the configuration fails, the previous values are put
    back and ``(0, message)`` is returned; otherwise the result of
    configure() is returned unchanged.
    """
    try:
        entry = signal_pool.get(signal_id)
    except Exception as err:
        return 0, str(err)

    # Remember what is being overwritten so a failure can be undone
    previous = {}
    for key, new_value in values.items():
        previous[key] = entry[key]
        entry[key] = new_value

    status, answer = configure(entry, values)
    if status != 0:
        return status, answer

    # Configuration failed: restore the stored signal
    for key, old_value in previous.items():
        entry[key] = old_value
    return 0, answer
def configure(signal, values):
    """Send the parameters in *values* to the PG of *signal*.

    *signal* is a stored signal record (it supplies ``pg_model``, ``pg_id``
    and the values used for the compatibility checks). *values* is a
    mapping holding the parameters to send; any parameter missing from it
    is sent as "undef".

    The function to configure is taken from *values* if it is set there,
    else from *signal*. The call is dispatched to
    ``configure_<function>@<pg_model>``.

    Returns ``(1, "ok")`` on success or ``(0, message)`` on failure.
    """
    # take function from values if possible, else from signal
    if "function" in values and values["function"] != "undef":
        function = values["function"]
    elif "function" in signal and signal["function"] != "undef":
        function = signal["function"]
    else:
        return 0, "function is required to configure"

    # security: reject stored parameters that do not apply to the function
    if signal["duty_cycle"] != "undef" and function != "square":
        return 0, "duty_cycle only compatible with square functions"
    if signal["pulse_width"] != "undef" and function != "pulse":
        return 0, "pulse_width only compatible with pulse functions"
    if signal["delay"] != "undef" and function != "pulse":
        return 0, "delay only compatible with pulse functions"
    if signal["phase"] != "undef" and function == "pulse":
        return 0, "pulse signal incompatible with phase. use delay"

    # Call the configure function for the model
    func = "configure_%s@%s" % (function, signal["pg_model"])
    api = api_pool.get_api(signal["pg_model"], func)
    if api is None:
        return 0, "%s does not implement the function %s" % (signal["pg_model"], function)

    # Positional parameters: four common ones, then the function-specific
    # ones. The order is part of the call and must not change.
    common = ("frequency", "sync", "high_level", "low_level")
    specific = {
        "sine": ("phase",),
        "square": ("phase", "duty_cycle"),
        "ramp": ("phase", "symmetry"),
        "pulse": ("delay", "pulse_width", "rising_edge", "falling_edge"),
    }
    names = common + specific.get(function, ())
    parameters = [values.get(name, "undef") for name in names]

    retcode, res = submod_execcmd(func, signal["pg_id"], *parameters)
    if retcode == 0:
        # Report the function that was actually resolved and configured
        return 0, "error configuring %s on %s %s <- %s" % (function, signal["pg_model"], signal["pg_id"], res)
    return 1, "ok"
def power(signal_id, op):
    """Turn the output of the PG attached to *signal_id* on or off.

    *op* is "on" or "off", compared case-insensitively. It is lower-cased
    before the submodule function name is built, so every spelling accepted
    by the validation maps to ``power_on@<pg_model>`` or
    ``power_off@<pg_model>``.

    Returns ``(1, "ok")`` on success or ``(0, message)`` on failure.
    """
    try:
        signal = signal_pool.get(signal_id)
    except Exception as e:
        return 0, str(e)

    # Normalize once so validation and function name agree; otherwise "ON"
    # would pass the check yet produce "power_ON@<pg_model>".
    op = op.lower()
    if op not in ("on", "off"):
        return 0, "invalid power operation"

    # Call the power_on / power_off function for the model
    func = "power_%s@%s" % (op, signal["pg_model"])
    retcode, res = submod_execcmd(func, signal["pg_id"])
    if retcode == 0:
        return 0, "error turning %s <- %s" % (op, res)
    return 1, "ok"
def power_on_signal(signal_id):
    """Turn on signal *signal_id*."""
    outcome = power(signal_id, "on")
    return outcome
def power_off_signal(signal_id):
    """Turn off signal *signal_id*."""
    outcome = power(signal_id, "off")
    return outcome
def get_error_queue_signal(signal_id):
    """Read error queue of PG.

    Looks up signal *signal_id*, checks that its PG model provides
    ``get_error_queue`` and calls it.

    Returns ``(1, result)`` with the submodule's answer on success, or
    ``(0, message)`` on failure.
    """
    try:
        signal = signal_pool.get(signal_id)
    except Exception as e:
        return 0, str(e)

    # Call the get_error_queue function for the model
    func = "get_error_queue@" + signal["pg_model"]
    api = api_pool.get_api(signal["pg_model"], func)
    # Identity test: the lookup result is compared to None itself
    if api is None:
        return 0, "%s does not implement the function get_error_queue" % (signal["pg_model"])

    retcode, res = submod_execcmd(func, signal["pg_id"])
    if retcode == 0:
        return 0, "error getting error queue <- %s" % (res)
    return 1, res
def reset_pg_signal(signal_id):
    """Reset the PG attached to signal *signal_id*.

    Looks up the signal, checks that its PG model provides ``reset`` and
    calls it.

    Returns ``(1, result)`` with the submodule's answer on success, or
    ``(0, message)`` on failure.
    """
    try:
        signal = signal_pool.get(signal_id)
    except Exception as e:
        return 0, str(e)

    # Call the reset function for the model
    func = "reset@" + signal["pg_model"]
    api = api_pool.get_api(signal["pg_model"], func)
    # Identity test: the lookup result is compared to None itself
    if api is None:
        return 0, "%s does not implement the function reset" % (signal["pg_model"])

    retcode, res = submod_execcmd(func, signal["pg_id"])
    if retcode == 0:
        return 0, "error resetting <- %s" % (res)
    return 1, res