diff --git a/ChangeLog b/ChangeLog index f62ab110..73cc9988 100644 --- a/ChangeLog +++ b/ChangeLog @@ -44,6 +44,11 @@ (createNotInterestedMessage): Use PeerMessageUtil. (createBitfieldMessage): Use PeerMessageUtil. (createKeepAliveMessage): Use PeerMessageUtil. + + * src/SendMessageQueue.h: Renamed to PeerInteraction.h + * src/SendMessageQueue.cc: Renamed to PeerInteraction.cc + * src/PeerInteraction.h: New class. + * src/PeerInteraction.cc: New class. 2006-05-09 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com> diff --git a/src/BitfieldMessage.cc b/src/BitfieldMessage.cc index 51cd427f..84a96f33 100644 --- a/src/BitfieldMessage.cc +++ b/src/BitfieldMessage.cc @@ -20,7 +20,7 @@ */ /* copyright --> */ #include "BitfieldMessage.h" -#include "SendMessageQueue.h" +#include "PeerInteraction.h" #include "PeerMessageUtil.h" #include "Util.h" @@ -38,7 +38,7 @@ void BitfieldMessage::receivedAction() { } void BitfieldMessage::send() { - sendMessageQueue->getPeerConnection()->sendBitfield(); + peerInteraction->getPeerConnection()->sendBitfield(); } void BitfieldMessage::check() const { diff --git a/src/CancelMessage.cc b/src/CancelMessage.cc index ae209409..0a452cbf 100644 --- a/src/CancelMessage.cc +++ b/src/CancelMessage.cc @@ -20,16 +20,16 @@ */ /* copyright --> */ #include "CancelMessage.h" -#include "SendMessageQueue.h" +#include "PeerInteraction.h" #include "PeerMessageUtil.h" #include "Util.h" void CancelMessage::receivedAction() { - sendMessageQueue->deletePieceMessageInQueue(this); + peerInteraction->deletePieceMessageInQueue(this); } void CancelMessage::send() { - sendMessageQueue->getPeerConnection()->sendCancel(index, begin, length); + peerInteraction->getPeerConnection()->sendCancel(index, begin, length); } void CancelMessage::check() const { diff --git a/src/ChokeMessage.cc b/src/ChokeMessage.cc index 3cf080be..30be0245 100644 --- a/src/ChokeMessage.cc +++ b/src/ChokeMessage.cc @@ -20,7 +20,7 @@ */ /* copyright --> */ #include "ChokeMessage.h" -#include "SendMessageQueue.h" +#include "PeerInteraction.h" void ChokeMessage::receivedAction() { peer->peerChoking = true; @@ -28,7 +28,7 @@ void ChokeMessage::receivedAction() { void ChokeMessage::send() { if(!peer->amChoking) { - sendMessageQueue->getPeerConnection()->sendChoke(); + peerInteraction->getPeerConnection()->sendChoke(); peer->amChoking = true; } } diff --git a/src/HandshakeMessage.cc b/src/HandshakeMessage.cc index af41b280..744a60de 100644 --- a/src/HandshakeMessage.cc +++ b/src/HandshakeMessage.cc @@ -20,12 +20,12 @@ */ /* copyright --> */ #include "HandshakeMessage.h" -#include "SendMessageQueue.h" +#include "PeerInteraction.h" #include "PeerMessageUtil.h" #include "Util.h" -void HandshakeMessage::setSendMessageQueue(SendMessageQueue* sendMessageQueue) { - this->sendMessageQueue = sendMessageQueue; +void HandshakeMessage::setPeerInteraction(PeerInteraction* peerInteraction) { + this->peerInteraction = peerInteraction; } string HandshakeMessage::toString() const { @@ -35,5 +35,5 @@ string HandshakeMessage::toString() const { void HandshakeMessage::check() { PeerMessageUtil::checkHandshake(this, - sendMessageQueue->getTorrentMan()->getInfoHash()); + peerInteraction->getTorrentMan()->getInfoHash()); } diff --git a/src/HandshakeMessage.h b/src/HandshakeMessage.h index bc4796d0..4204870a 100644 --- a/src/HandshakeMessage.h +++ b/src/HandshakeMessage.h @@ -24,7 +24,7 @@ #include "common.h" -class SendMessageQueue; +class PeerInteraction; #define PSTR "BitTorrent protocol" #define HANDSHAKE_MESSAGE_LENGTH 68 @@ -35,13 +35,13 @@ public: string pstr; unsigned char infoHash[20]; char peerId[20]; - SendMessageQueue* sendMessageQueue; + PeerInteraction* peerInteraction; public: HandshakeMessage() {} ~HandshakeMessage() {} - SendMessageQueue* getSendMessageQueue() const { return sendMessageQueue; } - void setSendMessageQueue(SendMessageQueue* sendMessageQueue); + PeerInteraction* getPeerInteraction() const { return peerInteraction; } + void setPeerInteraction(PeerInteraction* peerInteraction); string toString() const; void check(); diff --git a/src/HaveMessage.cc b/src/HaveMessage.cc index 84c6eacd..98bcd582 100644 --- a/src/HaveMessage.cc +++ b/src/HaveMessage.cc @@ -20,7 +20,7 @@ */ /* copyright --> */ #include "HaveMessage.h" -#include "SendMessageQueue.h" +#include "PeerInteraction.h" #include "PeerMessageUtil.h" #include "Util.h" @@ -30,7 +30,7 @@ void HaveMessage::receivedAction() { void HaveMessage::send() { if(!peer->hasPiece(index)) { - sendMessageQueue->getPeerConnection()->sendHave(index); + peerInteraction->getPeerConnection()->sendHave(index); } } diff --git a/src/InterestedMessage.cc b/src/InterestedMessage.cc index 01f99db7..b964bf90 100644 --- a/src/InterestedMessage.cc +++ b/src/InterestedMessage.cc @@ -20,7 +20,7 @@ */ /* copyright --> */ #include "InterestedMessage.h" -#include "SendMessageQueue.h" +#include "PeerInteraction.h" void InterestedMessage::receivedAction() { peer->peerInterested = true; @@ -28,7 +28,7 @@ void InterestedMessage::receivedAction() { void InterestedMessage::send() { if(!peer->amInterested) { - sendMessageQueue->getPeerConnection()->sendInterested(); + peerInteraction->getPeerConnection()->sendInterested(); peer->amInterested = true; } } diff --git a/src/KeepAliveMessage.cc b/src/KeepAliveMessage.cc index cdb78a42..266af1b5 100644 --- a/src/KeepAliveMessage.cc +++ b/src/KeepAliveMessage.cc @@ -20,8 +20,8 @@ */ /* copyright --> */ #include "KeepAliveMessage.h" -#include "SendMessageQueue.h" +#include "PeerInteraction.h" void KeepAliveMessage::send() { - sendMessageQueue->getPeerConnection()->sendKeepAlive(); + peerInteraction->getPeerConnection()->sendKeepAlive(); } diff --git a/src/Makefile.am b/src/Makefile.am index bfb45d10..55fe0a59 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -73,7 +73,7 @@ SRCS = Socket.cc Socket.h\ Directory.cc Directory.h\ TrackerWatcherCommand.cc TrackerWatcherCommand.h\ messageDigest.h\ - SendMessageQueue.cc SendMessageQueue.h\ + PeerInteraction.cc PeerInteraction.h\ MultiDiskWriter.cc MultiDiskWriter.h\ DiskAdaptor.cc DiskAdaptor.h\ CopyDiskAdaptor.cc CopyDiskAdaptor.h\ diff --git a/src/Makefile.in b/src/Makefile.in index 6357426e..6cb60fc8 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -96,7 +96,7 @@ am__objects_1 = Socket.$(OBJEXT) SocketCore.$(OBJEXT) \ HandshakeMessage.$(OBJEXT) Piece.$(OBJEXT) \ RequestSlot.$(OBJEXT) TorrentAutoSaveCommand.$(OBJEXT) \ Directory.$(OBJEXT) TrackerWatcherCommand.$(OBJEXT) \ - SendMessageQueue.$(OBJEXT) MultiDiskWriter.$(OBJEXT) \ + PeerInteraction.$(OBJEXT) MultiDiskWriter.$(OBJEXT) \ DiskAdaptor.$(OBJEXT) CopyDiskAdaptor.$(OBJEXT) \ DirectDiskAdaptor.$(OBJEXT) MultiDiskAdaptor.$(OBJEXT) \ LogFactory.$(OBJEXT) TrackerUpdateCommand.$(OBJEXT) \ @@ -328,7 +328,7 @@ SRCS = Socket.cc Socket.h\ Directory.cc Directory.h\ TrackerWatcherCommand.cc TrackerWatcherCommand.h\ messageDigest.h\ - SendMessageQueue.cc SendMessageQueue.h\ + PeerInteraction.cc PeerInteraction.h\ MultiDiskWriter.cc MultiDiskWriter.h\ DiskAdaptor.cc DiskAdaptor.h\ CopyDiskAdaptor.cc CopyDiskAdaptor.h\ @@ -487,6 +487,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PeerChokeCommand.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PeerConnection.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PeerInitiateConnectionCommand.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PeerInteraction.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PeerInteractionCommand.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PeerListenCommand.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PeerMessage.Po@am__quote@ @@ -499,7 +500,6 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/RequestSlot.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SegmentMan.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SegmentSplitter.Po@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SendMessageQueue.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ShaVisitor.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SimpleLogger.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SleepCommand.Po@am__quote@ diff --git a/src/NotInterestedMessage.cc b/src/NotInterestedMessage.cc index 173ad703..dafcd095 100644 --- a/src/NotInterestedMessage.cc +++ b/src/NotInterestedMessage.cc @@ -20,7 +20,7 @@ */ /* copyright --> */ #include "NotInterestedMessage.h" -#include "SendMessageQueue.h" +#include "PeerInteraction.h" void NotInterestedMessage::receivedAction() { peer->peerInterested = false; @@ -28,7 +28,7 @@ void NotInterestedMessage::receivedAction() { void NotInterestedMessage::send() { if(peer->amInterested) { - sendMessageQueue->getPeerConnection()->sendNotInterested(); + peerInteraction->getPeerConnection()->sendNotInterested(); peer->amInterested = false; } } diff --git a/src/SendMessageQueue.cc b/src/PeerInteraction.cc similarity index 82% rename from src/SendMessageQueue.cc rename to src/PeerInteraction.cc index d3e66624..39307fe5 100644 --- a/src/SendMessageQueue.cc +++ b/src/PeerInteraction.cc @@ -19,18 +19,18 @@ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ /* copyright --> */ -#include "SendMessageQueue.h" +#include "PeerInteraction.h" #include "LogFactory.h" #include "DlAbortEx.h" #include "KeepAliveMessage.h" #include "PeerMessageUtil.h" #include <netinet/in.h> -SendMessageQueue::SendMessageQueue(int cuid, - const Socket* socket, - const Option* op, - TorrentMan* torrentMan, - Peer* peer) +PeerInteraction::PeerInteraction(int cuid, + const Socket* socket, + const Option* op, + TorrentMan* torrentMan, + Peer* peer) :cuid(cuid), uploadLimit(0), torrentMan(torrentMan), @@ -40,12 +40,12 @@ SendMessageQueue::SendMessageQueue(int cuid, logger = LogFactory::getInstance(); } -SendMessageQueue::~SendMessageQueue() { +PeerInteraction::~PeerInteraction() { delete peerConnection; for_each(messageQueue.begin(), messageQueue.end(), Deleter()); } -void SendMessageQueue::send(int uploadSpeed) { +void PeerInteraction::send(int uploadSpeed) { int size = messageQueue.size(); for(int i = 0; i < size; i++) { PeerMessage* msg = messageQueue.front(); @@ -70,7 +70,7 @@ void SendMessageQueue::send(int uploadSpeed) { } } -void SendMessageQueue::addMessage(PeerMessage* peerMessage) { +void PeerInteraction::addMessage(PeerMessage* peerMessage) { messageQueue.push_back(peerMessage); if(peerMessage->getId() == RequestMessage::ID) { RequestMessage* requestMessage = (RequestMessage*)peerMessage; @@ -82,7 +82,7 @@ void SendMessageQueue::addMessage(PeerMessage* peerMessage) { } } -void SendMessageQueue::deletePieceMessageInQueue(const CancelMessage* cancelMessage) { +void PeerInteraction::deletePieceMessageInQueue(const CancelMessage* cancelMessage) { for(MessageQueue::iterator itr = messageQueue.begin(); itr != messageQueue.end();) { if((*itr)->getId() == PieceMessage::ID) { @@ -107,7 +107,7 @@ void SendMessageQueue::deletePieceMessageInQueue(const CancelMessage* cancelMess } } -void SendMessageQueue::deleteRequestMessageInQueue() { +void PeerInteraction::deleteRequestMessageInQueue() { for(MessageQueue::iterator itr = messageQueue.begin(); itr != messageQueue.end();) { if((*itr)->getId() == RequestMessage::ID) { @@ -119,7 +119,7 @@ void SendMessageQueue::deleteRequestMessageInQueue() { } } -void SendMessageQueue::deleteRequestSlot(const RequestSlot& requestSlot) { +void PeerInteraction::deleteRequestSlot(const RequestSlot& requestSlot) { // TODO use STL algorithm for(RequestSlots::iterator itr = requestSlots.begin(); itr != requestSlots.end(); itr++) { @@ -130,7 +130,7 @@ void SendMessageQueue::deleteRequestSlot(const RequestSlot& requestSlot) { } } -void SendMessageQueue::deleteAllRequestSlot(Piece& piece) { +void PeerInteraction::deleteAllRequestSlot(Piece& piece) { if(!Piece::isNull(piece)) { for(RequestSlots::const_iterator itr = requestSlots.begin(); itr != requestSlots.end(); itr++) { @@ -143,7 +143,7 @@ void SendMessageQueue::deleteAllRequestSlot(Piece& piece) { requestSlots.clear(); } -void SendMessageQueue::deleteTimeoutRequestSlot() { +void PeerInteraction::deleteTimeoutRequestSlot() { for(RequestSlots::iterator itr = requestSlots.begin(); itr != requestSlots.end();) { if(itr->isTimeout(REQUEST_TIME_OUT)) { @@ -161,7 +161,7 @@ void SendMessageQueue::deleteTimeoutRequestSlot() { torrentMan->updatePiece(piece); } -void SendMessageQueue::deleteCompletedRequestSlot() { +void PeerInteraction::deleteCompletedRequestSlot() { for(RequestSlots::iterator itr = requestSlots.begin(); itr != requestSlots.end();) { if(Piece::isNull(piece) || piece.hasBlock(itr->getBlockIndex()) || @@ -176,7 +176,7 @@ void SendMessageQueue::deleteCompletedRequestSlot() { } } -RequestSlot SendMessageQueue::getCorrespondingRequestSlot(const PieceMessage* pieceMessage) const { +RequestSlot PeerInteraction::getCorrespondingRequestSlot(const PieceMessage* pieceMessage) const { for(RequestSlots::const_iterator itr = requestSlots.begin(); itr != requestSlots.end(); itr++) { const RequestSlot& slot = *itr; @@ -189,24 +189,24 @@ RequestSlot SendMessageQueue::getCorrespondingRequestSlot(const PieceMessage* pi return RequestSlot::nullSlot; } -void SendMessageQueue::cancelAllRequest() { +void PeerInteraction::cancelAllRequest() { cancelAllRequest(Piece::nullPiece); } -void SendMessageQueue::cancelAllRequest(Piece& piece) { +void PeerInteraction::cancelAllRequest(Piece& piece) { deleteRequestMessageInQueue(); deleteAllRequestSlot(piece); } -int SendMessageQueue::countMessageInQueue() const { +int PeerInteraction::countMessageInQueue() const { return messageQueue.size(); } -int SendMessageQueue::countRequestSlot() const { +int PeerInteraction::countRequestSlot() const { return requestSlots.size(); } -HandshakeMessage* SendMessageQueue::receiveHandshake() { +HandshakeMessage* PeerInteraction::receiveHandshake() { char msg[HANDSHAKE_MESSAGE_LENGTH]; int msgLength = 0; if(!peerConnection->receiveHandshake(msg, msgLength)) { @@ -222,13 +222,13 @@ HandshakeMessage* SendMessageQueue::receiveHandshake() { return handshakeMessage; } -HandshakeMessage* SendMessageQueue::createHandshakeMessage(const char* msg, int msgLength) { +HandshakeMessage* PeerInteraction::createHandshakeMessage(const char* msg, int msgLength) { HandshakeMessage* message = PeerMessageUtil::createHandshakeMessage(msg, msgLength); - message->setSendMessageQueue(this); + message->setPeerInteraction(this); return message; } -PeerMessage* SendMessageQueue::receiveMessage() { +PeerMessage* PeerInteraction::receiveMessage() { char msg[MAX_PAYLOAD_LEN]; int msgLength = 0; if(!peerConnection->receiveMessage(msg, msgLength)) { @@ -244,7 +244,7 @@ PeerMessage* SendMessageQueue::receiveMessage() { return peerMessage; } -PeerMessage* SendMessageQueue::createPeerMessage(const char* msg, int msgLength) { +PeerMessage* PeerInteraction::createPeerMessage(const char* msg, int msgLength) { PeerMessage* peerMessage; if(msgLength == 0) { // keep-alive @@ -299,14 +299,14 @@ PeerMessage* SendMessageQueue::createPeerMessage(const char* msg, int msgLength) } -void SendMessageQueue::syncPiece() { +void PeerInteraction::syncPiece() { if(Piece::isNull(piece)) { return; } torrentMan->syncPiece(piece); } -Piece SendMessageQueue::getNewPieceAndSendInterest() { +Piece PeerInteraction::getNewPieceAndSendInterest() { cancelAllRequest(); Piece piece = torrentMan->getMissingPiece(peer); if(Piece::isNull(piece)) { @@ -322,7 +322,7 @@ Piece SendMessageQueue::getNewPieceAndSendInterest() { return piece; } -void SendMessageQueue::sendMessages(int currentUploadSpeed) { +void PeerInteraction::sendMessages(int currentUploadSpeed) { if(Piece::isNull(piece)) { // retrive new piece from TorrentMan piece = getNewPieceAndSendInterest(); @@ -358,13 +358,13 @@ void SendMessageQueue::sendMessages(int currentUploadSpeed) { send(currentUploadSpeed); } -void SendMessageQueue::sendNow(PeerMessage* peerMessage) { +void PeerInteraction::sendNow(PeerMessage* peerMessage) { // ignore inProgress state peerMessage->send(); delete peerMessage; } -void SendMessageQueue::trySendNow(PeerMessage* peerMessage) { +void PeerInteraction::trySendNow(PeerMessage* peerMessage) { if(countMessageInQueue() == 0) { sendNow(peerMessage); } else { @@ -372,29 +372,29 @@ void SendMessageQueue::trySendNow(PeerMessage* peerMessage) { } } -void SendMessageQueue::sendHandshake() { +void PeerInteraction::sendHandshake() { peerConnection->sendHandshake(); } -void SendMessageQueue::abortPiece() { +void PeerInteraction::abortPiece() { cancelAllRequest(piece); torrentMan->cancelPiece(piece); } -Piece& SendMessageQueue::getDownloadPiece() { +Piece& PeerInteraction::getDownloadPiece() { if(Piece::isNull(piece)) { throw new DlAbortEx("current piece is null"); } return piece; } -void SendMessageQueue::setPeerMessageCommonProperty(PeerMessage* peerMessage) { +void PeerInteraction::setPeerMessageCommonProperty(PeerMessage* peerMessage) { peerMessage->setPeer(peer); peerMessage->setCuid(cuid); - peerMessage->setSendMessageQueue(this); + peerMessage->setPeerInteraction(this); } -RequestMessage* SendMessageQueue::createRequestMessage(int blockIndex) { +RequestMessage* PeerInteraction::createRequestMessage(int blockIndex) { RequestMessage* msg = PeerMessageUtil::createRequestMessage(piece.getIndex(), blockIndex*piece.getBlockLength(), @@ -404,57 +404,57 @@ RequestMessage* SendMessageQueue::createRequestMessage(int blockIndex) { return msg; } -CancelMessage* SendMessageQueue::createCancelMessage(int index, int begin, int length) { +CancelMessage* PeerInteraction::createCancelMessage(int index, int begin, int length) { CancelMessage* msg = PeerMessageUtil::createCancelMessage(index, begin, length); setPeerMessageCommonProperty(msg); return msg; } -PieceMessage* SendMessageQueue::createPieceMessage(int index, int begin, int length) { +PieceMessage* PeerInteraction::createPieceMessage(int index, int begin, int length) { PieceMessage* msg = PeerMessageUtil::createPieceMessage(index, begin, length); setPeerMessageCommonProperty(msg); return msg; } -HaveMessage* SendMessageQueue::createHaveMessage(int index) { +HaveMessage* PeerInteraction::createHaveMessage(int index) { HaveMessage* msg = PeerMessageUtil::createHaveMessage(index); setPeerMessageCommonProperty(msg); return msg; } -ChokeMessage* SendMessageQueue::createChokeMessage() { +ChokeMessage* PeerInteraction::createChokeMessage() { ChokeMessage* msg = PeerMessageUtil::createChokeMessage(); setPeerMessageCommonProperty(msg); return msg; } -UnchokeMessage* SendMessageQueue::createUnchokeMessage() { +UnchokeMessage* PeerInteraction::createUnchokeMessage() { UnchokeMessage* msg = PeerMessageUtil::createUnchokeMessage(); setPeerMessageCommonProperty(msg); return msg; } -InterestedMessage* SendMessageQueue::createInterestedMessage() { +InterestedMessage* PeerInteraction::createInterestedMessage() { InterestedMessage* msg = PeerMessageUtil::createInterestedMessage(); setPeerMessageCommonProperty(msg); return msg; } -NotInterestedMessage* SendMessageQueue::createNotInterestedMessage() { +NotInterestedMessage* PeerInteraction::createNotInterestedMessage() { NotInterestedMessage* msg = PeerMessageUtil::createNotInterestedMessage(); setPeerMessageCommonProperty(msg); return msg; } -BitfieldMessage* SendMessageQueue::createBitfieldMessage() { +BitfieldMessage* PeerInteraction::createBitfieldMessage() { BitfieldMessage* msg = PeerMessageUtil::createBitfieldMessage(); setPeerMessageCommonProperty(msg); return msg; } -KeepAliveMessage* SendMessageQueue::createKeepAliveMessage() { +KeepAliveMessage* PeerInteraction::createKeepAliveMessage() { KeepAliveMessage* msg = PeerMessageUtil::createKeepAliveMessage(); setPeerMessageCommonProperty(msg); return msg; diff --git a/src/SendMessageQueue.h b/src/PeerInteraction.h similarity index 93% rename from src/SendMessageQueue.h rename to src/PeerInteraction.h index 37ebc479..289de683 100644 --- a/src/SendMessageQueue.h +++ b/src/PeerInteraction.h @@ -19,8 +19,8 @@ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ /* copyright --> */ -#ifndef _D_SEND_MESSAGE_QUEUE_H_ -#define _D_SEND_MESSAGE_QUEUE_H_ +#ifndef _D_PEER_INTERACTION_H_ +#define _D_PEER_INTERACTION_H_ #include "common.h" #include "PeerConnection.h" @@ -43,7 +43,7 @@ typedef deque<RequestSlot> RequestSlots; typedef deque<PeerMessage*> MessageQueue; -class SendMessageQueue { +class PeerInteraction { private: int cuid; RequestSlots requestSlots; @@ -67,12 +67,12 @@ private: void cancelAllRequest(Piece& piece); int countRequestSlot() const; public: - SendMessageQueue(int cuid, - const Socket* socket, - const Option* op, - TorrentMan* torrentMan, - Peer* peer); - ~SendMessageQueue(); + PeerInteraction(int cuid, + const Socket* socket, + const Option* op, + TorrentMan* torrentMan, + Peer* peer); + ~PeerInteraction(); void addMessage(PeerMessage* peerMessage); void deletePieceMessageInQueue(const CancelMessage* cancelMessage); @@ -125,4 +125,4 @@ public: KeepAliveMessage* createKeepAliveMessage(); }; -#endif // _D_SEND_MESSAGE_QUEUE_H_ +#endif // _D_PEER_INTERACTION_H_ diff --git a/src/PeerInteractionCommand.cc b/src/PeerInteractionCommand.cc index bac480ab..08afbb1d 100644 --- a/src/PeerInteractionCommand.cc +++ b/src/PeerInteractionCommand.cc @@ -37,9 +37,9 @@ PeerInteractionCommand::PeerInteractionCommand(int cuid, Peer* peer, setWriteCheckSocket(socket); setTimeout(e->option->getAsInt(PREF_PEER_CONNECTION_TIMEOUT)); } - sendMessageQueue = new SendMessageQueue(cuid, socket, e->option, - e->torrentMan, this->peer); - sendMessageQueue->setUploadLimit(e->option->getAsInt(PREF_UPLOAD_LIMIT)); + peerInteraction = new PeerInteraction(cuid, socket, e->option, + e->torrentMan, this->peer); + peerInteraction->setUploadLimit(e->option->getAsInt(PREF_UPLOAD_LIMIT)); keepAliveCheckPoint.tv_sec = 0; keepAliveCheckPoint.tv_usec = 0; chokeCheckPoint.tv_sec = 0; @@ -53,7 +53,7 @@ PeerInteractionCommand::PeerInteractionCommand(int cuid, Peer* peer, } PeerInteractionCommand::~PeerInteractionCommand() { - delete sendMessageQueue; + delete peerInteraction; e->torrentMan->unadvertisePiece(cuid); e->torrentMan->deleteActivePeer(this->peer); } @@ -68,11 +68,11 @@ bool PeerInteractionCommand::executeInternal() { switch(sequence) { case INITIATOR_SEND_HANDSHAKE: - sendMessageQueue->sendHandshake(); + peerInteraction->sendHandshake(); sequence = INITIATOR_WAIT_HANDSHAKE; break; case INITIATOR_WAIT_HANDSHAKE: { - HandshakeMessage* handshakeMessage = sendMessageQueue->receiveHandshake(); + HandshakeMessage* handshakeMessage = peerInteraction->receiveHandshake(); if(handshakeMessage == NULL) { break; } @@ -82,13 +82,13 @@ bool PeerInteractionCommand::executeInternal() { handshakeMessage->toString().c_str()); delete handshakeMessage; if(e->torrentMan->getDownloadLength() > 0) { - sendMessageQueue->sendNow(sendMessageQueue->createBitfieldMessage()); + peerInteraction->sendNow(peerInteraction->createBitfieldMessage()); } sequence = WIRED; break; } case RECEIVER_WAIT_HANDSHAKE: { - HandshakeMessage* handshakeMessage = sendMessageQueue->receiveHandshake(); + HandshakeMessage* handshakeMessage = peerInteraction->receiveHandshake(); if(handshakeMessage == NULL) { break; } @@ -97,15 +97,15 @@ bool PeerInteractionCommand::executeInternal() { peer->ipaddr.c_str(), peer->port, handshakeMessage->toString().c_str()); delete handshakeMessage; - sendMessageQueue->sendHandshake(); + peerInteraction->sendHandshake(); if(e->torrentMan->getDownloadLength() > 0) { - sendMessageQueue->sendNow(sendMessageQueue->createBitfieldMessage()); + peerInteraction->sendNow(peerInteraction->createBitfieldMessage()); } sequence = WIRED; break; } case WIRED: - sendMessageQueue->syncPiece(); + peerInteraction->syncPiece(); decideChoking(); for(int i = 0; i < 10; i++) { if(!socket->isReadable(0)) { @@ -113,12 +113,12 @@ bool PeerInteractionCommand::executeInternal() { } receiveMessage(); } - sendMessageQueue->deleteTimeoutRequestSlot(); - sendMessageQueue->deleteCompletedRequestSlot(); - sendMessageQueue->sendMessages(e->getUploadSpeed()); + peerInteraction->deleteTimeoutRequestSlot(); + peerInteraction->deleteCompletedRequestSlot(); + peerInteraction->sendMessages(e->getUploadSpeed()); break; } - if(sendMessageQueue->countMessageInQueue() > 0) { + if(peerInteraction->countMessageInQueue() > 0) { setWriteCheckSocket(socket); } e->commands.push_back(this); @@ -171,17 +171,17 @@ void PeerInteractionCommand::checkLongTimePeerChoking() { void PeerInteractionCommand::decideChoking() { if(peer->shouldBeChoking()) { if(!peer->amChoking) { - sendMessageQueue->addMessage(sendMessageQueue->createChokeMessage()); + peerInteraction->addMessage(peerInteraction->createChokeMessage()); } } else { if(peer->amChoking) { - sendMessageQueue->addMessage(sendMessageQueue->createUnchokeMessage()); + peerInteraction->addMessage(peerInteraction->createUnchokeMessage()); } } } void PeerInteractionCommand::receiveMessage() { - PeerMessage* message = sendMessageQueue->receiveMessage(); + PeerMessage* message = peerInteraction->receiveMessage(); if(message == NULL) { return; } @@ -234,7 +234,7 @@ bool PeerInteractionCommand::prepareForRetry(int wait) { } void PeerInteractionCommand::onAbort(Exception* ex) { - sendMessageQueue->abortPiece(); + peerInteraction->abortPiece(); PeerAbstractCommand::onAbort(ex); } @@ -245,8 +245,8 @@ void PeerInteractionCommand::keepAlive() { struct timeval now; gettimeofday(&now, NULL); if(Util::difftv(now, keepAliveCheckPoint) >= (long long int)120*1000000) { - if(sendMessageQueue->countMessageInQueue() == 0) { - sendMessageQueue->sendNow(sendMessageQueue->createKeepAliveMessage()); + if(peerInteraction->countMessageInQueue() == 0) { + peerInteraction->sendNow(peerInteraction->createKeepAliveMessage()); } keepAliveCheckPoint = now; } @@ -261,10 +261,10 @@ void PeerInteractionCommand::beforeSocketCheck() { PieceIndexes indexes = e->torrentMan->getAdvertisedPieceIndexes(cuid); if(indexes.size() >= 20) { - sendMessageQueue->trySendNow(sendMessageQueue->createBitfieldMessage()); + peerInteraction->trySendNow(peerInteraction->createBitfieldMessage()); } else { for(PieceIndexes::iterator itr = indexes.begin(); itr != indexes.end(); itr++) { - sendMessageQueue->trySendNow(sendMessageQueue->createHaveMessage(*itr)); + peerInteraction->trySendNow(peerInteraction->createHaveMessage(*itr)); } } keepAlive(); diff --git a/src/PeerInteractionCommand.h b/src/PeerInteractionCommand.h index 35efeb1b..9cad831e 100644 --- a/src/PeerInteractionCommand.h +++ b/src/PeerInteractionCommand.h @@ -24,7 +24,7 @@ #include "PeerAbstractCommand.h" #include "PeerConnection.h" -#include "SendMessageQueue.h" +#include "PeerInteraction.h" using namespace std; @@ -33,7 +33,7 @@ using namespace std; class PeerInteractionCommand : public PeerAbstractCommand { private: int sequence; - SendMessageQueue* sendMessageQueue; + PeerInteraction* peerInteraction; struct timeval keepAliveCheckPoint; struct timeval chokeCheckPoint; diff --git a/src/PeerMessage.h b/src/PeerMessage.h index 78da44f1..4170aa8b 100644 --- a/src/PeerMessage.h +++ b/src/PeerMessage.h @@ -27,14 +27,14 @@ #include "Peer.h" #include <string> -class SendMessageQueue; +class PeerInteraction; class PeerMessage { protected: bool inProgress; int cuid; Peer* peer; - SendMessageQueue* sendMessageQueue; + PeerInteraction* peerInteraction; const Logger* logger; public: PeerMessage(); @@ -51,9 +51,9 @@ public: void setPeer(Peer* peer) { this->peer = peer; } - SendMessageQueue* getSendMessageQueue() const { return sendMessageQueue; } - void setSendMessageQueue(SendMessageQueue* sendMessageQueue) { - this->sendMessageQueue = sendMessageQueue; + PeerInteraction* getPeerInteraction() const { return peerInteraction; } + void setPeerInteraction(PeerInteraction* peerInteraction) { + this->peerInteraction = peerInteraction; } virtual int getId() const = 0; diff --git a/src/PieceMessage.cc b/src/PieceMessage.cc index 28d84691..2c8f7d97 100644 --- a/src/PieceMessage.cc +++ b/src/PieceMessage.cc @@ -21,7 +21,7 @@ /* copyright --> */ #include "PieceMessage.h" #include "PeerMessageUtil.h" -#include "SendMessageQueue.h" +#include "PeerInteraction.h" #include "Util.h" #include "message.h" @@ -35,12 +35,12 @@ void PieceMessage::setBlock(const char* block, int blockLength) { } void PieceMessage::receivedAction() { - TorrentMan* torrentMan = sendMessageQueue->getTorrentMan(); - RequestSlot slot = sendMessageQueue->getCorrespondingRequestSlot(this); + TorrentMan* torrentMan = peerInteraction->getTorrentMan(); + RequestSlot slot = peerInteraction->getCorrespondingRequestSlot(this); peer->addPeerUpload(blockLength); - if(sendMessageQueue->hasDownloadPiece() && + if(peerInteraction->hasDownloadPiece() && !RequestSlot::isNull(slot)) { - Piece& piece = sendMessageQueue->getDownloadPiece(); + Piece& piece = peerInteraction->getDownloadPiece(); long long int offset = ((long long int)index)*torrentMan->pieceLength+begin; logger->debug("CUID#%d - write block length = %d, offset=%lld", @@ -49,7 +49,7 @@ void PieceMessage::receivedAction() { blockLength, offset); piece.completeBlock(slot.getBlockIndex()); - sendMessageQueue->deleteRequestSlot(slot); + peerInteraction->deleteRequestSlot(slot); torrentMan->updatePiece(piece); logger->debug("CUID#%d - setting piece bit index=%d", cuid, slot.getBlockIndex()); @@ -66,14 +66,14 @@ void PieceMessage::receivedAction() { void PieceMessage::send() { if((!peer->amChoking && peer->peerInterested) || inProgress) { - PeerConnection* peerConnection = sendMessageQueue->getPeerConnection(); + PeerConnection* peerConnection = peerInteraction->getPeerConnection(); if(!inProgress) { peerConnection->sendPieceHeader(index, begin, blockLength); peer->addPeerDownload(blockLength); leftPieceDataLength = blockLength; } inProgress = false; - int pieceLength = sendMessageQueue->getTorrentMan()->pieceLength; + int pieceLength = peerInteraction->getTorrentMan()->pieceLength; long long int pieceDataOffset = ((long long int)index)*pieceLength+begin+blockLength-leftPieceDataLength; int writtenLength = @@ -96,7 +96,7 @@ string PieceMessage::toString() const { } bool PieceMessage::checkPieceHash(const Piece& piece) { - TorrentMan* torrentMan = sendMessageQueue->getTorrentMan(); + TorrentMan* torrentMan = peerInteraction->getTorrentMan(); long long int offset = ((long long int)piece.getIndex())*torrentMan->pieceLength; return torrentMan->diskAdaptor->sha1Sum(offset, piece.getLength()) == @@ -104,7 +104,7 @@ bool PieceMessage::checkPieceHash(const Piece& piece) { } void PieceMessage::onGotNewPiece(Piece& piece) { - TorrentMan* torrentMan = sendMessageQueue->getTorrentMan(); + TorrentMan* torrentMan = peerInteraction->getTorrentMan(); logger->info(MSG_GOT_NEW_PIECE, cuid, piece.getIndex()); torrentMan->completePiece(piece); torrentMan->advertisePiece(cuid, piece.getIndex()); @@ -112,7 +112,7 @@ void PieceMessage::onGotNewPiece(Piece& piece) { } void PieceMessage::onGotWrongPiece(Piece& piece) { - TorrentMan* torrentMan = sendMessageQueue->getTorrentMan(); + TorrentMan* torrentMan = peerInteraction->getTorrentMan(); logger->error(MSG_GOT_WRONG_PIECE, cuid, piece.getIndex()); erasePieceOnDisk(piece); piece.clearAllBlock(); @@ -120,7 +120,7 @@ void PieceMessage::onGotWrongPiece(Piece& piece) { } void PieceMessage::erasePieceOnDisk(const Piece& piece) { - TorrentMan* torrentMan = sendMessageQueue->getTorrentMan(); + TorrentMan* torrentMan = peerInteraction->getTorrentMan(); int BUFSIZE = 4096; char buf[BUFSIZE]; memset(buf, 0, BUFSIZE); diff --git a/src/PieceMessage.h b/src/PieceMessage.h index c9d8e56a..7ebcba6f 100644 --- a/src/PieceMessage.h +++ b/src/PieceMessage.h @@ -25,8 +25,6 @@ #include "PeerMessage.h" #include "TorrentMan.h" -class SendMessageQueue; - class PieceMessage : public PeerMessage { private: int index; diff --git a/src/RequestMessage.cc b/src/RequestMessage.cc index 3d7e737e..a3acd626 100644 --- a/src/RequestMessage.cc +++ b/src/RequestMessage.cc @@ -20,14 +20,14 @@ */ /* copyright --> */ #include "RequestMessage.h" -#include "SendMessageQueue.h" +#include "PeerInteraction.h" #include "PeerMessageUtil.h" #include "Util.h" void RequestMessage::receivedAction() { - TorrentMan* torrentMan = sendMessageQueue->getTorrentMan(); + TorrentMan* torrentMan = peerInteraction->getTorrentMan(); if(torrentMan->hasPiece(index)) { - sendMessageQueue->addMessage(sendMessageQueue->createPieceMessage(index, begin, length)); + peerInteraction->addMessage(peerInteraction->createPieceMessage(index, begin, length)); torrentMan->addUploadLength(length); torrentMan->addDeltaUploadLength(length); } @@ -35,7 +35,7 @@ void RequestMessage::receivedAction() { void RequestMessage::send() { if(!peer->peerChoking) { - sendMessageQueue->getPeerConnection()->sendRequest(index, begin, length); + peerInteraction->getPeerConnection()->sendRequest(index, begin, length); } } diff --git a/src/UnchokeMessage.cc b/src/UnchokeMessage.cc index 92645ab7..61894998 100644 --- a/src/UnchokeMessage.cc +++ b/src/UnchokeMessage.cc @@ -20,7 +20,7 @@ */ /* copyright --> */ #include "UnchokeMessage.h" -#include "SendMessageQueue.h" +#include "PeerInteraction.h" void UnchokeMessage::receivedAction() { peer->peerChoking = false; @@ -28,7 +28,7 @@ void UnchokeMessage::receivedAction() { void UnchokeMessage::send() { if(peer->amChoking) { - sendMessageQueue->getPeerConnection()->sendUnchoke(); + peerInteraction->getPeerConnection()->sendUnchoke(); peer->amChoking = false; } }