diff --git a/src/LibuvEventPoll.cc b/src/LibuvEventPoll.cc index c2e89709..2509357f 100644 --- a/src/LibuvEventPoll.cc +++ b/src/LibuvEventPoll.cc @@ -59,6 +59,7 @@ namespace { using namespace aria2; + template static void close_callback(uv_handle_t* handle) { @@ -77,12 +78,12 @@ LibuvEventPoll::KSocketEntry::KSocketEntry(sock_t s) : SocketEntry(s) {} -int accumulateEvent(int events, const LibuvEventPoll::KEvent& event) +inline int accumulateEvent(int events, const LibuvEventPoll::KEvent& event) { - return events|event.getEvents(); + return events | event.getEvents(); } -int LibuvEventPoll::KSocketEntry::getEvents() +int LibuvEventPoll::KSocketEntry::getEvents() const { int events = 0; #ifdef ENABLE_ASYNC_DNS @@ -95,6 +96,7 @@ int LibuvEventPoll::KSocketEntry::getEvents() events = std::accumulate(commandEvents_.begin(), commandEvents_.end(), 0, accumulateEvent); #endif // !ENABLE_ASYNC_DNS + return events; } @@ -106,23 +108,25 @@ LibuvEventPoll::LibuvEventPoll() LibuvEventPoll::~LibuvEventPoll() { for (KPolls::iterator i = polls_.begin(), e = polls_.end(); i != e; ++i) { - uv_poll_stop(&i->second->p); - uv_close((uv_handle_t*)&i->second->p, close_poll_callback); + i->second->close(); } - // Actually kill the polls, and timers, if any + // Actually kill the polls, and timers, if any. uv_run(loop_, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT)); if (loop_) { uv_loop_delete(loop_); loop_ = 0; } + + // Need this to free only after the loop is gone. polls_.clear(); } void LibuvEventPoll::poll(const struct timeval& tv) { - int timeout = tv.tv_sec * 1000 + tv.tv_usec / 1000; + const int timeout = tv.tv_sec * 1000 + tv.tv_usec / 1000; + // timeout == 0 will tick once if (timeout >= 0) { uv_timer_t* timer = new uv_timer_t; uv_timer_init(loop_, timer); @@ -173,8 +177,7 @@ int LibuvEventPoll::translateEvents(EventPoll::EventType events) return newEvents; } -void LibuvEventPoll::pollCallback(uv_poll_t* handle, poll_t* poll, int status, - int events) +void LibuvEventPoll::pollCallback(KPoll* poll, int status, int events) { if (status == -1) { uv_err_t err = uv_last_error(loop_); @@ -190,21 +193,21 @@ void LibuvEventPoll::pollCallback(uv_poll_t* handle, poll_t* poll, int status, case UV_EPIPE: case UV_ESHUTDOWN: events = IEV_HUP; - poll->entry->processEvents(events); - uv_poll_stop(handle); + poll->processEvents(events); + poll->stop(); uv_stop(loop_); return; default: events = IEV_ERROR; - poll->entry->processEvents(events); - uv_poll_stop(handle); + poll->processEvents(events); + poll->stop(); uv_stop(loop_); return; } } // Got something - poll->entry->processEvents(events); + poll->processEvents(events); uv_stop(loop_); } @@ -213,29 +216,22 @@ bool LibuvEventPoll::addEvents(sock_t socket, { SharedHandle socketEntry(new KSocketEntry(socket)); KSocketEntrySet::iterator i = socketEntries_.lower_bound(socketEntry); + if (i != socketEntries_.end() && **i == *socketEntry) { event.addSelf(*i); KPolls::iterator poll = polls_.find(socket); if (poll == polls_.end()) { throw std::logic_error("Invalid socket"); } - poll->second->events = (*i)->getEvents(); - uv_poll_start(&poll->second->p, - poll->second->events & (IEV_READ | IEV_WRITE), poll_callback); - } - else { - socketEntries_.insert(i, socketEntry); - event.addSelf(socketEntry); - poll_t *poll = new poll_t; - uv_poll_init_socket(loop_, &poll->p, socket); - poll->entry = socketEntry.get(); - poll->eventer = this; - poll->events = socketEntry->getEvents(); - poll->p.data = poll; - polls_[socket] = poll; - uv_poll_start(&poll->p, poll->events & (IEV_READ | IEV_WRITE), - poll_callback); + poll->second->start(); + return true; } + + socketEntries_.insert(i, socketEntry); + event.addSelf(socketEntry); + KPoll* poll = new KPoll(this, socketEntry.get(), socket); + polls_[socket] = poll; + poll->start(); return true; } @@ -258,7 +254,8 @@ bool LibuvEventPoll::deleteEvents(sock_t socket, { SharedHandle socketEntry(new KSocketEntry(socket)); KSocketEntrySet::iterator i = socketEntries_.find(socketEntry); - if(i == socketEntries_.end()) { + + if (i == socketEntries_.end()) { A2_LOG_DEBUG(fmt("Socket %d is not found in SocketEntries.", socket)); return false; } @@ -269,17 +266,15 @@ bool LibuvEventPoll::deleteEvents(sock_t socket, if (poll == polls_.end()) { return false; } + if ((*i)->eventEmpty()) { - uv_poll_stop(&poll->second->p); - uv_close((uv_handle_t*)&poll->second->p, close_poll_callback); + poll->second->close(); polls_.erase(poll); socketEntries_.erase(i); + return true; } - else { - poll->second->events = (*i)->getEvents(); - uv_poll_start(&poll->second->p, - poll->second->events & (IEV_READ | IEV_WRITE), poll_callback); - } + + poll->second->start(); return true; } diff --git a/src/LibuvEventPoll.h b/src/LibuvEventPoll.h index 330f1b60..5c058737 100644 --- a/src/LibuvEventPoll.h +++ b/src/LibuvEventPoll.h @@ -59,43 +59,74 @@ private: typedef CommandEvent KCommandEvent; typedef ADNSEvent KADNSEvent; typedef AsyncNameResolverEntry KAsyncNameResolverEntry; + friend class AsyncNameResolverEntry; - - class KSocketEntry: - public SocketEntry { - public: - KSocketEntry(sock_t socket); - int getEvents(); - }; - friend int accumulateEvent(int events, const KEvent& event); -private: - uv_loop_t* loop_; + class KSocketEntry: public SocketEntry { + public: + KSocketEntry(sock_t socket); + int getEvents() const; + }; + + class KPoll { + private: + LibuvEventPoll *eventer_; + KSocketEntry *entry_; + uv_poll_t handle_; + + static void poll_callback(uv_poll_t* handle, int status, int events) { + KPoll* poll = static_cast(handle->data); + poll->eventer_->pollCallback(poll, status, events); + } + static void close_callback(uv_handle_t* handle) { + delete static_cast(handle->data); + } + + public: + inline KPoll(LibuvEventPoll* eventer, KSocketEntry* entry, sock_t sock) + : eventer_(eventer), entry_(entry) + { + uv_poll_init_socket(eventer->loop_, &handle_, sock); + handle_.data = this; + } + inline void start() { + uv_poll_start(&handle_, entry_->getEvents() & IEV_RW, poll_callback); + } + inline void stop() { + uv_poll_stop(&handle_); + } + inline void processEvents(int events) { + entry_->processEvents(events); + } + inline void close() { + stop(); + uv_close((uv_handle_t*)&handle_, close_callback); + } + }; typedef std::set, DerefLess > > KSocketEntrySet; - KSocketEntrySet socketEntries_; - typedef struct { - uv_poll_t p; - KSocketEntry *entry; - LibuvEventPoll *eventer; - int events; - } poll_t; - - typedef std::map KPolls; - KPolls polls_; + typedef std::map KPolls; #ifdef ENABLE_ASYNC_DNS typedef std::set, DerefLess > > KAsyncNameResolverEntrySet; +#endif // ENABLE_ASYNC_DNS + + uv_loop_t* loop_; + KSocketEntrySet socketEntries_; + KPolls polls_; + +#ifdef ENABLE_ASYNC_DNS KAsyncNameResolverEntrySet nameResolverEntries_; #endif // ENABLE_ASYNC_DNS bool addEvents(sock_t socket, const KEvent& event); bool deleteEvents(sock_t socket, const KEvent& event); + void pollCallback(KPoll *poll, int status, int events); #ifdef ENABLE_ASYNC_DNS bool addEvents(sock_t socket, Command* command, int events, @@ -105,18 +136,9 @@ private: #endif static int translateEvents(EventPoll::EventType events); - static void close_poll_callback(uv_handle_t* handle) { - delete static_cast(handle->data); - } - static void poll_callback(uv_poll_t* handle, int status, int events) { - poll_t* poll = static_cast(handle->data); - poll->eventer->pollCallback(handle, poll, status, events); - } - void pollCallback(uv_poll_t* handle, poll_t *poll, int status, int events); public: LibuvEventPoll(); - virtual ~LibuvEventPoll(); bool good() const { return loop_; } @@ -125,19 +147,21 @@ public: virtual bool addEvents(sock_t socket, Command* command, EventPoll::EventType events); - virtual bool deleteEvents(sock_t socket, Command* command, EventPoll::EventType events); -#ifdef ENABLE_ASYNC_DNS +#ifdef ENABLE_ASYNC_DNS virtual bool addNameResolver(const SharedHandle& resolver, Command* command); - virtual bool deleteNameResolver - (const SharedHandle& resolver, Command* command); + virtual bool deleteNameResolver( + const SharedHandle& resolver, Command* command); #endif // ENABLE_ASYNC_DNS static const int IEV_READ = UV_READABLE; static const int IEV_WRITE = UV_WRITABLE; + static const int IEV_RW = UV_READABLE | UV_WRITABLE; + + // Make sure these do not interfere with the uv_poll API later. static const int IEV_ERROR = 128; static const int IEV_HUP = 255; };