// JobUpdater.cxx #include "dial_sched/JobUpdater.h" #include #include #include #include #include #include "dataset_util/PThreadCondition.h" #include "dataset_util/Time.h" #include "dataset_util/pthread_sleep.h" #include "dataset_util/getcwd.h" #include "dataset_util/FileStatus.h" #include "dataset_credential/GssCredentialManager.h" #include "dial_job/JobRepository.h" #include "dial_sched/Scheduler.h" using std::string; using std::ostream; using std::cerr; using std::endl; using std::istringstream; using dset::GssCredential; using dset::GssCredentialManager; using dial::JobId; using dial::JobIdList; using dial::Job; using dial::JobRepository; using dial::Scheduler; using dial::JobUpdater; //********************************************************************** // Local declarations. //********************************************************************** namespace { void* update_thread(void* parg); bool dbg() { static string file = getcwd() + "/debug_JobUpdater"; return FileStatus(file).exists(); } } //********************************************************************** // Implementation class. //********************************************************************** class JobUpdater::Imp { public: Scheduler& sch; bool updating; pthread_t thread; unsigned long nloop; int error; PThreadCondition cond; int interval; bool update; bool purge; int purge_interval; public: Imp(Scheduler& schin, unsigned int interval_time); ~Imp(); }; JobUpdater::Imp::Imp(Scheduler& schin, unsigned int interval_time) : sch(schin), updating(false), nloop(0), error(0), interval(interval_time), update(true), purge(true), purge_interval(7*24*3600) { } JobUpdater::Imp::~Imp() { } //********************************************************************** // Local definitions. //********************************************************************** namespace { // Thread to loop over jobs and update. void* update_thread(void* parg) { JobUpdater* pup = (JobUpdater*) parg; Scheduler& sch = pup->imp().sch; int interval = pup->imp().interval; while ( true ) { if ( dbg() ) { cerr << "JU: Locking mutex" << endl; } // Acquire the condition mutex. This gives other threads, e.g. // service loop a chance to block this thread. pup->condition().lock(); if ( ! pup->is_updating() ) { cerr << "JU: Exiting loop " << endl; pup->condition().unlock(); break; } ++pup->imp().nloop; if ( dbg() ) { cerr << "JU: Begin loop " << pup->imp().nloop << endl; } // Copy jobs. JobIdList jobs = sch.jobs(); // Loop over jobs and update. unsigned int oldtime = Time::now().unix_time(); for ( JobIdList::const_iterator ijid=jobs.begin(); ijid!=jobs.end(); ++ijid ) { JobId jid = *ijid; Job& job = sch.job(jid); // Update. if ( pup->imp().update && job.is_active() ) { // Set the credential for the job. GssCredential::Name oldname = GssCredentialManager::name(); GssCredential::Name jobname = job.credential_name(); if ( jobname.size() ) { GssCredentialManager::set_name(jobname); } // Update job. job.update(); // Unset credential. if ( jobname.size() ) { GssCredentialManager::set_name(oldname); } } // Purge. if ( pup->imp().purge && ! job.is_active() ) { TimeInterval age(job.stop_time(), Time::now()); if ( age.seconds() > pup->imp().purge_interval ) { // Set the credential for the job. GssCredential::Name oldname = GssCredentialManager::name(); GssCredential::Name jobname = job.credential_name(); if ( jobname.size() ) { GssCredentialManager::set_name(jobname); } if ( dbg() ) { cerr << "JU: Job " << job.id() << " with age " << age << " is being purged" << endl; } JobRepository& jr = JobRepository::default_instance(); bool saveok = true; for ( JobIdList::const_iterator isjid=job.subjobs().begin(); isjid!=job.subjobs().end(); ++isjid ) { JobId sjid = *isjid; Job& sjob = sch.job(sjid); jr.insert(&sjob, false); if ( ! jr.has(sjid) ) { if ( dbg() ) { cerr << "JU: Insertion failed for subjob " << sjid << endl; } saveok = false; break; } } if ( saveok ) { jr.insert(&job, false); if ( ! jr.has(jid) ) { if ( dbg() ) { cerr << "JU: Insertion failed for top job " << jid << endl; } saveok = false; } } if ( saveok ) { int rstat = sch.remove(jid); if ( dbg() ) { if ( rstat ) { cerr << " Removal failed" << endl; } else { cerr << " Job purged" << endl; } } } // Unset credential. if ( jobname.size() ) { GssCredentialManager::set_name(oldname); } } } } if ( dbg() ) { cerr << "JU: End loop " << pup->imp().nloop << endl; } // Minimum update period is interval. // Minumum wait between updates is waitmin. // This gives the other threads higher priority. unsigned int waitmin = 2; Text twaitmin("job_update_interval"); if ( twaitmin.size() ) { istringstream sswaitmin(twaitmin.line(0)); sswaitmin >> waitmin; } unsigned int newtime = Time::now().unix_time(); unsigned int diff = newtime - oldtime; unsigned int waittime = waitmin; if ( interval > diff+waitmin ) { waittime = interval - diff; } // Give other processes a chance to run. pup->condition().unlock(); pup->condition().broadcast(); if ( dbg() ) { cerr << "JU: Unlocked mutex" << endl; } // Sleep. if ( dbg() ) { cerr << "JU: Sleeping for " << waittime << " seconds" << endl; } pthread_sleep(waittime); } return 0; } } //********************************************************************** // Member functions. //********************************************************************** // Constructor. JobUpdater::JobUpdater(Scheduler& sch, int interval) : pimp(new Imp(sch, interval)) { } //********************************************************************** // Destructor. JobUpdater::~JobUpdater() { if ( is_updating() ) stop(); } //********************************************************************** // Implementation. JobUpdater::Imp& JobUpdater::imp() { return *pimp; } //********************************************************************** // Start. int JobUpdater::start(bool lockit) { if ( ! is_valid() ) return 1; if ( is_updating() ) return 2; pimp->updating = true; if ( lockit ) { condition().lock(); } int tstat = pthread_create(&pimp->thread, 0, update_thread, this); if ( tstat != 0 ) { pimp->updating = false; pimp->error = 100 + tstat; return 3; } return 0; } //********************************************************************** // Stop. int JobUpdater::stop(bool* pwas_locked) { if ( ! is_valid() ) return 1; if ( ! is_updating() ) return 2; pimp->updating = false; bool was_locked = condition().is_locked(); if ( pwas_locked != 0 ) { *pwas_locked = was_locked; } if ( was_locked ) { condition().unlock(); } void* prstat; int tstat = pthread_join(pimp->thread, &prstat); if ( tstat != 0 ) { pimp->error = 200 + tstat; return 3; } return 0; } //********************************************************************** // Return the condition. PThreadCondition& JobUpdater::condition() { return pimp->cond; } //********************************************************************** // Const methods. //********************************************************************** // Validity. bool JobUpdater::is_valid() const { return pimp->sch.is_valid() && error() == 0; } //********************************************************************** // Updating. bool JobUpdater::is_updating() const { return is_valid() && pimp->updating; } //********************************************************************** // Error status. int JobUpdater::error() const { return pimp->error; } //********************************************************************** // Scheduler. const Scheduler& JobUpdater::scheduler() const { return pimp->sch; } //********************************************************************** // Loop count. unsigned long JobUpdater::loop_count() const { return pimp->nloop; } //********************************************************************** // Free function. //********************************************************************** // Output stream. ostream& operator<<(ostream& lhs, const JobUpdater& rhs) { if ( ! rhs.is_valid() ) { lhs << "Updater is invalid with error " << rhs.error(); } else { lhs << "Updater is "; if ( ! rhs.is_updating() ) lhs << "not "; lhs << "updating" << endl; lhs << rhs.scheduler() << endl; lhs << "Loop count = " << rhs.loop_count() << endl; } return lhs; } //**********************************************************************