Source code for cmd_rundb

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# 
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
# 
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
# 
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame.  If not, see <http://www.gnu.org/licenses/>

import sys,os,socket,json,base64,random,subprocess,time
import xml_param_parser

# Initialization state of the RunDB module:
#   -2: couchdb Python module not imported (not installed, or this file is
#       not being run as a script)
#   -1: couchdb module imported, init() not yet completed
#    0: databases opened and ready (set at the end of init())
# call() refuses to run anything while this value is negative.
online = -2

if __name__ == '__main__':
    # The import is guarded and done here for two reasons:
    # 1. so that CouchDB doesn't become a dependency for Pyrame installation
    # 2. imports cannot be done in the init function.
    try:
        import couchdb
        online = -1
    except ImportError:
        print("warning: couchdb python module not installed")

# Handles filled in by init()
cdbs = None        # couchdb.Server connection
acq_db = None      # "acqs" database
run_db = None      # "runs" database
run_g_db = None    # "run_groups" database
xml_parser = None  # xml_param_parser.xml_param_parser instance

def _couchdb_alive(server):
    """Return True if *server* answers a version request, False otherwise."""
    try:
        server.version()
    except Exception:
        return False
    return True


def _open_or_create_db(server, name, views, seed_docs=()):
    """Return database *name* of *server*, creating it when it does not exist.

    On creation, every (doc_id, doc) pair of *seed_docs* is stored in order,
    followed by the "_design/basic" design document holding *views*.
    """
    try:
        return server[name]
    except couchdb.http.ResourceNotFound:
        db = server.create(name)
        for doc_id, doc in seed_docs:
            db[doc_id] = doc
        db["_design/basic"] = {"views": views}
        return db


def init(*uri):
    """Initialize the server connection based on list *uri*.

    This function is called with the arguments from the command line call to
    cmdmod. If not provided, the default value of the database engine is used.
    Databases are created if non-existent.

    Does nothing unless the couchdb module was imported and init() has not
    completed yet (``online == -1``). If the server does not answer, an
    attempt is made to start it through systemctl or the init.d script; when
    it cannot be started, or still does not answer after the waiting period,
    a warning is printed and the module stays uninitialized.
    """
    global online, cdbs, acq_db, run_db, run_g_db, xml_parser
    if online != -1:
        return
    cdbs = couchdb.Server(*uri)

    # check connectivity
    if not _couchdb_alive(cdbs):
        # try to start it
        if os.path.exists("/bin/systemctl"):
            subprocess.Popen(["systemctl", "start", "couchdb"])
        elif os.path.exists("/etc/init.d/couchdb"):
            subprocess.Popen(["/etc/init.d/couchdb", "start"])
        else:
            print("warning: Couchdb not running and unable to start it")
            return
        time.sleep(1)
        for delay in range(5):
            if _couchdb_alive(cdbs):
                break
            print("waiting %ds for couchdb to start..." % (delay))
            time.sleep(delay)
        else:
            # last probe after the final wait; give up cleanly instead of
            # failing later on the first database access
            if not _couchdb_alive(cdbs):
                print("warning: Couchdb did not answer after trying to start it")
                return

    # DB acqs
    acq_db = _open_or_create_db(cdbs, "acqs", {
        "acq_name": {
            "map": "function(doc){emit(doc.name,doc.run_id);}"
        },
        "acq_run": {
            "map": "function(doc){emit(doc.run_id,doc.name);}"
        },
    })

    # DB runs
    run_db = _open_or_create_db(cdbs, "runs", {
        "run_name": {
            "map": "function(doc){emit(doc.name,doc.run_g_id);}"
        },
        "run_run_g": {
            "map": "function(doc){emit(doc.run_g_id,doc.name);}"
        },
    })

    # DB run groups (a new database gets the "root" run group first)
    run_g_db = _open_or_create_db(cdbs, "run_groups", {
        "run_g_name": {
            "map": "function(doc){emit(doc.name,doc.run_g_id);}"
        },
        "run_g_run_g": {
            "map": "function(doc){emit(doc.run_g_id,doc.name);}"
        },
    }, seed_docs=[("root", {"name": "root", "run_g_id": None})])

    # Clean up and compact (once per database)
    acq_db.compact()
    run_db.compact()
    run_g_db.compact()

    # Check for orphans
    # res=get_orphans()
    # if res!=";;":
    #     res=res.split(";")
    #     print("warning: The RunDB contains the following orphan items. Some data may be inaccessible")
    #     print("acquisitions: %s"%(res[0]))
    #     print("runs: %s"%(res[1]))
    #     print("run groups: %s"%(res[2]))

    # XML parser
    xml_parser = xml_param_parser.xml_param_parser()

    online = 0
    print("RunDB is ready")
