Source code for cmd_isegshq

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# 
# Copyright 2012-2015 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
# 
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
# 
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame.  If not, see <http://www.gnu.org/licenses/>

import re,time
import conf_strings,pools

# CLASS ##########################################################
#  ISEG SHQx2x power supplies.
#  Harpo module options: IWP (10 turn I), 0n1 (100pA on uA range)
##################################################################

class isegshq(object):
    """Driver for iSEG SHQ x2x high-voltage power supplies.

    Every registered supply is a dict kept in ``self.isegshq_pool``.  init
    stores the keys "bus", "bus_id" and "channel"; config adds "configured",
    "error_check", "polarity", "current_range", "current_res", "Vmax" and
    "Imax".

    Methods taking *isegshq* expect that dict, methods taking *isegshq_id*
    look it up in the pool.  All of them return a ``(retcode,res)`` pair in
    which retcode 0 signals a failure described by res.

    ``submod_execcmd`` is not defined in this file; it is expected to be
    supplied by the hosting framework.
    """

    def __init__(self):
        # Registry of the supplies handled by this instance.
        self.isegshq_pool=pools.pool()

    # private helpers ######################################################

    def _lookup(self,isegshq_id):
        """Return (entry,None) for a pooled id, or (None,message) if the lookup raises."""
        try:
            return self.isegshq_pool.get(isegshq_id),None
        except Exception as e:
            return None,str(e)

    def _check_status_on(self,isegshq):
        """Read the status word and fail unless the text after "=" is "ON"."""
        retcode,res=self.get_status(isegshq)
        if retcode==0:
            return 0,res
        # partition never raises, even when the reply carries no "="
        st=res.partition("=")[2]
        if "ON"!=st:
            return 0,"isegshq statys %s not ON"%(res)
        return retcode,res

    # value conversion #####################################################

    def bitfiled(self,val):
        """Return the six low-order bits of *val* as a list of 0/1, least significant bit first."""
        byte=int(val)
        return [(byte>>shift)&1 for shift in range(6)]
    #    return [byte >> i & 1 for i in range(7,-1,-1)]

    def shqunpack(self,res):
        """Split an SHQ number such as "01234-01" or "-01234-01" into (mantissa,exponent).

        The exponent is returned as the positive integer written after the
        last "-".  A leading "-" makes the mantissa negative.  Any reply that
        does not split into exactly two fields yields (0,0).  Non-numeric
        fields make int() raise ValueError.
        """
        fields=res.split("-")
        sign=1
        # "-mmmmm-ee" splits into ["","mmmmm","ee"]: drop the first field and
        # treat an empty one as the sign.
        if len(fields)==3 and fields.pop(0)=="":
            sign=-1
        if len(fields)!=2:
            return 0,0
        return sign*int(fields[0]),int(fields[1])

    def shq2float(self,res):
        """Convert an SHQ number to a float: mantissa * 10**(-exponent)."""
        man,power=self.shqunpack(res)
        return float(man*pow(10,-1*power))

    # def ignore_crlf(self,string):
    #     # special chars removed in cmd serial ????
    #     return string.replace("\0","@").replace("\r","*").replace("\n","$")
    #     return string

    # link access ##########################################################

    def command(self,isegshq,cmd,use_chan,val="undef"):
        """Send one command to the supply and return (1,reply) or (0,error).

        The command is *cmd*, followed by the channel when *use_chan* is
        true and by "=<val>" when *val* is given, terminated by <cr><lf>.
        The supply answers with two lines: an echo of the command and the
        command parameter (which may be empty).  The call fails when the
        entry is not configured, when a bus operation fails, when the echo
        differs from the command or when the reply starts with "?".
        """
        if "configured" not in isegshq:
            return 0,"not configured"
        if use_chan:
            cmd+=isegshq["channel"]
        if val!="undef":
            cmd+="=%s"%(val)
        bus=isegshq["bus"]
        bus_id=isegshq["bus_id"]
        retcode,res=submod_execcmd("write@"+bus,bus_id,cmd+r"\r\n")
        if retcode==0:
            return 0,"Error writing to link <- %s"%(res)
        #read delay
        time.sleep(0.2)
        # first line: echo of the command
        retcode,res=submod_execcmd("read_until@"+bus,bus_id,r"\n")
        if retcode==0:
            return 0,"Error reading from link <- %s"%(res)
        echo=res
        # second line: the actual answer
        retcode,res=submod_execcmd("read_until@"+bus,bus_id,r"\n")
        if retcode==0:
            return 0,"Error reading from link <- %s"%(res)
        hvreply=res
        if echo!=cmd or hvreply.startswith("?"):
            return 0,"SHQ Command error %s"%(echo+r"*"+hvreply)
        return 1,hvreply

    # get commands

    def get_mod_id(self,isegshq):
        """ Read module identifier """
        return self.command(isegshq,r"#",False)

    def get_break_time(self,isegshq):
        """ Read break time (delay between characters) ms """
        return self.command(isegshq,r"W",False)

    def get_actual_voltage(self,isegshq):
        """ Read actual voltage (in V)"""
        return self.command(isegshq,r"U",True)

    def get_actual_current(self,isegshq):
        """ Read actual current (in A)"""
        return self.command(isegshq,r"I",True)

    def get_voltage_limit(self,isegshq):
        """ Read voltage limit (in % Vnom)"""
        return self.command(isegshq,r"M",True)

    def get_current_limit(self,isegshq):
        """ Read current limit (in % of Inom)"""
        return self.command(isegshq,r"N",True)

    def get_voltage_set(self,isegshq):
        """Read set voltage (in V)"""
        return self.command(isegshq,r"D",True)

    def get_ramp_speed(self,isegshq):
        """Read ramp speed (in V/s)"""
        return self.command(isegshq,r"V",True)

    def start_voltage_change(self,isegshq):
        """Start voltage change"""
        return self.command(isegshq,"G",True)

    def get_current_trip(self,isegshq):
        """Read current trip
        L1=nnnnn resolution range mA > 0
        L1 * { mantisse / exp. with sign } * (s.a., current trip in A)
        """
        return self.command(isegshq,r"L",True)

    def get_current_trip_b(self,isegshq):
        """Read current trip
        LB1=nnnnn resolution range mA > 0
        """
        return self.command(isegshq,r"LB",True)

    def get_current_trip_s(self,isegshq):
        """Read current trip
        LS1=nnnnn resolution range uA > 0
        """
        return self.command(isegshq,r"LS",True)

    def get_status(self,isegshq):
        """Read status word"""
        return self.command(isegshq,r"S",True)

    def get_mod_status(self,isegshq):
        """Read module status"""
    #                    1     2     3      4     5     6     7
    #shq_st_bits=["","MAN","POS","OFF","KILL","INH","ERR","QUA"]
    #                  "REM","NEG","ON"
        return self.command(isegshq,r"T",True)

    def get_auto_start(self,isegshq):
        """Read auto start"""
        return self.command(isegshq,r"A",True)

    # set commands

    def set_break_time(self,isegshq,val):
        """ Write break time (delay between characters) ms """
        return self.command(isegshq,r"W",False,val)

    # def set_voltage_limit(self,isegshq,val):
    #     """ Write voltage limit (in % Vnom)"""
    #     command=r"M"
    #     retcode,res=self.command(isegshq,command,True,val)
    #     return retcode,res

    # def set_current_limit(self,isegshq,val):
    #     """ Write current limit (in % if Inom)"""
    #     command=r"N"
    #     retcode,res=self.command(isegshq,command,True,val)
    #     return retcode,res

    def set_voltage_set(self,isegshq,val):
        """Write set voltage (in V)"""
        return self.command(isegshq,r"D",True,val)

    def set_ramp_speed(self,isegshq,val):
        """Write ramp speed (in V/s)"""
        return self.command(isegshq,r"V",True,val)

    def set_current_trip(self,isegshq,val):
        """Write current trip
        L1=nnnnn resolution range mA > 0
        L1 * { mantisse / exp. with sign } * (s.a., current trip in A)
        """
        return self.command(isegshq,r"L",True,val)

    def set_current_trip_b(self,isegshq,val):
        """Write current trip
        LB1=nnnnn resolution range mA > 0
        """
        return self.command(isegshq,r"LB",True,val)

    def set_current_trip_s(self,isegshq,val):
        """Write current trip
        LS1=nnnnn resolution range uA > 0
        """
        return self.command(isegshq,r"LS",True,val)

    def set_auto_start(self,isegshq,val):
        """Write auto start"""
        return self.command(isegshq,r"A",True,val)

    def notimpl(self,isegshq_id,function,*params):
        """Report (with retcode 1) that *function* is not implemented by this module."""
        return 1,"The iSEG SHQ PS module does not implement the function %s"%(function)

    # end local ############################################################

    def init(self,isegshq_id,conf_string):
        """Register a supply in the pool and initialize its serial bus.

        *conf_string* must be an "isegshq" conf_string with the parameters
        "bus" (a conf_string naming the "serial" module) and "chan".  The
        serial settings are forced to 9600 baud, 8 data bits, no parity,
        1 stop bit and a timeout of 5.
        """
        try:
            conf=conf_strings.parse(conf_string)
        except Exception as e:
            return 0,str(e)
        if conf.name!="isegshq":
            return 0,"Invalid module name %s in conf_string instead of isegshq"%(conf.name)
        if not conf.has("bus"):
            return 0,"missing bus parameter in conf_string"
        if not conf.has("chan"):
            return 0,"missing chan parameter in conf_string"
        try:
            conf_bus=conf_strings.parse(conf.params["bus"])
        except Exception as e:
            return 0,str(e)
        if conf_bus.name!="serial":
            return 0,"Error: The bus must be serial"
        conf_bus.params["baudrate"]="9600"
        conf_bus.params["bytesize"]="8"
        conf_bus.params["parity"]="N"
        conf_bus.params["stopbits"]="1"
        conf_bus.params["timeout"]="5"
        # init bus
        bus_id="bus_%s"%(isegshq_id)
        retcode,res=submod_execcmd("init@"+conf_bus.name,bus_id,conf_strings.unparse(conf_bus))
        if retcode==0:
            return 0,"Error initializing link <- %s"%(res)
        self.isegshq_pool.new(isegshq_id,{
            "bus":conf_bus.name,
            "bus_id":bus_id,
            "channel":conf.params["chan"]})
        return 1,"ok"

    def deinit(self,isegshq_id):
        """Deinitialize the bus of a supply and remove the supply from the pool."""
        isegshq,err=self._lookup(isegshq_id)
        if err is not None:
            return 0,err
        # Call the deinitializer function for the model
        retcode,res=submod_execcmd("deinit@"+isegshq["bus"],isegshq["bus_id"])
        if retcode==0:
            return 0,"Error deinitializing link <- %s"%(res)
        # Remove isegshq from the pool
        try:
            self.isegshq_pool.remove(isegshq_id)
        except Exception as e:
            return 0,str(e)
        return 1,"ok"

    def config(self,isegshq_id,error_check="fast"):
        """Configure the bus and auto-detect the characteristics of the supply.

        Reads the module id (to pick up Vmax and Imax), the module status (to
        get the polarity) and the uA-range current trip (to detect the 0n1
        option).  *error_check* is stored lower-cased; "careful" enables
        read-back verification in the setters and anything other than "fast"
        enables the status check.

        If any of the detection queries fails the configuration is
        invalidated again, so a later call starts from scratch instead of
        reporting "already configured" with default values.
        """
        isegshq,err=self._lookup(isegshq_id)
        if err is not None:
            return 0,err
        if "configured" in isegshq:
            return 1,"already configured"
        # Configure link
        retcode,res=submod_execcmd("config@"+isegshq["bus"],isegshq["bus_id"])
        if retcode==0:
            return 0,"Error configuring link <- %s"%(res)
        #
        # \todo reset tty ( cleanup in/out queues),
        #

        # Defaults, refined below from what the supply reports
        isegshq["error_check"]=error_check.lower()
        isegshq["configured"]=1
        isegshq["polarity"]=0.0
        isegshq["current_range"]=10e-6 # switch from mA to uA range
        isegshq["current_res"]=1e-10  # uA range resolution
        isegshq["Vmax"]=6000.0
        isegshq["Imax"]=1e-3

        #
        # SHQ autoconfiguration, Vmax, Imax, current limits
        #
        retcode,res=self.get_mod_id(isegshq)
        if retcode==0:
            self.inval(isegshq_id)
            return 0,"Read mod id error invalidating link <- %s"%(res)
        ids=res.split(";")
        if len(ids)!=4:
            self.inval(isegshq_id)
            return 0,"mod id format error invalidating link"
        volt=re.match(r"(\d+)V",str(ids[2]))
        if volt is not None:
            v=float(volt.group(1))
            if v!=isegshq["Vmax"]:
                print("SHQ Vmax  %s"%(v))
                isegshq["Vmax"]=v
        # NOTE(review): only a value expressed in uA is recognised here; any
        # other unit keeps the default Imax - confirm against the module id format.
        curr=re.match(r"(\d+)uA",str(ids[3]))
        if curr is not None:
            i=float(curr.group(1)) * 1e-6
            if i!=isegshq["Imax"]:
                print("SHQ imax  %s"%(i))
                isegshq["Imax"]=i

        retcode,res=self.get_mod_status(isegshq)
        if retcode==0:
            # do not leave a half-detected supply marked as configured
            self.inval(isegshq_id)
            if res.endswith("?WCN"):
                return 0,"SHQ channel invalid"
            return 0,res
        st_bits=self.bitfiled(res)
        # status bit 2 set: positive polarity, cleared: negative
        isegshq["polarity"]=1.0 if st_bits[2]==1 else -1.0
        if res!="000":
            print("Info channel %s  status  %s"%(isegshq["channel"],res))

        retcode,res=self.get_current_trip_s(isegshq)
        if retcode==0:
            self.inval(isegshq_id)
            return 0,res
        man,power=self.shqunpack(res)
        if int(power)==10:
            # defaults above already describe the 0n1 option
            print("isegshq:Info current range (0n1 option)")
        else:
            print("isegshq:Info standard current range")
            isegshq["current_range"]=100e-6
            isegshq["current_res"]=1e-9
        #\todo On SHQ power on iniitial current limits <> 0
        # Do we need reset ??
        return 1,"ok"

    def inval(self,isegshq_id):
        """Invalidate the bus and drop everything config stored in the pool entry."""
        isegshq,err=self._lookup(isegshq_id)
        if err is not None:
            return 0,err
        if "configured" not in isegshq:
            return 1,"not configured"
        # Call the invalidation function for the model
        retcode,res=submod_execcmd("inval@"+isegshq["bus"],isegshq["bus_id"])
        if retcode==0:
            return 0,"Error invalidating link <- %s"%(res)
        for key in ("polarity","current_range","current_res","Vmax","Imax",
                    "error_check","configured"):
            isegshq.pop(key,None)
        return 1,"ok"

    # def reset(self,isegshq_id):
    #     return self.notimpl(isegshq_id,"reset","undef")

    def set_voltage(self,isegshq_id,voltage,slew_rate="undef"):
        """Set the output voltage (V) and start the voltage change.

        *voltage* must carry the sign of the polarity detected by config (or
        be zero).  *slew_rate* (V/s), when given, is truncated to an integer
        and clamped to [2..255] before being written as ramp speed.  With
        error_check "careful" the written values are read back and compared;
        with anything other than "fast" the status word must report ON.
        Both arguments are parsed before anything is sent to the supply.
        """
        isegshq,err=self._lookup(isegshq_id)
        if err is not None:
            return 0,err
        if "configured" not in isegshq:
            return 0,"not configured"
        error_check=isegshq["error_check"]
        careful=error_check=="careful"

        # Validate the inputs before touching the supply
        ramp=None
        if slew_rate!="undef":
            ramp=int(float(slew_rate))
            if ramp<2 or ramp>255:
                print("Warnning rump must be in [2..255] region")
                ramp=min(max(ramp,2),255)
        if float(voltage)==0.0:
            v0=0.0
        else:
            v0=float(voltage) * float(isegshq["polarity"])
        if v0 < 0.0:
            return 0,"SHQ Bad Voltage polarity %s"%(voltage)

        if ramp is not None:
            retcode,res=self.set_ramp_speed(isegshq,str(ramp))
            if retcode==0:
                return 0,res
            if careful:
                retcode,res=self.get_ramp_speed(isegshq)
                if retcode==0:
                    return 0,res
                if ramp!=int(res):
                    return 0,"isegshq ramp speed %s"%(res)

        # to internal
        v1="%.1f"%(v0)
        retcode,res=self.set_voltage_set(isegshq,v1)
        if retcode==0:
            return 0,res
        if careful:
            retcode,res=self.get_voltage_set(isegshq)
            if retcode==0:
                return 0,res
            # Compare numerically: the value was sent with 0.1 V resolution,
            # so anything closer than half a step is the same setting.
            if abs(self.shq2float(res)-float(v1))>=0.05:
                return 0,"voltage set %s"%(res)
        if error_check!="fast":
            retcode,res=self._check_status_on(isegshq)
            if retcode==0:
                return 0,res
            retcode,res=self.get_voltage_set(isegshq)
            if retcode==0:
                return 0,res
        return self.start_voltage_change(isegshq)

    # def set_current(self,isegshq_id,current):
    #     return self.notimpl(isegshq_id,"set_current",current)

    def set_voltage_limit(self,isegshq_id,voltage_limit):
        """Not implemented for this supply."""
        return self.notimpl(isegshq_id,"set_voltage_limit",voltage_limit)

    def set_current_limit(self,isegshq_id,current_limit):
        """Set the current trip (A).

        Both trips (mA and uA range) are first cleared.  A limit of zero, or
        one above Imax, leaves them cleared.  Otherwise the limit is written
        to the mA-range trip (in units of 100 nA) when it is at least
        "current_range", else to the uA-range trip (in units of
        "current_res").  The count is rounded to the nearest unit.
        """
        isegshq,err=self._lookup(isegshq_id)
        if err is not None:
            return 0,err
        if "configured" not in isegshq:
            return 0,"not configured"
        # Parse first so that a malformed value cannot leave the trips cleared
        ilim=float(current_limit)
        careful=isegshq["error_check"]=="careful"
        trips={
            "b":(self.set_current_trip_b,self.get_current_trip_b),
            "s":(self.set_current_trip_s,self.get_current_trip_s)}

        # remove limits
        for name in ("b","s"):
            setter,getter=trips[name]
            retcode,res=setter(isegshq,"0")
            if retcode==0:
                return 0,res
            if careful:
                retcode,res=getter(isegshq)
                if retcode==0:
                    return 0,res
                #from internal
                if self.shq2float(res)!=0.0:
                    return 0,"isegshq current_trip_%s set %s <> 0"%(name,res)

        # set limit for corresponding range
        if ilim==0.0 or ilim > isegshq["Imax"]:
            print("isegshq:remove current limit")
        else:
            if ilim >= isegshq["current_range"]:
                # mA region
                name,step="b",1e-7  # 100 nA
            else:
                # uA region
                name,step="s",isegshq["current_res"]  # 1 nA or 100 pA
            setter,getter=trips[name]
            # round: plain truncation turns e.g. 5e-5/1e-7 into 499
            i0=int(round(ilim / step))
            retcode,res=setter(isegshq,str(i0))
            if retcode==0:
                return 0,res
            if careful:
                retcode,res=getter(isegshq)
                if retcode==0:
                    return 0,res
                #from internal
                i1=self.shq2float(res)
                # the read-back can differ from the request by up to half a step
                if abs(i1-ilim) > 0.5*step*1.001:
                    return 0,"isegshq current_trip_%s set %f <> %f"%(name,i1,ilim)

        if isegshq["error_check"]!="fast":
            retcode,res=self._check_status_on(isegshq)
            if retcode==0:
                return 0,res

        return retcode,res

    def get_voltage(self,isegshq_id):
        """Return the actual voltage (V) as the string form of a float."""
        isegshq,err=self._lookup(isegshq_id)
        if err is not None:
            return 0,err
        retcode,res=self.get_actual_voltage(isegshq)
        if retcode==0:
            return 0,res
        return retcode,str(self.shq2float(res))

    def get_current(self,isegshq_id):
        """Return the actual current (A) as the string form of a float."""
        isegshq,err=self._lookup(isegshq_id)
        if err is not None:
            return 0,err
        retcode,res=self.get_actual_current(isegshq)
        if retcode==0:
            return 0,res
        return retcode,str(self.shq2float(res))

    def power_on(self,isegshq_id):
        """Not implemented for this supply."""
        return self.notimpl(isegshq_id,"power_on")

    def power_off(self,isegshq_id):
        """Not implemented for this supply."""
        return self.notimpl(isegshq_id,"power_off")

    def free_command(self,isegshq_id,command="undef"):
        """Send *command* as is (no channel appended) and return the reply.

        An empty or "undef" command sends nothing and returns (1,"ok").
        """
        isegshq,err=self._lookup(isegshq_id)
        if err is not None:
            return 0,err
        if command=="" or command=="undef":
            return 1,"ok"
        retcode,res=self.command(isegshq,command,False)
        if retcode==0:
            return 0,"Error writing to link <- %s"%(res)
        return 1,res

    def get_error_queue(self,isegshq_id):
        """Return the lower-cased status word unless it ends with "=on", else an empty string."""
        isegshq,err=self._lookup(isegshq_id)
        if err is not None:
            return 0,err
        retcode,res=self.get_status(isegshq)
        if retcode==0:
            return 0,res
        summary=[]
        if not res.lower().endswith("=on"):
            summary.append(res.lower())
        return retcode,",".join(summary)

    def get_status_word(self,isegshq_id="undef"):
        """Read the status word of the supply identified by *isegshq_id*."""
        isegshq,err=self._lookup(isegshq_id)
        if err is not None:
            return 0,err
        return self.get_status(isegshq)

    def get_module_status(self,isegshq_id="undef"):
        """Read the module status of the supply identified by *isegshq_id*."""
        isegshq,err=self._lookup(isegshq_id)
        if err is not None:
            return 0,err
        return self.get_mod_status(isegshq)

    def get_module_id(self,isegshq_id="undef"):
        """Read the module identifier of the supply identified by *isegshq_id*."""
        isegshq,err=self._lookup(isegshq_id)
        if err is not None:
            return 0,err
        return self.get_mod_id(isegshq)

    def get_power_status(self,isegshq_id):
        "Check power state on PS"
        isegshq,err=self._lookup(isegshq_id)
        if err is not None:
            return 0,err
        retcode,res=self.get_mod_status(isegshq)
        if retcode==0:
            return 0,str(res)
        # module status bit 3 set means the output is OFF
        if self.bitfiled(res)[3]==1:
            return 1,"OFF"
        return 1,"ON"

