// dial_run_job.cxx // David Adams // November 2005 // // Runs a dial job script (either run or build_task). #include #include #include #include #include "dataset_util/getcwd.h" #include "dataset_util/mkdir.h" #include "dataset_util/Environment.h" #include "dataset_util/FileStatus.h" #include "dataset_util/ssystem.h" #include "dataset_util/TempDir.h" #include "dataset_xml/XmlParser.h" #include "dial_job/Job.h" #include "dial_ws_sched/WsClientScheduler.h" using std::string; using std::cout; using std::cerr; using std::endl; using std::ofstream; using dset::Dataset; using dial::Task; using dial::Application; using dial::JobId; using dial::Job; using dial::WsClientScheduler; int touch(string name) { return ssystem("touch " + name); } int run_job(string sid, string surl) { XmlParser parser; string rundir = getcwd(); cout << "\nWorking directory: " << rundir << endl; // Write the job ID. ofstream ssid("jid"); ssid << sid << endl; ssid.close(); // Create scheduler. cout << "\nEstablishing connection to scheduler at" << endl; cout << surl << endl; WsClientScheduler sch(surl, false); cout << sch << endl; if ( ! sch.is_valid() ) return 21; // Fetch job. cout << "\nFetching job " << sid << endl; JobId jid(sid); Job job = sch.job(jid); cout << job << endl; if ( ! job.is_valid() ) return 22; // Extract application. cout << "\nExtracting application" << endl; const Application& app = *job.application(); cout << app << endl; app.write_files("appdir"); const XmlElement* pxapp = app.xml(); assert( parser.write("application.xml", *pxapp) == 0 ); // Extract task. cout << "\nExtracting task" << endl; const Task& tsk = *job.task(); cout << tsk << endl; tsk.write_files("tskdir"); const XmlElement* pxtsk = tsk.xml(); assert( parser.write("task.xml", *pxtsk) == 0 ); // Define the task directory. cout << "Defining task directory" << endl; string taskdir = Environment::current().value("DIAL_TASKS"); taskdir += "/" + app.id().to_string(); taskdir += "/" + tsk.id().to_string(); cout << taskdir << endl; // Write dir anem in file taskdir. Text ttaskdir("taskdir"); ttaskdir.append(taskdir); ttaskdir.write(); // Extract dataset. cout << "\nExtracting dataset" << endl; const Dataset* pdst = job.dataset(); if ( pdst == 0 ) { cout << "No input datset" << endl; } else { cout << *pdst << endl; const XmlElement* pxdst = pdst->xml(); assert( parser.write("dataset.xml", *pxdst) == 0 ); } // Extract the run command. cout << "\nExtracting run comand" << endl; string run_script = job.run_script(); if ( run_script == "" ) { cerr << "Unable to find run script" << endl; return 32; } cout << "Run script: " << run_script << endl; string runfile = rundir + "/appdir/" + run_script; cout << "Run file: " << runfile << endl; string runcom = rundir + "/dial_run_script"; cout << "Run command: " << runcom << endl; bool have_runcom = false; if ( FileStatus(runfile).exists() ) { have_runcom = true; cout << "Copying run file to run command" << endl; ssystem("cp " + runfile + " " + runcom); ssystem("chmod +x " + runcom); if ( ! FileStatus(runcom).is_executable() ) { cerr << "Run command is missing or not executable" << endl; return 33; } } cout << "Have run command: " << have_runcom << endl; // Locate the pkgmgr setup file. cout << "\nLocating the pkgmgr setup file" << endl; string pmenv = Environment::current().value("PKGMGR_ENV"); if ( ! pmenv.size() ) { cerr << "PKGMGR_ENV is not defined" << endl; return 34; } cout << pmenv << endl; cout << "\n*** Run job with script " << run_script << endl; if ( run_script == "run" ) { if ( ! have_runcom ) { cout << "Run command is missing" << endl; return 41; } cout << "Locating task directory" << endl; if ( ! FileStatus(taskdir).is_directory() ) { cerr << "Unable to find task directory" << endl; return 42; } cout << "Run command" << endl; string com = ". " + pmenv + "; " + runcom + " >dialout.log 2>dialerr.log"; cout << com << endl; cout << " dialout.log - stdout" << endl; cout << " dialerr.log - stderr" << endl; cout << " dialstat.log - return status" << endl; int rstat = ssystem(com, false); cout << "Command returned " << rstat << endl; ofstream sstat("dialstat.log"); sstat << rstat << endl; sstat.close(); if ( rstat ) return 49; } else if ( run_script == "build_task" ) { string taskok = taskdir + "/task_installed"; string taskfail = taskdir + "/task_install_failed"; string taskbuild = taskdir + "/task_building"; // If the directory exists then someone else has built or is // building it. if ( FileStatus(taskdir).exists() ) { cout << "Task directory already exists" << endl; // Wait for build to finish. int wait_count = 0; while ( FileStatus(taskbuild).exists() ) { if ( wait_count == 0 ) { cout << "Waiting for task build to finish" << endl; } else { cout << "."; } sleep(10); } if ( wait_count ) { cout << endl; } if ( FileStatus(taskfail).exists() ) { cout << "Build failed" << endl; return 51; } if ( ! FileStatus(taskok).exists() ) { cout << "Unable to find " << taskok << " or " << taskfail << endl; return 52; } // Otherwise, install and build the task. } else { cout << "Creating task directory" << endl; if ( mkfulldir(taskdir) != 0 ) { cerr << "Unable to create directory" << endl; return 61; } touch(taskbuild); cout << "Copying task files." << endl; string ccom = "cp tskdir/* " + taskdir; int cstat = ssystem(ccom); if ( cstat ) { cerr << "Copy failed" << endl; touch(taskfail); unlink(taskbuild.c_str()); return 62; } cout << "Task copy was successful" << endl; // If build_task exists, then run it in the task directory. if ( have_runcom ) { cout << "Building task" << endl; string outfile = rundir + "/dialout.log"; string errfile = rundir + "/dialerr.log"; string statfile = rundir + "/dialstat.log"; string com = ". " + pmenv + "; " + runcom + + " >" + outfile + " 2>" + errfile; cout << com << endl; cout << " stdout: " << outfile << endl; cout << " stderr: " << errfile << endl; cout << " return status: " << statfile << endl; int rstat = ssystem(com, false); cout << "Command returned " << rstat << endl; ofstream sstat(statfile.c_str()); sstat << rstat << endl; sstat.close(); if ( rstat ) { touch(taskfail); unlink(taskbuild.c_str()); return 69; } cout << "Task build was successful" << endl; } touch(taskok); unlink(taskbuild.c_str()); } } else { cerr << "Unrecognized run script" << endl; return 71; } cout << "Success" << endl; return 0; } int main(int argc, char* argv[]) { string prefix = string(argv[0]) + ": "; int err = 0; if ( argc == 1 ) err = 101; string arg1 = argv[1]; bool help = err==0 && arg1=="-h"; if ( !help && argc < 3 ) err = 101; if ( err || help ) { cout << "Usage: " << prefix << " SVC_URL JOB_ID [OPT]" << endl; cout << " OPT = archive - to create job archive" << endl; return err; } string sid = arg1; string surl = argv[2]; string opt = ""; if ( argc > 3 ) opt = argv[3]; // Run job. int rstat = run_job(sid, surl); if ( opt == "archive" ) { // Tar up results. cout << "Building job archive" << endl; string tarname = "dialjob_" + sid + ".tar"; string gzname = tarname + ".gz"; string tcom = "tar -cf " + tarname + " *"; cout << tcom << endl; int tstat = ssystem(tcom); if ( tstat ) { cout << "Archive build failed with error " << tstat << endl; return 81; } string archive = tarname; // Compress archive. cout << "Compressing job archive" << endl; string zcom = "gzip tar -cf " + tarname + " *"; cout << zcom << endl; int zstat = ssystem(zcom); if ( zstat ) { cout << "Compression failed with error " << zstat << endl; } else { archive = gzname; } } // Put archive in SE. return rstat; }