#####################

class pyrame_exc(Exception):
    """Error raised by RunDB functions; call() turns it into a (0, message) result."""
    pass


def call(msg, function, *params):
    """Run ``function(*params)`` and wrap the outcome in a (retcode, result) pair.

    Returns ``(0, "RunDB not properly initialized")`` while the module is not
    online, ``(0, "<msg>: <error>")`` when *function* raises pyrame_exc (the
    prefix is omitted if *msg* is empty) and ``(1, result)`` otherwise. Any
    other exception propagates to the caller.
    """
    if online < 0:
        return 0, "RunDB not properly initialized"
    try:
        res = function(*params)
    except pyrame_exc as e:
        if msg != "":
            msg += ": "
        return 0, msg + str(e)
    return 1, res


#####################
# JSON DUMP
#####################

def update_json_dump(doc_type, doc):
    """Write a JSON copy of *doc* under its mount point.

    The mount point ``doc["mp"]`` is resolved through the ``mount@mountd``
    command; the file is written to
    ``<mount>/<doc["path"]>/<basename of doc["name"]>_<doc_type>_rundb_dump.json``.
    Raises pyrame_exc if the mount command fails.
    """
    name = os.path.basename(doc["name"])
    # write json dump of doc at the path
    retcode, res = submod_execcmd("mount@mountd", doc["mp"])
    if retcode == 0:
        raise pyrame_exc("unable to mount %s <- %s" % (doc["mp"], res))
    mp_local = res.rstrip("/")
    path = "%s/%s" % (mp_local, doc["path"])
    try:
        os.makedirs(path)
    except OSError as e:
        # just ignore if path already exists (errno 17, EEXIST)
        if e.errno != 17:
            raise
    c_filename = "%s/%s_%s_rundb_dump.json" % (path, name, doc_type)
    #print("dumping json backup on %s"%(c_filename))
    with open(c_filename, "w") as f:
        f.write(json.dumps(doc))


#####################
# GENERIC NEW
#####################

def new_element(e_name, name, mp, path, group_id, mode):
    """Create an acquisition, run or run group and return its id.

    *e_name* is "acq", "run" or "run_g". The id is ``"<group_id>/<name>"``
    (trailing slashes of *group_id* are stripped). The parent must exist.
    With ``mode == "remove"`` a colliding element is deleted first; otherwise
    an existing acquisition, or a run/run group colliding with an element of
    the other kind, raises pyrame_exc, and an existing element of the same
    kind is purged and recreated. The new document is also dumped as JSON.
    """
    group_id = group_id.rstrip("/")
    if e_name == "acq":
        db = acq_db
        eg_name = "run"
        del_element = del_acq
        check_run_id(group_id)
        check_collisions = [acq_db]
    elif e_name == "run":
        db = run_db
        eg_name = "run_g"
        del_element = del_run
        if name == "root":
            raise pyrame_exc("invalid run name")
        if "/" in name or "\\" in name:
            raise pyrame_exc("invalid characters in run name")
        check_run_g_id(group_id)
        check_collisions = [run_db, run_g_db]
    elif e_name == "run_g":
        db = run_g_db
        eg_name = "run_g"
        del_element = del_run_group
        if name == "root":
            raise pyrame_exc("invalid run group name")
        if "/" in name or "\\" in name:
            raise pyrame_exc("invalid characters in run group name")
        check_run_g_id(group_id)
        check_collisions = [run_db, run_g_db]
    else:
        raise pyrame_exc("invalid element type")
    gid_name = "%s_id" % (eg_name)
    e_id = "%s/%s" % (group_id, name)

    # check if element already exists or collides
    for c_db in check_collisions:
        #print("checking existance of %s in "%(e_id),c_db)
        if e_id in c_db:
            if mode == "remove":
                # NOTE(review): del_element matches e_name, not c_db; a
                # collision found in the other database is handed to the
                # same deleter -- confirm this is intended.
                if e_name == "acq":
                    del_element(e_id)
                else:
                    del_element(e_id, "true")
            else:
                if e_name == "acq":
                    raise pyrame_exc("%s already exists in %s" % (name, group_id))
                if e_name == "run" and c_db == run_g_db:
                    raise pyrame_exc("unable to create run. a run group with the same name already exists at %s" % (group_id))
                if e_name == "run_g" and c_db == run_db:
                    raise pyrame_exc("unable to create run group. a run with the same name already exists at %s" % (group_id))
                # if run or run_g exist, then update:
                # first delete element, then recreate it
                # NOTE(review): nesting of this purge inferred from the
                # comment above (source indentation was lost) -- confirm.
                db.purge([db[e_id]])

    # create and store element
    doc = {gid_name: group_id, "name": name, "path": path, "mp": mp}
    db[e_id] = doc

    # dump json data
    update_json_dump(e_name, db[e_id])
    return e_id


