/* */
#include "DHTSetup.h"
#include "LogFactory.h"
#include "Util.h"
#include "DHTNode.h"
#include "DHTConnectionImpl.h"
#include "DHTRoutingTable.h"
#include "DHTMessageFactoryImpl.h"
#include "DHTMessageTracker.h"
#include "DHTMessageDispatcher.h"
#include "DHTMessageReceiver.h"
#include "DHTTaskQueueImpl.h"
#include "DHTTaskFactoryImpl.h"
#include "DHTPeerAnnounceStorage.h"
#include "DHTTokenTracker.h"
#include "DHTInteractionCommand.h"
#include "DHTTokenUpdateCommand.h"
#include "DHTBucketRefreshCommand.h"
#include "DHTPeerAnnounceCommand.h"
#include "DHTEntryPointNameResolveCommand.h"
#include "DHTAutoSaveCommand.h"
#include "DHTTask.h"
#include "DHTRoutingTableDeserializer.h"
#include "DHTRegistry.h"
#include "CUIDCounter.h"
#include "prefs.h"
#include "Option.h"
#include "DlAbortEx.h"
#include "RecoverableException.h"
#include <fstream>

// Non-zero once setup() has run to completion; shared by all DHTSetup
// instances.
size_t DHTSetup::_initialized = 0;

// Obtains the logger used to report setup progress and failures.
DHTSetup::DHTSetup():_logger(LogFactory::getInstance()) {}

DHTSetup::~DHTSetup() {}

