// MasterScheduler.cxx
//
// NOTE(review): this file appears to have lost all text that was enclosed in
// angle brackets (system header names, template arguments). The four bare
// #include directives and the two std::map typedefs below are kept exactly as
// found; restore them from the original source before building.
#include "dial_sched/MasterScheduler.h"
#include
#include
#include
#include
#include "dataset_util/FileName.h"
#include "dataset_util/FileStatus.h"
#include "dataset_util/mkdir.h"
#include "dataset_util/copy_file.h"
#include "dataset_util/Environment.h"
#include "dataset_util/XmlElement.h"
#include "dataset_util/DtdRegistry.h"
#include "dataset_util/PThreadMutexLocker.h"
#include "dataset_util/PThreadCondition.h"
#include "dataset_util/PThreadConditionLocker.h"
#include "dataset_xml/XmlParser.h"
#include "dataset_split/DatasetSplitter.h"
#include "dataset_split/NoSplitDatasetSplitter.h"
#include "dataset_split/SimpleEventDatasetSplitter.h"
#include "dataset_credential/GssCredentialManager.h"
#include "dial_job/JobRepository.h"
#include "dial_sched/CompoundJob.h"
#include "dial_sched/JobUpdater.h"

using std::string;
using std::ostream;
using std::cerr;
using std::endl;
using std::ostringstream;
using std::auto_ptr;
using dset::DatasetSplitter;
using dset::NoSplitDatasetSplitter;
using dset::SimpleEventDatasetSplitter;
using dset::Dataset;
using dset::DatasetList;
using dset::GssCredential;
using dset::GssCredentialManager;
using dial::Application;
using dial::Task;
using dial::Job;
using dial::JobId;
using dial::JobIdList;
using dial::JobRepository;
using dial::Scheduler;
using dial::CompoundJob;
using dial::JobUpdater;
using dial::MasterScheduler;

// JobMap is indexed by JobId and holds job pointers; SubJobMap is indexed by
// JobId and holds a JobIdList (see their use in submit() and remove()).
// NOTE(review): the template arguments are missing from both typedefs.
typedef std::map JobMap;
typedef std::map SubJobMap;

//**********************************************************************
// Local definitions.
//**********************************************************************

