// WsClientScheduler.cxx #include "dial_ws_sched/WsClientScheduler.h" #include #include "dataset_util/XmlElement.h" #include "dataset_util/PThreadMutex.h" #include "dataset_xml/XmlParser.h" #include "dial_ws/DialWsClient.h" #include "../gsoap/dial_ws_schedulerH.h" using std::string; using std::ostream; using std::cout; using std::cerr; using std::endl; using std::auto_ptr; using dial::DialWsClient; using dial::WsClientScheduler; using dial::Application; using dial::Task; using dset::Dataset; using dial::JobId; using dial::JobIdList; using dial::Job; using dial::Scheduler; using dial::WsClientScheduler; #ifdef WITH_GSI #define GLOBUS_DEBUG_H #include "gsi.h" #endif //********************************************************************** // Local definitions. //********************************************************************** class WsClientScheduler::Imp : public DialWsClient { public: int status; PThreadMutex mutex; Text log; Imp(string urlin, bool usegsi) : DialWsClient(urlin, usegsi), status(0), log("WsClientScheduler.log") { } }; //********************************************************************** namespace { XmlParser& parser() { static XmlParser psr; return psr; } string gsi_server_name; } // ends unnamed namespace //********************************************************************** // Static methods. //********************************************************************** const Text& WsClientScheduler::dtd() { static Text txt; return txt; } //********************************************************************** // Methods. //********************************************************************** WsClientScheduler::WsClientScheduler(string url, bool usegsi) : pimp(new Imp(url, usegsi)) { set_timeout(60); pimp->set_delegation(true); } //********************************************************************** WsClientScheduler::~WsClientScheduler() { if ( ! pimp->status ) { soap_done(pimp->psoap()); } delete pimp; } //********************************************************************** // Add task. int WsClientScheduler:: add_task(const Application& app, const Task& tsk) { if ( pimp->status ) return false; int hastsk = false; auto_ptr pxapp(app.xml()); auto_ptr pxtsk(tsk.xml(false)); int sstat = dial_ws_scheduler:: soap_call_dial__add_task(pimp->psoap(), pimp->curl(), "", pxapp->to_xml_text(), pxtsk->to_xml_text(), hastsk); if ( sstat != SOAP_OK ) { soap_print_fault(pimp->psoap(), stderr); return false; } return hastsk; } //********************************************************************** // Remove task. int WsClientScheduler:: remove_task(const Application& app, const Task& tsk) { if ( pimp->status ) return false; int hastsk = false; auto_ptr pxapp(app.xml()); auto_ptr pxtsk(tsk.xml(false)); int sstat = dial_ws_scheduler:: soap_call_dial__remove_task(pimp->psoap(), pimp->curl(), "", pxapp->to_xml_text(), pxtsk->to_xml_text(), hastsk); if ( sstat != SOAP_OK ) { soap_print_fault(pimp->psoap(), stderr); return false; } return hastsk; } //********************************************************************** // Submit job. JobId WsClientScheduler:: submit(const Application& app, const Task& tsk, const Dataset& dst, const JobPreferences& prf) { string prefix = "WsClientScheduler::submit: "; if ( pimp->status ) return JobId(); auto_ptr pxapp(app.xml()); if ( &*pxapp == 0 ) { cerr << prefix << "Unable to extract XML from application" << endl; cerr << app << endl; return JobId(); } auto_ptr pxtsk(tsk.xml(false)); if ( &*pxtsk == 0 ) { cerr << prefix << "Unable to extract XML from task" << endl; cerr << tsk << endl; return JobId(); } auto_ptr pxdst(dst.xml()); if ( &*pxdst == 0 ) { cerr << prefix << "Unable to extract XML from dataset" << endl; cerr << dst << endl; return JobId(); } auto_ptr pxprf(prf.xml()); if ( &*pxprf == 0 ) { cerr << prefix << "Unable to extract XML from preferences" << endl; cerr << prf << endl; return JobId(); } string sxjid; int sstat = dial_ws_scheduler:: soap_call_dial__submit(pimp->psoap(), pimp->curl(), "", pxapp->to_xml_text(), pxtsk->to_xml_text(), pxdst->to_xml_text(), pxprf->to_xml_text(), sxjid); if ( sstat != SOAP_OK ) { soap_print_fault(pimp->psoap(), stderr); return JobId(); } auto_ptr pxjid(parser().parse(sxjid)); if ( pxjid.get() == 0 ) return JobId(); JobId jid(*pxjid); return jid; } //********************************************************************** // Start job. int WsClientScheduler::start(JobId jid) { if ( pimp->status ) return 999; int rstat = 50; auto_ptr pxjid(jid.xml()); int sstat = dial_ws_scheduler:: soap_call_dial__start(pimp->psoap(), pimp->curl(), "", pxjid->to_xml_text(), rstat); if ( sstat != SOAP_OK ) { soap_print_fault(pimp->psoap(), stderr); return 51; } return rstat; } //********************************************************************** // Kill job. int WsClientScheduler::kill(JobId jid) { if ( pimp->status ) return 999; int rstat = 50; auto_ptr pxjid(jid.xml()); int sstat = dial_ws_scheduler:: soap_call_dial__kill(pimp->psoap(), pimp->curl(), "", pxjid->to_xml_text(), rstat); if ( sstat != SOAP_OK ) { soap_print_fault(pimp->psoap(), stderr); return 51; } return rstat; } //********************************************************************** // Remove job. int WsClientScheduler::remove(JobId jid) { if ( pimp->status ) return 999; int rstat = 50; auto_ptr pxjid(jid.xml()); int sstat = dial_ws_scheduler:: soap_call_dial__remove(pimp->psoap(), pimp->curl(), "", pxjid->to_xml_text(), rstat); if ( sstat != SOAP_OK ) { soap_print_fault(pimp->psoap(), stderr); return 51; } return rstat; } //********************************************************************** // Lock mutex. int WsClientScheduler::lock() const { return pimp->mutex.lock(); } //********************************************************************** // Unock mutex. int WsClientScheduler::unlock() const { return pimp->mutex.unlock(); } //********************************************************************** // Ping. bool WsClientScheduler::ping() const { return pimp->ping(); } //********************************************************************** // Validity. bool WsClientScheduler::is_valid() const { if ( pimp->status != 0 ) return false; bool valid; int sstat = dial_ws_scheduler::soap_call_dial__is_valid( pimp->psoap(), pimp->curl(), "", valid); if ( sstat != SOAP_OK ) { soap_print_fault(pimp->psoap(), stderr); return false; } if ( ! valid ) return valid; sstat = dial_ws_scheduler:: soap_call_dial__scheduler_is_valid(pimp->psoap(), pimp->curl(), "", valid); if ( sstat != SOAP_OK ) { soap_print_fault(pimp->psoap(), stderr); return false; } return valid; } //********************************************************************** // Has app. bool WsClientScheduler:: has_application(const Application& app) const { if ( pimp->status ) return false; bool hasapp; auto_ptr pxapp(app.xml()); int sstat = dial_ws_scheduler:: soap_call_dial__has_application(pimp->psoap(), pimp->curl(), "", pxapp->to_xml_text(), hasapp); if ( sstat != SOAP_OK ) { soap_print_fault(pimp->psoap(), stderr); return false; } return hasapp; } //********************************************************************** // Has task. bool WsClientScheduler:: has_task(const Application& app, const Task& tsk) const { if ( pimp->status ) return false; bool hastsk = false; auto_ptr pxapp(app.xml()); auto_ptr pxtsk(tsk.xml(false)); int sstat = dial_ws_scheduler:: soap_call_dial__has_task(pimp->psoap(), pimp->curl(), "", pxapp->to_xml_text(), pxtsk->to_xml_text(), hastsk); if ( sstat != SOAP_OK ) { soap_print_fault(pimp->psoap(), stderr); return false; } return hastsk; } //********************************************************************** // Task job. JobId WsClientScheduler:: task_job(const Application& app, const Task& tsk) const { if ( pimp->status ) return JobId(); string sjid; auto_ptr pxapp(app.xml()); auto_ptr pxtsk(tsk.xml(false)); int sstat = dial_ws_scheduler:: soap_call_dial__task_job(pimp->psoap(), pimp->curl(), "", pxapp->to_xml_text(), pxtsk->to_xml_text(), sjid); if ( sstat != SOAP_OK ) { soap_print_fault(pimp->psoap(), stderr); return JobId(); } return JobId(sjid); } //********************************************************************** // List of jobs. JobIdList WsClientScheduler::jobs() const { JobIdList jids; jids.erase(jids.begin(), jids.end()); if ( pimp->status ) return jids; string sjids; int sstat = dial_ws_scheduler:: soap_call_dial__jobs(pimp->psoap(), pimp->curl(), "", sjids); if ( sstat != SOAP_OK ) { cerr << "WsClientScheduler: SOAP call failed" << endl; soap_print_fault(pimp->psoap(), stderr); return jids; } Text::WordList words = Text::split(sjids); for ( Text::WordList::const_iterator ijid=words.begin(); ijid!=words.end(); ++ijid ) { JobId jid(*ijid); jids.push_back(jid); } return jids; } //********************************************************************** // Has job. bool WsClientScheduler::has_job(JobId jid) const { if ( pimp->status ) return false; bool hasjob = false; if ( ! jid.is_valid() ) { return false; } auto_ptr pxjid(jid.xml()); int sstat = dial_ws_scheduler:: soap_call_dial__has_job(pimp->psoap(), pimp->curl(), "", pxjid->to_xml_text(), hasjob); if ( sstat != SOAP_OK ) { soap_print_fault(pimp->psoap(), stderr); return false; } return hasjob; } //********************************************************************** // Fetch job. Job& WsClientScheduler::job(JobId jid) const { static Job bad_job; static Job* pjob = 0; // Delete the last job. delete pjob; pjob = 0; if ( pimp->status ) return bad_job; string sxjob; const char* line1 = "=================================================="; const char* line2 = "--------------------------------------------------"; auto_ptr pxjid(jid.xml()); int sstat = dial_ws_scheduler:: soap_call_dial__job(pimp->psoap(), pimp->curl(), "", pxjid->to_xml_text(), sxjob); if ( sstat != SOAP_OK ) { soap_print_fault(pimp->psoap(), stderr); //soap_end(pimp->psoap()); return bad_job; } if ( ! sxjob.size() ) { return bad_job; } auto_ptr pxjob(parser().parse(sxjob)); //soap_end(pimp->psoap()); if ( pxjob.get() == 0 ) { pimp->log.append(line1); pimp->log.append("Error fetching job"); pimp->log.append(line2); pimp->log.append(pxjid->to_xml_text()); pimp->log.append(line2); pimp->log.append(sxjob); pimp->log.append(line1); return bad_job; } pjob = new Job(*pxjob); return *pjob; } //********************************************************************** // XML. const XmlElement* WsClientScheduler::xml() const { if ( pimp->status ) return 0; auto_ptr pele(new XmlElement(WsClientScheduler::xml_name())); return pele.release(); } //********************************************************************** // Return log. Text WsClientScheduler::log() const { int rstat; pimp->log.append("Requested server to write log"); dial_ws_scheduler:: soap_call_dial__write_log(pimp->psoap(), pimp->curl(), "", rstat); return pimp->log; } //********************************************************************** // Set the timeout. void WsClientScheduler::set_timeout(int to) { pimp->set_timeout(to); } //********************************************************************** // Get the timeout. int WsClientScheduler::timeout() const { return pimp->timeout(); } //********************************************************************** // Set the delegation. int WsClientScheduler::set_delegation(bool del) { return pimp->set_delegation(del); } //********************************************************************** // Get the delegation. bool WsClientScheduler::is_delegating() const { return pimp->is_delegating(); } //********************************************************************** // Get the resource report. string WsClientScheduler::resource_report() const { string srep; dial_ws_scheduler::soap_call_dial__resource_report( pimp->psoap(), pimp->curl(), "", srep ); return srep; } //********************************************************************** // Terminate service. void WsClientScheduler::terminate_service() const { int rstat; dial_ws_scheduler:: soap_call_dial__terminate(pimp->psoap(), pimp->curl(), "", rstat); } //********************************************************************** // Output stream. ostream& WsClientScheduler::ostr(ostream& str) const { str << "WsClientScheduler connected to " << pimp->curl() << endl; #ifdef WITH_GSI gsi_plugin_data* pgdata = (struct gsi_plugin_data*) soap_lookup_plugin(pimp->psoap(), GSI_PLUGIN_ID); if ( gsi_server_name.size() ) { str << "GSI server ID: " << gsi_server_name << endl; } else { str << "GSI server ID is unknown" << endl; } #endif return job_report(str); } //**********************************************************************