// MasterScheduler_t.cxx #include "dial_sched/MasterScheduler.h" #include "dial_sched/LocalScheduler.h" #include #include #include #include #include "dataset_util/FileName.h" #include "dataset_util/FileStatus.h" #include "dataset_util/getcwd.h" #include "dataset_util/mkdir.h" #include "dataset_util/Path.h" #include "dataset_util/Environment.h" #include "dataset_util/CommandLine.h" #include "dataset_util/DtdRegistry.h" #include "dataset_util/FileFinder.h" #include "dataset_util/XmlElement.h" #include "dataset_id/SimpleUniqueIdGenerator.h" #include "dataset_id/ContentIdList.h" #include "dataset_base/Dataset_t.h" #include "dataset_base/DatasetRepository.h" #include "dataset_split/NoSplitDatasetSplitter.h" #include "dataset_xml/XmlParser.h" #include "dataset_credential/CredentialSelectionCatalog.h" #include "dataset_credential/GssCredentialManager.h" #include "dial_job/ProcessJob.h" #include "dial_job/ProcessJobCreator.h" #include "dial_job/JobRepository.h" #include "dial_sched/CompoundJob.h" #define WITH_GSI #include "gsoap_gsi/gsi.h" using std::ostream; using std::cout; using std::endl; void msg(const char* msg) { cout << "----- "; cout << msg; cout << " -----" << endl; } using std::string; using dset::Content; using dset::NoSplitDatasetSplitter; using dset::CredentialSelectionCatalog; using dset::GssCredential; using dset::GssCredentialManager; using dial::Task; using dial::Application; using dial::JobId; using dial::JobPreferences; using dial::Job; using dial::ProcessJobCreator; using dial::JobRepository; using dial::CompoundJob; using dial::Scheduler; using dial::LocalScheduler; using dial::MasterScheduler; string DSEP = "/"; int client_authorization(struct soap* psoap) { struct gsi_plugin_data* pgsiplug = (struct gsi_plugin_data *) soap_lookup_plugin(psoap, GSI_PLUGIN_ID); string service_identity = pgsiplug->server_identity; cout << "GSI ID: " << service_identity << endl; return 0; } int run_MasterScheduler_t(bool aup) { msg("Clean up previous run"); system("rm -rf datset.dtd dial.dtd code.cxx dial result.xml"); system("rm -f resolver.dat dr.dat jr.dat"); msg("Create dataset repository"); assert( dset::DatasetRepository::create_default_instance() == 0 ); msg("Fetch default instance of CSC."); assert( CredentialSelectionCatalog::create_default_instance() == 0 ); CredentialSelectionCatalog& sc = CredentialSelectionCatalog::default_instance(); cout << sc << endl; assert( sc.is_valid() ); string owner = "me"; assert( GssCredentialManager::set_default() == 0 ); assert( GssCredentialManager::set_owner("me") == 0 ); msg("Create job repository"); assert( JobRepository::create_default_instance() == 0 ); msg("Create job creator"); ProcessJobCreator cre; cout << cre << endl; msg("Create DIAL directories"); system("rm -rf dial"); assert( mkdir("dial") == 0 ); msg("Create application"); Application app = Application::test_instance(); cout << app << endl; assert( app.is_valid() ); msg("...application"); string appdir = "./dial/apps"; assert( mkdir(appdir) == 0 ); string catdir = appdir + "/task_categories"; appdir += DSEP + app.id().to_string(); assert( mkdir(appdir) == 0 ); msg("Create result in advance"); string resfile = getcwd() + "/result.xml"; // Write result locally. { TestDataset res; res.lock(); cout << res << endl; const XmlElement* pxres = res.xml(); assert( pxres != 0 ); XmlParser parser; assert( parser.write(resfile, *pxres) == 0 ); } // Write run script. Text runtxt; runtxt.append("/bin/touch out.dat"); runtxt.append("cp " + resfile + " ."); string runfile = appdir + + "/run"; runtxt.write(runfile); string chcom = "chmod +x " + runfile; system(chcom.c_str()); cout << runtxt << endl; // Write task category string catname = "gcc1"; { Text txt; txt.append(catname); assert( txt.write(appdir+"/task_category.dat") == 0); } // Write compile command file. Path exepath; exepath.getenv("PATH"); assert( exepath.size() != 0 ); Text buildcom; Path gpath = exepath.find("g++"); assert( gpath.size() > 0 ); string gfile = gpath.front() + DSEP + "g++"; assert( FileStatus(gfile).is_executable() ); buildcom.append("/bin/cp code.cxx task.cxx"); buildcom.append(gfile + " -g -shared -o task.so task.cxx"); buildcom.append("touch task_installed"); string buildexe = appdir + "/build_task"; buildcom.write(buildexe); string chmod_buildexe = "chmod +x " + buildexe; system(chmod_buildexe.c_str()); msg("...task"); string tskdir = "./dial/tasks"; assert( mkdir(tskdir) == 0 ); msg("...job"); string jobdir = "./dial/jobs"; assert( mkdir(jobdir) == 0 ); msg("Create splitter"); NoSplitDatasetSplitter splitter; msg("Create default scheduler"); { LocalScheduler lsch(cre, false); cout << lsch << endl; MasterScheduler msch(splitter, lsch, aup, true, 100, 10, "abcd"); cout << msch << endl; cout << "Repository: " << msch.job_repository_connection() << endl; assert( msch.job_repository_connection() == "abcd" ); } msg("Create local scheduler"); Environment env; env["DIAL_APPS"] = "./dial/apps"; env["DIAL_TASKS"] = "./dial/tasks"; env["DIAL_JOBS"] = "./dial/jobs"; LocalScheduler lsch(cre, false, "", &env); cout << lsch << endl; assert( lsch.is_valid() ); msg("Create master scheduler"); MasterScheduler::set_job_directory("./dial/jobs"); string jobrep = "SQLRESULT:" + getcwd() + "/jr.dat"; MasterScheduler msch(splitter, lsch, aup, true, 100, 10, jobrep); cout << msch << endl; assert( msch.is_valid() ); assert( &msch.slave() == &lsch ); assert( &msch.splitter() == &splitter ); assert( msch.max_jobs() == 100 ); assert( msch.max_start() == 10 ); assert( msch.auto_update() == aup ); assert( msch.job_repository()->is_valid() ); if ( aup ) { assert( msch.updater() != 0 ); } else { assert( msch.updater() == 0 ); } assert( msch.job_repository_connection() == jobrep ); msg("Create task"); Text txt("code.cxx"); txt.append("int f() {"); txt.append(" return 321;"); txt.append("}"); txt.write(); Task::NameList files; files.push_back(txt.name()); Task tsk(files); cout << tsk << endl; assert( tsk.is_valid() ); assert( tsk.id().is_global() ); string tskname = tsk.id().to_string(); tskdir += DSEP + tskname; msg("Check application"); assert( msch.has_application(app) ); Application badapp; assert( ! msch.has_application(badapp) ); msg("Check that task is not yet installed"); assert( ! msch.has_task(app,tsk) ); msg("Check absence of task job"); JobId tjid = msch.task_job(app, tsk); assert( ! tjid.is_valid() ); msg("Install task"); bool install_stat = msch.add_task(app, tsk); if ( ! install_stat ) { cout << msch.log() << endl; cout << msch.slave().log() << endl; } assert( install_stat == 0 ); msg("Check task job"); tjid = msch.task_job(app, tsk); assert( tjid.is_valid() ); Job& tjob = msch.job(tjid); msg("Check job count" ); assert( msch.jobs().size() == 0 ); msg("Check task build"); int iwait; for ( iwait=0; iwait<10; ++iwait ) { if ( ! aup ) tjob.update(); if ( msch.has_task(app, tsk) ) break; cout << "."; cout.flush(); sleep(1); } cout << endl; cout << tjob << endl; assert( iwait < 10 ); msg("Create dataset"); EventIdList eids; eids.insert(EventIdRange(501,1,10)); assert( eids.size() == 10 ); ContentId cid1 = ContentId::register_id(202, "MyClass"); ContentIdList cids; cids.insert(cid1); assert( cids.size() == 1 ); TestDataset dst(Content("TestDataset", "test", cids, eids)); dst.lock(); cout << dst << endl; assert( dst.is_valid() ); msg("Create preferences"); JobPreferences prf; prf.lock(); msg("Submit job"); JobId jid = msch.submit(app, tsk, dst, prf); if ( ! jid.is_valid() ) cout << msch.log() << endl; assert( jid.is_valid() ); Job& job = msch.job(jid); cout << job << endl; if ( ! job.is_valid() ) cout << msch.log() << endl; assert( job.is_valid() ); msg("Check credential copy 1"); FileFinder finder(jobdir); string credfile = finder.find("cred.dat"); cout << credfile << endl; assert( credfile.size() ); int ustat = unlink(credfile.c_str()); if ( ustat ) { cout << "Error " << errno << ": " << strerror(errno) << endl; } msg("Check credential copy 2"); string credfile2 = finder.find("cred.dat"); cout << credfile2 << endl; assert( credfile2.size() ); ustat = unlink(credfile2.c_str()); if ( ustat ) { cout << "Error " << errno << ": " << strerror(errno) << endl; } msg("Check credential copy 3--not"); string credfile3 = finder.find("cred.dat"); if ( credfile3.size() ) { cout << credfile3 << endl; assert( credfile3.size() == 0 ); } msg("Check job"); CompoundJob* pcjob = dynamic_cast(&msch.job(jid)); assert( pcjob != 0 ); if ( pcjob->subjobs().size() != 1 ) cout << msch.log() << endl; assert( pcjob->subjobs().size() == 1 ); msg("Wait for job to finish"); if ( !aup ) { job.update(); assert( job.is_running() || job.is_done() ); } for ( int count=0; count<10&&job.is_active(); ++count ) { cout << "."; cout.flush(); sleep(1); if ( ! aup ) job.update(); } cout << endl; cout << job << endl; assert( job.is_done() ); assert( job.has_result() ); msg("Show repository"); system("cat jr.dat"); msg("Remove job"); assert( msch.has_job(jid) ); assert( msch.jobs().size() == 1 ); assert( msch.remove(jid) == 0 ); assert( msch.jobs().size() == 0 ); assert( ! msch.has_job(jid) ); msg("Display log"); cout << msch.log() << endl; msg("Create XML"); const XmlElement* pxsch = msch.xml(); assert( pxsch != 0 ); cout << *pxsch << endl; assert( pxsch->is_valid() ); msg("Check creator"); assert( Scheduler::has_creator(MasterScheduler::xml_name()) ); Scheduler* psch = Scheduler::create(*pxsch); assert( psch != 0 ); cout << *psch << endl; MasterScheduler* pmsch = dynamic_cast(psch); assert( pmsch != 0 ); assert( pmsch->max_jobs() == msch.max_jobs() ); assert( pmsch->max_start() == msch.max_start() ); assert( msch.auto_update() == aup ); if ( aup ) { assert( msch.updater() != 0 ); } else { assert( msch.updater() == 0 ); } msg("Check DTD"); cout << MasterScheduler::dtd() << endl; assert( MasterScheduler::dtd().size() != 0 ); assert( DtdRegistry::instance("dial").has_type("MasterScheduler") ); return 0; } int MasterScheduler_t() { Text dbg("debug_CompoundJob"); dbg.write(); msg("Create ID directories"); SimpleUniqueIdGenerator::create_collection("Application", 101, 0); SimpleUniqueIdGenerator::create_collection("Task", 201, 0); SimpleUniqueIdGenerator::create_collection("Dataset", 12, 1001); SimpleUniqueIdGenerator::create_collection("Job", 501, 1001); SimpleUniqueIdGenerator:: create_collection("JobPreferences", 601, 1001); SimpleUniqueIdGenerator::set_as_default(); msg("Register credential"); GssCredential mycred; GssCredentialManager::insert(mycred); GssCredentialManager::set_name(mycred.name()); assert( GssCredentialManager::credential() != 0 ); cout << *GssCredentialManager::credential() << endl; assert( GssCredentialManager::credential()->is_valid() ); msg("***** Set auto update false *****"); int stat = run_MasterScheduler_t(false); if ( stat != 0 ) return stat; msg("***** Set auto update true *****"); stat = run_MasterScheduler_t(true); if ( stat != 0 ) return stat; return stat; } #ifdef CTEST_MAIN int main() { return MasterScheduler_t(); } #endif