diff options
author | Gavin Andresen <gavinandresen@gmail.com> | 2012-04-04 15:56:00 -0400 |
---|---|---|
committer | Gavin Andresen <gavinandresen@gmail.com> | 2012-04-04 15:56:00 -0400 |
commit | f487746ded64dc472342ae876a2cded79ec1e338 (patch) | |
tree | 3e01f9df9febc91e5fa748fd690c4c75673e624e | |
parent | b0a7e05a45a925d78efd00ecca6dce9b7a9530f9 (diff) |
Remove half-implemented publish/subscribe system
-rw-r--r-- | src/net.cpp | 106 | ||||
-rw-r--r-- | src/net.h | 61 |
2 files changed, 0 insertions, 167 deletions
diff --git a/src/net.cpp b/src/net.cpp index 37e73c421a..7dc2d4c22a 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -289,105 +289,6 @@ void AddressCurrentlyConnected(const CService& addr) -void AbandonRequests(void (*fn)(void*, CDataStream&), void* param1) -{ - // If the dialog might get closed before the reply comes back, - // call this in the destructor so it doesn't get called after it's deleted. - CRITICAL_BLOCK(cs_vNodes) - { - BOOST_FOREACH(CNode* pnode, vNodes) - { - CRITICAL_BLOCK(pnode->cs_mapRequests) - { - for (map<uint256, CRequestTracker>::iterator mi = pnode->mapRequests.begin(); mi != pnode->mapRequests.end();) - { - CRequestTracker& tracker = (*mi).second; - if (tracker.fn == fn && tracker.param1 == param1) - pnode->mapRequests.erase(mi++); - else - mi++; - } - } - } - } -} - - - - - - - -// -// Subscription methods for the broadcast and subscription system. -// Channel numbers are message numbers, i.e. MSG_TABLE and MSG_PRODUCT. -// -// The subscription system uses a meet-in-the-middle strategy. -// With 100,000 nodes, if senders broadcast to 1000 random nodes and receivers -// subscribe to 1000 random nodes, 99.995% (1 - 0.99^1000) of messages will get through. -// - -bool AnySubscribed(unsigned int nChannel) -{ - if (pnodeLocalHost->IsSubscribed(nChannel)) - return true; - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode->IsSubscribed(nChannel)) - return true; - return false; -} - -bool CNode::IsSubscribed(unsigned int nChannel) -{ - if (nChannel >= vfSubscribe.size()) - return false; - return vfSubscribe[nChannel]; -} - -void CNode::Subscribe(unsigned int nChannel, unsigned int nHops) -{ - if (nChannel >= vfSubscribe.size()) - return; - - if (!AnySubscribed(nChannel)) - { - // Relay subscribe - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode != this) - pnode->PushMessage("subscribe", nChannel, nHops); - } - - vfSubscribe[nChannel] = true; -} - -void CNode::CancelSubscribe(unsigned int nChannel) -{ - if (nChannel >= vfSubscribe.size()) - return; - - // Prevent from relaying cancel if wasn't subscribed - if (!vfSubscribe[nChannel]) - return; - vfSubscribe[nChannel] = false; - - if (!AnySubscribed(nChannel)) - { - // Relay subscription cancel - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode != this) - pnode->PushMessage("sub-cancel", nChannel); - } -} - - - - - - - CNode* FindNode(const CNetAddr& ip) @@ -486,13 +387,6 @@ void CNode::CloseSocketDisconnect() void CNode::Cleanup() { - // All of a nodes broadcasts and subscriptions are automatically torn down - // when it goes down, so a node has to stay up to keep its broadcast going. - - // Cancel subscriptions - for (unsigned int nChannel = 0; nChannel < vfSubscribe.size(); nChannel++) - if (vfSubscribe[nChannel]) - CancelSubscribe(nChannel); } @@ -29,7 +29,6 @@ extern int nBestHeight; inline unsigned int ReceiveBufferSize() { return 1000*GetArg("-maxreceivebuffer", 10*1000); } inline unsigned int SendBufferSize() { return 1000*GetArg("-maxsendbuffer", 10*1000); } -static const unsigned int PUBLISH_HOPS = 5; bool RecvLine(SOCKET hSocket, std::string& strLine); bool GetMyExternalIP(CNetAddr& ipRet); @@ -37,8 +36,6 @@ void AddressCurrentlyConnected(const CService& addr); CNode* FindNode(const CNetAddr& ip); CNode* FindNode(const CService& ip); CNode* ConnectNode(CAddress addrConnect, int64 nTimeout=0); -void AbandonRequests(void (*fn)(void*, CDataStream&), void* param1); -bool AnySubscribed(unsigned int nChannel); void MapPort(bool fMapPort); bool BindListenPort(std::string& strError=REF(std::string())); void StartNode(void* parg); @@ -160,9 +157,6 @@ public: CCriticalSection cs_inventory; std::multimap<int64, CInv> mapAskFor; - // publish and subscription - std::vector<char> vfSubscribe; - CNode(SOCKET hSocketIn, CAddress addrIn, bool fInboundIn=false) { nServices = 0; @@ -192,7 +186,6 @@ public: hashLastGetBlocksEnd = 0; nStartingHeight = -1; fGetAddr = false; - vfSubscribe.assign(256, false); nMisbehavior = 0; setInventoryKnown.max_size(SendBufferSize() / 1000); @@ -634,58 +627,4 @@ inline void RelayMessage<>(const CInv& inv, const CDataStream& ss) } - - - - - - -// -// Templates for the publish and subscription system. -// The object being published as T& obj needs to have: -// a set<unsigned int> setSources member -// specializations of AdvertInsert and AdvertErase -// Currently implemented for CTable and CProduct. -// - -template<typename T> -void AdvertStartPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj) -{ - // Add to sources - obj.setSources.insert(pfrom->addr.ip); - - if (!AdvertInsert(obj)) - return; - - // Relay - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel))) - pnode->PushMessage("publish", nChannel, nHops, obj); -} - -template<typename T> -void AdvertStopPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj) -{ - uint256 hash = obj.GetHash(); - - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel))) - pnode->PushMessage("pub-cancel", nChannel, nHops, hash); - - AdvertErase(obj); -} - -template<typename T> -void AdvertRemoveSource(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj) -{ - // Remove a source - obj.setSources.erase(pfrom->addr.ip); - - // If no longer supported by any sources, cancel it - if (obj.setSources.empty()) - AdvertStopPublish(pfrom, nChannel, nHops, obj); -} - #endif |