# Source code for cmd_paths

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# 
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
# 
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
# 
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame.  If not, see <http://www.gnu.org/licenses/>

import heapq,sys,traceback,pools
sys.path.insert(0,"/opt/pyrame")

# module-level registries, one per kind of object handled by this module;
# entries are added with new(id,obj), read with get(id) and dropped with remove(id)
# spaces: resolutions, motion axis ids, limits and forbidden volume ids
space_pool=pools.pool()
# volumes: dictionaries built by the init function of the volume module
volume_pool=pools.pool()
# scan paths: sequences of points associated to a space and a volume
path_pool=pools.pool()

def check_param(param, valid_list, default_param, map_func, unique=True):
    """Validate a short option string and convert it character by character.

    *param* is the string to check; "undef" or "" select *default_param*.
    *valid_list* holds the characters accepted in *param*.
    *default_param* also fixes the required length of *param*.
    *map_func* is applied to every character to build the result.
    *unique*, when true, rejects a result containing repeated values.

    Returns (1, list of converted values) on success and (0, message)
    otherwise.
    """
    if param == "undef" or param == "":
        param = default_param
    if len(param) != len(default_param) or any(c not in valid_list for c in param):
        return 0, "invalid param"
    # build a real list: its length is needed right below and callers index
    # it, which a lazy map() object (Python 3) would not allow
    result = [map_func(c) for c in param]
    if unique and len(set(result)) != len(result):
        return 0, "repeated characters"
    return 1, result

def is_in_volume(point, space, volumes):
    """Tell whether *point* lies inside any of the volumes listed in *volumes*.

    *volumes* is a comma-separated string of volume ids ("" means no volume).
    For every id the volume is fetched from volume_pool, the module named by
    its "module" entry is imported and the function named by its "function"
    entry is called with the volume, the per-axis tolerance and the point.

    Returns True as soon as one of those functions returns a true value,
    False otherwise. Any error while checking is re-raised as an Exception.
    """
    # the tolerance handed to the volume functions is half of the resolution
    errors = [space[key] / 2 for key in ("r1", "r2", "r3")]
    if volumes == "":
        return False
    for vol_id in volumes.split(","):
        try:
            vol = volume_pool.get(vol_id)
            checker = getattr(__import__(vol["module"]), vol["function"])
            if checker(vol, errors, point):
                return True
        except Exception as e:
            raise Exception("Error while checking point in forbidden volume: %s" % (str(e)))
    return False

# first neighbors of point p
def neighbors(space, p, order):
    """Return the grid nodes adjacent to *p* that can be visited.

    For each axis, taken in the sequence given by *order* (1-based axis
    numbers), the node one resolution step above and the node one step below
    are generated, snapped to a multiple of the resolution of that axis.
    Nodes outside the min/max limits of *space* or inside one of its
    forbidden volumes are discarded.
    """
    # example for order=1,2,3
    # [(x+dx,y,z),(x-dx,y,z),(x,y+dy,z),(x,y-dy,z),(x,y,z+dz),(x,y,z-dz)]
    resolution = [space["r1"], space["r2"], space["r3"]]
    candidates = []
    for slot in range(3):
        axis = order[slot] - 1
        step = resolution[axis]
        for shifted in (p[axis] + step, p[axis] - step):
            node = list(p)
            node[axis] = round(shifted / step) * step
            candidates.append(tuple(node))

    def allowed(node):
        # limits are tested first so that the forbidden volumes are only
        # evaluated for nodes inside the space
        inside_limits = (
            node[0] <= space["max1"] and node[0] >= space["min1"]
            and node[1] <= space["max2"] and node[1] >= space["min2"]
            and node[2] <= space["max3"] and node[2] >= space["min3"]
        )
        return inside_limits and not is_in_volume(node, space, space["forbidden"])

    return [node for node in candidates if allowed(node)]

# cost from current to next is the distance in the axis that changes
def cost(c, n, r, order):
    """Return the resolution of the axis along which *c* and *n* differ.

    *r* holds the resolution of the three axes. The first axis whose two
    other coordinates are identical in *c* and *n* is taken as the moving
    one; None is returned when no axis qualifies. *order* is not used.
    """
    for moving in (0, 1, 2):
        fixed = [axis for axis in (0, 1, 2) if axis != moving]
        if all(c[axis] == n[axis] for axis in fixed):
            return r[moving]
    return None

