#!/usr/bin/env python2
# -*- coding: utf-8 -*-
#
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
#
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame. If not, see <http://www.gnu.org/licenses/>
import time
import pools,conf_strings,buses
# Channel names, indexed by (chan - 1) when building the pressure query.
channels = ["A1", "A2", "B1", "B2"]
# Supported pressure units; (index + 1) is the code sent with the UNI command.
Punits = ["mbar", "Torr", "Pa"]

# PF_TPG300 #########################################################


class pf_tpg300(object):
    """Driver for TPG300 pressure gauges reached through a Pyrame bus module.

    Every public operation returns a ``(retcode, res)`` pair where *retcode*
    is 1 on success and 0 on failure, and *res* is the result or an error
    message. Per-gauge state (channel number, bus module name, bus id and the
    "configured" flag) is kept in a pool indexed by the gauge id.
    """

    # First field of a measurement answer -> error message. Status "0" means
    # the measurement is valid and is handled separately in measure().
    _status_errors = {
        "1": "underrange",
        "2": "overrange",
        "3": "measuring circuit error",
        "4": "measuring circuit is off",
        "5": "no hardware on this channel",
    }

    def __init__(self):
        self.pf_tpg300_pool = pools.pool()

    class pf_tpg300_Exception(Exception):
        """Error raised by send_query and get_answer.

        Its single argument is a ``(retcode, message)`` tuple.
        """
        pass

    def init(self, pf_tpg300_id, conf_string):
        """Parse *conf_string*, initialize the bus and register the gauge.

        *conf_string* must be named ``pf_tpg300`` and contain a ``bus``
        conf_string. ``chan`` (1 to 4) is optional and defaults to 1 with a
        warning. Returns ``(1, "ok")`` or ``(0, error message)``.
        """
        try:
            conf = conf_strings.parse(conf_string)
        except Exception as e:
            return 0, str(e)
        if conf.name != "pf_tpg300":
            return 0, "Invalid module name %s in conf_string instead of pf_tpg300" % (conf.name)
        if not conf.has("chan"):
            print("warning: missing channel. assuming 1")
            conf.params["chan"] = "1"
        if conf.params["chan"] not in map(str, range(1, 5)):
            return 0, "invalid chan. must be from 1 to 4"
        if not conf.has("bus"):
            return 0, "Error: some of the required parameters (bus) in conf_string are not present"
        try:
            conf_bus = conf_strings.parse(conf.params["bus"])
        except Exception as e:
            return 0, str(e)
        if conf_bus.name == "serial":
            if not conf_bus.has("baudrate"):
                conf_bus.params["baudrate"] = "9600"
            # NOTE(review): the original indentation was lost, so it is unclear
            # whether parity/bytesize are forced for every serial bus (as done
            # here) or only defaulted together with the baudrate. Confirm.
            conf_bus.params["parity"] = "O"
            conf_bus.params["bytesize"] = "7"
        bus_id = "bus_%s" % (pf_tpg300_id)
        retcode, res = submod_execcmd("init@" + conf_bus.name, bus_id, conf_strings.unparse(conf_bus))
        if retcode == 0:
            return 0, "Error initializing link <- %s" % (res)
        self.pf_tpg300_pool.new(
            pf_tpg300_id,
            {"chan": int(conf.params["chan"]), "bus": conf_bus.name, "bus_id": bus_id},
        )
        return 1, "ok"

    def deinit(self, pf_tpg300_id):
        """Deinitialize the bus of *pf_tpg300_id* and remove it from the pool.

        An id that the pool cannot return is reported with retcode 1 and the
        pool's message, i.e. it is not treated as a failure.
        """
        try:
            gauge = self.pf_tpg300_pool.get(pf_tpg300_id)
        except Exception as e:
            return 1, str(e)
        retcode, res = submod_execcmd("deinit@" + gauge["bus"], gauge["bus_id"])
        if retcode == 0:
            return 0, "Error deinitializing link <- %s" % (res)
        self.pf_tpg300_pool.remove(pf_tpg300_id)
        return 1, "ok"

    def send_query(self, command, pf_tpg300):
        """Send *command* to the gauge and check that it is acknowledged.

        *pf_tpg300* is the pool entry of the gauge. Raises
        ``pf_tpg300_Exception`` if the bus reports an error, if the gauge
        answers NACK (hex 15), or if it answers anything other than ACK
        (hex 06).
        """
        print("send_query " + command)
        cmd = buses.encode_bin_string(command)
        # "0d" appends a carriage return to the hex-encoded command.
        retcode, res = submod_execcmd(
            "wrnrd_bin_until@" + pf_tpg300["bus"], pf_tpg300["bus_id"], cmd + "0d", r"\n"
        )
        # Check the bus retcode first: on failure res is an error message and
        # must be reported untouched.
        if retcode == 0:
            raise self.pf_tpg300_Exception((0, "Error sending query <- %s" % (res)))
        # Drop the last two hex characters (presumably the trailing CR byte).
        res = res[:-2]
        if res == "15":  # 15=NACK
            raise self.pf_tpg300_Exception((0, "invalid query"))
        if res != "06":  # 06=ACK
            raise self.pf_tpg300_Exception((0, "invalid answer from TPG300"))

    def get_answer(self, pf_tpg300):
        """Request the pending answer from the gauge and return it decoded.

        Sends ENQ (hex 05) followed by a carriage return. Raises
        ``pf_tpg300_Exception`` if the bus reports an error.
        """
        print("get_answer")
        retcode, res = submod_execcmd(
            "wrnrd_bin_until@" + pf_tpg300["bus"], pf_tpg300["bus_id"], "050d", r"\n"
        )  # ENQ
        if retcode == 0:
            raise self.pf_tpg300_Exception((0, "Error getting answer <- %s" % (res)))
        answer = buses.decode_bin_string(res[:-2])
        print("got %s" % (answer))
        return answer

    def config(self, pf_tpg300_id, units):
        """Configure the bus and set the pressure *units* on the gauge.

        *units* must be one of ``Punits``. Returns ``(1, "ok")``,
        ``(1, "already configured")`` or ``(0, error message)``. If setting
        the units fails, the bus is invalidated again (best effort).
        """
        try:
            gauge = self.pf_tpg300_pool.get(pf_tpg300_id)
        except Exception as e:
            return 0, str(e)
        if "configured" in gauge:
            return 1, "already configured"
        retcode, res = submod_execcmd("config@" + gauge["bus"], gauge["bus_id"])
        if retcode == 0:
            return 0, "Error configuring link <- %s" % (res)
        try:
            if units not in Punits:
                raise self.pf_tpg300_Exception((0, "invalid units. must be mbar, Torr or Pa"))
            # set units
            cmd = "UNI ,%d" % (Punits.index(units) + 1)
            self.send_query(cmd, gauge)
            time.sleep(0.05)
        except self.pf_tpg300_Exception as e:
            # Best-effort rollback of the bus configuration; its result is ignored.
            _, _ = submod_execcmd("inval@" + gauge["bus"], gauge["bus_id"])
            # e.args[0] is the (retcode, message) tuple; indexing the exception
            # object itself only works on Python 2.
            return 0, "error configuring channel %d : %s" % (gauge["chan"], e.args[0][1])
        gauge["configured"] = True
        return 1, "ok"

    def inval(self, pf_tpg300_id):
        """Invalidate the bus of *pf_tpg300_id* and clear its configured flag.

        Returns ``(1, "ok")``, ``(1, "not configured")`` or
        ``(0, error message)``.
        """
        try:
            gauge = self.pf_tpg300_pool.get(pf_tpg300_id)
        except Exception as e:
            return 0, str(e)
        if "configured" not in gauge:
            return 1, "not configured"
        # Invalidate bus
        retcode, res = submod_execcmd("inval@" + gauge["bus"], gauge["bus_id"])
        if retcode == 0:
            return 0, "Error invalidating link <- %s" % (res)
        # Remove parameters set during config
        del gauge["configured"]
        return 1, "ok"

    def measure(self, pf_tpg300_id):
        """Read the pressure of the channel assigned to *pf_tpg300_id*.

        Returns ``(1, value)`` where *value* is the second comma-separated
        field of the gauge answer (a string), or ``(0, error message)`` when
        the gauge is unknown or not configured, the exchange fails, the gauge
        reports an error status, or the answer cannot be interpreted.
        """
        try:
            gauge = self.pf_tpg300_pool.get(pf_tpg300_id)
        except Exception as e:
            return 0, str(e)
        if "configured" not in gauge:
            return 0, "not configured"
        cmd = "P%s" % (channels[gauge["chan"] - 1])
        try:
            self.send_query(cmd, gauge)
            answer = self.get_answer(gauge)
        except self.pf_tpg300_Exception as e:
            # Report communication problems through the return value, as
            # config() does, instead of letting the exception escape.
            return 0, "error measuring channel %d : %s" % (gauge["chan"], e.args[0][1])
        fields = answer.split(",")
        status = fields[0]
        if status == "0":  # measurement data ok
            if len(fields) < 2:
                return 0, "invalid answer from TPG300: %s" % (answer)
            return 1, fields[1]
        if status in self._status_errors:
            return 0, self._status_errors[status]
        return 0, "invalid answer from TPG300: %s" % (answer)
