Source code for cmd_nw_esp301

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# 
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
# 
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
# 
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame.  If not, see <http://www.gnu.org/licenses/>

import time
import pools,conf_strings

nw_esp301_pool=pools.pool()

# Axes
axes=["1","2","3"]

# Ports
ports=["A","B"]

# COMMANDS #######################################################

[docs]def init_nw_esp301(nw_esp301_id,conf_string): """Initialize NW_ESP301 motion controller. *conf_string* must contain: - bus: conf_string of the underlying link module - chan: 1, 2 or 3 for the channel of the axis""" try: conf=conf_strings.parse(conf_string) except Exception as e: return 0,"%s" % str(e) if conf.name!="nw_esp301": return 0,"Invalid module name %s in conf_string instead of nw_esp301"%(conf.name) if not conf.has("bus","chan"): return 0,"Some of the required parameters (bus and chan) in conf_string are not present" if conf.params["chan"] not in axes: return 0,"invalid channel" try: conf_bus=conf_strings.parse(conf.params["bus"]) except Exception as e: return 0,str(e) if conf_bus.name=="serial" and not conf_bus.has("baudrate"): conf_bus.params["baudrate"]="115200" bus_id="bus_%s"%(nw_esp301_id) retcode,res=submod_execcmd("init@"+conf_bus.name,bus_id,conf_strings.unparse(conf_bus)) if retcode==0: return 0,"error initializing axis <- %s"%(res) nw_esp301_pool.new(nw_esp301_id,{"bus":conf_bus.name,"bus_id":bus_id,"chan":conf.params["chan"]}) return 1,"ok"
[docs]def deinit_nw_esp301(nw_esp301_id): "Deinitialize and deregister nw_esp301 motion controller from the pool." try: nw_esp301=nw_esp301_pool.get(nw_esp301_id) except Exception as e: return 1,str(e) retcode,res=submod_execcmd("deinit@"+nw_esp301["bus"],nw_esp301["bus_id"]) if retcode==0: return 0,"Error deinitializing link <- %s" % (res) nw_esp301_pool.remove(nw_esp301_id) return 1,"ok"
[docs]def config_nw_esp301(nw_esp301_id,pos_max,pos_min): "Configure *nw_esp301_id* axis. *pos_max* and *pos_min* define the limits of excursion." try: nw_esp301=nw_esp301_pool.get(nw_esp301_id) except Exception as e: return 0,str(e) pos_min=float(pos_min) pos_max=float(pos_max) if pos_max<=pos_min: return 0,"max must be higher than min" retcode,res=submod_execcmd("config@"+nw_esp301["bus"],nw_esp301["bus_id"]) if retcode==0: return 0,"Error configuring link <- %s" % (res) command=r"{0}MO\r".format(nw_esp301["chan"]) retcode,res=submod_execcmd("write@"+nw_esp301["bus"],nw_esp301["bus_id"],command) if retcode==0: return 0,"Error writing to link <- %s" % (res) nw_esp301["max"]=pos_max nw_esp301["min"]=pos_min return 1,"ok"
[docs]def inval_nw_esp301(nw_esp301_id): "Invalidate *nw_esp301_id* configuration" try: nw_esp301=nw_esp301_pool.get(nw_esp301_id) except Exception as e: return 0,str(e) command=r"{0}MF\r".format(nw_esp301["chan"]) retcode,res=submod_execcmd("write@"+nw_esp301["bus"],nw_esp301["bus_id"],command) if retcode==0: return 0,"Error writing to link <- %s" % (res) retcode,res=submod_execcmd("inval@"+nw_esp301["bus"],nw_esp301["bus_id"]) if retcode==0: return 0,"Error invalidating link <- %s" % (res) del nw_esp301["max"] del nw_esp301["min"] return 1,"ok"
def command(nw_esp301,command): retcode,res=submod_execcmd("wrnrd_until@"+nw_esp301["bus"],nw_esp301["bus_id"],command,r"\r") if retcode==0: return 0,"Error writing and reading to/from link <- %s" % (res) elif res!="1": return 0,"Error monitoring move completion status <- %s" % (res) retcode,errors=get_error_queue(nw_esp301) if errors!="": return 0,"Error(s) returned from the MC: %s" % (errors) return 1,"ok"
[docs]def home_nw_esp301(nw_esp301_id,direction,velocity): "Reset axis *nw_esp301_id* by homing on *direction* at *velocity*." try: nw_esp301=nw_esp301_pool.get(nw_esp301_id) except Exception as e: return 0,str(e) if "max" not in nw_esp301: return 0,"not configured" # Send command to nw_esp301 cmd=r"{0}OH{1};{0}OL0.1;{0}OR;{0}WS;" \ r"{0}PR0.2;{0}WS;{0}DH;" \ r"{0}MD\r".format(nw_esp301["chan"],velocity) retcode,res=command(nw_esp301,cmd) if retcode==0: return 0,"Error homing axis: %s"%(res) return 1,"ok"
[docs]def reset_nw_esp301(nw_esp301_id): "Reset the controller of *nw_esp301_id*, not just the axis" try: nw_esp301=nw_esp301_pool.get(nw_esp301_id) except Exception as e: return 0,str(e) if "max" not in nw_esp301: return 0,"not configured" command=r"RS\r" retcode,res=submod_execcmd("write@"+nw_esp301["bus"],nw_esp301["bus_id"],command) if retcode==0: return 0,"Error resetting nw_esp301 <- %s" % (res) time.sleep(12) retcode,errors=get_error_queue(nw_esp301) if errors!="": return 0,"Error(s) returned from the MC <- %s" % (errors) return 1,"ok"
[docs]def go_min_nw_esp301(nw_esp301_id,velocity,acceleration): "Go to the minimum position as defined during the initalization of the axis" try: nw_esp301=nw_esp301_pool.get(nw_esp301_id) except Exception as e: return 0,str(e) if "max" not in nw_esp301: return 0,"not configured" # Send command to nw_esp301 velocity=float(velocity) acceleration=float(acceleration) cmd=r"{0}MO;" \ r"{0}AC{1:f};" \ r"{0}VA{2:f};" \ r"{0}PA{3:f};{0}WS;" \ r"{0}MD\r".format(nw_esp301["chan"],acceleration,velocity,nw_esp301["min"]) retcode,res=command(nw_esp301,cmd) if retcode==0: return 0,"Error moving to min: %s"%(res) return 1,"ok"
[docs]def go_max_nw_esp301(nw_esp301_id,velocity,acceleration): "Go to the maximum position as defined during the initalization of the axis" try: nw_esp301=nw_esp301_pool.get(nw_esp301_id) except Exception as e: return 0,str(e) if "max" not in nw_esp301: return 0,"not configured" # Send command to nw_esp301 velocity=float(velocity) acceleration=float(acceleration) cmd=r"{0}MO;" \ r"{0}AC{1:f};" \ r"{0}VA{2:f};" \ r"{0}PA{3:f};{0}WS;" \ r"{0}MD\r".format(nw_esp301["chan"],acceleration,velocity,nw_esp301["max"]) retcode,res=command(nw_esp301,cmd) if retcode==0: return 0,"Error moving to max: %s"%(res) return 1,"ok"
[docs]def move_nw_esp301(nw_esp301_id,displacement,velocity,acceleration): "Move by displacement units with the specified maximum velocity" try: nw_esp301=nw_esp301_pool.get(nw_esp301_id) except Exception as e: return 0,str(e) if "max" not in nw_esp301: return 0,"not configured" # Send command to nw_esp301 displacement=float(displacement) velocity=float(velocity) acceleration=float(acceleration) retcode,res=get_pos(nw_esp301) if retcode==0: return 0,res if float(res)+displacement > nw_esp301["max"] or float(res)+displacement < nw_esp301["min"]: return 0,"Refusing to move: final position {0:f} would be out of the axis limits".format(float(res)+displacement) cmd=r"{0}MO;" \ r"{0}AC{1:f};" \ r"{0}VA{2:f};" \ r"{0}PR{3:f};{0}WS;" \ r"{0}MD\r".format(nw_esp301["chan"],acceleration,velocity,displacement) retcode,res=command(nw_esp301,cmd) if retcode==0: return 0,"Error moving: %s"%(res) return 1,"ok"
[docs]def get_pos_nw_esp301(nw_esp301_id): "Get position of the specified axis" try: nw_esp301=nw_esp301_pool.get(nw_esp301_id) except Exception as e: return 0,str(e) if "max" not in nw_esp301: return 0,"not configured" # Send command to nw_esp301 retcode,res=get_pos(nw_esp301) if retcode==0: return retcode,res return 1,res
[docs]def get_pos(nw_esp301): "Get position of the specified axis" command=r"{0}TP\r".format(nw_esp301["chan"]) retcode,res=submod_execcmd("wrnrd_until@"+nw_esp301["bus"],nw_esp301["bus_id"],command,r"\r") if retcode==0: return 0,"Error writing and reading to/from link <- %s" % (res) retcode,errors=get_error_queue(nw_esp301) if errors!="": return 0,"Error(s) returned from the MC: %s" % (errors) return 1,str(float(res))
[docs]def get_error_queue_nw_esp301(nw_esp301_id): "Read error queue until the end (code 0)" try: nw_esp301=nw_esp301_pool.get(nw_esp301_id) except Exception as e: return 0,str(e) if "max" not in nw_esp301: return 0,"not configured" return get_error_queue(nw_esp301)
[docs]def get_error_queue(nw_esp301): "Read error queue until the end (code 0)" command=r"TB\r" errors="" while True: retcode,res=submod_execcmd("wrnrd_until@"+nw_esp301["bus"],nw_esp301["bus_id"],command,r"\r") if retcode==0: return 0,res elif int(res.split(",",1)[0])!=0: errors += res if errors=="" else "; "+res else: break return 1,errors