#!/usr/bin/env python2
# -*- coding: utf-8 -*-
#
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
#
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame. If not, see <http://www.gnu.org/licenses/>
import time
import conf_strings,pools,apipools
# CLASS ##########################################################
class ps_class():
    """Model-independent front end for power supplies (PS).

    Each PS is registered in ``ps_pool`` under its ``ps_id`` together with
    the name of its model module and a per-PS ``model_id``. Calls are then
    forwarded to that model module through ``submod_execcmd``.

    Every public method returns a ``(retcode, res)`` pair. A retcode of 0
    means failure, in which case ``res`` is an error message.

    NOTE(review): ``submod_execcmd`` is neither defined nor imported in this
    file; presumably it is injected by the Pyrame command-module runtime.
    ``api_pool`` is the module-level API cache created after this class.
    """

    # Registry of initialized power supplies, keyed by ps_id. It is a class
    # attribute, so it is shared by every instance.
    ps_pool = pools.pool()

    def _lookup(self, ps_id):
        """Fetch the pool entry for *ps_id*.

        Returns ``(ps, None)`` on success, or ``(None, message)`` when the
        pool lookup raises.
        """
        try:
            return self.ps_pool.get(ps_id), None
        except Exception as e:
            return None, str(e)

    def init(self, ps_id, conf_string):
        """Initialize the model module for a new PS and register it.

        *conf_string* is parsed to find the model name, then handed
        unchanged to the model's ``init`` command. The model's API is
        fetched and cached in ``api_pool`` the first time a model is seen.

        Returns ``(1, "ok")`` on success or ``(0, message)`` on failure.
        """
        try:
            conf = conf_strings.parse(conf_string)
        except Exception as e:
            return 0, str(e)

        # Initialize PS
        model = conf.name
        model_id = "psmodel_%s" % (ps_id)
        retcode, res = submod_execcmd("init@" + model, model_id, conf_string)
        if retcode == 0:
            return 0, "Error initializing %s power supply <- %s" % (model, res)

        # Get API of the module and add it to the api_pool, if it's not
        # already there
        if not api_pool.is_present(model):
            retcode, res = submod_execcmd("getapi@" + model)
            if retcode == 0:
                return 0, "Can't get API for %s <- %s" % (model, res)
            api_pool.add_api_from_string(model, res)

        # Guarded like every other pool access in this class, so a pool
        # error is reported as (0, message) instead of escaping.
        try:
            self.ps_pool.new(ps_id, {"model": model, "model_id": model_id})
        except Exception as e:
            return 0, str(e)
        return 1, "ok"

    def deinit(self, ps_id):
        """Deinitialize the model module of a PS and drop it from the pool.

        Returns the model's ``(retcode, res)`` on success or
        ``(0, message)`` on failure.
        """
        ps, err = self._lookup(ps_id)
        if err is not None:
            return 0, err

        # Call the deinitializer function for the model
        retcode, res = submod_execcmd("deinit@" + ps["model"], ps["model_id"])
        if retcode == 0:
            return 0, "Error deinitializing %s power supply <- %s" % (ps["model"], res)

        # Remove ps from the pool
        try:
            self.ps_pool.remove(ps_id)
        except Exception as e:
            return 0, str(e)
        return retcode, res

    def config(self, ps_id):
        """Run the model's ``config`` command for a PS.

        Returns the model's ``(retcode, res)`` on success or
        ``(0, message)`` on failure.
        """
        ps, err = self._lookup(ps_id)
        if err is not None:
            return 0, err

        # Call the configuration function for the model
        retcode, res = submod_execcmd("config@" + ps["model"], ps["model_id"])
        if retcode == 0:
            return 0, "Error configuring %s power supply <- %s" % (ps["model"], res)
        return retcode, res

    def inval(self, ps_id):
        """Run the model's ``inval`` command for a PS.

        Returns the model's ``(retcode, res)`` on success or
        ``(0, message)`` on failure.
        """
        ps, err = self._lookup(ps_id)
        if err is not None:
            return 0, err

        # Call the invalidation function for the model
        retcode, res = submod_execcmd("inval@" + ps["model"], ps["model_id"])
        if retcode == 0:
            return 0, "Error invalidating %s power supply <- %s" % (ps["model"], res)
        return retcode, res

    def relay(self, ps_id, function, *params):
        """Forward *function* with *params* to the model module of a PS.

        The call is refused when ``api_pool`` has no entry for the function
        on this model.

        Returns ``(1, res)`` on success or ``(0, message)`` on failure.
        """
        ps, err = self._lookup(ps_id)
        if err is not None:
            return 0, err

        # Call the function for the model
        func = "%s@%s" % (function, ps["model"])
        # NOTE(review): the API is looked up here as "function@model",
        # whereas set_voltage looks it up as "function_model". Confirm
        # which key format api_pool.get_api expects.
        api = api_pool.get_api(ps["model"], func)
        if api is None:
            return 0, "The selected PS model does not implement this function"
        retcode, res = submod_execcmd(func, ps["model_id"], *params)
        if retcode == 0:
            return 0, "Error in %s <- %s" % (func, res)
        return 1, res

    def reset(self, ps_id):
        """Relay ``reset`` to the model, with "undef" as its only argument."""
        return self.relay(ps_id, "reset", "undef")

    def set_voltage(self, ps_id, voltage, slew_rate="undef"):
        """Set the output voltage, optionally ramping at *slew_rate*.

        Three cases, chosen from the model's ``set_voltage`` API entry:

        * the entry contains "slew_rate": voltage and slew rate are both
          handed to the model;
        * otherwise, if both *voltage* and *slew_rate* are given (neither
          "undef" nor empty): the ramp is emulated here, one set_voltage
          call per second;
        * otherwise: a plain set_voltage call with *voltage* only.

        Returns the model's ``(retcode, res)`` of the last set_voltage call
        on success or ``(0, message)`` on failure.
        """
        ps, err = self._lookup(ps_id)
        if err is not None:
            return 0, err
        model = ps["model"]
        model_id = ps["model_id"]

        # Check capabilities
        api = api_pool.get_api(model, "set_voltage_" + model)
        if api is None:
            return 0, "The selected PS model does not implement this function"

        # Call the set_voltage function for the model
        if "slew_rate" in api:
            retcode, res = submod_execcmd("set_voltage@" + model, model_id, voltage, slew_rate)
        elif slew_rate not in ("undef", "") and voltage not in ("undef", ""):
            return self._ramp_voltage(ps, voltage, slew_rate)
        else:
            retcode, res = submod_execcmd("set_voltage@" + model, model_id, voltage)
        if retcode == 0:
            return 0, "Error setting voltage <- %s" % (res)
        return retcode, res

    def _ramp_voltage(self, ps, voltage, slew_rate):
        """Emulate a voltage ramp for a model without slew-rate support.

        Reads the present voltage from the model, then walks towards
        *voltage* in increments of *slew_rate*, sleeping one second before
        each set_voltage call. The call therefore blocks for about one
        second per step.

        Returns the ``(retcode, res)`` of the last set_voltage call, or
        ``(0, message)`` on failure.
        """
        model = ps["model"]
        model_id = ps["model_id"]

        api = api_pool.get_api(model, "get_voltage_" + model)
        if api is None:
            return 0, "The selected PS model does not implement the required get_voltage function for ramps."
        retcode, res = submod_execcmd("get_voltage@" + model, model_id)
        if retcode == 0:
            return 0, "Error getting voltage <- %s" % (res)

        # Bad numeric input is reported through the usual (0, message)
        # channel instead of raising out of the command.
        try:
            v_i = float(res)
            voltage = float(voltage)
            slew_rate = float(slew_rate)
            if slew_rate == 0:
                return 0, "Error setting voltage <- slew_rate must be non-zero"
            steps = int(round((v_i - voltage) / slew_rate))
        except (TypeError, ValueError, OverflowError) as e:
            return 0, "Error setting voltage <- invalid value for voltage ramp: %s" % (e)

        # Always issue at least one set_voltage, landing on the target.
        if steps == 0:
            steps = 1
        # Integer step with the sign of `steps`; a float step would be
        # rejected by range() under true division.
        step = 1 if steps > 0 else -1
        # Walk t from the farthest offset down to 0, so the last value sent
        # is exactly the target voltage.
        for t in reversed(range(0, steps, step)):
            time.sleep(1)
            v = str(voltage + t * slew_rate)
            retcode, res = submod_execcmd("set_voltage@" + model, model_id, v)
            if retcode == 0:
                return 0, "Error setting voltage <- %s" % (res)
        return retcode, res

    def set_current(self, ps_id, current):
        """Relay ``set_current`` with *current*."""
        return self.relay(ps_id, "set_current", current)

    def set_voltage_limit(self, ps_id, voltage_limit):
        """Relay ``set_voltage_limit`` with *voltage_limit*."""
        return self.relay(ps_id, "set_voltage_limit", voltage_limit)

    def set_current_limit(self, ps_id, current_limit):
        """Relay ``set_current_limit`` with *current_limit*."""
        return self.relay(ps_id, "set_current_limit", current_limit)

    def get_voltage(self, ps_id):
        """Relay ``get_voltage``."""
        return self.relay(ps_id, "get_voltage")

    def get_current(self, ps_id):
        """Relay ``get_current``."""
        return self.relay(ps_id, "get_current")

    def power_on(self, ps_id):
        """Relay ``power_on``."""
        return self.relay(ps_id, "power_on")

    def power_off(self, ps_id):
        """Relay ``power_off``."""
        return self.relay(ps_id, "power_off")

    def set_digital_on(self, ps_id, active_level="undef", channel="undef"):
        """Relay ``set_digital_on``, sending *channel* then *active_level*.

        NOTE(review): the parameters are declared as (active_level, channel)
        but set_digital_on_ps passes (channel, active_level) positionally.
        Confirm which order the model modules expect.
        """
        return self.relay(ps_id, "set_digital_on", channel, active_level)

    def set_digital_off(self, ps_id, active_level="undef", channel="undef"):
        """Relay ``set_digital_off``, sending *channel* then *active_level*.

        NOTE(review): the parameters are declared as (active_level, channel)
        but set_digital_off_ps passes (channel, active_level) positionally.
        Confirm which order the model modules expect.
        """
        return self.relay(ps_id, "set_digital_off", channel, active_level)

    def free_command(self, ps_id, command):
        """Relay ``free_command`` with *command*."""
        return self.relay(ps_id, "free_command", command)

    def free_query(self, ps_id, query):
        """Relay ``free_query``, sending "undef" followed by *query*."""
        return self.relay(ps_id, "free_query", "undef", query)

    def get_error_queue(self, ps_id):
        """Relay ``get_error_queue``."""
        return self.relay(ps_id, "get_error_queue")

    def get_power_status(self, ps_id):
        """Relay ``get_power_status``."""
        return self.relay(ps_id, "get_power_status")