# check if two points are the same to a certain resolution
def eq_point(p1, p2, space):
    """Tell whether *p1* and *p2* coincide within the resolution of *space*.

    Two points are considered equal when, on each of the three axes, they
    differ by strictly less than half of the resolution of that axis.
    """
    # make error half of the resolution; 2.0 keeps the division exact even
    # if a resolution happens to be an integer
    errors = [space[key] / 2.0 for key in ("r1", "r2", "r3")]
    return all(abs(a - b) < err for a, b, err in zip(p1, p2, errors))

def search_path(space, start, goal, order):
    """Find a sequence of points leading from *start* to *goal*.

    Returns [] when *start* equals *goal*, and [start, goal] when the space
    has no forbidden volume or when both points differ in a single axis and
    every grid node between them is allowed. Otherwise an A* search is run on
    the grid of *space*, whose nodes are given by neighbors() in the axis
    sequence *order*. The returned list then goes from *start* to *goal*,
    with the intermediate nodes of each straight segment removed.

    Raises Exception("No valid path was found") if the goal cannot be reached.
    """
    start = tuple(start)
    goal = tuple(goal)
    if start == goal:
        return []
    if space["forbidden"] == "":
        return [start, goal]
    # resolution of each axis, indexed from 0
    r = [space["r1"], space["r2"], space["r3"]]
    # if goal and start differ only in one coordinate, try the direct path
    for i, j, k in [[1, 2, 0], [0, 2, 1], [0, 1, 2]]:
        if start[i] == goal[i] and start[j] == goal[j]:
            sign = 1 if goal[k] > start[k] else -1
            # round instead of truncating: the quotient of two floats may
            # fall just below the integer number of steps
            nb_steps = int(round(abs(goal[k] - start[k]) / r[k]))
            direct_path = True
            for n in range(1, nb_steps + 1):
                point = list(start)
                point[k] += sign * n * r[k]
                if is_in_volume(point, space, space["forbidden"]):
                    direct_path = False
                    break
            if direct_path:
                return [start, goal]
    # A* algorithm
    frontier = [(0, start)]
    came_from = {start: None}
    cost_so_far = {start: 0}
    reached = None
    while frontier:
        current = heapq.heappop(frontier)[1]
        # the goal is compared to the resolution, so remember which node
        # actually matched: it may not be the very same float tuple as goal
        if eq_point(current, goal, space):
            reached = current
            break
        for candidate in neighbors(space, current, order):
            new_cost = cost_so_far[current] + cost(current, candidate, r, order)
            if candidate not in cost_so_far or new_cost < cost_so_far[candidate]:
                heuristic = (abs(goal[0] - candidate[0])
                             + abs(goal[1] - candidate[1])
                             + abs(goal[2] - candidate[2]))
                heapq.heappush(frontier, (new_cost + heuristic, candidate))
                cost_so_far[candidate] = new_cost
                came_from[candidate] = current
    if reached is None:
        raise Exception("No valid path was found")
    # walk back from the reached node to start
    path = [reached]
    node = came_from[reached]
    while node is not None:
        path.append(node)
        if len(path) >= 3:
            # drop the middle point of three consecutive points on a line
            for i, j in [[1, 2], [0, 2], [0, 1]]:
                if path[-3][i] == path[-2][i] == path[-1][i] and path[-3][j] == path[-2][j] == path[-1][j]:
                    del path[-2]
                    break
        node = came_from[node]
    path.reverse()
    if len(path) == 1:
        # start itself already matched the goal within the resolution
        return [start, goal]
    # finish exactly on the requested destination
    path[-1] = goal
    return path

def init_space_paths(space_id, motion1_id, motion2_id, motion3_id, r1, r2, r3):
    """Initialize *space_id* with id of motion system *motion_id* and minimal steps for each axis *rn*

    The limits of every axis are requested from its motion system and stored
    with the space. Returns (1, "ok") on success and (0, message) otherwise.
    """
    try:
        steps = [float(r1), float(r2), float(r3)]
    except (TypeError, ValueError):
        return 0, "r1, r2 and r3 must be numbers"
    for i, step in enumerate(steps, 1):
        # positions are later divided by the minimal step, so it must be > 0
        if not step > 0:
            return 0, "r%d must be a positive number" % (i)
    space = {
        "r1": steps[0], "r2": steps[1], "r3": steps[2],
        "motion1_id": motion1_id, "motion2_id": motion2_id, "motion3_id": motion3_id,
        "forbidden": "",
    }
    for i in range(1, 4):
        retcode, res = submod_execcmd("get_limits@motion", space["motion%d_id" % (i)])
        if retcode == 0:
            return 0, "Error getting motion system limits of axis %d <- %s" % (i, res)
        # the reply is read as "min,max"
        try:
            limits = res.split(",")
            space["min%d" % (i)] = float(limits[0])
            space["max%d" % (i)] = float(limits[1])
        except (AttributeError, IndexError, ValueError):
            return 0, "Invalid motion system limits of axis %d <- %s" % (i, res)
    space_pool.new(space_id, space)
    return 1, "ok"
