Source code for cmd_storage

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# 
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
# 
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
# 
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame.  If not, see <http://www.gnu.org/licenses/>

import sys,os,time,subprocess,signal,json,base64,datetime,threading,pools,bindpyrame
storage_pool=pools.pool("storage")

def init():
    pass

[docs]def init_storage(storage_id,use_rundb,brg_name,brg_mp,brg_path,brg_parent_id,elog_server,elog_name,stats_names,producers_count): """Initialize storage named *storage_id*. *use_rundb* (1/0) indicates if the RunDB should be populated. brg arguments refer to the base run group: *brg_mp* mount point, *brg_path* path relative the the mount point, *brg_parent_id* RunDB id of the brg parent. *elog_server* and *elog_name* describe the elog to be used, if any. *stats_names* is a comma-separated list of varmod variables to be stored on the log of the acquisitions. *producers_count* is an integer with the number of producers that will call the function mark_as_finished_storage. Once all of them will have called the function, storage will mark the aquisition as finished.""" if "undef" in [brg_name,brg_mp,brg_path,brg_parent_id]: return 0,"no base run arguments can be undef" # verify that brg_path is not absolute if brg_path.startswith("/"): return 0,"brg_path cannot start with /. is relative to its parent path" # verify that brg_path finishes with brg_name if not brg_path.endswith(brg_name): return 0,"brg_path (%s) must end with brg_name (%s)"%(brg_path,brg_name) retcode,res=submod_execcmd("mount@mountd",brg_mp) if retcode==0: return 0,"invalid mount point %s <- %s"%(brg_mp,res) try: storage=storage_pool.new(storage_id,{ "use_rundb":int(use_rundb), "brg_name":brg_name, "brg_mp":brg_mp, "brg_path":brg_path, "brg_parent_id":brg_parent_id, "elog_server":elog_server, "elog_name":elog_name, "stats_names":stats_names, "producers_count":int(producers_count)}) except Exception as e: return 0,str(e) return 1,"ok"
[docs]def config_storage(storage_id): "Configure *storage_id*. Init elog, if any" try: storage=storage_pool.get(storage_id) except Exception as e: return 0,str(e) if storage["elog_server"]!="undef": storage["elog_id"]="elog_%s"%(storage_id) retcode,res=submod_execcmd("init@elog",storage["elog_id"],storage["elog_server"],"8080",storage["elog_name"]) if retcode==0: return 0,"cant init elog <- %s"%(res) return 1,"ok"
[docs]def inval_storage(storage_id): "Invalidate *storage_id*. Deinit elog, if any" try: storage=storage_pool.get(storage_id) except Exception as e: return 0,str(e) if storage["elog_server"]!="undef": retcode,res=submod_execcmd("deinit@elog",storage["elog_id"],storage["elog_server"],"8080",storage["elog_name"]) if retcode==0: return 0,"cant deinit elog <- %s"%(res) return 1,"ok"
[docs]def deinit_storage(storage_id): "Deinitialize *storage_id*" try: storage_pool.remove(storage_id) except Exception as e: return 0,str(e) return 1,"ok"
[docs]def register_producer_storage(storage_id): """Register a new producer for the storage""" try: storage=storage_pool.get(storage_id) except Exception as e: return 0,str(e) storage["producers_count"]+=1 return 1,"ok"
###################################################################### # run functions ######################################################################
[docs]def new_run_storage(storage_id,run_name,mode="append"): """Create run with *run_name* on *storage_id*. *mode* can be append (do not remove anything) or remove (if something exists with the same name it is removed before starting).""" try: storage=storage_pool.get(storage_id) except Exception as e: return 0,str(e) name=run_name.strip().strip("/") if name=="": return 0,"run name cannot be empty" #build the run_id and run_path run_id="%s/%s/%s"%(storage["brg_parent_id"],storage["brg_name"],name) run_path="%s/%s"%(storage["brg_path"],name) #create dest dir and cleanup if remove mode retcode,res=submod_execcmd("mount@mountd",storage["brg_mp"]) if retcode==0: return 0,"unable to mount %s <- %s"%(storage["brg_mp"],res) run_mp_local=res.rstrip("/") if mode=="remove": res=os.system("rm -f %s/%s/*"%(run_mp_local,run_path)) if res!=0: return 0,"unable to cleanup run path" res=os.system("mkdir -p %s/%s"%(run_mp_local,run_path)) if res!=0: return 0,"unable to create run path" res=os.system("chmod 777 %s/%s"%(run_mp_local,run_path)) if res!=0: return 0,"unable to change permissions of path" if storage["use_rundb"]==1: # split name with slashes. # The last name is the run. All previous ones are run groups rg_r=run_id.split("/")[1:] run_groups=rg_r[:-1] run=rg_r[-1] parent_id=storage["brg_parent_id"] previous_path=storage["brg_path"][:-len(storage["brg_name"])].rstrip("/") for run_g in run_groups: run_g_path="%s/%s"%(previous_path,run_g) retcode,res=submod_execcmd("new_run_group@rundb",run_g,storage["brg_mp"],run_g_path,parent_id) if retcode==0: return 0,"error creating new run group <- %s"%(res) parent_id="%s/%s"%(parent_id,run_g) previous_path="%s/%s"%(previous_path,run_g) retcode,res=submod_execcmd("new_run@rundb",run,storage["brg_mp"],run_path,parent_id,mode) if retcode==0: return 0,"error creating new run <- %s"%(res) return 1,json.dumps({"run_id":run_id,"run_name":name,"run_mp":storage["brg_mp"],"run_path":run_path})
[docs]def set_param_run_storage(storage_id,run_name,param,value): "Set a parameter *param* with *value* for *run_name* on *storage_id*. If *use_rundb* (init_storage) is 0, this function does nothing." try: storage=storage_pool.get(storage_id) except Exception as e: return 0,str(e) run_id="%s/%s/%s"%(storage["brg_parent_id"],storage["brg_name"],run_name) if storage["use_rundb"]==1: retcode,res=submod_execcmd("check_run_id@rundb",run_id) if retcode==0: # if run_id not exists print warning and return ok print("warning: run_id %s not exists in RunDB <- %s"%(run_id,res)) return 1,"ok" retcode,res=submod_execcmd("set_param_run@rundb",run_id,param,value) if retcode==0: return 0,"error setting param in RunDB <- %s"%(res) return 1,"ok"
######################################################################
[docs]def new_acq_storage(storage_id,acq_name,run_name,mode="append",convert_script="undef"): "Create acquisition with *acq_name* in *run_name*. *mode* can be append (do not remove anything: reuse or append) or remove (if something exists with the same name it is removed before starting). At the end of the acquistion launch *convert_script*" try: storage=storage_pool.get(storage_id) except Exception as e: return 0,str(e) # arguments checks name=acq_name.strip().strip("/") if name=="": return 0,"acq name cannot be empty" #check run_name if run_name=="": return 0,"run name cannot be empty" # check that run exists in rundb, otherwise create it run_id="%s/%s/%s"%(storage["brg_parent_id"],storage["brg_name"],run_name) if storage["use_rundb"]==1: retcode,res=submod_execcmd("check_run_id@rundb",run_id) if retcode==0 and res.endswith("run not found"): retcode,res=new_run_storage(storage_id,run_name,mode) if retcode==0: return 0,res elif retcode==0: return 0,"error checking run existence <- %s"%(res) # build acq_id acq_id="%s/%s"%(run_id,name) if "/" in name: acq_name=name.rsplit("/",1)[1] acq_path="%s/%s/%s"%(storage["brg_path"],run_name,name.rsplit("/",1)[0]) else: acq_name=name acq_path="%s/%s"%(storage["brg_path"],run_name) #insert params of acq_id in pools storage["%s_convert_script"%(acq_id)]=convert_script #create dest dir and cleanup if remove mode retcode,res=submod_execcmd("mount@mountd",storage["brg_mp"]) if retcode==0: return 0,"unable to mount %s <- %s"%(storage["brg_mp"],res) acq_mp_local=res.rstrip("/") acq_local_path="%s/%s"%(acq_mp_local,acq_path) if mode=="remove": res=os.system("rm -f %s/%s*"%(acq_local_path,acq_name)) if res!=0: return 0,"unable to cleanup acq files" res=os.system("mkdir -p %s"%(acq_local_path)) if res!=0: return 0,"unable to create acq path" res=os.system("chmod 777 %s"%(acq_local_path)) if res!=0: return 0,"unable to change permissions of path" # create acq in RunDB if storage["use_rundb"]==1: # create acquisition in RunDB retcode,res=submod_execcmd("new_acq@rundb",name,storage["brg_mp"],acq_path,run_id,mode) if retcode==0: return 0,"error creating new acq <- %s"%(res) # backup config file retcode,res=submod_execcmd("gener_configb64@cmod") if retcode==0: return 0,res xml_file="%s/%s.xml"%(acq_local_path,acq_name) with open(xml_file,"w") as f: f.write(base64.b64decode(res)) # add xml config to rundb if possible if storage["use_rundb"]==1: retcode,res=submod_execcmd("set_xml_params_acq@rundb",acq_id,xml_file,"true") if retcode==0: return 0,"error adding configuration to RunDB <- %s"%(res) # create log start_time=str(datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")) log_file="%s/%s.log"%(acq_local_path,acq_name) with open(log_file,"w") as f: f.write("<log name=\"log\">\n") f.write(" <acq name=\"%s\">\n"%(name)) f.write(" <param name=\"start_time\">%s</param>\n"%(start_time)) #fill elog if storage["elog_server"]!="undef": with open("/tmp/elog_msg.txt","w") as f: f.write("Acquisition %s started\n"%(name)) f.write("Data available at %s\n"%(acq_local_path)) title="Acq %s started"%(name) retcode,res=submod_execcmd("add_msg_file@elog",storage["elog_id"],"Acq",title,"/tmp/elog_msg.txt") if retcode==0: print("error : can fill the elog : %s"%(res)) # wait that acq with same name finishes for _ in range(5): if "%s_nb_prod"%(acq_id) in storage: time.sleep(2) else: break if "%s_nb_prod"%(acq_id) in storage: return 0,"previous acq with equal acq_id is unfinished" #create ns for acq in varmod #retcode,res=submod_execcmd("newns@varmod","varmod",acq_id) #if retcode==0: # return 0,"error creating ns for acq <- %s"%(res) # set number of producers to wait storage["%s_nb_prod"%(acq_id)]=storage["producers_count"] return 1,json.dumps({"acq_id":acq_id,"acq_name":acq_name,"acq_mp":storage["brg_mp"],"acq_path":acq_path})
def mark_as_finished_storage(storage_id,acq_id,acq_name,acq_mp,acq_path): try: storage=storage_pool.get(storage_id) except Exception as e: return 0,str(e) storage["%s_nb_prod"%(acq_id)]-=1 #print("nb_producers:%s"%(storage["%s_nb_prod"%(acq_id)])) # if all producers have finished: if storage["%s_nb_prod"%(acq_id)]<=0: retcode,res=submod_execcmd("mount@mountd",acq_mp) if retcode==0: return 0,"unable to mount %s <- %s"%(acq_mp,res) acq_mp_local=res.rstrip("/") acq_local_path="%s/%s"%(acq_mp_local,acq_path) # finish log stop_time=str(datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")) log_file="%s/%s.log"%(acq_local_path,acq_name) stats_list=storage["stats_names"].split(",") with open(log_file,"a") as f: f.write(" <param name=\"stop_time\">%s</param>\n"%(stop_time)) for stat in stats_list: retcode,res=submod_execcmd("getvar@varmod","varmod","default",stat) if retcode==0: res="unavailable" f.write(" <param name=\"%s\">%s</param>\n"%(stat,res)) f.write(" </acq>\n") f.write("</log>\n") #fill elog if storage["elog_server"]!="undef": with open("/tmp/elog_msg.txt","w") as f: f.write("Acquisition %s stopped\n"%(acq_name)) f.write("Data available at %s\n"%(acq_local_path)) title="Acq %s stopped"%(acq_name) retcode,res=submod_execcmd("add_msg_file@elog",storage["elog_id"],"Acq",title,"/tmp/elog_msg.txt") if retcode==0: print("error : can fill the elog : %s"%(res)) #insert log in rundb if storage["use_rundb"]==1: retcode,res=submod_execcmd("set_xml_params_acq@rundb",acq_id,log_file,"false") if retcode==0: print("error adding log in rundb <- %s"%(res)) t=threading.Thread(target=integrate_acq,args=(acq_id,acq_local_path,acq_name,storage["use_rundb"],storage["%s_convert_script"%(acq_id)])) t.start() del storage["%s_nb_prod"%(acq_id)] return 1,"ok" def integrate_acq(acq_id,acq_local_path,acq_name,use_rundb,convert_script): #retcode,res=submod_execcmd("delns@varmod","varmod",acq_id) #if retcode==0: # print("post_treatment: error removing finished varmod ns <- %s"%(res)) if convert_script not in ["undef","none"]: cmd="%s %s > %s_convert_script.trace 2>&1"%(convert_script,acq_local_path,acq_name) retcode=os.system(cmd) if retcode!=0: print("error running convert_script: %s"%(cmd)) return #mark acq as finished in rundb if use_rundb==1: retcode,res=submod_execcmd("set_param_acq@rundb",acq_id,"finished","1") if retcode==0: print("error marking acq %s as finished in rundb <- %s"%(acq_id,res)) return 1,"ok" ######################################################################
[docs]def set_param_acq_storage(storage_id,acq_id,param,value): "Set a parameter *param* with *value* for *acq_id* on *storage_id*. If *use_rundb* (init_storage) is 0, this function does nothing." try: storage=storage_pool.get(storage_id) except Exception as e: return 0,str(e) if storage["use_rundb"]==1: retcode,res=submod_execcmd("set_param_acq@rundb",acq_id,param,value) if retcode==0: return 0,"error setting param in RunDB <- %s"%(res) return 1,"ok"
######################################################################
[docs]def is_acq_finished_storage(storage_id,acq_id): "Check if acquisition *acq_id* is finished at *storage_id*. The RunDB is used to know when the acquisition is finished. If use_rundb is 0, this function returns 1." try: storage=storage_pool.get(storage_id) except Exception as e: return 0,str(e) if storage["use_rundb"]==1: retcode,res=submod_execcmd("check_acq_id@rundb",acq_id) if retcode==0: return 0,"acq not exists in RunDB <- %s"%(res) retcode,res=submod_execcmd("get_param_acq@rundb",acq_id,"finished") return 1,str(retcode) else: # consider finished if not using RunDB return 1,"1"
[docs]def is_run_finished_storage(storage_id,run_id): "Check if all acquisitions in *run_id* are finished at *storage_id*. The RunDB is used to know when the acquisitions are finished. If use_rundb is 0, this function returns 1." try: storage=storage_pool.get(storage_id) except Exception as e: return 0,str(e) if storage["use_rundb"]==1: retcode,res=submod_execcmd("check_run_id@rundb",run_id) if retcode==0: return 0,"run not exists in RunDB <- %s"%(res) retcode,acq_list=submod_execcmd("get_acqs_in_run@rundb",run_id) if retcode==0: return 0,"error getting list of acqs in run <- %s"%(res) print("acqs:%s"%(acq_list)) acqs=acq_list.split(",") for acq in acqs: print("checking if acq %s is finished"%(acq)) retcode,res=is_acq_finished_storage(storage_id,acq) if retcode==0: return 0,"error verifying finished state of acq %s <- %s"%(acq,res) if res=="0": return 1,"0" # consider finished also if not using RunDB return 1,"1"