Main Page   Class Hierarchy   Compound List   File List   Compound Members   File Members  

wvstream.cc

Go to the documentation of this file.
00001 /*
00002  * Worldvisions Weaver Software:
00003  *   Copyright (C) 1997-2002 Net Integration Technologies, Inc.
00004  * 
00005  * Unified support for streams, that is, sequences of bytes that may or
00006  * may not be ready for read/write at any given time.
00007  * 
00008  * We provide typical read and write routines, as well as a select() function
00009  * for each stream.
00010  */
00011 #include "wvstream.h"
00012 #include "wvtask.h"
00013 #include <time.h>
00014 #include <sys/types.h>
00015 #include <errno.h>
00016 #include <assert.h>
00017 
00018 // enable this to add some read/write trace messages (this can be VERY
00019 // verbose)
00020 #if 0
00021 # define TRACE(x, y...) fprintf(stderr, x, ## y)
00022 #else
00023 # define TRACE(x, y...)
00024 #endif
00025 
00026 WvTaskMan *WvStream::taskman;
00027 
00028 static void normalize(struct timeval &tv)
00029 {
00030     tv.tv_sec += tv.tv_usec / 1000000;
00031     tv.tv_usec %= 1000000;
00032 }
00033 
00034 
00035 WvStream::WvStream(int _fd) : callfunc(NULL)
00036 {
00037     init();
00038     rwfd = _fd;
00039 }
00040 
00041 
00042 void WvStream::init()
00043 {
00044     wvstream_execute_called = false;
00045     userdata = NULL;
00046     errnum = 0;
00047     max_outbuf_size = 0;
00048     outbuf_delayed_flush = alarm_was_ticking = false;
00049     force.readable = true;
00050     force.writable = force.isexception = false;
00051     read_requires_writable = write_requires_readable = NULL;
00052     running_callback = false;
00053     queue_min = 0;
00054     autoclose_time = 0;
00055     alarm_time.tv_sec = alarm_time.tv_usec = 0;
00056     
00057     // magic multitasking support
00058     uses_continue_select = false;
00059     personal_stack_size = 65536;
00060     task = NULL;
00061 }
00062 
00063 
00064 WvStream::~WvStream()
00065 {
00066     TRACE("destroying %p\n", this);
00067     if (running_callback)
00068     {
00069         // user should have called terminate_continue_select()...
00070         TRACE("eek! destroying while running_callback!\n");
00071         assert(!running_callback);
00072     }
00073     close();
00074     
00075     if (task)
00076     {
00077         while (task->isrunning())
00078             taskman->run(*task);
00079         task->recycle();
00080         task = NULL;
00081     }
00082     TRACE("done destroying %p\n", this);
00083 }
00084 
00085 
00086 void WvStream::close()
00087 {
00088     int rfd = getrfd(), wfd = getwfd();
00089     
00090     flush(2000);
00091     if (rfd >= 0)
00092         ::close(getrfd());
00093     if (wfd >= 0 && wfd != rfd)
00094         ::close(getwfd());
00095     rwfd = -1;
00096 }
00097 
00098 
00099 void WvStream::autoforward_callback(WvStream &s, void *userdata)
00100 {
00101     WvStream &s2 = *(WvStream *)userdata;
00102     char buf[1024];
00103     size_t len;
00104     
00105     len = s.read(buf, sizeof(buf));
00106     s2.write(buf, len);
00107 }
00108 
00109 
00110 // this is run in the subtask owned by 'stream', if any; NOT necessarily
00111 // the task that runs WvStream::callback().  That's why this needs to be
00112 // a separate function.
00113 void WvStream::_callback(void *stream)
00114 {
00115     WvStream *s = (WvStream *)stream;
00116     
00117     s->running_callback = true;
00118     
00119     s->wvstream_execute_called = false;
00120     s->execute();
00121     if (s->callfunc)
00122         s->callfunc(*s, s->userdata);
00123     
00124     // if this assertion fails, a derived class's virtual execute() function
00125     // didn't call its parent's execute() function, and we didn't make it
00126     // all the way back up to WvStream::execute().  This doesn't always
00127     // matter right now, but it could lead to obscure bugs later, so we'll
00128     // enforce it.
00129     assert(s->wvstream_execute_called);
00130     
00131     s->running_callback = false;
00132 }
00133 
00134 
00135 void WvStream::callback()
00136 {
00137     TRACE("(?)");
00138     
00139     // callback is already running -- don't try to start it again, or we
00140     // could end up in an infinite loop!
00141     if (running_callback)
00142         return;
00143     
00144     // if the alarm has gone off and we're calling callback... good!
00145     if (alarm_remaining() == 0)
00146     {
00147         alarm_time.tv_sec = alarm_time.tv_usec = 0;
00148         alarm_was_ticking = true;
00149     }
00150     else
00151         alarm_was_ticking = false;
00152     
00153     assert(!uses_continue_select || personal_stack_size >= 1024);
00154     
00155 //    if (1)
00156     if (uses_continue_select && personal_stack_size >= 1024)
00157     {
00158         if (!taskman)
00159             taskman = new WvTaskMan;
00160     
00161         if (!task)
00162         {
00163             TRACE("(!)");
00164             task = taskman->start("streamexec", _callback, this,
00165                                   personal_stack_size);
00166         }
00167         else if (!task->isrunning())
00168         {
00169             TRACE("(.)");
00170             fflush(stderr);
00171             task->start("streamexec2", _callback, this);
00172         }
00173         
00174         // This loop is much more subtle than it looks.
00175         // By implementing it this way, we provide something that works
00176         // like a typical callback() stack: that is, a child callback
00177         // must return before the parent's callback does.
00178         // 
00179         // What _actually_ happens is a child will call yield() upon returning
00180         // from its callback function, which exits the taskman and returns to
00181         // the top level.  The top level, though, is running this loop, which
00182         // re-executes taskman->run() since its child (which is eventually
00183         // the parent of the child that called yield()) hasn't finished yet.
00184         // We build our way all the way back up to the first-level parent of
00185         // the child calling yield(), which now notices its child has finished
00186         // and continues on in its execute() function.
00187         // 
00188         // continue_select() will set running_callback to false, even though
00189         // it doesn't actually return from the callback function.  That
00190         // causes this loop to terminate, and the callback will get resumed
00191         // later when select() returns true.
00192         do
00193         {
00194             taskman->run(*task);
00195         } while (task && task->isrunning() && running_callback);
00196     }
00197     else
00198         _callback(this);
00199     
00200     /* DON'T PUT ANY CODE HERE!
00201      * 
00202      * WvStreamList calls its child streams above via taskman->run().
00203      * If a child is deleted, it waits for its callback task to finish the
00204      * current iteration, then recycles its WvTask object and allows the
00205      * "delete" call to finish, so the object no longer exists.
00206      * 
00207      * The catch: the callback() function is actually running in
00208      * the WvStreamList's task (if any), which hasn't had a chance to
00209      * exit yet.  Next time we jump into the WvStreamList, we will arrive
00210      * immediately after the taskman->run() line, ie. right here in the
00211      * code.  In that case, the 'this' pointer could be pointing at an
00212      * invalid object, so we should just exit before we do something stupid.
00213      */
00214 }
00215 
00216 
00217 void WvStream::execute()
00218 {
00219     // do nothing by default, but notice that we were here.
00220     wvstream_execute_called = true;
00221 }
00222 
00223 
00224 // by default, we use the same fd for reading and writing
00225 int WvStream::getrfd() const
00226 {
00227     return rwfd;
00228 }
00229 
00230 
00231 // by default, we use the same fd for reading and writing
00232 int WvStream::getwfd() const
00233 {
00234     return rwfd;
00235 }
00236 
00237 
00238 int WvStream::getfd() const
00239 {
00240     int rfd = getrfd(), wfd = getwfd();
00241     assert(rfd == wfd);
00242     return rfd;
00243 }
00244 
00245 
00246 bool WvStream::isok() const
00247 {
00248     return (getrfd() != -1) && (getwfd() != -1);
00249 }
00250 
00251 
00252 void WvStream::seterr(int _errnum)
00253 {
00254     if (!errnum)
00255         errnum = _errnum;
00256     close();
00257 }
00258 
00259 
00260 void WvStream::seterr(const WvString &specialerr)
00261 {
00262     if (!errnum)
00263     {
00264         errstring = specialerr;
00265         errnum = -1;
00266     }
00267     close();
00268 }
00269 
00270 
00271 int WvStream::geterr() const
00272 {
00273     return errnum;
00274 }
00275 
00276 
00277 const char *WvStream::errstr() const
00278 {
00279     if (errnum == -1)
00280     {
00281         assert(errstring);
00282         return errstring;
00283     }
00284     else
00285         return strerror(errnum);
00286 }
00287 
00288 
00289 size_t WvStream::read(void *buf, size_t count)
00290 {
00291     size_t bufu = inbuf.used(), i;
00292     unsigned char *newbuf;
00293 
00294     bufu = inbuf.used();
00295     if (bufu < queue_min)
00296     {
00297         newbuf = inbuf.alloc(queue_min - bufu);
00298         i = uread(newbuf, queue_min - bufu);
00299         inbuf.unalloc(queue_min - bufu - i);
00300         
00301         bufu = inbuf.used();
00302     }
00303     
00304     if (bufu < queue_min)
00305         return 0;
00306         
00307     // if buffer is empty, do a hard read
00308     if (!bufu)
00309         bufu = uread(buf, count);
00310     else
00311     {
00312         // otherwise just read from the buffer
00313         if (bufu > count)
00314             bufu = count;
00315     
00316         memcpy(buf, inbuf.get(bufu), bufu);
00317     }
00318     
00319     TRACE("read  obj 0x%08x, bytes %d/%d\n", (unsigned int)this, bufu, count);
00320     return bufu;
00321 }
00322 
00323 
00324 size_t WvStream::uread(void *buf, size_t count)
00325 {
00326     if (!isok() || !buf || !count) return 0;
00327     
00328     int in = ::read(getrfd(), buf, count);
00329     
00330     if (in < 0 && (errno==EINTR || errno==EAGAIN || errno==ENOBUFS))
00331         return 0; // interrupted
00332 
00333     if (in < 0 || (count && in==0))
00334     {
00335         seterr(in < 0 ? errno : 0);
00336         return 0;
00337     }
00338 
00339     return in;
00340 }
00341 
00342 
00343 size_t WvStream::write(const void *buf, size_t count)
00344 {
00345     if (!isok() || !buf || !count) return 0;
00346     
00347     size_t wrote = 0;
00348     
00349     if (!outbuf_delayed_flush && outbuf.used())
00350         flush(0);
00351     
00352     if (!outbuf_delayed_flush && !outbuf.used())
00353         wrote = uwrite(buf, count);
00354     
00355     if (max_outbuf_size && (outbuf.used() > max_outbuf_size))
00356         return wrote;
00357 
00358     outbuf.put((unsigned char *)buf + wrote, count - wrote);
00359 
00360     //TRACE("queue obj 0x%08x, bytes %d/%d, total %d\n", (unsigned int)this, count - wrote, count, outbuf.used());
00361     
00362     return count;
00363 }
00364 
00365 
00366 size_t WvStream::uwrite(const void *buf, size_t count)
00367 {
00368     if (!isok() || !buf || !count) return 0;
00369     
00370     int out = ::write(getwfd(), buf, count);
00371     
00372     if (out < 0 && (errno == ENOBUFS || errno==EAGAIN))
00373         return 0; // kernel buffer full - data not written
00374     
00375     if (out < 0 || (count && out==0))
00376     {
00377         seterr(out < 0 ? errno : 0); // a more critical error
00378         return 0;
00379     }
00380     
00381     //TRACE("write obj 0x%08x, bytes %d/%d\n", (unsigned int)this, out, count);
00382     return out;
00383 }
00384 
00385 
00386 // NOTE:  wait_msec is implemented wrong, but it has cleaner code this way
00387 // and can at least handle wait vs wait forever vs wait never.
00388 char *WvStream::getline(time_t wait_msec, char separator)
00389 {
00390     size_t i;
00391     unsigned char *buf;
00392     
00393     // if we get here, we either want to wait a bit or there is data
00394     // available.
00395     while (isok())
00396     {
00397         queuemin(0);
00398     
00399         // if there is a newline already, return its string.
00400         if ((i = inbuf.strchr(separator)) > 0)
00401         {
00402             buf = inbuf.get(i);
00403             buf[i-1] = 0;
00404             return (char *)buf;
00405         }
00406         else if (!isok())    // uh oh, stream is in trouble.
00407         {
00408             if (inbuf.used())
00409             {
00410                 // handle "EOF without newline" condition
00411                 inbuf.alloc(1)[0] = 0; // null-terminate it
00412                 return (char *)inbuf.get(inbuf.used());
00413             }
00414             else
00415                 return NULL;   // nothing else to do!
00416         }
00417 
00418         // make select not return true until more data is available
00419         if (inbuf.used())
00420             queuemin(inbuf.used() + 1);
00421         
00422         // note: this _always_ does the select, even if wait_msec < 0.
00423         // That's good, because the fd might be nonblocking!
00424         if (uses_continue_select)
00425         {
00426             if (!continue_select(wait_msec) && isok() && wait_msec >= 0)
00427                 return NULL;
00428         }
00429         else
00430         {
00431             if (!select(wait_msec) && isok() && wait_msec >= 0)
00432                 return NULL;
00433         }
00434         
00435         if (!isok())
00436             return NULL;
00437 
00438         // read a few bytes
00439         buf = inbuf.alloc(1024);
00440         i = uread(buf, 1024);
00441         inbuf.unalloc(1024 - i);
00442     }
00443     
00444     // we timed out or had a socket error
00445     return NULL;
00446 }
00447 
00448 
00449 void WvStream::drain()
00450 {
00451     char buf[1024];
00452     while (select(0))
00453         read(buf, sizeof(buf));
00454 }
00455 
00456 
00457 void WvStream::flush(time_t msec_timeout)
00458 {
00459     size_t attempt, real;
00460     
00461     //TRACE("flush obj 0x%08x, time %ld, outbuf length %d\n", (unsigned int)this, msec_timeout, outbuf.used());
00462     
00463     if (!isok()) return;
00464     
00465     while (isok() && outbuf.used())
00466     {
00467         attempt = outbuf.used();
00468         if (attempt > 1400)
00469             attempt = 1400;
00470         real = uwrite(outbuf.get(attempt), attempt);
00471         if (real < attempt)
00472             outbuf.unget(attempt - real);
00473         
00474         // since post_select() can call us, and select() calls post_select(),
00475         // we need to be careful not to call select() if we don't need to!
00476         if (!msec_timeout || !select(msec_timeout, false, true))
00477         {
00478             if (msec_timeout >= 0)
00479                 break;
00480         }
00481     }
00482 
00483     if (autoclose_time)
00484     {
00485         time_t now = time(NULL);
00486         TRACE("Autoclose enabled for 0x%08X - now-time=%ld, buf %d bytes\n", 
00487                 (unsigned int)this, now - autoclose_time, outbuf.used());
00488         if (!outbuf.used() || now > autoclose_time)
00489         {
00490             autoclose_time = 0; // avoid infinite recursion!
00491             close();
00492         }
00493     }
00494 }
00495 
00496 
00497 void WvStream::flush_then_close(int msec_timeout)
00498 {
00499     time_t now = time(NULL);
00500     autoclose_time = now + (msec_timeout + 999) / 1000;
00501     
00502     TRACE("Autoclose SETUP for 0x%08X - buf %d bytes, timeout %ld sec\n", 
00503             (unsigned int)this, outbuf.used(), autoclose_time - now);
00504 
00505     // as a fast track, we _could_ close here: but that's not a good idea,
00506     // since flush_then_close() deals with obscure situations, and we don't
00507     // want the caller to use it incorrectly.  So we make things _always_
00508     // break when the caller forgets to call select() later.
00509     
00510     flush(0);
00511 }
00512 
00513 
00514 bool WvStream::pre_select(SelectInfo &si)
00515 {
00516     int rfd, wfd;
00517     
00518     time_t alarmleft = alarm_remaining();
00519     
00520     if (alarmleft == 0)
00521         return true; // alarm has rung
00522     
00523     // handle read-ahead buffering
00524     if (si.wants.readable && inbuf.used() && inbuf.used() >= queue_min)
00525         return true; // already ready
00526     
00527     rfd = getrfd();
00528     wfd = getwfd();
00529     
00530     if (si.wants.readable && (rfd >= 0))
00531         FD_SET(rfd, &si.read);
00532     if ((si.wants.writable || outbuf.used() || autoclose_time) && (wfd >= 0))
00533         FD_SET(wfd, &si.write);
00534     if (si.wants.isexception)
00535     {
00536         if (rfd >= 0) FD_SET(rfd, &si.except);
00537         if (wfd >= 0) FD_SET(wfd, &si.except);
00538     }
00539     
00540     if (si.max_fd < rfd)
00541         si.max_fd = rfd;
00542     if (si.max_fd < wfd)
00543         si.max_fd = wfd;
00544     
00545     if (alarmleft >= 0
00546       && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
00547         si.msec_timeout = alarmleft;
00548     
00549     return false;
00550 }
00551 
00552 
00553 bool WvStream::post_select(SelectInfo &si)
00554 {
00555     size_t outbuf_used = outbuf.used();
00556     int rfd = getrfd(), wfd = getwfd();
00557     bool val;
00558     
00559     // flush the output buffer if possible
00560     if (wfd >= 0 
00561         && (outbuf_used || autoclose_time)
00562         && FD_ISSET(wfd, &si.write))
00563     {
00564         flush(0);
00565         
00566         // flush() might have closed the file!
00567         if (!isok()) return false;
00568     }
00569     
00570     val = ((rfd >= 0 && FD_ISSET(rfd, &si.read)) ||
00571             (wfd >= 0 && FD_ISSET(wfd, &si.write)) ||
00572             (rfd >= 0 && FD_ISSET(rfd, &si.except)) ||
00573             (wfd >= 0 && FD_ISSET(wfd, &si.except)));
00574     
00575     if (val && si.wants.readable && read_requires_writable
00576       && !read_requires_writable->select(0, false, true))
00577         return false;
00578     if (val && si.wants.writable && write_requires_readable
00579       && !write_requires_readable->select(0, true, false))
00580         return false;
00581     
00582     return val;
00583 }
00584 
00585 
00586 bool WvStream::_select(time_t msec_timeout,
00587                        bool readable, bool writable, bool isexcept,
00588                        bool forceable)
00589 {
00590     bool sure;
00591     int sel;
00592     timeval tv;
00593     SelectInfo si;
00594     
00595     if (!isok()) return false;
00596 
00597     FD_ZERO(&si.read);
00598     FD_ZERO(&si.write);
00599     FD_ZERO(&si.except);
00600     
00601     if (forceable)
00602         si.wants = force;
00603     else
00604     {
00605         si.wants.readable = readable;
00606         si.wants.writable = writable;
00607         si.wants.isexception = isexcept;
00608     }
00609     
00610     si.max_fd = -1;
00611     si.msec_timeout = msec_timeout;
00612     si.inherit_request = !forceable;
00613     
00614     sure = pre_select(si);
00615     
00616     if (sure)
00617     {
00618         si.msec_timeout = 0;
00619         tv.tv_sec = tv.tv_usec = 0; // never wait: already have a sure thing!
00620     }
00621     else
00622     {
00623         tv.tv_sec = si.msec_timeout / 1000;
00624         tv.tv_usec = (si.msec_timeout % 1000) * 1000;
00625     }
00626     
00627     sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except,
00628                    si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
00629     
00630     if (sel < 0)
00631     {
00632         if (errno!=EAGAIN && errno!=EINTR && errno!=ENOBUFS)
00633             seterr(errno);
00634         return sure;
00635     }
00636 
00637     if (!sel)
00638         return sure;    // timed out
00639     
00640     return isok() && post_select(si);
00641 }
00642 
00643 
00644 void WvStream::force_select(bool readable, bool writable, bool isexception)
00645 {
00646     force.readable |= readable;
00647     force.writable |= writable;
00648     force.isexception |= isexception;
00649 }
00650 
00651 
00652 void WvStream::undo_force_select(bool readable, bool writable, bool isexception)
00653 {
00654     force.readable &= !readable;
00655     force.writable &= !writable;
00656     force.isexception &= !isexception;
00657 }
00658 
00659 
00660 void WvStream::alarm(time_t msec_timeout)
00661 {
00662     struct timezone tz;
00663     
00664     if (msec_timeout >= 0)
00665     {
00666         gettimeofday(&alarm_time, &tz);
00667         alarm_time.tv_sec += msec_timeout / 1000;
00668         alarm_time.tv_usec += (msec_timeout % 1000) * 1000;
00669         normalize(alarm_time);
00670     }
00671     else
00672     {
00673         // cancel alarm
00674         alarm_time.tv_sec = alarm_time.tv_usec = 0;
00675     }
00676 }
00677 
00678 
00679 time_t WvStream::alarm_remaining()
00680 {
00681     struct timeval &a = alarm_time;
00682     
00683     if (a.tv_sec)
00684     {
00685         struct timeval tv;
00686         struct timezone tz;
00687         
00688         gettimeofday(&tv, &tz);
00689         normalize(tv);
00690         
00691         if (a.tv_sec < tv.tv_sec
00692             || (   a.tv_sec  == tv.tv_sec 
00693                 && a.tv_usec <= tv.tv_usec))
00694         {
00695             return 0;
00696         }
00697         else if (a.tv_sec > tv.tv_sec)
00698         {
00699             return ((a.tv_sec - tv.tv_sec) * 1000
00700                     + (a.tv_usec - tv.tv_usec) / 1000);
00701         }
00702         else // a.tv_sec == tv.tv_sec
00703         {
00704             return (a.tv_usec - tv.tv_usec) / 1000;
00705         }
00706     }
00707     
00708     return -1;
00709 }
00710 
00711 
00712 bool WvStream::continue_select(time_t msec_timeout)
00713 {
00714     assert(uses_continue_select);
00715     assert(task);
00716     assert(taskman);
00717     assert(taskman->whoami() == task);
00718     
00719     if (msec_timeout >= 0)
00720         alarm(msec_timeout);
00721     
00722     running_callback = false;
00723     taskman->yield();
00724     alarm(-1);
00725     
00726     // when we get here, someone has jumped back into our task.
00727     // We have to select(0) here because it's possible that the alarm was 
00728     // ticking _and_ data was available.  This is aggravated especially if
00729     // msec_delay was zero.  Note that running select() here isn't
00730     // inefficient, because if the alarm was expired then pre_select()
00731     // returned true anyway and short-circuited the previous select().
00732     return !alarm_was_ticking || select(0);
00733 }
00734 
00735 
00736 void WvStream::terminate_continue_select()
00737 {
00738     close();
00739     if (task)
00740     {
00741         while (task->isrunning())
00742             taskman->run(*task);
00743         task->recycle();
00744         task = NULL;
00745     }
00746 }
00747 
00748 
00749 const WvAddr *WvStream::src() const
00750 {
00751     return NULL;
00752 }
00753 

Generated on Sun Mar 16 01:01:12 2003 for WvStreams by doxygen1.3-rc3