def deinit_space_paths(space_id):
    """Deinitialize *space_id*

    Returns 1 in every case: when the space cannot be fetched, the error
    text is returned as the message instead of "ok".
    """
    try:
        space_pool.get(space_id)
    except Exception as e:
        return 1, str(e)
    else:
        # the space exists: drop it from the pool
        space_pool.remove(space_id)
        return 1, "ok"
def add_limits_space_paths(space_id, volume_id):
    """Add *volume_id* to the forbidden regions of *space_id*

    Both ids must exist in their pools, otherwise (0, error text) is
    returned. The forbidden regions are kept as a comma-separated string of
    volume ids.
    """
    try:
        space = space_pool.get(space_id)
        # only checked for existence, the volume itself is not needed here
        volume_pool.get(volume_id)
    except Exception as e:
        return 0, str(e)
    already = space["forbidden"]
    space["forbidden"] = volume_id if already == "" else already + "," + volume_id
    return 1, "ok"
def get_pos(space):
    """Read the current position of the three axes of *space*.

    Returns (1, [pos1, pos2, pos3]) on success, or (0, message) as soon as
    one motion system reports an error.
    """
    tres = []
    for i in range(1, 4):
        retcode, res = submod_execcmd("get_pos@motion", space["motion%d_id" % (i)])
        if retcode == 0:
            return 0, "error getting position of axis %d <- %s" % (i, res)
        tres.append(float(res))
    return 1, tres


def move(space_id, d, s, a, strategy="undef"):
    """Move on *space_id* to destination *d* avoiding the forbidden volumes.

    *d*, *s* and *a* are three-element lists with the destination, speed and
    acceleration of axes 1, 2 and 3. The elements of *d* are converted to
    float in place. *strategy* is a three character string with the order in
    which the axes are moved ("undef" or "" mean "123").

    Returns (1, "ok") on success and (0, message) otherwise.
    """
    try:
        space = space_pool.get(space_id)
    except Exception as e:
        return 0, str(e)
    retcode, order = check_param(strategy, ["1", "2", "3"], "123", int)
    if retcode == 0:
        return 0, "invalid strategy: %s" % (order)
    # check if d is multiple of r
    for i in order:
        try:
            d[i-1] = float(d[i-1])
        except (TypeError, ValueError):
            return 0, "invalid destination for axis %d: %s" % (i, d[i-1])
        # determine if d is compatible with discretization of space
        m = d[i-1] / space["r%d" % (i)]
        if abs(m - round(m)) > 1e-12:
            return 0, "destination must be multiple of the minimum step %f (axis %d)" % (space["r%d" % (i)], i)
    # check if we are trying to go to a forbidden point
    try:
        forbidden = is_in_volume(d, space, space["forbidden"])
    except Exception as e:
        return 0, str(e)
    if forbidden:
        return 0, "destination out of limits"
    # get current position
    retcode, c = get_pos(space)
    if retcode == 0:
        # c carries the error message in this case
        return 0, c
    # round c to the closest node
    for i in order:
        c[i-1] = round(c[i-1] / space["r%d" % (i)]) * space["r%d" % (i)]
    # search safe path surrounding forbidden regions
    try:
        safe_path = search_path(space, c, d, order)
    except Exception as e:
        return 0, "error searching path: %s" % (str(e))
    for step in safe_path:
        step = [str(coord) for coord in step]
        for i in order:
            # i is the 1-based axis number, the lists are indexed from 0
            retcode, res = submod_execcmd("move@motion", space["motion%d_id" % (i)], step[i-1], s[i-1], a[i-1])
            if retcode == 0:
                return 0, "error moving axis %d <- %s" % (i, res)
    return 1, "ok"
def move_space_paths(space_id, d1, d2, d3, s1, s2, s3, a1, a2, a3, strategy="undef"):
    """Move on *space_id* to *dn* destination at *sn* speed, with *an* acceleration (for n=1,2,3).

    strategy is the preferential order of axis movement (e.g. 213 to move
    first axis 2, then axis 1 finally axis 3)
    """
    retcode, res = move(space_id, [d1, d2, d3], [s1, s2, s3], [a1, a2, a3], strategy)
    # on failure forward the message of move(), otherwise just report "ok"
    return (0, res) if retcode == 0 else (1, "ok")
