scheduler.cpp
00001 /* This file is part of the KDE libraries 00002 Copyright (C) 2000 Stephan Kulow <coolo@kde.org> 00003 Waldo Bastian <bastian@kde.org> 00004 00005 This library is free software; you can redistribute it and/or 00006 modify it under the terms of the GNU Library General Public 00007 License version 2 as published by the Free Software Foundation. 00008 00009 This library is distributed in the hope that it will be useful, 00010 but WITHOUT ANY WARRANTY; without even the implied warranty of 00011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00012 Library General Public License for more details. 00013 00014 You should have received a copy of the GNU Library General Public License 00015 along with this library; see the file COPYING.LIB. If not, write to 00016 the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, 00017 Boston, MA 02110-1301, USA. 00018 */ 00019 00020 #include "tdeio/sessiondata.h" 00021 #include "tdeio/slaveconfig.h" 00022 #include "tdeio/scheduler.h" 00023 #include "tdeio/authinfo.h" 00024 #include "tdeio/slave.h" 00025 #include <tqptrlist.h> 00026 #include <tqdict.h> 00027 00028 #include <dcopclient.h> 00029 00030 #include <kdebug.h> 00031 #include <tdeglobal.h> 00032 #include <tdeprotocolmanager.h> 00033 #include <kprotocolinfo.h> 00034 #include <assert.h> 00035 #include <kstaticdeleter.h> 00036 #include <tdesu/client.h> 00037 00038 00039 // Slaves may be idle for MAX_SLAVE_IDLE time before they are being returned 00040 // to the system wide slave pool. (3 minutes) 00041 #define MAX_SLAVE_IDLE (3*60) 00042 00043 using namespace TDEIO; 00044 00045 template class TQDict<TDEIO::Scheduler::ProtocolInfo>; 00046 00047 Scheduler *Scheduler::instance = 0; 00048 00049 class TDEIO::SlaveList: public TQPtrList<Slave> 00050 { 00051 public: 00052 SlaveList() { } 00053 }; 00054 00055 // 00056 // There are two kinds of protocol: 00057 // (1) The protocol of the url 00058 // (2) The actual protocol that the io-slave uses. 00059 // 00060 // These two often match, but not necasserily. Most notably, they don't 00061 // match when doing ftp via a proxy. 00062 // In that case (1) is ftp, but (2) is http. 00063 // 00064 // JobData::protocol stores (2) while Job::url().protocol() returns (1). 00065 // The ProtocolInfoDict is indexed with (2). 00066 // 00067 // We schedule slaves based on (2) but tell the slave about (1) via 00068 // Slave::setProtocol(). 00069 00070 class TDEIO::Scheduler::JobData 00071 { 00072 public: 00073 JobData() : checkOnHold(false) { } 00074 00075 public: 00076 TQString protocol; 00077 TQString proxy; 00078 bool checkOnHold; 00079 }; 00080 00081 class TDEIO::Scheduler::ExtraJobData: public TQPtrDict<TDEIO::Scheduler::JobData> 00082 { 00083 public: 00084 ExtraJobData() { setAutoDelete(true); } 00085 }; 00086 00087 class TDEIO::Scheduler::ProtocolInfo 00088 { 00089 public: 00090 ProtocolInfo() : maxSlaves(1), skipCount(0) 00091 { 00092 joblist.setAutoDelete(false); 00093 } 00094 00095 TQPtrList<SimpleJob> joblist; 00096 SlaveList activeSlaves; 00097 int maxSlaves; 00098 int skipCount; 00099 TQString protocol; 00100 }; 00101 00102 class TDEIO::Scheduler::ProtocolInfoDict : public TQDict<TDEIO::Scheduler::ProtocolInfo> 00103 { 00104 public: 00105 ProtocolInfoDict() { } 00106 00107 TDEIO::Scheduler::ProtocolInfo *get( const TQString &protocol); 00108 }; 00109 00110 TDEIO::Scheduler::ProtocolInfo * 00111 TDEIO::Scheduler::ProtocolInfoDict::get(const TQString &protocol) 00112 { 00113 ProtocolInfo *info = find(protocol); 00114 if (!info) 00115 { 00116 info = new ProtocolInfo; 00117 info->protocol = protocol; 00118 info->maxSlaves = KProtocolInfo::maxSlaves( protocol ); 00119 00120 insert(protocol, info); 00121 } 00122 return info; 00123 } 00124 00125 00126 Scheduler::Scheduler() 00127 : DCOPObject( "TDEIO::Scheduler" ), 00128 TQObject(kapp, "scheduler"), 00129 slaveTimer(0, "Scheduler::slaveTimer"), 00130 coSlaveTimer(0, "Scheduler::coSlaveTimer"), 00131 cleanupTimer(0, "Scheduler::cleanupTimer") 00132 { 00133 checkOnHold = true; // !! Always check with TDELauncher for the first request. 00134 slaveOnHold = 0; 00135 protInfoDict = new ProtocolInfoDict; 00136 slaveList = new SlaveList; 00137 idleSlaves = new SlaveList; 00138 coIdleSlaves = new SlaveList; 00139 extraJobData = new ExtraJobData; 00140 sessionData = new SessionData; 00141 slaveConfig = SlaveConfig::self(); 00142 connect(&slaveTimer, TQT_SIGNAL(timeout()), TQT_SLOT(startStep())); 00143 connect(&coSlaveTimer, TQT_SIGNAL(timeout()), TQT_SLOT(slotScheduleCoSlave())); 00144 connect(&cleanupTimer, TQT_SIGNAL(timeout()), TQT_SLOT(slotCleanIdleSlaves())); 00145 busy = false; 00146 } 00147 00148 Scheduler::~Scheduler() 00149 { 00150 protInfoDict->setAutoDelete(true); 00151 delete protInfoDict; protInfoDict = 0; 00152 delete idleSlaves; idleSlaves = 0; 00153 delete coIdleSlaves; coIdleSlaves = 0; 00154 slaveList->setAutoDelete(true); 00155 delete slaveList; slaveList = 0; 00156 delete extraJobData; extraJobData = 0; 00157 delete sessionData; sessionData = 0; 00158 instance = 0; 00159 } 00160 00161 void 00162 Scheduler::debug_info() 00163 { 00164 } 00165 00166 bool Scheduler::process(const TQCString &fun, const TQByteArray &data, TQCString &replyType, TQByteArray &replyData ) 00167 { 00168 if ( fun != "reparseSlaveConfiguration(TQString)" ) 00169 return DCOPObject::process( fun, data, replyType, replyData ); 00170 00171 slaveConfig = SlaveConfig::self(); 00172 replyType = "void"; 00173 TQDataStream stream( data, IO_ReadOnly ); 00174 TQString proto; 00175 stream >> proto; 00176 00177 kdDebug( 7006 ) << "reparseConfiguration( " << proto << " )" << endl; 00178 KProtocolManager::reparseConfiguration(); 00179 slaveConfig->reset(); 00180 sessionData->reset(); 00181 NetRC::self()->reload(); 00182 00183 Slave *slave = slaveList->first(); 00184 for (; slave; slave = slaveList->next() ) 00185 if ( slave->slaveProtocol() == proto || proto.isEmpty() ) 00186 { 00187 slave->send( CMD_REPARSECONFIGURATION ); 00188 slave->resetHost(); 00189 } 00190 return true; 00191 } 00192 00193 QCStringList Scheduler::functions() 00194 { 00195 QCStringList funcs = DCOPObject::functions(); 00196 funcs << "void reparseSlaveConfiguration(TQString)"; 00197 return funcs; 00198 } 00199 00200 void Scheduler::_doJob(SimpleJob *job) { 00201 JobData *jobData = new JobData; 00202 jobData->protocol = KProtocolManager::slaveProtocol(job->url(), jobData->proxy); 00203 // kdDebug(7006) << "Scheduler::_doJob protocol=" << jobData->protocol << endl; 00204 if (job->command() == CMD_GET) 00205 { 00206 jobData->checkOnHold = checkOnHold; 00207 checkOnHold = false; 00208 } 00209 extraJobData->replace(job, jobData); 00210 newJobs.append(job); 00211 slaveTimer.start(0, true); 00212 #ifndef NDEBUG 00213 if (newJobs.count() > 150) 00214 kdDebug() << "WARNING - TDEIO::Scheduler got more than 150 jobs! This shows a misuse in your app (yes, a job is a TQObject)." << endl; 00215 #endif 00216 } 00217 00218 void Scheduler::_scheduleJob(SimpleJob *job) { 00219 newJobs.removeRef(job); 00220 JobData *jobData = extraJobData->find(job); 00221 if (!jobData) 00222 { 00223 kdFatal(7006) << "BUG! _ScheduleJob(): No extraJobData for job!" << endl; 00224 return; 00225 } 00226 TQString protocol = jobData->protocol; 00227 // kdDebug(7006) << "Scheduler::_scheduleJob protocol=" << protocol << endl; 00228 ProtocolInfo *protInfo = protInfoDict->get(protocol); 00229 protInfo->joblist.append(job); 00230 00231 slaveTimer.start(0, true); 00232 } 00233 00234 void Scheduler::_cancelJob(SimpleJob *job) { 00235 // kdDebug(7006) << "Scheduler: canceling job " << job << endl; 00236 Slave *slave = job->slave(); 00237 if ( !slave ) 00238 { 00239 // was not yet running (don't call this on a finished job!) 00240 JobData *jobData = extraJobData->find(job); 00241 if (!jobData) 00242 return; // I said: "Don't call this on a finished job!" 00243 00244 newJobs.removeRef(job); 00245 ProtocolInfo *protInfo = protInfoDict->get(jobData->protocol); 00246 protInfo->joblist.removeRef(job); 00247 00248 // Search all slaves to see if job is in the queue of a coSlave 00249 slave = slaveList->first(); 00250 for(; slave; slave = slaveList->next()) 00251 { 00252 JobList *list = coSlaves.find(slave); 00253 if (list && list->removeRef(job)) 00254 break; // Job was found and removed. 00255 // Fall through to kill the slave as well! 00256 } 00257 if (!slave) 00258 { 00259 extraJobData->remove(job); 00260 return; // Job was not yet running and not in a coSlave queue. 00261 } 00262 } 00263 kdDebug(7006) << "Scheduler: killing slave " << slave->slave_pid() << endl; 00264 slave->kill(); 00265 _jobFinished( job, slave ); 00266 slotSlaveDied( slave); 00267 } 00268 00269 void Scheduler::startStep() 00270 { 00271 while(newJobs.count()) 00272 { 00273 (void) startJobDirect(); 00274 } 00275 TQDictIterator<TDEIO::Scheduler::ProtocolInfo> it(*protInfoDict); 00276 while(it.current()) 00277 { 00278 if (startJobScheduled(it.current())) return; 00279 ++it; 00280 } 00281 } 00282 00283 void Scheduler::setupSlave(TDEIO::Slave *slave, const KURL &url, const TQString &protocol, const TQString &proxy , bool newSlave, const TDEIO::MetaData *config) 00284 { 00285 TQString host = url.host(); 00286 int port = url.port(); 00287 TQString user = url.user(); 00288 TQString passwd = url.pass(); 00289 00290 if ((newSlave) || 00291 (slave->host() != host) || 00292 (slave->port() != port) || 00293 (slave->user() != user) || 00294 (slave->passwd() != passwd)) 00295 { 00296 slaveConfig = SlaveConfig::self(); 00297 00298 MetaData configData = slaveConfig->configData(protocol, host); 00299 sessionData->configDataFor( configData, protocol, host ); 00300 00301 configData["UseProxy"] = proxy; 00302 00303 TQString autoLogin = configData["EnableAutoLogin"].lower(); 00304 if ( autoLogin == "true" ) 00305 { 00306 NetRC::AutoLogin l; 00307 l.login = user; 00308 bool usern = (protocol == "ftp"); 00309 if ( NetRC::self()->lookup( url, l, usern) ) 00310 { 00311 configData["autoLoginUser"] = l.login; 00312 configData["autoLoginPass"] = l.password; 00313 if ( usern ) 00314 { 00315 TQString macdef; 00316 TQMap<TQString, TQStringList>::ConstIterator it = l.macdef.begin(); 00317 for ( ; it != l.macdef.end(); ++it ) 00318 macdef += it.key() + '\\' + it.data().join( "\\" ) + '\n'; 00319 configData["autoLoginMacro"] = macdef; 00320 } 00321 } 00322 } 00323 if (config) 00324 configData += *config; 00325 slave->setConfig(configData); 00326 slave->setProtocol(url.protocol()); 00327 slave->setHost(host, port, user, passwd); 00328 } 00329 } 00330 00331 bool Scheduler::startJobScheduled(ProtocolInfo *protInfo) 00332 { 00333 if (protInfo->joblist.isEmpty()) 00334 return false; 00335 00336 // kdDebug(7006) << "Scheduling job" << endl; 00337 debug_info(); 00338 bool newSlave = false; 00339 00340 SimpleJob *job = 0; 00341 Slave *slave = 0; 00342 00343 if (protInfo->skipCount > 2) 00344 { 00345 bool dummy; 00346 // Prevent starvation. We skip the first entry in the queue at most 00347 // 2 times in a row. The 00348 protInfo->skipCount = 0; 00349 job = protInfo->joblist.at(0); 00350 slave = findIdleSlave(protInfo, job, dummy ); 00351 } 00352 else 00353 { 00354 bool exact=false; 00355 SimpleJob *firstJob = 0; 00356 Slave *firstSlave = 0; 00357 for(uint i = 0; (i < protInfo->joblist.count()) && (i < 10); i++) 00358 { 00359 job = protInfo->joblist.at(i); 00360 slave = findIdleSlave(protInfo, job, exact); 00361 if (!firstSlave) 00362 { 00363 firstJob = job; 00364 firstSlave = slave; 00365 } 00366 if (!slave) break; 00367 if (exact) break; 00368 } 00369 00370 if (!exact) 00371 { 00372 slave = firstSlave; 00373 job = firstJob; 00374 } 00375 if (job == firstJob) 00376 protInfo->skipCount = 0; 00377 else 00378 protInfo->skipCount++; 00379 } 00380 00381 if (!slave) 00382 { 00383 if ( protInfo->maxSlaves > static_cast<int>(protInfo->activeSlaves.count()) ) 00384 { 00385 newSlave = true; 00386 slave = createSlave(protInfo, job, job->url()); 00387 if (!slave) 00388 slaveTimer.start(0, true); 00389 } 00390 } 00391 00392 if (!slave) 00393 { 00394 // kdDebug(7006) << "No slaves available" << endl; 00395 // kdDebug(7006) << " -- active: " << protInfo->activeSlaves.count() << endl; 00396 return false; 00397 } 00398 00399 protInfo->activeSlaves.append(slave); 00400 idleSlaves->removeRef(slave); 00401 protInfo->joblist.removeRef(job); 00402 // kdDebug(7006) << "scheduler: job started " << job << endl; 00403 00404 00405 JobData *jobData = extraJobData->find(job); 00406 setupSlave(slave, job->url(), jobData->protocol, jobData->proxy, newSlave); 00407 job->start(slave); 00408 00409 slaveTimer.start(0, true); 00410 return true; 00411 } 00412 00413 bool Scheduler::startJobDirect() 00414 { 00415 debug_info(); 00416 SimpleJob *job = newJobs.take(0); 00417 JobData *jobData = extraJobData->find(job); 00418 if (!jobData) 00419 { 00420 kdFatal(7006) << "BUG! startjobDirect(): No extraJobData for job!" 00421 << endl; 00422 return false; 00423 } 00424 TQString protocol = jobData->protocol; 00425 ProtocolInfo *protInfo = protInfoDict->get(protocol); 00426 00427 bool newSlave = false; 00428 bool dummy; 00429 00430 // Look for matching slave 00431 Slave *slave = findIdleSlave(protInfo, job, dummy); 00432 00433 if (!slave) 00434 { 00435 newSlave = true; 00436 slave = createSlave(protInfo, job, job->url()); 00437 } 00438 00439 if (!slave) 00440 return false; 00441 00442 idleSlaves->removeRef(slave); 00443 // kdDebug(7006) << "scheduler: job started " << job << endl; 00444 00445 setupSlave(slave, job->url(), protocol, jobData->proxy, newSlave); 00446 job->start(slave); 00447 return true; 00448 } 00449 00450 static Slave *searchIdleList(SlaveList *idleSlaves, const KURL &url, const TQString &protocol, bool &exact) 00451 { 00452 TQString host = url.host(); 00453 int port = url.port(); 00454 TQString user = url.user(); 00455 exact = true; 00456 00457 for( Slave *slave = idleSlaves->first(); 00458 slave; 00459 slave = idleSlaves->next()) 00460 { 00461 if ((protocol == slave->slaveProtocol()) && 00462 (host == slave->host()) && 00463 (port == slave->port()) && 00464 (user == slave->user())) 00465 return slave; 00466 } 00467 00468 exact = false; 00469 00470 // Look for slightly matching slave 00471 for( Slave *slave = idleSlaves->first(); 00472 slave; 00473 slave = idleSlaves->next()) 00474 { 00475 if (protocol == slave->slaveProtocol()) 00476 return slave; 00477 } 00478 return 0; 00479 } 00480 00481 Slave *Scheduler::findIdleSlave(ProtocolInfo *, SimpleJob *job, bool &exact) 00482 { 00483 Slave *slave = 0; 00484 JobData *jobData = extraJobData->find(job); 00485 if (!jobData) 00486 { 00487 kdFatal(7006) << "BUG! findIdleSlave(): No extraJobData for job!" << endl; 00488 return 0; 00489 } 00490 if (jobData->checkOnHold) 00491 { 00492 slave = Slave::holdSlave(jobData->protocol, job->url()); 00493 if (slave) 00494 return slave; 00495 } 00496 if (slaveOnHold) 00497 { 00498 // Make sure that the job wants to do a GET or a POST, and with no offset 00499 bool bCanReuse = (job->command() == CMD_GET); 00500 TDEIO::TransferJob * tJob = dynamic_cast<TDEIO::TransferJob *>(job); 00501 if ( tJob ) 00502 { 00503 bCanReuse = (job->command() == CMD_GET || job->command() == CMD_SPECIAL); 00504 if ( bCanReuse ) 00505 { 00506 TDEIO::MetaData outgoing = tJob->outgoingMetaData(); 00507 TQString resume = (!outgoing.contains("resume")) ? TQString() : outgoing["resume"]; 00508 kdDebug(7006) << "Resume metadata is '" << resume << "'" << endl; 00509 bCanReuse = (resume.isEmpty() || resume == "0"); 00510 } 00511 } 00512 // kdDebug(7006) << "bCanReuse = " << bCanReuse << endl; 00513 if (bCanReuse) 00514 { 00515 if (job->url() == urlOnHold) 00516 { 00517 kdDebug(7006) << "HOLD: Reusing held slave for " << urlOnHold.prettyURL() << endl; 00518 slave = slaveOnHold; 00519 } 00520 else 00521 { 00522 kdDebug(7006) << "HOLD: Discarding held slave (" << urlOnHold.prettyURL() << ")" << endl; 00523 slaveOnHold->kill(); 00524 } 00525 slaveOnHold = 0; 00526 urlOnHold = KURL(); 00527 } 00528 if (slave) 00529 return slave; 00530 } 00531 00532 return searchIdleList(idleSlaves, job->url(), jobData->protocol, exact); 00533 } 00534 00535 Slave *Scheduler::createSlave(ProtocolInfo *protInfo, SimpleJob *job, const KURL &url) 00536 { 00537 int error; 00538 TQString errortext; 00539 Slave *slave = Slave::createSlave(protInfo->protocol, url, error, errortext); 00540 if (slave) 00541 { 00542 slaveList->append(slave); 00543 idleSlaves->append(slave); 00544 connect(slave, TQT_SIGNAL(slaveDied(TDEIO::Slave *)), 00545 TQT_SLOT(slotSlaveDied(TDEIO::Slave *))); 00546 connect(slave, TQT_SIGNAL(slaveStatus(pid_t,const TQCString &,const TQString &, bool)), 00547 TQT_SLOT(slotSlaveStatus(pid_t,const TQCString &, const TQString &, bool))); 00548 00549 connect(slave,TQT_SIGNAL(authorizationKey(const TQCString&, const TQCString&, bool)), 00550 sessionData,TQT_SLOT(slotAuthData(const TQCString&, const TQCString&, bool))); 00551 connect(slave,TQT_SIGNAL(delAuthorization(const TQCString&)), sessionData, 00552 TQT_SLOT(slotDelAuthData(const TQCString&))); 00553 } 00554 else 00555 { 00556 kdError() <<": couldn't create slave : " << errortext << endl; 00557 if (job) 00558 { 00559 protInfo->joblist.removeRef(job); 00560 extraJobData->remove(job); 00561 job->slotError( error, errortext ); 00562 } 00563 } 00564 return slave; 00565 } 00566 00567 void Scheduler::slotSlaveStatus(pid_t, const TQCString &, const TQString &, bool) 00568 { 00569 } 00570 00571 void Scheduler::_jobFinished(SimpleJob *job, Slave *slave) 00572 { 00573 JobData *jobData = extraJobData->take(job); 00574 if (!jobData) 00575 { 00576 kdFatal(7006) << "BUG! _jobFinished(): No extraJobData for job!" << endl; 00577 return; 00578 } 00579 ProtocolInfo *protInfo = protInfoDict->get(jobData->protocol); 00580 delete jobData; 00581 slave->disconnect(job); 00582 protInfo->activeSlaves.removeRef(slave); 00583 if (slave->isAlive()) 00584 { 00585 JobList *list = coSlaves.find(slave); 00586 if (list) 00587 { 00588 assert(slave->isConnected()); 00589 assert(!coIdleSlaves->contains(slave)); 00590 coIdleSlaves->append(slave); 00591 if (!list->isEmpty()) 00592 coSlaveTimer.start(0, true); 00593 return; 00594 } 00595 else 00596 { 00597 assert(!slave->isConnected()); 00598 idleSlaves->append(slave); 00599 slave->setIdle(); 00600 _scheduleCleanup(); 00601 // slave->send( CMD_SLAVE_STATUS ); 00602 } 00603 } 00604 if (protInfo->joblist.count()) 00605 { 00606 slaveTimer.start(0, true); 00607 } 00608 } 00609 00610 void Scheduler::slotSlaveDied(TDEIO::Slave *slave) 00611 { 00612 assert(!slave->isAlive()); 00613 ProtocolInfo *protInfo = protInfoDict->get(slave->slaveProtocol()); 00614 protInfo->activeSlaves.removeRef(slave); 00615 if (slave == slaveOnHold) 00616 { 00617 slaveOnHold = 0; 00618 urlOnHold = KURL(); 00619 } 00620 idleSlaves->removeRef(slave); 00621 JobList *list = coSlaves.find(slave); 00622 if (list) 00623 { 00624 // coSlave dies, kill jobs waiting in queue 00625 disconnectSlave(slave); 00626 } 00627 00628 if (!slaveList->removeRef(slave)) 00629 kdDebug(7006) << "Scheduler: BUG!! Slave " << slave << "/" << slave->slave_pid() << " died, but is NOT in slaveList!!!\n" << endl; 00630 else 00631 slave->deref(); // Delete slave 00632 } 00633 00634 void Scheduler::slotCleanIdleSlaves() 00635 { 00636 for(Slave *slave = idleSlaves->first();slave;) 00637 { 00638 if (slave->idleTime() >= MAX_SLAVE_IDLE) 00639 { 00640 // kdDebug(7006) << "Removing idle slave: " << slave->slaveProtocol() << " " << slave->host() << endl; 00641 Slave *removeSlave = slave; 00642 slave = idleSlaves->next(); 00643 idleSlaves->removeRef(removeSlave); 00644 slaveList->removeRef(removeSlave); 00645 removeSlave->connection()->close(); 00646 removeSlave->deref(); 00647 } 00648 else 00649 { 00650 slave = idleSlaves->next(); 00651 } 00652 } 00653 _scheduleCleanup(); 00654 } 00655 00656 void Scheduler::_scheduleCleanup() 00657 { 00658 if (idleSlaves->count()) 00659 { 00660 if (!cleanupTimer.isActive()) 00661 cleanupTimer.start( MAX_SLAVE_IDLE*1000, true ); 00662 } 00663 } 00664 00665 void Scheduler::_putSlaveOnHold(TDEIO::SimpleJob *job, const KURL &url) 00666 { 00667 Slave *slave = job->slave(); 00668 slave->disconnect(job); 00669 00670 if (slaveOnHold) 00671 { 00672 slaveOnHold->kill(); 00673 } 00674 slaveOnHold = slave; 00675 urlOnHold = url; 00676 slaveOnHold->suspend(); 00677 } 00678 00679 void Scheduler::_publishSlaveOnHold() 00680 { 00681 if (!slaveOnHold) 00682 return; 00683 00684 slaveOnHold->hold(urlOnHold); 00685 } 00686 00687 void Scheduler::_removeSlaveOnHold() 00688 { 00689 if (slaveOnHold) 00690 { 00691 slaveOnHold->kill(); 00692 } 00693 slaveOnHold = 0; 00694 urlOnHold = KURL(); 00695 } 00696 00697 Slave * 00698 Scheduler::_getConnectedSlave(const KURL &url, const TDEIO::MetaData &config ) 00699 { 00700 TQString proxy; 00701 TQString protocol = KProtocolManager::slaveProtocol(url, proxy); 00702 bool dummy; 00703 Slave *slave = searchIdleList(idleSlaves, url, protocol, dummy); 00704 if (!slave) 00705 { 00706 ProtocolInfo *protInfo = protInfoDict->get(protocol); 00707 slave = createSlave(protInfo, 0, url); 00708 } 00709 if (!slave) 00710 return 0; // Error 00711 idleSlaves->removeRef(slave); 00712 00713 setupSlave(slave, url, protocol, proxy, true, &config); 00714 00715 slave->send( CMD_CONNECT ); 00716 connect(slave, TQT_SIGNAL(connected()), 00717 TQT_SLOT(slotSlaveConnected())); 00718 connect(slave, TQT_SIGNAL(error(int, const TQString &)), 00719 TQT_SLOT(slotSlaveError(int, const TQString &))); 00720 00721 coSlaves.insert(slave, new TQPtrList<SimpleJob>()); 00722 // kdDebug(7006) << "_getConnectedSlave( " << slave << ")" << endl; 00723 return slave; 00724 } 00725 00726 void 00727 Scheduler::slotScheduleCoSlave() 00728 { 00729 Slave *nextSlave; 00730 slaveConfig = SlaveConfig::self(); 00731 for(Slave *slave = coIdleSlaves->first(); 00732 slave; 00733 slave = nextSlave) 00734 { 00735 nextSlave = coIdleSlaves->next(); 00736 JobList *list = coSlaves.find(slave); 00737 assert(list); 00738 if (list && !list->isEmpty()) 00739 { 00740 SimpleJob *job = list->take(0); 00741 coIdleSlaves->removeRef(slave); 00742 // kdDebug(7006) << "scheduler: job started " << job << endl; 00743 00744 assert(!coIdleSlaves->contains(slave)); 00745 00746 KURL url =job->url(); 00747 TQString host = url.host(); 00748 int port = url.port(); 00749 00750 if (slave->host() == "<reset>") 00751 { 00752 TQString user = url.user(); 00753 TQString passwd = url.pass(); 00754 00755 MetaData configData = slaveConfig->configData(url.protocol(), url.host()); 00756 slave->setConfig(configData); 00757 slave->setProtocol(url.protocol()); 00758 slave->setHost(host, port, user, passwd); 00759 } 00760 00761 assert(slave->protocol() == url.protocol()); 00762 assert(slave->host() == host); 00763 assert(slave->port() == port); 00764 job->start(slave); 00765 } 00766 } 00767 } 00768 00769 void 00770 Scheduler::slotSlaveConnected() 00771 { 00772 Slave *slave = (Slave *)sender(); 00773 // kdDebug(7006) << "slotSlaveConnected( " << slave << ")" << endl; 00774 slave->setConnected(true); 00775 disconnect(slave, TQT_SIGNAL(connected()), 00776 this, TQT_SLOT(slotSlaveConnected())); 00777 emit slaveConnected(slave); 00778 assert(!coIdleSlaves->contains(slave)); 00779 coIdleSlaves->append(slave); 00780 coSlaveTimer.start(0, true); 00781 } 00782 00783 void 00784 Scheduler::slotSlaveError(int errorNr, const TQString &errorMsg) 00785 { 00786 Slave *slave = (Slave *)sender(); 00787 if (!slave->isConnected() || (coIdleSlaves->find(slave) != -1)) 00788 { 00789 // Only forward to application if slave is idle or still connecting. 00790 emit slaveError(slave, errorNr, errorMsg); 00791 } 00792 } 00793 00794 bool 00795 Scheduler::_assignJobToSlave(TDEIO::Slave *slave, SimpleJob *job) 00796 { 00797 // kdDebug(7006) << "_assignJobToSlave( " << job << ", " << slave << ")" << endl; 00798 TQString dummy; 00799 if ((slave->slaveProtocol() != KProtocolManager::slaveProtocol( job->url(), dummy )) 00800 || 00801 (!newJobs.removeRef(job))) 00802 { 00803 kdDebug(7006) << "_assignJobToSlave(): ERROR, nonmatching or unknown job." << endl; 00804 job->kill(); 00805 return false; 00806 } 00807 00808 JobList *list = coSlaves.find(slave); 00809 assert(list); 00810 if (!list) 00811 { 00812 kdDebug(7006) << "_assignJobToSlave(): ERROR, unknown slave." << endl; 00813 job->kill(); 00814 return false; 00815 } 00816 00817 assert(list->contains(job) == 0); 00818 list->append(job); 00819 coSlaveTimer.start(0, true); // Start job on timer event 00820 00821 return true; 00822 } 00823 00824 bool 00825 Scheduler::_disconnectSlave(TDEIO::Slave *slave) 00826 { 00827 // kdDebug(7006) << "_disconnectSlave( " << slave << ")" << endl; 00828 JobList *list = coSlaves.take(slave); 00829 assert(list); 00830 if (!list) 00831 return false; 00832 // Kill jobs still in queue. 00833 while(!list->isEmpty()) 00834 { 00835 Job *job = list->take(0); 00836 job->kill(); 00837 } 00838 delete list; 00839 coIdleSlaves->removeRef(slave); 00840 assert(!coIdleSlaves->contains(slave)); 00841 disconnect(slave, TQT_SIGNAL(connected()), 00842 this, TQT_SLOT(slotSlaveConnected())); 00843 disconnect(slave, TQT_SIGNAL(error(int, const TQString &)), 00844 this, TQT_SLOT(slotSlaveError(int, const TQString &))); 00845 if (slave->isAlive()) 00846 { 00847 idleSlaves->append(slave); 00848 slave->send( CMD_DISCONNECT ); 00849 slave->setIdle(); 00850 slave->setConnected(false); 00851 _scheduleCleanup(); 00852 } 00853 return true; 00854 } 00855 00856 void 00857 Scheduler::_checkSlaveOnHold(bool b) 00858 { 00859 checkOnHold = b; 00860 } 00861 00862 void 00863 Scheduler::_registerWindow(TQWidget *wid) 00864 { 00865 if (!wid) 00866 return; 00867 00868 TQObject *obj = TQT_TQOBJECT(wid); 00869 if (!m_windowList.contains(obj)) 00870 { 00871 // We must store the window Id because by the time 00872 // the destroyed signal is emitted we can no longer 00873 // access TQWidget::winId() (already destructed) 00874 WId windowId = wid->winId(); 00875 m_windowList.insert(obj, windowId); 00876 connect(TQT_TQOBJECT(wid), TQT_SIGNAL(destroyed(TQObject *)), 00877 this, TQT_SLOT(slotUnregisterWindow(TQObject*))); 00878 TQByteArray params; 00879 TQDataStream stream(params, IO_WriteOnly); 00880 stream << windowId; 00881 if( !kapp->dcopClient()->send( "kded", "kded", 00882 "registerWindowId(long int)", params ) ) 00883 kdDebug(7006) << "Could not register window with kded!" << endl; 00884 } 00885 } 00886 00887 void 00888 Scheduler::slotUnregisterWindow(TQObject *obj) 00889 { 00890 if (!obj) 00891 return; 00892 00893 TQMap<TQObject *, WId>::Iterator it = m_windowList.find(obj); 00894 if (it == m_windowList.end()) 00895 return; 00896 WId windowId = it.data(); 00897 disconnect( it.key(), TQT_SIGNAL(destroyed(TQObject *)), 00898 this, TQT_SLOT(slotUnregisterWindow(TQObject*))); 00899 m_windowList.remove( it ); 00900 if (kapp) 00901 { 00902 TQByteArray params; 00903 TQDataStream stream(params, IO_WriteOnly); 00904 stream << windowId; 00905 kapp->dcopClient()->send( "kded", "kded", 00906 "unregisterWindowId(long int)", params ); 00907 } 00908 } 00909 00910 Scheduler* Scheduler::self() { 00911 if ( !instance ) { 00912 instance = new Scheduler; 00913 } 00914 return instance; 00915 } 00916 00917 void Scheduler::virtual_hook( int id, void* data ) 00918 { DCOPObject::virtual_hook( id, data ); } 00919 00920 00921 00922 #include "scheduler.moc"