// Job.cxx #include "dial_job/Job.h" #include #include #include #include #include #include #include #include #include "dataset_util/mkdir.h" #include "dataset_util/copy_file.h" #include "dataset_util/FileStatus.h" #include "dataset_util/FileDirectory.h" #include "dataset_util/Environment.h" #include "dataset_util/ssystem.h" #include "dataset_util/XmlElement.h" #include "dataset_util/DtdRegistry.h" #include "dataset_xml/XmlParser.h" #include "dataset_file/FileManagementSystem.h" #include "dataset_id/DatasetId.h" #include "dataset_base/DatasetCreator.h" #include "dataset_base/GenericDataset.h" #include "dataset_base/DatasetRepository.h" #include "dataset_credential/GssCredentialManager.h" #include "dial_app/ApplicationRepository.h" #include "dial_task/TaskRepository.h" #include "dial_job/ChildWatcher.h" #include "dial_job/JobList.h" // For testing... #include "dataset_base/DatasetCreator_t.h" using std::string; using std::cerr; using std::ostream; using std::ostringstream; using std::endl; using std::auto_ptr; using dset::Url; using dset::FileManagementSystem; using dset::Dataset; using dset::DatasetRepository; using dset::DatasetCreator; using dset::GenericDataset; using dset::GssCredential; using dset::GssCredentialManager; using dial::Application; using dial::ApplicationRepository; using dial::Task; using dial::TaskRepository; using dial::JobPreferences; using dial::JobIdList; using dial::Job; typedef Job::Status Status; typedef Job::Substate Substate; typedef Job::SubstateList SubstateList; typedef FileDirectory::NameList NameList; //********************************************************************** // Local definitions. //********************************************************************** namespace { // Register the DTD. DtdRegistry::Status ISTAT_DTD_Job = DtdRegistry::register_dtd("dial"); //********************************************************************** // Create an HTML link for a list of jobs. 
ostream& html_jobs_link(ostream& lhs, string baseurl, string title, const JobIdList& jids, string link) { if ( baseurl.size() == 0 ) { lhs << " " << jids.size() << " " << title; return lhs; } lhs << " " << jids.size() << " " << title << ""; return lhs; } } //********************************************************************** // Static member functions. //********************************************************************** // DTD const Text& Job::dtd() { static Text txt; if ( txt.size() == 0 ) { txt.append(""); txt.append(""); } return txt; } //********************************************************************** // Convert status to string. string Job::status_to_string(Status stat) { switch(stat) { case Job::INVALID: return "INVALID"; case Job::FAILED: return "FAILED"; case Job::INITIALIZED: return "INITIALIZED"; case Job::RUNNING: return "RUNNING"; case Job::DONE: return "DONE"; case Job::KILLED: return "KILLED"; } return ""; } //********************************************************************** // Convert string to status. Status Job::string_to_status(string str) { if ( str == "INVALID" ) return Job::INVALID; if ( str == "FAILED" ) return Job::FAILED; if ( str == "INITIALIZED" ) return Job::INITIALIZED; if ( str == "RUNNING" ) return Job::RUNNING; if ( str == "DONE" ) return Job::DONE; if ( str == "KILLED" ) return Job::KILLED; return Job::INVALID; } //********************************************************************** // Return a URL from a URL map. string Job::url(const UrlMap& urls, string name) { UrlMap::const_iterator iurl = urls.find(name); if ( iurl == urls.end() ) return ""; return iurl->second; } //********************************************************************** // Protected member functions. //********************************************************************** // Default constructor. 
Job::Job() : m_stat(INVALID), m_papp(0), m_ptsk(0), m_pdst(0), m_submit_host(""), m_run_host(""), m_jobdir(""), m_lid(""), m_processed_event_count(0), m_processed_result_event_count(0), m_pres(0), m_error(0), m_rstat(0) { } //********************************************************************** // Constructor. Job::Job(Type fulltype, JobId jid, const Application& app, const Task& tsk, const Dataset& dst, const JobPreferences& prf, string jobdir, string run_script) : m_fulltype(fulltype), m_id(jid), m_stat(INITIALIZED), m_aid(app.id()), m_papp(&app), m_tid(tsk.id()), m_ptsk(&tsk), m_did(dst.id()), m_pdst(&dst), m_prf(prf), m_create_time(Time::now()), m_submit_host(""), m_run_host(""), m_jobdir(jobdir), m_lid(""), m_processed_event_count(0), m_processed_result_event_count(0), m_pres(0), m_error(0), m_rstat(0), m_run_script(run_script) { // Validity checks. string prefix = "Job::ctor: "; if ( ! task()->is_valid() ) { cerr << prefix << "Invalid task." << endl; base_set_failed(1); return; } if ( ! application()->is_valid() ) { cerr << prefix << "Invalid application." << endl; base_set_failed(2); return; } if ( ! dataset()->is_valid() ) { cerr << prefix << "Invalid dataset." << endl; base_set_failed(3); return; } if ( ! preferences().is_valid() ) { cerr << prefix << "Invalid preferences." << endl; base_set_failed(4); return; } if ( ! mutex().is_valid() ) { cerr << prefix << "Invalid mutex." << endl; base_set_failed(5); return; } // If needed, create the run directory. if ( ! FileStatus(job_directory()).exists() ) { mkfulldir(jobdir); } if ( ! FileStatus(job_directory()).is_directory() ) { cerr << prefix << "Unable to find job directory" << endl; cerr << " " << job_directory() << endl; base_set_failed(6); return; } if ( ! FileStatus(job_directory()).is_writeable() ) { cerr << prefix << "Unable to write to job directory" << endl; cerr << " " << job_directory() << endl; base_set_failed(7); return; } // Write AJDL job description. 
XmlParser parser; // ..application string appfile = job_directory() + "/" + "application.xml"; const XmlElement* pxapp = application()->xml(); if ( pxapp == 0 ) { cerr << prefix << "Unable extract application XML" << endl; base_set_failed(11); return; } int astat = parser.write(appfile, *pxapp); delete pxapp; if ( astat != 0 ) { cerr << prefix << "Unable write application XML" << endl; base_set_failed(12); return; } // ..task string tskfile = job_directory() + "/" + "task.xml"; const XmlElement* pxtsk = task()->xml(); if ( pxtsk == 0 ) { cerr << prefix << "Unable extract task XML" << endl; base_set_failed(13); return; } int tstat = parser.write(tskfile, *pxtsk); delete pxtsk; if ( tstat != 0 ) { cerr << prefix << "Unable write task XML" << endl; base_set_failed(14); return; } // ..dataset string dstfile = job_directory() + "/" + "dataset.xml"; const XmlElement* pxdst = dataset()->xml(); if ( pxdst == 0 ) { cerr << prefix << "Unable extract dataset XML" << endl; base_set_failed(15); return; } int dstat = parser.write(dstfile, *pxdst); delete pxdst; if ( dstat != 0 ) { cerr << prefix << "Unable write dataset XML" << endl; base_set_failed(16); return; } // ..preferences string prffile = job_directory() + "/" + "preferences.xml"; const XmlElement* pxprf = preferences().xml(); if ( pxprf == 0 ) { cerr << prefix << "Unable extract preferences XML" << endl; base_set_failed(17); return; } int pstat = parser.write(prffile, *pxprf); delete pxprf; if ( pstat != 0 ) { cerr << prefix << "Unable write preferences XML" << endl; base_set_failed(18); return; } // Take credential from current thread. GssCredential* pcred = GssCredentialManager::credential(); if ( pcred != 0 ) { m_credname = pcred->name(); } // For now use credential name as owner. m_owner = GssCredentialManager::owner(); } //********************************************************************** // XML constructor. 
Job::Job(const XmlElement& ele) : m_fulltype(""), m_stat(INVALID), m_papp(0), m_ptsk(0), m_pdst(0), m_submit_host(""), m_run_host(""), m_jobdir(""), m_lid(""), m_processed_event_count(0), m_processed_result_event_count(0), m_pres(0), m_error(0) { if ( ele.name() != xml_name() ) return; string ssubstates = ele.attribute("substates"); m_substates = Text::split(ssubstates, ","); m_fulltype = ele.attribute("fulltype"); m_id = JobId(ele.attribute("id")); m_aid = JobId(ele.attribute("app_id")); m_tid = JobId(ele.attribute("tsk_id")); m_did = JobId(ele.attribute("dst_id")); m_rid = JobId(ele.attribute("res_id")); m_owner = ele.attribute("owner"); m_credname = ele.attribute("credname"); m_stat = string_to_status(ele.attribute("status")); // Check mutex. if ( ! mutex().is_valid() ) base_set_failed(4); // If invalid, we are done. if ( m_stat == Job::INVALID ) { return; } m_create_time = Time(ele.attribute_as_int("create_time")); m_start_time = Time(ele.attribute_as_int("start_time")); m_update_time = Time(ele.attribute_as_int("update_time")); m_stop_time = Time(ele.attribute_as_int("stop_time")); m_submit_host = ele.attribute("submit_host"); m_run_host = ele.attribute("run_host"); m_jobdir = ele.attribute("job_directory"); m_archive = Url(ele.attribute("archive")); m_lid = ele.attribute("local_id"); m_processed_event_count = ele.attribute_as_int("processed_event_count"); m_processed_result_event_count = ele.attribute_as_int("processed_result_event_count"); m_error = ele.attribute_as_int("error"); m_rstat = ele.attribute_as_int("rstat"); m_run_script = ele.attribute("run_script"); const XmlElement* pxprf = ele.single_child(JobPreferences::xml_name()); m_prf = JobPreferences(*pxprf); if ( ! 
m_prf.is_valid() ) { base_set_failed(101); return; } // Subjob lists XmlElement::ElementList jeles = ele.children(JobIdList::xml_name()); for ( XmlElement::ElementList::const_iterator iele = jeles.begin(); iele!=jeles.end(); ++iele ) { const XmlElement* pxjids = *iele; if ( pxjids == 0 ) { base_set_failed(104); return; } const XmlElement& xjids = *pxjids; JobIdList jids(xjids); string label = xjids.attribute("label"); if ( label == "subjobs" ) { base_subjobs() = jids; } else if ( label == "running_subjobs" ) { base_running_subjobs() = jids; } else if ( label == "done_subjobs" ) { base_done_subjobs() = jids; } else if ( label == "failed_subjobs" ) { base_failed_subjobs() = jids; } else if ( label == "killed_subjobs" ) { base_killed_subjobs() = jids; } else if ( label == "result_subjobs" ) { base_result_subjobs() = jids; } else if ( label.size() > 9 && label.substr(0,9) == "category:" ) { string catname = label.substr(9); m_catjobs[catname] = jids; } else { base_set_failed(105); return; } } } //********************************************************************** // Set processed event count. int Job::base_set_processed_event_count(EventCount count) { if ( ! is_running() ) return -1; if ( dataset() != 0 && dataset()->is_event_dataset() ) { if ( count > dataset()->event_count() ) return -2; } m_processed_event_count = count; return 0; } //********************************************************************** // Set processed result event count. int Job::base_set_processed_result_event_count(EventCount count) { if ( ! is_running() ) return -1; if ( dataset() != 0 && dataset()->is_event_dataset() ) { if ( count > dataset()->event_count() ) return -2; } m_processed_result_event_count = count; return 0; } //********************************************************************** // Set processed and processed result event counts. int Job::base_set_event_count(EventCount count) { if ( ! 
is_running() ) return -1; if ( dataset() != 0 && dataset()->is_event_dataset() ) { if ( count > dataset()->event_count() ) return -2; } m_processed_event_count = count; m_processed_result_event_count = count; return 0; } //********************************************************************** // Set the result. int Job::base_set_result(const Dataset* pres, bool manage) { if ( ! is_running() ) return 1; m_pres = pres; if ( m_pres != 0 ) { m_rid = pres->id(); } return 0; } //********************************************************************** // Set RUNNING. int Job::base_set_running() { if ( ! is_initialized() ) return -1; m_stat = RUNNING; m_start_time.set(); m_update_time = m_start_time; return 0; } //********************************************************************** // Set FAILED. int Job::base_set_failed(int err, bool unlock) { if ( ! is_failed() ) { m_error = err; m_stat = FAILED; m_update_time.set(); if ( ! m_stop_time.is_valid() ) { m_stop_time = m_update_time; } } if ( unlock ) { unlock_mutex(); } return error(); } //********************************************************************** // Set DONE. int Job::base_set_done() { if ( ! is_running() ) return -1; // Require a result. // Without this, subjobs get stuck in CompoundrJob. if ( ! has_result() ) { return base_set_failed(19); } m_stat = DONE; m_update_time.set(); m_stop_time = m_update_time; return 0; } //********************************************************************** // Set KILLED. int Job::base_set_killed(int err) { if ( is_inactive() ) return -1; m_error = err; m_stat = KILLED; m_update_time.set(); m_stop_time = m_update_time; return 0; } //********************************************************************** // Set update time. int Job::base_set_update() { m_update_time.set(); return 1; } //********************************************************************** // Set a substate. 
// Adds state to the list of substates and refreshes the update time.
// Returns 0 on success, 1 if the job is already in that substate and
// 2 if the name contains a comma (substates_as_string() joins the
// substates with commas, so a comma cannot be part of a name).
int Job::base_set_substate(Substate state) {
  if ( in_substate(state) ) return 1;
  if ( state.find(',') != string::npos ) return 2;
  m_substates.push_back(state);
  base_set_update();
  return 0;
}

