r19350 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r19349‎ | r19350 | r19351 >
Date:21:19, 16 January 2007
Author:river
Status:old
Tags:
Comment:
split parts of net not used on solaris into net_unix to remove duplication of code; build net + (net_unix | net_solaris) depending on platform
Modified paths:
  • /trunk/loreley/src/loreley/Makefile.in (modified) (history)
  • /trunk/loreley/src/loreley/net.cc (modified) (history)
  • /trunk/loreley/src/loreley/net_solaris.cc (modified) (history)
  • /trunk/loreley/src/loreley/net_unix.cc (added) (history)

Diff [purge]

Index: trunk/loreley/src/loreley/Makefile.in
@@ -33,6 +33,7 @@
3434 log.cc \
3535 net.cc \
3636 net_solaris.cc \
 37+ net_unix.cc \
3738 preprocessor.cc \
3839 radix.cc \
3940 loreley.cc \
Index: trunk/loreley/src/loreley/net_solaris.cc
@@ -33,12 +33,6 @@
3434 #include "log.h"
3535 #include "http.h"
3636
37 -#define RDBUF_INC 8192 /* buffer in 8 KiB incrs */
38 -
39 -/* see ifname_to_address.cc */
40 -int ifname_to_address(int, sockaddr_in *, char const *);
41 -unsigned int if_nametoindex_wrap(const char *);
42 -
4337 enum evtype_t {
4438 evtype_timer,
4539 evtype_event
@@ -63,8 +57,6 @@
6458 evtype_t ep_type;
6559 };
6660
67 -pthread_t io_loop_thread;
68 -
6961 struct event_queue {
7062 event_queue(int p) : portfd(p) {}
7163 int portfd;
@@ -72,40 +64,6 @@
7365
7466 tss<event_queue *> ev_queue;
7567
76 -ioloop_t *ioloop;
77 -
78 -char current_time_str[30];
79 -char current_time_short[30];
80 -time_t current_time;
81 -
82 -static void secondly_sched(void);
83 -
84 -bool wnet_exit;
85 -vector<wsocket *> awaks;
86 -size_t cawak;
87 -
88 -void
89 -wnet_add_accept_wakeup(wsocket *s)
90 -{
91 - awaks.push_back(s);
92 -}
93 -
94 -net::event secondly_ev;
95 -
96 -static void
97 -secondly_update(void)
98 -{
99 - WDEBUG("secondly_update");
100 - wnet_set_time();
101 - secondly_sched();
102 -}
103 -
104 -static void
105 -secondly_sched(void)
106 -{
107 - secondly_ev.schedule(secondly_update, 1000);
108 -}
109 -
11068 pthread_cond_t iot_ready;
11169 pthread_mutex_t iot_ready_m;
11270
@@ -118,7 +76,6 @@
11977 signal_set(&ev_sigusr2, SIGUSR2, usr2_handler, NULL);
12078 signal_add(&ev_sigusr2, NULL);
12179 #endif
122 - io_loop_thread = pthread_self();
12380 pthread_mutex_lock(&iot_ready_m);
12481 pthread_cond_signal(&iot_ready);
12582 pthread_mutex_unlock(&iot_ready_m);
@@ -127,11 +84,6 @@
12885 return NULL;
12986 }
13087
131 -ioloop_t::ioloop_t(void)
132 -{
133 - prepare();
134 -}
135 -
13688 void
13789 ioloop_t::prepare(void)
13890 {
@@ -166,63 +118,8 @@
167119 lns->sock->readback(bind(&ioloop_t::_accept, this, _1, _2), -1);
168120 }
169121 wlog.notice("net: initialised, using Solaris I/O");
170 - secondly_sched();
171122 }
172123
173 -void
174 -ioloop_t::_accept(wsocket *s, int)
175 -{
176 - wsocket *newe;
177 -static rate_limited_logger enfile_log(60, ll_warn, "accept error: %s");
178 -
179 - if ((newe = s->accept("HTTP client", prio_norm)) == NULL) {
180 - if (errno == ENFILE || errno == EMFILE)
181 - enfile_log.log(strerror(errno));
182 - else
183 - wlog.warn(format("accept error: %s") % strerror(errno));
184 - s->readback(bind(&ioloop_t::_accept, this, _1, _2), -1);
185 - return;
186 - }
187 -
188 - s->readback(bind(&ioloop_t::_accept, this, _1, _2), -1);
189 -
190 - newe->nonblocking(true);
191 -
192 - if (cawak == awaks.size())
193 - cawak = 0;
194 -char buf[sizeof(wsocket *) * 2];
195 - memcpy(buf, &newe, sizeof(newe));
196 - memcpy(buf + sizeof(newe), &s, sizeof(s));
197 - WDEBUG(format("_accept, lsnr=%d") % s);
198 -
199 - if (awaks[cawak]->write(buf, sizeof(wsocket *) * 2) < 0) {
200 - wlog.error(format("writing to thread wakeup socket: %s")
201 - % strerror(errno));
202 - exit(1);
203 - }
204 - cawak++;
205 - return;
206 -}
207 -
208 -void
209 -wnet_set_time(void)
210 -{
211 -struct tm *now;
212 - time_t old = current_time;
213 - size_t n;
214 -
215 - current_time = time(NULL);
216 - if (current_time == old)
217 - return;
218 -
219 - now = gmtime(&current_time);
220 -
221 - n = strftime(current_time_str, sizeof(current_time_str), "%a, %d %b %Y %H:%M:%S GMT", now);
222 - assert(n);
223 - n = strftime(current_time_short, sizeof(current_time_short), "%Y-%m-%d %H:%M:%S", now);
224 - assert(n);
225 -}
226 -
227124 namespace net {
228125
229126 event::event(void)
@@ -273,314 +170,6 @@
274171 (what & FDE_WRITE ? POLLWRNORM : 0), this);
275172 }
276173
277 -address::address(void)
278 -{
279 - memset(&_addr, 0, sizeof(_addr));
280 - _addrlen = 0;
281 - _fam = AF_UNSPEC;
282 - _stype = _prot = 0;
283 -}
284 -
285 -address::address(sockaddr *sa, socklen_t len)
286 -{
287 - memcpy(&_addr, sa, len);
288 - _addrlen = len;
289 - _stype = _prot = 0;
290 - _fam = ((sockaddr_storage *)sa)->ss_family;
291 -}
292 -
293 -address::address(addrinfo *ai)
294 -{
295 - memcpy(&_addr, ai->ai_addr, ai->ai_addrlen);
296 - _addrlen = ai->ai_addrlen;
297 - _fam = ai->ai_family;
298 - _stype = ai->ai_socktype;
299 - _prot = ai->ai_protocol;
300 -}
301 -
302 -socket *
303 -address::makesocket(char const *desc, sprio p) const
304 -{
305 - return new socket(*this, desc, p);
306 -}
307 -
308 -address::address(address const &o)
309 - : _addrlen(o._addrlen)
310 - , _fam(o._fam)
311 - , _stype(o._stype)
312 - , _prot(o._prot) {
313 - memcpy(&_addr, &o._addr, _addrlen);
314 -}
315 -
316 -address &
317 -address::operator= (address const &o)
318 -{
319 - _addrlen = o._addrlen;
320 - _fam = o._fam;
321 - _stype = o._stype;
322 - _prot = o._prot;
323 - memcpy(&_addr, &o._addr, _addrlen);
324 - return *this;
325 -}
326 -
327 -string const &
328 -address::straddr(bool lng) const
329 -{
330 -char res[NI_MAXHOST];
331 -int i;
332 - if (!lng) {
333 - if (_shortaddr.empty()) {
334 - if ((i = getnameinfo(sockaddr_cast<sockaddr const *>(&_addr),
335 - _addrlen, res, sizeof(res), NULL, 0, NI_NUMERICHOST)) != 0)
336 - throw resolution_error(i);
337 - _shortaddr = res;
338 - }
339 - return _shortaddr;
340 - }
341 -
342 - if (_straddr.empty()) {
343 - char port[NI_MAXSERV];
344 - if ((i = getnameinfo(sockaddr_cast<sockaddr const *>(&_addr),
345 - _addrlen, res, sizeof(res), port, sizeof(port),
346 - NI_NUMERICHOST | NI_NUMERICSERV)) != 0)
347 - throw resolution_error(i);
348 - _straddr = str(format("[%s]:%s") % res % port);
349 - }
350 - return _straddr;
351 -}
352 -
353 -addrlist *
354 -addrlist::resolve(string const &addr, string const &port,
355 - enum socktype socktype, int family)
356 -{
357 -addrinfo hints, *res, *ai;
358 -int r;
359 - memset(&hints, 0, sizeof(hints));
360 - hints.ai_socktype = (int) socktype;
361 - if (family != AF_UNSPEC)
362 - hints.ai_family = family;
363 -
364 - if ((r = getaddrinfo(addr.c_str(),
365 - port.c_str(), &hints, &res)) != 0)
366 - throw resolution_error(r);
367 -
368 -addrlist *al = new addrlist;
369 - for (ai = res; ai; ai = ai->ai_next)
370 - al->_addrs.push_back(address(ai));
371 -
372 - freeaddrinfo(res);
373 - return al;
374 -}
375 -
376 -address
377 -addrlist::first(string const &addr, int port,
378 - enum socktype socktype, int family)
379 -{
380 - return first(addr, lexical_cast<string>(port), socktype, family);
381 -}
382 -
383 -addrlist *
384 -addrlist::resolve(string const &addr, int port,
385 - enum socktype socktype, int family)
386 -{
387 - return resolve(addr, lexical_cast<string>(port), socktype, family);
388 -}
389 -
390 -address
391 -addrlist::first(string const &addr, string const &port,
392 - enum socktype socktype, int family)
393 -{
394 -addrlist *r = addrlist::resolve(addr, port, socktype, family);
395 -address res;
396 - res = *r->begin();
397 - delete r;
398 - return res;
399 -}
400 -
401 -addrlist::~addrlist(void)
402 -{
403 -}
404 -
405 -addrlist::iterator
406 -addrlist::begin(void) const
407 -{
408 - return _addrs.begin();
409 -}
410 -
411 -addrlist::iterator
412 -addrlist::end(void) const
413 -{
414 - return _addrs.end();
415 -}
416 -
417 -socket *
418 -addrlist::makesocket(char const *desc, sprio p) const
419 -{
420 -iterator it = _addrs.begin(), end = _addrs.end();
421 - for (; it != end; ++it) {
422 - socket *ns;
423 - if ((ns = it->makesocket(desc, p)) != NULL)
424 - return ns;
425 - }
426 - throw socket_error();
427 -}
428 -
429 -socket *
430 -socket::create(string const &addr, int port,
431 - enum socktype socktype, char const *desc, sprio p, int family)
432 -{
433 - return create(addr, lexical_cast<string>(port), socktype, desc, p, family);
434 -}
435 -
436 -socket *
437 -socket::create(string const &addr, string const &port,
438 - enum socktype socktype, char const *desc, sprio p, int family)
439 -{
440 -addrlist *al = addrlist::resolve(addr, port, socktype, family);
441 - return al->makesocket(desc, p);
442 -}
443 -
444 -pair<socket *, socket *>
445 -socket::socketpair(enum socktype st)
446 -{
447 -socket *s1 = NULL, *s2 = NULL;
448 -int sv[2];
449 - if (::socketpair(AF_UNIX, (int) st, 0, sv) == -1)
450 - throw socket_error();
451 - s1 = new socket(sv[0], net::address(), "socketpair", prio_norm);
452 - try {
453 - s2 = new socket(sv[1], net::address(), "socketpair", prio_norm);
454 - } catch (...) {
455 - delete s1;
456 - throw;
457 - }
458 - return make_pair(s1, s2);
459 -}
460 -
461 -connect_status
462 -socket::connect(void)
463 -{
464 - if (::connect(_s, _addr.addr(), _addr.length()) == -1)
465 - if (errno == EINPROGRESS)
466 - return connect_later;
467 - else
468 - throw socket_error();
469 - return connect_okay;
470 -}
471 -
472 -socket *
473 -socket::accept(char const *desc, sprio p)
474 -{
475 -int ns;
476 -sockaddr_storage addr;
477 -socklen_t addrlen = sizeof(addr);
478 - if ((ns = ::accept(_s, (sockaddr *)&addr, &addrlen)) == -1)
479 - return NULL;
480 -
481 - /*
482 - * If TCP_CORK is not supported, disable Nagle's algorithm on the
483 - * socket to prevent delays in HTTP keepalive (at the expense of
484 - * sending more packets).
485 - */
486 -#if !defined(TCP_CORK) && defined(TCP_NODELAY)
487 -int one = 1;
488 - setsockopt(ns, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
489 -#endif
490 -
491 - return new socket(ns, net::address((sockaddr *)&addr, addrlen), desc, p);
492 -}
493 -
494 -int
495 -socket::recvfrom(char *buf, size_t count, net::address &addr)
496 -{
497 -sockaddr_storage saddr;
498 -socklen_t addrlen = sizeof(addr);
499 -int i;
500 - if ((i = ::recvfrom(_s, buf, count, 0, (sockaddr *)&saddr, &addrlen)) < 0)
501 - return i;
502 - WDEBUG(format("recvfrom: fam=%d") % saddr.ss_family);
503 - addr = net::address((sockaddr *)&saddr, addrlen);
504 - return i;
505 -}
506 -
507 -int
508 -socket::sendto(char const *buf, size_t count, net::address const &addr)
509 -{
510 - return ::sendto(_s, buf, count, 0, addr.addr(), addr.length());
511 -}
512 -
513 -void
514 -socket::nonblocking(bool v)
515 -{
516 -int val;
517 - val = fcntl(_s, F_GETFL, 0);
518 - if (val == -1)
519 - throw socket_error();
520 - if (v)
521 - val |= O_NONBLOCK;
522 - else val &= ~O_NONBLOCK;
523 -
524 - if (fcntl(_s, F_SETFL, val) == -1)
525 - throw socket_error();
526 -}
527 -
528 -void
529 -socket::reuseaddr(bool v)
530 -{
531 -int i = v;
532 -int len = sizeof(i);
533 - setopt(SOL_SOCKET, SO_REUSEADDR, &i, len);
534 -}
535 -
536 -void
537 -socket::cork(void)
538 -{
539 -#ifdef TCP_CORK
540 -int one = 1;
541 - setopt(IPPROTO_TCP, TCP_CORK, &one, sizeof(one));
542 -#endif
543 -}
544 -
545 -void
546 -socket::uncork(void)
547 -{
548 -#ifdef TCP_CORK
549 -int zero = 0;
550 - setopt(IPPROTO_TCP, TCP_CORK, &zero, sizeof(zero));
551 -#endif
552 -}
553 -
554 -int
555 -socket::getopt(int level, int what, void *addr, socklen_t *len) const
556 -{
557 -int i;
558 - if ((i = getsockopt(_s, level, what, addr, len)) == -1)
559 - throw socket_error();
560 - return i;
561 -}
562 -
563 -int
564 -socket::setopt(int level, int what, void *addr, socklen_t len)
565 -{
566 -int i;
567 - if ((i = setsockopt(_s, level, what, addr, len)) == -1)
568 - throw socket_error();
569 - return i;
570 -}
571 -
572 -int
573 -socket::error(void) const
574 -{
575 -int error = 0;
576 -socklen_t len = sizeof(error);
577 - try {
578 - getopt(SOL_SOCKET, SO_ERROR, &error, &len);
579 - return error;
580 - } catch (socket_error &) {
581 - return 0;
582 - }
583 -}
584 -
585174 socket::socket(int s, net::address const &a, char const *desc, sprio p)
586175 : _addr(a)
587176 , _desc(desc)
@@ -601,22 +190,6 @@
602191 throw socket_error();
603192 }
604193
605 -void
606 -socket::bind(void)
607 -{
608 -int one = 1;
609 - setopt(SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
610 - if (sfun::bind(_s, _addr.addr(), _addr.length()) == -1)
611 - throw socket_error();
612 -}
613 -
614 -void
615 -socket::listen(int bl)
616 -{
617 - if (::listen(_s, bl) == -1)
618 - throw socket_error();
619 -}
620 -
621194 socket::~socket(void)
622195 {
623196 WDEBUG("closing socket");
@@ -631,64 +204,6 @@
632205 port_dissociate(_queue->portfd, PORT_SOURCE_FD, _s);
633206 }
634207
635 -void
636 -socket::mcast_join(string const &ifname)
637 -{
638 - switch (_addr.family()) {
639 - case AF_INET: {
640 - struct address ifaddr = address::from_ifname(_s, ifname);
641 - sockaddr_in *inbind = (sockaddr_in *)_addr.addr();
642 - sockaddr_in *inif = (sockaddr_in *)ifaddr.addr();
643 - ip_mreq mr;
644 - memset(&mr, 0, sizeof(mr));
645 - mr.imr_multiaddr.s_addr = inbind->sin_addr.s_addr;
646 - mr.imr_interface.s_addr = inif->sin_addr.s_addr;
647 - WDEBUG(format("NET: %s joins mcast on if %s")
648 - % straddr() % ifaddr.straddr());
649 - setopt(IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr, sizeof(mr));
650 - break;
651 - }
652 -
653 - case AF_INET6: {
654 -#ifdef IPV6_ADD_MEMBERSHIP
655 - u_int ifindex = address::ifname_to_index(ifname);
656 - sockaddr_in6 *inbind = (sockaddr_in6 *)_addr.addr();
657 - ipv6_mreq mr;
658 - memset(&mr, 0, sizeof(mr));
659 - memcpy(&mr.ipv6mr_multiaddr, &inbind->sin6_addr,
660 - sizeof(mr.ipv6mr_multiaddr));
661 - mr.ipv6mr_interface = ifindex;
662 - setopt(IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mr, sizeof(mr));
663 -#else
664 - wlog.warn("IPv6 multicast not supported on this platform");
665 -#endif
666 - break;
667 - }
668 -
669 - default:
670 - throw socket_error("multicast join not applicable for this socket type");
671 - }
672 -}
673 -
674 -u_int
675 -address::ifname_to_index(string const &ifname)
676 -{
677 -u_int ret = if_nametoindex_wrap(ifname.c_str());
678 - if (ret == 0)
679 - throw socket_error("named interface does not exist");
680 - return ret;
681 -}
682 -
683 -address
684 -address::from_ifname(int s, string const &ifname)
685 -{
686 -sockaddr_in addr;
687 - if (ifname_to_address(s, &addr, ifname.c_str()) < 0)
688 - throw socket_error();
689 -address ret((sockaddr *)&addr, sizeof(sockaddr_in));
690 - return ret;
691 -}
692 -
693208 } // namespace net
694209
695210 void
@@ -704,7 +219,6 @@
705220 // signal_set(&ev_sigterm, SIGTERM, sig_exit, NULL);
706221 // signal_add(&ev_sigterm, NULL);
707222 // }
708 - io_loop_thread = pthread_self();
709223 ev_queue = new event_queue * (new event_queue(port_create()));
710224 }
711225
@@ -776,7 +290,6 @@
777291 this->ev_queue = (event_queue *)::ev_queue;
778292 evtimer_set(&ev_event, timer_callback, this);
779293 ev_pending_list.insert(make_pair(0, ev_pending(&ev_event, ev_when, evtype_timer)));
780 - pthread_kill(io_loop_thread, SIGUSR2);
781294 #endif
782295 }
783296
Index: trunk/loreley/src/loreley/net.cc
@@ -1,5 +1,5 @@
22 /* Loreley: Lightweight HTTP reverse-proxy. */
3 -/* net: Networking. */
 3+/* net: OS agnostic networking. */
