#!/usr/bin/env python2
# -*- coding: utf-8 -*-
#
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
#
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame. If not, see <http://www.gnu.org/licenses/>
import heapq,sys,traceback,pools
sys.path.insert(0,"/opt/pyrame")
# module-level registries of the spaces, volumes and paths created by the
# functions below; entries are stored, fetched and dropped by id through
# new(), get() and remove()
space_pool=pools.pool()
volume_pool=pools.pool()
path_pool=pools.pool()
def check_param(param,valid_list,default_param,map_func,unique=True):
    """Validate the option string *param* and convert each of its characters with *map_func*.

    An empty or "undef" *param* is replaced by *default_param*. The string must be
    as long as *default_param* and contain only characters listed in *valid_list*.
    When *unique* is true, two characters converting to the same value are rejected.

    Returns (1, list of converted values) on success or (0, error message).
    """
    if param in ("undef",""):
        param=default_param
    if len(param)!=len(default_param):
        return 0,"invalid param"
    for char in param:
        if char not in valid_list:
            return 0,"invalid param"
    converted=[map_func(char) for char in param]
    if unique and len(converted)!=len(set(converted)):
        return 0,"repeated characters"
    return 1,converted
def is_in_volume(point,space,volumes):
    """Tell whether *point* lies inside any of the volumes listed in *volumes*.

    *volumes* is a comma-separated string of volume ids (empty string: no volume).
    Each volume is tested by the function named in its "function" field, looked up
    in the module named in its "module" field and called with the volume, the
    per-axis tolerance and the point.

    Returns True as soon as one volume contains the point, False otherwise.
    Raises Exception if a volume cannot be fetched, loaded or evaluated.
    """
    # tolerance on each axis is half of the resolution of the space
    errors=[space[key]/2 for key in ("r1","r2","r3")]
    if volumes=="":
        return False
    for vol_id in volumes.split(","):
        try:
            vol=volume_pool.get(vol_id)
            checker=getattr(__import__(vol["module"]),vol["function"])
            if checker(vol,errors,point):
                return True
        except Exception as e:
            raise Exception("Error while checking point in forbidden volume: %s"%(str(e)))
    return False
# first neighbors of point p
def neighbors(space,p,order):
    """Return the grid nodes adjacent to *p* that are reachable in *space*.

    For each axis, taken in the sequence given by *order* (axis numbers 1..3),
    the node one resolution step forward and the node one step backward are
    generated, snapped to a multiple of the axis resolution.
    e.g. for order=1,2,3:
    [(x+dx,y,z),(x-dx,y,z),(x,y+dy,z),(x,y-dy,z),(x,y,z+dz),(x,y,z-dz)]
    Nodes outside the limits of the space or inside one of its forbidden
    volumes are discarded. Returns a list of tuples.
    """
    res=[space["r1"],space["r2"],space["r3"]]
    candidates=[]
    for rank in range(3):
        axis=order[rank]-1
        # forward neighbor first, then backward neighbor
        for delta in (res[axis],-res[axis]):
            node=list(p)
            node[axis]=round((node[axis] + delta)/res[axis])*res[axis]
            candidates.append(tuple(node))
    return [node for node in candidates
            if node[0]<=space["max1"] and node[0]>=space["min1"]
            and node[1]<=space["max2"] and node[1]>=space["min2"]
            and node[2]<=space["max3"] and node[2]>=space["min3"]
            # check forbidden volumes
            and not is_in_volume(node,space,space["forbidden"])]
# cost from current to next is the distance in the axis that changes
def cost(c,n,r,order):
    """Return the cost of stepping from node *c* to node *n*.

    *r* holds the resolution of the three axes. The cost is the resolution of
    the only axis on which *c* and *n* may differ (the first axis when they are
    identical). None is returned when the nodes differ on more than one axis.
    *order* is not used.
    """
    same=[c[axis]==n[axis] for axis in range(3)]
    if same[1] and same[2]:
        return r[0]
    if same[0] and same[2]:
        return r[1]
    if same[0] and same[1]:
        return r[2]
    return None
# check if two points are the same to within half of the resolution of the space
def eq_point(p1,p2,space):
    """Return True when *p1* and *p2* differ by strictly less than half of the
    resolution of *space* on each of the three axes."""
    for axis in range(3):
        # tolerance is half of the resolution of this axis
        if not abs(p1[axis]-p2[axis])<space["r%d"%(axis+1)]/2:
            return False
    return True
