00001
00002
00003
00004
00005
00006
00007
00008
00009 #include "wvhttppool.h"
00010 #include "wvtcp.h"
00011 #include "wvsslstream.h"
00012 #include "wvbuf.h"
00013 #include "wvbase64.h"
00014 #include "strutils.h"
00015
00016 #ifdef _WIN32
00017 #define ETIMEDOUT WSAETIMEDOUT
00018 #endif
00019
00020 WvHttpStream::WvHttpStream(const WvIPPortAddr &_remaddr, WvStringParm _username,
00021 bool _ssl, WvIPPortAddrTable &_pipeline_incompatible)
00022 : WvUrlStream(_remaddr, _username, WvString("HTTP %s", _remaddr)),
00023 pipeline_incompatible(_pipeline_incompatible)
00024 {
00025 log("Opening server connection.\n");
00026 http_response = "";
00027 encoding = Unknown;
00028 bytes_remaining = 0;
00029 in_chunk_trailer = false;
00030 pipeline_test_count = 0;
00031 last_was_pipeline_test = false;
00032
00033 enable_pipelining = global_enable_pipelining
00034 && !pipeline_incompatible[target.remaddr];
00035 ssl = _ssl;
00036
00037 if (ssl)
00038 cloned = new WvSSLStream(static_cast<WvFDStream*>(cloned));
00039
00040 sent_url_request = false;
00041
00042 alarm(60000);
00043 }
00044
00045
00046 WvHttpStream::~WvHttpStream()
00047 {
00048 log(WvLog::Debug2, "Deleting.\n");
00049 if (geterr())
00050 log("Error was: %s\n", errstr());
00051 close();
00052 }
00053
00054
00055 void WvHttpStream::close()
00056 {
00057
00058
00059 if (enable_pipelining && max_requests > 1
00060 && (pipeline_test_count < 1
00061 || (pipeline_test_count == 1 && last_was_pipeline_test)))
00062 pipelining_is_broken(2);
00063
00064 if (isok())
00065 log("Closing.\n");
00066 WvStreamClone::close();
00067
00068 if (geterr())
00069 {
00070
00071
00072 WvUrlRequest *msgurl = curl;
00073 if (!msgurl && !urls.isempty())
00074 msgurl = urls.first();
00075 if (!msgurl && !waiting_urls.isempty())
00076 msgurl = waiting_urls.first();
00077 if (msgurl)
00078 log("URL '%s' is FAILED\n", msgurl->url);
00079 }
00080 waiting_urls.zap();
00081 if (curl)
00082 doneurl();
00083 }
00084
00085
00086 void WvHttpStream::doneurl()
00087 {
00088 assert(curl != NULL);
00089 log("Done URL: %s\n", curl->url);
00090
00091 http_response = "";
00092 encoding = Unknown;
00093 in_chunk_trailer = false;
00094 bytes_remaining = 0;
00095
00096 last_was_pipeline_test = curl->pipeline_test;
00097 bool broken = false;
00098 if (last_was_pipeline_test)
00099 {
00100 pipeline_test_count++;
00101 if (pipeline_test_count == 1)
00102 start_pipeline_test(&curl->url);
00103 else if (pipeline_test_response != http_response)
00104 {
00105
00106
00107
00108 pipelining_is_broken(4);
00109 broken = true;
00110 }
00111 pipeline_test_response = http_response;
00112 }
00113
00114 assert(curl == urls.first());
00115 curl->done();
00116 curl = NULL;
00117 sent_url_request = false;
00118 urls.unlink_first();
00119
00120 if (broken)
00121 close();
00122
00123 request_next();
00124 }
00125
00126
00127 static WvString encode64(WvStringParm user, WvStringParm password)
00128 {
00129 WvBase64Encoder encoder;
00130 WvString ret;
00131 encoder.flushstrstr(WvString("%s:%s", user, password), ret);
00132 return ret;
00133 }
00134
00135
00136 static WvString fixnl(WvStringParm nonl)
00137 {
00138 WvDynBuf b;
00139 const char *cptr;
00140
00141 for (cptr = nonl; cptr && *cptr; cptr++)
00142 {
00143 if (*cptr == '\r')
00144 continue;
00145 else if (*cptr == '\n')
00146 b.put("\r", 1);
00147 b.put(cptr, 1);
00148 }
00149
00150 return b.getstr();
00151 }
00152
00153
00154 WvString WvHttpStream::request_str(WvUrlRequest *url, bool keepalive)
00155 {
00156 WvString request;
00157 WvString auth(""), content = putstream_data.getstr();
00158 if(!!url->url.getuser() && !!url->url.getpassword())
00159 auth = WvString("Authorization: Basic %s\n",
00160 encode64(url->url.getuser(), url->url.getpassword()));
00161
00162 request = fixnl(WvString("%s %s HTTP/1.1\n"
00163 "Host: %s:%s\n"
00164 "Connection: %s\n"
00165 "%s"
00166 "%s"
00167 "%s%s"
00168 "\n"
00169 "%s",
00170 url->method,
00171 url->url.getfile(),
00172 url->url.gethost(), url->url.getport(),
00173 keepalive ? "keep-alive" : "close",
00174 auth,
00175 (content.len() > 0 ? WvString("Content-Length: %s\n", content.len()).cstr() : ""),
00176 trim_string(url->headers.edit()),
00177 !!url->headers ? "\n" : "",
00178 (content.len() > 0 ? content.cstr() : ""))
00179 );
00180 return request;
00181 }
00182
00183
00184 void WvHttpStream::send_request(WvUrlRequest *url)
00185 {
00186 request_count++;
00187 log("Request #%s: %s\n", request_count, url->url);
00188 write(request_str(url, url->pipeline_test
00189 || request_count < max_requests));
00190 sent_url_request = true;
00191 alarm(60000);
00192 }
00193
00194
00195 void WvHttpStream::start_pipeline_test(WvUrl *url)
00196 {
00197 WvUrl location(WvString(
00198 "%s://%s:%s/wvhttp-pipeline-check-should-not-exist/",
00199 url->getproto(), url->gethost(), url->getport()));
00200 WvUrlRequest *testurl = new WvUrlRequest(location, "HEAD", "", NULL,
00201 false, true);
00202 testurl->instream = this;
00203 send_request(testurl);
00204 urls.append(testurl, true, "sent_running_url");
00205 }
00206
00207
00208 void WvHttpStream::request_next()
00209 {
00210
00211 putstream_data.zap();
00212
00213
00214
00215 if (request_count >= max_requests || waiting_urls.isempty())
00216 return;
00217
00218
00219 if (!enable_pipelining && !urls.isempty())
00220 return;
00221
00222
00223 WvUrlRequest *url = waiting_urls.first();
00224
00225 waiting_urls.unlink_first();
00226 if (!url->putstream)
00227 {
00228 if (enable_pipelining && !request_count && max_requests > 1)
00229 start_pipeline_test(&url->url);
00230 send_request(url);
00231 }
00232 urls.append(url, false, "sent_running_url");
00233 }
00234
00235
00236 void WvHttpStream::pipelining_is_broken(int why)
00237 {
00238 if (!pipeline_incompatible[target.remaddr])
00239 {
00240 pipeline_incompatible.add(new WvIPPortAddr(target.remaddr), true);
00241 log("Pipelining is broken on this server (%s)! Disabling.\n", why);
00242 }
00243 }
00244
00245
00246 bool WvHttpStream::pre_select(SelectInfo &si)
00247 {
00248 SelectRequest oldwant = si.wants;
00249 WvUrlRequest *url;
00250
00251 if (WvUrlStream::pre_select(si))
00252 return true;
00253
00254 if (!urls.isempty())
00255 {
00256 url = urls.first();
00257 if(url && url->putstream && url->putstream->pre_select(si))
00258 return true;
00259 }
00260
00261 si.wants = oldwant;
00262 return false;
00263 }
00264
00265
00266 bool WvHttpStream::post_select(SelectInfo &si)
00267 {
00268 SelectRequest oldwant = si.wants;
00269 WvUrlRequest *url;
00270
00271 if (WvUrlStream::post_select(si))
00272 return true;
00273
00274 if (!urls.isempty())
00275 {
00276 url = urls.first();
00277 if(url && url->putstream && url->putstream->post_select(si))
00278 return true;
00279 }
00280
00281 si.wants = oldwant;
00282 return false;
00283 }
00284
00285
00286 void WvHttpStream::execute()
00287 {
00288 char buf[1024], *line;
00289 size_t len;
00290
00291 WvStreamClone::execute();
00292
00293
00294 if (alarm_was_ticking)
00295 {
00296 log(WvLog::Debug4, "urls count: %s\n", urls.count());
00297 if (!urls.isempty())
00298 {
00299 seterr(ETIMEDOUT);
00300
00301
00302
00303
00304 if (!urls.isempty())
00305 {
00306 WvUrlRequest *url = urls.first();
00307 if (url->outstream)
00308 url->outstream->seterr(ETIMEDOUT);
00309 }
00310 }
00311 else
00312 close();
00313 return;
00314 }
00315
00316
00317
00318
00319 if (curl && !curl->outstream)
00320 {
00321
00322 pipeline_test_count++;
00323 last_was_pipeline_test = false;
00324
00325 close();
00326 if (curl)
00327 doneurl();
00328 return;
00329 }
00330 else if (curl)
00331 curl->inuse = true;
00332
00333 if(!sent_url_request && !urls.isempty())
00334 {
00335 WvUrlRequest *url = urls.first();
00336 if(url)
00337 {
00338 if(url->putstream)
00339 {
00340 int len = 0;
00341 if(url->putstream->isok())
00342 len = url->putstream->read(putstream_data, 1024);
00343
00344 if(!url->putstream->isok() || len == 0)
00345 {
00346 url->putstream = NULL;
00347 send_request(url);
00348 }
00349 }
00350 }
00351 }
00352
00353 if (!curl)
00354 {
00355
00356 line = getline(0);
00357 if (line)
00358 {
00359 line = trim_string(line);
00360 log(WvLog::Debug4, "Header: '%s'\n", line);
00361 if (!http_response)
00362 {
00363 http_response = line;
00364
00365
00366
00367
00368
00369 if (last_was_pipeline_test
00370 && http_response == pipeline_test_response)
00371 {
00372 pipelining_is_broken(1);
00373 close();
00374 return;
00375 }
00376
00377
00378
00379
00380
00381
00382 if (last_was_pipeline_test && !!http_response)
00383 {
00384 const char *cptr = strchr(http_response, ' ');
00385 if (cptr && atoi(cptr+1) == 400)
00386 {
00387 pipelining_is_broken(3);
00388 close();
00389 return;
00390 }
00391 }
00392 }
00393
00394 if (urls.isempty())
00395 {
00396 log("got unsolicited data.\n");
00397 seterr("unsolicited data from server!");
00398 return;
00399 }
00400
00401 if (!strncasecmp(line, "Content-length: ", 16))
00402 {
00403 bytes_remaining = atoi(line+16);
00404 encoding = ContentLength;
00405 }
00406 else if (!strncasecmp(line, "Transfer-Encoding: ", 19)
00407 && strstr(line+19, "chunked"))
00408 {
00409 encoding = Chunked;
00410 }
00411
00412 if (line[0])
00413 {
00414 char *p;
00415 WvBufUrlStream *outstream = urls.first()->outstream;
00416
00417 if ((p = strchr(line, ':')) != NULL)
00418 {
00419 *p = 0;
00420 p = trim_string(p+1);
00421 struct WvHTTPHeader *h = new struct WvHTTPHeader(line, p);
00422 if (outstream)
00423 outstream->headers.add(h, true);
00424 }
00425 else if (strncasecmp(line, "HTTP/", 5) == 0)
00426 {
00427 char *p = strchr(line, ' ');
00428 if (p)
00429 {
00430 *p = 0;
00431 if (outstream)
00432 {
00433 outstream->version = line+5;
00434 outstream->status = atoi(p+1);
00435 }
00436 }
00437 }
00438 }
00439 else
00440 {
00441
00442 curl = urls.first();
00443 in_chunk_trailer = false;
00444 log(WvLog::Debug4,
00445 "Starting data: %s (enc=%s)\n", bytes_remaining, encoding);
00446
00447 if (encoding == Unknown)
00448 encoding = Infinity;
00449
00450 if (curl->method == "HEAD")
00451 {
00452 log("Got all headers.\n");
00453
00454 doneurl();
00455 }
00456 }
00457 }
00458 }
00459 else if (encoding == Chunked && !bytes_remaining)
00460 {
00461 line = getline(0);
00462 if (line)
00463 {
00464 line = trim_string(line);
00465
00466 if (in_chunk_trailer)
00467 {
00468
00469 log(WvLog::Debug4, "Trailer: '%s'\n", line);
00470
00471
00472 if (!line[0])
00473 doneurl();
00474 }
00475 else
00476 {
00477
00478 if (line[0])
00479 {
00480 bytes_remaining = (size_t)strtoul(line, NULL, 16);
00481 if (!bytes_remaining)
00482 in_chunk_trailer = true;
00483 log(WvLog::Debug4, "Chunk length is %s ('%s').\n",
00484 bytes_remaining, line);
00485 }
00486 }
00487 }
00488 }
00489 else if (encoding == Infinity)
00490 {
00491
00492
00493
00494 len = read(buf, sizeof(buf));
00495 if (len)
00496 log(WvLog::Debug5, "Infinity: read %s bytes.\n", len);
00497 if (curl->outstream)
00498 curl->outstream->write(buf, len);
00499
00500 if (!isok())
00501 doneurl();
00502 }
00503 else
00504 {
00505
00506
00507
00508 if (bytes_remaining > sizeof(buf))
00509 len = read(buf, sizeof(buf));
00510 else
00511 len = read(buf, bytes_remaining);
00512 bytes_remaining -= len;
00513 if (len)
00514 log(WvLog::Debug5,
00515 "Read %s bytes (%s bytes left).\n", len, bytes_remaining);
00516 if (curl->outstream)
00517 curl->outstream->write(buf, len);
00518
00519 if (!bytes_remaining && encoding == ContentLength)
00520 doneurl();
00521
00522 if (bytes_remaining && !isok())
00523 seterr("connection interrupted");
00524 }
00525
00526 if (urls.isempty())
00527 alarm(5000);
00528 else
00529 alarm(60000);
00530 }