Source code for cmd_ps

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# 
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
# 
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
# 
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame.  If not, see <http://www.gnu.org/licenses/>

import time
import conf_strings,pools,apipools

# CLASS ##########################################################

class ps_class():
    """Generic front end dispatching power-supply (PS) commands to model submodules.

    Each registered PS is stored in ``ps_pool`` as a dict holding its
    ``model`` (the ``name`` field of the parsed configuration string) and the
    ``model_id`` under which it was initialized in the model submodule.

    Every public method returns a ``(retcode, result)`` pair. A retcode of 0
    means failure, in which case ``result`` is an error message.
    """

    # Pool of registered power supplies. This is a class attribute, so it is
    # shared by every instance of ps_class.
    ps_pool=pools.pool()

    def init(self,ps_id,conf_string):
        """Initialize the model submodule for a new PS and register it as *ps_id*.

        *conf_string* is parsed to find the model name, then handed unchanged
        to the model's ``init`` command. The model API is fetched and cached
        in ``api_pool`` the first time a model is seen.
        """
        try:
            conf=conf_strings.parse(conf_string)
        except Exception as e:
            return 0,str(e)
        # Initialize PS
        model=conf.name
        model_id="psmodel_%s"%(ps_id)
        retcode,res=submod_execcmd("init@"+model,model_id,conf_string)
        if retcode==0:
            return 0,"Error initializing %s power supply <- %s"%(model,res)
        # Get API of the module and add it to the api_pool, if it's not already there
        if not api_pool.is_present(model):
            retcode,res=submod_execcmd("getapi@"+model)
            if retcode==0:
                return 0,"Can't get API for %s <- %s"%(model,res)
            api_pool.add_api_from_string(model,res)
        self.ps_pool.new(ps_id,{"model":model,"model_id":model_id})
        return 1,"ok"

    def deinit(self,ps_id):
        """Deinitialize the PS in its model submodule, then remove it from the pool.

        The PS stays in the pool if the model's ``deinit`` command fails.
        """
        try:
            ps=self.ps_pool.get(ps_id)
        except Exception as e:
            return 0,str(e)
        # Call the deinitializer function for the model
        retcode,res=submod_execcmd("deinit@"+ps["model"],ps["model_id"])
        if retcode==0:
            return 0,"Error deinitializing %s power supply <- %s"%(ps["model"],res)
        # Remove ps from the pool
        try:
            self.ps_pool.remove(ps_id)
        except Exception as e:
            return 0,str(e)
        return retcode,res

    def config(self,ps_id):
        """Run the model's ``config`` command for the PS and return its result."""
        try:
            ps=self.ps_pool.get(ps_id)
        except Exception as e:
            return 0,str(e)
        # Call the configuration function for the model
        retcode,res=submod_execcmd("config@"+ps["model"],ps["model_id"])
        if retcode==0:
            return 0,"Error configuring %s power supply <- %s"%(ps["model"],res)
        return retcode,res

    def inval(self,ps_id):
        """Run the model's ``inval`` command for the PS and return its result."""
        try:
            ps=self.ps_pool.get(ps_id)
        except Exception as e:
            return 0,str(e)
        # Call the invalidation function for the model
        retcode,res=submod_execcmd("inval@"+ps["model"],ps["model_id"])
        if retcode==0:
            return 0,"Error invalidating %s power supply <- %s"%(ps["model"],res)
        return retcode,res

    def relay(self,ps_id,function,*params):
        """Forward *function* with *params* to the model submodule of the PS.

        Fails without calling the submodule when ``api_pool`` has no entry
        for the function. On success returns ``(1, result)``.
        """
        try:
            ps=self.ps_pool.get(ps_id)
        except Exception as e:
            return 0,str(e)
        # Call the function for the model
        # NOTE(review): the API is looked up here as "<function>@<model>",
        # whereas set_voltage looks it up as "<function>_<model>". Confirm
        # which key format api_pool.get_api expects.
        func="%s@%s"%(function,ps["model"])
        api=api_pool.get_api(ps["model"],func)
        if api is None:
            return 0,"The selected PS model does not implement this function"
        retcode,res=submod_execcmd(func,ps["model_id"],*params)
        if retcode==0:
            return 0,"Error in %s <- %s"%(func,res)
        return 1,res

    def reset(self,ps_id):
        """Relay ``reset`` to the model, passing "undef" as its only argument."""
        return self.relay(ps_id,"reset","undef")

    def set_voltage(self,ps_id,voltage,slew_rate="undef"):
        """Set the output voltage, optionally ramping at *slew_rate*.

        Three cases, in order:

        * the model's ``set_voltage`` API mentions ``slew_rate``: voltage and
          slew rate are both handed to the model;
        * otherwise, if both *voltage* and *slew_rate* are given (neither
          "undef" nor empty): the ramp is done here in one-second steps;
        * otherwise: the voltage is set in a single call.
        """
        try:
            ps=self.ps_pool.get(ps_id)
        except Exception as e:
            return 0,str(e)
        model=ps["model"]
        set_cmd="set_voltage@"+model
        # Check capabilities
        api=api_pool.get_api(model,"set_voltage_"+model)
        if api is None:
            return 0,"The selected PS model does not implement this function"
        if "slew_rate" in api:
            # The model handles the slew rate itself
            retcode,res=submod_execcmd(set_cmd,ps["model_id"],voltage,slew_rate)
            if retcode==0:
                return 0,"Error setting voltage <- %s"%(res)
            return retcode,res
        if slew_rate in ("undef","") or voltage in ("undef",""):
            # Nothing to ramp: plain single-step set
            retcode,res=submod_execcmd(set_cmd,ps["model_id"],voltage)
            if retcode==0:
                return 0,"Error setting voltage <- %s"%(res)
            return retcode,res
        return self._ramp_voltage(ps,voltage,slew_rate)

    def _ramp_voltage(self,ps,voltage,slew_rate):
        """Ramp the PS to *voltage* in steps of *slew_rate* volts, one step per second.

        The starting point is read back through the model's ``get_voltage``
        command. Invalid numeric input (including a zero slew rate) is
        reported as a ``(0, message)`` error instead of raising.
        """
        model=ps["model"]
        api=api_pool.get_api(model,"get_voltage_"+model)
        if api is None:
            return 0,"The selected PS model does not implement the required get_voltage function for ramps."
        retcode,res=submod_execcmd("get_voltage@"+model,ps["model_id"])
        if retcode==0:
            return 0,"Error getting voltage <- %s"%(res)
        try:
            v_i=float(res)
            target=float(voltage)
            rate=float(slew_rate)
            # Signed number of one-second steps between start and target
            steps=int(round((v_i-target)/rate))
        except ZeroDivisionError:
            return 0,"Error setting voltage <- slew_rate must be non-zero"
        except (TypeError,ValueError,OverflowError) as e:
            return 0,"Error setting voltage <- invalid numeric value for ramp: %s"%(e)
        # Always perform at least the final step onto the target
        if steps==0:
            steps=1
        # Explicit +/-1 step: steps/abs(steps) is only an int under Python 2 division
        direction=1 if steps>0 else -1
        # t counts down to 0, so the last value written is exactly the target
        for t in reversed(range(0,steps,direction)):
            time.sleep(1)
            v=str(target+t*rate)
            retcode,res=submod_execcmd("set_voltage@"+model,ps["model_id"],v)
            if retcode==0:
                return 0,"Error setting voltage <- %s"%(res)
        return retcode,res

    def set_current(self,ps_id,current):
        """Relay ``set_current`` with *current* to the model."""
        return self.relay(ps_id,"set_current",current)

    def set_voltage_limit(self,ps_id,voltage_limit):
        """Relay ``set_voltage_limit`` with *voltage_limit* to the model."""
        return self.relay(ps_id,"set_voltage_limit",voltage_limit)

    def set_current_limit(self,ps_id,current_limit):
        """Relay ``set_current_limit`` with *current_limit* to the model."""
        return self.relay(ps_id,"set_current_limit",current_limit)

    def get_voltage(self,ps_id):
        """Relay ``get_voltage`` to the model."""
        return self.relay(ps_id,"get_voltage")

    def get_current(self,ps_id):
        """Relay ``get_current`` to the model."""
        return self.relay(ps_id,"get_current")

    def power_on(self,ps_id):
        """Relay ``power_on`` to the model."""
        return self.relay(ps_id,"power_on")

    def power_off(self,ps_id):
        """Relay ``power_off`` to the model."""
        return self.relay(ps_id,"power_off")

    def set_digital_on(self,ps_id,active_level="undef",channel="undef"):
        """Relay ``set_digital_on`` to the model.

        The model receives *channel* first and *active_level* second, which
        is the reverse of this method's own parameter order.
        """
        return self.relay(ps_id,"set_digital_on",channel,active_level)

    def set_digital_off(self,ps_id,active_level="undef",channel="undef"):
        """Relay ``set_digital_off`` to the model.

        The model receives *channel* first and *active_level* second, which
        is the reverse of this method's own parameter order.
        """
        return self.relay(ps_id,"set_digital_off",channel,active_level)

    def free_command(self,ps_id,command):
        """Relay ``free_command`` with *command* to the model."""
        return self.relay(ps_id,"free_command",command)

    def free_query(self,ps_id,query):
        """Relay ``free_query`` to the model, passing "undef" followed by *query*."""
        return self.relay(ps_id,"free_query","undef",query)

    def get_error_queue(self,ps_id):
        """Relay ``get_error_queue`` to the model."""
        return self.relay(ps_id,"get_error_queue")

    def get_power_status(self,ps_id):
        """Relay ``get_power_status`` to the model."""
        return self.relay(ps_id,"get_power_status")

