# Source code for cmd_pf_tpg300

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# 
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
# 
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
# 
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame.  If not, see <http://www.gnu.org/licenses/>

import time
import pools,conf_strings,buses

# Gauge channel names; measure() picks the entry at index (chan-1) to build
# the "P<channel>" pressure query.
channels=["A1","A2","B1","B2"]
# Pressure units accepted by config(); the 1-based position of the unit in
# this list is the value sent with the "UNI" command.
Punits=["mbar","Torr","Pa"]

# PF_TPG300 #########################################################

class pf_tpg300(object):
    """Driver object for PF_TPG300 pressure gauges.

    Every gauge registered through init() is stored in a pool as a dict with
    the keys "chan" (int, 1 to 4), "bus" (name of the bus module) and
    "bus_id" (id of the link opened on that bus). config() adds a
    "configured" key that inval() removes again.

    init, deinit, config, inval and measure return a (retcode,res) pair:
    retcode is 1 on success and 0 on failure, res is the result or an error
    message. send_query and get_answer raise pf_tpg300_Exception instead.
    """

    # Error text for each non-zero status code that can start a measurement answer
    _measure_errors={
        "1": "underrange",
        "2": "overrange",
        "3": "measuring circuit error",
        "4": "measuring circuit is off",
        "5": "no hardware on this channel",
    }

    def __init__(self):
        self.pf_tpg300_pool=pools.pool()

    class pf_tpg300_Exception(Exception):
        """Raised by send_query/get_answer with a (0,message) tuple as argument."""
        pass

    def init(self,pf_tpg300_id,conf_string):
        """Register gauge *pf_tpg300_id* and initialize its bus link.

        *conf_string* must describe a "pf_tpg300" module with a "bus"
        parameter (itself a conf_string) and an optional "chan" parameter
        ("1" to "4", "1" is assumed when missing).
        """
        try:
            conf=conf_strings.parse(conf_string)
        except Exception as e:
            return 0,str(e)
        if conf.name!="pf_tpg300":
            return 0,"Invalid module name %s in conf_string instead of pf_tpg300"%(conf.name)
        if not conf.has("chan"):
            print("warning: missing channel. assuming 1")
            conf.params["chan"]="1"
        if conf.params["chan"] not in ("1","2","3","4"):
            return 0,"invalid chan. must be from 1 to 4"
        if not conf.has("bus"):
            return 0,"Error: some of the required parameters (bus) in conf_string are not present"
        try:
            conf_bus=conf_strings.parse(conf.params["bus"])
        except Exception as e:
            return 0,str(e)
        if conf_bus.name=="serial":
            # serial links get a default baudrate; parity and bytesize are always forced
            if not conf_bus.has("baudrate"):
                conf_bus.params["baudrate"]="9600"
            conf_bus.params["parity"]="O"
            conf_bus.params["bytesize"]="7"
        bus_id="bus_%s"%(pf_tpg300_id)
        retcode,res=submod_execcmd("init@"+conf_bus.name,bus_id,conf_strings.unparse(conf_bus))
        if retcode==0:
            return 0,"Error initializing link <- %s"%(res)
        self.pf_tpg300_pool.new(pf_tpg300_id,{"chan": int(conf.params["chan"]), "bus": conf_bus.name, "bus_id": bus_id})
        return 1,"ok"

    def deinit(self,pf_tpg300_id):
        """Deinitialize the bus link of *pf_tpg300_id* and drop it from the pool.

        An id that is not in the pool is reported with retcode 1.
        """
        try:
            pf_tpg300=self.pf_tpg300_pool.get(pf_tpg300_id)
        except Exception as e:
            return 1,str(e)
        retcode,res=submod_execcmd("deinit@"+pf_tpg300["bus"],pf_tpg300["bus_id"])
        if retcode==0:
            return 0,"Error deinitializing link <- %s"%(res)
        self.pf_tpg300_pool.remove(pf_tpg300_id)
        return 1,"ok"

    @staticmethod
    def _ack_error(reply):
        """Return None when *reply* is an ACK, otherwise the error text to report.

        *reply* is the hex-encoded acknowledgement byte read back after a query.
        """
        if reply=="06": #06=ACK
            return None
        if reply=="15": #15=NACK
            return "invalid query"
        return "invalid answer from TPG300"

    @classmethod
    def _parse_measurement(cls,answer):
        """Turn a decoded "<status>,<value>" answer into a (retcode,res) pair.

        Status "0" yields (1,value); statuses "1" to "5" yield (0,error text).
        Anything else, including a status "0" without a value field, yields
        (0,"invalid answer from TPG300: <answer>") instead of raising.
        """
        fields=answer.split(",")
        status=fields[0]
        if status=="0": # measurement data ok
            if len(fields)<2:
                return 0,"invalid answer from TPG300: %s"%(answer)
            return 1,fields[1]
        if status in cls._measure_errors:
            return 0,cls._measure_errors[status]
        return 0,"invalid answer from TPG300: %s"%(answer)

    def send_query(self,command,pf_tpg300):
        """Send *command* to the gauge and check that it is acknowledged.

        *pf_tpg300* is the pool entry of the gauge. Raises pf_tpg300_Exception
        when the bus command fails or the reply is not an ACK.
        """
        print("send_query "+command)
        cmd=buses.encode_bin_string(command)
        retcode,res=submod_execcmd("wrnrd_bin_until@"+pf_tpg300["bus"],pf_tpg300["bus_id"],cmd+"0d",r"\n")
        # report bus failures first and with the untouched bus message
        if retcode==0:
            raise self.pf_tpg300_Exception((0,"Error sending query <- %s"%(res)))
        # drop the last hex-encoded byte of the reply before comparing
        # (presumably the CR preceding the line feed -- confirm against the bus module)
        error=self._ack_error(res[:-2])
        if error is not None:
            raise self.pf_tpg300_Exception((0,error))

    def get_answer(self,pf_tpg300):
        """Request the pending answer from the gauge and return it decoded.

        *pf_tpg300* is the pool entry of the gauge. Raises pf_tpg300_Exception
        when the bus command fails.
        """
        print("get_answer")
        retcode,res=submod_execcmd("wrnrd_bin_until@"+pf_tpg300["bus"],pf_tpg300["bus_id"],"050d",r"\n") # ENQ
        if retcode==0:
            raise self.pf_tpg300_Exception((0,"Error getting answer <- %s"%(res)))
        answer=buses.decode_bin_string(res[:-2])
        print("got %s"%(answer))
        return answer

    def config(self,pf_tpg300_id,units):
        """Configure the bus link of *pf_tpg300_id* and set its pressure units.

        *units* must be one of the entries of Punits (mbar, Torr or Pa).
        A gauge that is already configured is left untouched.
        """
        try:
            pf_tpg300=self.pf_tpg300_pool.get(pf_tpg300_id)
        except Exception as e:
            return 0,str(e)
        if "configured" in pf_tpg300:
            return 1,"already configured"
        # reject bad units before touching the link, so it is not configured
        # only to be invalidated right away
        if units not in Punits:
            return 0,"error configuring channel %d : %s"%(pf_tpg300["chan"],"invalid units. must be mbar, Torr or Pa")
        retcode,res=submod_execcmd("config@"+pf_tpg300["bus"],pf_tpg300["bus_id"])
        if retcode==0:
            return 0,"Error configuring link <- %s"%(res)
        try:
            # set units
            cmd="UNI ,%d"%(Punits.index(units)+1)
            self.send_query(cmd,pf_tpg300)
            time.sleep(0.05)
        except self.pf_tpg300_Exception as e:
            # best effort: undo the link configuration, its result is ignored
            submod_execcmd("inval@"+pf_tpg300["bus"],pf_tpg300["bus_id"])
            # e.args[0] is the (0,message) tuple the exception was raised with
            return 0,"error configuring channel %d : %s"%(pf_tpg300["chan"],e.args[0][1])
        pf_tpg300["configured"]=True
        return 1,"ok"

    def inval(self,pf_tpg300_id):
        """Invalidate the bus link of *pf_tpg300_id* and mark it as not configured."""
        try:
            pf_tpg300=self.pf_tpg300_pool.get(pf_tpg300_id)
        except Exception as e:
            return 0,str(e)
        if "configured" not in pf_tpg300:
            return 1,"not configured"
        # Invalidate bus
        retcode,res=submod_execcmd("inval@"+pf_tpg300["bus"],pf_tpg300["bus_id"])
        if retcode==0:
            return 0,"Error invalidating link <- %s"%(res)
        # Remove parameters set during config
        del pf_tpg300["configured"]
        return 1,"ok"

    def measure(self,pf_tpg300_id):
        """Read the pressure of the channel assigned to *pf_tpg300_id*.

        Returns (1,value) with the value field of the answer as a string, or
        (0,message) when the gauge is unknown or not configured, the exchange
        with the gauge fails, or the answer carries an error status.
        """
        try:
            pf_tpg300=self.pf_tpg300_pool.get(pf_tpg300_id)
        except Exception as e:
            return 0,str(e)
        if "configured" not in pf_tpg300:
            return 0,"not configured"
        cmd="P%s"%(channels[pf_tpg300["chan"]-1])
        try:
            self.send_query(cmd,pf_tpg300)
            answer=self.get_answer(pf_tpg300)
        except self.pf_tpg300_Exception as e:
            # same reporting convention as config(): failures become (0,message)
            return 0,"error measuring channel %d : %s"%(pf_tpg300["chan"],e.args[0][1])
        return self._parse_measurement(answer)

