#!/usr/bin/env python2
# -*- coding: utf-8 -*-
#
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
#
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame. If not, see <http://www.gnu.org/licenses/>
import time
import pools,conf_strings
nw_esp301_pool=pools.pool()
# Axes
axes=["1","2","3"]
# Ports
ports=["A","B"]
# COMMANDS #######################################################
[docs]def init_nw_esp301(nw_esp301_id,conf_string):
"""Initialize NW_ESP301 motion controller.
*conf_string* must contain:
- bus: conf_string of the underlying link module
- chan: 1, 2 or 3 for the channel of the axis"""
try:
conf=conf_strings.parse(conf_string)
except Exception as e:
return 0,"%s" % str(e)
if conf.name!="nw_esp301":
return 0,"Invalid module name %s in conf_string instead of nw_esp301"%(conf.name)
if not conf.has("bus","chan"):
return 0,"Some of the required parameters (bus and chan) in conf_string are not present"
if conf.params["chan"] not in axes:
return 0,"invalid channel"
try:
conf_bus=conf_strings.parse(conf.params["bus"])
except Exception as e:
return 0,str(e)
if conf_bus.name=="serial" and not conf_bus.has("baudrate"):
conf_bus.params["baudrate"]="115200"
bus_id="bus_%s"%(nw_esp301_id)
retcode,res=submod_execcmd("init@"+conf_bus.name,bus_id,conf_strings.unparse(conf_bus))
if retcode==0:
return 0,"error initializing axis <- %s"%(res)
nw_esp301_pool.new(nw_esp301_id,{"bus":conf_bus.name,"bus_id":bus_id,"chan":conf.params["chan"]})
return 1,"ok"
[docs]def deinit_nw_esp301(nw_esp301_id):
"Deinitialize and deregister nw_esp301 motion controller from the pool."
try:
nw_esp301=nw_esp301_pool.get(nw_esp301_id)
except Exception as e:
return 1,str(e)
retcode,res=submod_execcmd("deinit@"+nw_esp301["bus"],nw_esp301["bus_id"])
if retcode==0:
return 0,"Error deinitializing link <- %s" % (res)
nw_esp301_pool.remove(nw_esp301_id)
return 1,"ok"
[docs]def config_nw_esp301(nw_esp301_id,pos_max,pos_min):
"Configure *nw_esp301_id* axis. *pos_max* and *pos_min* define the limits of excursion."
try:
nw_esp301=nw_esp301_pool.get(nw_esp301_id)
except Exception as e:
return 0,str(e)
pos_min=float(pos_min)
pos_max=float(pos_max)
if pos_max<=pos_min:
return 0,"max must be higher than min"
retcode,res=submod_execcmd("config@"+nw_esp301["bus"],nw_esp301["bus_id"])
if retcode==0:
return 0,"Error configuring link <- %s" % (res)
command=r"{0}MO\r".format(nw_esp301["chan"])
retcode,res=submod_execcmd("write@"+nw_esp301["bus"],nw_esp301["bus_id"],command)
if retcode==0:
return 0,"Error writing to link <- %s" % (res)
nw_esp301["max"]=pos_max
nw_esp301["min"]=pos_min
return 1,"ok"
[docs]def inval_nw_esp301(nw_esp301_id):
"Invalidate *nw_esp301_id* configuration"
try:
nw_esp301=nw_esp301_pool.get(nw_esp301_id)
except Exception as e:
return 0,str(e)
command=r"{0}MF\r".format(nw_esp301["chan"])
retcode,res=submod_execcmd("write@"+nw_esp301["bus"],nw_esp301["bus_id"],command)
if retcode==0:
return 0,"Error writing to link <- %s" % (res)
retcode,res=submod_execcmd("inval@"+nw_esp301["bus"],nw_esp301["bus_id"])
if retcode==0:
return 0,"Error invalidating link <- %s" % (res)
del nw_esp301["max"]
del nw_esp301["min"]
return 1,"ok"
def command(nw_esp301,command):
retcode,res=submod_execcmd("wrnrd_until@"+nw_esp301["bus"],nw_esp301["bus_id"],command,r"\r")
if retcode==0:
return 0,"Error writing and reading to/from link <- %s" % (res)
elif res!="1":
return 0,"Error monitoring move completion status <- %s" % (res)
retcode,errors=get_error_queue(nw_esp301)
if errors!="":
return 0,"Error(s) returned from the MC: %s" % (errors)
return 1,"ok"
[docs]def home_nw_esp301(nw_esp301_id,direction,velocity):
"Reset axis *nw_esp301_id* by homing on *direction* at *velocity*."
try:
nw_esp301=nw_esp301_pool.get(nw_esp301_id)
except Exception as e:
return 0,str(e)
if "max" not in nw_esp301:
return 0,"not configured"
# Send command to nw_esp301
cmd=r"{0}OH{1};{0}OL0.1;{0}OR;{0}WS;" \
r"{0}PR0.2;{0}WS;{0}DH;" \
r"{0}MD\r".format(nw_esp301["chan"],velocity)
retcode,res=command(nw_esp301,cmd)
if retcode==0:
return 0,"Error homing axis: %s"%(res)
return 1,"ok"
[docs]def reset_nw_esp301(nw_esp301_id):
"Reset the controller of *nw_esp301_id*, not just the axis"
try:
nw_esp301=nw_esp301_pool.get(nw_esp301_id)
except Exception as e:
return 0,str(e)
if "max" not in nw_esp301:
return 0,"not configured"
command=r"RS\r"
retcode,res=submod_execcmd("write@"+nw_esp301["bus"],nw_esp301["bus_id"],command)
if retcode==0:
return 0,"Error resetting nw_esp301 <- %s" % (res)
time.sleep(12)
retcode,errors=get_error_queue(nw_esp301)
if errors!="":
return 0,"Error(s) returned from the MC <- %s" % (errors)
return 1,"ok"
[docs]def go_min_nw_esp301(nw_esp301_id,velocity,acceleration):
"Go to the minimum position as defined during the initalization of the axis"
try:
nw_esp301=nw_esp301_pool.get(nw_esp301_id)
except Exception as e:
return 0,str(e)
if "max" not in nw_esp301:
return 0,"not configured"
# Send command to nw_esp301
velocity=float(velocity)
acceleration=float(acceleration)
cmd=r"{0}MO;" \
r"{0}AC{1:f};" \
r"{0}VA{2:f};" \
r"{0}PA{3:f};{0}WS;" \
r"{0}MD\r".format(nw_esp301["chan"],acceleration,velocity,nw_esp301["min"])
retcode,res=command(nw_esp301,cmd)
if retcode==0:
return 0,"Error moving to min: %s"%(res)
return 1,"ok"
[docs]def go_max_nw_esp301(nw_esp301_id,velocity,acceleration):
"Go to the maximum position as defined during the initalization of the axis"
try:
nw_esp301=nw_esp301_pool.get(nw_esp301_id)
except Exception as e:
return 0,str(e)
if "max" not in nw_esp301:
return 0,"not configured"
# Send command to nw_esp301
velocity=float(velocity)
acceleration=float(acceleration)
cmd=r"{0}MO;" \
r"{0}AC{1:f};" \
r"{0}VA{2:f};" \
r"{0}PA{3:f};{0}WS;" \
r"{0}MD\r".format(nw_esp301["chan"],acceleration,velocity,nw_esp301["max"])
retcode,res=command(nw_esp301,cmd)
if retcode==0:
return 0,"Error moving to max: %s"%(res)
return 1,"ok"
[docs]def move_nw_esp301(nw_esp301_id,displacement,velocity,acceleration):
"Move by displacement units with the specified maximum velocity"
try:
nw_esp301=nw_esp301_pool.get(nw_esp301_id)
except Exception as e:
return 0,str(e)
if "max" not in nw_esp301:
return 0,"not configured"
# Send command to nw_esp301
displacement=float(displacement)
velocity=float(velocity)
acceleration=float(acceleration)
retcode,res=get_pos(nw_esp301)
if retcode==0:
return 0,res
if float(res)+displacement > nw_esp301["max"] or float(res)+displacement < nw_esp301["min"]:
return 0,"Refusing to move: final position {0:f} would be out of the axis limits".format(float(res)+displacement)
cmd=r"{0}MO;" \
r"{0}AC{1:f};" \
r"{0}VA{2:f};" \
r"{0}PR{3:f};{0}WS;" \
r"{0}MD\r".format(nw_esp301["chan"],acceleration,velocity,displacement)
retcode,res=command(nw_esp301,cmd)
if retcode==0:
return 0,"Error moving: %s"%(res)
return 1,"ok"
[docs]def get_pos_nw_esp301(nw_esp301_id):
"Get position of the specified axis"
try:
nw_esp301=nw_esp301_pool.get(nw_esp301_id)
except Exception as e:
return 0,str(e)
if "max" not in nw_esp301:
return 0,"not configured"
# Send command to nw_esp301
retcode,res=get_pos(nw_esp301)
if retcode==0:
return retcode,res
return 1,res
[docs]def get_pos(nw_esp301):
"Get position of the specified axis"
command=r"{0}TP\r".format(nw_esp301["chan"])
retcode,res=submod_execcmd("wrnrd_until@"+nw_esp301["bus"],nw_esp301["bus_id"],command,r"\r")
if retcode==0:
return 0,"Error writing and reading to/from link <- %s" % (res)
retcode,errors=get_error_queue(nw_esp301)
if errors!="":
return 0,"Error(s) returned from the MC: %s" % (errors)
return 1,str(float(res))
[docs]def get_error_queue_nw_esp301(nw_esp301_id):
"Read error queue until the end (code 0)"
try:
nw_esp301=nw_esp301_pool.get(nw_esp301_id)
except Exception as e:
return 0,str(e)
if "max" not in nw_esp301:
return 0,"not configured"
return get_error_queue(nw_esp301)
[docs]def get_error_queue(nw_esp301):
"Read error queue until the end (code 0)"
command=r"TB\r"
errors=""
while True:
retcode,res=submod_execcmd("wrnrd_until@"+nw_esp301["bus"],nw_esp301["bus_id"],command,r"\r")
if retcode==0:
return 0,res
elif int(res.split(",",1)[0])!=0:
errors += res if errors=="" else "; "+res
else:
break
return 1,errors