// Scheduler.cxx #include "dial_sched/Scheduler.h" #include #include #include #include #include "sys/resource.h" #include "sys/vtimes.h" #include "dataset_util/MemoryUsage.h" #include "dial_job/JobList.h" #include "dial_sched/Scheduler_t.h" using std::string; using std::ostream; using std::cout; using std::ostringstream; using std::endl; using dset::Dataset; using dial::JobId; using dial::JobIdList; using dial::Job; using dial::JobList; using dial::JobRepository; using dial::Scheduler; typedef Scheduler::Creator Creator; typedef Scheduler::CreatorList CreatorList; typedef std::map CreatorMap; //********************************************************************** // Local definitions. //********************************************************************** namespace { // Map of functions indexed by XML name. CreatorMap& creator_map() { static CreatorMap cmap; return cmap; } // List of known creators. CreatorList& creator_list() { static CreatorList clist; return clist; } } // end unnamed namespace //********************************************************************** // Static member functions. //********************************************************************** // Create a scheduler. Scheduler* Scheduler::create(const XmlElement& ele) { CreatorMap& cres = creator_map(); CreatorMap::const_iterator icre = cres.find(ele.name()); if ( icre == cres.end() ) { return 0; } Creator cre = icre->second; Scheduler* psch = cre(ele); return psch; } //********************************************************************** // Register a creator. int Scheduler::register_creator(std::string name, Creator pfun) { CreatorMap& cres = creator_map(); CreatorMap::const_iterator icre = cres.find(name); if ( icre != cres.end() ) { if ( icre->second == pfun ) return 0; return 1; } if ( pfun == 0 ) return 2; cres[name] = pfun; creator_list().push_back(name); return 0; } //********************************************************************** // Return the known creators. 
const CreatorList& Scheduler::creators() { return creator_list(); } //********************************************************************** // Return if a creator is registered. bool Scheduler::has_creator(std::string name) { CreatorMap& cres = creator_map(); CreatorMap::const_iterator icre = cres.find(name); return icre != cres.end(); } //********************************************************************** // Member functions. //********************************************************************** // Destructor. Scheduler::~Scheduler() { } //********************************************************************** // Add a task. int Scheduler::add_task(const Application& app, const Task& tsk) { return 99; } //********************************************************************** // Remove a task. int Scheduler::remove_task(const Application& app, const Task& tsk) { return 99; } //********************************************************************** // Start a job. int Scheduler::start(JobId jid) { if ( ! has_job(jid) ) return 101; return job(jid).start(); } //********************************************************************** // Start a job specified by a string. int Scheduler::start(string sjid) { return start(JobId(sjid)); } //********************************************************************** // Kill a job. int Scheduler::kill(JobId jid) { if ( ! has_job(jid) ) return 101; return job(jid).kill(); } //********************************************************************** // Kill a job specified by a string. int Scheduler::kill(string sjid) { return kill(JobId(sjid)); } //********************************************************************** // Remove a job specified by a string. int Scheduler::remove(string sjid) { return remove(JobId(sjid)); } //********************************************************************** // Lock the mutex. 
int Scheduler::lock() const { return 0; } //********************************************************************** // Unlock the mutex. int Scheduler::unlock() const { return 0; } //********************************************************************** // Const functions. //********************************************************************** // Has a task. bool Scheduler:: has_task(const Application& app, const Task& tsk) const { return false; } //********************************************************************** // Task job. JobId Scheduler:: task_job(const Application& app, const Task& tsk) const { return JobId(); } //********************************************************************** // Has a job. bool Scheduler::has_job(JobId jid) const { const JobIdList& jids = jobs(); return find(jids.begin(), jids.end(), jid) != jids.end(); } //********************************************************************** // Job. Job& Scheduler::job(string sjid) const { return job(JobId(sjid)); } //********************************************************************** // Job repository connection. string Scheduler::job_repository_connection() const { return ""; } //********************************************************************** // Job repository. JobRepository* Scheduler::job_repository() const { return 0; } //********************************************************************** // Print jobs to std out. 
// Takes the output stream lhs plus two list-size arguments, maxprint and
// skip, from which the iteration bounds over the job ID list are derived.
// Calls lock() before copying the job ID list.
//
// NOTE(review): part of this function's text is missing. Everything between
// "ijid" in the for-loop header and "0 ) lhs << ..." was lost, which
// presumably covered the rest of the loop, the end of print_jobs and the
// opening of the job-report function that declares the n* counters used
// below. The code is kept exactly as found; restore the lost span from the
// original source before building.
// NOTE(review): no unlock() matching the lock() call is visible in the
// surviving text -- confirm it exists in the lost span.
ostream& Scheduler::print_jobs(ostream& lhs, JobIdList::size_type maxprint, JobIdList::size_type skip) const {
  lock();
  JobIdList jids = jobs();
  JobList outjobs;
  // NOTE(review): if size_type is unsigned (as for standard containers),
  // this wraps around when skip > maxprint; the clamp below would then set
  // last to jids.size(). An end index of skip + maxprint may have been
  // intended -- the loop bound is not visible, so confirm.
  JobIdList::size_type last = maxprint - skip;
  if ( last > jids.size() ) last = jids.size();
  // NOTE(review): damaged line -- see the note at the top of this function.
  for ( JobIdList::size_type ijid=skip; ijid 0 ) lhs << "\n " << ninvalid << " invalid";
  // Each count is written on its own line, and only when it is non-zero.
  if ( ninit > 0 ) lhs << "\n " << ninit << " initializing";
  if ( nrunning > 0 ) lhs << "\n " << nrunning << " running";
  if ( nfailed > 0 ) lhs << "\n " << nfailed << " failed";
  if ( ndone > 0 ) lhs << "\n " << ndone << " done";
  if ( nkilled > 0 ) lhs << "\n " << nkilled << " killed";
  lhs.flush();
  return lhs;
}

