kbufferedsocket.cpp
00001 /* -*- C++ -*- 00002 * Copyright (C) 2003-2005 Thiago Macieira <thiago.macieira@kdemail.net> 00003 * 00004 * 00005 * Permission is hereby granted, free of charge, to any person obtaining 00006 * a copy of this software and associated documentation files (the 00007 * "Software"), to deal in the Software without restriction, including 00008 * without limitation the rights to use, copy, modify, merge, publish, 00009 * distribute, sublicense, and/or sell copies of the Software, and to 00010 * permit persons to whom the Software is furnished to do so, subject to 00011 * the following conditions: 00012 * 00013 * The above copyright notice and this permission notice shall be included 00014 * in all copies or substantial portions of the Software. 00015 * 00016 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 00017 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 00018 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 00019 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE 00020 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 00021 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 00022 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 00023 */ 00024 00025 #include <config.h> 00026 00027 #include <tqmutex.h> 00028 #include <tqtimer.h> 00029 00030 #include "tdesocketdevice.h" 00031 #include "tdesocketaddress.h" 00032 #include "tdesocketbuffer_p.h" 00033 #include "kbufferedsocket.h" 00034 00035 using namespace KNetwork; 00036 using namespace KNetwork::Internal; 00037 00038 class KNetwork::TDEBufferedSocketPrivate 00039 { 00040 public: 00041 mutable TDESocketBuffer *input, *output; 00042 00043 TDEBufferedSocketPrivate() 00044 { 00045 input = 0L; 00046 output = 0L; 00047 } 00048 }; 00049 00050 TDEBufferedSocket::TDEBufferedSocket(const TQString& host, const TQString& service, 00051 TQObject *parent, const char *name) 00052 : KStreamSocket(host, service, parent, name), 00053 d(new TDEBufferedSocketPrivate) 00054 { 00055 setInputBuffering(true); 00056 setOutputBuffering(true); 00057 } 00058 00059 TDEBufferedSocket::~TDEBufferedSocket() 00060 { 00061 closeNow(); 00062 delete d->input; 00063 delete d->output; 00064 delete d; 00065 } 00066 00067 void TDEBufferedSocket::setSocketDevice(TDESocketDevice* device) 00068 { 00069 KStreamSocket::setSocketDevice(device); 00070 device->setBlocking(false); 00071 } 00072 00073 bool TDEBufferedSocket::setSocketOptions(int opts) 00074 { 00075 if (opts == Blocking) 00076 return false; 00077 00078 opts &= ~Blocking; 00079 return KStreamSocket::setSocketOptions(opts); 00080 } 00081 00082 void TDEBufferedSocket::close() 00083 { 00084 if (!d->output || d->output->isEmpty()) 00085 closeNow(); 00086 else 00087 { 00088 setState(Closing); 00089 TQSocketNotifier *n = socketDevice()->readNotifier(); 00090 if (n) 00091 n->setEnabled(false); 00092 emit stateChanged(Closing); 00093 } 00094 } 00095 00096 #ifdef USE_QT3 00097 TQ_LONG TDEBufferedSocket::bytesAvailable() const 00098 #endif 00099 #ifdef USE_QT4 00100 qint64 TDEBufferedSocket::bytesAvailable() const 00101 #endif 00102 { 00103 if (!d->input) 00104 return KStreamSocket::bytesAvailable(); 00105 00106 return d->input->length(); 00107 } 00108 00109 TQ_LONG TDEBufferedSocket::waitForMore(int msecs, bool *timeout) 00110 { 00111 TQ_LONG retval = KStreamSocket::waitForMore(msecs, timeout); 00112 if (d->input) 00113 { 00114 resetError(); 00115 slotReadActivity(); 00116 return bytesAvailable(); 00117 } 00118 return retval; 00119 } 00120 00121 TQT_TQIO_LONG TDEBufferedSocket::tqreadBlock(char *data, TQT_TQIO_ULONG maxlen) 00122 { 00123 if (d->input) 00124 { 00125 if (d->input->isEmpty()) 00126 { 00127 setError(IO_ReadError, WouldBlock); 00128 emit gotError(WouldBlock); 00129 return -1; 00130 } 00131 resetError(); 00132 return d->input->consumeBuffer(data, maxlen); 00133 } 00134 return KStreamSocket::tqreadBlock(data, maxlen); 00135 } 00136 00137 TQT_TQIO_LONG TDEBufferedSocket::tqreadBlock(char *data, TQT_TQIO_ULONG maxlen, TDESocketAddress& from) 00138 { 00139 from = peerAddress(); 00140 return tqreadBlock(data, maxlen); 00141 } 00142 00143 TQ_LONG TDEBufferedSocket::peekBlock(char *data, TQ_ULONG maxlen) 00144 { 00145 if (d->input) 00146 { 00147 if (d->input->isEmpty()) 00148 { 00149 setError(IO_ReadError, WouldBlock); 00150 emit gotError(WouldBlock); 00151 return -1; 00152 } 00153 resetError(); 00154 return d->input->consumeBuffer(data, maxlen, false); 00155 } 00156 return KStreamSocket::peekBlock(data, maxlen); 00157 } 00158 00159 TQ_LONG TDEBufferedSocket::peekBlock(char *data, TQ_ULONG maxlen, TDESocketAddress& from) 00160 { 00161 from = peerAddress(); 00162 return peekBlock(data, maxlen); 00163 } 00164 00165 TQT_TQIO_LONG TDEBufferedSocket::tqwriteBlock(const char *data, TQT_TQIO_ULONG len) 00166 { 00167 if (state() != Connected) 00168 { 00169 // cannot write now! 00170 setError(IO_WriteError, NotConnected); 00171 return -1; 00172 } 00173 00174 if (d->output) 00175 { 00176 if (d->output->isFull()) 00177 { 00178 setError(IO_WriteError, WouldBlock); 00179 emit gotError(WouldBlock); 00180 return -1; 00181 } 00182 resetError(); 00183 00184 // enable notifier to send data 00185 TQSocketNotifier *n = socketDevice()->writeNotifier(); 00186 if (n) 00187 n->setEnabled(true); 00188 00189 return d->output->feedBuffer(data, len); 00190 } 00191 00192 return KStreamSocket::tqwriteBlock(data, len); 00193 } 00194 00195 TQT_TQIO_LONG TDEBufferedSocket::tqwriteBlock(const char *data, TQT_TQIO_ULONG maxlen, 00196 const TDESocketAddress&) 00197 { 00198 // ignore the third parameter 00199 return tqwriteBlock(data, maxlen); 00200 } 00201 00202 void TDEBufferedSocket::enableRead(bool enable) 00203 { 00204 KStreamSocket::enableRead(enable); 00205 if (!enable && d->input) 00206 { 00207 // reenable it 00208 TQSocketNotifier *n = socketDevice()->readNotifier(); 00209 if (n) 00210 n->setEnabled(true); 00211 } 00212 00213 if (enable && state() != Connected && d->input && !d->input->isEmpty()) 00214 // this means the buffer is still dirty 00215 // allow the signal to be emitted 00216 TQTimer::singleShot(0, this, TQT_SLOT(slotReadActivity())); 00217 } 00218 00219 void TDEBufferedSocket::enableWrite(bool enable) 00220 { 00221 KStreamSocket::enableWrite(enable); 00222 if (!enable && d->output && !d->output->isEmpty()) 00223 { 00224 // reenable it 00225 TQSocketNotifier *n = socketDevice()->writeNotifier(); 00226 if (n) 00227 n->setEnabled(true); 00228 } 00229 } 00230 00231 void TDEBufferedSocket::stateChanging(SocketState newState) 00232 { 00233 if (newState == Connecting || newState == Connected) 00234 { 00235 // we're going to connect 00236 // make sure the buffers are clean 00237 if (d->input) 00238 d->input->clear(); 00239 if (d->output) 00240 d->output->clear(); 00241 00242 // also, turn on notifiers 00243 enableRead(emitsReadyRead()); 00244 enableWrite(emitsReadyWrite()); 00245 } 00246 KStreamSocket::stateChanging(newState); 00247 } 00248 00249 void TDEBufferedSocket::setInputBuffering(bool enable) 00250 { 00251 TQMutexLocker locker(mutex()); 00252 if (!enable) 00253 { 00254 delete d->input; 00255 d->input = 0L; 00256 } 00257 else if (d->input == 0L) 00258 { 00259 d->input = new TDESocketBuffer; 00260 } 00261 } 00262 00263 TDEIOBufferBase* TDEBufferedSocket::inputBuffer() 00264 { 00265 return d->input; 00266 } 00267 00268 void TDEBufferedSocket::setOutputBuffering(bool enable) 00269 { 00270 TQMutexLocker locker(mutex()); 00271 if (!enable) 00272 { 00273 delete d->output; 00274 d->output = 0L; 00275 } 00276 else if (d->output == 0L) 00277 { 00278 d->output = new TDESocketBuffer; 00279 } 00280 } 00281 00282 TDEIOBufferBase* TDEBufferedSocket::outputBuffer() 00283 { 00284 return d->output; 00285 } 00286 00287 #ifdef USE_QT3 00288 TQ_ULONG TDEBufferedSocket::bytesToWrite() const 00289 #endif 00290 #ifdef USE_QT4 00291 qint64 TDEBufferedSocket::bytesToWrite() const 00292 #endif 00293 { 00294 if (!d->output) 00295 return 0; 00296 00297 return d->output->length(); 00298 } 00299 00300 void TDEBufferedSocket::closeNow() 00301 { 00302 KStreamSocket::close(); 00303 if (d->output) 00304 d->output->clear(); 00305 } 00306 00307 bool TDEBufferedSocket::canReadLine() const 00308 { 00309 if (!d->input) 00310 return false; 00311 00312 return d->input->canReadLine(); 00313 } 00314 00315 TQCString TDEBufferedSocket::readLine() 00316 { 00317 return d->input->readLine(); 00318 } 00319 00320 void TDEBufferedSocket::waitForConnect() 00321 { 00322 if (state() != Connecting) 00323 return; // nothing to be waited on 00324 00325 KStreamSocket::setSocketOptions(socketOptions() | Blocking); 00326 connectionEvent(); 00327 KStreamSocket::setSocketOptions(socketOptions() & ~Blocking); 00328 } 00329 00330 void TDEBufferedSocket::slotReadActivity() 00331 { 00332 if (d->input && state() == Connected) 00333 { 00334 mutex()->lock(); 00335 TQ_LONG len = d->input->receiveFrom(socketDevice()); 00336 00337 if (len == -1) 00338 { 00339 if (socketDevice()->error() != WouldBlock) 00340 { 00341 // nope, another error! 00342 copyError(); 00343 mutex()->unlock(); 00344 emit gotError(error()); 00345 closeNow(); // emits closed 00346 return; 00347 } 00348 } 00349 else if (len == 0) 00350 { 00351 // remotely closed 00352 setError(IO_ReadError, RemotelyDisconnected); 00353 mutex()->unlock(); 00354 emit gotError(error()); 00355 closeNow(); // emits closed 00356 return; 00357 } 00358 00359 // no error 00360 mutex()->unlock(); 00361 } 00362 00363 if (state() == Connected) 00364 KStreamSocket::slotReadActivity(); // this emits readyRead 00365 else if (emitsReadyRead()) // state() != Connected 00366 { 00367 if (d->input && !d->input->isEmpty()) 00368 { 00369 // buffer isn't empty 00370 // keep emitting signals till it is 00371 TQTimer::singleShot(0, this, TQT_SLOT(slotReadActivity())); 00372 emit readyRead(); 00373 } 00374 } 00375 } 00376 00377 void TDEBufferedSocket::slotWriteActivity() 00378 { 00379 if (d->output && !d->output->isEmpty() && 00380 (state() == Connected || state() == Closing)) 00381 { 00382 mutex()->lock(); 00383 TQ_LONG len = d->output->sendTo(socketDevice()); 00384 00385 if (len == -1) 00386 { 00387 if (socketDevice()->error() != WouldBlock) 00388 { 00389 // nope, another error! 00390 copyError(); 00391 mutex()->unlock(); 00392 emit gotError(error()); 00393 closeNow(); 00394 return; 00395 } 00396 } 00397 else if (len == 0) 00398 { 00399 // remotely closed 00400 setError(IO_ReadError, RemotelyDisconnected); 00401 mutex()->unlock(); 00402 emit gotError(error()); 00403 closeNow(); 00404 return; 00405 } 00406 00407 if (d->output->isEmpty()) 00408 // deactivate the notifier until we have something to send 00409 // writeNotifier can't return NULL here 00410 socketDevice()->writeNotifier()->setEnabled(false); 00411 00412 mutex()->unlock(); 00413 emit bytesWritten(len); 00414 } 00415 00416 if (state() != Closing) 00417 KStreamSocket::slotWriteActivity(); 00418 else if (d->output && d->output->isEmpty() && state() == Closing) 00419 { 00420 KStreamSocket::close(); // finished sending data 00421 } 00422 } 00423 00424 #include "kbufferedsocket.moc"