Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Namespace Members | Class Members | File Members | Related Pages

wvstream.cc

Go to the documentation of this file.
00001 /*
00002  * Worldvisions Weaver Software:
00003  *   Copyright (C) 1997-2002 Net Integration Technologies, Inc.
00004  * 
00005  * Unified support for streams, that is, sequences of bytes that may or
00006  * may not be ready for read/write at any given time.
00007  * 
00008  * We provide typical read and write routines, as well as a select() function
00009  * for each stream.
00010  */
00011 #include "wvstream.h"
00012 #include "wvtask.h"
00013 #include "wvtimeutils.h"
00014 #include <time.h>
00015 #include <sys/types.h>
00016 #include <assert.h>
00017 
00018 #ifdef _WIN32
00019 #define ENOBUFS WSAENOBUFS
00020 #undef errno
00021 #define errno GetLastError()
00022 #else
00023 #include <errno.h>
00024 #endif
00025 
// Enable this to add some read/write trace messages (this can be VERY
// verbose).
//
// The enabled form is wrapped in do { } while (0) so that "TRACE(...);"
// is always exactly one statement, even when it is the unbraced body of
// an if/else.  The disabled forms expand to nothing (or, for MSVC, leave
// the parenthesized argument list behind as a harmless expression).
#if 0
# define TRACE(x, y...) \
    do { fprintf(stderr, x, ## y); fflush(stderr); } while (0)
#else
#ifndef _MSC_VER
# define TRACE(x, y...)
#else
# define TRACE
#endif
#endif
00037 
00038 WvStream *WvStream::globalstream = NULL;
00039 
00040 XUUID_MAP_BEGIN(IWvStream)
00041   XUUID_MAP_ENTRY(IObject)
00042   XUUID_MAP_ENTRY(IWvStream)
00043   XUUID_MAP_END
00044 
00045 WvStream::WvStream()
00046 {
00047     TRACE("Creating wvstream %p\n", this);
00048     
00049 #ifdef _WIN32
00050     WSAData wsaData;
00051     int result = WSAStartup(MAKEWORD(2,0), &wsaData); 
00052     assert(result == 0);
00053 #endif
00054     wvstream_execute_called = false;
00055     userdata = closecb_data = NULL;
00056     errnum = 0;
00057     max_outbuf_size = 0;
00058     outbuf_delayed_flush = false;
00059     want_to_flush = true;
00060     is_flushing = false;
00061     is_auto_flush = true;
00062     alarm_was_ticking = false;
00063     force.readable = true;
00064     force.writable = force.isexception = false;
00065     read_requires_writable = write_requires_readable = NULL;
00066     running_callback = false;
00067     want_nowrite = false;
00068     queue_min = 0;
00069     autoclose_time = 0;
00070     alarm_time = wvtime_zero;
00071     last_alarm_check = wvtime_zero;
00072     taskman = 0;
00073     
00074     // magic multitasking support
00075     uses_continue_select = false;
00076     personal_stack_size = 65536;
00077     task = NULL;
00078 }
00079 
00080 
00081 // FIXME: interfaces (IWvStream) shouldn't have implementations!
00082 IWvStream::IWvStream()
00083 {
00084 }
00085 
00086 
00087 IWvStream::~IWvStream()
00088 {
00089 }
00090 
00091 
00092 WvStream::~WvStream()
00093 {
00094     TRACE("destroying %p\n", this);
00095     if (running_callback)
00096     {
00097         // user should have called terminate_continue_select()...
00098         TRACE("eek! destroying while running_callback!\n");
00099         assert(!running_callback);
00100     }
00101     close();
00102     
00103     if (task)
00104     {
00105         while (task->isrunning())
00106             taskman->run(*task);
00107         task->recycle();
00108         task = NULL;
00109     }
00110     TRACE("done destroying %p\n", this);
00111     if (taskman)
00112         taskman->unlink();
00113 }
00114 
00115 
00116 void WvStream::close()
00117 {
00118     flush(2000); // fixme: should not hardcode this stuff
00119     if (!! closecb_func)
00120     {
00121         WvStreamCallback cb = closecb_func;
00122         closecb_func = 0; // ensure callback is only called once
00123         cb(*this, closecb_data);
00124     }
00125 }
00126 
00127 
00128 void WvStream::autoforward(WvStream &s)
00129 {
00130     setcallback(autoforward_callback, &s);
00131     read_requires_writable = &s;
00132 }
00133 
00134 
00135 void WvStream::noautoforward()
00136 {
00137     setcallback(0, NULL);
00138     read_requires_writable = NULL;
00139 }
00140 
00141 
00142 void WvStream::autoforward_callback(WvStream &s, void *userdata)
00143 {
00144     WvStream &s2 = *(WvStream *)userdata;
00145     char buf[1024];
00146     size_t len;
00147     
00148     len = s.read(buf, sizeof(buf));
00149     s2.write(buf, len);
00150 }
00151 
00152 
00153 // this is run in the subtask owned by 'stream', if any; NOT necessarily
00154 // the task that runs WvStream::callback().  That's why this needs to be
00155 // a separate function.
00156 void WvStream::_callback(void *stream)
00157 {
00158     WvStream *s = (WvStream *)stream;
00159     
00160     s->running_callback = true;
00161     
00162     s->wvstream_execute_called = false;
00163     s->execute();
00164     if (!! s->callfunc)
00165         s->callfunc(*s, s->userdata);
00166     
00167     // if this assertion fails, a derived class's virtual execute() function
00168     // didn't call its parent's execute() function, and we didn't make it
00169     // all the way back up to WvStream::execute().  This doesn't always
00170     // matter right now, but it could lead to obscure bugs later, so we'll
00171     // enforce it.
00172     assert(s->wvstream_execute_called);
00173     
00174     s->running_callback = false;
00175 }
00176 
00177 
00178 void WvStream::callback()
00179 {
00180     TRACE("(?)");
00181     
00182     // callback is already running -- don't try to start it again, or we
00183     // could end up in an infinite loop!
00184     if (running_callback)
00185         return;
00186     
00187     // if the alarm has gone off and we're calling callback... good!
00188     if (alarm_remaining() == 0)
00189     {
00190         alarm_time = wvtime_zero;
00191         alarm_was_ticking = true;
00192     }
00193     else
00194         alarm_was_ticking = false;
00195     
00196     assert(!uses_continue_select || personal_stack_size >= 1024);
00197     
00198 //    if (1)
00199     if (uses_continue_select && personal_stack_size >= 1024)
00200     {
00201         if (!taskman)
00202             taskman = WvTaskMan::get();
00203     
00204         if (!task)
00205         {
00206             TRACE("(!)");
00207             task = taskman->start("streamexec", _callback, this,
00208                                   personal_stack_size);
00209         }
00210         else if (!task->isrunning())
00211         {
00212             TRACE("(.)");
00213             task->start("streamexec2", _callback, this);
00214         }
00215         
00216         // This loop is much more subtle than it looks.
00217         // By implementing it this way, we provide something that works
00218         // like a typical callback() stack: that is, a child callback
00219         // must return before the parent's callback does.
00220         // 
00221         // What _actually_ happens is a child will call yield() upon returning
00222         // from its callback function, which exits the taskman and returns to
00223         // the top level.  The top level, though, is running this loop, which
00224         // re-executes taskman->run() since its child (which is eventually
00225         // the parent of the child that called yield()) hasn't finished yet.
00226         // We build our way all the way back up to the first-level parent of
00227         // the child calling yield(), which now notices its child has finished
00228         // and continues on in its execute() function.
00229         // 
00230         // continue_select() will set running_callback to false, even though
00231         // it doesn't actually return from the callback function.  That
00232         // causes this loop to terminate, and the callback will get resumed
00233         // later when select() returns true.
00234         do
00235         {
00236             taskman->run(*task);
00237         } while (task && task->isrunning() && running_callback);
00238     }
00239     else
00240         _callback(this);
00241     
00242     /* DON'T PUT ANY CODE HERE!
00243      * 
00244      * WvStreamList calls its child streams above via taskman->run().
00245      * If a child is deleted, it waits for its callback task to finish the
00246      * current iteration, then recycles its WvTask object and allows the
00247      * "delete" call to finish, so the object no longer exists.
00248      * 
00249      * The catch: the callback() function is actually running in
00250      * the WvStreamList's task (if any), which hasn't had a chance to
00251      * exit yet.  Next time we jump into the WvStreamList, we will arrive
00252      * immediately after the taskman->run() line, ie. right here in the
00253      * code.  In that case, the 'this' pointer could be pointing at an
00254      * invalid object, so we should just exit before we do something stupid.
00255      */
00256 }
00257 
00258 
00259 void WvStream::execute()
00260 {
00261     // do nothing by default, but notice that we were here.
00262     wvstream_execute_called = true;
00263 }
00264 
00265 
00266 bool WvStream::isok() const
00267 {
00268     return WvError::isok();
00269 }
00270 
00271 
00272 void WvStream::seterr(int _errnum)
00273 {
00274     if (!errnum)
00275     {
00276         WvError::seterr(_errnum);
00277         close();
00278     }
00279 }
00280 
00281 
00282 size_t WvStream::read(WvBuf &outbuf, size_t count)
00283 {
00284     // for now, just wrap the older read function
00285     size_t free = outbuf.free();
00286     if (count > free)
00287         count = free;
00288 
00289     WvDynBuf tmp;
00290     unsigned char *buf = tmp.alloc(count);
00291     size_t len = read(buf, count);
00292     tmp.unalloc(count - len);
00293     outbuf.merge(tmp);
00294     return len;
00295 }
00296 
00297 
00298 size_t WvStream::continue_read(time_t wait_msec, WvBuf &outbuf, size_t count)
00299 {
00300     // for now, just wrap the older read function
00301     size_t free = outbuf.free();
00302     if (count > free)
00303         count = free;
00304     unsigned char *buf = outbuf.alloc(count);
00305     
00306     // call the non-WvBuf continue_read
00307     size_t len = continue_read(wait_msec, buf, count);
00308     
00309     outbuf.unalloc(count - len);
00310     return len;
00311 }
00312 
00313 
00314 size_t WvStream::write(WvBuf &inbuf, size_t count)
00315 {
00316     // for now, just wrap the older write function
00317     size_t avail = inbuf.used();
00318     if (count > avail)
00319         count = avail;
00320     const unsigned char *buf = inbuf.get(count);
00321     size_t len = write(buf, count);
00322     inbuf.unget(count - len);
00323     return len;
00324 }
00325 
00326 
00327 size_t WvStream::read(void *buf, size_t count)
00328 {
00329     size_t bufu = inbuf.used(), i;
00330     unsigned char *newbuf;
00331 
00332     bufu = inbuf.used();
00333     if (bufu < queue_min)
00334     {
00335         newbuf = inbuf.alloc(queue_min - bufu);
00336         i = uread(newbuf, queue_min - bufu);
00337         inbuf.unalloc(queue_min - bufu - i);
00338         
00339         bufu = inbuf.used();
00340     }
00341     
00342     if (bufu < queue_min)
00343         return 0;
00344         
00345     // if buffer is empty, do a hard read
00346     if (!bufu)
00347         bufu = uread(buf, count);
00348     else
00349     {
00350         // otherwise just read from the buffer
00351         if (bufu > count)
00352             bufu = count;
00353     
00354         memcpy(buf, inbuf.get(bufu), bufu);
00355     }
00356     
00357     TRACE("read  obj 0x%p, bytes %d/%d\n", this, bufu, count);
00358     return bufu;
00359 }
00360 
00361 
00362 size_t WvStream::continue_read(time_t wait_msec, void *buf, size_t count)
00363 {
00364     assert(uses_continue_select);
00365 
00366     if (!count)
00367         return 0;
00368 
00369     // FIXME: continue_select also uses the alarm, so this doesn't work.
00370     if (wait_msec >= 0)
00371         alarm(wait_msec);
00372 
00373     queuemin(count);
00374 
00375     int got = 0;
00376 
00377     while (isok())
00378     {
00379         if (continue_select(-1))
00380         {
00381             if ((got = read(buf, count)) != 0)
00382                 break;
00383             if (alarm_was_ticking) 
00384                 break;
00385         }
00386     }
00387 
00388     if (wait_msec >= 0)
00389         alarm(-1);
00390 
00391     queuemin(0);
00392     
00393     return got;
00394 }
00395 
00396 
00397 size_t WvStream::write(const void *buf, size_t count)
00398 {
00399     if (!isok() || !buf || !count) return 0;
00400     
00401     size_t wrote = 0;
00402     if (!outbuf_delayed_flush && !outbuf.used())
00403     {
00404         wrote = uwrite(buf, count);
00405         count -= wrote;
00406         buf = (const unsigned char*)buf + wrote;
00407     }
00408     if (max_outbuf_size != 0)
00409     {
00410         size_t canbuffer = max_outbuf_size - outbuf.used();
00411         if (count > canbuffer)
00412             count = canbuffer; // can't write the whole amount
00413     }
00414     if (count != 0)
00415     {
00416         outbuf.put(buf, count);
00417         wrote += count;
00418     }
00419 
00420     if (should_flush())
00421     {
00422         if (is_auto_flush)
00423             flush(0);
00424         else 
00425             flush_outbuf(0);
00426     }
00427 
00428     return wrote;
00429 }
00430 
00431 
00432 void WvStream::noread()
00433 {
00434     // FIXME: this really ought to be symmetrical with nowrite(), but instead
00435     // it's empty for some reason.
00436 }
00437 
00438 
00439 void WvStream::nowrite()
00440 {
00441     if (getwfd() < 0)
00442         return;
00443 
00444     want_nowrite = true;
00445 }
00446 
00447 
00448 bool WvStream::isreadable()
00449 {
00450     return isok() && select(0, true, false, false);
00451 }
00452 
00453 
00454 bool WvStream::iswritable()
00455 {
00456     return isok() && select(0, false, true, false);
00457 }
00458 
00459 
00460 char *WvStream::getline(time_t wait_msec, char separator, int readahead)
00461 {
00462     struct timeval timeout_time;
00463     if (wait_msec > 0)
00464         timeout_time = msecadd(wvtime(), wait_msec);
00465 
00466     // if we get here, we either want to wait a bit or there is data
00467     // available.
00468     while (isok())
00469     {
00470         queuemin(0);
00471     
00472         // if there is a newline already, return its string.
00473         size_t i = inbuf.strchr(separator);
00474         if (i > 0)
00475         {
00476             char *eol = (char *)inbuf.mutablepeek(i - 1, 1);
00477             assert(eol);
00478             *eol = 0;
00479             return (char *)inbuf.get(i);
00480         }
00481         else if (!isok())    // uh oh, stream is in trouble.
00482         {
00483             if (inbuf.used())
00484             {
00485                 // handle "EOF without newline" condition
00486                 // FIXME: it's very silly that buffers can't return editable
00487                 // char* arrays.
00488                 inbuf.alloc(1)[0] = 0; // null-terminate it
00489                 return const_cast<char*>(
00490                     (const char*)inbuf.get(inbuf.used()));
00491             }
00492             else
00493                 break; // nothing else to do!
00494         }
00495 
00496         // make select not return true until more data is available
00497         size_t needed = inbuf.used() + 1;
00498         queuemin(needed);
00499 
00500         // compute remaining timeout
00501         if (wait_msec > 0)
00502         {
00503             wait_msec = msecdiff(timeout_time, wvtime());
00504             if (wait_msec < 0)
00505                 wait_msec = 0;
00506         }
00507         
00508         bool hasdata;
00509         if (uses_continue_select)
00510             hasdata = continue_select(wait_msec);
00511         else
00512             hasdata = select(wait_msec, true, false);
00513         if (!isok())
00514             break;
00515 
00516         if (hasdata)
00517         {
00518             // read a few bytes
00519             unsigned char *buf = inbuf.alloc(readahead);
00520             size_t len = uread(buf, readahead);
00521             inbuf.unalloc(readahead - len);
00522             hasdata = inbuf.used() >= needed; // enough?
00523         }
00524 
00525         if (!hasdata && wait_msec == 0)
00526             break; // handle timeout
00527     }
00528     
00529     // we timed out or had a socket error
00530     if (!isok() && inbuf.used())
00531     {
00532         // if the stream has closed, dump the entire buffer as the last line
00533         inbuf.put("", 1);
00534         return (char *)inbuf.get(inbuf.used());
00535     }
00536     else
00537         return NULL;
00538 }
00539 
00540 
00541 void WvStream::drain()
00542 {
00543     char buf[1024];
00544     while (isreadable())
00545         read(buf, sizeof(buf));
00546 }
00547 
00548 
00549 bool WvStream::flush(time_t msec_timeout)
00550 {
00551     if (is_flushing) return false;
00552     
00553     TRACE("%p flush starts\n", this);
00554 
00555     is_flushing = true;
00556     want_to_flush = true;
00557     bool done = flush_internal(msec_timeout) // any other internal buffers
00558         && flush_outbuf(msec_timeout);  // our own outbuf
00559     is_flushing = false;
00560 
00561     TRACE("flush stops (%d)\n", done);
00562     return done;
00563 }
00564 
00565 
00566 bool WvStream::should_flush()
00567 {
00568     return want_to_flush;
00569 }
00570 
00571 
00572 bool WvStream::flush_outbuf(time_t msec_timeout)
00573 {
00574     TRACE("%p flush_outbuf starts (isok=%d)\n", this, isok());
00575     
00576     // flush outbuf
00577     while (isok() && outbuf.used())
00578     {
00579 //      fprintf(stderr, "%p: fd:%d/%d, used:%d\n", 
00580 //              this, getrfd(), getwfd(), outbuf.used());
00581         
00582         size_t attempt = outbuf.used();
00583         size_t real = uwrite(outbuf.get(attempt), attempt);
00584         
00585         // WARNING: uwrite() may have messed up our outbuf!
00586         // This probably only happens if uwrite() closed the stream because
00587         // of an error, so we'll check isok().
00588         if (isok() && real < attempt)
00589         {
00590             TRACE("flush_outbuf: unget %d-%d\n", attempt, real);
00591             assert(outbuf.ungettable() >= attempt - real);
00592             outbuf.unget(attempt - real);
00593         }
00594         
00595         // since post_select() can call us, and select() calls post_select(),
00596         // we need to be careful not to call select() if we don't need to!
00597         if (!msec_timeout || !select(msec_timeout, false, true))
00598         {
00599             if (msec_timeout >= 0)
00600                 break;
00601         }
00602     }
00603 
00604     // handle autoclose
00605     if (isok() && autoclose_time)
00606     {
00607         time_t now = time(NULL);
00608         TRACE("Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n", 
00609               this, now - autoclose_time, outbuf.used());
00610         if ((flush_internal(0) && !outbuf.used()) || now > autoclose_time)
00611         {
00612             autoclose_time = 0; // avoid infinite recursion!
00613             close();
00614         }
00615     }
00616 
00617     TRACE("flush_outbuf: after autoclose chunk\n");
00618     
00619     if (!outbuf.used() && outbuf_delayed_flush)
00620         want_to_flush = false;
00621     
00622     TRACE("flush_outbuf: now isok=%d\n", isok());
00623 
00624     // if we can't flush the outbuf, at least empty it!
00625     if (!isok())
00626         outbuf.zap();
00627 
00628     TRACE("flush_outbuf stops\n");
00629     
00630     return !outbuf.used();
00631 }
00632 
00633 
00634 bool WvStream::flush_internal(time_t msec_timeout)
00635 {
00636     // once outbuf emptied, that's it for most streams
00637     return true;
00638 }
00639 
00640 
00641 int WvStream::getrfd() const
00642 {
00643     return -1;
00644 }
00645 
00646 
00647 int WvStream::getwfd() const
00648 {
00649     return -1;
00650 }
00651 
00652 
00653 void WvStream::flush_then_close(int msec_timeout)
00654 {
00655     time_t now = time(NULL);
00656     autoclose_time = now + (msec_timeout + 999) / 1000;
00657     
00658     TRACE("Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n", 
00659             this, outbuf.used(), autoclose_time - now);
00660 
00661     // as a fast track, we _could_ close here: but that's not a good idea,
00662     // since flush_then_close() deals with obscure situations, and we don't
00663     // want the caller to use it incorrectly.  So we make things _always_
00664     // break when the caller forgets to call select() later.
00665     
00666     flush(0);
00667 }
00668 
00669 
00670 bool WvStream::pre_select(SelectInfo &si)
00671 {
00672     time_t alarmleft = alarm_remaining();
00673     
00674     if (alarmleft == 0)
00675         return true; // alarm has rung
00676 
00677     if (!si.inherit_request)
00678         si.wants |= force;
00679     
00680     // handle read-ahead buffering
00681     if (si.wants.readable && inbuf.used() && inbuf.used() >= queue_min)
00682         return true; // already ready
00683     if (alarmleft >= 0
00684       && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
00685         si.msec_timeout = alarmleft;
00686     return false;
00687 }
00688 
00689 
00690 bool WvStream::post_select(SelectInfo &si)
00691 {
00692     // FIXME: need output buffer flush support for non FD-based streams
00693     // FIXME: need read_requires_writable and write_requires_readable
00694     //        support for non FD-based streams
00695     return false;
00696 }
00697 
00698 
00699 bool WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout,
00700     bool readable, bool writable, bool isexcept, bool forceable)
00701 {
00702     FD_ZERO(&si.read);
00703     FD_ZERO(&si.write);
00704     FD_ZERO(&si.except);
00705     
00706     if (forceable)
00707         si.wants = force;
00708     else
00709     {
00710         si.wants.readable = readable;
00711         si.wants.writable = writable;
00712         si.wants.isexception = isexcept;
00713     }
00714     
00715     si.max_fd = -1;
00716     si.msec_timeout = msec_timeout;
00717     si.inherit_request = ! forceable;
00718     si.global_sure = false;
00719 
00720     if (!isok()) return false;
00721 
00722     bool sure = pre_select(si);
00723     if (globalstream && forceable && (globalstream != this))
00724     {
00725         WvStream *s = globalstream;
00726         globalstream = NULL; // prevent recursion
00727         si.global_sure = s->pre_select(si);
00728         globalstream = s;
00729     }
00730     if (sure || si.global_sure)
00731         si.msec_timeout = 0;
00732     return sure;
00733 }
00734 
00735 
00736 int WvStream::_do_select(SelectInfo &si)
00737 {
00738     // prepare timeout
00739     timeval tv;
00740     tv.tv_sec = si.msec_timeout / 1000;
00741     tv.tv_usec = (si.msec_timeout % 1000) * 1000;
00742     
00743     // block
00744     int sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except,
00745         si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
00746 
00747     // handle errors.
00748     //   EAGAIN and EINTR don't matter because they're totally normal.
00749     //   ENOBUFS is hopefully transient.
00750     //   EBADF is kind of gross and might imply that something is wrong,
00751     //      but it happens sometimes...
00752     if (sel < 0 
00753       && errno != EAGAIN && errno != EINTR 
00754       && errno != EBADF
00755       && errno != ENOBUFS
00756 #ifdef _WIN32
00757       && errno != WSAEINVAL // the sets might be empty
00758 #endif
00759       )
00760     {
00761         seterr(errno);
00762     }
00763     return sel;
00764 }
00765 
00766 
00767 bool WvStream::_process_selectinfo(SelectInfo &si, bool forceable)
00768 {
00769     if (!isok()) return false;
00770     
00771     bool sure = post_select(si);
00772     if (globalstream && forceable && (globalstream != this))
00773     {
00774         WvStream *s = globalstream;
00775         globalstream = NULL; // prevent recursion
00776         si.global_sure = s->post_select(si) || si.global_sure;
00777         globalstream = s;
00778     }
00779     return sure;
00780 }
00781 
00782 
00783 bool WvStream::_select(time_t msec_timeout,
00784     bool readable, bool writable, bool isexcept, bool forceable)
00785 {
00786     SelectInfo si;
00787     bool sure = _build_selectinfo(si, msec_timeout,
00788                                   readable, writable, isexcept, forceable);
00789     
00790     if (!isok())
00791         return false;
00792     
00793     // the eternal question: if 'sure' is true already, do we need to do the
00794     // rest of this stuff?  If we do, it might increase fairness a bit, but
00795     // it encourages select()ing when we know something fishy has happened -
00796     // when a stream is !isok() in a list, for example, pre_select() returns
00797     // true.  If that's the case, our SelectInfo structure might not be
00798     // quite right (eg. it might be selecting on invalid fds).  That doesn't
00799     // sound *too* bad, so let's go for the fairness.
00800 
00801     int sel = _do_select(si);
00802     if (sel >= 0)
00803         sure = _process_selectinfo(si, forceable) || sure; // note the order
00804     if (si.global_sure && globalstream && forceable && (globalstream != this))
00805         globalstream->callback();
00806     return sure;
00807 }
00808 
00809 
00810 void WvStream::force_select(bool readable, bool writable, bool isexception)
00811 {
00812     force.readable |= readable;
00813     force.writable |= writable;
00814     force.isexception |= isexception;
00815 }
00816 
00817 
00818 void WvStream::undo_force_select(bool readable, bool writable, bool isexception)
00819 {
00820     force.readable &= !readable;
00821     force.writable &= !writable;
00822     force.isexception &= !isexception;
00823 }
00824 
00825 
00826 void WvStream::alarm(time_t msec_timeout)
00827 {
00828     if (msec_timeout >= 0)
00829         alarm_time = msecadd(wvtime(), msec_timeout);
00830     else
00831         alarm_time = wvtime_zero;
00832 }
00833 
00834 
00835 time_t WvStream::alarm_remaining()
00836 {
00837     if (alarm_time.tv_sec && !running_callback)
00838     {
00839         WvTime now = wvtime();
00840 
00841         /* Time is going backward! */
00842         if (now < last_alarm_check)
00843             alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, now));
00844 
00845         last_alarm_check = now;
00846 
00847         time_t remaining = msecdiff(alarm_time, now);
00848         if (remaining < 0)
00849             remaining = 0;
00850         return remaining;
00851     }
00852     return -1;
00853 }
00854 
00855 
00856 bool WvStream::continue_select(time_t msec_timeout)
00857 {
00858     assert(uses_continue_select);
00859     assert(task);
00860     assert(taskman);
00861     assert(taskman->whoami() == task);
00862     
00863     if (msec_timeout >= 0)
00864         alarm(msec_timeout);
00865     
00866     running_callback = false;
00867     taskman->yield();
00868     running_callback = true; // and we're back!
00869     alarm(-1);
00870     
00871     // when we get here, someone has jumped back into our task.
00872     // We have to select(0) here because it's possible that the alarm was 
00873     // ticking _and_ data was available.  This is aggravated especially if
00874     // msec_delay was zero.  Note that running select() here isn't
00875     // inefficient, because if the alarm was expired then pre_select()
00876     // returned true anyway and short-circuited the previous select().
00877     // 
00878     // FIXME: we should probably be using select(t,r,w,x) here instead, but
00879     // I'm not sure.
00880     TRACE("hello-%p\n", this);
00881     return !alarm_was_ticking || select(0);
00882 }
00883 
00884 
00885 void WvStream::terminate_continue_select()
00886 {
00887     close();
00888     if (task)
00889     {
00890         while (task->isrunning())
00891             taskman->run(*task);
00892         task->recycle();
00893         task = NULL;
00894     }
00895 }
00896 
00897 
00898 const WvAddr *WvStream::src() const
00899 {
00900     return NULL;
00901 }
00902 
00903 
00904 void WvStream::unread(WvBuf &unreadbuf, size_t count)
00905 {
00906     WvDynBuf tmp;
00907     tmp.merge(unreadbuf, count);
00908     tmp.merge(inbuf);
00909     inbuf.zap();
00910     inbuf.merge(tmp);
00911 }

Generated on Sat Feb 21 21:05:32 2004 for WvStreams by doxygen 1.3.5