// JobRepository.cxx #include "dial_job/JobRepository.h" #include #include #include #include #include "dataset_util/getcwd.h" #include "dataset_util/XmlElement.h" #include "dataset_xml/XmlParser.h" #include "dataset_catalog/SqlGenericRepository.h" #include "dataset_catalog/ConnectionResolver.h" #include "dial_job/JobList.h" using std::string; using std::cout; using std::cerr; using std::endl; using std::ostream; using std::ostringstream; using std::istringstream; using std::auto_ptr; using dset::SqlTable; using dset::GenericRepository; using dset::SqlGenericRepository; using dset::ConnectionResolver; using dset::CatalogError; using dial::JobId; using dial::JobIdList; using dial::Job; using dial::JobList; using dial::JobRepository; typedef Job::UrlMap UrlMap; typedef JobRepository::size_type size_type; typedef GenericRepository::IdList SidList; typedef JobRepository::IdList IdList; //********************************************************************** // Local definitions. //********************************************************************** namespace { //********************************************************************** // Default repository. // First pass, use set_default_instance or define an invalid repository // if this fails. JobRepository* defrep(JobRepository* pnewrep =0) { static bool first = true; static JobRepository* prep = 0; if ( first ) { first = false; JobRepository::set_default_instance(""); if ( prep == 0 ) { prep = new JobRepository; } } if ( pnewrep != 0 ) { delete prep; prep = pnewrep; } return prep; } //********************************************************************** } // end unnamed namespace //********************************************************************** // Static methods. //********************************************************************** // Return the default instance. JobRepository& JobRepository::default_instance() { return *defrep(); } //********************************************************************** // Set the default instance. int JobRepository::set_default_instance(string name) { JobRepository *prep = new JobRepository(name); if ( prep->is_valid() ) { defrep(prep); } else { delete prep; return 1; } return 0; } //********************************************************************** // Create default instance. int JobRepository::create_default_instance() { // Define connection resolver. ConnectionResolver::set_configuration_file("resolver.dat"); // Error if JobRepository is already present. ConnectionResolver res; if ( res.resolve("JobRepository", "").size() ) return 2; // Append to resolver. Text txt("resolver.dat"); txt.append("JobRepository"); txt.append("SQLRESULT:" + getcwd() + "/jr.dat"); txt.write(); // Construct file description of repository. system("rm -f jr.dat"); Text tdsc("jr.dat"); tdsc.append("SQLRESULT"); tdsc.append("idhi,idlo,sxml"); tdsc.append("0,0,JobRepository"); tdsc.write(); return 0; } //********************************************************************** // Private member functions. //********************************************************************** // Copy constructor. JobRepository::JobRepository(const JobRepository&) { assert(false); } //********************************************************************** // Assignment. JobRepository& JobRepository::operator=(const JobRepository&) { assert(false); return *this; } //********************************************************************** // Non-static methods. //********************************************************************** // Default Constructor. JobRepository::JobRepository() : m_prep(0) { } //********************************************************************** // Constructor from a generic repository. JobRepository::JobRepository(GenericRepository *prep) : m_prep(prep) { } //********************************************************************** // Constructor from connection string. JobRepository::JobRepository(string conn) : m_prep(GenericRepository::create("JobRepository", conn)), m_manage(true) { } //********************************************************************** // Destructor. // Manage the GenericRepository only if m_manage=true JobRepository::~JobRepository() { if ( m_manage == true ) delete m_prep; } //********************************************************************** // Validity. bool JobRepository::is_valid() const { m_error = CatalogError::no_error(); if ( m_prep == 0 || ! m_prep->is_valid() ) { m_error = CatalogError::generic_access_error(); return false; } return true; } //********************************************************************** // Error code. int JobRepository::error() const { return m_error; } //********************************************************************** // Error message. string JobRepository::error_message() const { CatalogError err("job"); return err.message(m_error); } //********************************************************************** // Size. size_type JobRepository::size() const { if ( ! is_valid() ) return false; return m_prep->size(); } //********************************************************************** // Size since. size_type JobRepository::size_since(time_t time) const { if ( ! is_valid() ) return false; return m_prep->size_since(time); } //********************************************************************** // Return if the transient store has a job. // Otherwise check in persistent store. bool JobRepository::has(JobId id) const { if ( ! is_valid() ) return false; // Exit if input ID is invalid. if ( ! id.is_valid() ) { m_error = CatalogError::invalid_id_error(); return false; } if ( ! id.is_global() ) { m_error = CatalogError::local_id_error(); return false; } // Transient store. RepMap::const_iterator ids = m_dss.find(id); if ( ids != m_dss.end() ) return true; // Persistent store. return m_prep->has(id.to_string()); } //********************************************************************** // Return the list of ID's. IdList JobRepository::get_ids(size_t maxent) const { SidList sids = m_prep->get_ids(maxent); size_t count = sids.size(); if ( count > maxent ) { count = maxent; } IdList ids(count); for ( SidList::size_type iid=0; iidget_ids_since(time, maxent); size_t count = sids.size(); if ( count > maxent ) { count = maxent; } IdList ids(count); for ( SidList::size_type iid=0; iidfirst != id ) return 0; const Job* pds = ids->second; // If null and readnull is set, then delete the entry and read. if ( pds == 0 ) { m_error = CatalogError::null_object_error(); if ( readnull ) { m_dss.erase(ids); ManageMap::iterator img = m_mgs.find(id); if ( img == m_mgs.end() ) return 0; m_mgs.erase(img); } else { return 0; } } else { return pds; } } const Job* pobj = copy(id); if ( pobj != 0 ) { m_dss[id] = pobj; m_mgs[id] = true; } return pobj; } //********************************************************************** // Return a copy of a job. Job* JobRepository::copy(JobId id) const { if ( ! is_valid() ) return 0; // Exit if input ID is invalid. if ( ! id.is_valid() ) { m_error = CatalogError::invalid_id_error(); return 0; } if ( ! id.is_global() ) { m_error = CatalogError::local_id_error(); return 0; } // Fetch from persistent store. string sxml = m_prep->get(id.to_string()); if ( sxml == "" ) { m_error = CatalogError::no_such_object_error(); return 0; } Text::xml_to_text(sxml); XmlParser par; const XmlElement *pxml = par.parse(sxml); if ( pxml == 0 || ! pxml->is_valid() ) { m_error = CatalogError::object_parse_error(); delete pxml; return 0; } Job *pobj = new Job(*pxml); delete pxml; if ( pobj == 0 || ! pobj->is_valid() ) { m_error = CatalogError::object_create_error(); return 0; } return pobj; } //********************************************************************** // Return the time for a job. time_t JobRepository::time(JobId id) const { if ( ! is_valid() ) return 0; // Exit if input ID is invalid. if ( ! id.is_valid() ) { m_error = CatalogError::invalid_id_error(); return 0; } if ( ! id.is_global() ) { m_error = CatalogError::local_id_error(); return 0; } // Fetch from persistent store. return m_prep->time(id.to_string()); } //********************************************************************** // Put a job in the repository. JobId JobRepository::insert(const Job* pds, bool manage) { if ( ! is_valid() ) return JobId(); // Check input job. if ( pds == 0 ) { m_error = CatalogError::null_object_error(); return JobId(); } if ( ! pds->is_valid() ) { m_error = CatalogError::invalid_object_error(); return JobId(); } if ( ! pds->id().is_global() ) { m_error = CatalogError::local_id_error(); return JobId(); } // Make sure the ID is not already used. JobId id = pds->id(); if ( has(id) ) { m_error = CatalogError::id_assigned_error(); return JobId(); } // Write the job to persistent store. auto_ptr pele(pds->xml()); int stat = m_prep->insert(id.to_string(), pele->to_xml_text("NOCR")); if ( stat ) { m_error = CatalogError::generic_write_error(); return JobId(); } m_dss[id] = pds; m_mgs[id] = manage; return id; } //********************************************************************** // Remove a job from repository. // Returns 0 on success. int JobRepository::remove(JobId id) { if ( ! is_valid() ) return 1; if ( ! id.is_valid() ) { m_error = CatalogError::invalid_id_error(); return m_error; } if ( ! id.is_global() ) { m_error = CatalogError::local_id_error(); return m_error; } int stat = m_prep->remove(id.to_string()); if ( stat != 0 ) m_error = CatalogError::generic_write_error(); m_dss.erase(id); m_mgs.erase(id); return stat; } //********************************************************************** // Verify all entries. bool JobRepository::verify() const { // Exit if repository is invalid if ( ! is_valid() ) return false; XmlParser par; GenericRepository::IdList idl = m_prep->get_ids(); for( GenericRepository::IdList::const_iterator it = idl.begin(); it != idl.end(); ++it) { string sxml = m_prep->get(*it); Text::xml_to_text(sxml); // Entry with ID "0-0" is used to identify type of repository. // Do not verify that it has a valid job. if ( (*it) == "0-0" ) continue; const XmlElement *pxml = par.parse(sxml); if ( pxml == 0 || ! pxml->is_valid() ) { m_error = CatalogError::object_parse_error(); delete pxml; return false; } const Job *pdb = new Job(*pxml); delete pxml; if ( pdb == 0 || ! pdb->is_valid() ) { m_error = CatalogError::object_create_error(); return false; } } return true; } //********************************************************************** // Output stream. ostream& JobRepository::ostr(ostream& str) const { if ( ! is_valid() ) { str << "Invalid job repository"; return str; } size_type count = size(); str << "JobRepository has " << count << " entr"; if ( count == 1 ) { str << "y"; } else { str << "ies"; } return str; } //********************************************************************** // Web page. // // Supported entries: // maxent=123 // jid=123-456 // jids=123-456,123-789 // jid=123-456?subentry (handled by Job::web_page except as follows) // jid=123-456?subjobs // jid=123-456?subjobs?xxx (xxx=running, done, failed, killed, result) // jid=123-456?subjobs?category=catname const Text& JobRepository::web_page(const UrlMap& urls, string entry, JobRepository* prep) { // Header. m_wp = Text(); m_wp.append(""); bool footer = true; // Decipher entry. int maxent = 100; JobId jid; JobIdList jids; bool showtable = false; bool showrep = false; bool showjob = false; bool showjobs = false; // Split entry into first and remaining sub entries. string::size_type ipos = entry.find('?'); string entry0 = entry; string entryrem; string view; if ( ipos != string::npos ) { entry0 = entry.substr(0, ipos); entryrem = entry.substr(ipos+1); } // Parse the first entry to determine the action to take. if ( entry0.size() ) { Text::WordList words = Text::split(entry0, "="); if ( words.size() == 2 ) { string name = words[0]; string value = words[1]; // Display repository with explicit maxent. if ( name == "maxent" ) { istringstream ssvalue(value); ssvalue >> maxent; showrep = true; // Table view. } else if ( name == "view" ) { istringstream ssvalue(value); ssvalue >> view; showrep = true; // Display single job or single job property. } else if ( name == "jid" ) { jid = JobId(value); if ( jid.is_valid() ) { showjob = true; } else { m_wp.append("Invalid job ID"); m_wp.append("Entry: " + entry); } // Display a list of jobs. } else if ( name == "jids" ) { showjobs = true; Text::WordList sjids = Text::split(value, ","); for ( Text::WordList::const_iterator ijid=sjids.begin(); ijid!=sjids.end(); ++ijid ) { JobId newjid(*ijid); if ( newjid.is_valid() ) { jids.push_back(newjid); } } } } // Display repository with implicit maxent. } else { showrep = true; } // Repository where jobs in table are to be found. JobRepository* prep_table = this; // If a job ID was found, then fetch the job. const Job* pjob = 0; if ( showjob ) { // Fetch job. pjob = extract(jid); if ( pjob == 0 ) { m_wp.append("Unable to find job " + jid.to_string()); showjob = false; } } // If job is found, intercept any request for subjobs. // Create a table using the subjob repository. if ( showjob ) { assert( ! showtable ); assert( ! showrep ); if ( entryrem.size() > 6 ) { Text::WordList subents = Text::split(entryrem, "?"); if ( subents[0] == "subjobs" ) { showjob = false; prep_table = prep; if ( subents.size() == 1 ) { jids = pjob->subjobs(); showtable = true; } else if ( subents.size() == 2 ) { string state = subents[1]; if ( state == "running" ) { jids = pjob->running_subjobs(); showtable = true; } else if ( state == "done" ) { jids = pjob->done_subjobs(); showtable = true; } else if ( state == "failed" ) { jids = pjob->failed_subjobs(); showtable = true; } else if ( state == "killed" ) { jids = pjob->killed_subjobs(); showtable = true; } else if ( state == "result" ) { jids = pjob->result_subjobs(); showtable = true; } else { ipos = state.find("="); if ( ipos != string::npos ) { string name = state.substr(0, ipos); string value = state.substr(ipos+1); if ( name == "category" ) { jids = pjob->category_subjobs(value); showtable = true; } } } } } } } // If showjob is still set, let Job generate the page. if ( showjob ) { assert( showtable == false ); // Create map with jobs pointing to the appropriate repository. UrlMap newurls = urls; newurls["job"] = newurls["jobs"] + "?jid=" + jid.to_string(); // Create page. m_wp = pjob->web_page(newurls, entryrem); footer = false; } // If the repository is to be displayed, then extract the last maxent ID's. if ( showrep ) { JobIdList::size_type ijid = 0; const JobIdList& repjids = get_ids(); if ( repjids.size() > maxent ) { ijid = repjids.size() - maxent; } for ( ; ijidJob repository has " << njob; if ( njob == 1 ) { header << " entry"; } else { header << " entries"; } header << ""; m_wp.append(header.str()); } // For a list of jobs, use the table. if ( showjobs ) { showtable = true; } // Create table. if ( showtable ) { // Extract list of jobs from list of ID's. JobList jobs(jids.size(), 0); int ijob = jids.size(); for ( JobIdList::const_iterator ijid=jids.begin(); ijid!=jids.end(); ++ijid ) { JobId jid = *ijid; const Job* pjob = prep_table->extract(jid); if ( pjob == 0 ) { string prefix = "JobRepository::web_page: "; cerr << prefix << "Unable to extract job with ID " << jid << endl; } jobs[--ijob] = pjob; } assert( ijob == 0 ); // Create map with jobs pointing to the appropriate repository. UrlMap newurls = urls; if ( prep_table == prep ) { newurls["jobs"] = newurls["subjobs"]; } // Create table. if ( view == "" ) { write_html(m_wp, jobs, newurls, maxent, 0); } else if ( view == "provenance" ) { write_html_provenance(m_wp, jobs, newurls, maxent, 0); } else { m_wp.append("Unknown view: " + view); } } // Footer. if ( footer ) m_wp.append(""); return m_wp; } //********************************************************************** // Free functions. //********************************************************************** // Output stream. ostream& operator<<(ostream& lhs, const JobRepository& rhs) { return rhs.ostr(lhs); } //**********************************************************************