#####################
# NEW
#####################

def new_acq(name, mp, path, run_id, mode="append"):
    """Create acquisition *name* in run *run_id*; see new_element()."""
    return new_element("acq", name, mp, path, run_id, mode)
def new_acq_rundb(name, mp, path, run_id, mode="append"):
    """Add an acquisition in *run_id*.

    Returns the (retcode, result) pair built by call(): ``(1, acq_id)`` on
    success, ``(0, "Error adding acquisition: ...")`` on failure.
    """
    args = (name, mp, path, run_id, mode)
    return call("Error adding acquisition", new_acq, *args)
#####################

def new_run(name, mp, path, run_g_id, mode="append"):
    """Create run *name* inside run group *run_g_id* and return its id."""
    element_type = "run"
    return new_element(element_type, name, mp, path, run_g_id, mode)
def new_run_rundb(name, mp, path, run_g_id, mode="append"):
    """Add new run with *name*, in run group *run_g_id*.

    Returns the (retcode, result) pair built by call(): ``(1, run_id)`` on
    success, ``(0, "Error adding run: ...")`` on failure.
    """
    args = (name, mp, path, run_g_id, mode)
    return call("Error adding run", new_run, *args)
#####################

def new_run_group(name, mp, path, run_g_id, mode="append"):
    """Create run group *name* inside run group *run_g_id* and return its id."""
    element_type = "run_g"
    return new_element(element_type, name, mp, path, run_g_id, mode)
def new_run_group_rundb(name, mp, path, run_g_id, mode="append"):
    """Add new run group with *name*, in run group *run_g_id*.

    Returns the (retcode, result) pair built by call(): ``(1, run_g_id)`` of
    the new group on success, ``(0, "Error adding run group: ...")`` on
    failure.
    """
    args = (name, mp, path, run_g_id, mode)
    return call("Error adding run group", new_run_group, *args)
#####################
# DEL
#####################

def del_acq(acq_id):
    """Purge acquisition *acq_id* from the acqs database and return "ok".

    Raises pyrame_exc (through check_acq_id) if the acquisition is unknown.
    """
    check_acq_id(acq_id)
    acq_doc = acq_db[acq_id]
    acq_db.purge([acq_doc])
    return "ok"
def del_acq_rundb(acq_id):
    """Remove acquisition *acq_id*.

    Returns ``(1, "ok")`` on success or
    ``(0, "Error deleting acquisition: ...")`` on failure.
    """
    error_prefix = "Error deleting acquisition"
    return call(error_prefix, del_acq, acq_id)
#####################

def del_run(run_id, recursive):
    """Purge run *run_id* and return "ok".

    A run that still holds acquisitions is only removed when *recursive* is
    true (as interpreted by sbool); its acquisitions are deleted first.
    Otherwise pyrame_exc is raised.
    """
    run_id = run_id.rstrip("/")
    check_run_id(run_id)

    # acquisitions still attached to the run, as a comma-separated id list
    acq_ids = get_acqs_in_run(run_id, "true")
    if acq_ids != "":
        if not sbool(recursive):
            raise pyrame_exc("run not empty. it contains acquisitions")
        for acq_id in acq_ids.split(","):
            del_acq(acq_id)

    # finally remove the run itself
    run_doc = run_db[run_id]
    run_db.purge([run_doc])
    return "ok"
def del_run_rundb(run_id, recursive):
    """Remove run *run_id*.

    If the run contains acquisitions it is only removed, together with them,
    when *recursive* is true. Returns ``(1, "ok")`` on success or
    ``(0, "Error deleting run: ...")`` on failure.
    """
    error_prefix = "Error deleting run"
    return call(error_prefix, del_run, run_id, recursive)
#####################