# CREATE PF_TPG300 POOL #############################################

# Single module-wide driver instance; the command functions below delegate to it.
me = pf_tpg300()
# COMMANDS #######################################################
def init_pf_tpg300(pf_tpg300_id, conf_string):
    """Initialize PF_TPG300 pressure gauge.

    *conf_string* must be named pf_tpg300 and accepts the parameters:

    - chan: an integer from 1 to 4 for channels A1, A2, B1, B2.
      Optional; channel 1 is assumed (with a warning) when missing.
    - bus: a conf_string for the bus module (e.g. serial). Required.

    Returns (1, "ok") on success or (0, error message) on failure."""
    return me.init(pf_tpg300_id, conf_string)
def deinit_pf_tpg300(pf_tpg300_id):
    """Deinitialize *pf_tpg300_id* pressure gauge.

    Returns a (retcode, message) pair; retcode is 0 only when the bus
    could not be deinitialized."""
    return me.deinit(pf_tpg300_id)
def config_pf_tpg300(pf_tpg300_id, units):
    """Configure *pf_tpg300_id* pressure gauge.

    *units* can be mbar, Torr or Pa.

    Returns (1, "ok"), (1, "already configured") or (0, error message)."""
    return me.config(pf_tpg300_id, units)
def inval_pf_tpg300(pf_tpg300_id):
    """Invalidate *pf_tpg300_id* pressure gauge.

    Returns (1, "ok"), (1, "not configured") or (0, error message)."""
    return me.inval(pf_tpg300_id)
def get_pressure_pf_tpg300(pf_tpg300_id):
    """Take a measure from *pf_tpg300_id* pressure gauge.

    The gauge must have been configured first.

    Returns (1, value) with the pressure as reported by the gauge, or
    (0, error message) on failure."""
    return me.measure(pf_tpg300_id)