#!/usr/bin/env python2
# -*- coding: utf-8 -*-
#
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
#
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame. If not, see <http://www.gnu.org/licenses/>
import time
import pools,conf_strings
pk_xli_pool=pools.pool()
# COMMANDS #######################################################
def send_command(pk_xli,command,mode="command"):
if type(command) is not list:
command=[command]
for c in command:
cmd=c.rstrip(r"\r\n").replace("chan",pk_xli["chan"])
retcode,res=submod_execcmd("wrnrd_until@"+pk_xli["bus"],pk_xli["bus_id"],cmd+r"\r\n",r"\r")
if retcode==0:
return 0,"error sending command <- %s"%(res)
# is echo = command sent ?
if res=="*E":
if mode.startswith("strict"):
return 0,"*E"
print("warning: a previous command failed. trying to continue...")
# try again to read command echo
retcode,res=submod_execcmd("read_until@"+pk_xli["bus"],pk_xli["bus_id"],r"\r")
if retcode==0:
return 0,"error trying to read command echo <- %s"%(res)
if cmd!=res:
print("warning: invalid command echo expecting %s, got %s. flushing buffer"%(cmd,res))
submod_execcmd("read_bin@"+pk_xli["bus"],pk_xli["bus_id"],"100000","1")
# check if error is received
retcode,res=submod_execcmd("read_until@"+pk_xli["bus"],pk_xli["bus_id"],r"\r","0.1")
# in command mode we shouldn't have response
# (i.e. we'll have retcode=0 because of timeout),
# so consider error just if real error (i.e.: *E)
if retcode==0 and mode.endswith("query"):
return 0,"unable to get answer <- %s"%(res)
if res=="*E":
return 0,"error during command execution"
# return result of last command
return 1,res
[docs]def init_pk_xli(pk_xli_id,conf_string):
"""Initialize PK_XLI motion controller.
*conf_string* must contain:
- bus: conf_string of the underlying link module
- chan: integer for the channel of the axis"""
try:
conf=conf_strings.parse(conf_string)
except Exception as e:
return 0,"%s" % str(e)
if conf.name!="pk_xli":
return 0,"Invalid module name %s in conf_string instead of pk_xli"%(conf.name)
if not conf.has("bus","chan"):
return 0,"Some of the required parameters (bus and chan) in conf_string are not present"
try:
if int(conf.params["chan"]) not in range(1,10):
raise Exception()
except:
return 0,"channel must be a positive integer from 1 to 9"
try:
conf_bus=conf_strings.parse(conf.params["bus"])
except Exception as e:
return 0,str(e)
if conf_bus.name=="serial" and not conf_bus.has("baudrate"):
conf_bus.params["baudrate"]="9600"
if not conf_bus.has("timeout"):
conf_bus.params["timeout"]="0.2"
bus_id="bus_%s"%(pk_xli_id)
retcode,res=submod_execcmd("init@"+conf_bus.name,bus_id,conf_strings.unparse(conf_bus))
if retcode==0:
return 0,"error initializing axis <- %s"%(res)
pk_xli_pool.new(pk_xli_id,{"bus":conf_bus.name,"bus_id":bus_id,"chan":conf.params["chan"],"conf_bus":conf.params["bus"]})
return 1,"ok"
[docs]def deinit_pk_xli(pk_xli_id):
"Deinitialize and deregister pk_xli motion controller from the pool."
try:
pk_xli=pk_xli_pool.get(pk_xli_id)
except Exception as e:
return 1,str(e)
if "configured" in pk_xli:
retcode,res=inval(pk_xli_id)
if retcode==0:
return 0,res
retcode,res=submod_execcmd("deinit@"+pk_xli["bus"],pk_xli["bus_id"])
if retcode==0:
return 0,"Error deinitializing link <- %s"%(res)
pk_xli_pool.remove(pk_xli_id)
return 1,"ok"
[docs]def config_pk_xli(pk_xli_id,pos_max,pos_min):
"Configure *pk_xli_id* axis. *pos_max* and *pos_min* define the limits of excursion."
try:
pk_xli=pk_xli_pool.get(pk_xli_id)
except Exception as e:
return 0,str(e)
pos_min=float(pos_min)
pos_max=float(pos_max)
if pos_max<=pos_min:
return 0,"max must be higher than min"
retcode,res=submod_execcmd("config@"+pk_xli["bus"],pk_xli["bus_id"])
if retcode==0:
return 0,"error configuring link <- %s"%(res)
command="chanON"
retcode,res=send_command(pk_xli,command)
if retcode==0:
return 0,"error enabling motor <- %s"%(res)
pk_xli["max"]=pos_max
pk_xli["min"]=pos_min
pk_xli["configured"]=True
return 1,"ok"
def set_output(pk_xli,chan,status):
if chan not in map(str,range(1,9)) or status not in ["1","0"]:
return 0,"invalid chan or state. must be 1 or 0"
chan=int(chan)
all_status="X"*(chan-1)+status+"X"*(8-chan)
command="chanO(%s)"%(all_status)
retcode,res=send_command(pk_xli,command)
if retcode==0:
return 0,"error setting output %d to %s <- %s"%(chan,status,res)
return 1,"ok"
[docs]def set_output_pk_xli(pk_xli_id,chan,status):
"Set output *chan* (1 to 8) to *status* (1 or 0) on pk_xli_id"
try:
pk_xli=pk_xli_pool.get(pk_xli_id)
except Exception as e:
return 0,str(e)
return set_output(pk_xli,chan,status)
def inval(pk_xli_id):
try:
pk_xli=pk_xli_pool.get(pk_xli_id)
except Exception as e:
return 0,str(e)
retcode,res=submod_execcmd("inval@"+pk_xli["bus"],pk_xli["bus_id"])
if retcode==0:
return 0,"Error invalidating link <- %s"%(res)
del pk_xli["max"]
del pk_xli["min"]
del pk_xli["configured"]
return 1,"ok"
[docs]def inval_pk_xli(pk_xli_id):
"Invalidate *pk_xli_id* configuration"
return inval(pk_xli_id)
[docs]def set_origin_pk_xli(pk_xli_id):
"Set origin to current position"
try:
pk_xli=pk_xli_pool.get(pk_xli_id)
except Exception as e:
return 0,str(e)
if "configured" not in pk_xli:
return 0,"not configured"
command="chanW(PA,0)"
retcode,res=send_command(pk_xli,command)
if retcode==0:
return 0,"error setting origin: %s"%(res)
return 1,"ok"
def get_pos(pk_xli,mode="normal"):
command="chanR(PA)"
retcode,res=send_command(pk_xli,command,mode+"-query")
if retcode==0:
return 0,"error getting position <- %s"%(res)
return 1,res.strip("*")
[docs]def get_pos_pk_xli(pk_xli_id):
"Get position of the specified axis"
try:
pk_xli=pk_xli_pool.get(pk_xli_id)
except Exception as e:
return 0,str(e)
if "configured" not in pk_xli:
return 0,"not configured"
# Send command to pk_xli
return get_pos(pk_xli)
def move(pk_xli_id,mode,d,velocity,acceleration):
try:
pk_xli=pk_xli_pool.get(pk_xli_id)
except Exception as e:
return 0,str(e)
# depending on mode: d is destination (abs) or displacement (rel)
if "configured" not in pk_xli:
return 0,"not configured"
d=int(float(d))
velocity=abs(float(velocity))
acceleration=abs(float(acceleration))
retcode,res=get_pos(pk_xli)
if retcode==0:
return 0,res
current=int(res)
if mode=="abs":
dest=d
command=["chanMA"]
if mode=="rel":
dest=current+d
command=["chanMI"]
if dest>pk_xli["max"] or dest<pk_xli["min"]:
return 0,"refusing to move: final position %f would be out of the axis limits"%(dest)
command+=["chanA%.1f"%(acceleration)
,"chanV%.2f"%(velocity)
,"chanD%d"%(d)
,"chanG"]
retcode,res=send_command(pk_xli,command)
if retcode==0:
return 0,"error moving: %s"%(res)
while current!=dest:
time.sleep(0.5)
#print("waiting until current position is %d. now %d"%(dest,current))
retcode,res=get_pos(pk_xli,"strict")
if res=="*E":
res="hard limit attained"
if retcode==0:
return 0,"error moving: %s"%(res)
current=int(res)
return 1,"ok"
[docs]def move_pk_xli(pk_xli_id,displacement,velocity,acceleration):
"Move by displacement units with the specified maximum velocity"
return move(pk_xli_id,"rel",displacement,velocity,acceleration)
[docs]def go_min_pk_xli(pk_xli_id,velocity,acceleration):
"Go to the minimum position as defined during the initalization of the axis"
try:
pk_xli=pk_xli_pool.get(pk_xli_id)
except Exception as e:
return 0,str(e)
return move(pk_xli_id,"abs",pk_xli["min"],velocity,acceleration)
[docs]def go_max_pk_xli(pk_xli_id,velocity,acceleration):
"Go to the maximum position as defined during the initalization of the axis"
try:
pk_xli=pk_xli_pool.get(pk_xli_id)
except Exception as e:
return 0,str(e)
return move(pk_xli_id,"abs",pk_xli["max"],velocity,acceleration)