// ProcessJob.cxx #include "dial_job/ProcessJob.h" #include #include #include #include #include #include #include #include #include "dataset_util/copy_file.h" #include "dataset_id/EventIdList.h" #include "dataset_util/FileStatus.h" #include "dataset_util/get_hostname.h" #include "dataset_base/DatasetRepository.h" #include "dataset_base/DatasetCreator.h" #include "dataset_xml/XmlParser.h" #include "dial_job/ChildWatcher.h" using std::string; using std::map; using std::cout; using std::cerr; using std::ostream; using std::ofstream; using std::endl; using dset::Dataset; using dset::DatasetRepository; using dset::DatasetCreator; using dial::Job; using dial::ProcessJob; //********************************************************************** // Local definitions. //********************************************************************** namespace { bool CAREFUL = false; } //********************************************************************** // Member functions. //********************************************************************** // Constructor. ProcessJob::ProcessJob(JobId jid, const Application& app, const Task& tsk, const Dataset& dst, const JobPreferences prf, string jobdir, string runfile, string shellcom) : Job("ProcessJob", jid, app, tsk, dst, prf, jobdir, runfile), m_shellcom(shellcom), m_pid(0) { m_pidfile = job_directory() + "/" + "pid"; base_set_run_host(get_hostname()); create_local_run_script_wrapper(); } //********************************************************************** // Conversion constructor. ProcessJob::ProcessJob(const Job& job, string runcom) : Job(job), m_shellcom(runcom) { } //********************************************************************** // Destructor. ProcessJob::~ProcessJob() { } //********************************************************************** // Start a job. int ProcessJob::start() { if ( ! 
is_initialized() ) return 1; if ( process_id() != 0 || start_time().is_valid() ) return 2; // Check that the executable is present. if ( CAREFUL ) { string runscript = job_directory() + "/dial_run_script_wrapper"; FileStatus estat(runscript); if ( ! estat.is_executable() ) return 4; } // Construct the description of the child process. base_set_running(); if ( ! ChildWatcher::is_initialized() ) { int wstat = ChildWatcher::initialize(); if ( wstat != 0 ) return base_set_failed(120 + wstat); } // Start child process. // Block signals so end-of-process is not handled before watch // is set. bool watcher_blocked = ChildWatcher::is_blocked(); if ( ! watcher_blocked ) ChildWatcher::block(); m_pid = fork(); if ( m_pid == 0 ) { // Child process // Change directory. int cdstat = chdir(job_directory().c_str()); if ( cdstat != 0 ) { perror("ProcessJob::submit subprocess cd"); exit(111); } // Connect stdout to stdout.log. int mask = S_IRUSR | S_IRGRP; int filedes = open("stdout.log", O_WRONLY|O_CREAT, mask); assert( filedes != -1 ); assert( close(STDOUT_FILENO) == 0 ); assert( dup(filedes) == STDOUT_FILENO ); close(filedes); // Connect stderr to stderr.log. filedes = open("stderr.log", O_WRONLY|O_CREAT, mask); assert( filedes != -1 ); assert( close(STDERR_FILENO) == 0 ); assert( dup(filedes) == STDERR_FILENO ); close(filedes); // Execute new process. Environment env = Environment::posix(); string pmsetup = Environment::current().value("PKGMGR_ENV"); env["ENV"] = pmsetup; env["BASH_ENV"] = pmsetup; char* args[] = { 0, 0, 0, 0 }; int iarg = 0; args[iarg++] = const_cast(m_shellcom.c_str()); args[iarg++] = const_cast("./dial_run_script"); execve(args[0], args, env.unsafe_env()); //execv(m_com.cname(), com.argv()); // Only get here if exe cannot be started. exit(112); } // Handle error from fork if ( m_pid < 0 ) return base_set_failed(113); // Continue parent process without error. assert( m_pid > 0 ); // Set watch. 
int wstat = ChildWatcher::watch(m_pid); assert( wstat == 0 ); if ( wstat != 0 ) base_set_failed(114); // Write PID to file. { ofstream pfile(m_pidfile.c_str()); pfile << m_pid << endl; if ( ! pfile ) base_set_failed(115); } // Unblock end-of-child signal. if ( ! watcher_blocked ) ChildWatcher::unblock(); // Set status and return. // Kill the process if there was a problem after starting. if ( is_failed() ) { assert(false); return error(); } return 0; } //********************************************************************** // Update job status. int ProcessJob::update() { if ( is_killed() ) return 0; if ( is_initialized() ) { int stat = start(); if ( stat != 0 ) return 200+stat; return 0; } if ( ! is_running() ) return 1; if ( ChildWatcher::is_blocked() ) ChildWatcher::update(); pid_t pid = process_id(); if ( ! ChildWatcher::is_registered(pid) ) { return 1; } if ( ! ChildWatcher::is_done(pid) ) { base_set_update(); return 0; } // Process is newly completed. // Record process return status. base_set_return_status(99999); ProcessStatus pstat(ChildWatcher::status(pid)); if ( ! pstat.is_valid() ) { base_set_failed(141); return 0; } if ( pstat.is_signaled() ) { base_set_return_status(pstat.exit_signal()); base_set_failed(142); return 0; } if ( pstat.is_stopped() ) { base_set_return_status(pstat.exit_signal()); base_set_failed(143); return 0; } assert( pstat.is_normal() ); base_set_return_status(pstat.exit_code()); // Exit with error if return status is not zero. if ( return_status() != 0 ) { base_set_failed(144); return 0; } // If there is a result file, read it. 
string resfile = job_directory() + "/" + "result.xml"; if ( FileStatus(resfile).is_readable() ) { XmlParser parser; const XmlElement* pxres = parser.parse(resfile); if ( pxres == 0 ) { base_set_failed(151); return 0; } DatasetRepository& dsrep = DatasetRepository::default_instance(); const Dataset* pres = DatasetCreator::create(*pxres, &dsrep); if ( pres == 0 ) { base_set_failed(152); return 0; } int stat = base_set_result(pres); if ( stat != 0 ) { delete pres; base_set_failed(153); return 0; } if ( dataset()!=0 && dataset()->is_event_dataset() ) { base_set_event_count(dataset()->event_count()); } } base_set_done(); return 0; } //********************************************************************** // Kill the job. int ProcessJob::kill(int err) { int kstat = 0; // Block signals so end-of-process is not handled while we are // marking the proces as killed. bool watcher_blocked = ChildWatcher::is_blocked(); if ( ! watcher_blocked ) ChildWatcher::block(); // Mark the process as killed. base_set_killed(err); base_set_event_count(0); // Kill. if ( is_running() && ChildWatcher::is_live(m_pid) ) { int stopstat = ::kill(m_pid, SIGKILL); if ( stopstat != 0 ) { cerr << "ProcessJob::kill: Unable to stop process " << m_pid << endl; cerr << " Status: " << stopstat << endl; cerr << " Error: " << errno << ": " << strerror(errno) << endl; cerr << " Will try to kill..." << endl; int killstat = ::kill(m_pid, SIGKILL); if ( killstat != 0 ) { base_set_failed(116); cerr << "ProcessJob::kill: Unable to kill process " << m_pid << endl; cerr << " Status: " << killstat << endl; cerr << " Error: " << errno << ": " << strerror(errno) << endl; kstat = 2; } } } // Unblock end-of-child signal. if ( ! watcher_blocked ) ChildWatcher::unblock(); return kstat; } //********************************************************************** // Output stream. 
// Write the job description to lhs.
// Always writes the base Job description.  Once the job has left the
// initialized state it adds the run script and child process ID.  Once
// the job is no longer running it also adds the child's return status.
// Returns lhs so calls can be chained.

ostream& ProcessJob::ostr(ostream& lhs) const {
  Job::ostr(lhs);
  if ( ! is_initialized() ) {
    lhs << endl;
    lhs << "Run script: " << run_script() << endl
        << "Child process ID: " << process_id();
    // The return status is only meaningful after the child has finished.
    if ( ! is_running() ) {
      lhs << endl
          << "Child return status: " << return_status();
    }
  }
  return lhs;
}

//**********************************************************************