def search_path(space,start,goal,order):
    """Find a sequence of points leading from *start* to *goal* in *space* while
    avoiding its forbidden volumes.

    *start* and *goal* are sequences of three coordinates, *order* is the list
    of axis numbers (1..3) giving the preferred order of exploration.

    Returns [] when *start* equals *goal*, [start,goal] when the space has no
    forbidden volume or when the straight segment between them is free, and
    otherwise the corner points of the route found by an A* search on the grid
    of the space (first element *start*, last element *goal*).
    Raises Exception when no route exists or a forbidden volume cannot be
    evaluated.
    """
    # A* algorithm
    start=tuple(start)
    goal=tuple(goal)
    if start==goal:
        return []
    if space["forbidden"]=="":
        return [start,goal]
    r=[space["r1"],space["r2"],space["r3"]]
    # if goal and start differ only in one coordinate, return direct path
    for i,j,k in [[1,2,0],[0,2,1],[0,1,2]]:
        if start[i]==goal[i] and start[j]==goal[j]:
            # round instead of truncating: a quotient such as 2.9999999999999996
            # must still count as 3 steps
            nb_steps=int(round(abs(goal[k]-start[k])/r[k]))
            sign=1 if goal[k]>start[k] else -1
            direct_path=True
            for n in range(1,nb_steps+1):
                point=list(start)
                point[k] += sign*n*r[k]
                if is_in_volume(point,space,space["forbidden"]):
                    direct_path=False
                    break
            if direct_path:
                return [start,goal]
    frontier=[]
    heapq.heappush(frontier,(0,start))
    came_from={start:None}
    cost_so_far={start:0}
    reached=None
    while frontier:
        current=heapq.heappop(frontier)[1]
        # current == goal ? (to within half of the resolution)
        if eq_point(current,goal,space):
            reached=current
            break
        for node in neighbors(space,current,order):
            new_cost=cost_so_far[current] + cost(current,node,r,order)
            if node not in cost_so_far or new_cost < cost_so_far[node]:
                # Manhattan distance to the goal
                heuristic=abs(goal[0]-node[0]) + abs(goal[1]-node[1]) + abs(goal[2]-node[2])
                heapq.heappush(frontier,(new_cost + heuristic,node))
                cost_so_far[node]=new_cost
                came_from[node]=current
    if reached is None:
        raise Exception("No valid path was found")
    # walk back from the node reached to start, keeping only the corners
    path=[reached]
    while path[-1]!=start:
        path.append(came_from[path[-1]])
        if len(path)>=3:
            for i,j in [[1,2],[0,2],[0,1]]:
                # three aligned points: the middle one is not a corner
                if path[-3][i]==path[-2][i]==path[-1][i] and path[-3][j]==path[-2][j]==path[-1][j]:
                    del path[-2]
                    break
    # the node reached can differ from goal by float rounding: end exactly on goal
    path[0]=goal
    if len(path)==1:
        # start already matches goal to within the tolerance
        path.append(start)
    path.reverse()
    # report the route on standard output
    print(path)
    return path
def init_space_paths(space_id,motion1_id,motion2_id,motion3_id,r1,r2,r3):
    """Initialize *space_id* with the ids of the motion systems *motionn_id* and the minimal step *rn* of each axis (for n=1,2,3).

    The limits of each axis are requested from its motion system and stored
    with the space. Returns (1,"ok") or (0, error message).
    """
    steps=(float(r1),float(r2),float(r3))
    motion_ids=(motion1_id,motion2_id,motion3_id)
    # a new space has no forbidden volume
    space={"forbidden":""}
    for axis,(step,motion_id) in enumerate(zip(steps,motion_ids),1):
        space["r%d"%(axis)]=step
        space["motion%d_id"%(axis)]=motion_id
    for axis in (1,2,3):
        retcode,res=submod_execcmd("get_limits@motion",space["motion%d_id"%(axis)])
        if retcode==0:
            return 0,"Error getting motion system limits of axis %d <- %s"%(axis,res)
        # the answer is read as "min,max"
        limits=res.split(",")
        space["min%d"%(axis)]=float(limits[0])
        space["max%d"%(axis)]=float(limits[1])
    space_pool.new(space_id,space)
    return 1,"ok"
