// ChildWatcher.cxx // This must be defined if threads are used anywhere. // And is probably a good thing to use in any case. #define ChildWatcher_USES_THREADS #include "dial_job/ChildWatcher.h" #include #include #include #include #include #include #include #include #include #include "dataset_util/getcwd.h" #include "dataset_util/FileStatus.h" #include "dataset_util/PThreadMutex.h" using std::vector; using std::map; using std::ostream; using std::string; using std::cerr; using std::endl; using dial::ProcessStatus; using dial::ChildWatcher; //********************************************************************** // Local data. //********************************************************************** typedef void (*SignalHandler)(int); typedef ChildWatcher::FunPtr FunPtr; typedef vector PidList; typedef map FunMap; namespace { std::ostream& operator<<(std::ostream& str, const sigset_t& sigs) { for ( int isig=0; isig<64; ++isig ) { int stat = sigismember(&sigs, isig); if ( stat == 1 || stat == 0 ) { str << stat; } else { str << "."; } } return str; } // Show the status of SIGCHLD. int show_sigchild(std::ostream& str) { struct sigaction oldact; int chkstat = sigaction(SIGCHLD, NULL, &oldact); if ( chkstat == 0 ) { str << "End of child signal handler:" << endl; str << " handler = " << long(oldact.sa_handler) << endl; str << " mask = " << oldact.sa_mask << endl; str << " flags = " << oldact.sa_flags << endl; if ( oldact.sa_flags == SA_NOCLDSTOP ) { str << "(End of child signal will not be generated!)" << endl; } str << " Signal is "; if ( ! ChildWatcher::is_blocked() ) str << "not "; str << "blocked"; } else { str << "Unable to fetch endo of child signal handler"; } return chkstat; } // File for debug flag. // Take it while we are still in the starting directory. string DEBUG_FILE(getcwd() + "/debug_ChildWatcher"); // Mutex to protect messages. static PThreadMutex& msgmtx() { static PThreadMutex mtx; return mtx; } // Function indicating if debugging messages should be written. // Locks message mutex if true! bool debug_msg(bool update =false) { static bool flag = false; if ( update ) { flag = FileStatus(DEBUG_FILE).exists(); } if ( flag ) { msgmtx().lock(); } return flag; } // Counter for handled signals. unsigned int& rip_count() { static unsigned int count = 0; return count; } // Flag indicating child watcher has been initialized. bool& init() { static bool flag = false; return flag; } // Flag indicating child watcher is closed. bool& closed() { static bool flag = false; return flag; } // External handler for signal. // Called if the terminated process is not known. // This allows the unthreaded ChildWatcher to coexist with other signal // handlers. SignalHandler& external_handler() { static SignalHandler phan = 0; return phan; } //********************************************************************** // Process table. //********************************************************************** // Process counts unsigned int BORN_COUNT = 0; // registered unsigned int LIVE_COUNT = 0; // registered and not yet handled unsigned UNBORN_COUNT = 0; // signal without registration unsigned ERROR_COUNT = 0; // Invalid state change // State of each process. const int MAXPID = 33000; enum ProcessState { UNKNOWN_STATE, // Not known here LIVE_STATE, // Registered and not yet handled DONE_STATE, // Registered and handled UNBORN_STATE, // Unregistered and handled ERROR_STATE // Unsupported state change (e.g. unborn -> live) }; ProcessState PROCSTATES[MAXPID]; // Process status table. // This is the status returned by POSIX wait(); int PROCSTATS[MAXPID]; // Parent process table. int PARENTS[MAXPID]; // Functions for live processes. void do_not_use(pid_t, int) { } FunPtr PROCFUNS[MAXPID]; // Mutex to protect table. PThreadMutex& process_table_mutex() { static PThreadMutex mutex; return mutex; } //********************************************************************** // Data for threazad implementation. #ifdef ChildWatcher_USES_THREADS // Newest PID. pid_t NEWPID; // Mutex to protect this value. PThreadMutex& newpid_mutex() { static PThreadMutex mutex; return mutex; } bool THREADBLOCK = true; int PEND_COUNT = 0; #endif //********************************************************************** // Function update process table when a process completes. int close_pid(pid_t pid, int pstat) { string prefix = "ChildScheduler::close_pid: "; int hstat = 99; // Extract return status from process status. int rstat = -99; if ( WIFEXITED(pstat) ) { rstat = WEXITSTATUS(pstat); if ( debug_msg() ) { cerr << prefix << "RIP with exit status " << rstat << endl; msgmtx().unlock(); } } else if ( WIFSIGNALED(pstat) ) { int rsig = WTERMSIG(pstat); rstat = 10000 + rsig; if ( debug_msg() ) { cerr << prefix << "RIP with signal " << rsig << endl; msgmtx().unlock(); } } else { if ( debug_msg() ) { cerr << prefix << "RIP with abnormal exit " << pid << endl; msgmtx().unlock(); } } // Call function to handle signal end of process and update table. process_table_mutex().lock(); ProcessState oldstate = PROCSTATES[pid]; // If no function is defined ... if ( oldstate == UNKNOWN_STATE ) { cerr << "ChildWatcher: close_pid: " << "Process " << pid << " is unknown" << endl; ++ERROR_COUNT; PROCSTATES[pid] = UNBORN_STATE; hstat = 1; } else if ( oldstate == LIVE_STATE ) { --LIVE_COUNT; PROCSTATES[pid] = DONE_STATE; FunPtr pfun = PROCFUNS[pid]; if ( pfun != 0 ) { pfun(pid, rstat); } hstat = 0; } else { cerr << "ChildWatcher: close_pid: " << "Process " << pid << " is not live" << endl; PROCSTATES[pid] = ERROR_STATE; ++ERROR_COUNT; hstat = 2; } process_table_mutex().unlock(); return hstat; } //********************************************************************** // Class to force closeout. class Closer { public: ~Closer() { ChildWatcher::close(); } }; Closer CLOSER; //********************************************************************** #ifndef ChildWatcher_USES_THREADS // Function to handle signal. // RH 7.2 wants int argument. void rip(int arg) { if ( closed() ) return; ++rip_count(); // Do not allow too many simultaneous RIP's. // Stopping interrupts during handling does not seem to work. // Reap all child processes. int pstat; pid_t pid = 1; while ( pid > 0 ) { pid = waitpid(-1, &pstat, WNOHANG); if ( pid == 0 ) break; assert( pid <= MAXPID ); if ( pid < 0 ) { // Ignore no child process. // RH7.2 calls to system() lead to this condition. // Print message if error is something besides no children. if ( errno != ECHILD ) { cerr << "RIP waitpid returned error " << errno << endl; cerr << strerror(errno) << endl; } break; } // Check the PID. assert( pid > 0 ); if ( pid <= 0 || pid >= MAXPID ) { return; } int cstat = close_pid(pid, pstat); if ( cstat == 1 ) { if ( external_handler() != 0 ) { external_handler()(arg); } } } } #endif //********************************************************************** #ifdef ChildWatcher_USES_THREADS // Mutex to protect table. PThreadMutex& thread_update_mutex() { static PThreadMutex mutex; return mutex; } // Thread function to monitor a process. void* signal_waiter(void* arg) { string prefix = "ChildWatcher::signal_waiter: "; pthread_t th = pthread_self(); pid_t pid = NEWPID; if ( debug_msg() ) { cerr << prefix << "Started thread " << th << " to monitor process " << pid << endl; msgmtx().unlock(); } int proc_stat; if ( debug_msg() ) { cerr << prefix << "Waiting for process " << pid << " to terminate" << endl; msgmtx().unlock(); } pid_t wpid = waitpid(pid, &proc_stat, 0); if ( debug_msg() ) { cerr << prefix << "Process " << pid << " terminated" << endl; msgmtx().unlock(); } ++PEND_COUNT; while ( THREADBLOCK ) { sleep(1); } if ( debug_msg() ) { cerr << prefix << "Ready to post results for process " << pid << endl; msgmtx().unlock(); } thread_update_mutex().lock(); if ( debug_msg() ) { cerr << prefix << "Posting results for process " << pid << endl; msgmtx().unlock(); } close_pid(pid, proc_stat); --PEND_COUNT; thread_update_mutex().unlock(); if ( debug_msg() ) { cerr << prefix << "Results posted for process " << pid << endl; msgmtx().unlock(); } return arg; } #endif //********************************************************************** // This is here so that SIGCHLD is blocked in the main thread before // any other threads are created that might otherwise handle that // signal. Thid is needed here and in LocalScheduler for task building. int CWINIT = ChildWatcher::initialize(); } // end unnamed namespace //********************************************************************** // Static member functions. //********************************************************************** // Is initialized. bool ChildWatcher::is_initialized() { return init(); } //********************************************************************** // Initialize. int ChildWatcher::initialize() { static int istat = -1; if ( init() ) return istat; if ( debug_msg(true) ) { std::cerr << "Initializing Child Watcher" << endl; msgmtx().unlock(); } init() = true; if ( debug_msg() ) { ChildWatcher::print(std::cout) << endl; msgmtx().unlock(); } #ifdef ChildWatcher_USES_THREADS // Mask SIGCHLD now and forever. sigset_t blkset; sigemptyset(&blkset); sigaddset(&blkset, SIGCHLD); pthread_sigmask(SIG_BLOCK, &blkset, 0); #endif // First check the existing action. // If someone else is already handling this signal, then make // sure our external handler is not yet defined. // This should never happen. struct sigaction oldact; int stat = sigaction(SIGCHLD, NULL, &oldact); if ( stat != 0 ) return istat=1; if ( oldact.sa_handler != SIG_DFL && oldact.sa_handler != SIG_IGN && external_handler() != 0 ) { return istat=2; } #ifndef ChildWatcher_USES_THREADS // Define action for this signal to call rip(). // and to block SIGCHLD during execution. struct sigaction act; act.sa_handler = rip; sigset_t sigset; sigaddset(&sigset, SIGCHLD); act.sa_mask = sigset; stat = sigaction(SIGCHLD, &act, &oldact); if ( stat != 0 ) return istat=3; #endif // Block SIGCHLD. block(); // Record the old handler for unknown process termination. external_handler() = oldact.sa_handler; // Initialize process arrays. process_table_mutex().lock(); for ( pid_t pid=0; pid 0; #else sigset_t sset; int stat = sigpending(&sset); if ( stat != 0 ) perror("ChildWatcher::pending (1)"); stat = sigismember(&sset, SIGCHLD); if ( stat == 1 ) return true; if ( stat == 0 ) return false; perror("ChildWatcher::pending (2)"); return false; #endif } //********************************************************************** // Print. ostream& ChildWatcher::print(ostream& str, int level) { msgmtx().lock(); str << "Child watcher "; if ( ! init() ) { str << "ChildWatcher is not initialized" << endl; msgmtx().unlock(); return str; } else { str << "is "; if ( ! is_blocked() ) str << "not "; str << "blocked"; } str << endl; long lcount = live_count(); long dcount = dead_count(); long ucount = unborn_count(); long ecount = error_count(); str << " " << lcount << " live"; if ( pending() ) { str << " or pending"; } str << " process"; if ( lcount != 1 ) str << "es"; str << endl; str << " " << dcount << " dead process"; if ( dcount != 1 ) str << "es"; if ( pending() ) { str << " (more pending)"; } str << endl; str << " " << ucount << " unknown process"; if ( ucount != 1 ) str << "es"; if ( debug_msg() ) { str << endl; #ifndef ChildWatcher_USES_THREADS str << "ChildWatcher internal action: " << long(rip) << endl; #endif show_sigchild(std::cout); msgmtx().unlock(); } if ( ecount != 0 ) { str << " " << ecount << " error process"; if ( ecount != 1 ) str << "es"; } // End of level 0. if ( level <= 0 ) { msgmtx().unlock(); return str; } str << "\nProcess status table" << endl; for ( pid_t pid=0; pid