#!/usr/bin/env python2
# -*- coding: utf-8 -*-
#
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
#
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame. If not, see <http://www.gnu.org/licenses/>
from re import sub,search
import base64,pools,vicp
ly_zi_pool=pools.pool()
links={}
# Available channels
valid_channels=["M1","M2","M3","M4","C1","C2","C3","C4","F1","F2","F3","F4","F5","F6","F7","F8"]
# Functions
[docs]def init_ly_zi(ly_zi_id):
"""Initialize ly_zi DSO. *dso_hostname* is the IP address or resolvable network name"""
ly_zi_pool.new(ly_zi_id)
return 1,"ok"
[docs]def deinit_ly_zi(ly_zi_id):
"Deregister an ly_zi from the pool"
try:
ly_zi=ly_zi_pool.get(ly_zi_id)
except Exception as e:
return 1,str(e)
# Remove ly_zi from the pool
ly_zi_pool.remove(ly_zi_id)
return 1,"ok"
def config_ly_zi(ly_zi_id,dso_hostname):
try:
ly_zi=ly_zi_pool.get(ly_zi_id)
except Exception as e:
return 0,str(e)
ly_zi["host"]=dso_hostname
links[ly_zi_id]=connect(dso_hostname)
return 1,"ok"
def connect(dso_hostname):
link=vicp.device()
link.deviceAddress=dso_hostname
if not link.connect():
raise Exception("Error initializing TCP link <- %s"%(link.LastErrorMsg))
# Turn on error logging
if not link.write("CHLP EO\n",True,True):
link.disconnect()
raise Exception("Error setting up error logging <- VICP error")
# Flush error queue
retcode,res=get_error_queue(link)
if retcode==0:
raise Exception(res)
# Turn on headers in responses
if not link.write("CHDR SHORT\n"):
raise Exception("Error turning on headers in responses")
# Set datapoints to be recovered
if not link.write("WFSU SP,0,NP,0,SN,0,FP,0\n"):
raise Exception("Error setting up waveform")
return link
def inval_ly_zi(ly_zi_id,dso_hostname):
try:
ly_zi=ly_zi_pool.get(ly_zi_id)
except Exception as e:
return 0,str(e)
if ly_zi_id in links:
res=links[ly_zi_id].disconnect()
if not res:
return 0,"Error deinitializing TCP link <- %s"%(links[ly_zi_id].LastErrorMsg)
return 1,"ok"
[docs]def get_data_ly_zi(ly_zi_id,sparsing,channel):
"""Get waveform of *channel*. *channel* can be 'C1', 'C2', 'C3', 'C4', 'M1', 'M2', 'M3', 'M4', 'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7' or 'F8', or a comma-separated list of channels. e.g.: C1,C2,F3. Data is sent back in base64. The *sparsing* parameter defines the interval between data points. *sparsing* =4 gets every 4th datapoint."""
try:
ly_zi=ly_zi_pool.get(ly_zi_id)
except Exception as e:
return 0,str(e)
# Expand channels
channels=list(set(channel.split(",")))
# Verify consistency of parameters
for channel in channels:
if channel not in valid_channels:
return 0,"invalid channel %d"%(channel)
# Send the command
if ly_zi_id not in links:
links[ly_zi_id]=connect(ly_zi["host"])
if not links[ly_zi_id].write("",True,True):
return 0,"Error flushing queue <- VICP Error"
if not links[ly_zi_id].write("*CLS; INE 1; *SRE 1\n"):
return 0,"Error asking for SRQ <- VICP Error"
retcode,res=links[ly_zi_id].serialPoll()
if retcode == 1 and res == '1':
wavedesc_total=""
output=""
for channel in channels:
# Turn off headers in responses
if not links[ly_zi_id].write("CHDR OFF\n"):
return 0,"Error turning off headers in responses <- VICP Error"
# Set datapoints to be recovered
if not links[ly_zi_id].write("WFSU SP,{0},NP,0,SN,0,FP,0\n".format(sparsing)):
return 0,"Error setting up waveform <- VICP Error"
# Get waveform
retcode,res=links[ly_zi_id].wrnrd("{0}:INSP? \"SIMPLE\",FLOAT\n".format(channel))
if retcode == 0:
return 0,"Error getting the data <- %s"%(res)
# Clean up
wave=sub(r'[ \n\r]+','\n',res).strip("\"\n")
# Get wave descriptor
retcode,res=links[ly_zi_id].wrnrd("{0}:INSP? \"WAVEDESC\"\n".format(channel))
if retcode == 0:
return 0,"Error getting the vertical scale <- %s"%(res)
wavedesc=res
wavedesc_total += wavedesc
print(wavedesc)
# Extract horizontal step
h_step=float(search(r"HORIZ_INTERVAL *: ([0-9\.e+-]+)",wavedesc).expand(r"\1"))
# Extract horizontal offset
h_offset=float(search(r"HORIZ_OFFSET *: ([0-9\.e+-]+)",wavedesc).expand(r"\1"))
# Extract sparsing factor
sparsing=float(search(r"SPARSING_FACTOR *: ([0-9\.e+-]+)",wavedesc).expand(r"\1"))
# Turn on headers in responses again
if not links[ly_zi_id].write("CHDR SHORT\n"):
return 0,"Error turning on headers in responses <- VICP error"
# Rescale and add abscises
x=0
y=0
wave=wave.split("\n")
for i in range(len(wave)):
x=i*h_step*sparsing + h_offset
y=float(wave[i])
output+="%f %f\n"%(x,y)
output+="\n\n"
wavedesc_total=wavedesc_total.strip("\"").strip().replace("\r\n",";").strip(";").strip()
return 1,base64.b64encode(output)
return 0,"Error getting ready state from DSO (serialPoll) <- %s"%(links[ly_zi_id].LastErrorMsg)
[docs]def set_v_offset_ly_zi(ly_zi_id,v_offset,channel):
"Set *v_offset* vertical offset for *channel*"
try:
ly_zi=ly_zi_pool.get(ly_zi_id)
except Exception as e:
return 0,str(e)
# Verify consistency of parameters
if channel not in valid_channels:
return 0,"Error: invalid channel %d"%(channel)
if ly_zi_id not in links:
links[ly_zi_id]=connect(ly_zi["host"])
if not links[ly_zi_id].write("%s:OFST %f\n"%(channel,float(v_offset))):
return 0,"Error setting vertical offset <- %s"%(res)
return 1,"ok"
[docs]def set_v_div_ly_zi(ly_zi_id,v_div,channel):
"Set *v_div* vertical volts per division for *channel*"
try:
ly_zi=ly_zi_pool.get(ly_zi_id)
except Exception as e:
return 0,str(e)
# Verify consistency of parameters
if channel not in valid_channels:
return 0,"invalid channel %d"%(channel)
if ly_zi_id not in links:
links[ly_zi_id]=connect(ly_zi["host"])
if not links[ly_zi_id].write("%s:VDIV %f\n"%(channel,float(v_div))):
return 0,"Error setting vertical volts per division <- %s"%(res)
return 1,"ok"
[docs]def clear_sweep_ly_zi(ly_zi_id):
"Clear sweep on DSO"
try:
ly_zi=ly_zi_pool.get(ly_zi_id)
except Exception as e:
return 0,str(e)
if ly_zi_id not in links:
links[ly_zi_id]=connect(ly_zi["host"])
if not links[ly_zi_id].write("CLSW\n"):
return 0,"Error clearing sweep <- %s"%(res)
return 1,"ok"
[docs]def get_error_queue_ly_zi(ly_zi_id):
"Read error queue"
try:
ly_zi=ly_zi_pool.get(ly_zi_id)
except Exception as e:
return 0,str(e)
if ly_zi_id not in links:
links[ly_zi_id]=connect(ly_zi["host"])
return get_error_queue(links[ly_zi_id])
def get_error_queue(link):
retcode,res=link.wrnrd("CHL? CLR\n")
res=res.replace("\r\n",";").strip("\r\n")
if retcode == 0:
return 0,res
elif res != "CHL \"\"":
return 1,res
else: return 1,""