// CompoundJob.h

#ifndef dial__CompoundJob_H
#define dial__CompoundJob_H

// David Adams
// July 2003
//
// CompoundJob is a job that corresponds to a collection of subjobs.
// The compound job is started after the subjobs have been added.
// It then starts each of the subjobs. When update is called, the
// compound job updates each of the subjobs and records those that
// exit the running state. If they exit normally, then their results
// are appended to a global result. When all subjobs leave the running
// state, then the compound job also leaves that state. It is placed
// in the done (normal exit) state if all subjobs exit normally.
// Otherwise it is put in the failed state.
//

// NOTE(review): the header names of the first two includes below are
// missing in this copy of the file. The class uses std::string and
// std::map, so they are presumably the standard string and map
// headers -- confirm against the original file.
#include
#include
#include "dataset_split/DatasetMergeResult.h"
#include "dial_job/Job.h"

namespace dset {
class DatasetMerger;
}

namespace dial {

class Scheduler;
class JobRepository;

class CompoundJob : public Job {

public:  // typedefs

  // NOTE(review): the template arguments of the three std::map typedefs
  // below are missing in this copy of the file. The key and mapped
  // types described in the comments are taken from the original
  // comments only -- confirm the exact types against the original file.

  // Map of jobs indexed by job ID.
  typedef std::map JobMap;

  // Map of jobs to status.
  typedef std::map StatusMap;

  // Map of jobs indexed by dataset.
  typedef std::map JobResultMap;

private:  // data

  // List of all jobs.
  JobMap m_jobs;

  // List of jobs mapped to last status.
  StatusMap m_job_status;

  // List of job results.
  dset::DatasetList m_results;

  // List of job results owned by the compound job.
  // These are deleted when the compound job is deleted.
  dset::DatasetList m_owned_results;

  // Merger.
  dset::DatasetMerger* m_pmrg;

  // Maximum number of running jobs.
  JobIdList::size_type m_maxjobs;

  // Maximum number of jobs to start.
  JobIdList::size_type m_maxstart;

  // Iterator pointing to the next subjob (in subjobs()) to start.
  JobIdList::const_iterator m_nextjob;

  // Text log.
  Text* m_plog;

  // Map of jobs indexed by result dataset.
  JobResultMap m_jobresmap;

  // Merged result.
  dset::DatasetMergeResult m_dmres;

  // Job repository holding this job.
  JobRepository* m_pcjr;

  // Scheduler for subjobs.
  Scheduler* m_psch;

  // Pointer to the task job.
  // NOTE(review): the original comment called this an "ID", but the
  // member is a Job pointer; presumably it is the job passed to
  // add_task_job -- confirm in the implementation.
  Job* m_pjob_task;

private:  // functions

  // Record a message.
  void msg(std::string line) const;

  // Update the repository.
  void update_repository();

public:  // functions

  // Default constructor.
  // Creates an invalid job.
  CompoundJob();

  // Constructor.
  // The job directory must not exist before this call.
  //   maxjobs = max # of concurrent running jobs
  //   maxstart = max # jobs to start at one time
  //   pcjr = pointer to the repository holding this job
  //   psch = pointer to the scheduler that manages the subjobs
  CompoundJob(JobId jid, const Application& app, const Task& tsk,
              const dset::Dataset& dst, const JobPreferences& prf,
              std::string jobdir,
              JobIdList::size_type maxjobs, JobIdList::size_type maxstart,
              JobRepository* pcjr, Scheduler* psch);

  // Conversion constructor.
  // The maxjobs, maxstart, pcjr and psch arguments have the same
  // meaning as in the constructor above.
  CompoundJob(const Job& job,
              JobIdList::size_type maxjobs, JobIdList::size_type maxstart,
              JobRepository* pcjr, Scheduler* psch);

  // Destructor.
  ~CompoundJob();

  // Add subjob to build the task.
  // The compound job must be in the INITIALIZED state.
  // The subjob must not be deleted before this compound job is
  // deleted.
  // This may not be called more than once for a given object.
  // Returns 0 for success.
  int add_task_job(Job& job);

  // Add a subjob.
  // Job must be in the INITIALIZED state.
  // The subjob must not be deleted before this compound job is
  // deleted.
  // Returns 0 for success.
  int add_job(Job& job);

  // Start the job.
  // Starts all the sub-jobs.
  // If successful, this returns 0 and leaves object in RUNNING.
  // Succeeds at most once.
  int start();

  // Update the status of the job.
  // Updates each sub-job.
  // Handles any jobs that have changed status.
  int update();

  // Kill the job.
  // Kills any running sub-jobs.
  int kill(int err =0);

  // Fail the job.
  // Returns the given error code.
  // NOTE(review): the meaning of the unlock flag is not documented in
  // this header -- confirm in the implementation or in Job.
  int fail(int err, bool unlock=false);

  // Assign a text object for logging messages.
  void set_log(Text& txtlog);

  // Update the result.
  // Returns whether an update has taken place.
  bool update_result();

  // Return the subjob for an ID.
  // NOTE(review): the behavior for an unknown ID is not stated here;
  // presumably a null pointer is returned -- confirm in the
  // implementation.
  Job* subjob(JobId sjid);

public:  // const functions

  // Return the merger.
  dset::DatasetMerger& merger() const;

};

}  // end namespace dial

#endif