def del_run_group(run_g_id, recursive):
    """Purge run group *run_g_id* and return "ok".

    Nested run groups and runs are deleted first when *recursive* is true
    (as interpreted by sbool); a non-empty group raises pyrame_exc otherwise.
    The "root" group is emptied but never purged itself.
    """
    run_g_id = run_g_id.rstrip("/")
    check_run_g_id(run_g_id)

    # direct child run groups (each one recurses into its own children)
    child_groups = get_run_groups_in_run_group(run_g_id, "false")
    if child_groups != "":
        if not sbool(recursive):
            raise pyrame_exc("run group not empty. it contains other run groups")
        for child_g_id in child_groups.split(","):
            del_run_group(child_g_id, recursive)

    # runs stored directly in this group
    child_runs = get_runs_in_run_group(run_g_id)
    if child_runs != "":
        if not sbool(recursive):
            raise pyrame_exc("run group not empty. it contains runs")
        for child_run_id in child_runs.split(","):
            del_run(child_run_id, recursive)

    # del run group
    if run_g_id != "root":
        group_doc = run_g_db[run_g_id]
        run_g_db.purge([group_doc])
    return "ok"
def del_run_group_rundb(run_g_id, recursive):
    """Remove run group *run_g_id*.

    A non-empty run group is only removed, together with its content, when
    *recursive* is true. Returns ``(1, "ok")`` on success or
    ``(0, "Error deleting run group: ...")`` on failure.
    """
    error_prefix = "Error deleting run group"
    return call(error_prefix, del_run_group, run_g_id, recursive)
#####################
# ADD PARAMS
#####################

def set_params(doc_type, doc_id, params):
    """Merge dict *params* into document *doc_id* and refresh its JSON dump.

    *doc_type* is "acq" or "run"; any other value, or an unknown *doc_id*,
    raises pyrame_exc.
    """
    if doc_type == "acq":
        check_acq_id(doc_id)
        db = acq_db
    elif doc_type == "run":
        check_run_id(doc_id)
        db = run_db
    else:
        raise pyrame_exc("invalid document type")

    # modify RunDB
    d = db[doc_id]
    d.update(params)
    db[doc_id] = d

    # update json dump
    update_json_dump(doc_type, d)


#####################

def set_param_acq(acq_id, name, value):
    """Set parameter *name* to *value* on acquisition *acq_id*; return "ok".

    Names used by the RunDB structure itself are rejected with pyrame_exc.
    """
    # check name collision
    if name in ["run_id", "run_g_id", "mp", "path", "name", "_id", "_rev"]:
        raise pyrame_exc("invalid parameter name")
    params = {name: value}
    # add to RunDB
    set_params("acq", acq_id, params)
    return "ok"
def set_param_acq_rundb(acq_id, name, value):
    """Set parameter (*name*, *value*) to *acq_id*.

    Returns ``(1, "ok")`` on success or
    ``(0, "Error adding parameter to acquisition: ...")`` on failure.
    """
    error_prefix = "Error adding parameter to acquisition"
    return call(error_prefix, set_param_acq, acq_id, name, value)
#####################

def set_param_run(run_id, name, value):
    """Set parameter *name* to *value* on run *run_id*; return "ok".

    Names used by the RunDB structure itself are rejected with pyrame_exc.
    """
    reserved_names = ["run_id", "run_g_id", "mp", "path", "name", "_id", "_rev"]
    if name in reserved_names:
        raise pyrame_exc("invalid parameter name")
    # store the single parameter in the run document
    set_params("run", run_id, {name: value})
    return "ok"
def set_param_run_rundb(run_id, name, value):
    """Set parameter (*name*, *value*) to *run_id*.

    Returns ``(1, "ok")`` on success or
    ``(0, "Error adding parameter to run: ...")`` on failure.
    """
    error_prefix = "Error adding parameter to run"
    return call(error_prefix, set_param_run, run_id, name, value)
#####################

def set_xml_params_acq(acq_id, xml_file, add_parent):
    """Parse *xml_file* and store the resulting parameters in *acq_id*.

    *add_parent* is handed unchanged to the XML parameter parser. A file that
    cannot be read raises pyrame_exc. Returns "ok".
    """
    # read the whole XML file
    try:
        with open(xml_file, "r") as xml_fd:
            xml_text = xml_fd.read()
    except EnvironmentError as err:
        raise pyrame_exc(err)

    # turn it into a parameter dict and merge it into the acquisition
    parsed_params = xml_parser.parse(xml_text, add_parent)
    set_params("acq", acq_id, parsed_params)
    return "ok"
def set_xml_params_acq_rundb(acq_id, xml_file, add_parent):
    """Parse XML data from *xml_file* and include it in *acq_id*.

    Returns ``(1, "ok")`` on success or
    ``(0, "Error adding XML parameters to acquisition: ...")`` on failure.
    """
    error_prefix = "Error adding XML parameters to acquisition"
    return call(error_prefix, set_xml_params_acq, acq_id, xml_file, add_parent)
#####################
# ORPHANS
#####################

