#!/usr/bin/env python2
# -*- coding: utf-8 -*-
#
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
#
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame. If not, see <http://www.gnu.org/licenses/>
import sys,os,time,subprocess,signal,json,base64,datetime,threading,pools,bindpyrame
storage_pool=pools.pool("storage")
def init():
pass
[docs]def init_storage(storage_id,use_rundb,brg_name,brg_mp,brg_path,brg_parent_id,elog_server,elog_name,stats_names,producers_count):
"""Initialize storage named *storage_id*.
*use_rundb* (1/0) indicates if the RunDB should be populated. brg arguments refer to the base run group: *brg_mp* mount point, *brg_path* path relative the the mount point, *brg_parent_id* RunDB id of the brg parent.
*elog_server* and *elog_name* describe the elog to be used, if any.
*stats_names* is a comma-separated list of varmod variables to be stored on the log of the acquisitions.
*producers_count* is an integer with the number of producers that will call the function mark_as_finished_storage. Once all of them will have called the function, storage will mark the aquisition as finished."""
if "undef" in [brg_name,brg_mp,brg_path,brg_parent_id]:
return 0,"no base run arguments can be undef"
# verify that brg_path is not absolute
if brg_path.startswith("/"):
return 0,"brg_path cannot start with /. is relative to its parent path"
# verify that brg_path finishes with brg_name
if not brg_path.endswith(brg_name):
return 0,"brg_path (%s) must end with brg_name (%s)"%(brg_path,brg_name)
retcode,res=submod_execcmd("mount@mountd",brg_mp)
if retcode==0:
return 0,"invalid mount point %s <- %s"%(brg_mp,res)
try:
storage=storage_pool.new(storage_id,{
"use_rundb":int(use_rundb),
"brg_name":brg_name,
"brg_mp":brg_mp,
"brg_path":brg_path,
"brg_parent_id":brg_parent_id,
"elog_server":elog_server,
"elog_name":elog_name,
"stats_names":stats_names,
"producers_count":int(producers_count)})
except Exception as e:
return 0,str(e)
return 1,"ok"
[docs]def config_storage(storage_id):
"Configure *storage_id*. Init elog, if any"
try:
storage=storage_pool.get(storage_id)
except Exception as e:
return 0,str(e)
if storage["elog_server"]!="undef":
storage["elog_id"]="elog_%s"%(storage_id)
retcode,res=submod_execcmd("init@elog",storage["elog_id"],storage["elog_server"],"8080",storage["elog_name"])
if retcode==0:
return 0,"cant init elog <- %s"%(res)
return 1,"ok"
[docs]def inval_storage(storage_id):
"Invalidate *storage_id*. Deinit elog, if any"
try:
storage=storage_pool.get(storage_id)
except Exception as e:
return 0,str(e)
if storage["elog_server"]!="undef":
retcode,res=submod_execcmd("deinit@elog",storage["elog_id"],storage["elog_server"],"8080",storage["elog_name"])
if retcode==0:
return 0,"cant deinit elog <- %s"%(res)
return 1,"ok"
[docs]def deinit_storage(storage_id):
"Deinitialize *storage_id*"
try:
storage_pool.remove(storage_id)
except Exception as e:
return 0,str(e)
return 1,"ok"
[docs]def register_producer_storage(storage_id):
"""Register a new producer for the storage"""
try:
storage=storage_pool.get(storage_id)
except Exception as e:
return 0,str(e)
storage["producers_count"]+=1
return 1,"ok"
######################################################################
# run functions
######################################################################
[docs]def new_run_storage(storage_id,run_name,mode="append"):
"""Create run with *run_name* on *storage_id*. *mode* can be append (do not remove anything) or remove (if something exists with the same name it is removed before starting)."""
try:
storage=storage_pool.get(storage_id)
except Exception as e:
return 0,str(e)
name=run_name.strip().strip("/")
if name=="":
return 0,"run name cannot be empty"
#build the run_id and run_path
run_id="%s/%s/%s"%(storage["brg_parent_id"],storage["brg_name"],name)
run_path="%s/%s"%(storage["brg_path"],name)
#create dest dir and cleanup if remove mode
retcode,res=submod_execcmd("mount@mountd",storage["brg_mp"])
if retcode==0:
return 0,"unable to mount %s <- %s"%(storage["brg_mp"],res)
run_mp_local=res.rstrip("/")
if mode=="remove":
res=os.system("rm -f %s/%s/*"%(run_mp_local,run_path))
if res!=0:
return 0,"unable to cleanup run path"
res=os.system("mkdir -p %s/%s"%(run_mp_local,run_path))
if res!=0:
return 0,"unable to create run path"
res=os.system("chmod 777 %s/%s"%(run_mp_local,run_path))
if res!=0:
return 0,"unable to change permissions of path"
if storage["use_rundb"]==1:
# split name with slashes.
# The last name is the run. All previous ones are run groups
rg_r=run_id.split("/")[1:]
run_groups=rg_r[:-1]
run=rg_r[-1]
parent_id=storage["brg_parent_id"]
previous_path=storage["brg_path"][:-len(storage["brg_name"])].rstrip("/")
for run_g in run_groups:
run_g_path="%s/%s"%(previous_path,run_g)
retcode,res=submod_execcmd("new_run_group@rundb",run_g,storage["brg_mp"],run_g_path,parent_id)
if retcode==0:
return 0,"error creating new run group <- %s"%(res)
parent_id="%s/%s"%(parent_id,run_g)
previous_path="%s/%s"%(previous_path,run_g)
retcode,res=submod_execcmd("new_run@rundb",run,storage["brg_mp"],run_path,parent_id,mode)
if retcode==0:
return 0,"error creating new run <- %s"%(res)
return 1,json.dumps({"run_id":run_id,"run_name":name,"run_mp":storage["brg_mp"],"run_path":run_path})
[docs]def set_param_run_storage(storage_id,run_name,param,value):
"Set a parameter *param* with *value* for *run_name* on *storage_id*. If *use_rundb* (init_storage) is 0, this function does nothing."
try:
storage=storage_pool.get(storage_id)
except Exception as e:
return 0,str(e)
run_id="%s/%s/%s"%(storage["brg_parent_id"],storage["brg_name"],run_name)
if storage["use_rundb"]==1:
retcode,res=submod_execcmd("check_run_id@rundb",run_id)
if retcode==0:
# if run_id not exists print warning and return ok
print("warning: run_id %s not exists in RunDB <- %s"%(run_id,res))
return 1,"ok"
retcode,res=submod_execcmd("set_param_run@rundb",run_id,param,value)
if retcode==0:
return 0,"error setting param in RunDB <- %s"%(res)
return 1,"ok"
######################################################################
[docs]def new_acq_storage(storage_id,acq_name,run_name,mode="append",convert_script="undef"):
"Create acquisition with *acq_name* in *run_name*. *mode* can be append (do not remove anything: reuse or append) or remove (if something exists with the same name it is removed before starting). At the end of the acquistion launch *convert_script*"
try:
storage=storage_pool.get(storage_id)
except Exception as e:
return 0,str(e)
# arguments checks
name=acq_name.strip().strip("/")
if name=="":
return 0,"acq name cannot be empty"
#check run_name
if run_name=="":
return 0,"run name cannot be empty"
# check that run exists in rundb, otherwise create it
run_id="%s/%s/%s"%(storage["brg_parent_id"],storage["brg_name"],run_name)
if storage["use_rundb"]==1:
retcode,res=submod_execcmd("check_run_id@rundb",run_id)
if retcode==0 and res.endswith("run not found"):
retcode,res=new_run_storage(storage_id,run_name,mode)
if retcode==0:
return 0,res
elif retcode==0:
return 0,"error checking run existence <- %s"%(res)
# build acq_id
acq_id="%s/%s"%(run_id,name)
if "/" in name:
acq_name=name.rsplit("/",1)[1]
acq_path="%s/%s/%s"%(storage["brg_path"],run_name,name.rsplit("/",1)[0])
else:
acq_name=name
acq_path="%s/%s"%(storage["brg_path"],run_name)
#insert params of acq_id in pools
storage["%s_convert_script"%(acq_id)]=convert_script
#create dest dir and cleanup if remove mode
retcode,res=submod_execcmd("mount@mountd",storage["brg_mp"])
if retcode==0:
return 0,"unable to mount %s <- %s"%(storage["brg_mp"],res)
acq_mp_local=res.rstrip("/")
acq_local_path="%s/%s"%(acq_mp_local,acq_path)
if mode=="remove":
res=os.system("rm -f %s/%s*"%(acq_local_path,acq_name))
if res!=0:
return 0,"unable to cleanup acq files"
res=os.system("mkdir -p %s"%(acq_local_path))
if res!=0:
return 0,"unable to create acq path"
res=os.system("chmod 777 %s"%(acq_local_path))
if res!=0:
return 0,"unable to change permissions of path"
# create acq in RunDB
if storage["use_rundb"]==1:
# create acquisition in RunDB
retcode,res=submod_execcmd("new_acq@rundb",name,storage["brg_mp"],acq_path,run_id,mode)
if retcode==0:
return 0,"error creating new acq <- %s"%(res)
# backup config file
retcode,res=submod_execcmd("gener_configb64@cmod")
if retcode==0:
return 0,res
xml_file="%s/%s.xml"%(acq_local_path,acq_name)
with open(xml_file,"w") as f:
f.write(base64.b64decode(res))
# add xml config to rundb if possible
if storage["use_rundb"]==1:
retcode,res=submod_execcmd("set_xml_params_acq@rundb",acq_id,xml_file,"true")
if retcode==0:
return 0,"error adding configuration to RunDB <- %s"%(res)
# create log
start_time=str(datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
log_file="%s/%s.log"%(acq_local_path,acq_name)
with open(log_file,"w") as f:
f.write("<log name=\"log\">\n")
f.write(" <acq name=\"%s\">\n"%(name))
f.write(" <param name=\"start_time\">%s</param>\n"%(start_time))
#fill elog
if storage["elog_server"]!="undef":
with open("/tmp/elog_msg.txt","w") as f:
f.write("Acquisition %s started\n"%(name))
f.write("Data available at %s\n"%(acq_local_path))
title="Acq %s started"%(name)
retcode,res=submod_execcmd("add_msg_file@elog",storage["elog_id"],"Acq",title,"/tmp/elog_msg.txt")
if retcode==0:
print("error : can fill the elog : %s"%(res))
# wait that acq with same name finishes
for _ in range(5):
if "%s_nb_prod"%(acq_id) in storage:
time.sleep(2)
else:
break
if "%s_nb_prod"%(acq_id) in storage:
return 0,"previous acq with equal acq_id is unfinished"
#create ns for acq in varmod
#retcode,res=submod_execcmd("newns@varmod","varmod",acq_id)
#if retcode==0:
# return 0,"error creating ns for acq <- %s"%(res)
# set number of producers to wait
storage["%s_nb_prod"%(acq_id)]=storage["producers_count"]
return 1,json.dumps({"acq_id":acq_id,"acq_name":acq_name,"acq_mp":storage["brg_mp"],"acq_path":acq_path})
def mark_as_finished_storage(storage_id,acq_id,acq_name,acq_mp,acq_path):
try:
storage=storage_pool.get(storage_id)
except Exception as e:
return 0,str(e)
storage["%s_nb_prod"%(acq_id)]-=1
#print("nb_producers:%s"%(storage["%s_nb_prod"%(acq_id)]))
# if all producers have finished:
if storage["%s_nb_prod"%(acq_id)]<=0:
retcode,res=submod_execcmd("mount@mountd",acq_mp)
if retcode==0:
return 0,"unable to mount %s <- %s"%(acq_mp,res)
acq_mp_local=res.rstrip("/")
acq_local_path="%s/%s"%(acq_mp_local,acq_path)
# finish log
stop_time=str(datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
log_file="%s/%s.log"%(acq_local_path,acq_name)
stats_list=storage["stats_names"].split(",")
with open(log_file,"a") as f:
f.write(" <param name=\"stop_time\">%s</param>\n"%(stop_time))
for stat in stats_list:
retcode,res=submod_execcmd("getvar@varmod","varmod","default",stat)
if retcode==0:
res="unavailable"
f.write(" <param name=\"%s\">%s</param>\n"%(stat,res))
f.write(" </acq>\n")
f.write("</log>\n")
#fill elog
if storage["elog_server"]!="undef":
with open("/tmp/elog_msg.txt","w") as f:
f.write("Acquisition %s stopped\n"%(acq_name))
f.write("Data available at %s\n"%(acq_local_path))
title="Acq %s stopped"%(acq_name)
retcode,res=submod_execcmd("add_msg_file@elog",storage["elog_id"],"Acq",title,"/tmp/elog_msg.txt")
if retcode==0:
print("error : can fill the elog : %s"%(res))
#insert log in rundb
if storage["use_rundb"]==1:
retcode,res=submod_execcmd("set_xml_params_acq@rundb",acq_id,log_file,"false")
if retcode==0:
print("error adding log in rundb <- %s"%(res))
t=threading.Thread(target=integrate_acq,args=(acq_id,acq_local_path,acq_name,storage["use_rundb"],storage["%s_convert_script"%(acq_id)]))
t.start()
del storage["%s_nb_prod"%(acq_id)]
return 1,"ok"
def integrate_acq(acq_id,acq_local_path,acq_name,use_rundb,convert_script):
#retcode,res=submod_execcmd("delns@varmod","varmod",acq_id)
#if retcode==0:
# print("post_treatment: error removing finished varmod ns <- %s"%(res))
if convert_script not in ["undef","none"]:
cmd="%s %s > %s_convert_script.trace 2>&1"%(convert_script,acq_local_path,acq_name)
retcode=os.system(cmd)
if retcode!=0:
print("error running convert_script: %s"%(cmd))
return
#mark acq as finished in rundb
if use_rundb==1:
retcode,res=submod_execcmd("set_param_acq@rundb",acq_id,"finished","1")
if retcode==0:
print("error marking acq %s as finished in rundb <- %s"%(acq_id,res))
return 1,"ok"
######################################################################
[docs]def set_param_acq_storage(storage_id,acq_id,param,value):
"Set a parameter *param* with *value* for *acq_id* on *storage_id*. If *use_rundb* (init_storage) is 0, this function does nothing."
try:
storage=storage_pool.get(storage_id)
except Exception as e:
return 0,str(e)
if storage["use_rundb"]==1:
retcode,res=submod_execcmd("set_param_acq@rundb",acq_id,param,value)
if retcode==0:
return 0,"error setting param in RunDB <- %s"%(res)
return 1,"ok"
######################################################################
[docs]def is_acq_finished_storage(storage_id,acq_id):
"Check if acquisition *acq_id* is finished at *storage_id*. The RunDB is used to know when the acquisition is finished. If use_rundb is 0, this function returns 1."
try:
storage=storage_pool.get(storage_id)
except Exception as e:
return 0,str(e)
if storage["use_rundb"]==1:
retcode,res=submod_execcmd("check_acq_id@rundb",acq_id)
if retcode==0:
return 0,"acq not exists in RunDB <- %s"%(res)
retcode,res=submod_execcmd("get_param_acq@rundb",acq_id,"finished")
return 1,str(retcode)
else:
# consider finished if not using RunDB
return 1,"1"
[docs]def is_run_finished_storage(storage_id,run_id):
"Check if all acquisitions in *run_id* are finished at *storage_id*. The RunDB is used to know when the acquisitions are finished. If use_rundb is 0, this function returns 1."
try:
storage=storage_pool.get(storage_id)
except Exception as e:
return 0,str(e)
if storage["use_rundb"]==1:
retcode,res=submod_execcmd("check_run_id@rundb",run_id)
if retcode==0:
return 0,"run not exists in RunDB <- %s"%(res)
retcode,acq_list=submod_execcmd("get_acqs_in_run@rundb",run_id)
if retcode==0:
return 0,"error getting list of acqs in run <- %s"%(res)
print("acqs:%s"%(acq_list))
acqs=acq_list.split(",")
for acq in acqs:
print("checking if acq %s is finished"%(acq))
retcode,res=is_acq_finished_storage(storage_id,acq)
if retcode==0:
return 0,"error verifying finished state of acq %s <- %s"%(acq,res)
if res=="0":
return 1,"0"
# consider finished also if not using RunDB
return 1,"1"