// dialproc_t.cxx #include #include #include #include #include #include "dataset_util/FileStatus.h" #include "dataset_util/getcwd.h" #include "dataset_util/ssystem.h" #include "dataset_util/mkdir.h" #include "dataset_util/Path.h" #include "dataset_util/Environment.h" #include "dataset_xml/XmlParser.h" #include "dataset_id/UniqueIdGenerator.h" #include "dataset_id/SimpleUniqueIdGenerator.h" #include "dataset_id/ContentIdList.h" #include "dataset_credential/CredentialSelectionCatalog.h" #include "dataset_credential/GssCredentialManager.h" #include "dataset_base/DatasetRepository.h" #include "dataset_base/DatasetCreator.h" #include "dial_job/ChildWatcher.h" #include "dial_job/ProcessJobCreator.h" #include "dial_sched/LocalScheduler.h" using std::string; using std::ostream; using std::cout; using std::endl; using std::vector; using dset::CredentialSelectionCatalog; using dset::GssCredentialManager; using dset::Content; using dset::Dataset; using dset::DatasetRepository; using dial::Task; using dial::Application; using dial::ChildWatcher; using dial::LocalScheduler; using dial::JobPreferences; using dial::JobId; using dial::Job; using dial::ProcessJobCreator; void msg(string msg) { cout << "----- "; cout << msg; cout << " -----" << endl; } string DSEP = "/"; // Initialization for TestDataset. 
#include "dataset_base/DatasetCreator_t.h"

