tdecore
kbufferedsocket.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <config.h>
00026
00027 #include <tqmutex.h>
00028 #include <tqtimer.h>
00029
00030 #include "tdesocketdevice.h"
00031 #include "tdesocketaddress.h"
00032 #include "tdesocketbuffer_p.h"
00033 #include "kbufferedsocket.h"
00034
00035 using namespace KNetwork;
00036 using namespace KNetwork::Internal;
00037
00038 class KNetwork::TDEBufferedSocketPrivate
00039 {
00040 public:
00041 mutable TDESocketBuffer *input, *output;
00042
00043 TDEBufferedSocketPrivate()
00044 {
00045 input = 0L;
00046 output = 0L;
00047 }
00048 };
00049
00050 TDEBufferedSocket::TDEBufferedSocket(const TQString& host, const TQString& service,
00051 TQObject *parent, const char *name)
00052 : KStreamSocket(host, service, parent, name),
00053 d(new TDEBufferedSocketPrivate)
00054 {
00055 setInputBuffering(true);
00056 setOutputBuffering(true);
00057 }
00058
00059 TDEBufferedSocket::~TDEBufferedSocket()
00060 {
00061 closeNow();
00062 delete d->input;
00063 delete d->output;
00064 delete d;
00065 }
00066
00067 void TDEBufferedSocket::setSocketDevice(TDESocketDevice* device)
00068 {
00069 KStreamSocket::setSocketDevice(device);
00070 device->setBlocking(false);
00071 }
00072
00073 bool TDEBufferedSocket::setSocketOptions(int opts)
00074 {
00075 if (opts == Blocking)
00076 return false;
00077
00078 opts &= ~Blocking;
00079 return KStreamSocket::setSocketOptions(opts);
00080 }
00081
00082 void TDEBufferedSocket::close()
00083 {
00084 if (!d->output || d->output->isEmpty())
00085 closeNow();
00086 else
00087 {
00088 setState(Closing);
00089 TQSocketNotifier *n = socketDevice()->readNotifier();
00090 if (n)
00091 n->setEnabled(false);
00092 emit stateChanged(Closing);
00093 }
00094 }
00095
00096 #ifdef USE_QT3
00097 TQ_LONG TDEBufferedSocket::bytesAvailable() const
00098 #endif
00099 #ifdef USE_QT4
00100 qint64 TDEBufferedSocket::bytesAvailable() const
00101 #endif
00102 {
00103 if (!d->input)
00104 return KStreamSocket::bytesAvailable();
00105
00106 return d->input->length();
00107 }
00108
00109 TQ_LONG TDEBufferedSocket::waitForMore(int msecs, bool *timeout)
00110 {
00111 TQ_LONG retval = KStreamSocket::waitForMore(msecs, timeout);
00112 if (d->input)
00113 {
00114 resetError();
00115 slotReadActivity();
00116 return bytesAvailable();
00117 }
00118 return retval;
00119 }
00120
00121 TQT_TQIO_LONG TDEBufferedSocket::tqreadBlock(char *data, TQT_TQIO_ULONG maxlen)
00122 {
00123 if (d->input)
00124 {
00125 if (d->input->isEmpty())
00126 {
00127 setError(IO_ReadError, WouldBlock);
00128 emit gotError(WouldBlock);
00129 return -1;
00130 }
00131 resetError();
00132 return d->input->consumeBuffer(data, maxlen);
00133 }
00134 return KStreamSocket::tqreadBlock(data, maxlen);
00135 }
00136
00137 TQT_TQIO_LONG TDEBufferedSocket::tqreadBlock(char *data, TQT_TQIO_ULONG maxlen, TDESocketAddress& from)
00138 {
00139 from = peerAddress();
00140 return tqreadBlock(data, maxlen);
00141 }
00142
00143 TQ_LONG TDEBufferedSocket::peekBlock(char *data, TQ_ULONG maxlen)
00144 {
00145 if (d->input)
00146 {
00147 if (d->input->isEmpty())
00148 {
00149 setError(IO_ReadError, WouldBlock);
00150 emit gotError(WouldBlock);
00151 return -1;
00152 }
00153 resetError();
00154 return d->input->consumeBuffer(data, maxlen, false);
00155 }
00156 return KStreamSocket::peekBlock(data, maxlen);
00157 }
00158
00159 TQ_LONG TDEBufferedSocket::peekBlock(char *data, TQ_ULONG maxlen, TDESocketAddress& from)
00160 {
00161 from = peerAddress();
00162 return peekBlock(data, maxlen);
00163 }
00164
00165 TQT_TQIO_LONG TDEBufferedSocket::tqwriteBlock(const char *data, TQT_TQIO_ULONG len)
00166 {
00167 if (state() != Connected)
00168 {
00169
00170 setError(IO_WriteError, NotConnected);
00171 return -1;
00172 }
00173
00174 if (d->output)
00175 {
00176 if (d->output->isFull())
00177 {
00178 setError(IO_WriteError, WouldBlock);
00179 emit gotError(WouldBlock);
00180 return -1;
00181 }
00182 resetError();
00183
00184
00185 TQSocketNotifier *n = socketDevice()->writeNotifier();
00186 if (n)
00187 n->setEnabled(true);
00188
00189 return d->output->feedBuffer(data, len);
00190 }
00191
00192 return KStreamSocket::tqwriteBlock(data, len);
00193 }
00194
00195 TQT_TQIO_LONG TDEBufferedSocket::tqwriteBlock(const char *data, TQT_TQIO_ULONG maxlen,
00196 const TDESocketAddress&)
00197 {
00198
00199 return tqwriteBlock(data, maxlen);
00200 }
00201
00202 void TDEBufferedSocket::enableRead(bool enable)
00203 {
00204 KStreamSocket::enableRead(enable);
00205 if (!enable && d->input)
00206 {
00207
00208 TQSocketNotifier *n = socketDevice()->readNotifier();
00209 if (n)
00210 n->setEnabled(true);
00211 }
00212
00213 if (enable && state() != Connected && d->input && !d->input->isEmpty())
00214
00215
00216 TQTimer::singleShot(0, this, TQT_SLOT(slotReadActivity()));
00217 }
00218
00219 void TDEBufferedSocket::enableWrite(bool enable)
00220 {
00221 KStreamSocket::enableWrite(enable);
00222 if (!enable && d->output && !d->output->isEmpty())
00223 {
00224
00225 TQSocketNotifier *n = socketDevice()->writeNotifier();
00226 if (n)
00227 n->setEnabled(true);
00228 }
00229 }
00230
00231 void TDEBufferedSocket::stateChanging(SocketState newState)
00232 {
00233 if (newState == Connecting || newState == Connected)
00234 {
00235
00236
00237 if (d->input)
00238 d->input->clear();
00239 if (d->output)
00240 d->output->clear();
00241
00242
00243 enableRead(emitsReadyRead());
00244 enableWrite(emitsReadyWrite());
00245 }
00246 KStreamSocket::stateChanging(newState);
00247 }
00248
00249 void TDEBufferedSocket::setInputBuffering(bool enable)
00250 {
00251 TQMutexLocker locker(mutex());
00252 if (!enable)
00253 {
00254 delete d->input;
00255 d->input = 0L;
00256 }
00257 else if (d->input == 0L)
00258 {
00259 d->input = new TDESocketBuffer;
00260 }
00261 }
00262
00263 TDEIOBufferBase* TDEBufferedSocket::inputBuffer()
00264 {
00265 return d->input;
00266 }
00267
00268 void TDEBufferedSocket::setOutputBuffering(bool enable)
00269 {
00270 TQMutexLocker locker(mutex());
00271 if (!enable)
00272 {
00273 delete d->output;
00274 d->output = 0L;
00275 }
00276 else if (d->output == 0L)
00277 {
00278 d->output = new TDESocketBuffer;
00279 }
00280 }
00281
00282 TDEIOBufferBase* TDEBufferedSocket::outputBuffer()
00283 {
00284 return d->output;
00285 }
00286
00287 #ifdef USE_QT3
00288 TQ_ULONG TDEBufferedSocket::bytesToWrite() const
00289 #endif
00290 #ifdef USE_QT4
00291 qint64 TDEBufferedSocket::bytesToWrite() const
00292 #endif
00293 {
00294 if (!d->output)
00295 return 0;
00296
00297 return d->output->length();
00298 }
00299
00300 void TDEBufferedSocket::closeNow()
00301 {
00302 KStreamSocket::close();
00303 if (d->output)
00304 d->output->clear();
00305 }
00306
00307 bool TDEBufferedSocket::canReadLine() const
00308 {
00309 if (!d->input)
00310 return false;
00311
00312 return d->input->canReadLine();
00313 }
00314
00315 TQCString TDEBufferedSocket::readLine()
00316 {
00317 return d->input->readLine();
00318 }
00319
00320 void TDEBufferedSocket::waitForConnect()
00321 {
00322 if (state() != Connecting)
00323 return;
00324
00325 KStreamSocket::setSocketOptions(socketOptions() | Blocking);
00326 connectionEvent();
00327 KStreamSocket::setSocketOptions(socketOptions() & ~Blocking);
00328 }
00329
00330 void TDEBufferedSocket::slotReadActivity()
00331 {
00332 if (d->input && state() == Connected)
00333 {
00334 mutex()->lock();
00335 TQ_LONG len = d->input->receiveFrom(socketDevice());
00336
00337 if (len == -1)
00338 {
00339 if (socketDevice()->error() != WouldBlock)
00340 {
00341
00342 copyError();
00343 mutex()->unlock();
00344 emit gotError(error());
00345 closeNow();
00346 return;
00347 }
00348 }
00349 else if (len == 0)
00350 {
00351
00352 setError(IO_ReadError, RemotelyDisconnected);
00353 mutex()->unlock();
00354 emit gotError(error());
00355 closeNow();
00356 return;
00357 }
00358
00359
00360 mutex()->unlock();
00361 }
00362
00363 if (state() == Connected)
00364 KStreamSocket::slotReadActivity();
00365 else if (emitsReadyRead())
00366 {
00367 if (d->input && !d->input->isEmpty())
00368 {
00369
00370
00371 TQTimer::singleShot(0, this, TQT_SLOT(slotReadActivity()));
00372 emit readyRead();
00373 }
00374 }
00375 }
00376
00377 void TDEBufferedSocket::slotWriteActivity()
00378 {
00379 if (d->output && !d->output->isEmpty() &&
00380 (state() == Connected || state() == Closing))
00381 {
00382 mutex()->lock();
00383 TQ_LONG len = d->output->sendTo(socketDevice());
00384
00385 if (len == -1)
00386 {
00387 if (socketDevice()->error() != WouldBlock)
00388 {
00389
00390 copyError();
00391 mutex()->unlock();
00392 emit gotError(error());
00393 closeNow();
00394 return;
00395 }
00396 }
00397 else if (len == 0)
00398 {
00399
00400 setError(IO_ReadError, RemotelyDisconnected);
00401 mutex()->unlock();
00402 emit gotError(error());
00403 closeNow();
00404 return;
00405 }
00406
00407 if (d->output->isEmpty())
00408
00409
00410 socketDevice()->writeNotifier()->setEnabled(false);
00411
00412 mutex()->unlock();
00413 emit bytesWritten(len);
00414 }
00415
00416 if (state() != Closing)
00417 KStreamSocket::slotWriteActivity();
00418 else if (d->output && d->output->isEmpty() && state() == Closing)
00419 {
00420 KStreamSocket::close();
00421 }
00422 }
00423
00424 #include "kbufferedsocket.moc"