// NOTE(review): this copy of the file has lost its line breaks and every
// "<...>" span. The system #include names, most of the command-line and
// configuration parsing loop, and the dynamic_cast template arguments are
// missing, and every "//" comment now runs to the end of its physical line,
// commenting out the statements that follow it there. Restore the file from
// version control before building. The comment blocks below summarise the
// logic that is still visible on each physical line.
//
// Program overview: initialise DIAL (exit 101 on failure), read options,
// build a scheduler, load app.xml / task.xml / dataset.xml, submit one job,
// poll it until it finishes or exceeds maxtime, write log files and print the
// job result. msg(txt) prints txt framed as "---- txt ----" on stdout.
// dial_submit.cxx // David Adams // August 2003 // // Main program that starts a dial scheduler, runs a job and fetches // the result. #include #include #include #include #include #include "dataset_util/Environment.h" #include "dataset_util/FileStatus.h" #include "dataset_util/DtdRegistry.h" #include "dataset_xml/XmlParser.h" #include "dataset_base/DatasetCreator.h" #include "dataset_base/DatasetRepository.h" #include "dataset_split/SimpleEventDatasetSplitter.h" #include "dial_task/Task.h" #include "dial_app/Application.h" #include "dial_job/ChildWatcher.h" #include "dial_job/ProcessJobCreator.h" #include "dial_lsf/LsfJobCreator.h" #include "dial_condor/CondorJobCreator.h" #include "dial_condor/CondorCodJobCreator.h" #include "dial_sched/JobUpdater.h" #include "dial_sched/LocalScheduler.h" #include "dial_sched/CompoundJob.h" #include "dial_sched/MasterScheduler.h" #include "dial_ws_sched/WsClientScheduler.h" #include "dial_com/dial_init.h" #include "dial_com/registrations.h" using std::string; using std::cout; using std::endl; using std::istringstream; using std::ofstream; using dset::Dataset; using dset::DatasetCreator; using dset::DatasetSplitter; using dset::SimpleEventDatasetSplitter; using dset::DatasetRepository; using dial::Task; using dial::Application; using dial::ChildWatcher; using dial::JobCreator; using dial::ProcessJobCreator; using dial::CompoundJob; using dial::Scheduler; using dial::LocalScheduler; using dial::MasterScheduler; using dial::JobId; using dial::Job; using dial::LsfJobCreator; using dial::JobUpdater; using dial::CondorJobCreator; using dial::CondorCodJobCreator; using dial::WsClientScheduler; void msg(string txt) { cout << "---- " << txt << " ----" << endl; } int main(int argc, char* carg[]) { if ( dial::dial_init() ) return 101; // Fetch command name. string arg0 = carg[0]; // Read command line arguments. 
// Option parsing (largely missing, see the note at the top). The visible
// option names are maxtime, max_subjobs, max_start_jobs, lsf, condor,
// condor_cod, usercom, looptime and soap; max_subjobs and max_start_jobs are
// raised to at least 1, a negative looptime becomes 1, and "soap" switches
// off the local and master schedulers. Then: optional configuration display,
// DTD output, dataset repository checks and update-option validation.
// Exit codes: 2 invalid configuration option, 0 after displaying the
// configuration, 11 DIAL_CATALOG_CONF unset, 13 invalid dataset repository,
// 14 auto_update requested without the master scheduler.
// NOTE(review): the "No update options may be selected with soap scheduler"
// message is only a warning (no return); the update flags are cleared when
// the SOAP client scheduler is created further down.
bool help = false; bool conf = false; for ( int iarg=1; iarg> maxtime; } else if ( opt == "max_subjobs" ) { strval >> max_subjobs; if ( max_subjobs < 1 ) max_subjobs = 1; } else if ( opt == "max_start_jobs" ) { strval >> max_startjobs; if ( max_startjobs < 1 ) max_startjobs = 1; } else if ( opt == "lsf" ) { qtype = "lsf"; qname = val; } else if ( opt == "condor" ) { qtype = "condor"; qname = val; } else if (opt == "condor_cod" ) { qtype = "condor_cod"; qname = val; } else if ( opt == "usercom" ) { qtype = "usercom"; qname = val; } else if ( opt == "looptime" ) { strval >> looptime; if ( looptime < 0 ) looptime = 1; } else if ( opt == "soap" ) { use_local = false; use_master = false; use_soap = true; soap_server = val; } else { badopt = true; } } if ( badopt ) { cout << "Invalid option on configuration line:" << endl; cout << " " << line << endl; return 2; } } } // Display configuration if requested. if ( conf ) { cout << "Configuration:" << endl; if ( use_master ) { cout << " Using master scheduler" << endl; } else if (use_soap ) { cout << " Using soap scheduler " << soap_server << endl; } else { cout << " Using local scheduler" << endl; } cout << " Maximum processing time is " << maxtime << " sec." << endl; return 0; } // Create parser. XmlParser parser; // Write DTD. DtdRegistry::instance("dataset").write(); DtdRegistry::instance("dial").write(); // Open the dataset DB. Environment env; env.sysfill(); string dbdir = env["DIAL_CATALOG_CONF"]; if ( dbdir.size() == 0 ) { cout << "DIAL_CATALOG_CONF must be defined" << endl; return 11; } DatasetRepository& drep = DatasetRepository::default_instance(); if ( ! drep.is_valid() ) { cout << "Invalid dataset repository" << endl; return 13; } if ( use_soap && (auto_update || single_update || manual_update) ) { cout << "No update options may be selected with soap scheduler" << endl; } if ( auto_update && !use_master ) { cout << "auto_update is only valid for master scheduler" << endl; return 14; } // Create scheduler. 
// Scheduler construction. With use_local, a job creator is chosen from qtype
// (fork, lsf, condor, condor_cod, lsrun, usercom) and wrapped in a
// LocalScheduler, which is in turn wrapped in a MasterScheduler when
// use_master is set. With use_soap, a WsClientScheduler is created for
// soap_server and all update flags are cleared. app.xml is then parsed and
// validated.
// The "usercom" branch passes qname to ProcessJobCreator, so its message
// reports qname rather than repeating the lsrun text.
// Exit codes: 15 invalid local queue type, 16 invalid scheduler,
// 21 app.xml could not be parsed, 22 invalid application.
// NOTE(review): pjobc, psplit and the schedulers are raw owning pointers and
// are not released on any early-return path.
Scheduler* psch = 0; LocalScheduler* plsch = 0; MasterScheduler* pmsch = 0; WsClientScheduler* pssch = 0; DatasetSplitter* psplit = 0; if ( use_local ) { msg("Create local scheduler"); JobCreator* pjobc = 0; if ( qtype == "fork" ) { cout << "Using fork." << endl; pjobc = new ProcessJobCreator; } else if ( qtype == "lsf" ) { pjobc = new LsfJobCreator(qname); cout << "Using LSF queue " << qname << "." << endl; } else if ( qtype == "condor" ) { pjobc = new CondorJobCreator(qname); cout << "Using Condor queue " << qname << "." << endl; } else if ( qtype == "condor_cod" ) { pjobc = new CondorCodJobCreator(qname); cout << "Using Condor COD Machine file " << qname << "." << endl; } else if ( qtype == "lsrun" ) { pjobc = new ProcessJobCreator("/usr/lsf/bin/lsrun"); cout << "Using lsrun." << endl; } else if ( qtype == "usercom" ) { pjobc = new ProcessJobCreator(qname); cout << "Using user command " << qname << "." << endl; } else { cout << "Invalid local queue type: " << qtype << endl; return 15; } plsch = new LocalScheduler(*pjobc, delete_slave_jobs); if ( use_master ) { psplit = new SimpleEventDatasetSplitter; msg("Create master scheduler"); pmsch = new MasterScheduler(*psplit, *plsch, auto_update, delete_master_jobs, max_subjobs, max_startjobs, ""); psch = pmsch; } else { psch = plsch; } } else if ( use_soap ) { msg("Create SOAP client scheduler for " + soap_server); pssch = new WsClientScheduler(soap_server); psch = pssch; single_update = false; manual_update = false; auto_update = false; } assert( psch != 0 ); if ( psch==0 || !psch->is_valid() ) { cout << "Scheduler is invalid" << endl; return 16; } cout << *psch << endl; // Fetch and check the application. const XmlElement* pxapp = parser.parse("app.xml"); if ( pxapp == 0 ) { cout << "Unable to parse app.xml." << endl; return 21; } Application app(*pxapp); delete pxapp; if ( ! app.is_valid() ) { cout << "Invalid application." << endl; return 22; } if ( ! 
// Application lookup in the scheduler, then task, dataset, submission and the
// first job fetch.
// Exit codes: 23 application unknown to the scheduler; 31 / 32 / 33 task.xml
// unparsable / task invalid / task could not be installed; 41 / 42 / 43
// dataset.xml unparsable / creation failed / dataset invalid; 44 invalid job
// id after submit; 51 scheduler returned an invalid job.
// NOTE(review): pxtsk, pxdst and pdst are never deleted, unlike pxapp;
// confirm who owns them before adding deletes.
psch->has_application(app) ) { cout << "Unknown application " << app << endl; if ( use_local ) { cout << "Application directory is " << plsch->application_directory(app) << endl; } return 23; } cout << app << endl; // Fetch and check the task. msg("Create task"); const XmlElement* pxtsk = parser.parse("task.xml"); if ( pxtsk == 0 ) { cout << "Unable to parse task.xml." << endl; return 31; } Task tsk(*pxtsk); if ( ! tsk.is_valid() ) { cout << "Invalid task." << endl; return 32; } if ( psch->add_task(app, tsk) != 0 ) { cout << "Unable to install task " << tsk << endl; if ( use_local ) { cout << "Application directory is " << plsch->application_directory(app) << endl; cout << "Task directory is " << plsch->task_directory(app, tsk) << endl; } return 33; } cout << tsk << endl; // Fetch and check the dataset. msg("Create dataset"); const XmlElement* pxdst = parser.parse("dataset.xml"); if ( pxdst == 0 ) { cout << "Unable to parse dataset.xml." << endl; return 41; } DatasetCreator dcreate; const Dataset* pdst = dcreate.create(*pxdst, &drep); if ( pdst == 0 ) { cout << "Creation failed." << endl; return 42; } const Dataset& dst = *pdst; if ( ! dst.is_valid() ) { cout << "Invalid dataset." << endl; return 43; } cout << dst << endl; // Submit job. msg("Submit job"); JobId jid = psch->submit(app, tsk, dst); if (!jid.is_valid()) { cout << "Invalid job id. Job submission failed." << endl; return 44; } // Fetch job. bool is_active = false; msg("Fetch job"); { Job& startjob = psch->job(jid); if ( ! startjob.is_valid() ) { cout << "Scheduler produced invalid job." << endl; return 51; } is_active = startjob.is_active(); } // Update job. 
// Update loop: runs while the job is active, for at most maxloop+1 passes.
// Each pass prints the job and the update counters, performs one update
// (single_update) or updates repeatedly for looptime seconds (manual_update),
// sleeps looptime seconds unless manual_update already waited, re-fetches the
// job for the SOAP scheduler, and breaks if no update mode is enabled or the
// elapsed time exceeds maxtime. Afterwards a job that is still active is
// killed: errstat is 61 when the kill succeeded and 62 when it failed.
// NOTE(review): "job = psch->job(jid)" and "endjob = psch->job(jid)" assign
// through references that were themselves bound to psch->job(jid); whether
// this refreshes anything depends on Scheduler::job and Job::operator=.
msg("Update job"); int nup = 0; int nup0 = 0; int maxloop = 10000; time_t time_start = time(0); unsigned long naup0 = 0; for ( int count=0; count<=maxloop && is_active; ++count ) { Job& job = psch->job(jid); is_active = job.is_active(); cout << endl << "----- " << count << endl; cout << job << endl; if ( single_update || manual_update ) { cout << "# manual updates = " << nup << " (" << nup - nup0 << ")" << endl; } nup0 = nup; // Record # auto updates. if ( auto_update ) { unsigned long naup = pmsch->updater()->loop_count(); cout << "# auto updates = " << naup << " (" << naup - naup0 << ")" << endl; naup0 = naup; } // Do single update. if ( single_update ) { job.update(); ++nup; } // Do manual updates. if ( manual_update ) { time_t time0 = time(0); time_t dt = 0; while ( job.is_active() && dt < looptime ) { job.update(); ++nup; dt = time(0) - time0; sched_yield(); } } // Pause unless manual update has already done so. if ( ! manual_update ) { sleep(looptime); } // Do soap update. if ( use_soap ) { job = psch->job(jid); } // Break if there was no update. if ( !use_soap && !single_update && !manual_update && !auto_update ) { cout << "No update enabled!" << endl; break; } // Break if the job has run too long. if ( (time(0) - time_start) > maxtime ) { cout << "\n----- Job has exceeded time limit" << endl; break; } } Job& endjob = psch->job(jid); // Kill the job if it has not finished. cout << endl; if ( endjob.is_active() ) { msg("Final job update"); endjob = psch->job(jid); } unsigned int errstat = 0; if ( endjob.is_active() ) { cout << "Killing job because it is too slow" << endl; endjob.kill(); if ( endjob.is_killed() ) { cout << "Job kill successful" << endl; errstat = 61; } else { cout << "Job kill failed. Give up." << endl; errstat = 62; } } else { cout << "Job finished." 
// Reporting: removes old job.log / sch.log, writes the final job to job.log
// (followed by its sub-jobs when the job is a CompoundJob), writes the master
// scheduler log to sch.log when psch is a MasterScheduler, and writes the
// ChildWatcher output to proc.log. Returns errstat if a kill was attempted,
// 71 if the job is not done, otherwise prints the result and returns 0.
// NOTE(review): the pmsch declared inside the logging scope shadows the outer
// pmsch, and the schedulers and splitter are deleted only on the success path.
<< endl; } cout << endjob << endl; cout << endl; msg("Writing job summary to job.log"); { system("rm -f job.log sch.log"); ofstream jout("job.log"); jout << endjob << endl; CompoundJob* pcjob = dynamic_cast(&endjob); if ( pcjob != 0 ) jout << pcjob->subjobs() << endl; MasterScheduler* pmsch = dynamic_cast(psch); if ( pmsch != 0 ) { ofstream sout("sch.log"); sout << pmsch->log() << endl; } ofstream chout("proc.log"); ChildWatcher::print(chout, 1); chout << endl; } if ( errstat != 0 ) { return errstat; } if ( ! endjob.is_done() ) { cout << "Job failed" << endl; return 71; } cout << endl; msg("Job successful, result follows"); cout << endjob.result() << endl; delete pssch; delete pmsch; delete plsch; delete psplit; return 0; }