# CREATE POOL ####################################################
# Cache of model APIs; ps_class methods look this name up at call time.
api_pool = apipools.api_pool()
# Single ps_class instance that every *_ps command below delegates to.
me = ps_class()
# COMMANDS #######################################################
def init_ps(ps_id, conf_string):
    """Register a new PS in the pool and initialize it.

    *conf_string* is the configuration string for the module to be
    initialized. Returns the (retcode, res) pair from ps_class.init;
    a retcode of 0 means failure and res is then the error message.
    """
    outcome = me.init(ps_id, conf_string)
    return outcome
def deinit_ps(ps_id):
    """Deregister a PS from the pool.

    Returns the (retcode, res) pair from ps_class.deinit.
    """
    outcome = me.deinit(ps_id)
    return outcome
def config_ps(ps_id):
    """Configure the PS.

    Returns the (retcode, res) pair from ps_class.config.
    """
    outcome = me.config(ps_id)
    return outcome
def inval_ps(ps_id):
    """Invalidate the configuration of the PS.

    Returns the (retcode, res) pair from ps_class.inval.
    """
    outcome = me.inval(ps_id)
    return outcome
def reset_ps(ps_id):
    """Reset the PS.

    Returns the (retcode, res) pair from ps_class.reset.
    """
    outcome = me.reset(ps_id)
    return outcome