44 /* Copyright (c) 2005, 2006 River Tarnell <river@attenuate.org>. */
55 /*
66 * Permission is granted to anyone to use this software for any purpose,
@@ -15,133 +15,32 @@
1616 # pragma hdrstop
1717 #endif
1818
19 -#ifndef SOLARIS_IO
20 -
2119 namespace sfun {
2220 using ::bind; /* because of conflict with boost::bind from util.h */
2321 };
2422
25 -/*
26 - * libevent needs these
27 - */
28 -#ifndef HAVE_U_INT8_T
29 -typedef uint8_t u_int8_t;
30 -#endif
31 -
32 -#ifndef HAVE_U_INT16_T
33 -typedef uint16_t u_int16_t;
34 -#endif
35 -
36 -#ifndef HAVE_U_INT32_T
37 -typedef uint32_t u_int32_t;
38 -#endif
39 -
40 -#ifndef HAVE_U_INT64_T
41 -typedef uint64_t u_int64_t;
42 -#endif
43 -
4423 using std::deque;
4524 using std::signal;
4625 using std::multimap;
4726
48 -#include <event.h>
49 -
5027 #include "loreley.h"
5128 #include "net.h"
5229 #include "config.h"
5330 #include "log.h"
5431 #include "http.h"
5532
56 -#define RDBUF_INC 8192 /* buffer in 8 KiB incrs */
57 -
5833 /* see ifname_to_address.cc */
5934 int ifname_to_address(int, sockaddr_in *, char const *);
6035 unsigned int if_nametoindex_wrap(const char *);
6136
62 -::event ev_sigint;
63 -::event ev_sigterm;
64 -//tss<event_base> evb;
65 -lockable ev_lock;
 37+static void secondly_sched(void);