def get_orphans():
    """List elements whose parent no longer exists.

    Returns "<acq ids>;<run ids>;<run group ids>", each part being a
    comma-separated list (";;" when there are no orphans).
    """
    # acquisitions whose run is missing
    acq_rows = acq_db.view("_design/basic/_view/acq_run")
    orphan_acqs = [row.id for row in acq_rows if row.key not in run_db]

    # runs whose run group is missing
    run_rows = run_db.view("_design/basic/_view/run_run_g")
    orphan_runs = [row.id for row in run_rows if row.key not in run_g_db]

    # run groups whose parent group is missing; the root run group has no
    # parent (key None) and is skipped
    group_rows = run_g_db.view("_design/basic/_view/run_g_run_g")
    orphan_groups = []
    for row in group_rows:
        if row.key != None and row.key not in run_g_db:
            orphan_groups.append(row.id)

    parts = [",".join(orphan_acqs), ",".join(orphan_runs), ",".join(orphan_groups)]
    return ";".join(parts)


#####################
# GET ACQ
#####################

def get_acq(acq_id):
    """Return the fields of acquisition *acq_id*, Base64-encoded.

    The encoded text has one "<name><TAB><value>" line per field; the
    CouchDB bookkeeping fields "_id" and "_rev" are left out.
    """
    check_acq_id(acq_id)
    lines = []
    for field, content in acq_db[acq_id].items():
        if field in ["_id", "_rev"]:
            continue
        lines.append("%s\t%s" % (field, content))
    return base64.b64encode("\n".join(lines))
def get_acq_rundb(acq_id):
    """Get acquisition parameters and values of *acq_id* separated by a tab.

    One line for each parameter. The result is returned encoded in base64,
    as ``(1, data)``; failures give ``(0, "Error getting acq: ...")``.
    """
    error_prefix = "Error getting acq"
    return call(error_prefix, get_acq, acq_id)
#####################
# GET RUN
#####################

def get_run(run_id):
    """Return the fields of run *run_id*, Base64-encoded.

    The encoded text has one "<name><TAB><value>" line per field; the
    CouchDB bookkeeping fields "_id" and "_rev" are left out.
    """
    check_run_id(run_id)
    lines = []
    for field, content in run_db[run_id].items():
        if field in ["_id", "_rev"]:
            continue
        lines.append("%s\t%s" % (field, content))
    return base64.b64encode("\n".join(lines))
def get_run_rundb(run_id):
    """Get run parameters and values of *run_id* separated by a tab.

    One line for each parameter. The result is returned encoded in base64,
    as ``(1, data)``; failures give ``(0, "Error getting run: ...")``.
    """
    error_prefix = "Error getting run"
    return call(error_prefix, get_run, run_id)
#####################
# GET RUN GROUP
#####################

def get_run_group(run_g_id):
    """Return the fields of run group *run_g_id*, Base64-encoded.

    The encoded text has one "<name><TAB><value>" line per field; the
    CouchDB bookkeeping fields "_id" and "_rev" are left out.
    """
    check_run_g_id(run_g_id)
    lines = []
    for field, content in run_g_db[run_g_id].items():
        if field in ["_id", "_rev"]:
            continue
        lines.append("%s\t%s" % (field, content))
    return base64.b64encode("\n".join(lines))
def get_run_group_rundb(run_g_id):
    """Get run group parameters and values of *run_g_id* separated by a tab.

    One line for each parameter. The result is returned encoded in base64,
    as ``(1, data)``; failures give ``(0, "Error getting run group: ...")``.
    """
    error_prefix = "Error getting run group"
    return call(error_prefix, get_run_group, run_g_id)
#####################
# GET PARAMS ACQ
#####################

def get_params_acq(acq_id):
    """Return the comma-separated field names of acquisition *acq_id*.

    The CouchDB bookkeeping fields "_id" and "_rev" are left out. Filtering
    (instead of removing from the key list) keeps the function working when
    keys() is not a list and when one of the two fields is absent.
    """
    check_acq_id(acq_id)
    params = [k for k in acq_db[acq_id].keys() if k not in ["_id", "_rev"]]
    return ",".join(params)
def get_params_acq_rundb(acq_id):
    """Get a list of parameters defined for *acq_id*.

    Returns ``(1, "name1,name2,...")`` on success or
    ``(0, "Error getting acquisition params: ...")`` on failure.
    """
    error_prefix = "Error getting acquisition params"
    return call(error_prefix, get_params_acq, acq_id)
#####################

def get_param_acq(acq_id, param):
    """Return the value of *param* in acquisition *acq_id* as a string.

    Raises pyrame_exc if the acquisition or the parameter does not exist.
    """
    check_acq_id(acq_id)
    # fetch the document once: test and read then see the same revision
    doc = acq_db[acq_id]
    if param not in doc:
        raise pyrame_exc("parameter not found")
    return str(doc[param])