def deinit_space_paths(space_id):
    """Deinitialize *space_id*.

    Always returns a success code: (1,"ok") once the space is removed, or
    (1, message) when the space could not be fetched.
    """
    try:
        space_pool.get(space_id)
    except Exception as e:
        # nothing to remove: reported as a success carrying the message
        return 1,str(e)
    space_pool.remove(space_id)
    return 1,"ok"
def add_limits_space_paths(space_id,volume_id):
    """Add *volume_id* to the forbidden regions of *space_id*.

    Returns (1,"ok"), or (0, error message) when the space or the volume
    cannot be fetched.
    """
    try:
        space=space_pool.get(space_id)
        # only checks that the volume exists
        volume_pool.get(volume_id)
    except Exception as e:
        return 0,str(e)
    # forbidden volumes are kept as a comma-separated string of ids
    current=space["forbidden"]
    space["forbidden"]=volume_id if current=="" else current+","+volume_id
    return 1,"ok"
def get_pos(space):
    """Ask the motion system of each axis of *space* for its position.

    Returns (1,[pos1,pos2,pos3]) with positions as floats, or
    (0, error message) as soon as one axis fails to answer.
    """
    position=[]
    for axis in (1,2,3):
        motion_id=space["motion%d_id"%(axis)]
        retcode,res=submod_execcmd("get_pos@motion",motion_id)
        if retcode==0:
            return 0,"error getting position of axis %d <- %s"%(axis,res)
        position.append(float(res))
    return 1,position
def move(space_id,d,s,a,strategy="undef"):
    """Move on *space_id* to destination *d* following a route that avoids the forbidden volumes.

    *d*, *s* and *a* are three-element sequences with the destination, speed
    and acceleration of axes 1, 2 and 3. *strategy* is a three character
    string giving the order in which the axes are moved (e.g. "213"); empty or
    "undef" means "123".

    Returns (1,"ok") or (0, error message).
    """
    try:
        space=space_pool.get(space_id)
    except Exception as e:
        return 0,str(e)
    retcode,order=check_param(strategy,["1","2","3"],"123",int)
    if retcode==0:
        return 0,"invalid strategy: %s" % (order)
    # work on a copy so the list of the caller is left untouched
    d=list(d)
    # check if d is multiple of r
    for i in order:
        d[i-1]=float(d[i-1])
        # determine if d is compatible with discretization of space
        m=d[i-1]/space["r%d"%(i)]
        if abs(m-round(m))>1e-12:
            return 0,"destination must be multiple of the minimum step %f (axis %d)"%(space["r%d"%(i)],i)
    # check if we are trying to go to a forbidden point
    try:
        forbidden=is_in_volume(d,space,space["forbidden"])
    except Exception as e:
        return 0,str(e)
    if forbidden:
        return 0,"destination out of limits"
    # get current position
    retcode,c=get_pos(space)
    if retcode==0:
        # c carries the error message
        return 0,c
    # round c to the closest node
    for i in order:
        c[i-1]=round(c[i-1]/space["r%d"%(i)])*space["r%d"%(i)]
    # search safe path surrounding forbidden regions
    try:
        safe_path=search_path(space,c,d,order)
    except Exception as e:
        return 0,"error searching path: %s"%(str(e))
    for step in safe_path:
        step=[str(coord) for coord in step]
        for i in order:
            # axis numbers are 1-based, the lists are 0-based
            retcode,res=submod_execcmd("move@motion",space["motion%d_id"%(i)],step[i-1],s[i-1],a[i-1])
            if retcode==0:
                return 0,"error moving axis %d <- %s"%(i,res)
    return 1,"ok"
def move_space_paths(space_id,d1,d2,d3,s1,s2,s3,a1,a2,a3,strategy="undef"):
    """Move on *space_id* to *dn* destination at *sn* speed, with *an* acceleration (for n=1,2,3). strategy is the preferential order of axis movement (e.g. 213 to move first axis 2, then axis 1 finally axis 3)

    Returns (1,"ok") or (0, error message).
    """
    retcode,res=move(space_id,[d1,d2,d3],[s1,s2,s3],[a1,a2,a3],strategy)
    if retcode!=0:
        return 1,"ok"
    return 0,res