# CREATE POOL ####################################################

# Single module-level ps_class instance used by the *_ps command functions
me=ps_class()
# Cache of model submodule APIs, keyed by model name; filled by ps_class.init
# and queried by ps_class.relay and ps_class.set_voltage
api_pool=apipools.api_pool()

# COMMANDS #######################################################

def init_ps(ps_id,conf_string):
    """Initialize a new PS and register it in the pool.

    *conf_string* is the configuration string of the module to be
    initialized.
    """
    return me.init(ps_id,conf_string)
def deinit_ps(ps_id):
    """Deinitialize a PS and deregister it from the pool."""
    return me.deinit(ps_id)
def config_ps(ps_id):
    """Configure the PS."""
    return me.config(ps_id)
def inval_ps(ps_id):
    """Invalidate the configuration of the PS."""
    return me.inval(ps_id)
def reset_ps(ps_id):
    """Reset the PS."""
    return me.reset(ps_id)
def set_voltage_ps(ps_id,voltage,slew_rate="undef"):
    """Set the voltage, in volts.

    The optional *slew_rate* is expressed in V/s.
    """
    return me.set_voltage(ps_id,voltage,slew_rate)
def set_current_ps(ps_id,current):
    """Set the current, in amperes."""
    return me.set_current(ps_id,current)