6638
67 -enum evtype_t {
68 - evtype_timer,
69 - evtype_event
70 -};
71 -
72 -struct event_impl {
73 - void schedule(int64_t);
74 -
75 - int64_t ev_when;
76 - event_queue *ev_queue;
77 - function<void (void)> ev_func;
78 - ::event ev_event;
79 -};
80 -
81 -struct ev_pending {
82 - ev_pending(::event *ev, int64_t to, evtype_t type)
83 - : ep_event(ev)
84 - , ep_timeout(to)
85 - , ep_type(type) {}
86 -
87 - ::event *ep_event;
88 - int64_t ep_timeout;
89 - evtype_t ep_type;
90 -};
91 -
92 -multimap<int, ev_pending> ev_pending_list;
93 -
94 -pthread_t io_loop_thread;
95 -
96 -struct eq_entry {
97 - eq_entry (net::socket *s, event_impl *ev, int flags)
98 - : ee_sock(s)
99 - , ee_event(ev)
100 - , ee_flags(flags) {}
101 -
102 - net::socket *ee_sock;
103 - event_impl *ee_event;
104 - int ee_flags;
105 -};
106 -
107 -struct event_queue {
108 - event_queue() {
109 - pthread_mutex_init(&eq_lock, NULL);
110 - pthread_cond_init(&eq_cond, NULL);
111 - }
112 -
113 - void add(net::socket *sock, event_impl *event, int flags) {
114 - struct lvars {
115 - lvars(pthread_mutex_t *l)
116 - : lock(l) {
117 - pthread_mutex_lock(lock);
118 - }
119 - ~lvars() {
120 - pthread_mutex_unlock(lock);
121 - }
122 - pthread_mutex_t *lock;
123 - } v(&eq_lock);
124 - WDEBUG("event_queue::add: adding event");
125 - eq_events.push_back(new eq_entry(sock, event, flags));
126 - pthread_cond_signal(&eq_cond);
127 - }
128 -
129 - pthread_mutex_t eq_lock;
130 - pthread_cond_t eq_cond;
131 - deque<eq_entry *> eq_events;
132 -};
133 -
134 -tss<event_queue> ev_queue;
135 -
136 -static void sig_exit(int, short, void *);
137 -
13839 ioloop_t *ioloop;
13940
14041 char current_time_str[30];
14142 char current_time_short[30];
14243 time_t current_time;
14344
144 -static void secondly_sched(void);
145 -
14645 bool wnet_exit;
14746 vector<wsocket *> awaks;
14847 size_t cawak;
@@ -168,78 +67,9 @@
16968 secondly_ev.schedule(secondly_update, 1000);
17069 }
17170
172 -pthread_cond_t iot_ready;
173 -pthread_mutex_t iot_ready_m;
174 -
175 -pthread_t io_thread;
176 -void
177 -usr2_handler(int, short, void *)
178 -{
179 - WDEBUG("got USR2");
180 - event_loopexit(NULL);
181 -}
182 -
183 -::event ev_sigusr2;
184 -
185 -void *
186 -io_start(void *)
187 -{
188 - signal_set(&ev_sigusr2, SIGUSR2, usr2_handler, NULL);
189 - signal_add(&ev_sigusr2, NULL);
190 -
191 - io_loop_thread = pthread_self();
192 - pthread_mutex_lock(&iot_ready_m);
193 - pthread_cond_signal(&iot_ready);
194 - pthread_mutex_unlock(&iot_ready_m);
195 -
196 - ioloop->run();
197 - return NULL;
198 -}
199 -
200 -
201 -
20271 ioloop_t::ioloop_t(void)
20372 {
20473 prepare();
205 -}
206 -
207 -void
208 -ioloop_t::prepare(void)
209 -{
210 -size_t i;
211 -
212 - event_init();
213 - signal(SIGUSR2, SIG_IGN);
214 - pthread_mutex_init(&iot_ready_m, NULL);
215 - pthread_cond_init(&iot_ready, NULL);
216 -
217 - pthread_mutex_lock(&iot_ready_m);
218 - pthread_create(&io_thread, NULL, io_start, NULL);
219 - pthread_cond_wait(&iot_ready, &iot_ready_m);
220 - pthread_mutex_unlock(&iot_ready_m);
221 -
222 - wlog.notice(format("maximum number of open files: %d")
223 - % getdtablesize());
224 -
225 - signal(SIGPIPE, SIG_IGN);
226 -
227 - for (i = 0; i < listeners.size(); ++i) {
228 - listener *lns = listeners[i];
229 -
230 - try {
231 - lns->sock->reuseaddr(true);
232 - lns->sock->bind();
233 - lns->sock->listen();
234 - } catch (socket_error &e) {
235 - wlog.error(format("creating listener %s: %s")
236 - % lns->sock->straddr() % e.what());
237 - exit(8);
238 - }
239 -
240 - lns->sock->readback(bind(&ioloop_t::_accept, this, _1, _2), -1);
241 - }
242 - wlog.notice(format("net: initialised, using libevent %s (%s)")
243 - % event_get_version() % event_get_method());
24474 secondly_sched();
24575 }
24676
@@ -299,78 +129,6 @@
300130
301131 namespace net {
302132
303 -void
304 -socket_callback(int fd, short ev, void *d)
305 -{
306 -wsocket *s = (wsocket *)d;
307 -
308 - HOLDING(ev_lock);
309 - WDEBUG(format("[%d] _ev_callback: %s%son %d (%s) queue %p")
310 - % pthread_self()
311 - % ((ev & EV_READ) ? "read " : "")
312 - % ((ev & EV_WRITE) ? "write " : "")
313 - % fd % s->_desc
314 - % s->_queue);
315 -
316 - s->_queue->add(s, NULL, ev);
317 -}
318 -
319 -event::event(void)
320 -{
321 - impl = new event_impl;
322 -}
323 -
324 -event::~event(void)
325 -{
326 - delete impl;
327 -};
328 -
329 -void
330 -event::schedule(function<void (void)> f, int64_t t)
331 -{
332 - impl->ev_func = f;
333 - impl->schedule(t);
334 -}
335 -
336 -void
337 -timer_callback(int, short, void *d)
338 -{
339 -event_impl *ev = (event_impl *)d;
340 - HOLDING(ev_lock);
341 - ev->ev_queue->add(NULL, ev, 0);
342 -}
343 -
344 -void
345 -socket::_register(int what, int64_t to, socket::call_type handler)
346 -{
347 - _ev_flags = 0;
348 - _queue = (event_queue *)ev_queue;
349 -
350 - WDEBUG(format("_register: %s%son %d (%s), queue %p")
351 - % ((what & FDE_READ) ? "read " : "")
352 - % ((what & FDE_WRITE) ? "write " : "")
353 - % _s % _desc % _queue);
354 -
355 - HOLDING(ev_lock);
356 -
357 - if (what & FDE_READ) {
358 - _read_handler = handler;
359 - _ev_flags |= EV_READ;
360 - }
361 - if (what & FDE_WRITE) {
362 - _write_handler = handler;
363 - _ev_flags |= EV_WRITE;
364 - }
365 -
366 - event_set(ev, _s, _ev_flags, socket_callback, this);
367 - event_priority_set(ev, (int) _prio);
368 -
369 - WDEBUG(format("timeout = %d") % to);
370 -
371 - ev_pending_list.insert(make_pair(_s, ev_pending(ev, to, evtype_event)));
372 - pthread_kill(io_loop_thread, SIGUSR2);
373 -}
374 -
375133 address::address(void)
376134 {
377135 memset(&_addr, 0, sizeof(_addr));
@@ -679,28 +437,6 @@
680438 }
681439 }
682440
683 -socket::socket(int s, net::address const &a, char const *desc, sprio p)
684 - : _addr(a)
685 - , _desc(desc)
686 - , _prio(p)
687 -{
688 - ev = new ::event;
689 - memset(ev, 0, sizeof(*ev));
690 - _s = s;
691 -}
692 -
693 -socket::socket(net::address const &a, char const *desc, sprio p)
694 - : _addr(a)
695 - , _desc(desc)
696 - , _prio(p)
697 -{
698 - ev = new ::event;
699 - memset(ev, 0, sizeof(*ev));
700 - _s = ::socket(_addr.family(), _addr.socktype(), _addr.protocol());
701 - if (_s == -1)
702 - throw socket_error();
703 -}
704 -
705441 void
706442 socket::bind(void)
707443 {
@@ -717,27 +453,7 @@
718454 throw socket_error();
719455 }
720456
721 -socket::~socket(void)
722 -{
723 - WDEBUG("closing socket");
724 - HOLDING(ev_lock);
725 - multimap<int, ev_pending>::iterator it;
726 - it = ev_pending_list.find(_s);
727 - if (it != ev_pending_list.end())
728 - ev_pending_list.erase(it);
729 -
730 - event_del(ev);
731 - delete ev;
732 - close(_s);
733 -}
734 -
735457 void
736 -socket::clearbacks(void)
737 -{
738 - event_del(ev);
739 -}
740 -
741 -void
742458 socket::mcast_join(string const &ifname)
743459 {
744460 switch (_addr.family()) {
@@ -796,134 +512,3 @@
797513 }
798514
799515 } // namespace net
800 -
801 -void
802 -make_event_base(void)
803 -{
804 -//static lockable meb_lock;
805 -// if (evb == NULL) {
806 -// HOLDING(meb_lock);
807 -// evb = (event_base *)event_init();
808 -// event_base_priority_init(evb, prio_max);
809 -// signal_set(&ev_sigint, SIGINT, sig_exit, NULL);
810 -// signal_add(&ev_sigint, NULL);
811 -// signal_set(&ev_sigterm, SIGTERM, sig_exit, NULL);
812 -// signal_add(&ev_sigterm, NULL);
813 -// }
814 - io_loop_thread = pthread_self();
815 - ev_queue = new event_queue;
816 -}
817 -
818 -void
819 -sig_exit(int sig, short what, void *d)
820 -{
821 - wnet_exit = true;
822 -}
823 -
824 -void
825 -ioloop_t::run(void)
826 -{
827 - WDEBUG(format("[%d] ioloop run: running") % pthread_self());
828 - while (!wnet_exit) {
829 - event_loop(EVLOOP_ONCE);
830 - WDEBUG("ioloop thread: got event");
831 -
832 - {
833 -
834 - HOLDING(ev_lock);
835 - multimap<int, ev_pending>::iterator it = ev_pending_list.begin(),
836 - end = ev_pending_list.end();
837 - for (; it != end; ++it) {
838 - WDEBUG("ioloop thread: processing new event");
839 - if (event_pending(it->second.ep_event, EV_READ | EV_WRITE, NULL))
840 - event_del(it->second.ep_event);
841 -
842 - if (it->second.ep_type == evtype_event) {
843 - if (it->second.ep_timeout == -1) {
844 - event_add(it->second.ep_event, NULL);
845 - } else {
846 - timeval tv;
847 - int64_t usec = it->second.ep_timeout * 1000;
848 - tv.tv_sec = usec / 1000000;
849 - tv.tv_usec = usec % 1000000;
850 - WDEBUG(format("timeout: %d %d") % tv.tv_sec % tv.tv_usec);
851 - event_add(it->second.ep_event, &tv);
852 - }
853 - } else if (it->second.ep_type == evtype_timer) {
854 - timeval tv;
855 - int64_t usec = it->second.ep_timeout * 1000;
856 - tv.tv_sec = usec / 1000000;
857 - tv.tv_usec = usec % 1000000;
858 - WDEBUG(format("timeout: %d %d") % tv.tv_sec % tv.tv_usec);
859 - evtimer_add(it->second.ep_event, &tv);
860 - } else
861 - abort();
862 - }
863 - ev_pending_list.clear();
864 - }
865 - }
866 -
867 -size_t i;
868 - for (i = 0; i < listeners.size(); ++i)
869 - delete listeners[i];
870 -}
871 -
872 -void
873 -ioloop_t::thread_run(void)
874 -{
875 - /*
876 - * ioloop for a single thread. uses a condition variable and a queue.
877 - */
878 -event_queue *eq = (event_queue *)ev_queue;
879 -
880 - for (;;) {
881 - deque<eq_entry *> evs;
882 - WDEBUG(format("[%d] thread_run: waiting for event, eq=%p")
883 - % pthread_self() % eq);
884 - pthread_mutex_lock(&eq->eq_lock);
885 - if (eq->eq_events.empty())
886 - pthread_cond_wait(&eq->eq_cond, &eq->eq_lock);
887 - WDEBUG(format("[%d] thread_run: got event") % pthread_self());
888 - evs = eq->eq_events;
889 - eq->eq_events.clear();
890 - pthread_mutex_unlock(&eq->eq_lock);
891 - for (deque<eq_entry *>::iterator
892 - it = evs.begin(),
893 - end = evs.end(); it != end; ++it) {
894 - if ((*it)->ee_sock) {
895 - WDEBUG(format("[%d] thread_run: got event on %s")
896 - % pthread_self() % (*it)->ee_sock->_desc);
897 - if ((*it)->ee_flags & EV_READ)
898 - (*it)->ee_sock->_read_handler((*it)->ee_sock, false);
899 - if ((*it)->ee_flags & EV_WRITE)
900 - (*it)->ee_sock->_write_handler((*it)->ee_sock, false);
901 - if ((*it)->ee_flags & EV_TIMEOUT) {
902 - if ((*it)->ee_sock->_ev_flags & EV_READ) {
903 - (*it)->ee_sock->_read_handler((*it)->ee_sock, true);
904 - } else if ((*it)->ee_sock->_ev_flags & EV_WRITE) {
905 - (*it)->ee_sock->_write_handler((*it)->ee_sock, true);
906 - }
907 - }
908 - } else {
909 - WDEBUG("event thread");
910 - (*it)->ee_event->ev_func();
911 - }
912 - delete *it;
913 - }
914 - }
915 -}
916 -
917 -
918 -void
919 -event_impl::schedule(int64_t when)
920 -{
921 - HOLDING(ev_lock);
922 - WDEBUG(format("schedule, when=%d") % when);
923 - this->ev_when = when;
924 - this->ev_queue = (event_queue *)::ev_queue;
925 - evtimer_set(&ev_event, timer_callback, this);
926 - ev_pending_list.insert(make_pair(0, ev_pending(&ev_event, ev_when, evtype_timer)));
927 - pthread_kill(io_loop_thread, SIGUSR2);
928 -}
929 -
930 -#endif /* !SOLARIS_IO */
Index: trunk/loreley/src/loreley/net_unix.cc
@@ -0,0 +1,547 @@
 2+/* Loreley: Lightweight HTTP reverse-proxy. */
 3+/* net_unix: Unix (other than Solaris)-specific networking. */
 4+/* Copyright (c) 2005, 2006 River Tarnell <river@attenuate.org>. */
 5+/*
 6+ * Permission is granted to anyone to use this software for any purpose,
 7+ * including commercial applications, and to alter it and redistribute it
 8+ * freely. This software is provided 'as-is', without any express or implied
 9+ * warranty.
 10+ */
 11+
 12+/* @(#) $Id$ */
 13+
 14+#include "stdinc.h"
 15+#ifdef __INTEL_COMPILER
 16+# pragma hdrstop
 17+#endif
 18+
 19+#ifndef SOLARIS_IO
 20+