# CREATE POOL ####################################################

# Module-wide driver instance; the command functions below delegate to it.
me=isegshq()

# COMMANDS #######################################################

# Functions

def init_isegshq(isegshq_id,conf_string):
    """Initialize ISEGSHQ power supply identified by *isegshq_id*

    *conf_string* must include the parameters:

    - bus: conf_string of the underlying link module (must be serial)
    - chan: channel on which the id will act"""
    return me.init(isegshq_id,conf_string)
def deinit_isegshq(isegshq_id):
    "Deregister a ISEG SHQ PS from the pool"
    return me.deinit(isegshq_id)
def config_isegshq(isegshq_id,error_check="fast"):
    """Configure the ISEG SHQ PS

    *error_check* is forwarded to the driver's config method; its default is "fast"."""
    return me.config(isegshq_id,error_check)
def inval_isegshq(isegshq_id):
    "Invalidate configuration of ISEG SHQ PS"
    return me.inval(isegshq_id)
# def reset_isegshq(isegshq_id="undef"): # "Reset ISEG SHQ PS" # return me.reset(isegshq_id)
def set_voltage_isegshq(isegshq_id,voltage,slew_rate="undef"):
    "Set voltage in Volts. Optional slew_rate argument in V/s."
    return me.set_voltage(isegshq_id,voltage,slew_rate)