def get_param_acq_rundb(acq_id, param):
    """Get value of *param* in *acq_id*.

    Returns ``(1, value)`` on success or
    ``(0, "Error getting acquisition param: ...")`` on failure.
    """
    error_prefix = "Error getting acquisition param"
    return call(error_prefix, get_param_acq, acq_id, param)
#####################

def get_param_run(run_id, param):
    """Return the value of *param* in run *run_id* as a string.

    Raises pyrame_exc if the run or the parameter does not exist.
    """
    check_run_id(run_id)
    # fetch the document once: test and read then see the same revision
    doc = run_db[run_id]
    if param not in doc:
        raise pyrame_exc("parameter not found")
    return str(doc[param])
def get_param_run_rundb(run_id, param):
    """Get value of *param* in *run_id*.

    Returns ``(1, value)`` on success or
    ``(0, "Error getting run param: ...")`` on failure.
    """
    error_prefix = "Error getting run param"
    return call(error_prefix, get_param_run, run_id, param)
#####################

def map_reduce(map_func, **params):
    """Run a temporary view on the acqs database.

    *map_func* is the map function source; *params* are passed on to the
    database query. Returns a list of [id, key, value] lists, one per row.
    """
    res = []
    for a in acq_db.query(map_func, **params):
        res.append([a.id, a.key, a.value])
    return res


#####################

# Map function template: "key_re_str" and "value_re_str" (quotes included)
# are placeholders replaced by map_reduce_regex(). A field is emitted when
# its name matches the key regex AND its value matches the value regex.
regexp_map = """
function(doc) {
  var key_re = new RegExp("key_re_str");
  var value_re = new RegExp("value_re_str");
  for (var key in doc) {
    if (key.search(key_re)!=-1 && doc[key].search(value_re)!=-1) {
      emit(key,doc[key]);
    }
  }
}
"""


def map_reduce_regex(key_regex, value_regex):
    """Return [id, key, value] rows for fields matching both regexes.

    The regexes are embedded as properly escaped JavaScript string literals
    (via json.dumps), so backslashes and double quotes reach RegExp()
    unchanged instead of being eaten by, or breaking, the map function
    source. Callers must therefore not pre-escape them for JavaScript.
    """
    query_map = str(regexp_map)  # copy string
    # replace the quoted placeholder as a whole by a full JS string literal
    query_map = query_map.replace('"key_re_str"', json.dumps(key_regex))
    query_map = query_map.replace('"value_re_str"', json.dumps(value_regex))
    return map_reduce(query_map)


#####################

def get_acqs_regex(key_regex, value_regex):
    """Return the comma-separated ids of the rows found by map_reduce_regex()."""
    res = map_reduce_regex(key_regex, value_regex)
    return ",".join(row[0] for row in res)
def get_acqs_regex_rundb(key_regex, value_regex):
    """Get a list of acquisitions having a matching parameter.

    A parameter matches when its name matches the regular expression
    *key_regex* and its value matches *value_regex*. Returns
    ``(1, "id1,id2,...")`` on success or
    ``(0, "Error getting acquisitions: ...")`` on failure.
    """
    error_prefix = "Error getting acquisitions"
    return call(error_prefix, get_acqs_regex, key_regex, value_regex)
#####################

def get_acqs_table_regex(key_regex, value_regex):
    """Return the rows found by map_reduce_regex() as a Base64-encoded table.

    Each row becomes one line with its id, key and value separated by tabs.
    """
    rows = map_reduce_regex(key_regex, value_regex)
    table_lines = ["\t".join(row) for row in rows]
    return base64.b64encode("\n".join(table_lines))
def get_acqs_table_regex_rundb(key_regex, value_regex):
    """Get a table of acquisitions having a matching parameter.

    A parameter matches when its name matches the regular expression
    *key_regex* and its value matches *value_regex*. The result is a
    tab-separated list with the acquisition id, the found key and the found
    value, one line for each result, returned encoded in Base64 as
    ``(1, data)``; failures give ``(0, "Error getting acquisitions: ...")``.
    """
    error_prefix = "Error getting acquisitions"
    return call(error_prefix, get_acqs_table_regex, key_regex, value_regex)
#####################