// Test driver for the local DIAL process scheduler.
//
// Steps, in order:
//   1. Skip (return 0) when CTG_PLATFORM is "rh73_gcc323".
//   2. Remove files left by an earlier run and set up the unique ID
//      generator, dataset repository, credential selection catalog and
//      credential manager.
//   3. Run "dialenv" and create a LocalScheduler whose application, task and
//      job directories are below the current directory.
//   4. Create an application and a task (source written to task.cxx), install
//      the task and poll up to 10 times, one second apart, for it to appear.
//   5. Submit a job on a 10-event TestDataset, wait until the job is no
//      longer running and check that 10 events were processed and that the
//      result is an event dataset holding 10 events.
//
// Returns 0 when every check passes or the test is skipped. Checks are made
// with assert, so a failed check aborts instead of returning.
int dialproc_t() {
  string plat = Environment::initial().value("CTG_PLATFORM");
  if ( plat == "rh73_gcc323" ) {
    msg("Skipping test of dialproc on " + plat);
    return 0;
  }

  // Best-effort cleanup of files from an earlier run; status is not checked.
  system("rm -rf dial dial_setup.sh UniqueId task.cxx");
  system("rm -rf dial resolver.dat csc.dat");

  msg("Set UID generator");
  SimpleUniqueIdGenerator::set_as_default(getcwd() + "/UniqueId");

  msg("Fetch default instance of CSC.");
  // The setup calls are made outside assert so that they still run when the
  // file is compiled with NDEBUG.
  int drstat = DatasetRepository::create_default_instance();
  assert( drstat == 0 );
  int cscstat = CredentialSelectionCatalog::create_default_instance();
  assert( cscstat == 0 );
  CredentialSelectionCatalog& sc = CredentialSelectionCatalog::default_instance();
  cout << sc << endl;
  assert( sc.is_valid() );
  int gssstat = GssCredentialManager::set_default();
  assert( gssstat == 0 );
  int ownstat = GssCredentialManager::set_owner("me");
  assert( ownstat == 0 );

  msg("Fetch current directory");
  string cwdir = getcwd();

  msg("Create the DIAL directories.");
  int stat = ssystem("dialenv");
  if ( stat != 0 ) {
    cout << "dialenv returned status " << stat << endl;
  }
  assert( stat == 0 );

  msg("Create job creator");
  ProcessJobCreator cre;

  msg("Create valid scheduler");
  Environment env;
  string appdir = cwdir + DSEP + "dial/apps";
  string tskdir = cwdir + DSEP + "dial/tasks";
  env["DIAL_APPS"] = appdir;
  env["DIAL_TASKS"] = tskdir;
  env["DIAL_JOBS"] = cwdir + DSEP + "dial/jobs";
  LocalScheduler sch(cre, false, "", &env);
  cout << sch << endl;
  assert( sch.is_valid() );

  msg("Create application");
  string pwd = getcwd();
  // The application is constructed from inside its own directory; a failed
  // directory change would construct it from the wrong place, so check it.
  int cdstat = chdir("dial/apps/dialproc/test");
  if ( cdstat != 0 ) {
    cout << "Unable to change directory to dial/apps/dialproc/test" << endl;
  }
  assert( cdstat == 0 );
  Application app(Text::split("run build_task"));
  cdstat = chdir(pwd.c_str());
  if ( cdstat != 0 ) {
    cout << "Unable to return to directory " << pwd << endl;
  }
  assert( cdstat == 0 );
  cout << app << endl;
  assert( app.is_valid() );

  msg("Create task");
  // Source code of the task, written to task.cxx below.
  // NOTE(review): "res.cast()" may have lost a template argument in the same
  // way the system include names were lost -- confirm against the original.
  Text txt("task.cxx");
  txt.append("#include \"dataset_id/EventId.h\"");
  txt.append("#include \"dataset_base/VirtualEventDataset.h\"");
  txt.append("");
  txt.append("using dset::ContentBlock;");
  txt.append("using dset::Content;");
  txt.append("using dset::Dataset;");
  txt.append("using dset::VirtualEventDataset;");
  txt.append("");
  txt.append("extern \"C\" int task(const EventId&, Dataset&);");
  txt.append("int task(const EventId& eid, Dataset& res) {");
  txt.append(" VirtualEventDataset& veds = res.cast();");
  txt.append(" const ContentBlock& oldblk = veds.content().front();");
  txt.append(" EventIdList eids;");
  txt.append(" eids.insert(eid);");
  txt.append(" Content newcon(oldblk.dataset_type(), oldblk.name(),");
  txt.append(" oldblk.content_ids(), eids);");
  txt.append(" VirtualEventDataset newdst(newcon);");
  txt.append(" newdst.lock();");
  txt.append(" veds.merge(newdst);");
  txt.append(" return 0;");
  txt.append("}");
  txt.write();
  Task::NameList files;
  files.push_back(txt.name());
  Task tsk(files);
  cout << tsk << endl;
  assert( tsk.is_valid() );
  assert( tsk.id().is_global() );
  string tskname = tsk.id().to_string();
  tskdir += DSEP + app.id().to_string() + DSEP + tskname;

  msg("Check task directory");
  cout << sch.task_directory(app, tsk) << endl;
  cout << tskdir << endl;
  assert( sch.task_directory(app, tsk) == tskdir );

  msg("Check application");
  if ( ! sch.has_application(app) ) {
    cout << sch.log() << endl;
    assert(false);
  }
  Application badapp;
  assert( ! sch.has_application(badapp) );

  msg("Check that task is not yet installed");
  assert( ! sch.has_task(app,tsk) );

  msg("Install task");
  // The assert below requires a status of 0, so keep the status as an int
  // and show the scheduler log only when the status is not 0.
  int install_stat = sch.add_task(app, tsk);
  if ( install_stat != 0 ) cout << sch.log() << endl;
  assert( install_stat == 0 );

  msg("Check presence of task job");
  JobId tjid = sch.task_job(app, tsk);
  assert( tjid.is_valid() );

  msg("Check task build");
  Job& tjob = sch.job(tjid);
  // Poll up to 10 times, one second apart, until the scheduler has the task.
  int iwait;
  for ( iwait=0; iwait<10; ++iwait ) {
    tjob.update();
    if ( sch.has_task(app, tsk) ) break;
    cout << ".";
    cout.flush();
    sleep(1);
  }
  cout << endl;
  cout << tjob << endl;
  assert( iwait < 10 );

  msg("Create dataset");
  EventIdList eids;
  eids.insert(EventIdRange(501,1,10));
  assert( eids.size() == 10 );
  ContentId cid1("Test1", "junk");
  assert( cid1.is_valid() );
  ContentIdList cids;
  cids.insert(cid1);
  assert( cids.size() == 1 );
  TestDataset dst(Content("TestDataset", "test", cids, eids));
  dst.lock();
  cout << dst << endl;
  assert( dst.is_valid() );

  msg("Create preferences");
  JobPreferences prf;
  prf.lock();

  msg("Submit task");
  JobId jid = sch.submit(app, tsk, dst, prf);
  if ( ! jid.is_valid() ) cout << sch.log() << endl;
  assert( jid.is_valid() );
  cout << sch.job(jid) << endl;

  msg("Wait for job to finish");
  Job& job = sch.job(jid);
  job.update();
  // No upper limit here: the loop ends only when the job stops running.
  while ( job.is_running() ) {
    sleep(1);
    job.update();
  }
  cout.flush();
  cout << sch.job(jid) << endl;
  if ( ! sch.job(jid).is_done() ) {
    cout << "=== Scheduler log" << endl;
    cout << sch.log() << endl;
    cout << "=== End log" << endl;
  }
  assert( sch.job(jid).is_done() );
  assert( sch.job(jid).processed_event_count() == 10 );
  assert( sch.job(jid).processed_result_event_count() == 10 );

  msg("Fetch result");
  assert( sch.job(jid).result() != 0 );
  const Dataset& res = *sch.job(jid).result();
  cout << res << endl;
  assert( res.is_event_dataset() );
  assert( res.event_count() == 10 );

  return 0;
}

#ifdef CTEST_MAIN
// Stand-alone entry point: the process exit status is the test result.
int main() {
  return dialproc_t();
}
#endif