// Builds the DHT object graph, publishes it through DHTRegistry and returns
// the commands that drive it.
//
// e      - engine handed to every command created here.
// option - source of PREF_DHT_FILE_PATH, PREF_DHT_LISTEN_PORT and
//          PREF_DHT_ENTRY_POINT_HOST.
//
// Returns an empty Commands when setup() already succeeded earlier, or when a
// RecoverableException is raised during initialization (in that case the
// error is logged and DHTRegistry::clear() is called). Otherwise returns the
// newly created commands and marks DHT as initialized.
Commands DHTSetup::setup(DownloadEngine* e, const Option* option)
{
  if(_initialized) {
    return Commands();
  }
  try {
    // load routing table and localnode id here
    DHTNodeHandle localNode = 0;

    DHTRoutingTableDeserializer deserializer;
    string dhtFile = option->get(PREF_DHT_FILE_PATH);
    if(File(dhtFile).isFile()) {
      try {
        ifstream in(dhtFile.c_str());
        try {
          // exceptions() throws at once if the open above already failed,
          // and makes later read failures throw as well.
          in.exceptions(ios::failbit);
          deserializer.deserialize(in);
        } catch(const ios::failure&) {
          // Stream failures are not RecoverableException and would escape
          // setup() altogether. Convert them so they take the same
          // log-and-continue path as any other load error.
          // NOTE(review): relies on DlAbortEx being caught as
          // RecoverableException*, as the bind failure below already does.
          throw new DlAbortEx("I/O error occurred while reading DHT routing table");
        }
        localNode = deserializer.getLocalNode();
      } catch(RecoverableException* ex) {
        // A routing table that cannot be loaded is not fatal: log it and
        // fall through to creating a fresh local node.
        _logger->error("Exception caught while loading DHT routing table from %s", ex, dhtFile.c_str());
        delete ex;
      }
    }
    if(localNode.isNull()) {
      localNode = new DHTNode();
    }

    SharedHandle<DHTConnectionImpl> connection = new DHTConnectionImpl();
    {
      IntSequence seq = Util::parseIntRange(option->get(PREF_DHT_LISTEN_PORT));
      uint16_t port = connection->bind(seq);
      // bind() reporting port 0 is treated as failure to bind.
      if(port == 0) {
        throw new DlAbortEx("Error occurred while binding port for DHT");
      }
      localNode->setPort(port);
    }
    _logger->debug("Initialized local node ID=%s",
                   Util::toHex(localNode->getID(), DHT_ID_LENGTH).c_str());

    DHTRoutingTableHandle routingTable = new DHTRoutingTable(localNode);

    SharedHandle<DHTMessageFactoryImpl> factory = new DHTMessageFactoryImpl();

    DHTMessageTrackerHandle tracker = new DHTMessageTracker();

    DHTMessageDispatcherHandle dispatcher = new DHTMessageDispatcher(tracker);

    DHTMessageReceiverHandle receiver = new DHTMessageReceiver(tracker);

    DHTTaskQueueHandle taskQueue = new DHTTaskQueueImpl();

    SharedHandle<DHTTaskFactoryImpl> taskFactory = new DHTTaskFactoryImpl();

    DHTPeerAnnounceStorageHandle peerAnnounceStorage = new DHTPeerAnnounceStorage();

    DHTTokenTrackerHandle tokenTracker = new DHTTokenTracker();

    // wiring up
    tracker->setRoutingTable(routingTable);
    tracker->setMessageFactory(factory);

    receiver->setConnection(connection);
    receiver->setMessageFactory(factory);
    receiver->setRoutingTable(routingTable);

    taskFactory->setLocalNode(localNode);
    taskFactory->setRoutingTable(routingTable);
    taskFactory->setMessageDispatcher(dispatcher);
    taskFactory->setMessageFactory(factory);
    taskFactory->setTaskQueue(taskQueue);

    routingTable->setTaskQueue(taskQueue);
    routingTable->setTaskFactory(taskFactory);

    peerAnnounceStorage->setTaskQueue(taskQueue);
    peerAnnounceStorage->setTaskFactory(taskFactory);

    factory->setRoutingTable(routingTable);
    factory->setConnection(connection);
    factory->setMessageDispatcher(dispatcher);
    factory->setPeerAnnounceStorage(peerAnnounceStorage);
    factory->setTokenTracker(tokenTracker);
    factory->setLocalNode(localNode);

    // assign them into DHTRegistry
    DHTRegistry::_localNode = localNode;
    DHTRegistry::_routingTable = routingTable;
    DHTRegistry::_taskQueue = taskQueue;
    DHTRegistry::_taskFactory = taskFactory;
    DHTRegistry::_peerAnnounceStorage = peerAnnounceStorage;
    DHTRegistry::_tokenTracker = tokenTracker;
    DHTRegistry::_messageDispatcher = dispatcher;
    DHTRegistry::_messageReceiver = receiver;
    DHTRegistry::_messageFactory = factory;

    // add deserialized nodes to routing table
    const DHTNodes& desnodes = deserializer.getNodes();
    for(DHTNodes::const_iterator i = desnodes.begin(); i != desnodes.end(); ++i) {
      routingTable->addNode(*i);
    }
    // Queue a bucket refresh only when nodes were loaded and the saved
    // table's timestamp reports DHT_BUCKET_REFRESH_INTERVAL as elapsed.
    if(!desnodes.empty() && deserializer.getSerializedTime().elapsed(DHT_BUCKET_REFRESH_INTERVAL)) {
      taskQueue->addPeriodicTask1(taskFactory->createBucketRefreshTask());
    }

    Commands commands;
    if(!option->get(PREF_DHT_ENTRY_POINT_HOST).empty()) {
      {
        DHTEntryPointNameResolveCommand* command = new DHTEntryPointNameResolveCommand(CUIDCounterSingletonHolder::instance()->newID(), e);
        command->setTaskQueue(taskQueue);
        command->setTaskFactory(taskFactory);
        command->setLocalNode(localNode);
        commands.push_back(command);
      }
    } else {
      _logger->info("No DHT entry point specified.");
    }
    {
      DHTInteractionCommand* command = new DHTInteractionCommand(CUIDCounterSingletonHolder::instance()->newID(), e);
      command->setMessageDispatcher(dispatcher);
      command->setMessageReceiver(receiver);
      command->setTaskQueue(taskQueue);
      command->setReadCheckSocket(connection->getSocket());
      commands.push_back(command);
    }
    {
      DHTTokenUpdateCommand* command = new DHTTokenUpdateCommand(CUIDCounterSingletonHolder::instance()->newID(), e, DHT_TOKEN_UPDATE_INTERVAL);
      command->setTokenTracker(tokenTracker);
      commands.push_back(command);
    }
    {
      DHTBucketRefreshCommand* command = new DHTBucketRefreshCommand(CUIDCounterSingletonHolder::instance()->newID(), e, DHT_BUCKET_REFRESH_CHECK_INTERVAL);
      command->setTaskQueue(taskQueue);
      command->setRoutingTable(routingTable);
      command->setTaskFactory(taskFactory);
      commands.push_back(command);
    }
    {
      DHTPeerAnnounceCommand* command = new DHTPeerAnnounceCommand(CUIDCounterSingletonHolder::instance()->newID(), e, DHT_PEER_ANNOUNCE_CHECK_INTERVAL);
      command->setPeerAnnounceStorage(peerAnnounceStorage);
      commands.push_back(command);
    }
    {
      // NOTE(review): 30*60 is presumably an interval in seconds (30
      // minutes) — confirm against DHTAutoSaveCommand.
      DHTAutoSaveCommand* command = new DHTAutoSaveCommand(CUIDCounterSingletonHolder::instance()->newID(), e, 30*60);
      command->setLocalNode(localNode);
      command->setRoutingTable(routingTable);
      commands.push_back(command);
    }
    _initialized = true;

    return commands;
  } catch(RecoverableException* ex) {
    _logger->error("Exception caught while initializing DHT functionality. DHT is disabled.", ex);
    delete ex;
    // The registry may already have been populated above.
    // NOTE(review): assumes clear() resets those members — confirm.
    DHTRegistry::clear();
    return Commands();
  }
}

// Reports whether a previous setup() call completed successfully.
bool DHTSetup::initialized()
{
  return _initialized;
}