def init_volume_paths(volume_id,space_id,module,function,*params):
    """Create *volume_id* by specifying the Python module and function that will determine inside/outside of volume.

    The volume is built by calling init_<function> of *module* with *params*;
    the result is stored together with *space_id*, *module* and *function*.
    Returns (1,"ok") or (0, error message).
    """
    try:
        # only checks that the space exists
        space_pool.get(space_id)
    except Exception as e:
        return 0,str(e)
    try:
        # NOTE(review): the module name comes straight from the caller and is
        # imported as is; confirm callers are trusted
        init_func=getattr(__import__(module),"init_"+function)
    except Exception as e:
        return 0,"Error loading module and function: %s"%(str(e))
    try:
        volume=init_func(*params)
    except Exception as e:
        traceback.print_exc()
        return 0,"Error initializing volume: %s"%(str(e))
    for key,value in (("space_id",space_id),("module",module),("function",function)):
        volume[key]=value
    volume_pool.new(volume_id,volume)
    return 1,"ok"
def deinit_volume_paths(volume_id):
    """Deinitialize *volume_id*.

    Always returns a success code: (1,"ok"), or (1, message) when the volume
    could not be removed.
    """
    message="ok"
    try:
        volume_pool.remove(volume_id)
    except Exception as e:
        message=str(e)
    return 1,message
def _nb_positions(first,last,step):
    "Number of positions from *first* to *last* every *step*, tolerant to float rounding of the quotient"
    # the small offset keeps a quotient such as 2.9999999999999996 from being truncated to 2
    return int(abs((last-first)/step)+1e-9)+1

def init_path_paths(path_id,space_id,volume_id,p1d,p2d,p3d,order,path_type,directions):
    """Create path to scan *volume_id* with *pnd* steps (for n=1,2,3). The path is associated to *space_id*
    *order* is a three character string. Its first character is the number of the fastest axis, and the third one is the slowest.
    *path_type* is a two character string indicating: the type of scan ('r' for raster, 'm' for meander) of the fastest and middle axis (first character), and middle and slowest axis (second character).
    *directions* is a three character string with either 'p' or 'n' for positive or negative for the fastest, middle and slowest axis.

    Points outside the limits of the space, inside one of its forbidden volumes or outside *volume_id* are left out of the path.
    Returns (1,"ok") or (0, error message)."""
    try:
        space=space_pool.get(space_id)
    except Exception as e:
        return 0,str(e)
    path={"space_id": space_id}
    try:
        volume=volume_pool.get(volume_id)
    except Exception as e:
        return 0,str(e)
    path["volume_id"]=volume_id
    try:
        pd=[float(p1d),float(p2d),float(p3d)]
    except (TypeError,ValueError) as e:
        return 0,"invalid step: %s"%(str(e))
    for i in [1,2,3]:
        if pd[i-1]==0:
            # a null step would divide by zero when counting the positions
            return 0,"p%dd must not be zero"%(i)
        ratio=pd[i-1]/space["r%d"%(i)]
        if abs(ratio - round(ratio))>1e-12:
            return 0,"p%dd must be multiple of the minimum step for axis %d (%.2e)"%(i,i,space["r%d"%(i)])
    # order becomes 0-based axis indexes kept as strings (used in the volume keys)
    retcode,order=check_param(order,["1","2","3"],"123",lambda x: str(int(x)-1))
    if retcode==0:
        return 0,"invalid order: %s" % (order)
    # path_type: 0 for raster, 1 for meander
    retcode,path_type=check_param(path_type,["r","m"],"rr",lambda x: 0 if x=="r" else 1,False)
    if retcode==0:
        return 0,"invalid path_type: %s" % (path_type)
    # directions: 1 for positive, -1 for negative
    retcode,directions=check_param(directions,["p","n"],"ppp",lambda x: 1 if x=="p" else -1,False)
    if retcode==0:
        return 0,"invalid directions: %s" % (directions)
    # final coordinate, initial coordinate and signed step of the fastest (f),
    # middle (m) and slowest (s) axis, read from the "min_<axis>"/"max_<axis>"
    # fields of the volume
    ff=volume[("max" if directions[0]==1 else "min") + "_" + order[0]]
    fi=volume[("min" if directions[0]==1 else "max") + "_" + order[0]]
    fd=pd[int(order[0])]*directions[0]
    mf=volume[("max" if directions[1]==1 else "min") + "_" + order[1]]
    mi=volume[("min" if directions[1]==1 else "max") + "_" + order[1]]
    md=pd[int(order[1])]*directions[1]
    sf=volume[("max" if directions[2]==1 else "min") + "_" + order[2]]
    si=volume[("min" if directions[2]==1 else "max") + "_" + order[2]]
    sd=pd[int(order[2])]*directions[2]
    nf=_nb_positions(fi,ff,fd)
    nm=_nb_positions(mi,mf,md)
    ns=_nb_positions(si,sf,sd)
    points=[]
    # number of sweeps already done on the middle and fastest axis: with a
    # meander scan every other sweep runs backwards
    mc=0
    fc=0
    # slowest axis
    for s in range(ns):
        # middle axis
        for m in range(nm)[::1 - 2 * (mc&1) * path_type[1]]:
            # fastest axis
            for f in range(nf)[::1 - 2 * (fc&1) * path_type[0]]:
                point=[None]*3
                point[int(order[0])]=fi+f*fd
                point[int(order[1])]=mi+m*md
                point[int(order[2])]=si+s*sd
                if point[0]>space["max1"] or point[0]<space["min1"] or \
                   point[1]>space["max2"] or point[1]<space["min2"] or \
                   point[2]>space["max3"] or point[2]<space["min3"]:
                    continue
                try:
                    if is_in_volume(point,space,space["forbidden"]):
                        continue
                    inside=is_in_volume(point,space,volume_id)
                except Exception as e:
                    return 0,str(e)
                if inside:
                    points.append(",".join(map(str,point)))
            fc += 1
        mc += 1
    if len(points)==0:
        return 0,"No points in path"
    # -1: no point of the path has been visited yet
    path["current"]=-1
    path["points"]=";".join(points)
    path["nb_points"]=len(points)
    path_pool.new(path_id,path)
    return 1,"ok"
