00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "wvstream.h"
00012 #include "wvtask.h"
00013 #include "wvtimeutils.h"
00014 #include <time.h>
00015 #include <sys/types.h>
00016 #include <assert.h>
00017
00018 #ifdef _WIN32
00019 #define ENOBUFS WSAENOBUFS
00020 #undef errno
00021 #define errno GetLastError()
00022 #else
00023 #include <errno.h>
00024 #endif
00025
00026
00027
// Debug tracing.  Change "#if 0" to "#if 1" to send trace output to stderr.
// The enabled form is wrapped in do { } while (0) so that "TRACE(...);"
// always expands to exactly one statement and stays correct inside an
// unbraced if/else.  The disabled forms expand to nothing.
#if 0
# define TRACE(x, y...) \
    do { fprintf(stderr, x, ## y); fflush(stderr); } while (0)
#else
#ifndef _MSC_VER
# define TRACE(x, y...)
#else
# define TRACE
#endif
#endif
00037
// Process-wide "global" stream pointer, initially unset.  When it is set,
// _build_selectinfo()/_process_selectinfo() also poll it on behalf of other
// streams during forceable selects.
00038 WvStream *WvStream::globalstream = NULL;
00039
// NOTE(review): XUUID_MAP_* are macros defined outside this file; presumably
// they declare which interfaces IWvStream exposes -- confirm against their
// definition.
00040 XUUID_MAP_BEGIN(IWvStream)
00041 XUUID_MAP_ENTRY(IObject)
00042 XUUID_MAP_ENTRY(IWvStream)
00043 XUUID_MAP_END
00044
00045 WvStream::WvStream()
00046 {
00047 TRACE("Creating wvstream %p\n", this);
00048
00049 #ifdef _WIN32
00050 WSAData wsaData;
00051 int result = WSAStartup(MAKEWORD(2,0), &wsaData);
00052 assert(result == 0);
00053 #endif
00054 wvstream_execute_called = false;
00055 userdata = closecb_data = NULL;
00056 errnum = 0;
00057 max_outbuf_size = 0;
00058 outbuf_delayed_flush = false;
00059 want_to_flush = true;
00060 is_flushing = false;
00061 is_auto_flush = true;
00062 alarm_was_ticking = false;
00063 force.readable = true;
00064 force.writable = force.isexception = false;
00065 read_requires_writable = write_requires_readable = NULL;
00066 running_callback = false;
00067 want_nowrite = false;
00068 queue_min = 0;
00069 autoclose_time = 0;
00070 alarm_time = wvtime_zero;
00071 last_alarm_check = wvtime_zero;
00072 taskman = 0;
00073
00074
00075 uses_continue_select = false;
00076 personal_stack_size = 65536;
00077 task = NULL;
00078 }
00079
00080
00081
00082 IWvStream::IWvStream()
00083 {
00084 }
00085
00086
00087 IWvStream::~IWvStream()
00088 {
00089 }
00090
00091
00092 WvStream::~WvStream()
00093 {
00094 TRACE("destroying %p\n", this);
00095 if (running_callback)
00096 {
00097
00098 TRACE("eek! destroying while running_callback!\n");
00099 assert(!running_callback);
00100 }
00101 close();
00102
00103 if (task)
00104 {
00105 while (task->isrunning())
00106 taskman->run(*task);
00107 task->recycle();
00108 task = NULL;
00109 }
00110 TRACE("done destroying %p\n", this);
00111 if (taskman)
00112 taskman->unlink();
00113 }
00114
00115
00116 void WvStream::close()
00117 {
00118 flush(2000);
00119 if (!! closecb_func)
00120 {
00121 WvStreamCallback cb = closecb_func;
00122 closecb_func = 0;
00123 cb(*this, closecb_data);
00124 }
00125 }
00126
00127
00128 void WvStream::autoforward(WvStream &s)
00129 {
00130 setcallback(autoforward_callback, &s);
00131 read_requires_writable = &s;
00132 }
00133
00134
00135 void WvStream::noautoforward()
00136 {
00137 setcallback(0, NULL);
00138 read_requires_writable = NULL;
00139 }
00140
00141
00142 void WvStream::autoforward_callback(WvStream &s, void *userdata)
00143 {
00144 WvStream &s2 = *(WvStream *)userdata;
00145 char buf[1024];
00146 size_t len;
00147
00148 len = s.read(buf, sizeof(buf));
00149 s2.write(buf, len);
00150 }
00151
00152
00153
00154
00155
00156 void WvStream::_callback(void *stream)
00157 {
00158 WvStream *s = (WvStream *)stream;
00159
00160 s->running_callback = true;
00161
00162 s->wvstream_execute_called = false;
00163 s->execute();
00164 if (!! s->callfunc)
00165 s->callfunc(*s, s->userdata);
00166
00167
00168
00169
00170
00171
00172 assert(s->wvstream_execute_called);
00173
00174 s->running_callback = false;
00175 }
00176
00177
00178 void WvStream::callback()
00179 {
00180 TRACE("(?)");
00181
00182
00183
00184 if (running_callback)
00185 return;
00186
00187
00188 if (alarm_remaining() == 0)
00189 {
00190 alarm_time = wvtime_zero;
00191 alarm_was_ticking = true;
00192 }
00193 else
00194 alarm_was_ticking = false;
00195
00196 assert(!uses_continue_select || personal_stack_size >= 1024);
00197
00198
00199 if (uses_continue_select && personal_stack_size >= 1024)
00200 {
00201 if (!taskman)
00202 taskman = WvTaskMan::get();
00203
00204 if (!task)
00205 {
00206 TRACE("(!)");
00207 task = taskman->start("streamexec", _callback, this,
00208 personal_stack_size);
00209 }
00210 else if (!task->isrunning())
00211 {
00212 TRACE("(.)");
00213 task->start("streamexec2", _callback, this);
00214 }
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234 do
00235 {
00236 taskman->run(*task);
00237 } while (task && task->isrunning() && running_callback);
00238 }
00239 else
00240 _callback(this);
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256 }
00257
00258
00259 void WvStream::execute()
00260 {
00261
00262 wvstream_execute_called = true;
00263 }
00264
00265
00266 bool WvStream::isok() const
00267 {
00268 return WvError::isok();
00269 }
00270
00271
00272 void WvStream::seterr(int _errnum)
00273 {
00274 if (!errnum)
00275 {
00276 WvError::seterr(_errnum);
00277 close();
00278 }
00279 }
00280
00281
00282 size_t WvStream::read(WvBuf &outbuf, size_t count)
00283 {
00284
00285 size_t free = outbuf.free();
00286 if (count > free)
00287 count = free;
00288
00289 WvDynBuf tmp;
00290 unsigned char *buf = tmp.alloc(count);
00291 size_t len = read(buf, count);
00292 tmp.unalloc(count - len);
00293 outbuf.merge(tmp);
00294 return len;
00295 }
00296
00297
00298 size_t WvStream::continue_read(time_t wait_msec, WvBuf &outbuf, size_t count)
00299 {
00300
00301 size_t free = outbuf.free();
00302 if (count > free)
00303 count = free;
00304 unsigned char *buf = outbuf.alloc(count);
00305
00306
00307 size_t len = continue_read(wait_msec, buf, count);
00308
00309 outbuf.unalloc(count - len);
00310 return len;
00311 }
00312
00313
00314 size_t WvStream::write(WvBuf &inbuf, size_t count)
00315 {
00316
00317 size_t avail = inbuf.used();
00318 if (count > avail)
00319 count = avail;
00320 const unsigned char *buf = inbuf.get(count);
00321 size_t len = write(buf, count);
00322 inbuf.unget(count - len);
00323 return len;
00324 }
00325
00326
00327 size_t WvStream::read(void *buf, size_t count)
00328 {
00329 size_t bufu = inbuf.used(), i;
00330 unsigned char *newbuf;
00331
00332 bufu = inbuf.used();
00333 if (bufu < queue_min)
00334 {
00335 newbuf = inbuf.alloc(queue_min - bufu);
00336 i = uread(newbuf, queue_min - bufu);
00337 inbuf.unalloc(queue_min - bufu - i);
00338
00339 bufu = inbuf.used();
00340 }
00341
00342 if (bufu < queue_min)
00343 return 0;
00344
00345
00346 if (!bufu)
00347 bufu = uread(buf, count);
00348 else
00349 {
00350
00351 if (bufu > count)
00352 bufu = count;
00353
00354 memcpy(buf, inbuf.get(bufu), bufu);
00355 }
00356
00357 TRACE("read obj 0x%p, bytes %d/%d\n", this, bufu, count);
00358 return bufu;
00359 }
00360
00361
00362 size_t WvStream::continue_read(time_t wait_msec, void *buf, size_t count)
00363 {
00364 assert(uses_continue_select);
00365
00366 if (!count)
00367 return 0;
00368
00369
00370 if (wait_msec >= 0)
00371 alarm(wait_msec);
00372
00373 queuemin(count);
00374
00375 int got = 0;
00376
00377 while (isok())
00378 {
00379 if (continue_select(-1))
00380 {
00381 if ((got = read(buf, count)) != 0)
00382 break;
00383 if (alarm_was_ticking)
00384 break;
00385 }
00386 }
00387
00388 if (wait_msec >= 0)
00389 alarm(-1);
00390
00391 queuemin(0);
00392
00393 return got;
00394 }
00395
00396
00397 size_t WvStream::write(const void *buf, size_t count)
00398 {
00399 if (!isok() || !buf || !count) return 0;
00400
00401 size_t wrote = 0;
00402 if (!outbuf_delayed_flush && !outbuf.used())
00403 {
00404 wrote = uwrite(buf, count);
00405 count -= wrote;
00406 buf = (const unsigned char*)buf + wrote;
00407 }
00408 if (max_outbuf_size != 0)
00409 {
00410 size_t canbuffer = max_outbuf_size - outbuf.used();
00411 if (count > canbuffer)
00412 count = canbuffer;
00413 }
00414 if (count != 0)
00415 {
00416 outbuf.put(buf, count);
00417 wrote += count;
00418 }
00419
00420 if (should_flush())
00421 {
00422 if (is_auto_flush)
00423 flush(0);
00424 else
00425 flush_outbuf(0);
00426 }
00427
00428 return wrote;
00429 }
00430
00431
00432 void WvStream::noread()
00433 {
00434
00435
00436 }
00437
00438
00439 void WvStream::nowrite()
00440 {
00441 if (getwfd() < 0)
00442 return;
00443
00444 want_nowrite = true;
00445 }
00446
00447
00448 bool WvStream::isreadable()
00449 {
00450 return isok() && select(0, true, false, false);
00451 }
00452
00453
00454 bool WvStream::iswritable()
00455 {
00456 return isok() && select(0, false, true, false);
00457 }
00458
00459
00460 char *WvStream::getline(time_t wait_msec, char separator, int readahead)
00461 {
00462 struct timeval timeout_time;
00463 if (wait_msec > 0)
00464 timeout_time = msecadd(wvtime(), wait_msec);
00465
00466
00467
00468 while (isok())
00469 {
00470 queuemin(0);
00471
00472
00473 size_t i = inbuf.strchr(separator);
00474 if (i > 0)
00475 {
00476 char *eol = (char *)inbuf.mutablepeek(i - 1, 1);
00477 assert(eol);
00478 *eol = 0;
00479 return (char *)inbuf.get(i);
00480 }
00481 else if (!isok())
00482 {
00483 if (inbuf.used())
00484 {
00485
00486
00487
00488 inbuf.alloc(1)[0] = 0;
00489 return const_cast<char*>(
00490 (const char*)inbuf.get(inbuf.used()));
00491 }
00492 else
00493 break;
00494 }
00495
00496
00497 size_t needed = inbuf.used() + 1;
00498 queuemin(needed);
00499
00500
00501 if (wait_msec > 0)
00502 {
00503 wait_msec = msecdiff(timeout_time, wvtime());
00504 if (wait_msec < 0)
00505 wait_msec = 0;
00506 }
00507
00508 bool hasdata;
00509 if (uses_continue_select)
00510 hasdata = continue_select(wait_msec);
00511 else
00512 hasdata = select(wait_msec, true, false);
00513 if (!isok())
00514 break;
00515
00516 if (hasdata)
00517 {
00518
00519 unsigned char *buf = inbuf.alloc(readahead);
00520 size_t len = uread(buf, readahead);
00521 inbuf.unalloc(readahead - len);
00522 hasdata = inbuf.used() >= needed;
00523 }
00524
00525 if (!hasdata && wait_msec == 0)
00526 break;
00527 }
00528
00529
00530 if (!isok() && inbuf.used())
00531 {
00532
00533 inbuf.put("", 1);
00534 return (char *)inbuf.get(inbuf.used());
00535 }
00536 else
00537 return NULL;
00538 }
00539
00540
00541 void WvStream::drain()
00542 {
00543 char buf[1024];
00544 while (isreadable())
00545 read(buf, sizeof(buf));
00546 }
00547
00548
00549 bool WvStream::flush(time_t msec_timeout)
00550 {
00551 if (is_flushing) return false;
00552
00553 TRACE("%p flush starts\n", this);
00554
00555 is_flushing = true;
00556 want_to_flush = true;
00557 bool done = flush_internal(msec_timeout)
00558 && flush_outbuf(msec_timeout);
00559 is_flushing = false;
00560
00561 TRACE("flush stops (%d)\n", done);
00562 return done;
00563 }
00564
00565
00566 bool WvStream::should_flush()
00567 {
00568 return want_to_flush;
00569 }
00570
00571
00572 bool WvStream::flush_outbuf(time_t msec_timeout)
00573 {
00574 TRACE("%p flush_outbuf starts (isok=%d)\n", this, isok());
00575
00576
00577 while (isok() && outbuf.used())
00578 {
00579
00580
00581
00582 size_t attempt = outbuf.used();
00583 size_t real = uwrite(outbuf.get(attempt), attempt);
00584
00585
00586
00587
00588 if (isok() && real < attempt)
00589 {
00590 TRACE("flush_outbuf: unget %d-%d\n", attempt, real);
00591 assert(outbuf.ungettable() >= attempt - real);
00592 outbuf.unget(attempt - real);
00593 }
00594
00595
00596
00597 if (!msec_timeout || !select(msec_timeout, false, true))
00598 {
00599 if (msec_timeout >= 0)
00600 break;
00601 }
00602 }
00603
00604
00605 if (isok() && autoclose_time)
00606 {
00607 time_t now = time(NULL);
00608 TRACE("Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n",
00609 this, now - autoclose_time, outbuf.used());
00610 if ((flush_internal(0) && !outbuf.used()) || now > autoclose_time)
00611 {
00612 autoclose_time = 0;
00613 close();
00614 }
00615 }
00616
00617 TRACE("flush_outbuf: after autoclose chunk\n");
00618
00619 if (!outbuf.used() && outbuf_delayed_flush)
00620 want_to_flush = false;
00621
00622 TRACE("flush_outbuf: now isok=%d\n", isok());
00623
00624
00625 if (!isok())
00626 outbuf.zap();
00627
00628 TRACE("flush_outbuf stops\n");
00629
00630 return !outbuf.used();
00631 }
00632
00633
00634 bool WvStream::flush_internal(time_t msec_timeout)
00635 {
00636
00637 return true;
00638 }
00639
00640
00641 int WvStream::getrfd() const
00642 {
00643 return -1;
00644 }
00645
00646
00647 int WvStream::getwfd() const
00648 {
00649 return -1;
00650 }
00651
00652
00653 void WvStream::flush_then_close(int msec_timeout)
00654 {
00655 time_t now = time(NULL);
00656 autoclose_time = now + (msec_timeout + 999) / 1000;
00657
00658 TRACE("Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n",
00659 this, outbuf.used(), autoclose_time - now);
00660
00661
00662
00663
00664
00665
00666 flush(0);
00667 }
00668
00669
00670 bool WvStream::pre_select(SelectInfo &si)
00671 {
00672 time_t alarmleft = alarm_remaining();
00673
00674 if (alarmleft == 0)
00675 return true;
00676
00677 if (!si.inherit_request)
00678 si.wants |= force;
00679
00680
00681 if (si.wants.readable && inbuf.used() && inbuf.used() >= queue_min)
00682 return true;
00683 if (alarmleft >= 0
00684 && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
00685 si.msec_timeout = alarmleft;
00686 return false;
00687 }
00688
00689
00690 bool WvStream::post_select(SelectInfo &si)
00691 {
00692
00693
00694
00695 return false;
00696 }
00697
00698
00699 bool WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout,
00700 bool readable, bool writable, bool isexcept, bool forceable)
00701 {
00702 FD_ZERO(&si.read);
00703 FD_ZERO(&si.write);
00704 FD_ZERO(&si.except);
00705
00706 if (forceable)
00707 si.wants = force;
00708 else
00709 {
00710 si.wants.readable = readable;
00711 si.wants.writable = writable;
00712 si.wants.isexception = isexcept;
00713 }
00714
00715 si.max_fd = -1;
00716 si.msec_timeout = msec_timeout;
00717 si.inherit_request = ! forceable;
00718 si.global_sure = false;
00719
00720 if (!isok()) return false;
00721
00722 bool sure = pre_select(si);
00723 if (globalstream && forceable && (globalstream != this))
00724 {
00725 WvStream *s = globalstream;
00726 globalstream = NULL;
00727 si.global_sure = s->pre_select(si);
00728 globalstream = s;
00729 }
00730 if (sure || si.global_sure)
00731 si.msec_timeout = 0;
00732 return sure;
00733 }
00734
00735
00736 int WvStream::_do_select(SelectInfo &si)
00737 {
00738
00739 timeval tv;
00740 tv.tv_sec = si.msec_timeout / 1000;
00741 tv.tv_usec = (si.msec_timeout % 1000) * 1000;
00742
00743
00744 int sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except,
00745 si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
00746
00747
00748
00749
00750
00751
00752 if (sel < 0
00753 && errno != EAGAIN && errno != EINTR
00754 && errno != EBADF
00755 && errno != ENOBUFS
00756 #ifdef _WIN32
00757 && errno != WSAEINVAL
00758 #endif
00759 )
00760 {
00761 seterr(errno);
00762 }
00763 return sel;
00764 }
00765
00766
00767 bool WvStream::_process_selectinfo(SelectInfo &si, bool forceable)
00768 {
00769 if (!isok()) return false;
00770
00771 bool sure = post_select(si);
00772 if (globalstream && forceable && (globalstream != this))
00773 {
00774 WvStream *s = globalstream;
00775 globalstream = NULL;
00776 si.global_sure = s->post_select(si) || si.global_sure;
00777 globalstream = s;
00778 }
00779 return sure;
00780 }
00781
00782
00783 bool WvStream::_select(time_t msec_timeout,
00784 bool readable, bool writable, bool isexcept, bool forceable)
00785 {
00786 SelectInfo si;
00787 bool sure = _build_selectinfo(si, msec_timeout,
00788 readable, writable, isexcept, forceable);
00789
00790 if (!isok())
00791 return false;
00792
00793
00794
00795
00796
00797
00798
00799
00800
00801 int sel = _do_select(si);
00802 if (sel >= 0)
00803 sure = _process_selectinfo(si, forceable) || sure;
00804 if (si.global_sure && globalstream && forceable && (globalstream != this))
00805 globalstream->callback();
00806 return sure;
00807 }
00808
00809
00810 void WvStream::force_select(bool readable, bool writable, bool isexception)
00811 {
00812 force.readable |= readable;
00813 force.writable |= writable;
00814 force.isexception |= isexception;
00815 }
00816
00817
00818 void WvStream::undo_force_select(bool readable, bool writable, bool isexception)
00819 {
00820 force.readable &= !readable;
00821 force.writable &= !writable;
00822 force.isexception &= !isexception;
00823 }
00824
00825
00826 void WvStream::alarm(time_t msec_timeout)
00827 {
00828 if (msec_timeout >= 0)
00829 alarm_time = msecadd(wvtime(), msec_timeout);
00830 else
00831 alarm_time = wvtime_zero;
00832 }
00833
00834
00835 time_t WvStream::alarm_remaining()
00836 {
00837 if (alarm_time.tv_sec && !running_callback)
00838 {
00839 WvTime now = wvtime();
00840
00841
00842 if (now < last_alarm_check)
00843 alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, now));
00844
00845 last_alarm_check = now;
00846
00847 time_t remaining = msecdiff(alarm_time, now);
00848 if (remaining < 0)
00849 remaining = 0;
00850 return remaining;
00851 }
00852 return -1;
00853 }
00854
00855
00856 bool WvStream::continue_select(time_t msec_timeout)
00857 {
00858 assert(uses_continue_select);
00859 assert(task);
00860 assert(taskman);
00861 assert(taskman->whoami() == task);
00862
00863 if (msec_timeout >= 0)
00864 alarm(msec_timeout);
00865
00866 running_callback = false;
00867 taskman->yield();
00868 running_callback = true;
00869 alarm(-1);
00870
00871
00872
00873
00874
00875
00876
00877
00878
00879
00880 TRACE("hello-%p\n", this);
00881 return !alarm_was_ticking || select(0);
00882 }
00883
00884
00885 void WvStream::terminate_continue_select()
00886 {
00887 close();
00888 if (task)
00889 {
00890 while (task->isrunning())
00891 taskman->run(*task);
00892 task->recycle();
00893 task = NULL;
00894 }
00895 }
00896
00897
00898 const WvAddr *WvStream::src() const
00899 {
00900 return NULL;
00901 }
00902
00903
00904 void WvStream::unread(WvBuf &unreadbuf, size_t count)
00905 {
00906 WvDynBuf tmp;
00907 tmp.merge(unreadbuf, count);
00908 tmp.merge(inbuf);
00909 inbuf.zap();
00910 inbuf.merge(tmp);
00911 }