def set_current_limit_isegshq(isegshq_id,current_limit):
    "Set current limit in Amperes."
    return me.set_current_limit(isegshq_id,current_limit)
def get_voltage_isegshq(isegshq_id):
    "Get voltage in Volts"
    return me.get_voltage(isegshq_id)
def get_current_isegshq(isegshq_id):
    "Get current in Amperes."
    return me.get_current(isegshq_id)
def power_on_isegshq(isegshq_id):
    "Turn on ISEG SHQ PS (reported as not implemented by the driver)"
    return me.power_on(isegshq_id)
def power_off_isegshq(isegshq_id):
    "Turn off ISEG SHQ PS (reported as not implemented by the driver)"
    return me.power_off(isegshq_id)
def free_command_isegshq(isegshq_id,command="undef"):
    "Send free command to ISEG SHQ PS"
    return me.free_command(isegshq_id,command)
def get_error_queue_isegshq(isegshq_id):
    "Read error queue"
    return me.get_error_queue(isegshq_id)
def get_voltage_limit_isegshq(isegshq_id):
    """Get voltage limit.

    Returns the supply's reply to the voltage limit query unchanged; the
    driver documents that value as a percentage of the nominal voltage."""
    # me.get_voltage_limit expects the pool entry rather than the id, so the
    # id is resolved here before delegating.
    try:
        isegshq=me.isegshq_pool.get(isegshq_id)
    except Exception as e:
        return 0,str(e)
    return me.get_voltage_limit(isegshq)
def get_current_limit_isegshq(isegshq_id):
    """Get current limit.

    Returns the supply's reply to the current limit query unchanged; the
    driver documents that value as a percentage of the nominal current."""
    # me.get_current_limit expects the pool entry rather than the id, so the
    # id is resolved here before delegating.
    try:
        isegshq=me.isegshq_pool.get(isegshq_id)
    except Exception as e:
        return 0,str(e)
    return me.get_current_limit(isegshq)
def get_module_status_isegshq(isegshq_id):
    "Get module status"
    return me.get_module_status(isegshq_id)
def get_status_isegshq(isegshq_id):
    "Get status word"
    return me.get_status_word(isegshq_id)
def get_module_id_isegshq(isegshq_id):
    "Get module ID"
    return me.get_module_id(isegshq_id)
def get_power_status_isegshq(ps_id):
    "Check power state on PS"
    return me.get_power_status(ps_id)