def get_acqs_script(map_func, red_func, group):
    """Query acquisitions with caller-supplied map/reduce functions.

    *map_func* and *red_func* are function sources handed to the database
    query; *group* is converted with sbool. A *red_func* equal to "none"
    (case-insensitive) is not sent, as documented for the public wrapper.
    Returns the comma-separated first column (row id) of the result rows.
    """
    grouping = sbool(group)
    # single-argument print(): same output under the print statement and
    # the print function
    print("map_func %s" % (map_func,))
    print("red_func %s" % (red_func,))
    if str(red_func).lower() == "none":
        # NOTE(review): a None reduce function is presumably omitted from
        # the request by the couchdb client -- confirm against its API.
        red_func = None
    res = map_reduce(map_func, reduce_fun=red_func, group=grouping)
    return ",".join(row[0] for row in res)
def get_acqs_script_rundb(map_func, red_func, group):
    """Get a list of acquisitions using map and reduce functions.

    *map_func* and *red_func* are the map and reduce functions,
    respectively. *red_func* and *group* can be "none" to not be sent.
    Returns ``(1, "id1,id2,...")`` on success or
    ``(0, "Error getting acquisitions: ...")`` on failure.
    """
    error_prefix = "Error getting acquisitions"
    return call(error_prefix, get_acqs_script, map_func, red_func, group)
#####################

def get_acqs_in_run(run_id, return_ids="1"):
    """Return the acquisitions of run *run_id* as a comma-separated list.

    The list holds acquisition ids when *return_ids* is true (as interpreted
    by sbool) and acquisition names otherwise. Raises pyrame_exc if the run
    does not exist.
    """
    check_run_id(run_id)
    rows = acq_db.view("_design/basic/_view/acq_run", key=run_id)
    acq_ids = [row.id for row in rows]
    if sbool(return_ids):
        return ",".join(acq_ids)
    # names requested: look each acquisition up by id
    acq_names = [acq_db[acq_id]["name"] for acq_id in acq_ids]
    return ",".join(acq_names)
def get_acqs_in_run_rundb(run_id, return_ids="1"):
    """Get acquisitions in run.

    The returned list can be id's or names depending on *return_ids*.
    Returns ``(1, list)`` on success or
    ``(0, "Error getting acquisitions: ...")`` on failure.
    """
    error_prefix = "Error getting acquisitions"
    return call(error_prefix, get_acqs_in_run, run_id, return_ids)
#####################
# GET RUN (GROUPS)
#####################

def get_runs_in_run_group(run_g_id):
    """Return the comma-separated ids of the runs stored in *run_g_id*.

    Returns "" for a run group without runs; raises pyrame_exc if the run
    group does not exist.
    """
    check_run_g_id(run_g_id)
    rows = run_db.view("_design/basic/_view/run_run_g", key=run_g_id)
    run_ids = []
    if len(rows) != 0:
        run_ids = [row.id for row in rows]
    return ",".join(run_ids)
def get_runs_in_run_group_rundb(run_g_id):
    """Get runs in run group *run_g_id*.

    Returns ``(1, "id1,id2,...")`` on success or
    ``(0, "Error getting runs: ...")`` on failure.
    """
    error_prefix = "Error getting runs"
    return call(error_prefix, get_runs_in_run_group, run_g_id)
#####################

def get_run_groups_in_run_group(run_g_id, recursive):
    """Return the comma-separated ids of the run groups inside *run_g_id*.

    When *recursive* is true (as interpreted by sbool) the content of each
    child group follows that child in the list. Returns "" when there are
    no child groups; raises pyrame_exc if *run_g_id* does not exist.
    """
    check_run_g_id(run_g_id)
    rows = run_g_db.view("_design/basic/_view/run_g_run_g", key=run_g_id)
    pieces = []
    if len(rows) != 0:
        for row in rows:
            pieces.append(row.id)
            if sbool(recursive):
                nested = get_run_groups_in_run_group(row.id, recursive)
                if nested != "":
                    pieces.append(nested)
    return ",".join(pieces)
def get_run_groups_in_run_group_rundb(run_g_id, recursive="false"):
    """Get run groups in run group *run_g_id*.

    Nested run groups are also returned if *recursive* is true or 1
    (case-insensitive). Returns ``(1, "id1,id2,...")`` on success or
    ``(0, "Error getting runs: ...")`` on failure.
    """
    # NOTE(review): the prefix says "runs" although run groups are listed;
    # left unchanged in case callers match on the message.
    error_prefix = "Error getting runs"
    return call(error_prefix, get_run_groups_in_run_group, run_g_id, recursive)
#####################
# GET NAMES
#####################

def get_acq_name(acq_id):
    """Return the "name" field of acquisition *acq_id*.

    Raises pyrame_exc if the acquisition does not exist.
    """
    check_acq_id(acq_id)
    acq_doc = acq_db[acq_id]
    return acq_doc["name"]
