// ScriptedJob.cxx #include "dial_scripted/ScriptedJob.h" #include #include #include #include #include #include #include #include "dataset_util/ssystem.h" #include "dataset_id/EventIdList.h" #include "dataset_util/FileName.h" #include "dataset_util/FileStatus.h" #include "dataset_util/Text.h" #include "dataset_util/getcwd.h" #include "dataset_util/get_hostname.h" #include "dataset_base/DatasetCreator.h" #include "dataset_xml/XmlParser.h" using std::string; using std::vector; using std::cout; using std::cerr; using std::endl; using std::ostream; using std::istringstream; using dset::Dataset; using dset::DatasetCreator; using dial::Job; using dial::ScriptedJob; //********************************************************************** // Local functions. //********************************************************************** namespace { string response_file(string dir) { return dir + "/job_response.dat"; } // Return if a method is allowed for an action. bool is_allowed(string action, string method) { if ( method == "set_failed" ) return true; if ( action == "start" ) { if ( method == "set_running" ) return true; if ( method == "set_local_id" ) return true; if ( method == "set_run_host" ) return true; } if ( action == "update" ) { if ( method == "set_done" ) return true; if ( method == "set_local_id" ) return true; if ( method == "set_run_host" ) return true; if ( method == "set_return_status" ) return true; } if ( action == "kill" ) { if ( method == "set_killed" ) return true; if ( method == "set_local_id" ) return true; if ( method == "set_run_host" ) return true; } return false; } } // end unnamed namespace //********************************************************************** // Private member functions. //********************************************************************** // Process a script response. 
//
// Known methods:
//   set_failed int
//   set_running
//   set_done
//   set_killed int
//   set_local_id string
//   set_run_host string
//   set_return_status int
//
// Runs "<script> <action>" from the job directory, then reads the response
// file (job_response.dat in the job directory) and applies the methods it
// names to this job. Methods not permitted for the action by is_allowed()
// fail the job.
//
// Failure codes passed to base_set_failed by this function:
//   81  script returned a nonzero status
//   82  response file is not readable
//   83  response names a method not allowed for this action
//   85  result.xml could not be parsed
//   86  no dataset could be created from the parsed result
//   87  base_set_result returned an error
//   88  a base method (base_set_running) returned an error
//
// set_failed, set_done and set_killed are only recorded while reading; the
// matching base call is made once after the loop, with failed taking
// precedence over done, and done over killed.
void ScriptedJob::execute_script_action(string action) {
  string prefix = "ScriptedJob:" + action + ": ";
  // Invoke script.
  string fullcom = "cd " + job_directory() + "; ";
  fullcom += script() + " " + action;
  int sstat = ssystem(fullcom);
  if ( sstat ) {
    cerr << prefix << "Script returned error " << sstat << endl;
    cerr << " " << fullcom << endl;
    base_set_failed(81);
    return;
  }
  // Make sure response file can be read.
  string file = response_file(job_directory());
  if ( ! FileStatus(file).is_readable() ) {
    cerr << prefix << "Unable to read response file" << endl;
    cerr << " " << file << endl;
    base_set_failed(82);
    return;
  }
  // Read response into text object and loop over lines.
  Text res(file);
  int iline;
  int mstat = 0;        // status of the last base method that returns one
  bool failed = false;  // response requested set_failed
  bool done = false;    // response requested set_done
  bool killed = false;  // response requested set_killed
  int endstat = 0;      // argument for the final set_failed/set_killed call
  // NOTE(review): the loop header and the statements that define `line`,
  // `words`, `method` and `sarg1` are truncated in this copy of the file.
  // The fragment below is kept exactly as found and does not compile as
  // shown; restore it from version control.
  for (iline=0; iline 1 ) sarg1 = words[1];
      // Integer form of the first argument; stays 0 when there is none.
      int iarg1 = 0;
      if ( sarg1.size() ) iarg1 = atoi(sarg1.c_str());
      // If method is allowed for this action, invoke it.
      if ( ! is_allowed(action, method) ) {
        cerr << prefix << "Action " << action << " does not support method "
             << method << endl;
        cerr << " File: " << file << endl;
        cerr << " Line " << iline << ": " << line << endl;
        base_set_failed(83);
        // NOTE(review): this return (like the one for error 88 below) skips
        // the unlink at the end, so the response file is left in place.
        return;
      }
      // Set job failed -- ignore any other methods.
      if ( method == "set_failed" ) {
        failed = true;
        endstat = iarg1;
      // Set job running.
      } else if ( method == "set_running" ) {
        mstat = base_set_running();
      // Set job done--read result first.
      // The result is only read when result.xml is readable; done is set
      // either way.
      } else if ( method == "set_done" ) {
        string resfile = job_directory() + "/" + "result.xml";
        if ( FileStatus(resfile).is_readable() ) {
          XmlParser parser;
          const XmlElement* pxres = parser.parse(resfile);
          if ( pxres == 0 ) {
            base_set_failed(85);
            break;
          }
          const Dataset* pres = DatasetCreator::create(*pxres);
          delete pxres;
          if ( pres == 0 ) {
            base_set_failed(86);
            break;
          }
          int stat = base_set_result(pres);
          // pres is deleted here only when base_set_result reports an error;
          // presumably it takes ownership on success -- TODO confirm.
          if ( stat != 0 ) {
            delete pres;
            base_set_failed(87);
            break;
          }
          if ( dataset()!=0 && dataset()->is_event_dataset() ) {
            base_set_event_count(dataset()->event_count());
          } else {
            base_set_event_count(0);
          }
        }
        done = true;
      // Set job killed
      } else if ( method == "set_killed" ) {
        killed = true;
        endstat = iarg1;
      // Set the local ID for the job.
      } else if ( method == "set_local_id" ) {
        base_set_local_id(sarg1);
      // Set the run host.
      } else if ( method == "set_run_host" ) {
        base_set_run_host(sarg1);
      // Set the return status.
      } else if ( method == "set_return_status" ) {
        base_set_return_status(iarg1);
      // unknown method.
      // Only reachable if is_allowed() accepts a method that this chain does
      // not handle.
      } else {
        cerr << prefix << "Unknown method: " << method << endl;
        cerr << " File: " << file << endl;
        cerr << " Line " << iline << ": " << line << endl;
      }
      // Exit if we have a method invocation failure.
      if ( mstat ) {
        // NOTE(review): there is no endl after mstat, so the " File:" text
        // continues on the same output line.
        cerr << prefix << "Base method returned error " << mstat;
        cerr << " File: " << file << endl;
        cerr << " Line " << iline << ": " << line << endl;
        base_set_failed(88);
        return;
      }
    }
  }  // end loop over lines
  // Invoke final action.
  if ( failed ) {
    base_set_failed(endstat);
  } else if ( done ) {
    base_set_done();
  } else if ( killed ) {
    base_set_killed(endstat);
  }
  // Remove the response file.
  unlink(file.c_str());
  return;
}