def set_voltage_limit_ps(ps_id,voltage_limit):
    """Set the voltage limit, in volts."""
    return me.set_voltage_limit(ps_id,voltage_limit)
def set_current_limit_ps(ps_id,current_limit):
    """Set the current limit, in amperes."""
    return me.set_current_limit(ps_id,current_limit)
def get_voltage_ps(ps_id):
    """Get the voltage, in volts."""
    return me.get_voltage(ps_id)
def get_current_ps(ps_id):
    """Get the current, in amperes."""
    return me.get_current(ps_id)
def set_rise_delay_ps(ps_id,rise_delay):
    """Set the power-on (rise) delay, in seconds.

    Returns a ``(retcode, result)`` pair; the retcode is 0 when the PS is
    unknown, when its model does not implement ``set_rise_delay``, or when
    the model reports an error.
    """
    # ps_class defines no set_rise_delay method, so the previous call
    # me.set_rise_delay(ps_id) raised AttributeError and also dropped
    # rise_delay. Go through the generic relay and forward the value.
    return me.relay(ps_id,"set_rise_delay",rise_delay)
def power_on_ps(ps_id):
    """Turn the PS on."""
    return me.power_on(ps_id)
def power_off_ps(ps_id):
    """Turn the PS off."""
    return me.power_off(ps_id)
def set_digital_on_ps(ps_id,channel="undef",active_level="undef"):
    """Turn on a digital output of the PS.

    *channel* and *active_level* are optional and default to "undef".
    """
    # ps_class.set_digital_on declares its parameters as
    # (active_level, channel), the reverse of this function. Passing them
    # positionally bound channel to active_level and vice versa, so bind by
    # keyword instead.
    # NOTE(review): confirm against a model submodule that it expects
    # channel before active_level, as the relay call in ps_class sends them.
    return me.set_digital_on(ps_id,active_level=active_level,channel=channel)
def set_digital_off_ps(ps_id,channel="undef",active_level="undef"):
    """Turn off a digital output of the PS.

    *channel* and *active_level* are optional and default to "undef".
    """
    # ps_class.set_digital_off declares its parameters as
    # (active_level, channel), the reverse of this function. Passing them
    # positionally bound channel to active_level and vice versa, so bind by
    # keyword instead.
    # NOTE(review): confirm against a model submodule that it expects
    # channel before active_level, as the relay call in ps_class sends them.
    return me.set_digital_off(ps_id,active_level=active_level,channel=channel)
def free_command_ps(ps_id,command):
    """Send a free-form command to the PS."""
    return me.free_command(ps_id,command)
def free_query_ps(ps_id,query):
    """Send a free-form query to the PS."""
    return me.free_query(ps_id,query)
def get_error_queue_ps(ps_id):
    """Read the error queue of the PS."""
    return me.get_error_queue(ps_id)
def get_power_status_ps(ps_id):
    """Check the power state of the PS."""
    return me.get_power_status(ps_id)