Source code for cmd_ly_zi

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# 
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
# 
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
# 
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame.  If not, see <http://www.gnu.org/licenses/>

from re import sub,search
import base64,pools,vicp

ly_zi_pool=pools.pool()
links={}

# Available channels
valid_channels=["M1","M2","M3","M4","C1","C2","C3","C4","F1","F2","F3","F4","F5","F6","F7","F8"]

# Functions

[docs]def init_ly_zi(ly_zi_id): """Initialize ly_zi DSO. *dso_hostname* is the IP address or resolvable network name""" ly_zi_pool.new(ly_zi_id) return 1,"ok"
[docs]def deinit_ly_zi(ly_zi_id): "Deregister an ly_zi from the pool" try: ly_zi=ly_zi_pool.get(ly_zi_id) except Exception as e: return 1,str(e) # Remove ly_zi from the pool ly_zi_pool.remove(ly_zi_id) return 1,"ok"
def config_ly_zi(ly_zi_id,dso_hostname): try: ly_zi=ly_zi_pool.get(ly_zi_id) except Exception as e: return 0,str(e) ly_zi["host"]=dso_hostname links[ly_zi_id]=connect(dso_hostname) return 1,"ok" def connect(dso_hostname): link=vicp.device() link.deviceAddress=dso_hostname if not link.connect(): raise Exception("Error initializing TCP link <- %s"%(link.LastErrorMsg)) # Turn on error logging if not link.write("CHLP EO\n",True,True): link.disconnect() raise Exception("Error setting up error logging <- VICP error") # Flush error queue retcode,res=get_error_queue(link) if retcode==0: raise Exception(res) # Turn on headers in responses if not link.write("CHDR SHORT\n"): raise Exception("Error turning on headers in responses") # Set datapoints to be recovered if not link.write("WFSU SP,0,NP,0,SN,0,FP,0\n"): raise Exception("Error setting up waveform") return link def inval_ly_zi(ly_zi_id,dso_hostname): try: ly_zi=ly_zi_pool.get(ly_zi_id) except Exception as e: return 0,str(e) if ly_zi_id in links: res=links[ly_zi_id].disconnect() if not res: return 0,"Error deinitializing TCP link <- %s"%(links[ly_zi_id].LastErrorMsg) return 1,"ok"
[docs]def get_data_ly_zi(ly_zi_id,sparsing,channel): """Get waveform of *channel*. *channel* can be 'C1', 'C2', 'C3', 'C4', 'M1', 'M2', 'M3', 'M4', 'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7' or 'F8', or a comma-separated list of channels. e.g.: C1,C2,F3. Data is sent back in base64. The *sparsing* parameter defines the interval between data points. *sparsing* =4 gets every 4th datapoint.""" try: ly_zi=ly_zi_pool.get(ly_zi_id) except Exception as e: return 0,str(e) # Expand channels channels=list(set(channel.split(","))) # Verify consistency of parameters for channel in channels: if channel not in valid_channels: return 0,"invalid channel %d"%(channel) # Send the command if ly_zi_id not in links: links[ly_zi_id]=connect(ly_zi["host"]) if not links[ly_zi_id].write("",True,True): return 0,"Error flushing queue <- VICP Error" if not links[ly_zi_id].write("*CLS; INE 1; *SRE 1\n"): return 0,"Error asking for SRQ <- VICP Error" retcode,res=links[ly_zi_id].serialPoll() if retcode == 1 and res == '1': wavedesc_total="" output="" for channel in channels: # Turn off headers in responses if not links[ly_zi_id].write("CHDR OFF\n"): return 0,"Error turning off headers in responses <- VICP Error" # Set datapoints to be recovered if not links[ly_zi_id].write("WFSU SP,{0},NP,0,SN,0,FP,0\n".format(sparsing)): return 0,"Error setting up waveform <- VICP Error" # Get waveform retcode,res=links[ly_zi_id].wrnrd("{0}:INSP? \"SIMPLE\",FLOAT\n".format(channel)) if retcode == 0: return 0,"Error getting the data <- %s"%(res) # Clean up wave=sub(r'[ \n\r]+','\n',res).strip("\"\n") # Get wave descriptor retcode,res=links[ly_zi_id].wrnrd("{0}:INSP? \"WAVEDESC\"\n".format(channel)) if retcode == 0: return 0,"Error getting the vertical scale <- %s"%(res) wavedesc=res wavedesc_total += wavedesc print(wavedesc) # Extract horizontal step h_step=float(search(r"HORIZ_INTERVAL *: ([0-9\.e+-]+)",wavedesc).expand(r"\1")) # Extract horizontal offset h_offset=float(search(r"HORIZ_OFFSET *: ([0-9\.e+-]+)",wavedesc).expand(r"\1")) # Extract sparsing factor sparsing=float(search(r"SPARSING_FACTOR *: ([0-9\.e+-]+)",wavedesc).expand(r"\1")) # Turn on headers in responses again if not links[ly_zi_id].write("CHDR SHORT\n"): return 0,"Error turning on headers in responses <- VICP error" # Rescale and add abscises x=0 y=0 wave=wave.split("\n") for i in range(len(wave)): x=i*h_step*sparsing + h_offset y=float(wave[i]) output+="%f %f\n"%(x,y) output+="\n\n" wavedesc_total=wavedesc_total.strip("\"").strip().replace("\r\n",";").strip(";").strip() return 1,base64.b64encode(output) return 0,"Error getting ready state from DSO (serialPoll) <- %s"%(links[ly_zi_id].LastErrorMsg)
[docs]def set_v_offset_ly_zi(ly_zi_id,v_offset,channel): "Set *v_offset* vertical offset for *channel*" try: ly_zi=ly_zi_pool.get(ly_zi_id) except Exception as e: return 0,str(e) # Verify consistency of parameters if channel not in valid_channels: return 0,"Error: invalid channel %d"%(channel) if ly_zi_id not in links: links[ly_zi_id]=connect(ly_zi["host"]) if not links[ly_zi_id].write("%s:OFST %f\n"%(channel,float(v_offset))): return 0,"Error setting vertical offset <- %s"%(res) return 1,"ok"
[docs]def set_v_div_ly_zi(ly_zi_id,v_div,channel): "Set *v_div* vertical volts per division for *channel*" try: ly_zi=ly_zi_pool.get(ly_zi_id) except Exception as e: return 0,str(e) # Verify consistency of parameters if channel not in valid_channels: return 0,"invalid channel %d"%(channel) if ly_zi_id not in links: links[ly_zi_id]=connect(ly_zi["host"]) if not links[ly_zi_id].write("%s:VDIV %f\n"%(channel,float(v_div))): return 0,"Error setting vertical volts per division <- %s"%(res) return 1,"ok"
[docs]def clear_sweep_ly_zi(ly_zi_id): "Clear sweep on DSO" try: ly_zi=ly_zi_pool.get(ly_zi_id) except Exception as e: return 0,str(e) if ly_zi_id not in links: links[ly_zi_id]=connect(ly_zi["host"]) if not links[ly_zi_id].write("CLSW\n"): return 0,"Error clearing sweep <- %s"%(res) return 1,"ok"
[docs]def get_error_queue_ly_zi(ly_zi_id): "Read error queue" try: ly_zi=ly_zi_pool.get(ly_zi_id) except Exception as e: return 0,str(e) if ly_zi_id not in links: links[ly_zi_id]=connect(ly_zi["host"]) return get_error_queue(links[ly_zi_id])
def get_error_queue(link): retcode,res=link.wrnrd("CHL? CLR\n") res=res.replace("\r\n",";").strip("\r\n") if retcode == 0: return 0,res elif res != "CHL \"\"": return 1,res else: return 1,""