Source code for cmd_pk_xli

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# 
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
# 
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
# 
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame.  If not, see <http://www.gnu.org/licenses/>

import time
import pools,conf_strings

# Pool holding one record per initialized axis. Records are created by
# init_pk_xli, looked up by id in the *_pk_xli commands and removed by
# deinit_pk_xli.
pk_xli_pool=pools.pool()

# COMMANDS #######################################################

def send_command(pk_xli,command,mode="command"):
    """Send one or several commands to the controller through its bus.

    *pk_xli* is the axis record; its "bus", "bus_id" and "chan" entries are used.
    *command* is a command string or a list of them. Every occurrence of
    "chan" in a command is replaced by the channel of the axis.
    *mode* selects the error policy: a mode starting with "strict" aborts as
    soon as "*E" is read instead of the command echo, and a mode ending with
    "query" requires an answer after the echo.

    Returns (1,answer to the last command) on success or (0,message) on
    error. An empty list of commands returns (1,"")."""
    # line endings are handed to the bus module as escaped text (raw strings)
    escaped_endings=(r"\r",r"\n")
    if not isinstance(command,list):
        command=[command]
    # nothing answered yet: keeps the final return valid for an empty list
    res=""
    for c in command:
        # remove trailing escaped line endings as whole tokens. rstrip() takes
        # a set of characters and would also eat a final "r", "n" or backslash
        cmd=c
        while cmd.endswith(escaped_endings):
            cmd=cmd[:-2]
        cmd=cmd.replace("chan",pk_xli["chan"])
        retcode,res=submod_execcmd("wrnrd_until@"+pk_xli["bus"],pk_xli["bus_id"],cmd+r"\r\n",r"\r")
        if retcode==0:
            return 0,"error sending command <- %s"%(res)
        # is echo = command sent ?
        if res=="*E":
            if mode.startswith("strict"):
                return 0,"*E"
            print("warning: a previous command failed. trying to continue...")
            # try again to read command echo
            retcode,res=submod_execcmd("read_until@"+pk_xli["bus"],pk_xli["bus_id"],r"\r")
            if retcode==0:
                return 0,"error trying to read command echo <- %s"%(res)
        if cmd!=res:
            print("warning: invalid command echo expecting %s, got %s. flushing buffer"%(cmd,res))
            submod_execcmd("read_bin@"+pk_xli["bus"],pk_xli["bus_id"],"100000","1")
        # check if error is received
        retcode,res=submod_execcmd("read_until@"+pk_xli["bus"],pk_xli["bus_id"],r"\r","0.1")
        # in command mode we shouldn't have response
        # (i.e. we'll have retcode=0 because of timeout),
        # so consider error just if real error (i.e.: *E)
        if retcode==0 and mode.endswith("query"):
            return 0,"unable to get answer <- %s"%(res)
        if res=="*E":
            return 0,"error during command execution"
    # return result of last command
    return 1,res

def init_pk_xli(pk_xli_id,conf_string):
    """Initialize PK_XLI motion controller.

    *conf_string* must contain:

    - bus: conf_string of the underlying link module
    - chan: integer for the channel of the axis (1 to 9)

    The link module is initialized under the id "bus_<pk_xli_id>" and the
    axis is registered in the pool. Returns (1,"ok") or (0,message)."""
    try:
        conf=conf_strings.parse(conf_string)
    except Exception as e:
        return 0,"%s" % str(e)
    if conf.name!="pk_xli":
        return 0,"Invalid module name %s in conf_string instead of pk_xli"%(conf.name)
    if not conf.has("bus","chan"):
        return 0,"Some of the required parameters (bus and chan) in conf_string are not present"
    # validate the channel without a bare except, so that KeyboardInterrupt
    # and SystemExit are no longer swallowed here
    try:
        chan_ok=int(conf.params["chan"]) in range(1,10)
    except Exception:
        chan_ok=False
    if not chan_ok:
        return 0,"channel must be a positive integer from 1 to 9"
    try:
        conf_bus=conf_strings.parse(conf.params["bus"])
    except Exception as e:
        return 0,str(e)
    # defaults applied when the bus conf_string does not provide them
    if conf_bus.name=="serial" and not conf_bus.has("baudrate"):
        conf_bus.params["baudrate"]="9600"
    if not conf_bus.has("timeout"):
        conf_bus.params["timeout"]="0.2"
    bus_id="bus_%s"%(pk_xli_id)
    retcode,res=submod_execcmd("init@"+conf_bus.name,bus_id,conf_strings.unparse(conf_bus))
    if retcode==0:
        return 0,"error initializing axis <- %s"%(res)
    # the channel is kept as a string: send_command substitutes it in commands
    pk_xli_pool.new(pk_xli_id,{"bus":conf_bus.name,"bus_id":bus_id,"chan":conf.params["chan"],"conf_bus":conf.params["bus"]})
    return 1,"ok"