/*
 * Make the sockets-API ::bind reachable as sfun::bind; the unqualified
 * name conflicts with boost::bind, which util.h brings in.
 */
namespace sfun {
	using ::bind;
}
 24+
 25+/*
 26+ * libevent needs these
 27+ */
 28+#ifndef HAVE_U_INT8_T
 29+typedef uint8_t u_int8_t;
 30+#endif
 31+
 32+#ifndef HAVE_U_INT16_T
 33+typedef uint16_t u_int16_t;
 34+#endif
 35+
 36+#ifndef HAVE_U_INT32_T
 37+typedef uint32_t u_int32_t;
 38+#endif
 39+
 40+#ifndef HAVE_U_INT64_T
 41+typedef uint64_t u_int64_t;
 42+#endif
 43+
 44+using std::deque;
 45+using std::signal;
 46+using std::multimap;
 47+
 48+#include <event.h>
 49+
 50+#include "loreley.h"
 51+#include "net.h"
 52+#include "config.h"
 53+#include "log.h"
 54+#include "http.h"
 55+
 56+/* see ifname_to_address.cc */
 57+int ifname_to_address(int, sockaddr_in *, char const *);
 58+unsigned int if_nametoindex_wrap(const char *);
 59+
 60+::event ev_sigint;
 61+::event ev_sigterm;
 62+//tss<event_base> evb;
 63+lockable ev_lock;
 64+
