kmessageio.cpp
00001 /* 00002 This file is part of the KDE games library 00003 Copyright (C) 2001 Burkhard Lehner (Burkhard.Lehner@gmx.de) 00004 00005 This library is free software; you can redistribute it and/or 00006 modify it under the terms of the GNU Library General Public 00007 License version 2 as published by the Free Software Foundation. 00008 00009 This library is distributed in the hope that it will be useful, 00010 but WITHOUT ANY WARRANTY; without even the implied warranty of 00011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00012 Library General Public License for more details. 00013 00014 You should have received a copy of the GNU Library General Public License 00015 along with this library; see the file COPYING.LIB. If not, write to 00016 the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, 00017 Boston, MA 02110-1301, USA. 00018 */ 00019 00020 /* 00021 KMessageIO class and subclasses KMessageSocket and KMessageDirect 00022 */ 00023 00024 #include "kmessageio.h" 00025 #include <tqsocket.h> 00026 #include <kdebug.h> 00027 #include <kprocess.h> 00028 #include <tqfile.h> 00029 00030 // ----------------------- KMessageIO ------------------------- 00031 00032 KMessageIO::KMessageIO (TQObject *parent, const char *name) 00033 : TQObject (parent, name), m_id (0) 00034 {} 00035 00036 KMessageIO::~KMessageIO () 00037 {} 00038 00039 void KMessageIO::setId (TQ_UINT32 id) 00040 { 00041 m_id = id; 00042 } 00043 00044 TQ_UINT32 KMessageIO::id () 00045 { 00046 return m_id; 00047 } 00048 00049 // ----------------------KMessageSocket ----------------------- 00050 00051 KMessageSocket::KMessageSocket (TQString host, TQ_UINT16 port, TQObject *parent, 00052 const char *name) 00053 : KMessageIO (parent, name) 00054 { 00055 mSocket = new TQSocket (); 00056 mSocket->connectToHost (host, port); 00057 initSocket (); 00058 } 00059 00060 KMessageSocket::KMessageSocket (TQHostAddress host, TQ_UINT16 port, TQObject 00061 *parent, const char *name) 00062 : KMessageIO (parent, name) 00063 { 00064 mSocket = new TQSocket (); 00065 mSocket->connectToHost (host.toString(), port); 00066 initSocket (); 00067 } 00068 00069 KMessageSocket::KMessageSocket (TQSocket *socket, TQObject *parent, const char 00070 *name) 00071 : KMessageIO (parent, name) 00072 { 00073 mSocket = socket; 00074 initSocket (); 00075 } 00076 00077 KMessageSocket::KMessageSocket (int socketFD, TQObject *parent, const char 00078 *name) 00079 : KMessageIO (parent, name) 00080 { 00081 mSocket = new TQSocket (); 00082 mSocket->setSocket (socketFD); 00083 initSocket (); 00084 } 00085 00086 KMessageSocket::~KMessageSocket () 00087 { 00088 delete mSocket; 00089 } 00090 00091 bool KMessageSocket::isConnected () const 00092 { 00093 return mSocket->state() == TQSocket::Connection; 00094 } 00095 00096 void KMessageSocket::send (const TQByteArray &msg) 00097 { 00098 TQDataStream str (mSocket); 00099 str << TQ_UINT8 ('M'); // magic number for begin of message 00100 str.writeBytes (msg.data(), msg.size()); // writes the length (as TQ_UINT32) and the data 00101 } 00102 00103 void KMessageSocket::processNewData () 00104 { 00105 if (isRecursive) 00106 return; 00107 isRecursive = true; 00108 00109 TQDataStream str (mSocket); 00110 while (mSocket->bytesAvailable() > 0) 00111 { 00112 if (mAwaitingHeader) 00113 { 00114 // Header = magic number + packet length = 5 bytes 00115 if (mSocket->bytesAvailable() < 5) 00116 { 00117 isRecursive = false; 00118 return; 00119 } 00120 00121 // Read the magic number first. If something unexpected is found, 00122 // start over again, ignoring the data that was read up to then. 00123 00124 TQ_UINT8 v; 00125 str >> v; 00126 if (v != 'M') 00127 { 00128 kdWarning(11001) << k_funcinfo << ": Received unexpected data, magic number wrong!" << endl; 00129 continue; 00130 } 00131 00132 str >> mNextBlockLength; 00133 mAwaitingHeader = false; 00134 } 00135 else 00136 { 00137 // Data not completely read => wait for more 00138 if (mSocket->bytesAvailable() < (TQ_ULONG) mNextBlockLength) 00139 { 00140 isRecursive = false; 00141 return; 00142 } 00143 00144 TQByteArray msg (mNextBlockLength); 00145 str.readRawBytes (msg.data(), mNextBlockLength); 00146 00147 // send the received message 00148 emit received (msg); 00149 00150 // Waiting for the header of the next message 00151 mAwaitingHeader = true; 00152 } 00153 } 00154 00155 isRecursive = false; 00156 } 00157 00158 void KMessageSocket::initSocket () 00159 { 00160 connect (mSocket, TQT_SIGNAL (error(int)), TQT_SIGNAL (connectionBroken())); 00161 connect (mSocket, TQT_SIGNAL (connectionClosed()), TQT_SIGNAL (connectionBroken())); 00162 connect (mSocket, TQT_SIGNAL (readyRead()), TQT_SLOT (processNewData())); 00163 mAwaitingHeader = true; 00164 mNextBlockLength = 0; 00165 isRecursive = false; 00166 } 00167 00168 TQ_UINT16 KMessageSocket::peerPort () const 00169 { 00170 return mSocket->peerPort(); 00171 } 00172 00173 TQString KMessageSocket::peerName () const 00174 { 00175 return mSocket->peerName(); 00176 } 00177 00178 // ----------------------KMessageDirect ----------------------- 00179 00180 KMessageDirect::KMessageDirect (KMessageDirect *partner, TQObject *parent, 00181 const char *name) 00182 : KMessageIO (parent, name), mPartner (0) 00183 { 00184 // 0 as first parameter leaves the object unconnected 00185 if (!partner) 00186 return; 00187 00188 // Check if the other object is already connected 00189 if (partner && partner->mPartner) 00190 { 00191 kdWarning(11001) << k_funcinfo << ": Object is already connected!" << endl; 00192 return; 00193 } 00194 00195 // Connect from us to that object 00196 mPartner = partner; 00197 00198 // Connect the other object to us 00199 partner->mPartner = this; 00200 } 00201 00202 KMessageDirect::~KMessageDirect () 00203 { 00204 if (mPartner) 00205 { 00206 mPartner->mPartner = 0; 00207 emit mPartner->connectionBroken(); 00208 } 00209 } 00210 00211 bool KMessageDirect::isConnected () const 00212 { 00213 return mPartner != 0; 00214 } 00215 00216 void KMessageDirect::send (const TQByteArray &msg) 00217 { 00218 if (mPartner) 00219 emit mPartner->received (msg); 00220 else 00221 kdError(11001) << k_funcinfo << ": Not yet connected!" << endl; 00222 } 00223 00224 00225 // ----------------------- KMessageProcess --------------------------- 00226 00227 KMessageProcess::~KMessageProcess() 00228 { 00229 kdDebug(11001) << "@@@KMessageProcess::Delete process" << endl; 00230 if (mProcess) 00231 { 00232 mProcess->kill(); 00233 delete mProcess; 00234 mProcess=0; 00235 // Remove not send buffers 00236 mQueue.setAutoDelete(true); 00237 mQueue.clear(); 00238 // Maybe todo: delete mSendBuffer 00239 } 00240 } 00241 KMessageProcess::KMessageProcess(TQObject *parent, TQString file) : KMessageIO(parent,0) 00242 { 00243 // Start process 00244 kdDebug(11001) << "@@@KMessageProcess::Start process" << endl; 00245 mProcessName=file; 00246 mProcess=new KProcess; 00247 int id=0; 00248 *mProcess << mProcessName << TQString("%1").arg(id); 00249 kdDebug(11001) << "@@@KMessageProcess::Init:Id= " << id << endl; 00250 kdDebug(11001) << "@@@KMessgeProcess::Init:Processname: " << mProcessName << endl; 00251 connect(mProcess, TQT_SIGNAL(receivedStdout(KProcess *, char *, int )), 00252 this, TQT_SLOT(slotReceivedStdout(KProcess *, char * , int ))); 00253 connect(mProcess, TQT_SIGNAL(receivedStderr(KProcess *, char *, int )), 00254 this, TQT_SLOT(slotReceivedStderr(KProcess *, char * , int ))); 00255 connect(mProcess, TQT_SIGNAL(processExited(KProcess *)), 00256 this, TQT_SLOT(slotProcessExited(KProcess *))); 00257 connect(mProcess, TQT_SIGNAL(wroteStdin(KProcess *)), 00258 this, TQT_SLOT(slotWroteStdin(KProcess *))); 00259 mProcess->start(KProcess::NotifyOnExit,KProcess::All); 00260 mSendBuffer=0; 00261 mReceiveCount=0; 00262 mReceiveBuffer.resize(1024); 00263 } 00264 bool KMessageProcess::isConnected() const 00265 { 00266 kdDebug(11001) << "@@@KMessageProcess::Is conencted" << endl; 00267 if (!mProcess) return false; 00268 return mProcess->isRunning(); 00269 } 00270 void KMessageProcess::send(const TQByteArray &msg) 00271 { 00272 kdDebug(11001) << "@@@KMessageProcess:: SEND("<<msg.size()<<") to process" << endl; 00273 unsigned int size=msg.size()+2*sizeof(long); 00274 00275 char *tmpbuffer=new char[size]; 00276 long *p1=(long *)tmpbuffer; 00277 long *p2=p1+1; 00278 kdDebug(11001) << "p1="<<p1 << "p2="<< p2 << endl; 00279 memcpy(tmpbuffer+2*sizeof(long),msg.data(),msg.size()); 00280 *p1=0x4242aeae; 00281 *p2=size; 00282 00283 TQByteArray *buffer=new TQByteArray(); 00284 buffer->assign(tmpbuffer,size); 00285 // buffer->duplicate(msg); 00286 mQueue.enqueue(buffer); 00287 writeToProcess(); 00288 } 00289 void KMessageProcess::writeToProcess() 00290 { 00291 // Previous send ok and item in queue 00292 if (mSendBuffer || mQueue.isEmpty()) return ; 00293 mSendBuffer=mQueue.dequeue(); 00294 if (!mSendBuffer) return ; 00295 00296 // write it out to the process 00297 // kdDebug(11001) << " @@@@@@ writeToProcess::SEND to process " << mSendBuffer->size() << " BYTE " << endl; 00298 // char *p=mSendBuffer->data(); 00299 // for (int i=0;i<16;i++) printf("%02x ",(unsigned char)(*(p+i)));printf("\n"); 00300 mProcess->writeStdin(mSendBuffer->data(),mSendBuffer->size()); 00301 00302 } 00303 void KMessageProcess::slotWroteStdin(KProcess * ) 00304 { 00305 kdDebug(11001) << k_funcinfo << endl; 00306 if (mSendBuffer) 00307 { 00308 delete mSendBuffer; 00309 mSendBuffer=0; 00310 } 00311 writeToProcess(); 00312 } 00313 00314 void KMessageProcess::slotReceivedStderr(KProcess * proc, char *buffer, int buflen) 00315 { 00316 int pid=0; 00317 int len; 00318 char *p; 00319 char *pos; 00320 // kdDebug(11001)<<"############# Got stderr " << buflen << " bytes" << endl; 00321 00322 if (!buffer || buflen==0) return ; 00323 if (proc) pid=proc->pid(); 00324 00325 00326 pos=buffer; 00327 do 00328 { 00329 p=(char *)memchr(pos,'\n',buflen); 00330 if (!p) len=buflen; 00331 else len=p-pos; 00332 00333 TQByteArray a; 00334 a.setRawData(pos,len); 00335 TQString s(a); 00336 kdDebug(11001) << "PID" <<pid<< ":" << s << endl; 00337 a.resetRawData(pos,len); 00338 if (p) pos=p+1; 00339 buflen-=len+1; 00340 }while(buflen>0); 00341 } 00342 00343 00344 void KMessageProcess::slotReceivedStdout(KProcess * , char *buffer, int buflen) 00345 { 00346 kdDebug(11001) << "$$$$$$ " << k_funcinfo << ": Received " << buflen << " bytes over inter process communication" << endl; 00347 00348 // TODO Make a plausibility check on buflen to avoid memory overflow 00349 while (mReceiveCount+buflen>=mReceiveBuffer.size()) mReceiveBuffer.resize(mReceiveBuffer.size()+1024); 00350 memcpy(mReceiveBuffer.data()+mReceiveCount,buffer,buflen); 00351 mReceiveCount+=buflen; 00352 00353 // Possbile message 00354 while (mReceiveCount>2*sizeof(long)) 00355 { 00356 long *p1=(long *)mReceiveBuffer.data(); 00357 long *p2=p1+1; 00358 unsigned int len; 00359 if (*p1!=0x4242aeae) 00360 { 00361 kdDebug(11001) << k_funcinfo << ": Cookie error...transmission failure...serious problem..." << endl; 00362 // for (int i=0;i<mReceiveCount;i++) fprintf(stderr,"%02x ",mReceiveBuffer[i]);fprintf(stderr,"\n"); 00363 } 00364 len=(int)(*p2); 00365 if (len<2*sizeof(long)) 00366 { 00367 kdDebug(11001) << k_funcinfo << ": Message size error" << endl; 00368 break; 00369 } 00370 if (len<=mReceiveCount) 00371 { 00372 kdDebug(11001) << k_funcinfo << ": Got message with len " << len << endl; 00373 00374 TQByteArray msg; 00375 // msg.setRawData(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long)); 00376 msg.duplicate(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long)); 00377 emit received(msg); 00378 // msg.resetRawData(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long)); 00379 // Shift buffer 00380 if (len<mReceiveCount) 00381 { 00382 memmove(mReceiveBuffer.data(),mReceiveBuffer.data()+len,mReceiveCount-len); 00383 } 00384 mReceiveCount-=len; 00385 } 00386 else break; 00387 } 00388 } 00389 00390 void KMessageProcess::slotProcessExited(KProcess * /*p*/) 00391 { 00392 kdDebug(11001) << "Process exited (slot)" << endl; 00393 emit connectionBroken(); 00394 delete mProcess; 00395 mProcess=0; 00396 } 00397 00398 00399 // ----------------------- KMessageFilePipe --------------------------- 00400 KMessageFilePipe::KMessageFilePipe(TQObject *parent,TQFile *readfile,TQFile *writefile) : KMessageIO(parent,0) 00401 { 00402 mReadFile=readfile; 00403 mWriteFile=writefile; 00404 mReceiveCount=0; 00405 mReceiveBuffer.resize(1024); 00406 } 00407 00408 KMessageFilePipe::~KMessageFilePipe() 00409 { 00410 } 00411 00412 bool KMessageFilePipe::isConnected () const 00413 { 00414 return (mReadFile!=0)&&(mWriteFile!=0); 00415 } 00416 00417 void KMessageFilePipe::send(const TQByteArray &msg) 00418 { 00419 unsigned int size=msg.size()+2*sizeof(long); 00420 00421 char *tmpbuffer=new char[size]; 00422 long *p1=(long *)tmpbuffer; 00423 long *p2=p1+1; 00424 memcpy(tmpbuffer+2*sizeof(long),msg.data(),msg.size()); 00425 *p1=0x4242aeae; 00426 *p2=size; 00427 00428 TQByteArray buffer; 00429 buffer.assign(tmpbuffer,size); 00430 mWriteFile->writeBlock(buffer); 00431 mWriteFile->flush(); 00432 /* 00433 fprintf(stderr,"+++ KMessageFilePipe:: SEND(%d to parent) realsize=%d\n",msg.size(),buffer.size()); 00434 for (int i=0;i<buffer.size();i++) fprintf(stderr,"%02x ",buffer[i]);fprintf(stderr,"\n"); 00435 fflush(stderr); 00436 */ 00437 } 00438 00439 void KMessageFilePipe::exec() 00440 { 00441 00442 // According to BL: Blocking read is ok 00443 // while(mReadFile->atEnd()) { usleep(100); } 00444 00445 int ch=mReadFile->getch(); 00446 00447 while (mReceiveCount>=mReceiveBuffer.size()) mReceiveBuffer.resize(mReceiveBuffer.size()+1024); 00448 mReceiveBuffer[mReceiveCount]=(char)ch; 00449 mReceiveCount++; 00450 00451 // Change for message 00452 if (mReceiveCount>=2*sizeof(long)) 00453 { 00454 long *p1=(long *)mReceiveBuffer.data(); 00455 long *p2=p1+1; 00456 unsigned int len; 00457 if (*p1!=0x4242aeae) 00458 { 00459 fprintf(stderr,"KMessageFilePipe::exec:: Cookie error...transmission failure...serious problem...\n"); 00460 // for (int i=0;i<16;i++) fprintf(stderr,"%02x ",mReceiveBuffer[i]);fprintf(stderr,"\n"); 00461 } 00462 len=(int)(*p2); 00463 if (len==mReceiveCount) 00464 { 00465 //fprintf(stderr,"KMessageFilePipe::exec:: Got Message with len %d\n",len); 00466 00467 TQByteArray msg; 00468 //msg.setRawData(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long)); 00469 msg.duplicate(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long)); 00470 emit received(msg); 00471 //msg.resetRawData(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long)); 00472 mReceiveCount=0; 00473 } 00474 } 00475 00476 00477 return ; 00478 00479 00480 } 00481 00482 #include "kmessageio.moc"