def deinit_pk_xli(pk_xli_id):
    """Deinitialize the link of *pk_xli_id* and remove the axis from the pool.

    If the lookup in the pool fails, (1,message) is returned. A configured
    axis is invalidated first. Returns (1,"ok") on success and (0,message)
    when the invalidation or the link deinitialization fails."""
    try:
        axis=pk_xli_pool.get(pk_xli_id)
    except Exception as e:
        return 1,str(e)
    # a configured axis has to be invalidated before releasing the link
    if "configured" in axis:
        status,msg=inval(pk_xli_id)
        if status==0:
            return 0,msg
    link_cmd="deinit@"+axis["bus"]
    status,msg=submod_execcmd(link_cmd,axis["bus_id"])
    if status==0:
        return 0,"Error deinitializing link <- %s"%(msg)
    pk_xli_pool.remove(pk_xli_id)
    return 1,"ok"
def config_pk_xli(pk_xli_id,pos_max,pos_min):
    """Configure *pk_xli_id* axis.

    *pos_max* and *pos_min* define the limits of excursion. Both must be
    numbers (or numeric strings) with *pos_max* strictly higher than *pos_min*.

    The link is configured, the motor is enabled and the limits are stored
    in the axis record. Returns (1,"ok") or (0,message)."""
    try:
        pk_xli=pk_xli_pool.get(pk_xli_id)
    except Exception as e:
        return 0,str(e)
    # report invalid limits through the return code instead of raising
    try:
        pos_min=float(pos_min)
        pos_max=float(pos_max)
    except (ValueError,TypeError):
        return 0,"pos_max and pos_min must be numbers"
    # written as "not >" so that NaN limits are rejected as well
    if not pos_max>pos_min:
        return 0,"max must be higher than min"
    retcode,res=submod_execcmd("config@"+pk_xli["bus"],pk_xli["bus_id"])
    if retcode==0:
        return 0,"error configuring link <- %s"%(res)
    command="chanON"
    retcode,res=send_command(pk_xli,command)
    if retcode==0:
        return 0,"error enabling motor <- %s"%(res)
    pk_xli["max"]=pos_max
    pk_xli["min"]=pos_min
    pk_xli["configured"]=True
    return 1,"ok"
def set_output(pk_xli,chan,status):
    """Set output *chan* to *status* on the axis record *pk_xli*.

    *chan* must be one of the strings "1" to "8" and *status* "1" or "0".
    The other outputs are sent as "X" in the output mask.
    Returns (1,"ok") or (0,message)."""
    valid_outputs=[str(n) for n in range(1,9)]
    if not (chan in valid_outputs and status in ("1","0")):
        return 0,"invalid chan or state. must be 1 or 0"
    chan=int(chan)
    # mask of 8 positions: only the requested output carries a value
    mask=["X"]*8
    mask[chan-1]=status
    command="chanO(%s)"%("".join(mask))
    retcode,res=send_command(pk_xli,command)
    if retcode!=0:
        return 1,"ok"
    return 0,"error setting output %d to %s <- %s"%(chan,status,res)
def set_output_pk_xli(pk_xli_id,chan,status):
    """Set output *chan* (1 to 8) to *status* (1 or 0) on pk_xli_id.

    Returns (0,message) if the axis cannot be fetched from the pool,
    otherwise the result of set_output."""
    try:
        axis=pk_xli_pool.get(pk_xli_id)
    except Exception as e:
        return 0,str(e)
    else:
        return set_output(axis,chan,status)
def inval(pk_xli_id):
    """Invalidate the configuration of *pk_xli_id*.

    The link is invalidated and the "max", "min" and "configured" entries
    are removed from the axis record. Entries that are absent (axis never
    configured) are skipped instead of raising KeyError.
    Returns (1,"ok") or (0,message)."""
    try:
        pk_xli=pk_xli_pool.get(pk_xli_id)
    except Exception as e:
        return 0,str(e)
    retcode,res=submod_execcmd("inval@"+pk_xli["bus"],pk_xli["bus_id"])
    if retcode==0:
        return 0,"Error invalidating link <- %s"%(res)
    # forget everything that config_pk_xli stored in the record
    for key in ("max","min","configured"):
        if key in pk_xli:
            del pk_xli[key]
    return 1,"ok"
def inval_pk_xli(pk_xli_id):
    """Invalidate *pk_xli_id* configuration.

    Public entry point: returns the result of inval unchanged."""
    outcome=inval(pk_xli_id)
    return outcome
def set_origin_pk_xli(pk_xli_id):
    """Set origin to current position.

    The axis must be configured. Sends the "chanW(PA,0)" command.
    Returns (1,"ok") or (0,message)."""
    try:
        axis=pk_xli_pool.get(pk_xli_id)
    except Exception as e:
        return 0,str(e)
    if not "configured" in axis:
        return 0,"not configured"
    status,answer=send_command(axis,"chanW(PA,0)")
    if status!=0:
        return 1,"ok"
    return 0,"error setting origin: %s"%(answer)