def set_voltage_ps(ps_id, voltage, slew_rate="undef"):
    """Set the voltage in volts, optionally with a slew rate in V/s.

    *slew_rate* is handed to PS modules whose set_voltage supports it.
    For other modules, ps_class.set_voltage emulates the ramp with one
    set_voltage call per second.

    Returns the (retcode, res) pair from ps_class.set_voltage.
    """
    outcome = me.set_voltage(ps_id, voltage, slew_rate)
    return outcome
def set_current_ps(ps_id, current):
    """Set the current in amperes.

    Returns the (retcode, res) pair from ps_class.set_current.
    """
    outcome = me.set_current(ps_id, current)
    return outcome
def set_voltage_limit_ps(ps_id, voltage_limit):
    """Set the voltage limit in volts.

    Returns the (retcode, res) pair from ps_class.set_voltage_limit.
    """
    outcome = me.set_voltage_limit(ps_id, voltage_limit)
    return outcome
def set_current_limit_ps(ps_id, current_limit):
    """Set the current limit in amperes.

    Returns the (retcode, res) pair from ps_class.set_current_limit.
    """
    outcome = me.set_current_limit(ps_id, current_limit)
    return outcome
def get_voltage_ps(ps_id):
    """Get the voltage in volts.

    Returns the (retcode, res) pair from ps_class.get_voltage.
    """
    outcome = me.get_voltage(ps_id)
    return outcome
