#!/nsr/bin/env python2
# -*- coding: utf-8 -*-
#
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
#
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame. If not, see <http://www.gnu.org/licenses/>
import pools,conf_strings
# CLASS ##########################################################
class gpib_class():
def __init__(self):
self.gpib_pool=pools.pool()
def init(self,gpib_id,conf_string):
# Extract parameters from conf_string
try:
conf=conf_strings.parse(conf_string)
except Exception as e:
return 0,str(e)
if conf.name!="gpib":
return 0,"Invalid module name in conf_string"
if not conf.has("bus","dst_addr"):
return 0,"bus and dst_addr needed in conf_string"
bus_cs=conf.params["bus"]
try:
bus_conf=conf_strings.parse(bus_cs)
except Exception as e:
return 0,str(e)
if bus_conf.name=="tcp" and not bus_conf.has("port"):
bus_conf.params["port"]=1234
elif bus_conf.name=="serial" and not bus_conf.has("device") and not bus_conf.has("vendor","product"):
bus_conf.params["vendor"]="0403"
bus_conf.params["product"]="6001"
try:
conf.params["bus"]=conf_strings.unparse(bus_conf)
except Exception as e:
return 0,str(e)
dst_addr=conf.params["dst_addr"]
adapter_addr="0"
if conf.has("adapter_addr"):
adapter_addr=conf.params["adapter_addr"]
# Validate parameters
if dst_addr=="" or dst_addr=="undef" or int(dst_addr) not in range(0,30):
return 0,"Invalid dst_addr %s"%(dst_addr)
if adapter_addr=="" or adapter_addr=="undef" or int(adapter_addr) not in range(0,30):
return 0,"Invalid adapter_addr %s"%(adapter_addr)
# Initialize bus
bus_id="bus_%s"%(gpib_id)
retcode,res=submod_execcmd("init@"+bus_conf.name,bus_id,conf.params["bus"])
if retcode==0:
return 0,"Error initializing bus <- %s"%(res)
# Add to the pool
self.gpib_pool.new(gpib_id,{"bus_type": bus_conf.name, "bus_id": bus_id, "dst_addr": dst_addr, "adapter_addr": adapter_addr})
return 1,"ok"
def deinit(self,gpib_id):
try:
gpib=self.gpib_pool.get(gpib_id)
except Exception as e:
return 1,str(e)
retcode,res=submod_execcmd("deinit@"+gpib["bus_type"],gpib["bus_id"])
if retcode==0:
return 0,"Error attempting to deinitialize bus <- %s"%(res)
try:
self.gpib_pool.remove(gpib_id)
except Exception as e:
return 0,str(e)
return 1,"ok"
def config(self,gpib_id):
try:
gpib=self.gpib_pool.get(gpib_id)
except Exception as e:
return 0,str(e)
if "bus_canon" in gpib:
return 1,"already configured"
retcode,res=submod_execcmd("config@"+gpib["bus_type"],gpib["bus_id"])
if retcode==0:
return 0,"Error attempting to configure bus <- %s"%(res)
bus_canon=res
# If a GPIB connection with the same bus is already in the pool,
# check that there's no inconsistent configuration attempt
found=False
for _,b in self.gpib_pool.get_all():
if "bus_canon" in b and (b["bus_canon"]==bus_canon):
found=True
break
if found:
if b["adapter_addr"]!=gpib["adapter_addr"]:
submod_execcmd("inval@"+gpib["bus_type"],gpib["bus_id"])
del gpib["bus_canon"]
return 0,"GPIB adapter already initialized with different adapter address %s"%(b["adapter_addr"])
if not found or (found and "bus_canon" not in b):
# Configure adapter_addr
retcode,res=submod_execcmd("write@"+gpib["bus_type"],gpib["bus_id"],r"++mode 0\n++addr %s\n++mode 1\n"%(gpib["adapter_addr"]))
if retcode==0:
self.inval(gpib_id)
return 0,"Error setting adapter address <- %s"%(res)
# Check connectivity
retcode,res=submod_execcmd("wrnrd_until@"+gpib["bus_type"],gpib["bus_id"],r"++mode\n",r"\n")
if retcode==0 or res!="1":
self.inval(gpib_id)
return 0,"Error checking connectivity <- %s"%(res)
# Configure
retcode,res=submod_execcmd("write@"+gpib["bus_type"],gpib["bus_id"],r"++eot_enable 1\n++eot_char 10\n")
if retcode==0:
self.inval(gpib_id)
return 0,"Error while configuring GPIB link <- %s"%(res)
# Shut up last recipient
retcode,res=submod_execcmd("write@"+gpib["bus_type"],gpib["bus_id"],r"++auto 0\n")
if retcode==0:
self.inval(gpib_id)
return 0,"Error while shuting up last recipient %s <- %s"%(res)
gpib["bus_canon"]=bus_canon
return 1,"ok"
def inval(self,gpib_id):
try:
gpib=self.gpib_pool.get(gpib_id)
except Exception as e:
return 0,str(e)
if "bus_canon" not in gpib:
return 1,"not configured"
retcode,res=submod_execcmd("inval@"+gpib["bus_type"],gpib["bus_id"])
if retcode==0:
return 0,"Error attempting to invalidate bus <- %s"%(res)
del gpib["bus_canon"]
return 1,"ok"
def send(self,gpib,data):
if not "bus_canon" in gpib:
return 0,"not configured"
data=r"++addr %s\n"%(gpib["dst_addr"]) + data
retcode,res=submod_execcmd("write@"+gpib["bus_type"],gpib["bus_id"],data)
if (retcode==0):
return 0,"Error writing data <- %s"%(res)
return 1,"ok"
def write(self,gpib_id,data):
try:
gpib=self.gpib_pool.get(gpib_id)
except Exception as e:
return 0,str(e)
# Write
return self.send(gpib,data)
def wrnrd_until(self,gpib_id,data,eot=r"\n",timeout="undef"):
try:
gpib=self.gpib_pool.get(gpib_id)
except Exception as e:
return 0,str(e)
if eot!=r"\n":
print("\n\nWarning: Calling wrnrd_until_gpib with eot other than %s\n\n"%(r"\n"))
# Write and read
data += r"++auto 1\n" # At the end add command to address device to talk
retcode,res=self.send(gpib,data)
if retcode==0:
return 0,res
# Avoid empty returns because of first character read being an eot
res=""
while res=="":
retcode,res=submod_execcmd("read_until@"+gpib["bus_type"],gpib["bus_id"],eot,timeout)
if retcode==0:
return 0,"Error reading GPIB answer <- %s"%(res)
# Address device to listen
retcode2,res2=self.send(gpib,r"++auto 0\n")
if retcode2==0:
return 0,"Error setting device to listen: %s"%(res2)
return retcode,res
def expect(self,gpib_id,pattern,timeout="undef"):
try:
gpib=self.gpib_pool.get(gpib_id)
except Exception as e:
return 0,str(e)
if not "bus_canon" in gpib:
return 0,"not configured"
retcode,res=submod_execcmd("write@"+gpib["bus_type"],gpib["bus_id"],r"++addr %s\n++auto 1\n"%(gpib["dst_addr"]))
if retcode==0:
return 0,"Error setting destination address to talk <- %s"%(res)
retcode,res=submod_execcmd("expect@"+gpib["bus_type"],gpib["bus_id"],pattern,timeout)
retcode2,res2=submod_execcmd("write@"+gpib["bus_type"],gpib["bus_id"],r"++auto 0\n")
if retcode2==0:
return 0,"%s, and error setting destination address not to talk <- %s"%(res2)
if retcode==0:
return 0,"Pattern not found or GPIB link not responding <- %s"%(res)
return retcode,res
# CREATE POOL ####################################################
bus=gpib_class()
# COMMANDS #######################################################
[docs]def init_gpib(gpib_id,conf_string):
"""Initialize a GPIB link.
*conf_string* must include:
- bus: conf_string of the underlying bus module. For tcp and serial, the default tcp port or serial vendor and product id will be included if not present. For a USB adapter, a typical value could be `serial()`
- dst_addr: GPIB address of the destination device
- adapter_addr (defaults to 0): GPIB address of the adapter"""
return bus.init(gpib_id,conf_string)
[docs]def deinit_gpib(gpib_id):
"Deinitialize and deregister GPIB link *gpib_id*."
return bus.deinit(gpib_id)
[docs]def config_gpib(gpib_id):
"Configure GPIB link *gpib_id*."
return bus.config(gpib_id)
[docs]def inval_gpib(gpib_id):
"Invalidate configuration of GPIB link *gpib_id*."
return bus.inval(gpib_id)
[docs]def write_gpib(gpib_id,data):
"Write *data* to GPIB link *gpib_id*."
return bus.write(gpib_id,data)
[docs]def wrnrd_until_gpib(gpib_id,data,eot=r"\n",timeout="undef"):
"Write and read data to and from GPIB link *gpib_id*."
return bus.wrnrd_until(gpib_id,data,eot,timeout)
[docs]def expect_gpib(gpib_id,pattern,timeout="undef"):
"Read data from GPIB link *gpib_id* until *pattern* is found or timeout."
return bus.expect(gpib_id,pattern,timeout)