def get_pos(pk_xli,mode="normal"):
    """Query the position of the axis record *pk_xli*.

    The "chanR(PA)" command is sent with the mode "<mode>-query".
    Returns (1,answer with the surrounding "*" removed) or (0,message)."""
    query_mode=mode+"-query"
    status,answer=send_command(pk_xli,"chanR(PA)",query_mode)
    if status!=0:
        return 1,answer.strip("*")
    return 0,"error getting position <- %s"%(answer)
def get_pos_pk_xli(pk_xli_id):
    """Get position of the specified axis.

    The axis must be configured. Returns the result of get_pos, or
    (0,message) if the axis cannot be fetched or is not configured."""
    try:
        axis=pk_xli_pool.get(pk_xli_id)
    except Exception as e:
        return 0,str(e)
    if "configured" in axis:
        # ask the controller
        return get_pos(axis)
    return 0,"not configured"
def move(pk_xli_id,mode,d,velocity,acceleration):
    """Move the axis and wait until the destination is reached.

    *mode* is "abs" (*d* is the destination) or "rel" (*d* is a displacement
    from the current position). *d* is truncated to an integer and the
    absolute values of *velocity* and *acceleration* are used.

    The move is refused if the final position would be outside the limits
    stored by config_pk_xli. After starting the move, the position is polled
    every 0.5 s until it equals the destination.
    Returns (1,"ok") or (0,message)."""
    try:
        pk_xli=pk_xli_pool.get(pk_xli_id)
    except Exception as e:
        return 0,str(e)
    if "configured" not in pk_xli:
        return 0,"not configured"
    # depending on mode: d is destination (abs) or displacement (rel)
    if mode=="abs":
        command=["chanMA"]
    elif mode=="rel":
        command=["chanMI"]
    else:
        # previously an unknown mode left dest and command undefined
        return 0,"invalid move mode %s"%(mode)
    # report non numeric arguments through the return code instead of raising
    try:
        d=int(float(d))
        velocity=abs(float(velocity))
        acceleration=abs(float(acceleration))
    except (ValueError,TypeError,OverflowError):
        return 0,"displacement, velocity and acceleration must be numbers"
    retcode,res=get_pos(pk_xli)
    if retcode==0:
        return 0,res
    current=int(res)
    dest=d if mode=="abs" else current+d
    if dest>pk_xli["max"] or dest<pk_xli["min"]:
        return 0,"refusing to move: final position %f would be out of the axis limits"%(dest)
    command+=["chanA%.1f"%(acceleration)
             ,"chanV%.2f"%(velocity)
             ,"chanD%d"%(d)
             ,"chanG"]
    retcode,res=send_command(pk_xli,command)
    if retcode==0:
        return 0,"error moving: %s"%(res)
    # NOTE(review): this loop has no timeout. It only ends when the position
    # read back equals dest or when reading the position fails
    while current!=dest:
        time.sleep(0.5)
        retcode,res=get_pos(pk_xli,"strict")
        if retcode==0:
            # get_pos prefixes the error text, so the "*E" returned by
            # send_command in strict mode is found at the end of the message
            if res.endswith("*E"):
                res="hard limit attained"
            return 0,"error moving: %s"%(res)
        current=int(res)
    return 1,"ok"
def move_pk_xli(pk_xli_id,displacement,velocity,acceleration):
    """Move by *displacement* units with the specified maximum velocity.

    Relative move: returns the result of move called in "rel" mode."""
    relative_mode="rel"
    return move(pk_xli_id,relative_mode,displacement,velocity,acceleration)
def go_min_pk_xli(pk_xli_id,velocity,acceleration):
    """Go to the minimum position as defined during the configuration of the axis.

    Returns (0,"not configured") if the axis has no limits stored yet,
    otherwise the result of an absolute move to the stored minimum."""
    try:
        pk_xli=pk_xli_pool.get(pk_xli_id)
    except Exception as e:
        return 0,str(e)
    # the "min" entry only exists after config_pk_xli: check before reading it
    if "configured" not in pk_xli:
        return 0,"not configured"
    return move(pk_xli_id,"abs",pk_xli["min"],velocity,acceleration)
def go_max_pk_xli(pk_xli_id,velocity,acceleration):
    """Go to the maximum position as defined during the configuration of the axis.

    Returns (0,"not configured") if the axis has no limits stored yet,
    otherwise the result of an absolute move to the stored maximum."""
    try:
        pk_xli=pk_xli_pool.get(pk_xli_id)
    except Exception as e:
        return 0,str(e)
    # the "max" entry only exists after config_pk_xli: check before reading it
    if "configured" not in pk_xli:
        return 0,"not configured"
    return move(pk_xli_id,"abs",pk_xli["max"],velocity,acceleration)