def init_volume_paths(volume_id, space_id, module, function, *params):
    """Create *volume_id* by specifying the Python module and function that will determine inside/outside of volume

    The function named "init_" + *function* of *module* is called with
    *params* to build the volume, which is then stored in the volume pool
    together with *space_id*, *module* and *function*.
    Returns (1, "ok") on success and (0, message) otherwise.
    """
    try:
        # only checked for existence
        space_pool.get(space_id)
    except Exception as e:
        return 0, str(e)
    try:
        init_func = getattr(__import__(module), "init_" + function)
    except Exception as e:
        return 0, "Error loading module and function: %s" % (str(e))
    try:
        volume = init_func(*params)
    except Exception as e:
        traceback.print_exception(*sys.exc_info())
        return 0, "Error initializing volume: %s" % (str(e))
    # remember where the volume comes from so that it can be evaluated later
    for key, value in (("space_id", space_id), ("module", module), ("function", function)):
        volume[key] = value
    volume_pool.new(volume_id, volume)
    return 1, "ok"
def deinit_volume_paths(volume_id):
    """Deinitialize *volume_id*

    Returns 1 in every case: when the removal fails, the error text is
    returned as the message instead of "ok".
    """
    try:
        volume_pool.remove(volume_id)
    except Exception as e:
        return 1, str(e)
    else:
        return 1, "ok"
def init_path_paths(path_id, space_id, volume_id, p1d, p2d, p3d, order, path_type, directions):
    """Create path to scan *volume_id* with *pnd* steps (for n=1,2,3). The path is associated to *space_id*

    *order* is a three character string. Its first character is the number of the fastest axis, and the third one is the slowest.

    *path_type* is a two character string indicating: the type of scan ('r' for raster, 'm' for meander) of the fastest and middle axis (first character), and middle and slowest axis (second character).

    *directions* is a three character string with either 'p' or 'n' for positive or negative for the fastest, middle and slowest axis.

    Returns (1, "ok") on success and (0, message) otherwise.
    """
    try:
        space = space_pool.get(space_id)
    except Exception as e:
        return 0, str(e)
    try:
        volume = volume_pool.get(volume_id)
    except Exception as e:
        return 0, str(e)
    path = {"space_id": space_id, "volume_id": volume_id}
    try:
        pd = [float(p1d), float(p2d), float(p3d)]
    except (TypeError, ValueError):
        return 0, "p1d, p2d and p3d must be numbers"
    for i in [1, 2, 3]:
        # the extent of the volume is divided by the step further down
        if pd[i-1] == 0:
            return 0, "p%dd must be different from zero" % (i)
        ratio = pd[i-1] / space["r%d" % (i)]
        if abs(ratio - round(ratio)) > 1e-12:
            return 0, "p%dd must be multiple of the minimum step for axis %d (%.2e)" % (i, i, space["r%d" % (i)])
    # order becomes a list of 0-based axis indexes kept as strings, because
    # they are also used to build the "min_N"/"max_N" keys of the volume
    retcode, order = check_param(order, ["1", "2", "3"], "123", lambda x: str(int(x)-1))
    if retcode == 0:
        return 0, "invalid order: %s" % (order)
    retcode, path_type = check_param(path_type, ["r", "m"], "rr", lambda x: 0 if x == "r" else 1, False)
    if retcode == 0:
        return 0, "invalid path_type: %s" % (path_type)
    retcode, directions = check_param(directions, ["p", "n"], "ppp", lambda x: 1 if x == "p" else -1, False)
    if retcode == 0:
        return 0, "invalid directions: %s" % (directions)
    # scan levels: 0 is the fastest axis, 1 the middle one, 2 the slowest
    axis = []      # 0-based index of the axis scanned at each level
    initial = []   # coordinate where the scan of each level starts
    delta = []     # signed step of each level
    count = []     # number of positions of each level
    for level in range(3):
        positive = directions[level] == 1
        final_coord = volume[("max" if positive else "min") + "_" + order[level]]
        initial_coord = volume[("min" if positive else "max") + "_" + order[level]]
        step = pd[int(order[level])] * directions[level]
        axis.append(int(order[level]))
        initial.append(initial_coord)
        delta.append(step)
        # the small offset keeps a quotient like 2.9999999999999996 from being
        # truncated to 2; surplus positions are rejected by the volume check
        count.append(int(abs((final_coord - initial_coord) / step) + 1e-9) + 1)
    points = []
    mc = 0   # sweeps of the middle axis done so far
    fc = 0   # sweeps of the fastest axis done so far
    try:
        # slowest axis
        for s in range(count[2]):
            # middle axis (every other sweep is reversed in meander mode)
            for m in range(count[1])[::1 - 2 * (mc & 1) * path_type[1]]:
                # fastest axis (every other sweep is reversed in meander mode)
                for f in range(count[0])[::1 - 2 * (fc & 1) * path_type[0]]:
                    point = [None] * 3
                    point[axis[0]] = initial[0] + f * delta[0]
                    point[axis[1]] = initial[1] + m * delta[1]
                    point[axis[2]] = initial[2] + s * delta[2]
                    if point[0] > space["max1"] or point[0] < space["min1"] or \
                       point[1] > space["max2"] or point[1] < space["min2"] or \
                       point[2] > space["max3"] or point[2] < space["min3"]:
                        continue
                    if is_in_volume(point, space, space["forbidden"]):
                        continue
                    if is_in_volume(point, space, volume_id):
                        points.append(",".join(map(str, point)))
                fc += 1
            mc += 1
    except Exception as e:
        return 0, "Error building path: %s" % (str(e))
    if len(points) == 0:
        return 0, "No points in path"
    # -1 means that no point of the path has been visited yet
    path["current"] = -1
    path["points"] = ";".join(points)
    path["nb_points"] = len(points)
    path_pool.new(path_id, path)
    return 1, "ok"
