// CompoundJob.cxx #include "dataset_util/FileStatus.h" #include #include #include #include #include #include "dataset_util/getcwd.h" #include "dataset_util/mkdir.h" #include "dataset_util/copy_file.h" #include "dataset_util/get_hostname.h" #include "dataset_util/DtdRegistry.h" #include "dataset_base/DatasetRepository.h" #include "dataset_split/DatasetMerger.h" #include "dataset_xml/XmlParser.h" #include "dial_job/JobRepository.h" #include "dial_sched/Scheduler.h" #include "dial_sched/CompoundJob.h" using std::string; using std::cout; using std::cerr; using std::endl; using std::ostream; using std::ofstream; using std::istringstream; using std::ostringstream; using std::endl; using dset::Dataset; using dset::DatasetList; using dset::DatasetRepository; using dset::DatasetMergeResult; using dset::DatasetMerger; using dial::Job; using dial::JobRepository; using dial::CompoundJob; #include "dataset_split/CompoundDatasetMergerCreator.h" using dset::CompoundDatasetMergerCreator; //********************************************************************** // Local definitions. //********************************************************************** namespace { // Maximum number of jobs to display. unsigned int MAX_PRINT_JOBS = 20; // Print debug messages. bool DEBUG = false; static dset::CompoundDatasetMergerCreator merger_creator; int maxlooptime = 8; // Convert int to string. string mstr(int i) { ostringstream sstr; sstr << i; return sstr.str(); } // Function indicating if we should update the result after each append. // This guarantees the result is always up to date but can waste a lot // of time and space. bool update_result_after_append() { return FileStatus("CompoundJob_update_result_after_append").exists(); } // Function indicating if result should be updated after we time out of // the subjob loop. bool update_result_after_timeout() { return FileStatus("CompoundJob_update_result_after_timeout").exists(); } } // end unnamed namespace //********************************************************************** // Member functions. //********************************************************************** // Record a message. void CompoundJob::msg(string iline) const { if ( m_plog ) { string line = "CompoundJob: " + iline; m_plog->append(line); } } //********************************************************************** // Update compound job repository. void CompoundJob::update_repository() { if ( m_pcjr!=0 && m_pcjr->is_valid() ) { if ( DEBUG ) { cout << "CJ: Updating compound job repository with job " << id().to_string() << endl; } int rstat = m_pcjr->remove(id()); if ( rstat ) { cout << "CJ: Remove failed: " << m_pcjr->error_message() << endl; } JobId chkid = m_pcjr->insert(this, false); if ( ! chkid.is_valid() ) { cout << "CJ: Insert failed: " << m_pcjr->error_message() << endl; cout << *m_pcjr << endl; } assert( chkid == id() ); } } //********************************************************************** // Default constructor. CompoundJob::CompoundJob() : m_plog(0) { assert( ! is_valid() ); } //********************************************************************** // Constructor. CompoundJob::CompoundJob( JobId jid, const Application& app, const Task& tsk, const Dataset& dst, const JobPreferences& prf, string jobdir, JobIdList::size_type maxjobs, JobIdList::size_type maxstart, JobRepository* pcjr, Scheduler* psch) : Job("CompoundJob", jid, app, tsk, dst, prf, jobdir, ""), m_pmrg(merger_creator.create(jobdir)), m_maxjobs(maxjobs), m_maxstart(maxstart), m_plog(0), m_pcjr(pcjr), m_psch(psch), m_pjob_task(0) { DEBUG = FileStatus("debug_CompoundJob").is_readable(); if ( DEBUG ) { cout << "CJ: " << "Creating new job " << endl; } // Current node is the run host. base_set_run_host(get_hostname()); // Check the maximum number of jobs. if ( m_maxjobs < 1 || m_maxjobs > 10000 ) { m_maxjobs = 10000; } // Insert in repository. update_repository(); } //********************************************************************** // Constructor. CompoundJob::CompoundJob(const Job& job, JobIdList::size_type maxjobs, JobIdList::size_type maxstart, JobRepository* pcjr, Scheduler* psch) : Job(job), m_pmrg(merger_creator.create(job_directory())), m_maxjobs(maxjobs), m_maxstart(maxstart), m_plog(0), m_pcjr(pcjr), m_psch(psch), m_pjob_task(0) { DEBUG = FileStatus("debug_CompoundJob").is_readable(); if ( DEBUG ) { cout << "CJ: " << "Creating copy of job " << id() << endl; } string hname = get_hostname(); // Fill subjob info. assert( m_psch != 0 ); JobRepository* psjr = m_psch->job_repository(); const JobIdList& sjids = subjobs(); for ( JobIdList::const_iterator ijid=sjids.begin(); ijid!=sjids.end(); ++ijid ) { JobId jid = *ijid; if ( DEBUG ) { cout << "CJ: " << " Fetching subjob " << jid << endl; } assert( m_psch != 0 ); // Normal case: slave scheduler has subjob. if ( m_psch->has_job(jid) ) { Job* pjob = &m_psch->job(jid);; assert( pjob != 0 ); assert( pjob->is_valid() ); m_jobs[jid] = pjob; m_job_status[jid] = pjob->status(); const Dataset* pres = result(); if ( pres != 0 ) { m_results.push_back(pres); } // It is possible that subjob awas deleted. // Allow this as long as job is not running. // We could be smarter and fail the job in this case. } else { cout << "CJ: " << " Unable to find subjob " << jid << endl; assert( ! job.is_active() ); m_jobs[jid] = 0; m_job_status[jid] = Job::INVALID; } // Find the next job to start. if ( job.is_active() ) { for ( m_nextjob=base_subjobs().begin(); m_nextjob!=base_subjobs().end(); ++m_nextjob ) { JobId jid = *ijid; const Job* pjob = m_jobs[jid]; if ( pjob!=0 && pjob->is_initialized() ) break; } } else { m_nextjob = base_subjobs().end(); } // Check job statuses from base class. for ( JobIdList::const_iterator ijid=running_subjobs().begin(); ijid!=running_subjobs().end(); ++ijid ) { JobId jid = *ijid; Job::Status jstat = m_job_status[*ijid]; Job::Status truestat = Job::RUNNING; if ( jstat != truestat ) { if ( DEBUG ) { cout << "CJ: " << " Subjob " << jid << " is " << Job::status_to_string(jstat) << " instead of " << Job::status_to_string(truestat) << endl; } m_job_status[jid] = Job::RUNNING; } } } // Fetch the task job. const JobIdList& tjobs = category_subjobs("build"); if ( tjobs.size() ) { assert( tjobs.size() == 1 ); JobId tjid = tjobs.front(); assert( tjid.is_valid() ); Job* ptjob = subjob(tjid); m_pjob_task = ptjob; if ( ptjob != 0 ) { assert( ptjob->is_valid() ); } } } //********************************************************************** // Destructor. CompoundJob::~CompoundJob() { // Delete the results owned by the compound job. for ( DatasetList::const_iterator idst=m_owned_results.begin(); idst!=m_owned_results.end(); ++idst ) { delete *idst; } delete m_pmrg; } //********************************************************************** // Add the task build subjob. int CompoundJob::add_task_job(Job& job) { if ( ! is_initialized() ) return 1; lock_mutex(); m_pjob_task = &job; base_category_subjobs()["build"].push_back(job.id()); base_subjobs().push_back(job.id()); m_jobs[job.id()] = &job; m_job_status[job.id()] = job.status(); unlock_mutex(); return 0; } //********************************************************************** // Add a subjob. int CompoundJob::add_job(Job& job) { if ( ! is_initialized() ) return 1; if ( ! job.is_initialized() ) return 2; lock_mutex(); base_subjobs().push_back(job.id()); base_category_subjobs()["process"].push_back(job.id()); m_jobs[job.id()] = &job; m_job_status[job.id()] = job.status(); unlock_mutex(); return 0; } //********************************************************************** // Start the job. // We cheat a little and allow entry here while running to submit // jobs that have not yet been submitted. int CompoundJob::start() { DEBUG = FileStatus("debug_CompoundJob").is_readable(); if ( DEBUG ) { cout << "CJ: " << "Starting " << id().to_string() << endl; } // Record the time at which we enter this routine. time_t time0 = time(0); // Flag indicating if mutex should be unlocked on exit. bool unlock = true; if ( is_initialized() ) { if ( DEBUG ) { cout << "CJ: is initialized" << endl; } lock_mutex(); if ( DEBUG ) { cout << "CJ: locked mutex" << endl; } base_set_running(); m_nextjob = base_subjobs().begin(); // Fail if there are no subjobs. if ( base_subjobs().size() == 0 ) { return fail(10, unlock); merger().close(); } // Set the substate. if ( m_pjob_task != 0 ) { base_set_substate("building"); cout << "CJ: set substate to building" << endl; if ( m_pjob_task->is_running() ) { base_running_subjobs().push_back(m_pjob_task->id()); } } else { base_set_substate("processing"); cout << "CJ: set substate to processing" << endl; } update_repository(); } else if ( is_running() ) { if ( DEBUG ) { cout << "CJ: is running" << endl; } // We do not lock mutex here because this call should be internal // and only come when the mutex is already locked. unlock = false; } else { return 1; } // If there is a task subjob... if ( m_pjob_task != 0 ) { if ( DEBUG ) { cout << "CJ: checking task job" << endl; } // Start task job if needed. if ( m_pjob_task->is_initialized() ) { if ( DEBUG ) { cout << "CJ: starting task job" << endl; } int sstat = m_pjob_task->start(); if ( sstat != 0 ) { if ( DEBUG ) { cout << "CJ: start failed" << endl; } return fail(11, unlock); } base_set_substate("building"); base_running_subjobs().push_back(m_pjob_task->id()); update_repository(); } // Exit if task job is building. if ( m_pjob_task->is_running() ) { if ( DEBUG ) { cout << "CJ: wait for task job to finish" << endl; } if ( unlock ) { unlock_mutex(); } return 0; } // If we get here, the task job is inactive. if ( find(substates().begin(), substates().end(), "building") != substates().end() ) { // Remove task job from list of running jobs. base_running_subjobs().erase( remove(base_running_subjobs().begin(), base_running_subjobs().end(), m_pjob_task->id()), base_running_subjobs().end() ); // If task job was successful, exit the building state and add it // to the list of done jobs. if ( m_pjob_task->is_done() ) { base_unset_substate("building"); base_done_subjobs().push_back(m_pjob_task->id()); // Otherwise, fail the compound job. } else { if ( DEBUG ) { cout << "CJ: task job failed:" << endl; cout << *this << endl; } if ( unlock ) { unlock_mutex(); } base_failed_subjobs().push_back(m_pjob_task->id()); return fail(12, unlock); } } } // Process more subjobs. if ( DEBUG ) { cout << "CJ: Start subjobs" << endl; } unsigned int istart = 0; for ( ; m_nextjob!=base_subjobs().end(); ++m_nextjob ) { if ( base_running_subjobs().size() >= m_maxjobs ) { break; } if ( istart > m_maxstart ) break; time_t looptime = time(0) - time0; if ( istart>0 && looptime > maxlooptime ) break; ++istart; JobIdList::const_iterator ijob = m_nextjob; JobId sjid = *ijob; Job& sjob = *subjob(sjid); // Skip the task subjob. if ( &sjob == m_pjob_task ) continue; if ( DEBUG ) { cout << "CJ: Start subjob " << sjid.to_string() << endl; } base_set_substate("processing"); int start_stat = sjob.start(); if ( start_stat != 0 ) { cout << "CJ: Subjob start failed with error " << mstr(start_stat) << endl; msg("Subjob start failed with error " + mstr(start_stat)); return fail(13, unlock); } base_running_subjobs().push_back(sjid); m_job_status[sjid] = Job::RUNNING; } update_repository(); if ( unlock ) { unlock_mutex(); } return error(); } //********************************************************************** // Update job. int CompoundJob::update() { lock_mutex(); DEBUG = FileStatus("debug_CompoundJob").is_readable(); if ( DEBUG ) { cout << "CJ: " << "Updating " << id().to_string() << endl; } if ( is_initialized() ) { if ( DEBUG ) { cout << "CJ: Starting job" << endl; } int stat = start(); if ( DEBUG ) { cout << "CJ: Started job" << endl; } base_set_update(); unlock_mutex(); if ( stat != 0 ) return 200 + stat; return 0; } if ( DEBUG ) { cout << "CJ: Check if running" << endl; } if ( ! is_running() ) { unlock_mutex(); return 1; } // If we are not processing, then call start() to start or // check task job. if ( ! in_substate("processing") ) { cout << "CJ: Not processing: call start" << endl; if ( in_substate("building") ) { assert( m_pjob_task != 0 ); if ( m_pjob_task->is_active() ) { cout << "CJ: Building: update task job" << endl; m_pjob_task->update(); JobId sjid = m_pjob_task->id(); if ( m_psch != 0 ) { JobRepository* psjr = m_psch->job_repository(); if ( psjr!=0 && psjr->is_valid() ) { if ( DEBUG ) { cout << "CJ: Updating subjob repository with job " << sjid << endl; } int rstat = psjr->remove(sjid); if ( rstat ) { if ( psjr->has(sjid) ) { cout << "CJ: Remove failed: " << psjr->error_message() << endl; } } JobId chkid = psjr->insert(m_pjob_task, false); if ( ! chkid.is_valid() ) { cout << "CJ: Insert failed: " << psjr->error_message() << endl; } assert( chkid == sjid ); } } else { if ( DEBUG ) { cout << "CJ: Subjob scheduler is null" << endl; } } } } base_set_update(); unlock_mutex(); return start(); } //msg("Updating"); // Count calls with no change in job status. // This is used as basis to decide when to update result. // Probably should keep separate counter for each thread. static int no_change_count = 0; ++no_change_count; // Record current time. time_t time0 = time(0); if ( DEBUG ) { cout << "CJ: Starting update" << endl; } bool status_change = false; if ( DEBUG ) { cout << "CJ: Checking status [" << time(0) - time0 << "]" << endl; } // Loop over process subjobs. bool loop_timeout = false; for ( JobIdList::iterator ijid=base_running_subjobs().begin(); ijid!=base_running_subjobs().end(); ) { JobId sjid = *ijid; assert( sjid.is_valid() ); Job* pjob = subjob(sjid); if ( pjob == 0 ) { cerr << "CompoundJob: Unable to find subjob with ID " << sjid << endl; assert( false ); } Job& sjob = *pjob; // Update job. if ( DEBUG ) { cout << "CJ: Updating subjob " << sjob.id().to_string() << endl; } sjob.update(); if ( m_psch != 0 ) { JobRepository* psjr = m_psch->job_repository(); if ( psjr!=0 && psjr->is_valid() ) { if ( DEBUG ) { cout << "CJ: Updating subjob repository with job " << sjid << endl; } int rstat = psjr->remove(sjid); if ( rstat ) { cout << "CJ: Remove failed: " << psjr->error_message() << endl; } JobId chkid = psjr->insert(pjob, false); if ( ! chkid.is_valid() ) { cout << "CJ: Insert failed: " << psjr->error_message() << endl; } assert( chkid == sjid ); } } else { if ( DEBUG ) { cout << "CJ: Subjob scheduler is null" << endl; } } Job::Status oldstat = m_job_status[sjid]; Job::Status newstat = sjob.status(); // If job status has changed... if ( newstat != oldstat ) { if ( DEBUG ) { cout << "CJ: Subjob status changed" << endl; } status_change = true; // Remove the job from the old status list. switch ( oldstat ) { case Job::RUNNING: if ( DEBUG ) { cout << "CJ: Was running" << endl; } ijid = base_running_subjobs().erase(ijid); break; default: if ( DEBUG ) { cout << "CJ: Was not running" << endl; } //std::cerr << "CompoundJob::update(): Unrecognized old status" // << std::endl; //std::cerr << sjob << std::endl; merger().close(); kill(); unlock_mutex(); return fail(21, true); } // Add the job to the new status list. if ( newstat == Job::DONE ) { if ( DEBUG ) { cout << "CJ: Is done" << endl; } base_done_subjobs().push_back(sjid); } else if ( newstat == Job::FAILED ) { // For now, kill job if any subjobs fail. if ( DEBUG ) { cout << "CJ: Is failed" << endl; } base_failed_subjobs().push_back(sjid); // Fetch the max # retries from the job preferences. int max_retry = 0; string smax_retry = preferences().extract("max_retry"); if ( smax_retry.size() ) { istringstream ssmax_retry(smax_retry); ssmax_retry >> max_retry; } // If there are not already too many retries, retry this jobs. int nretry = category_subjobs("retry").size(); cout << "CJ: #/max retries: " << nretry << "/" << max_retry << endl; if ( nretry < max_retry && m_psch != 0 ) { ++nretry; if ( DEBUG ) { cout << "CJ: Retry " << nretry << " of " << max_retry << endl; } JobId newid = m_psch->submit(*sjob.application(), *sjob.task(), *sjob.dataset(), sjob.preferences()); Job& newjob = m_psch->job(newid); if ( newid.is_valid() ) { if ( DEBUG ) { cout << "CJ: Retry ID is " << newid << endl; } base_subjobs().push_back(newid); base_category_subjobs()["process"].push_back(newid); m_jobs[newid] = &newjob; m_job_status[newid] = newjob.status(); base_category_subjobs()["retry"].push_back(newid); // Find the next job to start. // (Pushing a new job may trigger a resize and invalidate // the iterator.) for ( m_nextjob=base_subjobs().begin(); m_nextjob!=base_subjobs().end(); ++m_nextjob ) { JobId jid = *m_nextjob; const Job* pjob = m_jobs[jid]; assert( pjob != 0 ); if ( pjob->is_initialized() ) { if ( DEBUG ) { cout << "CJ: Next job to start is " << jid << endl; } break; } } assert( m_nextjob != base_subjobs().end() ); continue; } else { cout << "CJ: Retry submission failed" << endl; } } kill(); unlock_mutex(); return fail(23); } else if ( newstat == Job::KILLED ) { // For now, kill job if any subjobs are killed. if ( DEBUG ) { cout << "CJ: Is killed" << endl; } base_killed_subjobs().push_back(sjid); kill(); unlock_mutex(); return fail(24); } else { if ( DEBUG ) { cout << "CJ: Is not done, failed or killed!" << endl; } merger().close(); kill(); unlock_mutex(); return fail(22, true); } // Update the job status record. m_job_status[sjid] = newstat; // If the subjob is newly done, then append its result. if ( newstat == Job::DONE ) { const Dataset* pres = 0; if ( sjob.has_result() ) { pres = pjob->result(); // We expect the result to be locked. if ( ! pres->is_locked() ) { merger().close(); kill(); unlock_mutex(); return fail(32, true); } // If result does not have a global ID clone it and use it // in place of the original. // This assigns a new (and presumably global) id. // We should also reset the parent here. if ( ! pres->id().is_global() ) { Dataset* pres_new = pres->clone(job_directory()); if ( pres == 0 ) { merger().close(); kill(); return fail(33, true); } pres_new->lock(); pres = pres_new; m_owned_results.push_back(pres); } if ( ! pres->id().is_global() ) { std::cerr << "CompoundJob::update(): Result has local ID." << std::endl; merger().close(); kill(); return fail(34, true); } } // Update the processed event count. base_set_processed_event_count( processed_event_count() + sjob.processed_event_count() ); if ( pres != 0 ) { // Append to the merged result. m_jobresmap[pres] = pjob; if ( DEBUG ) { cout << "CJ: Appending to result" << endl; } int mstat = merger().append(*pres); if ( mstat != 0 ) { ostringstream sserr; sserr << mstat; msg("Result merge failed with error " + sserr.str() + "."); if ( DEBUG ) { cout << "CJ: Result merge failed with error " << mstat << endl; cout << "CJ: Killing job" << endl; } merger().close(); kill(); return fail(35, true); } } // Time out of this loop. time_t looptime = time(0) - time0; if ( looptime > maxlooptime && ijid != base_running_subjobs().end() ) { if ( DEBUG ) { cout << "CJ: Timing out of subjob loop" << endl; } loop_timeout = true; break; } if ( sjob.has_result() ) { // Give chance to view state. if ( update_result_after_append() ) { if ( DEBUG ) { cout << "CJ: Updating result..."; cout.flush(); } status_change |= update_result(); if ( DEBUG ) { cout << "update complete" << endl; } Job::base_set_update(); update_repository(); unlock_mutex(); if ( is_failed() ) return error(); lock_mutex(); } } } } else { ++ijid; } } if ( DEBUG ) { cout << "CJ: End subjob loop [" << time(0) - time0 << "]" << endl; } // Submit more jobs. if ( m_nextjob != base_subjobs().end() ) { if ( DEBUG ) { cout << "CJ: Submitting new jobs" << endl; } int stat = start(); if ( stat != 0 ) { merger().close(); kill(); return fail(37, true); } } // Update result if // 1. loop ends without timeout or // 2. flag to update after timeout is set or // 3. the number of results has doubled since the last update bool update = ! loop_timeout || update_result_after_timeout() || m_jobresmap.size() >= 2*m_results.size(); // Only update if their are done subjobs. update &= m_jobresmap.size() > 0; if ( update ) { if ( ! in_substate("merging") ) { base_set_substate("merging"); } if ( DEBUG ) { cout << "CJ: Updating result..."; cout.flush(); } status_change |= update_result(); if ( DEBUG ) { cout << "update complete" << endl; } Job::base_set_update(); update_repository(); unlock_mutex(); if ( is_failed() ) return error(); lock_mutex(); } else { if ( DEBUG ) { cout << "CJ: Skipping result update" << endl; } } // If there are no more running subjobs and none to start, exit the // processing substate. if ( status_change && running_subjobs().size() == 0 && m_nextjob == base_subjobs().end() ) { base_unset_substate("processing"); // If all processing done jobs are included in the result, exit the merging // substate. if ( result_subjobs().size() == done_subjobs().size() - category_subjobs("build").size() ) { base_unset_substate("merging"); } } // If there is no substate (i.e. not processing or merging), we are done. if ( status_change && substates().size() == 0 ) { if ( DEBUG ) { cout << "CJ: Fetching result [" << time(0) - time0 << "]" << endl; } // Write the XML description of the result. if ( has_result() ) { XmlParser parser; assert( result() != 0 ); const XmlElement* pxres = result()->xml(); if ( pxres != 0 ) { parser.write(job_directory()+"/result.xml", *pxres); delete pxres; } } // Put the result into the repository. if ( result() != 0 ) { DatasetId did = result()->id(); DatasetRepository& dr = DatasetRepository::default_instance(); if ( ! dr.has(did) ) { if ( DEBUG ) { cout << "CJ: Inserting result " << did << endl; } DatasetId chkid = dr.insert(result(), false); if ( chkid != did ) { if ( DEBUG ) { cout << "CJ: Insertion failed " << did << endl; } cerr << "Job::update: " << "Unable to insert result" << endl; } } else if ( DEBUG ) { cout << " CJ: Result is already in respository" << endl; } } // Set the state. if ( done_subjobs().size() + failed_subjobs().size() == subjobs().size() ) { base_set_done(); } else { return fail(38, true); } merger().close(); // Write the job description to job.log. if ( job_directory().size() ) { { string jfile = job_directory() + "/job.log"; ofstream jstr(jfile.c_str()); jstr << *this << endl; jstr << endl; jstr << subjobs() << endl; } } } Job::base_set_update(); update_repository(); if ( DEBUG ) { cout << "CJ: " << "Finished update of " << id().to_string() << " [" << time(0) - time0 << "]" << endl; } unlock_mutex(); return 0; } //********************************************************************** // Kill. int CompoundJob::kill(int err) { // Lock. lock_mutex(); // Loop over jobs and kill any active jobs. int stat = 0; if ( DEBUG ) { cerr << "CJ: Killing " << id() << endl; } for ( JobIdList::iterator ijid=base_subjobs().begin(); ijid!=base_subjobs().end(); ++ijid ) { JobId sjid = *ijid; Job* pjob = subjob(sjid); if ( pjob == 0 ) { if ( ! stat ) stat = 1; continue; } assert( pjob != 0 ); // If subjob is active, then kill it. if ( pjob->is_active() ) { if ( DEBUG ) { cerr << "CJ: Killing active subjob " << pjob->id() << endl; } // Kill int kstat = pjob->kill(101); if ( kstat != 0 ) { if ( DEBUG ) { cerr << "CJ: kill failed" << endl; } if ( ! stat ) stat = 2; continue; } if ( pjob->is_active() ) { if ( DEBUG ) { cerr << "CJ: subjob is still active" << endl; cerr << "CJ: Subjob:" << endl; cerr << *pjob << endl; } if ( ! stat ) stat = 4; continue; } // Remove from list of running subjobs. JobIdList::iterator ijid = find(base_running_subjobs().begin(), base_running_subjobs().end(), sjid); if ( ijid != base_running_subjobs().end() ) { base_running_subjobs().erase(ijid); } // Append to thes list of killed jobs. base_killed_subjobs().push_back(sjid); if ( DEBUG ) { cerr << "CJ: new kill count is " << killed_subjobs().size() << endl; } } else { if ( DEBUG ) { cerr << "CJ: Skipping inactive subjob " << pjob->id() << endl; } } } // Set status. if ( stat == 0 ) { base_set_killed(err); } else { base_set_failed(41); } update_repository(); merger().close(); // Unlock and exit. unlock_mutex(); return stat; } //********************************************************************** // Fail the job. int CompoundJob::fail(int err, bool unlock) { base_set_failed(err); update_repository(); // Unlock and exit. if ( unlock ) { unlock_mutex(); } return err; } //********************************************************************** // Set log. void CompoundJob::set_log(Text& log) { m_plog = &log; } //********************************************************************** // Update the result. bool CompoundJob::update_result() { // Ask the merger to update its result. DatasetMergeResult dmres = merger().result(); // Check the result. if ( ! dmres.is_valid() ) { msg("Merger returned invalid result!!!"); ostringstream smsg; smsg << " Dataset merger returned error " << dmres.error(); msg(smsg.str()); kill(); base_set_failed(51); return false; } // If result is unchanged, return false. if ( dmres == m_dmres ) return false; // Loop over merged results and update the lists of result datasets // and jobs with results. const DatasetList rdsts = dmres.merged_datasets(); unsigned int evcount = 0; m_results.erase(m_results.begin(), m_results.end()); base_result_subjobs().erase( base_result_subjobs().begin(), base_result_subjobs().end()); for ( DatasetList::const_iterator irdst=rdsts.begin(); irdst!=rdsts.end(); ++irdst ) { const Dataset* pdst = *irdst; JobResultMap::const_iterator ijres = m_jobresmap.find(pdst); if ( ijres == m_jobresmap.end() ) { msg("Dataset not present in job result map!!!"); base_set_failed(52); merger().close(); return false; } Job* pjob = ijres->second; base_result_subjobs().push_back(pjob->id()); m_results.push_back(pdst); evcount += pjob->processed_result_event_count(); } // Set the result event count. base_set_processed_result_event_count(evcount); // Set the result dataset. base_set_result(dmres.dataset(), false); // Return that status has changed. m_dmres = dmres; return true; } //********************************************************************** // Return a subjob. Job* CompoundJob::subjob(JobId jid) { JobMap::iterator ijob = m_jobs.find(jid); if ( ijob == m_jobs.end() ) return 0; return ijob->second; } //********************************************************************** // Return the merger. DatasetMerger& CompoundJob::merger() const { assert( m_pmrg != 0 ); return *m_pmrg; } //**********************************************************************