//**********************************************************************
// Member functions.
//**********************************************************************

// Constructor.
ScriptedJob:: ScriptedJob(JobId jid, string scrname, const Application& app, const Task& tsk, const Dataset& dst, const JobPreferences& prf, string jobdir, string runfile) : Job("ScriptedJob", jid, app, tsk, dst, prf, jobdir, runfile), m_scrname(scrname) { base_set_submit_host(get_hostname()); // Check that script exists. FileStatus fstat(scrname); if ( ! fstat.is_executable() ) { cerr << "ScriptedJob::ctor: " << "Script is not executable:" << endl; cerr << " " << scrname << endl; base_set_failed(80); } execute_script_action("create"); // Return error if job moved to failed state. if ( is_failed() ) return; // Return error if job did not stay in intializing state. if ( ! is_initialized() ) { base_set_failed(90); } create_local_run_script_wrapper(); } //********************************************************************** // Conversion constructor. ScriptedJob::ScriptedJob(const Job& job, string scrname) :Job(job), m_scrname(scrname) { string hname = get_hostname(); if ( hname != submit_host() ) { cerr << "ScriptedJob::copy: " << "Changing submit host" << endl; base_set_submit_host(hname); } } //********************************************************************** // Destructor. ScriptedJob::~ScriptedJob() { } //********************************************************************** // Start a job. int ScriptedJob::start() { if ( ! is_initialized() ) return 1; // Define and check files. { FileStatus fstat(job_directory()); if ( ! fstat.is_directory() ) return base_set_failed(101); if ( ! fstat.is_readable() ) return base_set_failed(102); if ( ! fstat.is_writeable() ) return base_set_failed(103); } execute_script_action("start"); // Return error if job moved to failed state. if ( is_failed() ) return error(); // Return error if job did not move to running state. if ( ! is_running() ) { return base_set_failed(91); } return 0; } //********************************************************************** // Update a job. 
int ScriptedJob::update() { if ( is_initialized() ) { int stat = start(); if ( stat != 0 ) return 2; return 0; } if ( ! is_running() ) return 1; // Invoke script and process the response. execute_script_action("update"); // Return error if job moved to failed state. if ( is_failed() ) return error(); // Record update if job is still active. if ( is_active() ) { base_set_update(); } return 0; } //********************************************************************** // Kill. // Initial DIAL state must be initialized or running. // Successful if LSF reports that it terminated the job or if the job // was already finished. int ScriptedJob::kill(int err) { if ( is_initialized() ) { base_set_killed(err); return 0; } if ( ! is_running() ) { return 1; } execute_script_action("kill"); // Return error if job moved to failed state. if ( is_failed() ) return error(); // Return error if job did not move to killed state. if ( ! is_killed() ) { return base_set_failed(92); } return 0; } //**********************************************************************