namespace {

//**********************************************************************

// Get or set the base directory under which job directories are created.
//   dir - if non-empty, becomes the new stored value.
// Returns the stored value. If nothing has been stored yet and dir is empty,
// the value is taken from the environment entry DIAL_MASTER_JOBS, falling
// back to DIAL_JOBS when the first yields an empty name.
// The value lives in a function-local static, so it persists across calls.
string base_job_directory(string dir ="") {
  static string bjdir;
  if ( dir.size() ) {
    bjdir = dir;
  } else if ( ! bjdir.size() ) {
    // presumably sysfill() loads the process environment -- confirm.
    Environment env;
    env.sysfill();
    bjdir = FileName(env["DIAL_MASTER_JOBS"]).fullpath().name();
    if ( ! bjdir.size() ) {
      bjdir = FileName(env["DIAL_JOBS"]).fullpath().name();
    }
  }
  return bjdir;
}

//**********************************************************************

// Creator.
// Builds a MasterScheduler from its XML description.
// Returns 0 if the element name does not match MasterScheduler::xml_name(),
// if any of the attributes auto_update, cleanup, maxjobs, maxstart or jobrep
// is missing, if the element does not have exactly two children, if a child
// pointer is null, or if a child can be used neither as the scheduler nor as
// the splitter.
Scheduler* create(const XmlElement& ele) {
  if ( ele.name() != MasterScheduler::xml_name() ) return 0;
  if ( ! ele.has_attribute("auto_update") ) return 0;
  // NOTE(review): a bool attribute is held in an int here; it is passed to
  // the constructor's bool parameter below.
  int autoup = ele.attribute_as_bool("auto_update");
  if ( ! ele.has_attribute("cleanup") ) return 0;
  bool cleanup = ele.attribute_as_bool("cleanup");
  if ( ! ele.has_attribute("maxjobs") ) return 0;
  int maxjobs = ele.attribute_as_int("maxjobs");
  if ( ! ele.has_attribute("maxstart") ) return 0;
  int maxstart = ele.attribute_as_int("maxstart");
  if ( ! ele.has_attribute("jobrep") ) return 0;
  string jobrep = ele.attribute("jobrep");
  // Loop over elements.
  // One should be a scheduler and the other a splitter.
  // We leak the memory for each.
  if ( ele.children().size() != 2 ) return 0;
  Scheduler* psch = 0;
  const DatasetSplitter* pspl = 0;
  const XmlElement::ElementList& eles = ele.children();
  for ( XmlElement::ElementList::const_iterator iele=eles.begin(); iele!=eles.end(); ++iele ) {
    const XmlElement* pele = *iele;
    if ( pele == 0 ) return 0;
    // Each child is first offered to the scheduler factory (if no scheduler
    // has been found yet) and then to the splitter factory.
    if ( psch == 0 ) {
      psch = Scheduler::create(*pele);
      if ( psch != 0 ) continue;
    }
    if ( pspl == 0 ) {
      pspl = DatasetSplitter::create(*pele);
      if ( pspl != 0 ) continue;
    }
    // Child matched neither role. An object created from the other child is
    // not released on this path.
    return 0;
  }
  // With exactly two children and no early return, both pointers are set.
  return new MasterScheduler(*pspl, *psch, autoup, cleanup, maxjobs, maxstart, jobrep);
}

//**********************************************************************

// Register creator.
// Runs during static initialization of this translation unit.
int STAT_MasterScheduler = Scheduler::register_creator(MasterScheduler::xml_name(), create);

//**********************************************************************

// Register the DTD.
DtdRegistry::Status ISTAT_DTD_MasterScheduler = DtdRegistry::register_dtd("dial");

//**********************************************************************

// Return if the current thread has write access for the provided owner.
bool write_access(string owner, string& msg) { string me = GssCredentialManager::name(); if ( owner.size() ) { msg = "Access granted because owner is not specified"; return true; } if ( owner == me ) { msg = "Access granted to " + owner; return true; } Text tacc("write_access"); for ( int iline=0; ilineis_valid() ) { log.append(" Job repository is invalid!"); } } } MasterScheduler::Imp::~Imp() { } // Convert int to string. string int_to_string(int ival) { ostringstream osstr; osstr << ival; return osstr.str(); } } // end unnamed namespace //********************************************************************** // Static member functions. //********************************************************************** // Set base job directory. void MasterScheduler::set_job_directory(string dir) { base_job_directory(dir); } //********************************************************************** // DTD const Text& MasterScheduler::dtd() { static Text txt; if ( txt.size() == 0 ) { txt.append(""); txt.append(""); } return txt; } //********************************************************************** // Member functions. //********************************************************************** // Log a message. void MasterScheduler::msg(string txt) const { pimp->log.append(txt); if ( FileStatus("debug_MasterScheduler").exists() ) { std::cerr << "MS: " << txt << endl; } } //********************************************************************** // Constructor. MasterScheduler:: MasterScheduler(const DatasetSplitter& splitter, Scheduler& slave, bool aup, bool cleanup, unsigned int maxjobs, unsigned int maxstart, string jobrep) : pimp(new Imp(splitter, slave, cleanup, maxjobs, maxstart, jobrep)) { msg("MasterScheduler log"); if ( jobrep.size() ) { msg("Using repository at " + jobrep); if ( ! 
job_repository()->is_valid() ) { msg("Repository is invalid!!!"); } } else { msg("No repository"); } if ( aup ) { pimp->pup = new JobUpdater(*this); assert( pimp->pup != 0 ); pimp->pup->start(); } // Read existing jobs from repository. if ( job_repository()->is_valid() ) { JobRepository& jr = *job_repository(); JobIdList& cjobs = pimp->jobs; JobMap& cjobmap = pimp->jobmap; SubJobMap& sjobmap = pimp->sjobmap; msg("Extracting jobs from JR"); JobRepository::IdList jids = jr.get_ids(); msg(" # jobs found: " + int_to_string(jids.size())); for ( size_t ijid=0; ijidpup != 0 ) { pimp->pup->stop(); delete pimp->pup; } // Lock after shutdown to avoid deadlock with updater thread. pimp->mutex.lock(); // Remove all jobs. if ( cleanup() ) { msg("Removing all jobs"); for ( JobIdList::reverse_iterator ijid=pimp->jobs.rbegin(); ijid!=pimp->jobs.rend(); ++ijid ) { remove(*ijid); } } // Delete implementation. pimp->mutex.unlock(); delete pimp; } //********************************************************************** // Add a task and build it for an application. int MasterScheduler:: add_task(const Application& app, const Task& tsk) { return slave().add_task(app,tsk); } //********************************************************************** // Remove a task. int MasterScheduler:: remove_task(const Application& app, const Task& tsk) { // Verify ownership. string amsg; if ( ! write_access("dial", amsg) ) { msg(" " + amsg); return 11; } return slave().remove_task(app,tsk); } //********************************************************************** // Submit a job. JobId MasterScheduler:: submit(const Application& app, const Task& tsk, const Dataset& dst, const JobPreferences& prf) { msg("Processing new job"); pimp->mutex.lock(); JobId jidout; { JobIdList& cjobs = pimp->jobs; JobMap& cjobmap = pimp->jobmap; SubJobMap& sjobmap = pimp->sjobmap; JobId tjid; // Check if the save has already installed the task. 
if ( slave().has_task(app, tsk) ) { msg(" Task is already installed"); // Otherwise the task build job is included in the compound job. // The compound job will not begin processing the other subjobs until // the task job completes successfully. } else { msg(" Slave has not installed task: check for task job"); tjid = slave().task_job(app, tsk); // If no job exists to build the task, then create one. if ( ! tjid.is_valid() ) { msg(" Slave is not building task: add task"); int tstat = slave().add_task(app, tsk); if ( tstat != 0 ) { msg(" Slave is unable to build task"); pimp->mutex.unlock(); return JobId(); } if ( app.has("build_task") ) { tjid = slave().task_job(app, tsk); assert( tjid.is_valid() ); } } } // Create the splitter. const DatasetSplitter* psplitter = &splitter(); bool use_pref_splitter = false; if ( prf.has("splitter") ) { msg(" Creating splitter from job preferences"); string sxspl = prf.extract("splitter"); if ( sxspl.size() == 0 ) { msg(" Unable to retrieve splitter from preferences"); pimp->mutex.unlock(); return JobId(); } static XmlParser parser; const XmlElement* pxspl = parser.parse(sxspl); if ( pxspl == 0 ) { msg(" Unable to parse splitter XML"); pimp->mutex.unlock(); return JobId(); } psplitter = DatasetSplitter::create(*pxspl); delete pxspl; if ( psplitter == 0 ) { msg(" Unable to construct splitter"); pimp->mutex.unlock(); return JobId(); } use_pref_splitter = true; } // Split input dataset. DatasetList subdsts = psplitter->split(dst); if ( use_pref_splitter ) delete psplitter; if ( subdsts.size() == 0 ) { msg(" Job split failed " + int_to_string(subdsts.size())); pimp->mutex.unlock(); return JobId(); } msg(" # sub-datasets = " + int_to_string(subdsts.size())); // Assign job ID. JobId jid = JobId::generate(); if ( ! jid.is_valid() ) { msg(" Unable to generate job ID"); pimp->mutex.unlock(); return jid; } msg(" Assigned job ID " + jid.to_string()); if ( cjobmap.find(jid) != cjobmap.end() ) { msg(" Job ID already assigned! 
Job not created."); pimp->mutex.unlock(); return JobId(); } // Create compound job. string jobdir = job_directory() + "/" + jid.to_path(); CompoundJob* pcjob = new CompoundJob(jid, app, tsk, dst, prf, jobdir, pimp->maxjobs, pimp->maxstart, job_repository(), &slave()); CompoundJob& cjob = *pcjob; cjob.lock_mutex(); cjob.set_log(pimp->log); int nother_subdsts = 0; if ( tjid.is_valid() ) { msg(" Adding task job"); Job& tjob = slave().job(tjid); int astat = cjob.add_task_job(tjob); if ( astat != 0 ) { msg(" Addition of task job failed"); pimp->mutex.unlock(); cjob.unlock_mutex(); return jid; } ++nother_subdsts; } cjobmap[jid] = pcjob; cjobs.push_back(jid); // Check job is valid. if ( ! cjob.is_valid() ) { msg(" New compound job is invalid with error " + int_to_string(cjob.error())); pimp->mutex.unlock(); cjob.unlock_mutex(); return jid; } if ( cjob.is_failed() ) { msg(" New compound job is failed with error " + int_to_string(cjob.error())); msg(" Job directory: " + jobdir); pimp->mutex.unlock(); cjob.unlock_mutex(); return jid; } // Create sub-job map. sjobmap[jid]; // Write the job ID. { Text jidtxt(jobdir + "/" + "jid"); jidtxt.append(jid.to_string()); jidtxt.write(); } // Loop over sub-datasets and create sub-jobs. for ( DatasetList::const_iterator idst=subdsts.begin(); idst!=subdsts.end(); ++idst ) { const Dataset* pdst = *idst; if ( pdst == 0 ) { msg(" Slave job has null dataset! Aborting."); cjob.kill(103); pimp->mutex.unlock(); cjob.unlock_mutex(); return jid; } JobId sjid = slave().submit(app, tsk, **idst, prf); if ( ! sjid.is_valid() ) { msg(" Slave job submission failed! Aborting."); cjob.kill(101); pimp->mutex.unlock(); cjob.unlock_mutex(); return jid; } msg(" Submitted slave job " + sjid.to_string()); Job& sjob = slave().job(sjid); assert( sjob.is_valid() ); int stat = cjob.add_job(sjob); if ( stat != 0 ) { msg(" Unable to add job to compound job! 
Aborting."); msg(" Attempt returned status " + int_to_string(stat)); msg(" Job killed"); cjob.kill(102); pimp->mutex.unlock(); cjob.unlock_mutex(); return jid; } sjobmap[jid].push_back(sjid); } msg(" # sub-jobs = " + int_to_string(cjob.subjobs().size())); if ( cjob.subjobs().size() != subdsts.size() + nother_subdsts ) { std::cout << "*** # sub-datasets = " << subdsts.size() << std::endl; std::cout << "*** # sub-jobs = " << cjob.subjobs().size() << std::endl; assert(false); } msg("End processing new job"); cjob.unlock_mutex(); jidout = jid; } pimp->mutex.unlock(); return jidout; } //********************************************************************** // Kill a job. int MasterScheduler::kill(JobId jid) { msg("Killing job " + jid.to_string()); // Verify ownership. string amsg; if ( ! write_access(job(jid).owner(), amsg) ) { msg(" " + amsg); return 11; } JobMap& cjobmap = pimp->jobmap; // Only kill top-level jobs. if ( cjobmap.find(jid) == cjobmap.end() ) { msg(" Job does not exist. Not killed."); return 101; } Job& cjob = job(jid); int kstat = cjob.kill(); return kstat; } //********************************************************************** // Remove a job. int MasterScheduler::remove(JobId jid) { msg("Removing job " + jid.to_string() ); // Verify ownership. string amsg; if ( ! write_access(job(jid).owner(), amsg) ) { msg(" " + amsg); return 11; } // Stop updating. // If the update thread is not locked, then we lock it here to block // any updates during removal. // If it is locked, we make sure this thread holds the lock. // There will likely be problems if the updater ever calls this // method. PThreadCondition* pupcond = 0; if ( pimp->pup != 0 ) { pupcond = &pimp->pup->condition(); if ( pupcond->is_locked() ) { //assert( pupcond->lock_owner() == pthread_self() ); if ( pupcond->lock_owner() == pthread_self() ) { pupcond = 0; } } } PThreadConditionLocker update_mutex_locker(pupcond); // Lock job mutex. // Must be after stopping update to avoid deadlock. 
PThreadMutexLocker lock_job(pimp->mutex); // Continue with job removal. int errstat = 0; Job& xjob = job(jid); //CompoundJob& mjob = dynamic_cast(xjob); // Check validity of job. if ( ! xjob.is_valid() ) { msg(" Job is invalid"); return 1; } // Find if this is a top level job. // If not, try to remove it as a subjob. JobMap::iterator ijob = pimp->jobmap.find(jid); if ( ijob == pimp->jobmap.end() ) { msg(" Unable to locate job in job map"); // Try as subjob. if ( slave().has_job(jid) ) { msg(" Try as subjob"); int rstat = slave().remove(jid); if ( rstat != 0 ) { msg(" Unable to remove as subjob "); return 20 + rstat; } else { msg(" Removed as subjob"); return 0; } } msg(" Unable to locate as subjob--give up"); return 2; } // Loop over subjobs and remove them from the slave scheduler. SubJobMap::iterator isjid = pimp->sjobmap.find(jid); if ( isjid == pimp->sjobmap.end() ) { msg(" Invalid subjob map"); return 3; } const JobIdList& jids = isjid->second; for ( JobIdList::const_reverse_iterator ijid=jids.rbegin(); ijid!=jids.rend(); ++ijid) { JobId sjid = *ijid; if ( slave().has_job(sjid) ) { int rstat = slave().remove(sjid); if ( rstat != 0 ) { msg(" Unable to remove subjob " + sjid.to_string()); return errstat = 10 + rstat; } else { msg(" Removed subjob " + sjid.to_string()); } } else { msg(" Unable to find subjob " + sjid.to_string()); } } // Drop job from lists. JobIdList::iterator ijid = find(pimp->jobs.begin(), pimp->jobs.end(), jid); if ( ijid == pimp->jobs.end() ) { msg(" Unable to locate job ID in job ID list"); return 5; } pimp->jobs.erase(ijid); pimp->jobmap.erase(ijob); pimp->sjobmap.erase(isjid); // Delete run directory. string jobdir = job_directory() + "/" + jid.to_path(); string com = "rm -rf " + jobdir; system(com.c_str()); if ( FileStatus(jobdir).exists() ) { msg(" Unable to delete run directory:"); msg(" " + jobdir ); return 6; } // Delete job. //delete &xjob; // Restart updating. 
//if ( auto_update() ) { // pimp->pup->start(updater_is_locked); //} // Remove job from repository. if ( job_repository()->is_valid() ) { if ( job_repository()->remove(jid) != 0 ) { msg(" Removal from JR failed."); msg(" " + job_repository()->error_message() ); } } return errstat; } //********************************************************************** // Validity. bool MasterScheduler::is_valid() const { return slave().is_valid(); } //********************************************************************** // Is an application available? bool MasterScheduler::has_application(const Application& app) const { return slave().has_application(app); } //********************************************************************** // Is a task installed? bool MasterScheduler:: has_task(const Application& app, const Task& tsk) const { return slave().has_task(app,tsk); } //********************************************************************** // Task job. JobId MasterScheduler:: task_job(const Application& app, const Task& tsk) const { return slave().task_job(app,tsk); } //********************************************************************** // Return the list of jobs. JobIdList MasterScheduler::jobs() const { lock(); JobIdList jids = pimp->jobs; unlock(); return jids; } //********************************************************************** // Has a job. bool MasterScheduler::has_job(JobId jid) const { JobMap& cjobmap = pimp->jobmap; return cjobmap.find(jid) != cjobmap.end(); } //********************************************************************** // Fetch a job. Job& MasterScheduler::job(JobId jid) const { lock(); //msg("Fetching job " + jid.to_string()); JobMap& cjobmap = pimp->jobmap; // Return top-level job if it exists. if ( cjobmap.find(jid) != cjobmap.end() ) { Job& job = *cjobmap[jid]; if ( job.is_running() ) { //msg(" Updating job "); //int stat= job.update(); } //msg("Fetch returns job " + jid.to_string()); unlock(); return job; } // If not, return the slave job. 
//msg("Fetch returns subjob " + jid.to_string()); unlock(); return slave().job(jid); } //********************************************************************** // XML. const XmlElement* MasterScheduler::xml() const { auto_ptr pele(new XmlElement(xml_name())); pele->add_attribute_as_bool("auto_update", auto_update()); pele->add_attribute_as_bool("cleanup", cleanup()); pele->add_attribute_as_int("maxjobs", max_jobs()); pele->add_attribute_as_int("maxstart", max_start()); pele->add_attribute("jobrep", job_repository_connection()); const XmlElement* pxsch = slave().xml(); if ( pxsch == 0 ) return 0; pele->add_child(pxsch); const XmlElement* pxdsp = splitter().xml(); if ( pxdsp == 0 ) return 0; pele->add_child(pxdsp); return pele.release(); } //********************************************************************** // Return the dataset splitter. const DatasetSplitter& MasterScheduler::splitter() const { return pimp->splitter; } //********************************************************************** // Return the slave scheduler. Scheduler& MasterScheduler::slave() { return pimp->slave; } //********************************************************************** // Return the slave scheduler. const Scheduler& MasterScheduler::slave() const { return pimp->slave; } //********************************************************************** // Return parameters. bool MasterScheduler::auto_update() const { return pimp->pup != 0; } bool MasterScheduler::cleanup() const { return pimp->cleanup; } int MasterScheduler::max_jobs() const { return pimp->maxjobs; } int MasterScheduler::max_start() const { return pimp->maxstart; } //********************************************************************** // Return the updater. const JobUpdater* MasterScheduler::updater() const { return pimp->pup; } //********************************************************************** // Return the log. 
Text MasterScheduler::log() const { pimp->mutex.lock(); Text tmplog = pimp->log; pimp->mutex.unlock(); return tmplog; } //********************************************************************** // Fetch a local job as a compound job. CompoundJob& MasterScheduler::compound_job(JobId jid) const { JobMap& cjobmap = pimp->jobmap; // Return top-level job if it exists. if ( cjobmap.find(jid) != cjobmap.end() ) { Job* pjob = cjobmap[jid]; assert( pjob != 0 ); CompoundJob* pcjob = dynamic_cast(pcjob); return *pcjob; } static CompoundJob badjob; return badjob; } //********************************************************************** // Return the directory where job directories are found. string MasterScheduler::job_directory() const { return pimp->m_jobdir; } //********************************************************************** // Job repository connection. string MasterScheduler::job_repository_connection() const { return pimp->jr_conn; } //********************************************************************** // Job repository. JobRepository* MasterScheduler::job_repository() const { return pimp->pjr; } //********************************************************************** // Output stream. ostream& MasterScheduler::ostr(ostream& lhs) const { lhs << "MasterScheduler" << endl; return job_report(lhs); } //********************************************************************** // Lock mutex. int MasterScheduler::lock() const { return pimp->mutex.lock(); } //********************************************************************** // Unlock mutex. int MasterScheduler::unlock() const { return pimp->mutex.unlock(); } //**********************************************************************