00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
#include "kmessageio.h"
00025
#include <qsocket.h>
00026
#include <kdebug.h>
00027
#include <kprocess.h>
00028
#include <qfile.h>
00029
00030
00031
00032 KMessageIO::KMessageIO (
QObject *parent,
const char *name)
00033 :
QObject (parent, name), m_id (0)
00034 {}
00035
00036 KMessageIO::~KMessageIO ()
00037 {}
00038
00039 void KMessageIO::setId (Q_UINT32
id)
00040 {
00041 m_id =
id;
00042 }
00043
00044 Q_UINT32
KMessageIO::id ()
00045 {
00046
return m_id;
00047 }
00048
00049
00050
00051 KMessageSocket::KMessageSocket (
QString host, Q_UINT16 port,
QObject *parent,
00052
const char *name)
00053 :
KMessageIO (parent, name)
00054 {
00055 mSocket =
new QSocket ();
00056 mSocket->connectToHost (host, port);
00057 initSocket ();
00058 }
00059
00060 KMessageSocket::KMessageSocket (
QHostAddress host, Q_UINT16 port,
QObject
00061 *parent,
const char *name)
00062 :
KMessageIO (parent, name)
00063 {
00064 mSocket =
new QSocket ();
00065 mSocket->connectToHost (host.toString(), port);
00066 initSocket ();
00067 }
00068
00069 KMessageSocket::KMessageSocket (
QSocket *socket,
QObject *parent,
const char
00070 *name)
00071 :
KMessageIO (parent, name)
00072 {
00073 mSocket = socket;
00074 initSocket ();
00075 }
00076
00077 KMessageSocket::KMessageSocket (
int socketFD,
QObject *parent,
const char
00078 *name)
00079 :
KMessageIO (parent, name)
00080 {
00081 mSocket =
new QSocket ();
00082 mSocket->setSocket (socketFD);
00083 initSocket ();
00084 }
00085
00086 KMessageSocket::~KMessageSocket ()
00087 {
00088
delete mSocket;
00089 }
00090
00091 bool KMessageSocket::isConnected ()
const
00092
{
00093
return mSocket->state() == QSocket::Connection;
00094 }
00095
00096 void KMessageSocket::send (
const QByteArray &msg)
00097 {
00098
QDataStream str (mSocket);
00099 str << Q_UINT8 (
'M');
00100 str.writeBytes (msg.data(), msg.size());
00101 }
00102
00103
void KMessageSocket::processNewData ()
00104 {
00105
if (isRecursive)
00106
return;
00107 isRecursive =
true;
00108
00109
QDataStream str (mSocket);
00110
while (mSocket->bytesAvailable() > 0)
00111 {
00112
if (mAwaitingHeader)
00113 {
00114
00115
if (mSocket->bytesAvailable() < 5)
00116 {
00117 isRecursive =
false;
00118
return;
00119 }
00120
00121
00122
00123
00124 Q_UINT8 v;
00125 str >> v;
00126
if (v !=
'M')
00127 {
00128 kdWarning(11001) << k_funcinfo <<
": Received unexpected data, magic number wrong!" << endl;
00129
continue;
00130 }
00131
00132 str >> mNextBlockLength;
00133 mAwaitingHeader =
false;
00134 }
00135
else
00136 {
00137
00138
if (mSocket->bytesAvailable() < (Q_ULONG) mNextBlockLength)
00139 {
00140 isRecursive =
false;
00141
return;
00142 }
00143
00144
QByteArray msg (mNextBlockLength);
00145 str.readRawBytes (msg.data(), mNextBlockLength);
00146
00147
00148 emit
received (msg);
00149
00150
00151 mAwaitingHeader =
true;
00152 }
00153 }
00154
00155 isRecursive =
false;
00156 }
00157
00158
void KMessageSocket::initSocket ()
00159 {
00160 connect (mSocket, SIGNAL (error(
int)), SIGNAL (
connectionBroken()));
00161 connect (mSocket, SIGNAL (connectionClosed()), SIGNAL (
connectionBroken()));
00162 connect (mSocket, SIGNAL (readyRead()), SLOT (processNewData()));
00163 mAwaitingHeader =
true;
00164 mNextBlockLength = 0;
00165 isRecursive =
false;
00166 }
00167
00168 Q_UINT16
KMessageSocket::peerPort ()
const
00169
{
00170
return mSocket->peerPort();
00171 }
00172
00173 QString KMessageSocket::peerName ()
const
00174
{
00175
return mSocket->peerName();
00176 }
00177
00178
00179
00180 KMessageDirect::KMessageDirect (
KMessageDirect *partner,
QObject *parent,
00181
const char *name)
00182 :
KMessageIO (parent, name), mPartner (0)
00183 {
00184
00185
if (!partner)
00186
return;
00187
00188
00189
if (partner && partner->
mPartner)
00190 {
00191 kdWarning(11001) << k_funcinfo <<
": Object is already connected!" << endl;
00192
return;
00193 }
00194
00195
00196 mPartner = partner;
00197
00198
00199 partner->
mPartner =
this;
00200 }
00201
00202 KMessageDirect::~KMessageDirect ()
00203 {
00204
if (mPartner)
00205 {
00206 mPartner->
mPartner = 0;
00207 emit mPartner->
connectionBroken();
00208 }
00209 }
00210
00211 bool KMessageDirect::isConnected ()
const
00212
{
00213
return mPartner != 0;
00214 }
00215
00216 void KMessageDirect::send (
const QByteArray &msg)
00217 {
00218
if (mPartner)
00219 emit mPartner->
received (msg);
00220
else
00221 kdError(11001) << k_funcinfo <<
": Not yet connected!" << endl;
00222 }
00223
00224
00225
00226
00227 KMessageProcess::~KMessageProcess()
00228 {
00229 kdDebug(11001) <<
"@@@KMessageProcess::Delete process" << endl;
00230
if (mProcess)
00231 {
00232 mProcess->kill();
00233
delete mProcess;
00234 mProcess=0;
00235
00236 mQueue.setAutoDelete(
true);
00237 mQueue.clear();
00238
00239 }
00240 }
00241 KMessageProcess::KMessageProcess(
QObject *parent,
QString file) :
KMessageIO(parent,0)
00242 {
00243
00244 kdDebug(11001) <<
"@@@KMessageProcess::Start process" << endl;
00245 mProcessName=file;
00246 mProcess=
new KProcess;
00247
int id=0;
00248 *mProcess << mProcessName <<
QString(
"%1").arg(
id);
00249 kdDebug(11001) <<
"@@@KMessageProcess::Init:Id= " <<
id << endl;
00250 kdDebug(11001) <<
"@@@KMessgeProcess::Init:Processname: " << mProcessName << endl;
00251 connect(mProcess, SIGNAL(receivedStdout(KProcess *,
char *,
int )),
00252
this, SLOT(slotReceivedStdout(KProcess *,
char * ,
int )));
00253 connect(mProcess, SIGNAL(receivedStderr(KProcess *,
char *,
int )),
00254
this, SLOT(slotReceivedStderr(KProcess *,
char * ,
int )));
00255 connect(mProcess, SIGNAL(processExited(KProcess *)),
00256
this, SLOT(slotProcessExited(KProcess *)));
00257 connect(mProcess, SIGNAL(wroteStdin(KProcess *)),
00258
this, SLOT(slotWroteStdin(KProcess *)));
00259 mProcess->start(KProcess::NotifyOnExit,KProcess::All);
00260 mSendBuffer=0;
00261 mReceiveCount=0;
00262 mReceiveBuffer.resize(1024);
00263 }
00264
bool KMessageProcess::isConnected()
const
00265
{
00266 kdDebug(11001) <<
"@@@KMessageProcess::Is conencted" << endl;
00267
if (!mProcess)
return false;
00268
return mProcess->isRunning();
00269 }
00270
void KMessageProcess::send(
const QByteArray &msg)
00271 {
00272 kdDebug(11001) <<
"@@@KMessageProcess:: SEND("<<msg.size()<<
") to process" << endl;
00273
unsigned int size=msg.size()+2*
sizeof(
long);
00274
00275
char *tmpbuffer=
new char[size];
00276
long *p1=(
long *)tmpbuffer;
00277
long *p2=p1+1;
00278 kdDebug(11001) <<
"p1="<<p1 <<
"p2="<< p2 << endl;
00279 memcpy(tmpbuffer+2*
sizeof(
long),msg.data(),msg.size());
00280 *p1=0x4242aeae;
00281 *p2=size;
00282
00283
QByteArray *buffer=
new QByteArray();
00284 buffer->assign(tmpbuffer,size);
00285
00286 mQueue.enqueue(buffer);
00287 writeToProcess();
00288 }
00289
void KMessageProcess::writeToProcess()
00290 {
00291
00292
if (mSendBuffer || mQueue.isEmpty())
return ;
00293 mSendBuffer=mQueue.dequeue();
00294
if (!mSendBuffer)
return ;
00295
00296
00297
00298
00299
00300 mProcess->writeStdin(mSendBuffer->data(),mSendBuffer->size());
00301
00302 }
00303
// Slot for KProcess::wroteStdin(): the buffer in flight has been written,
// so it is released and the next queued buffer (if any) is sent.
void KMessageProcess::slotWroteStdin(KProcess * /*proc*/)
{
  kdDebug(11001) << k_funcinfo << endl;

  // Deleting a null pointer is a no-op, so no explicit check is needed.
  delete mSendBuffer;
  mSendBuffer=0;

  writeToProcess();
}
00313
00314
// Slot for KProcess::receivedStderr(): forwards the child's stderr output
// to the debug log, one log entry per '\n'-separated line, each prefixed
// with the pid of the process (0 if no process pointer was passed).
//
// proc   : the emitting process, may be 0
// buffer : the received bytes (not necessarily 0-terminated)
// buflen : number of bytes in buffer
void KMessageProcess::slotReceivedStderr(KProcess * proc, char *buffer, int buflen)
{
  if (!buffer || buflen==0) return ;

  const int pid = proc ? proc->pid() : 0;

  char *cursor=buffer;
  do
  {
    // Length of the current line: up to the next newline or, if there is
    // none, the rest of the buffer.
    char *newline=(char *)memchr(cursor,'\n',buflen);
    const int lineLen = newline ? (int)(newline-cursor) : buflen;

    // Wrap the line without copying it; the raw data must be detached
    // again before the array goes out of scope.
    QByteArray raw;
    raw.setRawData(cursor,lineLen);
    QString text(raw);
    kdDebug(11001) << "PID" <<pid<< ":" << text << endl;
    raw.resetRawData(cursor,lineLen);

    if (newline) cursor=newline+1;
    // +1 accounts for the newline character itself.
    buflen-=lineLen+1;
  }
  while(buflen>0);
}
00342
00343
00344
// Slot for KProcess::receivedStdout(): collects the bytes coming from the
// child process and emits received() for every complete message.
//
// Framing (see send()): two `long` values - the cookie 0x4242aeae and the
// total frame length in bytes including this header - followed by the
// payload. Incomplete frames stay in mReceiveBuffer until more data arrives.
//
// Changes against the previous version:
//  - a frame consisting of the header only (empty payload) is delivered at
//    once; the old `>` comparison kept it back until further data arrived;
//  - if the header is corrupt (wrong cookie or impossible length) the
//    buffered data is discarded. Before, such data stayed at the head of
//    the buffer for good and blocked all following messages;
//  - null / non-positive input is ignored.
void KMessageProcess::slotReceivedStdout(KProcess * , char *buffer, int buflen)
{
  kdDebug(11001) << "$$$$$$ " << k_funcinfo << ": Received " << buflen << " bytes over inter process communication" << endl;

  if (!buffer || buflen<=0) return ;

  // Grow the receive buffer in steps of 1024 bytes until the data fits.
  while (mReceiveCount+buflen>=mReceiveBuffer.size()) mReceiveBuffer.resize(mReceiveBuffer.size()+1024);
  memcpy(mReceiveBuffer.data()+mReceiveCount,buffer,buflen);
  mReceiveCount+=buflen;

  const unsigned int headerSize=2*sizeof(long);

  // A frame always starts at offset 0 of the receive buffer.
  while (mReceiveCount>=headerSize)
  {
    long *p1=(long *)mReceiveBuffer.data();
    long *p2=p1+1;

    if (*p1!=0x4242aeae)
    {
      kdDebug(11001) << k_funcinfo << ": Cookie error...transmission failure...serious problem..." << endl;
      // The stream is out of sync; the length field cannot be trusted.
      mReceiveCount=0;
      break;
    }

    unsigned int len=(int)(*p2);
    if (len<headerSize)
    {
      kdDebug(11001) << k_funcinfo << ": Message size error" << endl;
      // A frame can never be shorter than its own header.
      mReceiveCount=0;
      break;
    }

    // Frame not complete yet - wait for more data.
    if (len>mReceiveCount) break;

    kdDebug(11001) << k_funcinfo << ": Got message with len " << len << endl;

    QByteArray msg;
    msg.duplicate(mReceiveBuffer.data()+headerSize,len-headerSize);
    emit received(msg);

    // Move whatever follows this frame to the front of the buffer.
    if (len<mReceiveCount)
    {
      memmove(mReceiveBuffer.data(),mReceiveBuffer.data()+len,mReceiveCount-len);
    }
    mReceiveCount-=len;
  }
}
00389
00390
// Slot for KProcess::processExited(): announces the broken connection and
// then disposes of the process object.
void KMessageProcess::slotProcessExited(KProcess * /*proc*/)
{
  kdDebug(11001) << "Process exited (slot)" << endl;
  emit connectionBroken();

  // Clear the member and destroy the object it pointed to.
  KProcess *finished=mProcess;
  mProcess=0;
  delete finished;
}
00397
00398
00399
00400 KMessageFilePipe::KMessageFilePipe(
QObject *parent,
QFile *readfile,
QFile *writefile) :
KMessageIO(parent,0)
00401 {
00402 mReadFile=readfile;
00403 mWriteFile=writefile;
00404 mReceiveCount=0;
00405 mReceiveBuffer.resize(1024);
00406 }
00407
00408 KMessageFilePipe::~KMessageFilePipe()
00409 {
00410 }
00411
00412
bool KMessageFilePipe::isConnected ()
const
00413
{
00414
return (mReadFile!=0)&&(mWriteFile!=0);
00415 }
00416
00417
void KMessageFilePipe::send(
const QByteArray &msg)
00418 {
00419
unsigned int size=msg.size()+2*
sizeof(
long);
00420
00421
char *tmpbuffer=
new char[size];
00422
long *p1=(
long *)tmpbuffer;
00423
long *p2=p1+1;
00424 memcpy(tmpbuffer+2*
sizeof(
long),msg.data(),msg.size());
00425 *p1=0x4242aeae;
00426 *p2=size;
00427
00428
QByteArray buffer;
00429 buffer.assign(tmpbuffer,size);
00430 mWriteFile->writeBlock(buffer);
00431 mWriteFile->flush();
00432
00433
00434
00435
00436
00437 }
00438
00439
void KMessageFilePipe::exec()
00440 {
00441
00442
00443
00444
00445
int ch=mReadFile->getch();
00446
00447
while (mReceiveCount>=mReceiveBuffer.size()) mReceiveBuffer.resize(mReceiveBuffer.size()+1024);
00448 mReceiveBuffer[mReceiveCount]=(
char)ch;
00449 mReceiveCount++;
00450
00451
00452
if (mReceiveCount>=2*
sizeof(
long))
00453 {
00454
long *p1=(
long *)mReceiveBuffer.data();
00455
long *p2=p1+1;
00456
unsigned int len;
00457
if (*p1!=0x4242aeae)
00458 {
00459 fprintf(stderr,
"KMessageFilePipe::exec:: Cookie error...transmission failure...serious problem...\n");
00460
00461 }
00462 len=(
int)(*p2);
00463
if (len==mReceiveCount)
00464 {
00465
00466
00467
QByteArray msg;
00468
00469 msg.duplicate(mReceiveBuffer.data()+2*
sizeof(
long),len-2*
sizeof(
long));
00470 emit
received(msg);
00471
00472 mReceiveCount=0;
00473 }
00474 }
00475
00476
00477
return ;
00478
00479
00480 }
00481
00482
#include "kmessageio.moc"