//**********************************************************************

// Unset a substate.
// Removes every occurrence of state from the list of substates.
// Returns 0 on success, 1 if the job is not in that substate.
// Unlike base_set_substate, the update time is not changed.
int Job::base_unset_substate(Substate state) {
  if ( ! in_substate(state) ) return 1;
  // Erase-remove: remove() shifts the kept entries forward and erase()
  // drops the leftover tail.
  m_substates.erase( remove(m_substates.begin(), m_substates.end(), state),
                     m_substates.end() );
  return 0;
}

//**********************************************************************
// Public constructors, destructor and assignment.
//**********************************************************************

// Copy constructor.
// The mutex of rhs is locked inside the first initializer (comma
// expression) and unlocked at the end of the body, so the members are
// copied while rhs is locked.
// NOTE(review): this relies on m_fulltype being the first member
// initialized, i.e. the first one declared in the class - confirm in the
// header. Presumably rhs stays locked if any member copy throws.
Job::Job(const Job& rhs)
: m_fulltype((rhs.lock_mutex(), rhs.m_fulltype)),
  m_id(rhs.m_id),
  m_stat(rhs.m_stat),
  m_substates(rhs.m_substates),
  m_aid(rhs.m_aid),
  m_papp(rhs.m_papp),
  m_tid(rhs.m_tid),
  m_ptsk(rhs.m_ptsk),
  m_did(rhs.m_did),
  m_pdst(rhs.m_pdst),
  m_prf(rhs.m_prf),
  m_owner(rhs.m_owner),
  m_credname(rhs.m_credname),
  m_create_time(rhs.m_create_time),
  m_start_time(rhs.m_start_time),
  m_update_time(rhs.m_update_time),
  m_stop_time(rhs.m_stop_time),
  m_submit_host(rhs.m_submit_host),
  m_run_host(rhs.m_run_host),
  m_jobdir(rhs.m_jobdir),
  m_archive(rhs.m_archive),
  m_lid(rhs.m_lid),
  m_subjobs(rhs.m_subjobs),
  m_running_subjobs(rhs.m_running_subjobs),
  m_done_subjobs(rhs.m_done_subjobs),
  m_failed_subjobs(rhs.m_failed_subjobs),
  m_killed_subjobs(rhs.m_killed_subjobs),
  m_result_subjobs(rhs.m_result_subjobs),
  m_catjobs(rhs.m_catjobs),
  m_processed_event_count(rhs.m_processed_event_count),
  m_processed_result_event_count(rhs.m_processed_result_event_count),
  m_rid(rhs.m_rid),
  m_pres(rhs.m_pres),
  m_error(rhs.m_error),
  m_rstat(rhs.m_rstat),
  m_run_script(rhs.m_run_script) {
  // Report (but do not fail on) an unusable mutex in the new object.
  if ( ! mutex().is_valid() ) {
    std::cout << "TTT> Error initializing mutex: " << mutex().error() << endl;
  }
  rhs.unlock_mutex();
}