/*
 * Kind of registration held in an ev_pending entry.
 */
enum evtype_t {
	evtype_timer = 0,	/* queued by event_impl::schedule() */
	evtype_event = 1	/* queued by socket::_register() */
};
 69+
 70+struct event_impl {
 71+ void schedule(int64_t);
 72+
 73+ int64_t ev_when;
 74+ event_queue *ev_queue;
 75+ function<void (void)> ev_func;
 76+ ::event ev_event;
 77+};
 78+
 79+struct ev_pending {
 80+ ev_pending(::event *ev, int64_t to, evtype_t type)
 81+ : ep_event(ev)
 82+ , ep_timeout(to)
 83+ , ep_type(type) {}
 84+
 85+ ::event *ep_event;
 86+ int64_t ep_timeout;
 87+ evtype_t ep_type;
 88+};
 89+
 90+multimap<int, ev_pending> ev_pending_list;
 91+
 92+pthread_t io_loop_thread;
 93+
 94+struct eq_entry {
 95+ eq_entry (net::socket *s, event_impl *ev, int flags)
 96+ : ee_sock(s)
 97+ , ee_event(ev)
 98+ , ee_flags(flags) {}
 99+
 100+ net::socket *ee_sock;
 101+ event_impl *ee_event;
 102+ int ee_flags;
 103+};
 104+
 105+struct event_queue {
 106+ event_queue() {
 107+ pthread_mutex_init(&eq_lock, NULL);
 108+ pthread_cond_init(&eq_cond, NULL);
 109+ }
 110+
 111+ void add(net::socket *sock, event_impl *event, int flags) {
 112+ struct lvars {
 113+ lvars(pthread_mutex_t *l)
 114+ : lock(l) {
 115+ pthread_mutex_lock(lock);
 116+ }
 117+ ~lvars() {
 118+ pthread_mutex_unlock(lock);
 119+ }
 120+ pthread_mutex_t *lock;
 121+ } v(&eq_lock);
 122+ WDEBUG("event_queue::add: adding event");
 123+ eq_events.push_back(new eq_entry(sock, event, flags));
 124+ pthread_cond_signal(&eq_cond);
 125+ }
 126+
 127+ pthread_mutex_t eq_lock;
 128+ pthread_cond_t eq_cond;
 129+ deque<eq_entry *> eq_events;
 130+};
 131+
 132+tss<event_queue> ev_queue;
 133+
 134+static void sig_exit(int, short, void *);
 135+
 136+ioloop_t *ioloop;
 137+
 138+char current_time_str[30];
 139+char current_time_short[30];
 140+time_t current_time;
 141+
 142+static void secondly_sched(void);
 143+
 144+bool wnet_exit;
 145+vector<wsocket *> awaks;
 146+size_t cawak;
 147+
 148+void
 149+wnet_add_accept_wakeup(wsocket *s)
 150+{
 151+ awaks.push_back(s);
 152+}
 153+
 154+net::event secondly_ev;
 155+
 156+static void
 157+secondly_update(void)
 158+{
 159+ WDEBUG("secondly_update");
 160+ wnet_set_time();
 161+ secondly_sched();
 162+}
 163+
 164+static void
 165+secondly_sched(void)
 166+{
 167+ secondly_ev.schedule(secondly_update, 1000);
 168+}
 169+
 170+pthread_cond_t iot_ready;
 171+pthread_mutex_t iot_ready_m;
 172+
 173+pthread_t io_thread;
 174+void
 175+usr2_handler(int, short, void *)
 176+{
 177+ WDEBUG("got USR2");
 178+ event_loopexit(NULL);
 179+}
 180+
 181+::event ev_sigusr2;
 182+
 183+void *
 184+io_start(void *)
 185+{
 186+ signal_set(&ev_sigusr2, SIGUSR2, usr2_handler, NULL);
 187+ signal_add(&ev_sigusr2, NULL);
 188+
 189+ io_loop_thread = pthread_self();
 190+ pthread_mutex_lock(&iot_ready_m);
 191+ pthread_cond_signal(&iot_ready);
 192+ pthread_mutex_unlock(&iot_ready_m);
 193+
 194+ ioloop->run();
 195+ return NULL;
 196+}
 197+
 198+
 199+
 200+ioloop_t::ioloop_t(void)
 201+{
 202+ prepare();
 203+}
 204+
 205+void
 206+ioloop_t::prepare(void)
 207+{
 208+size_t i;
 209+
 210+ event_init();
 211+ signal(SIGUSR2, SIG_IGN);
 212+ pthread_mutex_init(&iot_ready_m, NULL);
 213+ pthread_cond_init(&iot_ready, NULL);
 214+
 215+ pthread_mutex_lock(&iot_ready_m);
 216+ pthread_create(&io_thread, NULL, io_start, NULL);
 217+ pthread_cond_wait(&iot_ready, &iot_ready_m);
 218+ pthread_mutex_unlock(&iot_ready_m);
 219+
 220+ wlog.notice(format("maximum number of open files: %d")
 221+ % getdtablesize());
 222+
 223+ signal(SIGPIPE, SIG_IGN);
 224+
 225+ for (i = 0; i < listeners.size(); ++i) {
 226+ listener *lns = listeners[i];
 227+
 228+ try {
 229+ lns->sock->reuseaddr(true);
 230+ lns->sock->bind();
 231+ lns->sock->listen();
 232+ } catch (socket_error &e) {
 233+ wlog.error(format("creating listener %s: %s")
 234+ % lns->sock->straddr() % e.what());
 235+ exit(8);
 236+ }
 237+
 238+ lns->sock->readback(bind(&ioloop_t::_accept, this, _1, _2), -1);
 239+ }
 240+ wlog.notice(format("net: initialised, using libevent %s (%s)")
 241+ % event_get_version() % event_get_method());
 242+ secondly_sched();
 243+}
 244+
 245+void
 246+ioloop_t::_accept(wsocket *s, int)
 247+{
 248+ wsocket *newe;
 249+static rate_limited_logger enfile_log(60, ll_warn, "accept error: %s");
 250+
 251+ if ((newe = s->accept("HTTP client", prio_norm)) == NULL) {
 252+ if (errno == ENFILE || errno == EMFILE)
 253+ enfile_log.log(strerror(errno));
 254+ else
 255+ wlog.warn(format("accept error: %s") % strerror(errno));
 256+ s->readback(bind(&ioloop_t::_accept, this, _1, _2), -1);
 257+ return;
 258+ }
 259+
 260+ s->readback(bind(&ioloop_t::_accept, this, _1, _2), -1);
 261+
 262+ newe->nonblocking(true);
 263+
 264+ if (cawak == awaks.size())
 265+ cawak = 0;
 266+char buf[sizeof(wsocket *) * 2];
 267+ memcpy(buf, &newe, sizeof(newe));
 268+ memcpy(buf + sizeof(newe), &s, sizeof(s));
 269+ WDEBUG(format("_accept, lsnr=%d") % s);
 270+
 271+ if (awaks[cawak]->write(buf, sizeof(wsocket *) * 2) < 0) {
 272+ wlog.error(format("writing to thread wakeup socket: %s")
 273+ % strerror(errno));
 274+ exit(1);
 275+ }
 276+ cawak++;
 277+ return;
 278+}
 279+
 280+void
 281+wnet_set_time(void)
 282+{
 283+struct tm *now;
 284+ time_t old = current_time;
 285+ size_t n;
 286+
 287+ current_time = time(NULL);
 288+ if (current_time == old)
 289+ return;
 290+
 291+ now = gmtime(&current_time);
 292+
 293+ n = strftime(current_time_str, sizeof(current_time_str), "%a, %d %b %Y %H:%M:%S GMT", now);
 294+ assert(n);
 295+ n = strftime(current_time_short, sizeof(current_time_short), "%Y-%m-%d %H:%M:%S", now);
 296+ assert(n);
 297+}
 298+
 299+namespace net {
 300+
 301+void
 302+socket_callback(int fd, short ev, void *d)
 303+{
 304+wsocket *s = (wsocket *)d;
 305+
 306+ HOLDING(ev_lock);
 307+ WDEBUG(format("[%d] _ev_callback: %s%son %d (%s) queue %p")
 308+ % pthread_self()
 309+ % ((ev & EV_READ) ? "read " : "")
 310+ % ((ev & EV_WRITE) ? "write " : "")
 311+ % fd % s->_desc
 312+ % s->_queue);
 313+
 314+ s->_queue->add(s, NULL, ev);
 315+}
 316+
 317+event::event(void)
 318+{
 319+ impl = new event_impl;
 320+}
 321+
 322+event::~event(void)
 323+{
 324+ delete impl;
 325+};
 326+
 327+void
 328+event::schedule(function<void (void)> f, int64_t t)
 329+{
 330+ impl->ev_func = f;
 331+ impl->schedule(t);
 332+}
 333+
 334+void
 335+timer_callback(int, short, void *d)
 336+{
 337+event_impl *ev = (event_impl *)d;
 338+ HOLDING(ev_lock);
 339+ ev->ev_queue->add(NULL, ev, 0);
 340+}
 341+
 342+void
 343+socket::_register(int what, int64_t to, socket::call_type handler)
 344+{
 345+ _ev_flags = 0;
 346+ _queue = (event_queue *)ev_queue;
 347+
 348+ WDEBUG(format("_register: %s%son %d (%s), queue %p")
 349+ % ((what & FDE_READ) ? "read " : "")
 350+ % ((what & FDE_WRITE) ? "write " : "")
 351+ % _s % _desc % _queue);
 352+
 353+ HOLDING(ev_lock);
 354+
 355+ if (what & FDE_READ) {
 356+ _read_handler = handler;
 357+ _ev_flags |= EV_READ;
 358+ }
 359+ if (what & FDE_WRITE) {
 360+ _write_handler = handler;
 361+ _ev_flags |= EV_WRITE;
 362+ }
 363+
 364+ event_set(ev, _s, _ev_flags, socket_callback, this);
 365+ event_priority_set(ev, (int) _prio);
 366+
 367+ WDEBUG(format("timeout = %d") % to);
 368+
 369+ ev_pending_list.insert(make_pair(_s, ev_pending(ev, to, evtype_event)));
 370+ pthread_kill(io_loop_thread, SIGUSR2);
 371+}
 372+
 373+socket::socket(int s, net::address const &a, char const *desc, sprio p)
 374+ : _addr(a)
 375+ , _desc(desc)
 376+ , _prio(p)
 377+ , _queue(0)
 378+{
 379+ ev = new ::event;
 380+ memset(ev, 0, sizeof(*ev));
 381+ _s = s;
 382+}
 383+
 384+socket::socket(net::address const &a, char const *desc, sprio p)
 385+ : _addr(a)
 386+ , _desc(desc)
 387+ , _prio(p)
 388+ , _queue(0)
 389+{
 390+ ev = new ::event;
 391+ memset(ev, 0, sizeof(*ev));
 392+ _s = ::socket(_addr.family(), _addr.socktype(), _addr.protocol());
 393+ if (_s == -1)
 394+ throw socket_error();
 395+}
 396+
 397+socket::~socket(void)
 398+{
 399+ WDEBUG("closing socket");
 400+ HOLDING(ev_lock);
 401+ multimap<int, ev_pending>::iterator it;
 402+ it = ev_pending_list.find(_s);
 403+ if (it != ev_pending_list.end())
 404+ ev_pending_list.erase(it);
 405+
 406+ event_del(ev);
 407+ delete ev;
 408+ close(_s);
 409+}
 410+
 411+void
 412+socket::clearbacks(void)
 413+{
 414+ event_del(ev);
 415+}
 416+
 417+} // namespace net
 418+
 419+void
 420+make_event_base(void)
 421+{
 422+//static lockable meb_lock;
 423+// if (evb == NULL) {
 424+// HOLDING(meb_lock);
 425+// evb = (event_base *)event_init();
 426+// event_base_priority_init(evb, prio_max);
 427+// signal_set(&ev_sigint, SIGINT, sig_exit, NULL);
 428+// signal_add(&ev_sigint, NULL);
 429+// signal_set(&ev_sigterm, SIGTERM, sig_exit, NULL);
 430+// signal_add(&ev_sigterm, NULL);
 431+// }
 432+ io_loop_thread = pthread_self();
 433+ ev_queue = new event_queue;
 434+}
 435+
 436+void
 437+sig_exit(int sig, short what, void *d)
 438+{
 439+ wnet_exit = true;
 440+}
 441+
 442+void
 443+ioloop_t::run(void)
 444+{
 445+ WDEBUG(format("[%d] ioloop run: running") % pthread_self());
 446+ while (!wnet_exit) {
 447+ event_loop(EVLOOP_ONCE);
 448+ WDEBUG("ioloop thread: got event");
 449+
 450+ {
 451+
 452+ HOLDING(ev_lock);
 453+ multimap<int, ev_pending>::iterator it = ev_pending_list.begin(),
 454+ end = ev_pending_list.end();
 455+ for (; it != end; ++it) {
 456+ WDEBUG("ioloop thread: processing new event");
 457+ if (event_pending(it->second.ep_event, EV_READ | EV_WRITE, NULL))
 458+ event_del(it->second.ep_event);
 459+
 460+ if (it->second.ep_type == evtype_event) {
 461+ if (it->second.ep_timeout == -1) {
 462+ event_add(it->second.ep_event, NULL);
 463+ } else {
 464+ timeval tv;
 465+ int64_t usec = it->second.ep_timeout * 1000;
 466+ tv.tv_sec = usec / 1000000;
 467+ tv.tv_usec = usec % 1000000;
 468+ WDEBUG(format("timeout: %d %d") % tv.tv_sec % tv.tv_usec);
 469+ event_add(it->second.ep_event, &tv);
 470+ }
 471+ } else if (it->second.ep_type == evtype_timer) {
 472+ timeval tv;
 473+ int64_t usec = it->second.ep_timeout * 1000;
 474+ tv.tv_sec = usec / 1000000;
 475+ tv.tv_usec = usec % 1000000;
 476+ WDEBUG(format("timeout: %d %d") % tv.tv_sec % tv.tv_usec);
 477+ evtimer_add(it->second.ep_event, &tv);
 478+ } else
 479+ abort();
 480+ }
 481+ ev_pending_list.clear();
 482+ }
 483+ }
 484+
 485+size_t i;
 486+ for (i = 0; i < listeners.size(); ++i)
 487+ delete listeners[i];
 488+}
 489+
 490+void
 491+ioloop_t::thread_run(void)
 492+{
 493+ /*
 494+ * ioloop for a single thread. uses a condition variable and a queue.
 495+ */
 496+event_queue *eq = (event_queue *)ev_queue;
 497+
 498+ for (;;) {
 499+ deque<eq_entry *> evs;
 500+ WDEBUG(format("[%d] thread_run: waiting for event, eq=%p")
 501+ % pthread_self() % eq);
 502+ pthread_mutex_lock(&eq->eq_lock);
 503+ if (eq->eq_events.empty())
 504+ pthread_cond_wait(&eq->eq_cond, &eq->eq_lock);
 505+ WDEBUG(format("[%d] thread_run: got event") % pthread_self());
 506+ evs = eq->eq_events;
 507+ eq->eq_events.clear();
 508+ pthread_mutex_unlock(&eq->eq_lock);
 509+ for (deque<eq_entry *>::iterator
 510+ it = evs.begin(),
 511+ end = evs.end(); it != end; ++it) {
 512+ if ((*it)->ee_sock) {
 513+ WDEBUG(format("[%d] thread_run: got event on %s")
 514+ % pthread_self() % (*it)->ee_sock->_desc);
 515+ if ((*it)->ee_flags & EV_READ)
 516+ (*it)->ee_sock->_read_handler((*it)->ee_sock, false);
 517+ if ((*it)->ee_flags & EV_WRITE)
 518+ (*it)->ee_sock->_write_handler((*it)->ee_sock, false);
 519+ if ((*it)->ee_flags & EV_TIMEOUT) {
 520+ if ((*it)->ee_sock->_ev_flags & EV_READ) {
 521+ (*it)->ee_sock->_read_handler((*it)->ee_sock, true);
 522+ } else if ((*it)->ee_sock->_ev_flags & EV_WRITE) {
 523+ (*it)->ee_sock->_write_handler((*it)->ee_sock, true);
 524+ }
 525+ }
 526+ } else {
 527+ WDEBUG("event thread");
 528+ (*it)->ee_event->ev_func();
 529+ }
 530+ delete *it;
 531+ }
 532+ }
 533+}
 534+
 535+
 536+void
 537+event_impl::schedule(int64_t when)
 538+{
 539+ HOLDING(ev_lock);
 540+ WDEBUG(format("schedule, when=%d") % when);
 541+ this->ev_when = when;
 542+ this->ev_queue = (event_queue *)::ev_queue;
 543+ evtimer_set(&ev_event, timer_callback, this);
 544+ ev_pending_list.insert(make_pair(0, ev_pending(&ev_event, ev_when, evtype_timer)));
 545+ pthread_kill(io_loop_thread, SIGUSR2);
 546+}
 547+
 548+#endif /* !SOLARIS_IO */