def deinit_path_paths(path_id):
    """Deinitialize *path_id*

    Returns 1 in every case: when the removal fails, the error text is
    returned as the message instead of "ok".
    """
    try:
        path_pool.remove(path_id)
    except Exception as e:
        return 1, str(e)
    else:
        return 1, "ok"
def get_path_paths(path_id):
    """Returns the path (sequence of 3D points) described by *path_id*

    The points are returned as stored in the path: coordinates separated by
    "," and points separated by ";". Returns (0, error text) if the path
    cannot be fetched.
    """
    try:
        stored = path_pool.get(path_id)
    except Exception as e:
        return 0, str(e)
    else:
        return 1, stored["points"]
def dump_path_paths(path_id, filename):
    """Dump in *filename* the path (sequence of 3D points) described by *path_id*

    One point is written per line, with its coordinates separated by spaces.
    Returns (1, "ok") on success and (0, message) otherwise.
    """
    try:
        path = path_pool.get(path_id)
    except Exception as e:
        return 0, str(e)
    text = path["points"].replace(";", "\n").replace(",", " ")
    try:
        # the with block closes the file even if the write fails
        with open(filename, "w") as f:
            f.write(text)
    except (IOError, OSError) as e:
        return 0, "Error writing file %s: %s" % (filename, str(e))
    return 1, "ok"
def move_next_paths(path_id, s1, s2, s3, a1, a2, a3, strategy="undef"):
    """Go to next point of *path_id*. The first time after initialization of the path, next goes to the first point.

    It moves with speed *sn* and acceleration *an* (for axis number n: 1,2,3).
    Returns (1, "finished") when the point reached is the last one of the
    path (the path is then rewound), (1, "ok") for any other point and
    (0, message) on error.
    """
    try:
        path = path_pool.get(path_id)
    except Exception as e:
        return 0, str(e)
    all_points = path["points"].split(";")
    d = all_points[path["current"] + 1].split(",")
    retcode, res = move(path["space_id"], d, [s1, s2, s3], [a1, a2, a3], strategy)
    if retcode == 0:
        return 0, res
    # the point only counts as visited once the move has succeeded
    path["current"] = path["current"] + 1
    if path["current"] + 1 != path["nb_points"]:
        return 1, "ok"
    path["current"] = -1
    return 1, "finished"
def move_first_paths(path_id, s1, s2, s3, a1, a2, a3, strategy="undef"):
    """Go the first point of *path_id*. Next call to move_next_paths will go to the next point in the path.

    Speeds *sn* and accelerations *an* are handed to move_next_paths as they
    are. Returns (0, error text) if the path cannot be fetched.
    """
    try:
        path = path_pool.get(path_id)
    except Exception as e:
        return 0, str(e)
    # rewind, so that the "next" point is the first one
    path["current"] = -1
    speeds = (s1, s2, s3)
    accelerations = (a1, a2, a3)
    return move_next_paths(path_id, *(speeds + accelerations + (strategy,)))