# CREATE PF_TPG300 POOL #############################################

# Single module-level driver instance; the command functions below delegate to it.
me=pf_tpg300()

# COMMANDS #######################################################

def init_pf_tpg300(pf_tpg300_id,conf_string):
    """Initialize PF_TPG300 pressure gauge.

    *conf_string* must contain the parameters:

    - chan: an integer from 1 to 4 for channels A1, A2, B1, B2.
    - bus: a conf_string for cmd_serial or cmd_tcp

    Returns the (retcode,res) pair produced by the driver's init method."""
    return me.init(pf_tpg300_id,conf_string)
def deinit_pf_tpg300(pf_tpg300_id):
    """Deinitialize *pf_tpg300_id* pressure gauge.

    Returns the (retcode,res) pair produced by the driver's deinit method."""
    return me.deinit(pf_tpg300_id)
def config_pf_tpg300(pf_tpg300_id,units):
    """Configure *pf_tpg300_id* pressure gauge.

    *units* can be mbar, Torr or Pa.

    Returns the (retcode,res) pair produced by the driver's config method."""
    return me.config(pf_tpg300_id,units)
def inval_pf_tpg300(pf_tpg300_id):
    """Invalidate *pf_tpg300_id* pressure gauge.

    Returns the (retcode,res) pair produced by the driver's inval method."""
    return me.inval(pf_tpg300_id)
def get_pressure_pf_tpg300(pf_tpg300_id):
    """Take a measure from *pf_tpg300_id* pressure gauge.

    Returns the (retcode,res) pair produced by the driver's measure method."""
    return me.measure(pf_tpg300_id)