//**********************************************************************

// Job report to stdout.
// Calls job_report(std::cout), appends endl and returns the stream.
ostream& Scheduler::job_report() const {
  return job_report(std::cout) << endl;
}

//**********************************************************************

// Display.
// Streams this scheduler to std::cout with operator<<, followed by endl.
void Scheduler::display() const {
  std::cout << *this << std::endl;
}

//**********************************************************************

// Resource report.
// Returns a string holding the streamed form of a default-constructed
// MemoryUsage object, then a newline, then Dataset::memory_report().
string Scheduler::resource_report() const {
  ostringstream ssout;
  // General.
  // The getrusage/vtimes reporting below is disabled (commented out).
/*
  struct rusage usg;
  int rstat = getrusage(RUSAGE_SELF, &usg);
  if ( rstat == 0 ) {
    ssout << "Peak memory: " << usg.ru_maxrss << " KB";
    ssout << "\nText memory: " << usg.ru_ixrss << " KB-tick";
    ssout << "\nData memory: " << usg.ru_idrss << " KB-tick";
    ssout << "\nStac memory: " << usg.ru_isrss << " KB-tick";
    ssout << "\nUser time: " << usg.ru_utime.tv_sec << " sec";
    ssout << "\nBehalf time: " << usg.ru_stime.tv_sec << " sec";
  } else {
    ssout << "rusage returned " << rstat;
  }
  struct vtimes vsg;
  vsg.vm_maxrss = 123;
  int vstat = vtimes(&vsg, 0);
  if ( vstat == 0 ) {
    ssout << "\nTotal memory: " << vsg.vm_maxrss << " KB";
    ssout << "\nUser time: " << vsg.vm_utime << " sec";
  } else {
    ssout << "vtimes returned " << rstat;
  }
*/
  MemoryUsage mu;
  ssout << mu;
  // Datasets.
  ssout << "\n" << Dataset::memory_report();
  return ssout.str();
}

//**********************************************************************
// Free functions.
//**********************************************************************

// Output stream.
// Delegates to the scheduler's ostr() member and returns its result.
ostream& operator<<(ostream& lhs, const Scheduler& rhs) {
  return rhs.ostr(lhs);
}

//**********************************************************************