//**********************************************************************

// Assignment.
Job& Job::operator=(const Job& rhs) { if ( &rhs == this ) return *this; rhs.lock_mutex(); lock_mutex(); m_fulltype = rhs.m_fulltype; m_id = rhs.m_id; m_stat = rhs.m_stat; m_substates = rhs.m_substates; m_aid = rhs.m_aid; m_papp = rhs.m_papp; m_tid = rhs.m_tid; m_ptsk = rhs.m_ptsk; m_did = rhs.m_did; m_pdst = rhs.m_pdst; m_prf = rhs.m_prf; m_owner = rhs.m_owner; m_credname = rhs.m_credname; m_create_time = rhs.m_create_time; m_start_time = rhs.m_start_time; m_update_time = rhs.m_update_time; m_stop_time = rhs.m_stop_time; m_run_host = rhs.m_run_host; m_submit_host = rhs.m_submit_host; m_jobdir = rhs.m_jobdir; m_archive = rhs.m_archive; m_lid = rhs.m_lid; m_subjobs = rhs.m_subjobs; m_failed_subjobs = rhs.m_failed_subjobs; m_running_subjobs = rhs.m_running_subjobs; m_done_subjobs = rhs.m_done_subjobs; m_killed_subjobs = rhs.m_killed_subjobs; m_result_subjobs = rhs.m_result_subjobs; m_catjobs = rhs.m_catjobs; m_processed_event_count = rhs.m_processed_event_count; m_processed_result_event_count = rhs.m_processed_result_event_count; m_rid = rhs.m_rid; m_pres = rhs.m_pres; m_error = rhs.m_error; m_rstat = rhs.m_rstat; m_run_script = rhs.m_run_script; rhs.unlock_mutex(); unlock_mutex(); return *this; } //********************************************************************** // Destructor. Job::~Job() { } //********************************************************************** // Output stream without HTML links. ostream& Job::ostr(ostream& lhs) const { static UrlMap empty; return ostr(lhs, empty); } //********************************************************************** // Output stream with HTML links. ostream& Job::ostr(ostream& lhs, const UrlMap& urls) const { string baseurl = Job::url(urls, "job"); string appurl = Job::url(urls, "applications"); string tskurl = Job::url(urls, "tasks"); string dsturl = Job::url(urls, "datasets"); lhs << full_type() << " " << id().to_string() << " is "; // Status. 
Job::Status stat = status(); switch ( stat ) { case Job::INVALID: lhs << "invalid"; return lhs; case Job::FAILED: lhs << "failed"; break; case Job::INITIALIZED: lhs << "initialized"; break; case Job::RUNNING: lhs << "running"; break; case Job::DONE: lhs << "done"; break; case Job::KILLED: lhs << "killed"; break; } if ( m_substates.size() ) { lhs << " (" << substates_as_string() << ")"; } if ( error() ) { lhs << " with error " << error(); } // Application. lhs << endl; lhs << " Application: "; if ( appurl.size() ) { string sid = application_id().to_string(); lhs << "" << sid << ""; } else { lhs << application_id(); } // Run script. if ( run_script().size() ) { if ( appurl.size() ) { string sid = application_id().to_string(); lhs << " " << run_script() << ""; } else { lhs << " " << run_script(); } } // Task. lhs << endl; lhs << " Task: "; if ( tskurl.size() ) { string sid = task_id().to_string(); lhs << "" << sid << ""; } else { lhs << task_id(); } // Input dataset. lhs << endl; lhs << " Dataset: "; if ( dsturl.size() ) { string sid = dataset_id().to_string(); lhs << "" << sid << ""; } else { lhs << dataset_id(); } bool has_events = dataset()!=0 && dataset()->is_event_dataset(); long event_count = 0; if ( has_events ) { event_count = dataset()->event_count(); lhs << " with " << event_count << " events"; } // Job preferences. lhs << endl << "Job preferences: "; if ( baseurl.size() ) { string sid = preferences().id().to_string(); lhs << "" << sid << ""; } else { lhs << preferences().id(); } // Owner. lhs << "\n Owner: " << owner(); // Owner. lhs << "\nCredential: " << credential_name(); // Host and directory. if ( submit_host().size() ) lhs << endl << "Submit host: " << submit_host(); if ( run_host().size() ) lhs << endl << "Run host: " << run_host(); if ( job_directory().size() ) lhs << endl << "Job directory: " << job_directory(); if ( archive().size() ) lhs << endl << "Archive: " << archive(); // Local ID. 
if ( local_id().size() ) { lhs << endl << "Local ID: " << local_id(); } // Creation time lhs << endl; lhs << " UTC create time: " << create_time(); // Start time if ( start_time().is_valid() ) { TimeInterval dtim(create_time(), start_time()); lhs << endl; lhs << " start time: " << start_time() << " (" << dtim << " elapsed)"; } // Stop time. if ( stop_time().is_valid() ) { TimeInterval dtim(create_time(), stop_time()); lhs << endl; lhs << " stop time: " << stop_time() << " (" << dtim << " elapsed)"; } else if ( update_time().is_valid() ) { TimeInterval dtim(create_time(), update_time()); lhs << endl; lhs << " update time: " << update_time() << " (" << dtim << " elapsed)"; } if ( is_inactive() ) { lhs << "\nReturn status: " << return_status(); } // Subjobs. JobIdList::size_type njobs = subjobs().size(); if ( njobs > 0 ) { if ( njobs == 1 ) { lhs << "\nThere is"; html_jobs_link(lhs, baseurl, "subjobs", subjobs(), ""); } else { lhs << "\nThere are"; html_jobs_link(lhs, baseurl, "subjobs", subjobs(), ""); } lhs << "\n "; html_jobs_link(lhs, baseurl, "running", running_subjobs(), "running"); lhs << "\n "; html_jobs_link(lhs, baseurl, "done", done_subjobs(), "done"); lhs << "\n "; html_jobs_link(lhs, baseurl, "failed", failed_subjobs(), "failed"); lhs << "\n "; html_jobs_link(lhs, baseurl, "killed", killed_subjobs(), "killed"); lhs << "\n "; html_jobs_link(lhs, baseurl, "included in result", result_subjobs(), "result"); } for ( CategoryMap::const_iterator icat=category_subjobs().begin(); icat!=category_subjobs().end(); ++icat ) { string catname = icat->first; JobIdList::size_type nsjobs = icat->second.size(); lhs << "\n "; html_jobs_link(lhs, baseurl, "in category " + catname, category_subjobs(catname), "category="+catname); } // Event counts. 
if ( has_events ) { long proc_count = processed_event_count(); lhs << endl; lhs << "Events processed: " << proc_count; lhs << " (" << (100*proc_count)/event_count << "%)"; long rproc_count = processed_result_event_count(); lhs << endl; lhs << " in result: " << rproc_count; lhs << " (" << (100*rproc_count)/event_count << "%)"; } lhs << endl; if ( has_result() ) { lhs << "Result dataset: "; if ( dsturl.size() ) { string sid = result_id().to_string(); lhs << "" << sid << ""; } else { lhs << result_id(); } if ( result() != 0 ) { const Dataset& res = *result(); if ( res.is_event_dataset() ) { lhs << " with " << res.event_count() << " events"; } else { lhs << " is not an event dataset "; } } else { lhs << " is not accessible"; } } else { lhs << "The job does not have a result"; } return lhs; } //********************************************************************** // Non-const virtual functions. //********************************************************************** // Start. int Job::start() { return -1; } //********************************************************************** // Update. int Job::update() { return -1; } //********************************************************************** // Kill. int Job::kill(int) { return -1; } //********************************************************************** // Const non-virtual methods. //********************************************************************** // Lock mutex. void Job::lock_mutex() const { int stat = mutex().lock(); if ( stat != 0 ) { std::cout << "TTT> Error locking mutex: " << stat << endl; } } //********************************************************************** // Unlock mutex. void Job::unlock_mutex() const { int stat = mutex().unlock(); if ( stat != 0 ) { std::cout << "TTT> Error unlocking mutex: " << stat << endl; } } //********************************************************************** // Status as string. 
string Job::status_as_string() const { return status_to_string(status()); } //********************************************************************** // Substates as string. string Job::substates_as_string() const { string ssubstates; for ( SubstateList::const_iterator isst=m_substates.begin(); isst!=m_substates.end(); ++isst ) { if ( isst != m_substates.begin() ) ssubstates += ","; ssubstates += *isst; } return ssubstates; } //********************************************************************** // Application. const Application* Job::application() const { if ( m_papp == 0 ) { static ApplicationRepository& ar = ApplicationRepository::default_instance(); m_papp = ar.extract(application_id()); } return m_papp; } //********************************************************************** // Task. const Task* Job::task() const { if ( m_ptsk == 0 ) { static TaskRepository& tr = TaskRepository::default_instance(); m_ptsk = tr.extract(task_id()); } return m_ptsk; } //********************************************************************** // Dataset. const Dataset* Job::dataset() const { if ( m_pdst == 0 ) { static DatasetRepository& dr = DatasetRepository::default_instance(); m_pdst = dr.extract(dataset_id()); } return m_pdst; } //********************************************************************** // Category subjobs. const JobIdList& Job::category_subjobs(Category catname) const { CategoryMap::const_iterator icat = m_catjobs.find(catname); if ( icat == m_catjobs.end() ) { static JobIdList empty; return empty; } return icat->second; } //********************************************************************** // Result. const Dataset* Job::result() const { if ( m_pres == 0 && result_id().is_valid() ) { static DatasetRepository& dr = DatasetRepository::default_instance(); m_pres = dr.extract(result_id()); } return m_pres; } //********************************************************************** // Check substate. 
bool Job::in_substate(Substate state) const { SubstateList::const_iterator isst = find(m_substates.begin(), m_substates.end(), state); return isst != m_substates.end(); } //********************************************************************** // Create the the job archive. int Job::create_archive() const { static string prefix = "Job::create_archive: "; Text tarchive(job_directory() + "/job_archive"); if ( tarchive.size() == 1 ) { Url url(tarchive.line(0)); if ( ! url.is_valid() ) { cerr << prefix << "Job reports an invalid archive:" << endl; cerr << " " << tarchive.line(0) << endl; } else { m_archive = url; return 0; } } else { cerr << prefix << "Unable to find file job_archive" << endl; } cerr << prefix << "Attempting direct creation of archive" << endl; // Fetch the FMS. FileManagementSystem& fms = FileManagementSystem::default_instance(); // Create job directory object. FileDirectory jobdir(job_directory()); if ( ! jobdir.is_valid() ) { cerr << prefix << "Unable to find job directory:" << endl; cerr << " " << job_directory() << endl; } // Create temporary directory. char ctmpdir[32] = "/tmp/dial_jobarchive_XXXXXX"; char* ret = mkdtemp(ctmpdir); if ( ret == 0 ) { cerr << prefix << "Unable to create temporary directory:" << endl; cerr << " " << ctmpdir << endl; return 2; } string tmpdir(ctmpdir); // Create name for archive file. string localtar = "dial_job_archive_" + id().to_string() + ".tar.gz"; string tarfile = job_directory() + "/" + localtar; string lname = "dial_job_archives/" + localtar; // Create list of files to archive. const NameList& allnames = jobdir.file_names(); string names; for ( NameList::const_iterator inam=allnames.begin(); inam!=allnames.end(); ++inam ) { string name = *inam; if ( name != "private" && name != localtar ) { names += " "; names += *inam; } } // Create archive. 
string com = "cd " + job_directory() + "; tar -zcf " + localtar + names; int astat = ssystem(com); if ( astat != 0 ) { cerr << prefix << "Unable to create archive"; cerr << " Error " << astat << endl; return 3; } // Put archive in FMS. int lifetime = 0; Url purl("file:" + tarfile); Url lurl = fms.put(purl); if ( ! lurl.is_valid() ) { cerr << prefix << "Unable to put archive file in FMS" << endl; cerr << fms.long_error_message() << endl; cerr << "Internal credential:" << endl; cerr << " " << GssCredentialManager::name() << endl; cerr << "Exported credential:" << endl; ssystem("grid-proxy-info 1>&2"); return 4; } m_archive = lurl; return 0; } //********************************************************************** // Return if the job has an archive. bool Job::has_archive() const { return archive().size() != 0; } //********************************************************************** // Open the job archive. string Job::open_archive(std::string dir, bool overwrite) const { string prefix = "Job::open_archive: "; // Check the target directory. FileStatus fstat(dir); if ( ! fstat.is_directory() ) { cerr << prefix << "Unable to find target directory:" << endl; cerr << " " << dir << endl; return ""; } if ( ! fstat.is_writeable() ) { cerr << prefix << "Unable to write to target directory:" << endl; cerr << " " << dir << endl; return ""; } // Fetch the file. FileManagementSystem& fms = FileManagementSystem::default_instance(); Url purl = fms.get(archive()); string tarfile = purl.fullpath(); if ( tarfile.size() == 0 ) { cerr << prefix << "Unable to fetch archive file:" << endl; cerr << fms.long_error_message() << endl; return ""; } // Create output directory. string outdir = dir + "/job_" + id().to_string(); // If output directory already exists... if ( FileStatus(outdir).exists() ) { // Error if overwrite is not enabled. if ( ! 
overwrite ) { cerr << prefix << "Destination directory exists:" << endl; cerr << " " << outdir << endl; cerr << prefix << "Please remove it or set overwrite flag." << endl; return ""; } ssystem("rm -rf " + outdir); if ( FileStatus(outdir).exists() ) { cerr << prefix << "Unable to delete destination directory." << endl; cerr << " " << outdir << endl; return ""; } } // Create output directory. int mstat = mkdir(outdir); if ( mstat != 0 ) { cerr << prefix << "Unable to create output directory:" << endl; cerr << " " << outdir << endl; return ""; } // Unpack archive; string com = "cd " + outdir + "; tar -zxf " + tarfile; int cstat = ssystem(com); if ( cstat != 0 ) { cerr << prefix << "Unable to unpack archive" << endl; return ""; } assert( FileStatus(outdir).is_directory() ); return outdir; } //********************************************************************** // Write to XML. const XmlElement* Job::xml() const { lock_mutex(); // If job is finished, then create archive. if ( is_inactive() && archive().size()==0 ) { create_archive(); } // Create top XML elment. auto_ptr pele(new XmlElement(xml_name())); // Add attributes. 
pele->add_attribute("xml_version", "1.20"); pele->add_attribute("fulltype", m_fulltype); pele->add_attribute("id", m_id.to_string()); pele->add_attribute("app_id", application_id().to_string()); pele->add_attribute("tsk_id", task_id().to_string()); pele->add_attribute("dst_id", dataset_id().to_string()); pele->add_attribute("res_id", result_id().to_string()); pele->add_attribute("owner", owner()); pele->add_attribute("credname", credential_name()); pele->add_attribute("status", status_as_string()); pele->add_attribute("substates", substates_as_string()); pele->add_attribute("submit_host", m_submit_host); pele->add_attribute("run_host", m_run_host); pele->add_attribute("job_directory", m_jobdir); pele->add_attribute("archive", m_archive.to_string()); pele->add_attribute("local_id", m_lid); pele->add_attribute_as_int("create_time", m_create_time.unix_time()); pele->add_attribute_as_int("start_time", m_start_time.unix_time()); pele->add_attribute_as_int("update_time", m_update_time.unix_time()); pele->add_attribute_as_int("stop_time", m_stop_time.unix_time()); pele->add_attribute_as_int( "processed_event_count", m_processed_event_count ); pele->add_attribute_as_int( "processed_result_event_count", m_processed_result_event_count ); pele->add_attribute_as_int("error", m_error); pele->add_attribute_as_int("return_status", m_rstat); pele->add_attribute("run_script", m_run_script); // Add preferences. const XmlElement* pxprf = m_prf.xml(); if ( pxprf == 0 ) { if ( ! is_valid() ) { static JobPreferences empty; pxprf = empty.xml(); } if ( pxprf == 0 ) { unlock_mutex(); return 0; } } pele->add_child(pxprf); // Add lists of subjobs. 
const XmlElement* pxsjob = subjobs().xml("subjobs"); pele->add_child(pxsjob); pxsjob = running_subjobs().xml("running_subjobs"); pele->add_child(pxsjob); pxsjob = done_subjobs().xml("done_subjobs"); pele->add_child(pxsjob); pxsjob = failed_subjobs().xml("failed_subjobs"); pele->add_child(pxsjob); pxsjob = killed_subjobs().xml("killed_subjobs"); pele->add_child(pxsjob); pxsjob = result_subjobs().xml("result_subjobs"); pele->add_child(pxsjob); for ( CategoryMap::const_iterator icat=category_subjobs().begin(); icat!=category_subjobs().end(); ++icat ) { string catname = icat->first; string label = "category:" + catname; const JobIdList& jids= icat->second; pele->add_child(jids.xml(label)); } // Return the XML. unlock_mutex(); return pele.release(); } //********************************************************************** // Display. void Job::display() const { std::cout << *this << std::endl; } //********************************************************************** // Web page. Text Job::web_page(const UrlMap& urls, string entry) const { string baseurl = Job::url(urls, "job"); Text wp; bool add_footer = true; wp.append(""); wp.append("
");
  string topentry = entry;
  string subentry;
  string::size_type ipos = entry.find("?");
  if ( ipos != string::npos ) {
    topentry = entry.substr(0, ipos);
    subentry = entry.substr(ipos+1);
  }
  if ( entry == "" ) {
    ostringstream ssout;
    ostr(ssout, urls);
    wp.append(ssout.str());
  } else if ( entry == "xml" ) {
    const XmlElement* pxjob = xml();
    if ( pxjob != 0 ) {
      wp = Text();
      add_footer = false;
      wp.append(pxjob->to_xml_text());
    } else {
      wp.append("Unable to extract XML for job " + id().to_string());
    }
    delete pxjob;
  } else if ( topentry == "application" ) {
    const Application* papp = application();
    if ( papp == 0 ) {
      wp.append("Job does not have an application");
    } else {
      return papp->web_page(baseurl, subentry);
    }
  } else if ( topentry == "task" ) {
    const Task* ptsk = task();
    if ( ptsk == 0 ) {
      wp.append("Job does not have a task");
    } else {
      return ptsk->web_page(baseurl, subentry);
    }
  } else if ( topentry == "preferences" ) {
    const JobPreferences& prf = preferences();
    return prf.web_page(baseurl, subentry);
  } else {
    wp.append("Job::web_page: Invalid entry: " + entry);
  }
  if ( add_footer ) {
    wp.append("
"); wp.append(""); } return wp; } //********************************************************************** // Create a local run script wrapper. int Job::create_local_run_script_wrapper() { string prefix = "Job::create_local_run_script_wrapper: "; string pmenv = Environment::current().value("PKGMGR_ENV"); if ( ! pmenv.size() ) { cerr << prefix << "PKGMGR_ENV is not defined" << endl; } // Fetch DIAL package and configuration. string dialpkg = Environment::current().value("DIAL_PKGNAME"); string dialcfg = Environment::current().value("DIAL_CFGNAME"); // Create wrapper script. Text twrap; twrap.append("#!/bin/sh"); twrap.append(""); twrap.append( "# Created by dial::Job::create_local_run_script_wrapper"); twrap.append(""); twrap.append("echo"); twrap.append("echo Setting up pkgmgr"); twrap.append(". " + pmenv); twrap.append(""); twrap.append("echo"); twrap.append("echo Setting home directory."); twrap.append("HOME=`pwd`"); twrap.append("export HOME"); twrap.append("echo HOME = $HOME"); twrap.append(""); twrap.append("echo"); twrap.append("echo Setting proxy location."); twrap.append("X509_USER_PROXY=$HOME/private/cred.dat"); twrap.append("export X509_USER_PROXY"); twrap.append("echo X509_USER_PROXY = $X509_USER_PROXY"); twrap.append("ls -ls $X509_USER_PROXY"); twrap.append("unset X509_USER_CERT"); twrap.append("unset X509_USER_KEY"); twrap.append(""); twrap.append("echo"); twrap.append("COM=./dial_run_script"); twrap.append("echo $COM"); twrap.append("$COM"); twrap.append("COM_RETURN_STAT=$?"); twrap.append("echo Command returned $COM_RETURN_STAT"); twrap.append(""); twrap.append("echo"); twrap.append("echo Creating archive"); twrap.append("TAROBJS=`ls -a | grep -v ^.$ | grep -v ^..$ | grep -v ^private$ | grep -v ^.dial$`"); twrap.append("TARFILE=dial_job_archive_`cat jid`.tar"); twrap.append("tar -cf $TARFILE $TAROBJS"); twrap.append("gzip $TARFILE"); twrap.append(""); twrap.append("echo"); twrap.append("echo Put archive in FMS"); twrap.append("DIAL_PKGNAME=" + 
dialpkg); twrap.append("DIAL_CFGNAME=" + dialcfg); twrap.append("DIALDIR=`pkgmgr locate $DIAL_PKGNAME`"); twrap.append(". $DIALDIR/bin/dialsetup.sh $DIAL_PKGNAME $DIAL_CFGNAME"); twrap.append("fms put file:`pwd`/$TARFILE.gz > job_archive"); twrap.append(""); twrap.append("echo"); twrap.append("echo Exit with error from run script"); twrap.append("exit $COM_RETURN_STAT"); string fwrap = job_directory() + "/dial_run_script_wrapper"; twrap.write(fwrap); ssystem("chmod +x " + fwrap); // Copy authentication files to job directory. // Getr rid of this when GSI is working well. ssystem("cp -r $HOME/.dial " + job_directory()); // Write the credential. string cdir = job_directory() + "/private"; string cfile = cdir + "/cred.dat"; mkdir(cdir); GssCredential* pcred = GssCredentialManager::credential(); if ( pcred != 0 ) { pcred->export_to_file(); string oldcredfile = pcred->file(); int cstat = copy_file(oldcredfile, cfile); if ( cstat != 0 ) { cerr << prefix << "Unable to copy credential" << endl; cerr << " " + oldcredfile << endl; cerr << " " + cfile << endl; } pcred->delete_file(); } else { cerr << prefix << "Unable to find GSI credential" << endl; } return 0; } //********************************************************************** // Free functions. //********************************************************************** // Output stream. ostream& operator<<(ostream& lhs, const Job& rhs) { return rhs.ostr(lhs); } //**********************************************************************