#!/usr/bin/env python2
# -*- coding: utf-8 -*-
#
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
#
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame. If not, see <http://www.gnu.org/licenses/>
import sys,os,socket,json,base64,random,subprocess,time
import xml_param_parser
online=-2
if __name__=='__main__':
# 1. so that CouchDB doesn't become a dependance for Pyrame installation
# 2. imports cannot be done on the init function.
try:
import couchdb
online=-1
except:
print("warning: couchdb python module not installed")
cdbs=None
acq_db=None
run_db=None
run_g_db=None
xml_parser=None
[docs]def init(*uri):
"Initialize server connection based on list *uri*. This function is called with the arguments from the command line call to cmdmod. If not provided, the default value of the database engine is used. Databases are created if non-existant."
global online,cdbs,acq_db,run_db,run_g_db,xml_parser
if online!=-1:
return
cdbs=couchdb.Server(*uri)
# check connectivity
try:
cdbs.version()
except Exception as e:
# try to start it
if os.path.exists("/bin/systemctl"):
result=subprocess.Popen(["systemctl","start","couchdb"])
elif os.path.exists("/etc/init.d/couchdb"):
result=subprocess.Popen(["/etc/init.d/couchdb","start"])
else:
print("warning: Couchdb not running and unable to start it")
return
time.sleep(1)
for delay in range(5):
try:
cdbs.version()
break
except:
pass
print("waiting %ds for couchdb to start..."%(delay))
time.sleep(delay)
# DB acqs
try:
acq_db=cdbs["acqs"]
acq_db.compact()
except couchdb.http.ResourceNotFound:
acq_db=cdbs.create("acqs")
acq_db["_design/basic"]={
"views":{
"acq_name":{
"map":"function(doc){emit(doc.name,doc.run_id);}"
},
"acq_run":{
"map":"function(doc){emit(doc.run_id,doc.name);}"
}
}
}
# DB runs
try:
run_db=cdbs["runs"]
run_db.compact()
except couchdb.http.ResourceNotFound:
run_db=cdbs.create("runs")
run_db["_design/basic"]={
"views":{
"run_name":{
"map":"function(doc){emit(doc.name,doc.run_g_id);}"
},
"run_run_g":{
"map":"function(doc){emit(doc.run_g_id,doc.name);}"
}
}
}
# DB run groups
try:
run_g_db=cdbs["run_groups"]
run_g_db.compact()
except couchdb.http.ResourceNotFound:
run_g_db=cdbs.create("run_groups")
run_g_db["root"]={"name":"root","run_g_id":None}
run_g_db["_design/basic"]={
"views":{
"run_g_name":{
"map":"function(doc){emit(doc.name,doc.run_g_id);}"
},
"run_g_run_g":{
"map":"function(doc){emit(doc.run_g_id,doc.name);}"
}
}
}
# Clean up and compact
acq_db.compact()
run_db.compact()
run_g_db.compact()
# Check for orphans
# res=get_orphans()
# if res!=";;":
# res=res.split(";")
# print("warning: The RunDB contains the following orphan items. Some data may be inaccessible")
# print("acquisitions: %s"%(res[0]))
# print("runs: %s"%(res[1]))
# print("run groups: %s"%(res[2]))
# XML parser
xml_parser=xml_param_parser.xml_param_parser()
online=0
print("RunDB is ready")
#####################
class pyrame_exc(Exception):
pass
def call(msg,function,*params):
if online<0:
return 0,"RunDB not properly initialized"
try:
res=function(*params)
except pyrame_exc as e:
if msg!="":
msg+=": "
return 0,msg+str(e)
return 1,res
#####################
# JSON DUMP
#####################
def update_json_dump(doc_type,doc):
name=os.path.basename(doc["name"])
# write json dump of doc at the path
retcode,res=submod_execcmd("mount@mountd",doc["mp"])
if retcode==0:
raise pyrame_exc("unable to mount %s <- %s"%(doc["mp"],res))
mp_local=res.rstrip("/")
path="%s/%s"%(mp_local,doc["path"])
try:
os.makedirs(path)
except OSError as e:
# just ignore if path already exists (errno 17)
if e.errno==17:
pass
else:
raise(e)
if "/" in name:
filename=name.rsplit("/",1)[1]
else:
filename=name
c_filename="%s/%s_%s_rundb_dump.json"%(path,name,doc_type)
#print("dumping json backup on %s"%(c_filename))
with open(c_filename,"w") as f:
f.write(json.dumps(doc))
#####################
# GENERIC NEW
#####################
def new_element(e_name,name,mp,path,group_id,mode):
# function to create acqs and runs
group_id=group_id.rstrip("/")
if e_name=="acq":
db=acq_db
eg_name="run"
del_element=del_acq
check_run_id(group_id)
check_collisions=[acq_db]
elif e_name=="run":
db=run_db
eg_name="run_g"
del_element=del_run
if name=="root":
raise pyrame_exc("invalid run name")
if "/" in name or "\\" in name:
raise pyrame_exc("invalid characters in run name")
check_run_g_id(group_id)
check_collisions=[run_db,run_g_db]
elif e_name=="run_g":
db=run_g_db
eg_name="run_g"
del_element=del_run_group
if name=="root":
raise pyrame_exc("invalid run group name")
if "/" in name or "\\" in name:
raise pyrame_exc("invalid characters in run group name")
check_run_g_id(group_id)
check_collisions=[run_db,run_g_db]
else:
raise pyrame_exc("invalid element type")
gid_name="%s_id"%(eg_name)
# get and check path so we can dump json data after
# we do it before deleting things
e_id="%s/%s"%(group_id,name)
# check if element already exists or collides
for c_db in check_collisions:
#print("checking existance of %s in "%(e_id),c_db)
if e_id in c_db:
if mode=="remove":
if e_name=="acq":
del_element(e_id)
else:
del_element(e_id,"true")
else:
if e_name=="acq":
raise pyrame_exc("%s already exists in %s"%(name,group_id))
if e_name=="run" and c_db==run_g_db:
raise pyrame_exc("unable to create run. a run group with the same name already exists at %s"%(group_id))
if e_name=="run_g" and c_db==run_db:
raise pyrame_exc("unable to create run group. a run with the same name already exists at %s"%(group_id))
# if run or run_g exist, then update:
# first delete element, then recreate it
db.purge([db[e_id]])
# create and store element
doc={gid_name:group_id,"name":name,"path":path,"mp":mp}
db[e_id]=doc
# dump json data
update_json_dump(e_name,db[e_id])
return e_id
#####################
# NEW
#####################
def new_acq(name,mp,path,run_id,mode="append"):
return new_element("acq",name,mp,path,run_id,mode)
[docs]def new_acq_rundb(name,mp,path,run_id,mode="append"):
"Add an acquisition in *run_id*."
return call("Error adding acquisition",new_acq,name,mp,path,run_id,mode)
#####################
def new_run(name,mp,path,run_g_id,mode="append"):
return new_element("run",name,mp,path,run_g_id,mode)
[docs]def new_run_rundb(name,mp,path,run_g_id,mode="append"):
"Add new run with *name*, in run group *run_g_id*."
return call("Error adding run",new_run,name,mp,path,run_g_id,mode)
#####################
def new_run_group(name,mp,path,run_g_id,mode="append"):
return new_element("run_g",name,mp,path,run_g_id,mode)
[docs]def new_run_group_rundb(name,mp,path,run_g_id,mode="append"):
"Add new run group with *name*, in run group *run_g_id*."
return call("Error adding run group",new_run_group,name,mp,path,run_g_id,mode)
#####################
# DEL
#####################
def del_acq(acq_id):
check_acq_id(acq_id)
acq_db.purge([acq_db[acq_id]])
return "ok"
[docs]def del_acq_rundb(acq_id):
"Remove acquisition *acq_id*."
return call("Error deleting acquisition",del_acq,acq_id)
#####################
def del_run(run_id,recursive):
run_id=run_id.rstrip("/")
check_run_id(run_id)
# check if run contains acqs
res=get_acqs_in_run(run_id,"true")
if res!="":
if sbool(recursive):
for a in res.split(","):
del_acq(a)
else:
raise pyrame_exc("run not empty. it contains acquisitions")
# del run
run_db.purge([run_db[run_id]])
return "ok"
[docs]def del_run_rundb(run_id,recursive):
"Remove run *run_id*."
return call("Error deleting run",del_run,run_id,recursive)
#####################
def del_run_group(run_g_id,recursive):
run_g_id=run_g_id.rstrip("/")
check_run_g_id(run_g_id)
# check if run group contains other run groups
res=get_run_groups_in_run_group(run_g_id,"false")
if res!="":
if sbool(recursive):
for rg in res.split(","):
del_run_group(rg,recursive)
else:
raise pyrame_exc("run group not empty. it contains other run groups")
# check if run group contains runs
res=get_runs_in_run_group(run_g_id)
if res!="":
if sbool(recursive):
for r in res.split(","):
del_run(r,recursive)
else:
raise pyrame_exc("run group not empty. it contains runs")
# del run group
if run_g_id!="root":
run_g_db.purge([run_g_db[run_g_id]])
return "ok"
[docs]def del_run_group_rundb(run_g_id,recursive):
"Remove run group *run_g_id*."
return call("Error deleting run group",del_run_group,run_g_id,recursive)
#####################
# ADD PARAMS
#####################
def set_params(doc_type,doc_id,params):
if doc_type=="acq":
check_acq_id(doc_id)
db=acq_db
group="run_id"
elif doc_type=="run":
check_run_id(doc_id)
db=run_db
group="run_g_id"
else:
raise pyrame_exc("invalid document type")
# modify RunDB
d=db[doc_id]
d.update(params)
db[doc_id]=d
# update json dump
if doc_type=="acq":
dir_name=os.path.dirname(d["name"])
else:
dir_name=d["name"]
update_json_dump(doc_type,d)
#####################
def set_param_acq(acq_id,name,value):
# check name collision
if name in ["run_id","run_g_id","mp","path","name","_id","_rev"]:
raise pyrame_exc("invalid parameter name")
params={name:value}
# add to RunDB
set_params("acq",acq_id,params)
return "ok"
[docs]def set_param_acq_rundb(acq_id,name,value):
"Set parameter (*name*, *value*) to *acq_id*."
return call("Error adding parameter to acquisition",set_param_acq,acq_id,name,value)
#####################
def set_param_run(run_id,name,value):
# check name collision
if name in ["run_id","run_g_id","mp","path","name","_id","_rev"]:
raise pyrame_exc("invalid parameter name")
params={name:value}
# add to RunDB
set_params("run",run_id,params)
return "ok"
[docs]def set_param_run_rundb(run_id,name,value):
"Set parameter (*name*, *value*) to *run_id*."
return call("Error adding parameter to run",set_param_run,run_id,name,value)
#####################
def set_xml_params_acq(acq_id,xml_file,add_parent):
# get xml content
try:
with open(xml_file,"r") as f:
data=f.read()
except EnvironmentError as e:
raise pyrame_exc(e)
# parse
params=xml_parser.parse(data,add_parent)
# add to RunDB
set_params("acq",acq_id,params)
return "ok"
[docs]def set_xml_params_acq_rundb(acq_id,xml_file,add_parent):
"Parse XML data from *xml_file* and include it in *acq_id*."
return call("Error adding XML parameters to acquisition",set_xml_params_acq,acq_id,xml_file,add_parent)
#####################
# ORPHANS
#####################
def get_orphans():
# acqs
acq_orphans=[]
acqs=acq_db.view("_design/basic/_view/acq_run")
for a in acqs:
if a.key not in run_db:
acq_orphans.append(a.id)
# runs
run_orphans=[]
runs=run_db.view("_design/basic/_view/run_run_g")
for r in runs:
if r.key not in run_g_db:
run_orphans.append(r.id)
# run groups
run_g_orphans=[]
run_groups=run_g_db.view("_design/basic/_view/run_g_run_g")
for rg in run_groups:
#rg.key!=None to skip root run group
if rg.key!=None and rg.key not in run_g_db:
run_g_orphans.append(rg.id)
return ";".join([",".join(acq_orphans),",".join(run_orphans),",".join(run_g_orphans)])
#####################
# GET ACQ
#####################
def get_acq(acq_id):
check_acq_id(acq_id)
res=[]
for k,v in acq_db[acq_id].items():
if k not in ["_id","_rev"]:
res.append("%s\t%s"%(k,v))
return base64.b64encode("\n".join(res))
[docs]def get_acq_rundb(acq_id):
"Get acquisition parameters and values of *acq_id* separated by a tab. One line for each parameter. The result is returned encoded in base64."
return call("Error getting acq",get_acq,acq_id)
#####################
# GET RUN
#####################
def get_run(run_id):
check_run_id(run_id)
res=[]
for k,v in run_db[run_id].items():
if k not in ["_id","_rev"]:
res.append("%s\t%s"%(k,v))
return base64.b64encode("\n".join(res))
[docs]def get_run_rundb(run_id):
"Get run parameters and values of *run_id* separated by a tab. One line for each parameter. The result is returned encoded in base64."
return call("Error getting run",get_run,run_id)
#####################
# GET RUN GROUP
#####################
def get_run_group(run_g_id):
check_run_g_id(run_g_id)
res=[]
for k,v in run_g_db[run_g_id].items():
if k not in ["_id","_rev"]:
res.append("%s\t%s"%(k,v))
return base64.b64encode("\n".join(res))
[docs]def get_run_group_rundb(run_g_id):
"Get run group parameters and values of *run_g_id* separated by a tab. One line for each parameter. The result is returned encoded in base64."
return call("Error getting run group",get_run_group,run_g_id)
#####################
# GET PARAMS ACQ
#####################
def get_params_acq(acq_id):
check_acq_id(acq_id)
params=acq_db[acq_id].keys()
params.remove("_id")
params.remove("_rev")
return ",".join(params)
[docs]def get_params_acq_rundb(acq_id):
"Get a list of parameters defined for *acq_id*."
return call("Error getting acquisition params",get_params_acq,acq_id)
#####################
def get_param_acq(acq_id,param):
check_acq_id(acq_id)
if param not in acq_db[acq_id]:
raise pyrame_exc("parameter not found")
return str(acq_db[acq_id][param])
[docs]def get_param_acq_rundb(acq_id,param):
"Get value of *param* in *acq_id*."
return call("Error getting acquisition param",get_param_acq,acq_id,param)
#####################
def get_param_run(run_id,param):
check_run_id(run_id)
if param not in run_db[run_id]:
raise pyrame_exc("parameter not found")
return str(run_db[run_id][param])
[docs]def get_param_run_rundb(run_id,param):
"Get value of *param* in *run_id*."
return call("Error getting run param",get_param_run,run_id,param)
#####################
def map_reduce(map_func,**params):
res=[]
for a in acq_db.query(map_func,**params):
res.append([a.id,a.key,a.value])
return res
#####################
regexp_map="""
function(doc) {
var key_re = new RegExp("key_re_str");
var value_re = new RegExp("value_re_str");
for (var key in doc) {
if (key.search(key_re)!=-1 && doc[key].search(value_re)!=-1) {
emit(key,doc[key]);
}
}
}
"""
def map_reduce_regex(key_regex,value_regex):
query_map=str(regexp_map) # copy string
query_map=query_map.replace("key_re_str",key_regex)
query_map=query_map.replace("value_re_str",value_regex)
return map_reduce(query_map)
#####################
def get_acqs_regex(key_regex,value_regex):
res=map_reduce_regex(key_regex,value_regex)
return ",".join(row[0] for row in res)
[docs]def get_acqs_regex_rundb(key_regex,value_regex):
"Get a list of acquisitions where the key matches the regular expression *key_regex* and/or the value matches *value_regex*."
return call("Error getting acquisitions",get_acqs_regex,key_regex,value_regex)
#####################
def get_acqs_table_regex(key_regex,value_regex):
found=map_reduce_regex(key_regex,value_regex)
to_encode="\n".join("\t".join(acq_id) for acq_id in found)
return base64.b64encode(to_encode)
[docs]def get_acqs_table_regex_rundb(key_regex,value_regex):
"Get a list of acquisitions where the key matches the regular expression *key_regex* and/or the value matches *value_regex*. Return a tab-separated list with the acq name, the found key and the found value. One line for each result. Result is return encoded in Base64"
return call("Error getting acquisitions",get_acqs_table_regex,key_regex,value_regex)
#####################
def get_acqs_script(map_func,red_func,group):
grouping=sbool(group)
res=[]
print "map_func",map_func
print "red_func",red_func
res=map_reduce(map_func,reduce_fun=red_func,group=grouping)
return ",".join(row[0] for row in res)
[docs]def get_acqs_script_rundb(map_func,red_func,group):
"Get a list of acquisitions using the map and reduce functions *map_func* and *red_func*, respectively. *red_func* and *group* can be \"none\" to not be sent."
return call("Error getting acquisitions",get_acqs_script,map_func,red_func,group)
#####################
def get_acqs_in_run(run_id,return_ids="1"):
check_run_id(run_id)
acqs=acq_db.view("_design/basic/_view/acq_run",key=run_id)
a_list=[]
for a in acqs:
a_list.append(a.id)
res=a_list
if not sbool(return_ids):
res=[]
for acq_id in a_list:
res.append(acq_db[acq_id]["name"])
return ",".join(res)
[docs]def get_acqs_in_run_rundb(run_id,return_ids="1"):
"Get acquisitions in run. The returned list can be id's or names depending on *return_ids*"
return call("Error getting acquisitions",get_acqs_in_run,run_id,return_ids)
#####################
# GET RUN (GROUPS)
#####################
def get_runs_in_run_group(run_g_id):
check_run_g_id(run_g_id)
runs=run_db.view("_design/basic/_view/run_run_g",key=run_g_id)
r_list=[]
if len(runs)!=0:
for r in runs:
r_list.append(r.id)
return ",".join(r_list)
[docs]def get_runs_in_run_group_rundb(run_g_id):
"Get runs in run group *run_g_id*."
return call("Error getting runs",get_runs_in_run_group,run_g_id)
#####################
def get_run_groups_in_run_group(run_g_id,recursive):
check_run_g_id(run_g_id)
run_groups=run_g_db.view("_design/basic/_view/run_g_run_g",key=run_g_id)
r_list=""
if len(run_groups)!=0:
for r in run_groups:
r_list+=r.id+","
if sbool(recursive):
res=get_run_groups_in_run_group(r.id,recursive)
if res!="":
r_list+=res+","
return r_list[:-1]
[docs]def get_run_groups_in_run_group_rundb(run_g_id,recursive="false"):
"Get run groups in run group *run_g_id*. Nested run groups are also returned if *recursive* is true or 1 (case-insensitive)"
return call("Error getting runs",get_run_groups_in_run_group,run_g_id,recursive)
#####################
# GET NAMES
#####################
def get_acq_name(acq_id):
check_acq_id(acq_id)
return acq_db[acq_id]["name"]
[docs]def get_acq_name_rundb(acq_id):
"Get acq name from *acq_id*."
return call("Error getting acq name",get_acq_name,acq_id)
#####################
def get_run_name(run_id):
check_run_id(run_id)
return run_db[run_id]["name"]
[docs]def get_run_name_rundb(run_id):
"Get run name from *run_id*."
return call("Error getting run name",get_run_name,run_id)
#####################
def get_run_group_name(run_g_id):
check_run_g_id(run_g_id)
return run_g_db[run_g_id]["name"]
[docs]def get_run_group_name_rundb(run_g_id):
"Get run name from *run_g_id*."
return call("Error getting run group name",get_run_group_name,run_g_id)
#####################
# EMPTY
#####################
def is_empty_element(doc_type,doc_id):
if doc_type=="run":
childs=[{"db":acq_db,"name":"acq"}]
elif doc_type=="run_g":
childs=[{"db":run_db,"name":"run"},{"db":run_g_db,"name":"run_g"}]
for child in childs:
view="%s_%s"%(child["name"],doc_type)
els=child["db"].view("_design/basic/_view/%s"%(view),key=doc_id)
if len(els)!=0:
return "0"
return "1"
[docs]def is_empty_run_rundb(run_id):
"Check if run *run_id* is empty."
return call("Error checking emptiness of run",is_empty_element,"run",run_id)
[docs]def is_empty_run_group_rundb(run_g_id):
"Check if run group *run_g_id* is empty."
return call("Error checking emptiness of run group",is_empty_element,"run_g",run_g_id)
#####################
# CHECK ELEMENT
#####################
def check_acq_id(acq_id):
if acq_id not in acq_db:
raise pyrame_exc("acquisition not found")
return acq_id
[docs]def check_acq_id_rundb(acq_id):
"Check existance of *acq_id*."
return call("",check_acq_id,acq_id)
#####################
def check_run_id(run_id):
if run_id not in run_db:
# RC depends on this exact message
raise pyrame_exc("run not found")
return run_id
[docs]def check_run_id_rundb(run_id):
"Check existance of *run_id*"
return call("",check_run_id,run_id)
#####################
def check_run_g_id(run_g_id):
if run_g_id not in run_g_db:
raise pyrame_exc("run group not found")
return run_g_id
[docs]def check_run_g_id_rundb(run_g_id):
"Check existance of *run_g_id*"
return call("",check_run_g_id,run_g_id)