def get_current_ps(ps_id):
    """Get the current in amperes.

    Returns the (retcode, res) pair from ps_class.get_current.
    """
    outcome = me.get_current(ps_id)
    return outcome
def set_rise_delay_ps(ps_id, rise_delay):
    """Set the power-on (rise) delay in seconds.

    Returns the (retcode, res) pair from ps_class.relay: (1, res) on
    success, or (0, message) on failure, including when the PS model does
    not implement set_rise_delay.
    """
    # ps_class defines no set_rise_delay method, so the previous call
    # me.set_rise_delay(ps_id) raised AttributeError and also dropped
    # rise_delay. Go through the generic relay, which forwards the value
    # to the model module.
    return me.relay(ps_id, "set_rise_delay", rise_delay)
def power_on_ps(ps_id):
    """Turn on the PS.

    Returns the (retcode, res) pair from ps_class.power_on.
    """
    outcome = me.power_on(ps_id)
    return outcome
def power_off_ps(ps_id):
    """Turn off the PS.

    Returns the (retcode, res) pair from ps_class.power_off.
    """
    outcome = me.power_off(ps_id)
    return outcome
def set_digital_on_ps(ps_id, channel="undef", active_level="undef"):
    """Turn on a digital output of the PS.

    Returns the (retcode, res) pair from ps_class.set_digital_on.
    """
    # NOTE(review): the values are forwarded positionally, but
    # ps_class.set_digital_on declares its parameters as
    # (active_level, channel) and relays (channel, active_level). The model
    # module therefore receives (active_level, channel) in terms of this
    # function's parameter names. Confirm the order the model expects.
    outcome = me.set_digital_on(ps_id, channel, active_level)
    return outcome
def set_digital_off_ps(ps_id, channel="undef", active_level="undef"):
    """Turn off a digital output of the PS.

    Returns the (retcode, res) pair from ps_class.set_digital_off.
    """
    # NOTE(review): the values are forwarded positionally, but
    # ps_class.set_digital_off declares its parameters as
    # (active_level, channel) and relays (channel, active_level). The model
    # module therefore receives (active_level, channel) in terms of this
    # function's parameter names. Confirm the order the model expects.
    outcome = me.set_digital_off(ps_id, channel, active_level)
    return outcome
def free_command_ps(ps_id, command):
    """Send a free command to the PS.

    Returns the (retcode, res) pair from ps_class.free_command.
    """
    outcome = me.free_command(ps_id, command)
    return outcome
def free_query_ps(ps_id, query):
    """Send a free query to the PS.

    Returns the (retcode, res) pair from ps_class.free_query.
    """
    outcome = me.free_query(ps_id, query)
    return outcome
def get_error_queue_ps(ps_id):
    """Read the error queue of the PS.

    Returns the (retcode, res) pair from ps_class.get_error_queue.
    """
    outcome = me.get_error_queue(ps_id)
    return outcome
def get_power_status_ps(ps_id):
    """Check the power state of the PS.

    Returns the (retcode, res) pair from ps_class.get_power_status.
    """
    outcome = me.get_power_status(ps_id)
    return outcome