// Job.h #ifndef dial__Job_H #define dial__Job_H // David Adams // September 2002 // Revised August 2005 // // Job is the base class for DIAL jobs. A job consists of a dataset, // a task to process that dataset and application to carry out the // task. // // Base job holds the following: // 1. The application, task and dataset used to define the job // The are held by ID and by reference and are not managed. // A job created from XML my hold only ID's. // 2. Job preferences are held directly. // 3. The count of processed events and failed events // 4. The latest available result (possibly partial). // Also held by reference. // 5. The job state (status) and substates as defined below. // 6. Start and stop times. // 7. Other information about the job--see the XML description for // the complete list. // // A job is in one of the following states: // INVALID - from default constructor // INITIALED - Not yet started // RUNNING - Processing input dataset--build and split completed. // FAILED - unrecoverable error; code from error() // DONE - task completed (without being killed) // KILLED - task killed // // A job may also be in any number of substates identified with // strings. The meaning and assignment of these id determined by // the subclass. // // The data that defines the job and the start time are set in the // constructor. It leaves the job in the initialized state. // // Methods to update the event count, update the result or exit the // running state (and set the stop time) are available to subclasses. // These are only effective when the job is in the running state. // // Job holds a mutex that can be used to make subclasses thread-safe. // Job takes care of initializing and destroying th mutex. Subclasses // can use lock_mutex() to lock the mutex and unlock_mutex() or // set_base_failed(code, true) to unlock. // // Job holds a list of subjobs to provide support for compound jobs. // It also maintains separate lists for each job state. It is the // resposibility of the subclass to maintain these lists and ensure // their consistency. Job also provides a map of Job ID lists indexed // by category name so that the subclass may categorize jobs. // // XML: // Job // xml_version = 1.30 // fulltype = XyzJob // id = 123-456 // appid = 101-123 // tskid = 201-123 // dstid = 301-123 // resid = 301-456 // status = INVALID (,...) // owner = jdoe // credname = /DC=org/DC=myorg/OU=People/CN=MyName // create_time = 1000223455 // start_time = 1000223456 // update_time = 1000223457 // stop_time = 1000223457 // submit_host = subnode.org // run_host = runnode.org // job_directory = /home/junk/jobs/123-456 // archive = LFN://atlas/job_archive_123-456.tar.gz // processed_event_count = 10000 // processed_result_event_count = 10000 // error = 0 // rstat = 0 // run_script = /home/apps/123-456/run // substates = building,processing,merging // JobPreferences // ... // JobIdList // label = subjobs // ... // JobIdList // label = running_subjobs // ... // ... // JobIdList // label = category:build // ... // ... #include #include #include #include #include #include "dataset_util/PThreadMutex.h" #include "dataset_util/Time.h" #include "dataset_file/Url.h" #include "dataset_base/Dataset.h" #include "dial_task/Task.h" #include "dial_app/Application.h" #include "dial_job/JobPreferences.h" #include "dial_job/JobId.h" #include "dial_job/JobIdList.h" class XmlElement; namespace dial { class Job { public: // typedefs typedef std::string Type; typedef unsigned int EventCount; typedef std::string Substate; typedef std::vector SubstateList; typedef std::string Category; typedef std::map CategoryMap; typedef std::map UrlMap; public: // enums enum Status { INVALID, FAILED, INITIALIZED, RUNNING, DONE, KILLED }; private: // typedefs typedef std::string Type; typedef unsigned int EventCount; private: // data // Input data. Type m_fulltype; JobId m_id; Status m_stat; SubstateList m_substates; ApplicationId m_aid; mutable const Application* m_papp; TaskId m_tid; mutable const Task* m_ptsk; DatasetId m_did; mutable const dset::Dataset* m_pdst; JobPreferences m_prf; // Owner. std::string m_owner; std::string m_credname; // Times Time m_create_time; Time m_start_time; Time m_update_time; Time m_stop_time; // Host from which job is submitted. std::string m_submit_host; // Host where job was run (or run last). std::string m_run_host; // Directory where job is run. std::string m_jobdir; // Archive of run directory. mutable dset::Url m_archive; // Local job ID. std::string m_lid; // Subjobs. JobIdList m_subjobs; JobIdList m_running_subjobs; JobIdList m_done_subjobs; JobIdList m_failed_subjobs; JobIdList m_killed_subjobs; JobIdList m_result_subjobs; CategoryMap m_catjobs; // Processed event count. EventCount m_processed_event_count; // Result event count. // This is the number of events used to cosntruct the result; not // neccessarily the number in the result dataset. EventCount m_processed_result_event_count; // Result. DatasetId m_rid; mutable const dset::Dataset* m_pres; // Error code. // Nonzero if object is in the FAILED state. int m_error; // Return code from job. bool m_have_rstat; int m_rstat; // Run script. std::string m_run_script; // Mutex. mutable PThreadMutex m_lock; // Web page. mutable Text m_wp; public: // Static functions. // XML name. static const char* xml_name() { return "Job"; } // DTD. static const Text& dtd(); // Return a string represntation of a job state. static std::string status_to_string(Status stat); // Return the job state corresponding to a give string. // Retuns INVALID for an unknown string. static Status string_to_status(std::string sstat); // Return a URL from a Url map. static std::string url(const UrlMap& urls, std::string name); protected: // functions // Constructor. // Sets start time and puts job in the INITIALIZING state. // Writes the AJDL objects and the credential to the job directory. Job(Type fulltype, JobId jid, const Application& app, const Task& tsk, const dset::Dataset& dst, const JobPreferences& prf, std::string jobdir, std::string run_script); // Set the processed event count. // Job must be RUNNING. // Returns 0 for success. int base_set_processed_event_count(EventCount count); // Set the result event count. // Job must be RUNNING. // Returns 0 for success. int base_set_processed_result_event_count(EventCount count); // Set both processed and processed result event counts. // Job must be RUNNING. // Returns 0 for success. int base_set_event_count(EventCount count); // Set result. // Job must be RUNNING. // Current result is deleted. // If manage is true, this job takes over management of the result. // Returns 0 for success. int base_set_result(const dset::Dataset* pres, bool manage =true); // Sets the error code and puts the object into the running state. // Job must be INITIALIZING. // Sets the start time. // Returns 0 for success. int base_set_running(); // Sets the error code and puts the object into the failed state. // If unlock is true, the mutex is unlocked. // Job may not be FAILED. // Sets the stop time if not already set. // Returns the error code. int base_set_failed(int err, bool unlock =false); // Set job completed successfully. // Job must be RUNNING. // Event count and the event lists in the result and dataset // must be consistent. // Sets the stop time. // Returns 0 for success. int base_set_done(); // Set job killed. // Job must be RUNNING. // Sets the stop time. // Returns 0 for success. int base_set_killed(int err); // Set the update time. // Returns 0 for success. int base_set_update(); // Add a substate. // Fails if the job is already in the state. // Returns 0 for success. int base_set_substate(Substate substate); // Remove a substate. // Fails if the job is not in the state. // Returns 0 for success. int base_unset_substate(Substate substate); // Other setters. void base_set_submit_host(std::string host) { m_submit_host = host; } void base_set_run_host(std::string host) { m_run_host = host; } void base_set_local_id(std::string lid) { m_lid = lid; } void base_set_return_status(int rstat) { m_rstat = rstat; } // Subjob lists. JobIdList& base_subjobs() { return m_subjobs; } JobIdList& base_running_subjobs() { return m_running_subjobs; } JobIdList& base_done_subjobs() { return m_done_subjobs; } JobIdList& base_failed_subjobs() { return m_failed_subjobs; } JobIdList& base_killed_subjobs() { return m_killed_subjobs; } JobIdList& base_result_subjobs() { return m_result_subjobs; } CategoryMap& base_category_subjobs() { return m_catjobs; } public: // Constructors, destructor and assignment. // Default constructor. // Creates invalid job. Job(); // XML constructor. Job(const XmlElement&); // Copy constructor. Job(const Job& rhs); // Assignment. Job& operator=(const Job& rhs); // Destructor. virtual ~Job(); public: // non-const functions // Start the job. // Job must not have yet started. // Returns 0 for success. virtual int start(); // Update the job status. // Job should be created or running. // Implementation might use this as trigger to check schedulers, // queues, etc for the latest status of this job. // Returns 0 for success. virtual int update(); // Stop a running job. // Job should be running. // Error status is set to the argument value. // Returns 0 for success. virtual int kill(int err =0); // Return the mutex. PThreadMutex& mutex() const { return m_lock; } public: // const functions // Mutex. void lock_mutex() const; void unlock_mutex() const; // Getters. Type full_type() const { return m_fulltype; } JobId id() const { return m_id; } std::string owner() const { return m_owner; } std::string credential_name() const { return m_credname; } Status status() const { return m_stat; } std::string status_as_string() const; const SubstateList& substates() const { return m_substates; } // Comma separated list. std::string substates_as_string() const; ApplicationId application_id() const { return m_aid; } const Application* application() const; TaskId task_id() const { return m_tid; } const Task* task() const; DatasetId dataset_id() const { return m_did; } const dset::Dataset* dataset() const; const JobPreferences& preferences() const { return m_prf; } Time create_time() const { return m_create_time; } Time start_time() const { return m_start_time; } Time stop_time() const { return m_stop_time; } Time update_time() const { return m_update_time; } std::string submit_host() const { return m_submit_host; } std::string run_host() const { return m_run_host; } std::string job_directory() const { return m_jobdir; } dset::Url archive() const { return m_archive; } std::string local_id() const { return m_lid; } const JobIdList& subjobs() const { return m_subjobs; } const JobIdList& running_subjobs() const { return m_running_subjobs; } const JobIdList& done_subjobs() const { return m_done_subjobs; } const JobIdList& failed_subjobs() const { return m_failed_subjobs; } const JobIdList& killed_subjobs() const { return m_killed_subjobs; } const JobIdList& result_subjobs() const { return m_result_subjobs; } const CategoryMap& category_subjobs() const { return m_catjobs; } const JobIdList& category_subjobs(Category catname) const; EventCount processed_event_count() const { return m_processed_event_count; } EventCount processed_result_event_count() const { return m_processed_result_event_count; } bool has_result() const { return result_id().is_valid(); } DatasetId result_id() const { return m_rid; } const dset::Dataset* result() const; // Status methods. bool is_valid() const { return status() != INVALID; } bool is_initialized() const { return status() == INITIALIZED; } bool is_failed() const { return status() == FAILED; } bool is_running() const { return status() == RUNNING; } bool is_done() const { return status() == DONE; } bool is_killed() const { return status() == KILLED; } bool in_substate(Substate substate) const; // Combined status methods. bool is_active() const { return is_initialized() || is_running(); } bool is_inactive() const { return is_done() || is_killed() || is_failed(); } // Return the error code. // Zero if not in the FAILED state. int error() const { return m_error; } // Return the job return status. int return_status() const { return m_rstat; } // Return the run script. std::string run_script() const { return m_run_script; } // Create the the job archive. // Returns 0 for success. int create_archive() const; // Return if the job has an archive. bool has_archive() const; // Open the job archive. // Unpacks the files into a local directory. // dir - location where the unpacking should occur // overwrite - if true, existing directory may be deleted // Returns the full path to the directory: // dir/ // Returns blank for error. // Caller is responsible for managing, i.e. deleting the returned // directory. std::string open_archive(std::string dir =".", bool overwrite =false) const; // Write to XML. const XmlElement* xml() const; // Output stream. virtual std::ostream& ostr(std::ostream& lhs) const; // Output stream with HTML links. // No HTML links are added if the URL map is empty. virtual std::ostream& ostr(std::ostream& lhs, const UrlMap& urls) const; // Display. void display() const; // Web page. // Base URL map includes: // job = URL for the job page // subjobs = URL for the repository holding subjobs Text web_page(const UrlMap& urls, std::string entry) const; // Create a wrapper for a local job. // This writes GSI credentials to jobdir/private/cred.dat and // creates a wrapper that defines environemnt to use these // credentials. // Zero for success. int create_local_run_script_wrapper(); }; } // end namespace dial // Output stream. // Calls rhs.ostr(lhs) std::ostream& operator<<(std::ostream& lhs, const dial::Job& rhs); #endif