// CondorJob.cxx #include "dial_condor/CondorJob.h" #include #include #include #include #include #include #include #include "dataset_id/EventIdList.h" #include "dataset_util/FileStatus.h" #include "dataset_util/FileName.h" #include "dataset_util/Text.h" #include "dataset_util/get_hostname.h" #include "dataset_util/getcwd.h" #include "dataset_util/SystemCommand.h" #include "dataset_base/DatasetCreator.h" #include "dataset_xml/XmlParser.h" using std::string; using std::vector; using std::cerr; using std::endl; using std::ostream; using std::istringstream; using dset::Dataset; using dset::DatasetCreator; using dial::Job; using dial::CondorJob; //********************************************************************** // Local definitions. //********************************************************************** string setauth(const Job& job) { string jobdir = job.job_directory(); if ( ! jobdir.size() ) { return ""; } string credfile = jobdir + "/private/cred.dat"; if ( ! FileStatus(credfile).is_readable() ) { return ""; } string com = "unset X509_USER_CERT; unset X509_USER_KEY"; com += "; X509_USER_PROXY=" + credfile; com += "; export X509_USER_PROXY"; com += "; "; return com; } //********************************************************************** // Static member functions. //********************************************************************** // Find condor. string CondorJob::find_condor() { string com = "EXE=$(command -v condor_q 2>/dev/null); "; com += "if [ -n \"$EXE\" ]; then dirname $\"EXE\"; fi"; SystemCommand scom(com); scom.runout(); string dir = scom.out().line(0); return dir; } //********************************************************************** // Member functions. //********************************************************************** // Constructor. CondorJob::CondorJob(JobId jid, string descfile, const Application& app, const Task& tsk, const Dataset& dst, const JobPreferences& prf, string jobdir, string runfile) : Job("CondorJob", jid, app, tsk, dst, prf, jobdir, runfile), m_descfile(descfile) { base_set_submit_host(get_hostname()); create_local_run_script_wrapper(); } //********************************************************************** // Conversion constructor. CondorJob::CondorJob(const Job& job, string descfile) : Job(job), m_descfile(descfile) { string hname = get_hostname(); if ( hname != submit_host() ) { cerr << "LsfJob: Change host name in copy" << endl; base_set_submit_host(hname); } } //********************************************************************** // Destructor. CondorJob::~CondorJob() { } //********************************************************************** int CondorJob::start() { string prefix = "CondorJob::start: "; static string condor_dir = Environment::current().value("DIAL_CONDOR_CONF"); if ( ! condor_dir.size() ) { cerr << "CondorJob::start: DIAL_CONDOR_CONF is not defined" << endl; return base_set_failed(100); } if ( ! is_initialized() ) return 1; // Check that the executable is present. { string runscript = job_directory() + "/dial_run_script"; FileStatus estat(runscript); if ( ! estat.is_executable() ) return 4; } Text basedesc(m_descfile); string universe = "vanilla"; for ( int iline=0; ilinejpos ) { cerr << prefix << "Error parsing condor log:" << endl; cerr << logfile << endl; base_set_failed(108); } string condorid = logline.substr(ipos+1, jpos-ipos-1); base_set_local_id(condorid); base_set_running(); } else { cerr << prefix << "submission failed" << endl; cerr << scom << endl; base_set_failed(109); } return 0; } //********************************************************************** int CondorJob::update() { if ( is_initialized() ) { int stat = start(); if ( stat != 0 ) { return 1; } return 0; } if ( ! is_running() ) return 2; // Open the log file. SystemCommand copylog("cat " + job_directory() + "/condor.log"); copylog.runerr(); const Text& log = copylog.out(); if ( ! log.size() ) { cerr << "CondorJob::update: Job log is missing or empty" << endl; cerr << copylog << endl; cerr << " Time since start: " << TimeInterval(start_time(), Time::now()) << endl; cerr << " " << job_directory() + "/condor.log" << endl; return 3; } int iline = log.size(); int jobstat = -1; string::size_type ipos; string::size_type jpos; while ( iline > 0 ) { string line = log.line(--iline); if ( line.size() < 3 ) continue; if ( isspace(line[0]) ) continue; istringstream sstat(line.substr(0,3)); int newstat = -1; sstat >> newstat; if ( newstat < 0 ) continue; // submitted. if ( newstat == 0 ) { if ( jobstat < 0 ) jobstat = newstat; break; // running. } else if ( newstat == 1 ) { if ( jobstat < 0 ) jobstat = newstat; if ( ! run_host().size() ) { ipos = line.find("Job executing on host:"); ipos = line.find("<", ipos); jpos = line.find(":", ipos); if ( jpos == string::npos ) jpos = line.find(":", ipos); if ( ipos != string::npos && jpos != string::npos ) { string runhost = line.substr( ipos+1, jpos-ipos-1 ); base_set_run_host(runhost); } } // terminated. } else if ( newstat == 5 ) { if ( jobstat < 0 ) jobstat = newstat; int jline = iline + 1; string nextline = log.line(jline); ipos = nextline.find("return value"); if ( ipos == string::npos || jpos == string::npos ) { cerr << "CondorJob::update: Unable to find return value" << endl; cerr << " " << job_directory() + "/condor.log" << endl; cerr << " " << nextline << endl; } else { ipos += 12; jpos = nextline.find(")", ipos); istringstream srstat(nextline.substr(ipos, jpos-ipos)); int rstat = -1; srstat >> rstat; if ( rstat >= 0 ) { base_set_return_status(rstat); } else { cerr << "CondorJob::update: Unable to parse return value" << endl; cerr << " " << job_directory() + "/condor.log" << endl; cerr << " " << nextline << endl; } } } } if ( jobstat > 1 ) { if ( return_status() == 0 ) { string resfile = job_directory() + "/result.xml"; if ( FileStatus(resfile).is_readable() ) { XmlParser parser; const XmlElement* pxres = parser.parse(resfile); if ( pxres == 0 ) { base_set_failed(143); return 0; } const Dataset* pres = DatasetCreator::create(*pxres); delete pxres; if ( pres == 0 ) { base_set_failed(144); return 0; } int stat = base_set_result(pres); if ( stat != 0 ) { delete pres; base_set_failed(145); return 0; } if ( pres == 0 ) { base_set_failed(146); } if ( dataset()!=0 && dataset()->is_event_dataset() ) { base_set_event_count(dataset()->event_count()); } else { base_set_event_count(0); } } base_set_done(); } else { base_set_failed(147); } } else if ( jobstat < 0 ) { base_set_failed(142); } else { base_set_update(); } return 0; } //********************************************************************** int CondorJob::kill(int err) { string prefix = "CondorJob::kill: "; if ( is_initialized() ) { base_set_killed(err); return 0; } if ( ! is_running() ) return 1; SystemCommand scom(setauth(*this) + "condor_rm " + local_id()); scom.runout(); string outcome = scom.out().line(0); if ( outcome.find("removal") != string::npos || outcome.find("not found") != string::npos ) { base_set_killed(err); // for condor_g } else if ( outcome.find("skipped") != string::npos ) { string lid = local_id(); string condorid_short = lid.erase( lid.find_last_of(".") ); SystemCommand scom2(setauth(*this) + "condor_rm " + condorid_short); scom2.runout(); string outcome = scom2.out().line(0); if ( outcome.find("removal") != string::npos || outcome.find("not found") != string::npos ) { base_set_killed(err); } else { cerr << prefix << "Condor-G removal failed" << endl; cerr << scom2 << endl; return 2; } } else { cerr << prefix << "Condor removal failed" << endl; cerr << scom << endl; return 1; } return 0; } //********************************************************************** ostream& CondorJob::ostr(ostream& lhs) const { Job::ostr(lhs); if ( is_initialized() ) return lhs; lhs << endl; lhs << "Run script: " << run_script() << endl; lhs << "Condor job ID: " << local_id(); if ( is_running() ) return lhs; lhs << endl; lhs << "Return status: " << return_status(); return lhs; }