// LsfJob.cxx #include "dial_lsf/LsfJob.h" #include #include #include #include #include #include #include #include #include "dataset_util/ssystem.h" #include "dataset_id/EventIdList.h" #include "dataset_util/FileName.h" #include "dataset_util/FileStatus.h" #include "dataset_util/Text.h" #include "dataset_util/getcwd.h" #include "dataset_util/get_hostname.h" #include "dataset_base/DatasetCreator.h" #include "dataset_xml/XmlParser.h" using std::string; using std::vector; using std::cout; using std::cerr; using std::endl; using std::ostream; using std::istringstream; using std::ostringstream; using dset::Dataset; using dset::DatasetCreator; using dial::Job; using dial::LsfJob; typedef std::map StatMap; //********************************************************************** // Local definitions. //********************************************************************** namespace { // Fetch status from LSF. // Returns blank if job is not found in bjobs. // Caches bjobs results and updates the cache if the ID is not present. // Each ID is removed from the cache after its status is retrieved. string bjobs_stat(string lsfid, string queue, string dir) { string jobstat = ""; // Mutex to protect the cache. static PThreadMutex mtx; mtx.lock(); // Status cache. static StatMap stats; StatMap::iterator istat = stats.find(lsfid); if ( istat == stats.end() ) { stats.erase(stats.begin(), stats.end()); string logfile = dir + "/bjobs.log"; string com = "bjobs -a -q " + queue + " >" + logfile; int sstat = ssystem(com); if ( sstat !=0 ) { cerr << "LsfJob::bjobs_stat: Error " << sstat << " retrieving jobs status" << endl; cerr << " Command: " << com << endl; } else { Text tlog(logfile); for ( int iline=1; ilinesecond; stats.erase(istat); } mtx.unlock(); return jobstat; } } // end unnamed namespace //********************************************************************** // Static member functions. //********************************************************************** // Find LSF. string LsfJob::find_lsf() { char tname[32] = "/tmp/dial_LsfJob_XXXXXX"; mktemp(tname); //string com = "pkgmgr locate lsf 2>/dev/null >"; string com = "EXE=$(command -v bjobs 2>/dev/null); "; com += "if [ -n \"$EXE\" ]; then dirname $\"EXE\"; fi >"; com += tname; ssystem(com); Text tdir(tname); string dir = tdir.line(0); unlink(tname); return dir; } //********************************************************************** // Member functions. //********************************************************************** // Constructor. LsfJob::LsfJob(JobId jid, string qname, const Application& app, const Task& tsk, const Dataset& dst, const JobPreferences& prf, string jobdir, string runfile) : Job("LsfJob", jid, app, tsk, dst, prf, jobdir, runfile), m_qname(qname) { base_set_submit_host(get_hostname()); create_local_run_script_wrapper(); } //********************************************************************** // Conversion constructor. LsfJob::LsfJob(const Job& job, string qname) : Job(job), m_qname(qname) { string hname = get_hostname(); } //********************************************************************** // Destructor. LsfJob::~LsfJob() { } //********************************************************************** int LsfJob::start() { if ( ! is_initialized() ) return 1; string runscript = job_directory() + "/dial_run_script_wrapper"; // Check that the executable is present. { FileStatus estat(runscript); if ( ! estat.is_executable() ) return 4; } // Define and check files. { FileStatus fstat(job_directory()); if ( ! fstat.is_directory() ) return base_set_failed(101); if ( ! fstat.is_readable() ) return base_set_failed(102); if ( ! fstat.is_writeable() ) return base_set_failed(103); } // If DIAL_LSF_JOB_WRAPPER is defined then use it to run the job. string wrapper = Environment::current().value("DIAL_LSF_JOB_WRAPPER"); if ( wrapper.size() ) { FileStatus wstat(wrapper); if ( ! wstat.is_executable() ) { cerr << "LsfJob: DIAL_LSF_JOB_WRAPPER " << endl; cerr << " " << wrapper << endl; cerr << " does not point to executable file." << endl; cerr << " Submission will be done without wrapper." << endl; } else { wrapper = FileName(wrapper).fullpath_name(); assert( wrapper.size() ); runscript = wrapper + " " + runscript; } } // Create LSF submit command. string bsublog = job_directory() + "/bsub.log"; string bsubcmd = "bsub -o stdout.log -e stderr.log "; bsubcmd += "-L /bin/sh "; bsubcmd += "-q " + m_qname + " "; bsubcmd += runscript; bsubcmd += " > " + bsublog; ssystem("cd " + job_directory() + "; " + bsubcmd); // Extract job id, and set to running state Text bsubruntxt(bsublog); string outcome = bsubruntxt.line(0); if ( outcome.find("is submitted") != string::npos ) { string lsfid = outcome.substr(5, outcome.find(">")-5); base_set_local_id(lsfid); assert( local_id() == lsfid ); base_set_running(); } else { cerr << "LsfJob::start: Submission failed" << endl; cerr << " Dir: " << job_directory() << endl; cerr << " Command: " << bsubcmd << endl; cerr << " Log: " << bsublog << endl; return base_set_failed(109); } return 0; } //********************************************************************** int LsfJob::update() { if ( is_initialized() ) { int stat = start(); if ( stat != 0 ) return 2; return 0; } if ( ! is_running() ) return 1; // Locations to record LSF results. string bhistlog = job_directory() + "/bhist.log"; string bhistcom = "bhist -l " + local_id() + " > " + bhistlog + " 2>&1"; bool have_bhistlog = false; // Run bjobs to get the job status. string status = bjobs_stat(local_id(), m_qname, job_directory()); // Otherwise we attempt to retrieve the status from bhist. if ( status == "" ) { // Use bhist to determine the job status. ssystem(bhistcom); have_bhistlog = true; Text bhisttxt(bhistlog); for ( int iline=0; iline 10 ) { cerr << "LsfJob:update: Unable to retrieve status" << " for job " << id() << " with LSF ID " << local_id() << " started " << dtime.seconds() << " seconds ago." << endl; base_set_failed(107); } return 0; } if ( (status == "DONE") || (status == "EXIT") ) { // Retrieve the bhist log. if ( ! have_bhistlog ) { ssystem(bhistcom); have_bhistlog = true; } // Find the run host from that log. Text bhisttxt(bhistlog); for ( int iline=0; iline"); string runhost = line.substr(i1+1, i2-i1-1); base_set_run_host(runhost); break; } } } if ( status == "DONE" ) { string resfile = job_directory() + "/" + "result.xml"; if ( FileStatus(resfile).is_readable() ) { XmlParser parser; const XmlElement* pxres = parser.parse(resfile); if ( pxres == 0 ) { base_set_failed(143); return 0; } const Dataset* pres = DatasetCreator::create(*pxres); delete pxres; if ( pres == 0 ) { base_set_failed(144); return 0; } int stat = base_set_result(pres); if ( stat != 0 ) { delete pres; base_set_failed(145); return 0; } if ( pres == 0 ) { base_set_failed(146); } if ( dataset()!=0 && dataset()->is_event_dataset() ) { base_set_event_count(dataset()->event_count()); } else { base_set_event_count(0); } } base_set_done(); } if ( status == "EXIT" ) { base_set_failed(108); assert( have_bhistlog ); Text outtxt(bhistlog); // Parse for the return status. base_set_return_status(999); for ( Text::size_type iline=0; iline> rstat; base_set_return_status(rstat); // If not found, then parse for signal. } else if ( return_status() == 999 ) { key = "Exited by signal "; ipos = line.find(key); if ( ipos != string::npos ) { string remline = line.substr(ipos+key.size()); istringstream sstr(remline); int signal; sstr >> signal; int rstat = 10000 + signal; base_set_return_status(rstat); } } } if ( return_status() ==0 || return_status() == 999 ) { cerr << "LsfJob::update: Error parsing on EXIT" << endl; cerr << " Command: " << bhistcom << endl; cerr << " ----- Return text:" << endl; cerr << outtxt << endl; cerr << " ----- End return text" << endl; } } base_set_update(); return 0; } //********************************************************************** // Kill. // Initial DIAL state must be initialized or running. // Successful if LSF reports that it terminated the job or if the job // was already finished. int LsfJob::kill(int err) { if ( is_initialized() ) { base_set_killed(err); return 0; } if ( ! is_running() ) { return 1; } string bkilllog = job_directory() + "/bkill.log"; ssystem("bkill " + local_id() + " > " + bkilllog + " 2>&1"); Text bkilltxt(bkilllog); string outcome = bkilltxt.line(0); bool terminated = outcome.find("terminated") != string::npos; bool already_done = outcome.find("Job has already finished") != string::npos; if ( terminated || already_done ) { base_set_killed(err); return 0; } return base_set_failed(149); } //**********************************************************************