// WsClientScheduler_t.cxx #include "dial_ws_sched/WsClientScheduler.h" #include #include #include #include #include #include "dataset_util/getcwd.h" #include "dataset_util/mkdir.h" #include "dataset_util/ssystem.h" #include "dataset_util/FileStatus.h" #include "dataset_util/XmlElement.h" #include "dataset_id/SimpleUniqueIdGenerator.h" #include "dataset_catalog/DatasetSelectionCatalog.h" #include "dataset_catalog/DatasetReplicaCatalog.h" #include "dataset_credential/CredentialSelectionCatalog.h" #include "dataset_credential/GssCredentialManager.h" #include "dataset_base/Dataset_t.h" #include "dataset_base/DatasetCreator_t.h" #include "dataset_base/DatasetRepository.h" #include "dataset_xml/XmlParser.h" #include "dial_ws/dial_ws_flags.h" #include "dial_task/Task.h" #include "dial_task/TaskRepository.h" #include "dial_app/Application.h" #include "dial_app/ApplicationRepository.h" #include "dial_job/JobId.h" #include "dial_job/Job.h" #include "dial_job/ChildWatcher.h" #include "dial_sched/Scheduler_t.h" using std::string; using std::cout; using std::endl; using std::ifstream; using dset::DatasetSelectionCatalog; using dset::DatasetReplicaCatalog; using dset::DatasetRepository; using dset::CredentialSelectionCatalog; using dset::GssCredentialManager; using dial::Task; using dial::TaskRepository; using dial::Application; using dial::ApplicationRepository; using dial::JobPreferences; using dial::JobId; using dial::Job; using dial::WsClientScheduler; void msg(string txt) { cout << "--- "; cout << txt; cout << " ---" << endl; } XmlParser& parser() { static XmlParser psr; return psr; } int junk = dial::ChildWatcher::initialize(); int WsClientScheduler_t() { // Clean up from earlier test. int pid = -1; if ( FileStatus("pid").exists() ) { msg("Clean up from earlier test"); ifstream pidfile("pid"); pidfile >> pid; if ( pid > 0 ) { cout << "Killing process " << pid << endl; kill(pid, SIGKILL); } unlink("pid"); pid = -1; sleep(1); // Give system time to free up port } system("rm -rf UniqueId resolver.dat"); // Define ID generator for the service. msg("Define dataset ID generator"); SimpleUniqueIdGenerator::set_as_default(); SimpleUniqueIdGenerator::create_collection("Dataset", 124, 0); SimpleUniqueIdGenerator::create_collection("Dataset", 125, 0); SimpleUniqueIdGenerator::create_collection("Application", 201, 0); SimpleUniqueIdGenerator::create_collection("Task", 301, 0); SimpleUniqueIdGenerator::create_collection("Job", 130, 0); SimpleUniqueIdGenerator::create_collection("JobPreferences", 140, 0); string uidcon = UniqueIdGenerator::connection(); cout << uidcon << endl; assert( uidcon.size() ); string uidcom = "DIAL_UIDS=" + uidcon + "; export DIAL_UIDS; "; msg("Initialize test catalogs."); system("rm -f resolver.dat"); assert( DatasetRepository::create_default_instance() == 0 ); assert( ApplicationRepository::create_default_instance() == 0 ); assert( TaskRepository::create_default_instance() == 0 ); assert( DatasetSelectionCatalog::create_default_instance() == 0 ); assert( DatasetReplicaCatalog::create_default_instance() == 0 ); assert( CredentialSelectionCatalog::create_default_instance() == 0 ); CredentialSelectionCatalog& sc = CredentialSelectionCatalog::default_instance(); cout << sc << endl; assert( sc.is_valid() ); string owner = "me"; assert( GssCredentialManager::set_default() == 0 ); assert( GssCredentialManager::set_owner("me") == 0 ); #ifdef WITH_GSI // Create auth file. string globus_version = "3"; string com = "rm -f authorized_dn"; com += "; "; if ( globus_version == "2" ) { com += "grid-proxy-info -subject"; com += " | sed 's#/CN=proxy##g'"; } else { com += "grid-proxy-info -identity"; } com += " > authorized_dn"; ssystem(com); #endif msg("Create invalid client scheduler"); WsClientScheduler badsch("no.such.url.com"); assert( ! badsch.is_valid() ); msg("Create less but still invalid client scheduler"); //WsClientScheduler badsch2("httpg://acas010.usatlas.bnl.gov"); //WsClientScheduler badsch2("http://aftpexp01.bnl.gov"); WsClientScheduler badsch2("http://lcoalhost:9999"); badsch2.set_timeout(5); assert( ! badsch2.is_valid() ); pid = -1; if ( FileStatus("pid").exists() ) { msg("Clean up from earlier test"); ifstream pidfile("pid"); pidfile >> pid; cout << "Killing process " << pid << endl; kill(pid, SIGKILL); unlink("pid"); pid = -1; sleep(1); // Give system time to fee up port } msg("Write scheduler XML"); TestScheduler tsch; assert( tsch.is_valid() ); const XmlElement* pxsch = tsch.xml(); assert( pxsch != 0 ); cout << *pxsch << endl; string schfile = "scheduler.xml"; int pstat = parser().write(schfile, *pxsch); if ( pstat != 0 ) { cout << "Parser returned error " << pstat << endl; assert(false); } assert( FileStatus(schfile).is_readable() ); msg("Start server"); assert( ! FileStatus("pid").exists() ); string catcom = "DIAL_CATALOG_CONF=resolver.dat; "; catcom += "export DIAL_CATALOG_CONF; "; string scom = uidcom + catcom + "dialws "; scom += "libxerces-c.so.26:"; scom += "mysqlpp:"; scom += "dataset_sql:"; scom += "dataset_id:"; scom += "dataset_base:"; scom += "dataset_catalog:"; scom += "dataset_split:"; scom += "dataset_xml:"; scom += "dataset_mysqlpp:"; scom += "dial_task:"; scom += "dial_job:"; scom += "dial_ws_sched:"; scom += "scheduler 8080 server.log gsi service >service.log 2>&1 &"; cout << scom << endl; int sysstat = ssystem(scom); if ( sysstat != 0 ) { cout << "system call returned " << sysstat << endl; cout << " Error " << errno << ": " << strerror(errno) << endl; //assert(false); } //assert( sysstat == 0 ); for ( int i=0; i<35; ++i ) { if ( FileStatus("pid").exists() ) break; sleep(1); } assert( FileStatus("pid").exists() ); msg("Set server URL"); #ifdef WITH_GSI string server = "http://127.0.0.1:8080"; #elif WITH_SSL string server = "https://127.0.0.1:8080"; #else string server = "http://127.0.0.1:8080"; #endif cout << server << endl; msg("Check server process"); int nloop = 0; while ( nloop < 10 ) { if ( FileStatus("pid").exists() ) break; sleep(1); ++nloop; } assert( nloop < 10 ); { ifstream pidfile("pid"); pidfile >> pid; } cout << "Server process ID is " << pid << endl; assert( pid > 0 ); msg("Create application"); Application app = Application::test_instance(); cout << app << endl; msg("Create task"); Text cppfile("run.cxx"); cppfile.append("#include "); cppfile.append("int main() {"); cppfile.append(" std::cout << \"Hello world\" << std::endl;"); cppfile.append("}"); cppfile.write(); Task::NameList files; files.push_back(cppfile.name()); Task tsk(files); assert( tsk.is_valid() ); msg("Create dataset"); dset::Content con; dset::Location loc; TestDataset dst(con, loc); dst.lock(); assert( dst.is_valid() ); assert( dst.is_locked() ); msg("Create preferences"); JobPreferences prf; prf.lock(); msg("Create client scheduler"); WsClientScheduler sch(server); msg("Check validity"); assert( sch.is_valid() ); msg("Check timeout"); cout << sch.timeout() << endl; sch.set_timeout(5); cout << sch.timeout() << endl; assert( sch.timeout() == 5 ); msg("Check application"); assert( sch.has_application(app) ); msg("Add task"); assert( ! sch.task_job(app, tsk).is_valid() ); assert( sch.add_task(app, tsk) == 0 ); /* assert( sch.task_job(app, tsk).is_valid() ); msg("Wait for job to finish"); JobId tjid = sch.task_job(app, tsk); for ( int count=0; count<10; ++count ) { cout << "..." << count << endl; if ( sch.job(tjid).is_inactive() ) break; sleep(1); } cout << sch.job(tjid) << endl; assert( sch.job(tjid).is_done() ); msg("Check task"); assert( sch.has_task(app, tsk) ); */ sch.has_task(app, tsk); msg("Submit job"); JobId jid = sch.submit(app, tsk, dst, prf); cout << jid << endl; assert( jid.is_valid() ); msg("Check job presence"); assert( sch.has_job(jid) ); msg("Fetch job"); Job job = sch.job(jid); cout << job << endl; if ( ! job.is_valid() ) { sch.log().write(); } assert( job.is_valid() ); assert( job.is_initialized() ); msg("Start job"); JobId badjid(88,99); assert( sch.start(badjid) != 0 ); assert( sch.start(jid) == 0 ); msg("Kill job"); assert( sch.kill(jid) == 1 ); msg("Remove job"); assert( sch.remove(jid) == 1 ); msg("Resource report"); string report = sch.resource_report(); cout << report << endl; assert( report.size() ); msg("Kill the server."); sch.terminate_service(); assert( ! sch.is_valid() ); msg("All done."); return 0; } #ifdef CTEST_MAIN int main() { return WsClientScheduler_t(); } #endif