00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef lint
00021 static const char rcsid[] = "$Id: sockbuf.c,v 1.23 2007-09-13 22:20:55 sven Exp $";
00022 #endif
00023
00024 #include <eggdrop/eggdrop.h>
00025 #include <unistd.h>
00026 #include <sys/socket.h>
00027
00028 #ifdef HAVE_POLL
00029 # ifdef HAVE_SYS_POLL_H
00030 # include <sys/poll.h>
00031 # else
00032 # include <poll.h>
00033 # endif
00034 #else
00035 # include "lib/compat/poll.h"
00036 #endif
00037
00038 #include <errno.h>
00039
00040 typedef struct {
00041 int sock;
00042 int flags;
00043
00044 char *peer_ip;
00045 int peer_port;
00046 char *my_ip;
00047 int my_port;
00048
00049
00050 sockbuf_stats_t *stats;
00051
00052 char *data;
00053 int len;
00054
00055 sockbuf_filter_t **filters;
00056 void **filter_client_data;
00057 int nfilters;
00058
00059 sockbuf_handler_t *handler;
00060 void *client_data;
00061 event_owner_t *owner;
00062 } sockbuf_t;
00063
00064 static sockbuf_t *sockbufs = NULL;
00065 static int nsockbufs = 0;
00066 static int ndeleted_sockbufs = 0;
00067
00068
00069 static int *idx_array = NULL;
00070 static struct pollfd *pollfds = NULL;
00071 static int npollfds = 0;
00072
00073
00074
00075 static int nlisteners = 0;
00076
00077
00078 static sockbuf_handler_t sockbuf_idler = {
00079 "idle",
00080 NULL, NULL, NULL,
00081 NULL, NULL
00082 };
00083
00084 static void sockbuf_got_eof(int idx, int err);
00085
00086
00087 static void stats_in(sockbuf_stats_t *stats, int len);
00088 static void stats_out(sockbuf_stats_t *stats, int len);
00089 static void skip_stats(sockbuf_stats_t *stats, int curtime);
00090 static void update_stats(sockbuf_stats_t *stats);
00091
00092 int sockbuf_init(void)
00093 {
00094 return (0);
00095 }
00096
00097 int sockbuf_shutdown(void)
00098 {
00099 int i;
00100
00101 for (i = npollfds - 1; i >= 0; i--) {
00102 sockbuf_t *sbuf = &sockbufs[idx_array[i]];
00103
00104 putlog(LOG_DEBUG, "*", "Socket %i %s:%i shouldn't be opened at this stage, closing.", idx_array[i],
00105 (sbuf->peer_ip) ? sbuf->peer_ip : sbuf->my_ip,
00106 (sbuf->peer_ip) ? sbuf->peer_port : sbuf->my_port);
00107
00108 sockbuf_delete(idx_array[i]);
00109 }
00110
00111 if (idx_array) {
00112 free(idx_array);
00113 idx_array = NULL;
00114 }
00115
00116 if (pollfds) {
00117 free(pollfds);
00118 pollfds = NULL;
00119 }
00120 npollfds = 0;
00121
00122 if (sockbufs) {
00123 free(sockbufs);
00124 sockbufs = NULL;
00125 }
00126 nsockbufs = 0;
00127
00128 return (0);
00129 }
00130
00131 int sockbuf_list(int **idx, int *len, int flags)
00132 {
00133 int i, j;
00134
00135 *idx = malloc(sizeof(int) * (nsockbufs+1));
00136 j = 0;
00137 for (i = 0; i < nsockbufs; i++) {
00138 if (sockbufs[i].flags & (SOCKBUF_DELETED | SOCKBUF_AVAIL)) continue;
00139 if (sockbufs[i].flags & flags) (*idx)[j++] = i;
00140 }
00141 *len = j;
00142 return(0);
00143 }
00144
00145
00146 static void sockbuf_block(int idx)
00147 {
00148 int i;
00149 sockbufs[idx].flags |= SOCKBUF_BLOCK;
00150 for (i = 0; i < npollfds; i++) {
00151 if (idx_array[i] == idx) {
00152 pollfds[i].events |= POLLOUT;
00153 break;
00154 }
00155 }
00156 }
00157
00158
00159 static void sockbuf_unblock(int idx)
00160 {
00161 int i;
00162 sockbufs[idx].flags &= (~SOCKBUF_BLOCK);
00163 for (i = 0; i < npollfds; i++) {
00164 if (idx_array[i] == idx) {
00165 pollfds[i].events &= (~POLLOUT);
00166 break;
00167 }
00168 }
00169 }
00170
00171
00172
00173 static int sockbuf_real_write(int idx, const char *data, int len)
00174 {
00175 int nbytes = 0;
00176 sockbuf_t *sbuf = &sockbufs[idx];
00177
00178 if (sbuf->sock < 0) return 0;
00179
00180 if (!(sbuf->flags & SOCKBUF_BLOCK)) {
00181 nbytes = write (sbuf->sock, data, len);
00182 if (nbytes < 0) {
00183 if (errno != EAGAIN) {
00184 sockbuf_got_eof(idx, errno);
00185 return(nbytes);
00186 }
00187 nbytes = 0;
00188 }
00189
00190 if (nbytes > 0) stats_out(sbuf->stats, nbytes);
00191 if (nbytes == len) return(nbytes);
00192 sockbuf_block(idx);
00193 data += nbytes;
00194 len -= nbytes;
00195 }
00196
00197
00198 sbuf->data = realloc(sbuf->data, sbuf->len + len);
00199 memcpy(sbuf->data + sbuf->len, data, len);
00200 sbuf->len += len;
00201 return(nbytes);
00202 }
00203
00204
00205 int sockbuf_on_eof(int idx, int level, int err, const char *errmsg)
00206 {
00207 int i;
00208 sockbuf_t *sbuf = &sockbufs[idx];
00209
00210 for (i = 0; i < sbuf->nfilters; i++) {
00211 if (sbuf->filters[i]->on_eof && sbuf->filters[i]->level > level) {
00212 return sbuf->filters[i]->on_eof(sbuf->filter_client_data[i], idx, err, errmsg);
00213 }
00214 }
00215
00216
00217 if (sbuf->handler->on_eof) {
00218 sbuf->handler->on_eof(sbuf->client_data, idx, err, errmsg);
00219 }
00220 return(0);
00221 }
00222
00223
00224 int sockbuf_on_connect(int idx, int level, const char *peer_ip, int peer_port)
00225 {
00226 int i;
00227 sockbuf_t *sbuf = &sockbufs[idx];
00228
00229 for (i = 0; i < sbuf->nfilters; i++) {
00230 if (sbuf->filters[i]->on_connect && sbuf->filters[i]->level > level) {
00231 return sbuf->filters[i]->on_connect(sbuf->filter_client_data[i], idx, peer_ip, peer_port);
00232 }
00233 }
00234
00235 timer_get_now(&sbuf->stats->connected_at);
00236 if (peer_ip) str_redup(&sbuf->peer_ip, peer_ip);
00237 sbuf->peer_port = peer_port;
00238 socket_get_name(sbuf->sock, &sbuf->my_ip, &sbuf->my_port);
00239
00240 if (sbuf->handler->on_connect) {
00241 sbuf->handler->on_connect(sbuf->client_data, idx, peer_ip, peer_port);
00242 }
00243 return(0);
00244 }
00245
00246
00247 int sockbuf_on_newclient(int idx, int level, int newidx, const char *peer_ip, int peer_port)
00248 {
00249 int i;
00250 sockbuf_t *sbuf = &sockbufs[idx];
00251 sockbuf_t *newsbuf = &sockbufs[newidx];
00252
00253 for (i = 0; i < sbuf->nfilters; i++) {
00254 if (sbuf->filters[i]->on_connect && sbuf->filters[i]->level > level) {
00255 return sbuf->filters[i]->on_newclient(sbuf->filter_client_data[i], idx, newidx, peer_ip, peer_port);
00256 }
00257 }
00258
00259 timer_get_now(&newsbuf->stats->connected_at);
00260 if (peer_ip) str_redup(&newsbuf->peer_ip, peer_ip);
00261 newsbuf->peer_port = peer_port;
00262 socket_get_name(newsbuf->sock, &newsbuf->my_ip, &newsbuf->my_port);
00263
00264 if (sbuf->handler->on_newclient) {
00265 sbuf->handler->on_newclient(sbuf->client_data, idx, newidx, peer_ip, peer_port);
00266 }
00267 return(0);
00268 }
00269
00270
00271 int sockbuf_on_read(int idx, int level, char *data, int len)
00272 {
00273 int i;
00274 sockbuf_t *sbuf = &sockbufs[idx];
00275
00276 for (i = 0; i < sbuf->nfilters; i++) {
00277 if (sbuf->filters[i]->on_read && sbuf->filters[i]->level > level) {
00278 return sbuf->filters[i]->on_read(sbuf->filter_client_data[i], idx, data, len);
00279 }
00280 }
00281
00282 sbuf->stats->bytes_in += len;
00283 if (sbuf->handler->on_read) {
00284 sbuf->handler->on_read(sbuf->client_data, idx, data, len);
00285 }
00286 return(0);
00287 }
00288
00289
00290 int sockbuf_on_write(int idx, int level, const char *data, int len)
00291 {
00292 int i;
00293 sockbuf_t *sbuf = &sockbufs[idx];
00294
00295 for (i = sbuf->nfilters-1; i >= 0; i--) {
00296 if (sbuf->filters[i]->on_write && sbuf->filters[i]->level < level) {
00297 return sbuf->filters[i]->on_write(sbuf->filter_client_data[i], idx, data, len);
00298 }
00299 }
00300
00301 return sockbuf_real_write(idx, data, len);
00302 }
00303
00304
00305 int sockbuf_on_written(int idx, int level, int len, int remaining)
00306 {
00307 int i;
00308 sockbuf_t *sbuf = &sockbufs[idx];
00309
00310 for (i = 0; i < sbuf->nfilters; i++) {
00311 if (sbuf->filters[i]->on_written && sbuf->filters[i]->level > level) {
00312 return sbuf->filters[i]->on_written(sbuf->filter_client_data[i], idx, len, remaining);
00313 }
00314 }
00315
00316 if (sbuf->handler->on_written) {
00317 sbuf->handler->on_written(sbuf->client_data, idx, len, remaining);
00318 }
00319 return(0);
00320 }
00321
00322
00323 static void sockbuf_got_eof(int idx, int err)
00324 {
00325 char *errmsg;
00326 sockbuf_t *sbuf = &sockbufs[idx];
00327
00328
00329 if (!err) err = socket_get_error(sbuf->sock);
00330
00331
00332 errmsg = strerror(err);
00333
00334 sockbuf_close(idx);
00335 sockbuf_on_eof(idx, SOCKBUF_LEVEL_INTERNAL, err, errmsg);
00336 }
00337
00338
00339
00340
00341 static void sockbuf_got_writable_client(int idx)
00342 {
00343 int err, peer_port;
00344 char *peer_ip;
00345 sockbuf_t *sbuf = &sockbufs[idx];
00346
00347 err = socket_get_error(sbuf->sock);
00348 if (err) {
00349 sockbuf_got_eof(idx, err);
00350 return;
00351 }
00352
00353 sbuf->flags &= ~SOCKBUF_CONNECTING;
00354 if (!sbuf->len) sockbuf_unblock(idx);
00355 socket_get_peer_name(sbuf->sock, &peer_ip, &peer_port);
00356
00357 sockbuf_on_connect(idx, SOCKBUF_LEVEL_INTERNAL, peer_ip, peer_port);
00358 if (peer_ip) free(peer_ip);
00359 }
00360
00361
00362
00363
00364 static void sockbuf_got_readable_server(int idx)
00365 {
00366 int newsock, newidx, peer_port;
00367 char *peer_ip = NULL;
00368 sockbuf_t *sbuf = &sockbufs[idx];
00369
00370 newsock = socket_accept(sbuf->sock, &peer_ip, &peer_port);
00371 if (newsock < 0) {
00372 if (peer_ip) free(peer_ip);
00373 return;
00374 }
00375 socket_set_nonblock(newsock, 1);
00376
00377 newidx = sockbuf_new();
00378 timer_get_now(&sockbufs[newidx].stats->connected_at);
00379 sockbuf_set_sock(newidx, newsock, SOCKBUF_INBOUND);
00380 sockbuf_on_newclient(idx, SOCKBUF_LEVEL_INTERNAL, newidx, peer_ip, peer_port);
00381 free(peer_ip);
00382 }
00383
00384
00385
00386
00387 static void sockbuf_got_writable(int idx)
00388 {
00389 int nbytes;
00390 sockbuf_t *sbuf = &sockbufs[idx];
00391
00392 if (sbuf->sock < 0) return;
00393
00394 errno = 0;
00395 nbytes = write(sbuf->sock, sbuf->data, sbuf->len);
00396 if (nbytes > 0) {
00397 stats_out(sbuf->stats, nbytes);
00398 sbuf->len -= nbytes;
00399 sbuf->stats->raw_bytes_left = sbuf->len;
00400 if (!sbuf->len) sockbuf_unblock(idx);
00401 else memmove(sbuf->data, sbuf->data+nbytes, sbuf->len);
00402 sockbuf_on_written(idx, SOCKBUF_LEVEL_INTERNAL, nbytes, sbuf->len);
00403 }
00404 else if (nbytes < 0) {
00405
00406
00407 sockbuf_got_eof(idx, errno);
00408 }
00409 }
00410
00411
00412
00413
00414 static void sockbuf_got_readable(int idx)
00415 {
00416 sockbuf_t *sbuf = &sockbufs[idx];
00417 char buf[4097];
00418 int nbytes;
00419
00420 if (sbuf->sock < 0) return;
00421 errno = 0;
00422 nbytes = read(sbuf->sock, buf, sizeof(buf)-1);
00423 if (nbytes > 0) {
00424 stats_in(sbuf->stats, nbytes);
00425 buf[nbytes] = 0;
00426 sockbuf_on_read(idx, SOCKBUF_LEVEL_INTERNAL, buf, nbytes);
00427 }
00428 else {
00429 sockbuf_got_eof(idx, errno);
00430 }
00431 }
00432
00433 int sockbuf_new()
00434 {
00435 sockbuf_t *sbuf;
00436 int idx;
00437
00438 for (idx = 0; idx < nsockbufs; idx++) {
00439 if (sockbufs[idx].flags & SOCKBUF_AVAIL) break;
00440 }
00441 if (idx == nsockbufs) {
00442 int i;
00443
00444 sockbufs = realloc(sockbufs, (nsockbufs+5) * sizeof(*sockbufs));
00445 memset(sockbufs+nsockbufs, 0, 5 * sizeof(*sockbufs));
00446 for (i = 0; i < 5; i++) {
00447 sockbufs[nsockbufs+i].sock = -1;
00448 sockbufs[nsockbufs+i].flags = SOCKBUF_AVAIL;
00449 }
00450 nsockbufs += 5;
00451 }
00452
00453 sbuf = &sockbufs[idx];
00454 memset(sbuf, 0, sizeof(*sbuf));
00455 sbuf->flags = SOCKBUF_BLOCK;
00456 sbuf->sock = -1;
00457 sbuf->handler = &sockbuf_idler;
00458 sbuf->stats = calloc(1, sizeof(*sbuf->stats));
00459
00460 return(idx);
00461 }
00462
00463 int sockbuf_get_sock(int idx)
00464 {
00465 if (!sockbuf_isvalid(idx)) return(-1);
00466 return(sockbufs[idx].sock);
00467 }
00468
00469 int sockbuf_set_sock(int idx, int sock, int flags)
00470 {
00471 int i;
00472
00473 if (!sockbuf_isvalid(idx)) return(-1);
00474
00475 sockbufs[idx].sock = sock;
00476 sockbufs[idx].flags &= ~(SOCKBUF_CONNECTING|SOCKBUF_CLIENT|SOCKBUF_SERVER|SOCKBUF_BLOCK|SOCKBUF_NOREAD);
00477 sockbufs[idx].flags |= flags;
00478 if (sockbufs[idx].flags & SOCKBUF_SERVER) {
00479 socket_get_name(sockbufs[idx].sock, &sockbufs[idx].my_ip, &sockbufs[idx].my_port);
00480 }
00481
00482
00483
00484
00485
00486
00487 for (i = 0; i < npollfds; i++) {
00488 if (idx_array[i] == idx) break;
00489 }
00490
00491 if (sock == -1) {
00492 if (i == npollfds) return(1);
00493
00494
00495 memmove(idx_array+i, idx_array+i+1, sizeof(int) * (npollfds-i-1));
00496 memmove(pollfds+i, pollfds+i+1, sizeof(*pollfds) * (nlisteners + npollfds-i-1));
00497 npollfds--;
00498 return(0);
00499 }
00500
00501
00502 if (i == npollfds) {
00503
00504 idx_array = realloc(idx_array, sizeof(int) * (i+1));
00505 idx_array[i] = idx;
00506
00507
00508 pollfds = realloc(pollfds, sizeof(*pollfds) * (i+nlisteners+1));
00509 memmove(pollfds+i+1, pollfds+i, sizeof(*pollfds) * nlisteners);
00510
00511 npollfds++;
00512 }
00513
00514 pollfds[i].fd = sock;
00515 pollfds[i].events = 0;
00516 if (flags & (SOCKBUF_BLOCK|SOCKBUF_CONNECTING)) pollfds[i].events |= POLLOUT;
00517 if (!(flags & SOCKBUF_NOREAD)) pollfds[i].events |= POLLIN;
00518
00519 return(idx);
00520 }
00521
00522 int sockbuf_get_peer(int idx, const char **peer_ip, int *peer_port)
00523 {
00524 if (!sockbuf_isvalid(idx)) return(-1);
00525 if (peer_ip) *peer_ip = sockbufs[idx].peer_ip;
00526 if (peer_port) *peer_port = sockbufs[idx].peer_port;
00527 return(0);
00528 }
00529
00530 int sockbuf_get_self(int idx, const char **my_ip, int *my_port)
00531 {
00532 if (!sockbuf_isvalid(idx)) return(-1);
00533 if (my_ip) *my_ip = sockbufs[idx].my_ip;
00534 if (my_port) *my_port = sockbufs[idx].my_port;
00535 return(0);
00536 }
00537
00538 int sockbuf_get_stats(int idx, sockbuf_stats_t **stats)
00539 {
00540 if (!sockbuf_isvalid(idx)) return(-1);
00541 if (stats) {
00542 *stats = sockbufs[idx].stats;
00543 update_stats(*stats);
00544 }
00545 return(0);
00546 }
00547
00548 int sockbuf_noread(int idx)
00549 {
00550 int i;
00551
00552 if (!sockbuf_isvalid(idx)) return(-1);
00553
00554
00555 for (i = 0; i < npollfds; i++) {
00556 if (idx_array[i] == idx) break;
00557 }
00558
00559 if (i == npollfds) return(-1);
00560
00561 pollfds[i].events &= (~POLLIN);
00562 return(0);
00563 }
00564
00565 int sockbuf_read(int idx)
00566 {
00567 int i;
00568
00569 if (!sockbuf_isvalid(idx)) return(-1);
00570
00571
00572 for (i = 0; i < npollfds; i++) {
00573 if (idx_array[i] == idx) break;
00574 }
00575
00576 if (i == npollfds) return(-1);
00577
00578 pollfds[i].events |= POLLIN;
00579 return(0);
00580 }
00581
00582 int sockbuf_isvalid(int idx)
00583 {
00584 if (idx >= 0 && idx < nsockbufs && !(sockbufs[idx].flags & (SOCKBUF_AVAIL | SOCKBUF_DELETED))) return(1);
00585 return(0);
00586 }
00587
00588 int sockbuf_close(int idx)
00589 {
00590 sockbuf_t *sbuf;
00591
00592 if (!sockbuf_isvalid(idx)) return(-1);
00593 sbuf = &sockbufs[idx];
00594 if (sbuf->sock >= 0) {
00595 socket_close(sbuf->sock);
00596 sockbuf_set_sock(idx, -1, 0);
00597 }
00598 return(0);
00599 }
00600
00601 int sockbuf_delete(int idx)
00602 {
00603 sockbuf_t *sbuf;
00604 int i;
00605
00606 if (!sockbuf_isvalid(idx)) return(-1);
00607 sbuf = &sockbufs[idx];
00608
00609 sbuf->flags |= SOCKBUF_DELETED;
00610
00611 for (i = 0; i < sbuf->nfilters; i++) {
00612 if (sbuf->filters[i]->on_delete) {
00613 sbuf->filters[i]->on_delete(sbuf->filter_client_data[i], idx);
00614 }
00615 }
00616
00617 if (sbuf->owner && sbuf->owner->on_delete) sbuf->owner->on_delete(sbuf->owner, sbuf->client_data);
00618
00619
00620 if (sbuf->sock >= 0) socket_close(sbuf->sock);
00621
00622
00623 if (sbuf->peer_ip) free(sbuf->peer_ip);
00624 if (sbuf->my_ip) free(sbuf->my_ip);
00625
00626
00627 if (sbuf->data) free(sbuf->data);
00628
00629
00630 if (sbuf->stats) free(sbuf->stats);
00631
00632
00633 if (sbuf->filters) free(sbuf->filters);
00634
00635
00636 if (sbuf->filter_client_data) free(sbuf->filter_client_data);
00637
00638
00639 memset(sbuf, 0, sizeof(*sbuf));
00640 sbuf->sock = -1;
00641 sbuf->flags = SOCKBUF_DELETED;
00642 sbuf->handler = &sockbuf_idler;
00643 ndeleted_sockbufs++;
00644
00645
00646 for (i = 0; i < npollfds; i++) if (idx_array[i] == idx) break;
00647 if (i == npollfds) return(0);
00648
00649 memmove(pollfds+i, pollfds+i+1, sizeof(*pollfds) * (npollfds+nlisteners-i-1));
00650 memmove(idx_array+i, idx_array+i+1, sizeof(int) * (npollfds-i-1));
00651 npollfds--;
00652
00653 return(0);
00654 }
00655
00656 int sockbuf_write(int idx, const char *data, int len)
00657 {
00658 if (!sockbuf_isvalid(idx)) return(-1);
00659 if (len < 0) len = strlen(data);
00660 sockbufs[idx].stats->bytes_out += len;
00661 return sockbuf_on_write(idx, SOCKBUF_LEVEL_WRITE_INTERNAL, data, len);
00662 }
00663
00664 int sockbuf_get_handler(int idx, sockbuf_handler_t **handler, void *client_data_ptr)
00665 {
00666 if (!sockbuf_isvalid(idx)) return(-1);
00667 if (handler) *handler = sockbufs[idx].handler;
00668 if (client_data_ptr) *(void **)client_data_ptr = sockbufs[idx].client_data;
00669
00670 return(0);
00671 }
00672
00673 int sockbuf_set_handler(int idx, sockbuf_handler_t *handler, void *client_data, event_owner_t *owner)
00674 {
00675 if (!sockbuf_isvalid(idx)) return(-1);
00676 sockbufs[idx].handler = handler;
00677 sockbufs[idx].client_data = client_data;
00678 sockbufs[idx].owner = owner;
00679
00680 return(0);
00681 }
00682
00683
00684
00685
00686
00687
00688
00689 int sockbuf_attach_listener(int fd)
00690 {
00691 pollfds = realloc(pollfds, sizeof(*pollfds) * (npollfds + nlisteners + 1));
00692 pollfds[npollfds+nlisteners].fd = fd;
00693 pollfds[npollfds+nlisteners].events = POLLIN;
00694 pollfds[npollfds+nlisteners].revents = 0;
00695 nlisteners++;
00696 return(0);
00697 }
00698
00699 int sockbuf_detach_listener(int fd)
00700 {
00701 int i;
00702
00703
00704 for (i = 0; i < nlisteners; i++) {
00705 if (pollfds[npollfds+i].fd == fd) break;
00706 }
00707 if (i < nlisteners) {
00708 memmove(pollfds+npollfds+i, pollfds+npollfds+i+1, sizeof(*pollfds) * (nlisteners-i-1));
00709 nlisteners--;
00710 }
00711 return(0);
00712 }
00713
00714
00715
00716
00717
00718
00719
00720
00721 int sockbuf_attach_filter(int idx, sockbuf_filter_t *filter, void *client_data)
00722 {
00723 sockbuf_t *sbuf;
00724 int i;
00725
00726 if (!sockbuf_isvalid(idx)) return(-1);
00727 sbuf = &sockbufs[idx];
00728
00729 sbuf->filters = realloc(sbuf->filters, sizeof(filter) * (sbuf->nfilters+1));
00730
00731 sbuf->filter_client_data = realloc(sbuf->filter_client_data, sizeof(void *) * (sbuf->nfilters+1));
00732
00733
00734
00735
00736
00737
00738 for (i = 0; i < sbuf->nfilters; i++) {
00739 if (filter->level < sbuf->filters[i]->level) break;
00740 }
00741
00742
00743 memmove(sbuf->filters+i+1, sbuf->filters+i, sizeof(filter) * (sbuf->nfilters-i));
00744 memmove(sbuf->filter_client_data+i+1, sbuf->filter_client_data+i, sizeof(void *) * (sbuf->nfilters-i));
00745
00746
00747 sbuf->filters[i] = filter;
00748 sbuf->filter_client_data[i] = client_data;
00749
00750 sbuf->nfilters++;
00751 return(0);
00752 }
00753
00754 int sockbuf_get_filter_data(int idx, sockbuf_filter_t *filter, void *client_data_ptr)
00755 {
00756 int i;
00757 sockbuf_t *sbuf;
00758
00759 if (!sockbuf_isvalid(idx)) return(-1);
00760 sbuf = &sockbufs[idx];
00761 for (i = 0; i < sbuf->nfilters; i++) {
00762 if (sbuf->filters[i] == filter) {
00763 if (client_data_ptr) *(void **)client_data_ptr = sbuf->filter_client_data[i];
00764 return(0);
00765 }
00766 }
00767 return(-1);
00768 }
00769
00770
00771
00772 int sockbuf_detach_filter(int idx, sockbuf_filter_t *filter, void *client_data_ptr)
00773 {
00774 int i;
00775 sockbuf_t *sbuf;
00776
00777 if (!sockbuf_isvalid(idx)) return(-1);
00778 sbuf = &sockbufs[idx];
00779
00780 for (i = 0; i < sbuf->nfilters; i++) if (sbuf->filters[i] == filter) break;
00781 if (i == sbuf->nfilters) {
00782 if (client_data_ptr) *(void **)client_data_ptr = NULL;
00783 return(-1);
00784 }
00785
00786 if (client_data_ptr) *(void **)client_data_ptr = sbuf->filter_client_data[i];
00787 memmove(sbuf->filter_client_data+i, sbuf->filter_client_data+i+1, sizeof(void *) * (sbuf->nfilters-i-1));
00788 memmove(sbuf->filters+i, sbuf->filters+i+1, sizeof(void *) * (sbuf->nfilters-i-1));
00789 sbuf->nfilters--;
00790 return(0);
00791 }
00792
00793
00794
00795
00796
00797
00798 int sockbuf_update_all(int timeout)
00799 {
00800 int i, n, flags, revents, idx;
00801 static int depth = 0;
00802
00803
00804 depth++;
00805
00806 n = poll(pollfds, npollfds, timeout);
00807 if (n < 0) n = npollfds;
00808
00809
00810
00811
00812
00813
00814 for (i = 0; n && i < npollfds; i++) {
00815
00816 revents = pollfds[i].revents;
00817 if (!revents) continue;
00818
00819 idx = idx_array[i];
00820 flags = sockbufs[idx].flags;
00821 if (revents & POLLOUT) {
00822 if (flags & SOCKBUF_CONNECTING) sockbuf_got_writable_client(idx);
00823 else sockbuf_got_writable(idx);
00824 }
00825 if (revents & POLLIN) {
00826 if (flags & SOCKBUF_SERVER) sockbuf_got_readable_server(idx);
00827 else sockbuf_got_readable(idx);
00828 }
00829 if (revents & (POLLHUP|POLLNVAL|POLLERR)) sockbuf_got_eof(idx, 0);
00830 n--;
00831 }
00832
00833
00834 depth--;
00835
00836
00837 if (ndeleted_sockbufs && !depth) {
00838 for (i = 0; ndeleted_sockbufs && i < nsockbufs; i++) {
00839 if (sockbufs[i].flags & SOCKBUF_DELETED) {
00840 sockbufs[i].flags = SOCKBUF_AVAIL;
00841 ndeleted_sockbufs--;
00842 }
00843 }
00844
00845
00846
00847 ndeleted_sockbufs = 0;
00848 }
00849
00850 return(0);
00851 }
00852
00853
00854 static void stats_out(sockbuf_stats_t *stats, int len)
00855 {
00856 timer_get_now(&stats->last_output_at);
00857 skip_stats(stats, stats->last_output_at.sec);
00858 stats->raw_bytes_out += len;
00859 stats->snapshot_out_bytes[stats->snapshot_counter] += len;
00860 }
00861
00862 static void stats_in(sockbuf_stats_t *stats, int len)
00863 {
00864 timer_get_now(&stats->last_input_at);
00865 skip_stats(stats, stats->last_input_at.sec);
00866 stats->raw_bytes_in += len;
00867 stats->snapshot_in_bytes[stats->snapshot_counter] += len;
00868 }
00869
00870 static void skip_stats(sockbuf_stats_t *stats, int curtime)
00871 {
00872 int diff, i;
00873
00874 diff = curtime - stats->last_snapshot;
00875 stats->last_snapshot = curtime;
00876 if (diff > 5) diff = 5;
00877
00878
00879 for (i = 0; i < diff; i++) {
00880 stats->snapshot_counter++;
00881 if (stats->snapshot_counter >= 5) stats->snapshot_counter = 0;
00882 stats->snapshot_out_bytes[stats->snapshot_counter] = 0;
00883 stats->snapshot_in_bytes[stats->snapshot_counter] = 0;
00884 }
00885 }
00886
00887
00888 static void update_stats(sockbuf_stats_t *stats)
00889 {
00890 int curtime = timer_get_now_sec(NULL);
00891 int nsecs;
00892 int snap_in = 0, snap_out = 0;
00893 int i;
00894
00895
00896 nsecs = curtime - stats->connected_at.sec + 1;
00897
00898 stats->total_in_cps = stats->raw_bytes_in / nsecs;
00899 stats->total_out_cps = stats->raw_bytes_out / nsecs;
00900
00901
00902
00903
00904 skip_stats(stats, curtime);
00905
00906
00907
00908
00909 for (i = 0; i < 5; i++) {
00910 if (i != stats->snapshot_counter) {
00911 snap_in += stats->snapshot_in_bytes[i];
00912 snap_out += stats->snapshot_out_bytes[i];
00913 }
00914 }
00915 if (nsecs > 4) nsecs = 4;
00916 else if (nsecs > 1) nsecs--;
00917 else nsecs = 1;
00918 stats->snapshot_in_cps = snap_in / nsecs;
00919 stats->snapshot_out_cps = snap_out / nsecs;
00920 }