def get_acq_name_rundb(acq_id):
    """Get acq name from *acq_id*.

    Returns ``(1, name)`` on success or
    ``(0, "Error getting acq name: ...")`` on failure.
    """
    error_prefix = "Error getting acq name"
    return call(error_prefix, get_acq_name, acq_id)
#####################

def get_run_name(run_id):
    """Return the "name" field of run *run_id*.

    Raises pyrame_exc if the run does not exist.
    """
    check_run_id(run_id)
    run_doc = run_db[run_id]
    return run_doc["name"]
def get_run_name_rundb(run_id):
    """Get run name from *run_id*.

    Returns ``(1, name)`` on success or
    ``(0, "Error getting run name: ...")`` on failure.
    """
    error_prefix = "Error getting run name"
    return call(error_prefix, get_run_name, run_id)
#####################

def get_run_group_name(run_g_id):
    """Return the "name" field of run group *run_g_id*.

    Raises pyrame_exc if the run group does not exist.
    """
    check_run_g_id(run_g_id)
    group_doc = run_g_db[run_g_id]
    return group_doc["name"]
def get_run_group_name_rundb(run_g_id):
    """Get run group name from *run_g_id*.

    Returns ``(1, name)`` on success or
    ``(0, "Error getting run group name: ...")`` on failure.
    """
    error_prefix = "Error getting run group name"
    return call(error_prefix, get_run_group_name, run_g_id)
#####################
# EMPTY
#####################

def is_empty_element(doc_type, doc_id):
    """Return "1" if run or run group *doc_id* has no children, else "0".

    *doc_type* is "run" (children: acquisitions) or "run_g" (children: runs
    and run groups); any other value raises pyrame_exc. The existence of
    *doc_id* itself is not checked.
    """
    # (database, view) pairs listing children by parent id
    if doc_type == "run":
        child_views = [(acq_db, "acq_run")]
    elif doc_type == "run_g":
        child_views = [(run_db, "run_run_g"), (run_g_db, "run_g_run_g")]
    else:
        raise pyrame_exc("invalid document type")
    for db, view in child_views:
        els = db.view("_design/basic/_view/%s" % (view), key=doc_id)
        if len(els) != 0:
            return "0"
    return "1"
def is_empty_run_rundb(run_id):
    """Check if run *run_id* is empty.

    Returns ``(1, "1")`` when the run holds no acquisitions and ``(1, "0")``
    otherwise; failures give ``(0, "Error checking emptiness of run: ...")``.
    """
    error_prefix = "Error checking emptiness of run"
    return call(error_prefix, is_empty_element, "run", run_id)
def is_empty_run_group_rundb(run_g_id):
    """Check if run group *run_g_id* is empty.

    Returns ``(1, "1")`` when the group holds neither runs nor run groups
    and ``(1, "0")`` otherwise; failures give
    ``(0, "Error checking emptiness of run group: ...")``.
    """
    error_prefix = "Error checking emptiness of run group"
    return call(error_prefix, is_empty_element, "run_g", run_g_id)
#####################
# CHECK ELEMENT
#####################

def check_acq_id(acq_id):
    """Return *acq_id* if it exists in the acqs database, else raise pyrame_exc."""
    if acq_id in acq_db:
        return acq_id
    raise pyrame_exc("acquisition not found")
def check_acq_id_rundb(acq_id):
    """Check existence of *acq_id*.

    Returns ``(1, acq_id)`` if it exists, ``(0, "acquisition not found")``
    otherwise (no message prefix is added).
    """
    no_prefix = ""
    return call(no_prefix, check_acq_id, acq_id)
#####################

def check_run_id(run_id):
    """Return *run_id* if it exists in the runs database, else raise pyrame_exc."""
    if run_id in run_db:
        return run_id
    # RC depends on this exact message
    raise pyrame_exc("run not found")
def check_run_id_rundb(run_id):
    """Check existence of *run_id*.

    Returns ``(1, run_id)`` if it exists, ``(0, "run not found")`` otherwise
    (no message prefix is added).
    """
    no_prefix = ""
    return call(no_prefix, check_run_id, run_id)
#####################

def check_run_g_id(run_g_id):
    """Return *run_g_id* if it exists in the run_groups database, else raise pyrame_exc."""
    if run_g_id in run_g_db:
        return run_g_id
    raise pyrame_exc("run group not found")
def check_run_g_id_rundb(run_g_id):
    """Check existence of *run_g_id*.

    Returns ``(1, run_g_id)`` if it exists, ``(0, "run group not found")``
    otherwise (no message prefix is added).
    """
    no_prefix = ""
    return call(no_prefix, check_run_g_id, run_g_id)