def deinit_path_paths(path_id):
    """Deinitialize *path_id*.

    Always returns a success code: (1,"ok"), or (1, message) when the path
    could not be removed.
    """
    message="ok"
    try:
        path_pool.remove(path_id)
    except Exception as e:
        message=str(e)
    return 1,message
def get_path_paths(path_id):
    """Returns the path (sequence of 3D points) described by *path_id*.

    The points are returned as stored: coordinates separated by "," and
    points separated by ";". Returns (0, error message) for an unknown path.
    """
    try:
        path=path_pool.get(path_id)
    except Exception as e:
        return 0,str(e)
    else:
        return 1,path["points"]
def dump_path_paths(path_id,filename):
    """Dump in *filename* the path (sequence of 3D points) described by *path_id*.

    One point is written per line, with its coordinates separated by spaces.
    Returns (1,"ok") or (0, error message) when the path is unknown or the
    file cannot be written.
    """
    try:
        path=path_pool.get(path_id)
    except Exception as e:
        return 0,str(e)
    text=path["points"].replace(";","\n").replace(","," ")
    try:
        # the with statement closes the file even when the write fails
        with open(filename,"w") as f:
            f.write(text)
    except (IOError,OSError) as e:
        return 0,"error writing %s: %s"%(filename,str(e))
    return 1,"ok"
def move_next_paths(path_id,s1,s2,s3,a1,a2,a3,strategy="undef"):
    """Go to next point of *path_id*. The first time after initialization of the path, next goes to the first point. It moves with speed *sn* and acceleration *an* (for axis number n: 1,2,3)

    Returns (1,"ok"), (1,"finished") when the point reached is the last one of
    the path (the path is then rewound), or (0, error message).
    """
    try:
        path=path_pool.get(path_id)
    except Exception as e:
        return 0,str(e)
    # points are separated by ";" and coordinates by ","
    all_points=path["points"].split(";")
    d=all_points[path["current"]+1].split(",")
    retcode,res=move(path["space_id"],d,[s1,s2,s3],[a1,a2,a3],strategy)
    if retcode==0:
        return 0,res
    path["current"] += 1
    if path["current"]+1!=path["nb_points"]:
        return 1,"ok"
    # last point reached: rewind so the next call restarts from the first point
    path["current"]=-1
    return 1,"finished"
def move_first_paths(path_id,s1,s2,s3,a1,a2,a3,strategy="undef"):
    """Go the first point of *path_id*. Next call to move_next_paths will go to the next point in the path.

    Returns what move_next_paths returns, or (0, error message) for an unknown path.
    """
    try:
        path=path_pool.get(path_id)
    except Exception as e:
        return 0,str(e)
    # rewind the path, then let move_next_paths go to its first point
    path["current"]=-1
    return move_next_paths(path_id,s1,s2,s3,a1,a2,a3,strategy=strategy)