Source code for common_rcs

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# 
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
# 
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
# 
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame.  If not, see <http://www.gnu.org/licenses/>

import os,json,subprocess
import bindpyrame

rcsocket=None

current_acq=None

def sigint_handler(signal,frame):
    # Control + C is equivalent to stop_rc_script
    scriptenv["allow_run"].clear()
    scriptenv["stop_run"].set()

class rc_stop_exception(Exception):
    pass

class pyrame_exception(Exception):
    pass

[docs]def check_stop(): "Check if the stop flag has been set by the Run Control. If yes, raise a rc_stop_exception" if not scriptenv["allow_run"].is_set(): print("acknowledging stop signal") scriptenv["allow_run"].set() scriptenv["stop_run"].clear() raise rc_stop_exception("stopping")
[docs]def int_sleep(timeout): "Perform an interruptible sleep of *timeout* seconds" if scriptenv["stop_run"].wait(float(timeout)): print("awoken from int_sleep") check_stop()
#print("out of int_sleep")
[docs]def declare_param(name,desc,value): "Declare script parameter named *name* and described by *desc*. Initialized to *value*." #print("name:%s"%(name)) scriptenv["params"][name]={"desc":desc,"default":value,"value":value}
[docs]def rc_call(function,*args): "Performs a call to a pyrame *function* through RC. This function checks for stop signal before returning" global rcsocket if rcsocket==None: rcsocket=bindpyrame.open_socket("localhost",bindpyrame.get_port("RC_PORT")) if scriptenv["verbose"]: print("rc_script: calling %s"%(function)) str_args=map(str,args) retcode,res=bindpyrame.execcmd(rcsocket,function,*str_args) if retcode==0: print("rc_script: retcode=%d res=%s"%(retcode,res)) raise pyrame_exception(res) check_stop() return res
[docs]def rc_pycall(function,*args): "Performs a call to a pyrame *function* through RC. This function checks for stop signal before returning" global rcsocket if rcsocket==None: rcsocket=bindpyrame.open_socket("localhost",bindpyrame.get_port("RC_PORT")) if scriptenv["verbose"]: print("rc_script: calling %s"%(function)) str_args=map(str,args) retcode,res=bindpyrame.execcmd(rcsocket,function,*str_args) check_stop() return retcode,res
[docs]def rc_exec(command,*params): """Executes a command in a shell and checks for stop signal and turns bash returns 1 into exceptions and returns output (stdout and stderr) of command""" print("rc_exec: executing %s"%(command)) result=os.system(command) #result=subprocess.Popen(command,shell=True,*params) #o,e=result.communicate() if result!=0: raise pyrame_exception("error %d executing %s"%(result,command)) check_stop() return ""
[docs]def transition(dev_name,transition_name,transition_fallback,params=""): "Perform an arbitrary transition with name *transition_name* and in case of error, perform the transition *transition_fallback*. Pass *params* to transitions" return rc_call("transition_cmod",dev_name,"true",transition_name,transition_fallback,params)
[docs]def load_config_file(filename): "Load an XML configuration file *filename*" return rc_call("load_config_file_cmod",filename)
[docs]def save_config_file(filename): "Save the current configuration file to *filename*" return rc_call("save_config_cmod",filename)
[docs]def new_run_rcs(storage_id,run_name,mode="append"): "Create a new run named *run_name*. *mode* can be remove or append. 'remove' overwrites existing runs with the same name. Use the storage module with *storage_id*" run=json.loads(rc_call("new_run_storage",storage_id,run_name,mode)) #print("writing to /tmp/script_of_run.py") with open("/tmp/script_of_run.py","w") as f: f.write(scriptenv["module_code"]) f.write("\n") f.write("# parameters used for script execution:\n") for p in scriptenv["params"]: f.write("# %s: %s\n"%(p,scriptenv["params"][p]["value"])) command="/opt/pyrame/mv.py /tmp script_of_run %s %s script_of_run undef undef undef .py"%(run["run_mp"],run["run_path"]) res=os.system(command) if res!=0: raise pyrame_exception("error executing %s"%(command)) mountd_port=bindpyrame.get_port("mountd") retcode,res=bindpyrame.sendcmd("localhost",mountd_port,"mount_mountd",run["run_mp"]) if retcode==0: raise pyrame_exception("error: cant mount destination %s <- %s"%(run["run_mp"],res)) run_local_path=("%s/%s"%(res.rstrip("/"),run["run_path"])).replace("//","/") run["path"]=run_local_path return run
[docs]def new_acq_rcs(storage_id,acq_name,run,mode="append",convert_script="undef"): "Create a new acquisition named *acq_name* in *run*. *mode* can be remove or append. 'remove' overwrites any existing acq with the same name. Use the storage module with *storage_id*" acq=json.loads(rc_call("new_acq_storage",storage_id,acq_name,run["run_name"],mode)) mountd_port=bindpyrame.get_port("mountd") retcode,res=bindpyrame.sendcmd("localhost",mountd_port,"mount_mountd",acq["acq_mp"]) if retcode==0: raise pyrame_exception("error: cant mount destination %s <- %s"%(acq["acq_mp"],res)) acq_local_path=("%s/%s"%(res.rstrip("/"),acq["acq_path"])).replace("//","/") acq["path"]=acq_local_path #store the acquisition in a global variable global current_acq current_acq=acq return acq
def wait_finished(storage_id,doctype,name): # is_*_finished_rc immediately returns retcode=1 if RC isn't using RunDB res="0" while res=="0": print("wait_%s_finished: %s"%(doctype,name["%s_id"%(doctype)])) res=rc_call("is_%s_finished_storage"%(doctype),storage_id,name["%s_id"%(doctype)]) time.sleep(2)
[docs]def wait_acq_finished_rcs(storage_id,acq): "Wait until the acquisition *acq* (object returned by new_acq) has finished. Use the storage module with *storage_id*" wait_finished(storage_id,"acq",acq)
[docs]def wait_run_finished_rcs(storage_id,run): "Wait until the run *run* (object returned by new_run) has finished. Use the storage module with *storage_id*" wait_finished(storage_id,"run",run)
[docs]def set_param_run_rcs(storage_id,run,name,value): "Set the *value* of parameter *name* for *run* (object returned by new_run). Use the storage module with *storage_id*" return rc_call("set_param_run_storage",storage_id,run["run_id"],name,value)
[docs]def get_param_run_rcs(storage_id,run,name): "Get the value of parameter *name* for *run* (object returned by new_run). Use the storage module with *storage_id*" return rc_call("get_param_run_storage",storage_id,run["run_id"],name)
[docs]def set_param_acq_rcs(storage_id,acq,name,value): "Set the value of parameter *name* for acquisition *acq* (object returned by new_acq). Use the storage module with *storage_id*" return rc_call("set_param_acq_storage",storage_id,acq["acq_id"],name,value)
[docs]def get_param_acq_rcs(storage_id,acq,name): "Get the value of parameter *name* for acquisition *acq* (object returned by new_acq). Use the storage module with *storage_id*" return rc_call("get_param_acq_storage",storage_id,acq["acq_id"],name)
execfile("/opt/pyrame/rc_script.py")