diff options
Diffstat (limited to 'net.h')
-rw-r--r-- | net.h | 2110 |
1 files changed, 1055 insertions, 1055 deletions
@@ -1,1055 +1,1055 @@ -// Copyright (c) 2009-2010 Satoshi Nakamoto
-// Distributed under the MIT/X11 software license, see the accompanying
-// file license.txt or http://www.opensource.org/licenses/mit-license.php.
-
-class CMessageHeader;
-class CAddress;
-class CInv;
-class CRequestTracker;
-class CNode;
-class CBlockIndex;
-extern int nBestHeight;
-
-
-
-static const unsigned short DEFAULT_PORT = 0x8d20; // htons(8333)
-static const unsigned int PUBLISH_HOPS = 5;
-enum
-{
- NODE_NETWORK = (1 << 0),
-};
-
-
-
-
-bool ConnectSocket(const CAddress& addrConnect, SOCKET& hSocketRet);
-bool GetMyExternalIP(unsigned int& ipRet);
-bool AddAddress(CAddress addr);
-void AddressCurrentlyConnected(const CAddress& addr);
-CNode* FindNode(unsigned int ip);
-CNode* ConnectNode(CAddress addrConnect, int64 nTimeout=0);
-void AbandonRequests(void (*fn)(void*, CDataStream&), void* param1);
-bool AnySubscribed(unsigned int nChannel);
-bool BindListenPort(string& strError=REF(string()));
-void StartNode(void* parg);
-bool StopNode();
-
-
-
-
-
-
-
-
-//
-// Message header
-// (4) message start
-// (12) command
-// (4) size
-// (4) checksum
-
-// The message start string is designed to be unlikely to occur in normal data.
-// The characters are rarely used upper ascii, not valid as UTF-8, and produce
-// a large 4-byte int at any alignment.
-static const char pchMessageStart[4] = { 0xf9, 0xbe, 0xb4, 0xd9 };
-
-class CMessageHeader
-{
-public:
- enum { COMMAND_SIZE=12 };
- char pchMessageStart[sizeof(::pchMessageStart)];
- char pchCommand[COMMAND_SIZE];
- unsigned int nMessageSize;
- unsigned int nChecksum;
-
- CMessageHeader()
- {
- memcpy(pchMessageStart, ::pchMessageStart, sizeof(pchMessageStart));
- memset(pchCommand, 0, sizeof(pchCommand));
- pchCommand[1] = 1;
- nMessageSize = -1;
- nChecksum = 0;
- }
-
- CMessageHeader(const char* pszCommand, unsigned int nMessageSizeIn)
- {
- memcpy(pchMessageStart, ::pchMessageStart, sizeof(pchMessageStart));
- strncpy(pchCommand, pszCommand, COMMAND_SIZE);
- nMessageSize = nMessageSizeIn;
- nChecksum = 0;
- }
-
- IMPLEMENT_SERIALIZE
- (
- READWRITE(FLATDATA(pchMessageStart));
- READWRITE(FLATDATA(pchCommand));
- READWRITE(nMessageSize);
- if (nVersion >= 209)
- READWRITE(nChecksum);
- )
-
- string GetCommand()
- {
- if (pchCommand[COMMAND_SIZE-1] == 0)
- return string(pchCommand, pchCommand + strlen(pchCommand));
- else
- return string(pchCommand, pchCommand + COMMAND_SIZE);
- }
-
- bool IsValid()
- {
- // Check start string
- if (memcmp(pchMessageStart, ::pchMessageStart, sizeof(pchMessageStart)) != 0)
- return false;
-
- // Check the command string for errors
- for (char* p1 = pchCommand; p1 < pchCommand + COMMAND_SIZE; p1++)
- {
- if (*p1 == 0)
- {
- // Must be all zeros after the first zero
- for (; p1 < pchCommand + COMMAND_SIZE; p1++)
- if (*p1 != 0)
- return false;
- }
- else if (*p1 < ' ' || *p1 > 0x7E)
- return false;
- }
-
- // Message size
- if (nMessageSize > 0x10000000)
- {
- printf("CMessageHeader::IsValid() : nMessageSize too large %u\n", nMessageSize);
- return false;
- }
-
- return true;
- }
-};
-
-
-
-
-
-
-static const unsigned char pchIPv4[12] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff };
-
-class CAddress
-{
-public:
- uint64 nServices;
- unsigned char pchReserved[12];
- unsigned int ip;
- unsigned short port;
-
- // disk only
- unsigned int nTime;
-
- // memory only
- unsigned int nLastTry;
-
- CAddress()
- {
- Init();
- }
-
- CAddress(unsigned int ipIn, unsigned short portIn=DEFAULT_PORT, uint64 nServicesIn=NODE_NETWORK)
- {
- Init();
- ip = ipIn;
- port = portIn;
- nServices = nServicesIn;
- }
-
- explicit CAddress(const struct sockaddr_in& sockaddr, uint64 nServicesIn=NODE_NETWORK)
- {
- Init();
- ip = sockaddr.sin_addr.s_addr;
- port = sockaddr.sin_port;
- nServices = nServicesIn;
- }
-
- explicit CAddress(const char* pszIn, uint64 nServicesIn=NODE_NETWORK)
- {
- Init();
- SetAddress(pszIn);
- nServices = nServicesIn;
- }
-
- explicit CAddress(string strIn, uint64 nServicesIn=NODE_NETWORK)
- {
- Init();
- SetAddress(strIn.c_str());
- nServices = nServicesIn;
- }
-
- void Init()
- {
- nServices = NODE_NETWORK;
- memcpy(pchReserved, pchIPv4, sizeof(pchReserved));
- ip = INADDR_NONE;
- port = DEFAULT_PORT;
- nTime = GetAdjustedTime();
- nLastTry = 0;
- }
-
- bool SetAddress(const char* pszIn)
- {
- ip = INADDR_NONE;
- port = DEFAULT_PORT;
- char psz[100];
- strlcpy(psz, pszIn, sizeof(psz));
- unsigned int a=0, b=0, c=0, d=0, e=0;
- if (sscanf(psz, "%u.%u.%u.%u:%u", &a, &b, &c, &d, &e) < 4)
- return false;
- char* pszPort = strchr(psz, ':');
- if (pszPort)
- {
- *pszPort++ = '\0';
- port = htons(atoi(pszPort));
- if (atoi(pszPort) < 0 || atoi(pszPort) > USHRT_MAX)
- port = htons(USHRT_MAX);
- }
- ip = inet_addr(psz);
- return IsValid();
- }
-
- bool SetAddress(string strIn)
- {
- return SetAddress(strIn.c_str());
- }
-
- IMPLEMENT_SERIALIZE
- (
- if (nType & SER_DISK)
- {
- READWRITE(nVersion);
- READWRITE(nTime);
- }
- READWRITE(nServices);
- READWRITE(FLATDATA(pchReserved)); // for IPv6
- READWRITE(ip);
- READWRITE(port);
- )
-
- friend inline bool operator==(const CAddress& a, const CAddress& b)
- {
- return (memcmp(a.pchReserved, b.pchReserved, sizeof(a.pchReserved)) == 0 &&
- a.ip == b.ip &&
- a.port == b.port);
- }
-
- friend inline bool operator!=(const CAddress& a, const CAddress& b)
- {
- return (!(a == b));
- }
-
- friend inline bool operator<(const CAddress& a, const CAddress& b)
- {
- int ret = memcmp(a.pchReserved, b.pchReserved, sizeof(a.pchReserved));
- if (ret < 0)
- return true;
- else if (ret == 0)
- {
- if (ntohl(a.ip) < ntohl(b.ip))
- return true;
- else if (a.ip == b.ip)
- return ntohs(a.port) < ntohs(b.port);
- }
- return false;
- }
-
- vector<unsigned char> GetKey() const
- {
- CDataStream ss;
- ss.reserve(18);
- ss << FLATDATA(pchReserved) << ip << port;
-
- #if defined(_MSC_VER) && _MSC_VER < 1300
- return vector<unsigned char>((unsigned char*)&ss.begin()[0], (unsigned char*)&ss.end()[0]);
- #else
- return vector<unsigned char>(ss.begin(), ss.end());
- #endif
- }
-
- struct sockaddr_in GetSockAddr() const
- {
- struct sockaddr_in sockaddr;
- memset(&sockaddr, 0, sizeof(sockaddr));
- sockaddr.sin_family = AF_INET;
- sockaddr.sin_addr.s_addr = ip;
- sockaddr.sin_port = port;
- return sockaddr;
- }
-
- bool IsIPv4() const
- {
- return (memcmp(pchReserved, pchIPv4, sizeof(pchIPv4)) == 0);
- }
-
- bool IsRoutable() const
- {
- return IsValid() &&
- !(GetByte(3) == 10 ||
- (GetByte(3) == 192 && GetByte(2) == 168) ||
- GetByte(3) == 127 ||
- GetByte(3) == 0);
- }
-
- bool IsValid() const
- {
- // Clean up 3-byte shifted addresses caused by garbage in size field
- // of addr messages from versions before 0.2.9 checksum.
- // Two consecutive addr messages look like this:
- // header20 vectorlen3 addr26 addr26 addr26 header20 vectorlen3 addr26 addr26 addr26...
- // so if the first length field is garbled, it reads the second batch
- // of addr misaligned by 3 bytes.
- if (memcmp(pchReserved, pchIPv4+3, sizeof(pchIPv4)-3) == 0)
- return false;
-
- return (ip != 0 && ip != INADDR_NONE && port != htons(USHRT_MAX));
- }
-
- unsigned char GetByte(int n) const
- {
- return ((unsigned char*)&ip)[3-n];
- }
-
- string ToStringIPPort() const
- {
- return strprintf("%u.%u.%u.%u:%u", GetByte(3), GetByte(2), GetByte(1), GetByte(0), ntohs(port));
- }
-
- string ToStringIP() const
- {
- return strprintf("%u.%u.%u.%u", GetByte(3), GetByte(2), GetByte(1), GetByte(0));
- }
-
- string ToStringPort() const
- {
- return strprintf("%u", ntohs(port));
- }
-
- string ToStringLog() const
- {
- return "";
- }
-
- string ToString() const
- {
- return strprintf("%u.%u.%u.%u:%u", GetByte(3), GetByte(2), GetByte(1), GetByte(0), ntohs(port));
- }
-
- void print() const
- {
- printf("CAddress(%s)\n", ToString().c_str());
- }
-};
-
-
-
-
-
-
-
-enum
-{
- MSG_TX = 1,
- MSG_BLOCK,
-};
-
-static const char* ppszTypeName[] =
-{
- "ERROR",
- "tx",
- "block",
-};
-
-class CInv
-{
-public:
- int type;
- uint256 hash;
-
- CInv()
- {
- type = 0;
- hash = 0;
- }
-
- CInv(int typeIn, const uint256& hashIn)
- {
- type = typeIn;
- hash = hashIn;
- }
-
- CInv(const string& strType, const uint256& hashIn)
- {
- int i;
- for (i = 1; i < ARRAYLEN(ppszTypeName); i++)
- {
- if (strType == ppszTypeName[i])
- {
- type = i;
- break;
- }
- }
- if (i == ARRAYLEN(ppszTypeName))
- throw std::out_of_range(strprintf("CInv::CInv(string, uint256) : unknown type '%s'", strType.c_str()));
- hash = hashIn;
- }
-
- IMPLEMENT_SERIALIZE
- (
- READWRITE(type);
- READWRITE(hash);
- )
-
- friend inline bool operator<(const CInv& a, const CInv& b)
- {
- return (a.type < b.type || (a.type == b.type && a.hash < b.hash));
- }
-
- bool IsKnownType() const
- {
- return (type >= 1 && type < ARRAYLEN(ppszTypeName));
- }
-
- const char* GetCommand() const
- {
- if (!IsKnownType())
- throw std::out_of_range(strprintf("CInv::GetCommand() : type=% unknown type", type));
- return ppszTypeName[type];
- }
-
- string ToString() const
- {
- return strprintf("%s %s", GetCommand(), hash.ToString().substr(0,20).c_str());
- }
-
- void print() const
- {
- printf("CInv(%s)\n", ToString().c_str());
- }
-};
-
-
-
-
-
-class CRequestTracker
-{
-public:
- void (*fn)(void*, CDataStream&);
- void* param1;
-
- explicit CRequestTracker(void (*fnIn)(void*, CDataStream&)=NULL, void* param1In=NULL)
- {
- fn = fnIn;
- param1 = param1In;
- }
-
- bool IsNull()
- {
- return fn == NULL;
- }
-};
-
-
-
-
-
-extern bool fClient;
-extern uint64 nLocalServices;
-extern CAddress addrLocalHost;
-extern CNode* pnodeLocalHost;
-extern uint64 nLocalHostNonce;
-extern array<int, 10> vnThreadsRunning;
-extern SOCKET hListenSocket;
-extern int64 nThreadSocketHandlerHeartbeat;
-
-extern vector<CNode*> vNodes;
-extern CCriticalSection cs_vNodes;
-extern map<vector<unsigned char>, CAddress> mapAddresses;
-extern CCriticalSection cs_mapAddresses;
-extern map<CInv, CDataStream> mapRelay;
-extern deque<pair<int64, CInv> > vRelayExpiration;
-extern CCriticalSection cs_mapRelay;
-extern map<CInv, int64> mapAlreadyAskedFor;
-
-// Settings
-extern int fUseProxy;
-extern CAddress addrProxy;
-
-
-
-
-
-
-class CNode
-{
-public:
- // socket
- uint64 nServices;
- SOCKET hSocket;
- CDataStream vSend;
- CDataStream vRecv;
- CCriticalSection cs_vSend;
- CCriticalSection cs_vRecv;
- int64 nLastSend;
- int64 nLastRecv;
- int64 nLastSendEmpty;
- int64 nTimeConnected;
- unsigned int nHeaderStart;
- unsigned int nMessageStart;
- CAddress addr;
- int nVersion;
- string strSubVer;
- bool fClient;
- bool fInbound;
- bool fNetworkNode;
- bool fSuccessfullyConnected;
- bool fDisconnect;
-protected:
- int nRefCount;
-public:
- int64 nReleaseTime;
- map<uint256, CRequestTracker> mapRequests;
- CCriticalSection cs_mapRequests;
- uint256 hashContinue;
- CBlockIndex* pindexLastGetBlocksBegin;
- uint256 hashLastGetBlocksEnd;
- int nStartingHeight;
-
- // flood relay
- vector<CAddress> vAddrToSend;
- set<CAddress> setAddrKnown;
- bool fGetAddr;
- set<uint256> setKnown;
-
- // inventory based relay
- set<CInv> setInventoryKnown;
- vector<CInv> vInventoryToSend;
- CCriticalSection cs_inventory;
- multimap<int64, CInv> mapAskFor;
-
- // publish and subscription
- vector<char> vfSubscribe;
-
-
- CNode(SOCKET hSocketIn, CAddress addrIn, bool fInboundIn=false)
- {
- nServices = 0;
- hSocket = hSocketIn;
- vSend.SetType(SER_NETWORK);
- vSend.SetVersion(0);
- vRecv.SetType(SER_NETWORK);
- vRecv.SetVersion(0);
- // Version 0.2 obsoletes 20 Feb 2012
- if (GetTime() > 1329696000)
- {
- vSend.SetVersion(209);
- vRecv.SetVersion(209);
- }
- nLastSend = 0;
- nLastRecv = 0;
- nLastSendEmpty = GetTime();
- nTimeConnected = GetTime();
- nHeaderStart = -1;
- nMessageStart = -1;
- addr = addrIn;
- nVersion = 0;
- strSubVer = "";
- fClient = false; // set by version message
- fInbound = fInboundIn;
- fNetworkNode = false;
- fSuccessfullyConnected = false;
- fDisconnect = false;
- nRefCount = 0;
- nReleaseTime = 0;
- hashContinue = 0;
- pindexLastGetBlocksBegin = 0;
- hashLastGetBlocksEnd = 0;
- nStartingHeight = -1;
- fGetAddr = false;
- vfSubscribe.assign(256, false);
-
- // Push a version message
- /// when NTP implemented, change to just nTime = GetAdjustedTime()
- int64 nTime = (fInbound ? GetAdjustedTime() : GetTime());
- CAddress addrYou = (fUseProxy ? CAddress("0.0.0.0") : addr);
- CAddress addrMe = (fUseProxy ? CAddress("0.0.0.0") : addrLocalHost);
- RAND_bytes((unsigned char*)&nLocalHostNonce, sizeof(nLocalHostNonce));
- PushMessage("version", VERSION, nLocalServices, nTime, addrYou, addrMe,
- nLocalHostNonce, string(pszSubVer), nBestHeight);
- }
-
- ~CNode()
- {
- if (hSocket != INVALID_SOCKET)
- {
- closesocket(hSocket);
- hSocket = INVALID_SOCKET;
- }
- }
-
-private:
- CNode(const CNode&);
- void operator=(const CNode&);
-public:
-
-
- int GetRefCount()
- {
- return max(nRefCount, 0) + (GetTime() < nReleaseTime ? 1 : 0);
- }
-
- CNode* AddRef(int64 nTimeout=0)
- {
- if (nTimeout != 0)
- nReleaseTime = max(nReleaseTime, GetTime() + nTimeout);
- else
- nRefCount++;
- return this;
- }
-
- void Release()
- {
- nRefCount--;
- }
-
-
-
- void AddAddressKnown(const CAddress& addr)
- {
- setAddrKnown.insert(addr);
- }
-
- void PushAddress(const CAddress& addr)
- {
- // Known checking here is only to save space from duplicates.
- // SendMessages will filter it again for knowns that were added
- // after addresses were pushed.
- if (addr.IsValid() && !setAddrKnown.count(addr))
- vAddrToSend.push_back(addr);
- }
-
-
- void AddInventoryKnown(const CInv& inv)
- {
- CRITICAL_BLOCK(cs_inventory)
- setInventoryKnown.insert(inv);
- }
-
- void PushInventory(const CInv& inv)
- {
- CRITICAL_BLOCK(cs_inventory)
- if (!setInventoryKnown.count(inv))
- vInventoryToSend.push_back(inv);
- }
-
- void AskFor(const CInv& inv)
- {
- // We're using mapAskFor as a priority queue,
- // the key is the earliest time the request can be sent
- int64& nRequestTime = mapAlreadyAskedFor[inv];
- printf("askfor %s %"PRI64d"\n", inv.ToString().c_str(), nRequestTime);
-
- // Make sure not to reuse time indexes to keep things in the same order
- int64 nNow = (GetTime() - 1) * 1000000;
- static int64 nLastTime;
- nLastTime = nNow = max(nNow, ++nLastTime);
-
- // Each retry is 2 minutes after the last
- nRequestTime = max(nRequestTime + 2 * 60 * 1000000, nNow);
- mapAskFor.insert(make_pair(nRequestTime, inv));
- }
-
-
-
- void BeginMessage(const char* pszCommand)
- {
- cs_vSend.Enter();
- if (nHeaderStart != -1)
- AbortMessage();
- nHeaderStart = vSend.size();
- vSend << CMessageHeader(pszCommand, 0);
- nMessageStart = vSend.size();
- if (fDebug)
- printf("%s ", DateTimeStrFormat("%x %H:%M:%S", GetTime()).c_str());
- printf("sending: %s ", pszCommand);
- }
-
- void AbortMessage()
- {
- if (nHeaderStart == -1)
- return;
- vSend.resize(nHeaderStart);
- nHeaderStart = -1;
- nMessageStart = -1;
- cs_vSend.Leave();
- printf("(aborted)\n");
- }
-
- void EndMessage()
- {
- if (mapArgs.count("-dropmessagestest") && GetRand(atoi(mapArgs["-dropmessagestest"])) == 0)
- {
- printf("dropmessages DROPPING SEND MESSAGE\n");
- AbortMessage();
- return;
- }
-
- if (nHeaderStart == -1)
- return;
-
- // Set the size
- unsigned int nSize = vSend.size() - nMessageStart;
- memcpy((char*)&vSend[nHeaderStart] + offsetof(CMessageHeader, nMessageSize), &nSize, sizeof(nSize));
-
- // Set the checksum
- if (vSend.GetVersion() >= 209)
- {
- uint256 hash = Hash(vSend.begin() + nMessageStart, vSend.end());
- unsigned int nChecksum = 0;
- memcpy(&nChecksum, &hash, sizeof(nChecksum));
- assert(nMessageStart - nHeaderStart >= offsetof(CMessageHeader, nChecksum) + sizeof(nChecksum));
- memcpy((char*)&vSend[nHeaderStart] + offsetof(CMessageHeader, nChecksum), &nChecksum, sizeof(nChecksum));
- }
-
- printf("(%d bytes) ", nSize);
- printf("\n");
-
- nHeaderStart = -1;
- nMessageStart = -1;
- cs_vSend.Leave();
- }
-
- void EndMessageAbortIfEmpty()
- {
- if (nHeaderStart == -1)
- return;
- int nSize = vSend.size() - nMessageStart;
- if (nSize > 0)
- EndMessage();
- else
- AbortMessage();
- }
-
- const char* GetMessageCommand() const
- {
- if (nHeaderStart == -1)
- return "";
- return &vSend[nHeaderStart] + offsetof(CMessageHeader, pchCommand);
- }
-
-
-
-
- void PushMessage(const char* pszCommand)
- {
- try
- {
- BeginMessage(pszCommand);
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
- template<typename T1>
- void PushMessage(const char* pszCommand, const T1& a1)
- {
- try
- {
- BeginMessage(pszCommand);
- vSend << a1;
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
- template<typename T1, typename T2>
- void PushMessage(const char* pszCommand, const T1& a1, const T2& a2)
- {
- try
- {
- BeginMessage(pszCommand);
- vSend << a1 << a2;
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
- template<typename T1, typename T2, typename T3>
- void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3)
- {
- try
- {
- BeginMessage(pszCommand);
- vSend << a1 << a2 << a3;
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
- template<typename T1, typename T2, typename T3, typename T4>
- void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4)
- {
- try
- {
- BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4;
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
- template<typename T1, typename T2, typename T3, typename T4, typename T5>
- void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5)
- {
- try
- {
- BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5;
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
- template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6>
- void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6)
- {
- try
- {
- BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5 << a6;
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
- template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7>
- void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6, const T7& a7)
- {
- try
- {
- BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7;
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
- template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7, typename T8>
- void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6, const T7& a7, const T8& a8)
- {
- try
- {
- BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8;
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
- template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7, typename T8, typename T9>
- void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6, const T7& a7, const T8& a8, const T9& a9)
- {
- try
- {
- BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9;
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
-
- void PushRequest(const char* pszCommand,
- void (*fn)(void*, CDataStream&), void* param1)
- {
- uint256 hashReply;
- RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply));
-
- CRITICAL_BLOCK(cs_mapRequests)
- mapRequests[hashReply] = CRequestTracker(fn, param1);
-
- PushMessage(pszCommand, hashReply);
- }
-
- template<typename T1>
- void PushRequest(const char* pszCommand, const T1& a1,
- void (*fn)(void*, CDataStream&), void* param1)
- {
- uint256 hashReply;
- RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply));
-
- CRITICAL_BLOCK(cs_mapRequests)
- mapRequests[hashReply] = CRequestTracker(fn, param1);
-
- PushMessage(pszCommand, hashReply, a1);
- }
-
- template<typename T1, typename T2>
- void PushRequest(const char* pszCommand, const T1& a1, const T2& a2,
- void (*fn)(void*, CDataStream&), void* param1)
- {
- uint256 hashReply;
- RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply));
-
- CRITICAL_BLOCK(cs_mapRequests)
- mapRequests[hashReply] = CRequestTracker(fn, param1);
-
- PushMessage(pszCommand, hashReply, a1, a2);
- }
-
-
-
- void PushGetBlocks(CBlockIndex* pindexBegin, uint256 hashEnd);
- bool IsSubscribed(unsigned int nChannel);
- void Subscribe(unsigned int nChannel, unsigned int nHops=0);
- void CancelSubscribe(unsigned int nChannel);
- void CloseSocketDisconnect();
- void Cleanup();
-};
-
-
-
-
-
-
-
-
-
-
-inline void RelayInventory(const CInv& inv)
-{
- // Put on lists to offer to the other nodes
- CRITICAL_BLOCK(cs_vNodes)
- foreach(CNode* pnode, vNodes)
- pnode->PushInventory(inv);
-}
-
-template<typename T>
-void RelayMessage(const CInv& inv, const T& a)
-{
- CDataStream ss(SER_NETWORK);
- ss.reserve(10000);
- ss << a;
- RelayMessage(inv, ss);
-}
-
-template<>
-inline void RelayMessage<>(const CInv& inv, const CDataStream& ss)
-{
- CRITICAL_BLOCK(cs_mapRelay)
- {
- // Expire old relay messages
- while (!vRelayExpiration.empty() && vRelayExpiration.front().first < GetTime())
- {
- mapRelay.erase(vRelayExpiration.front().second);
- vRelayExpiration.pop_front();
- }
-
- // Save original serialized message so newer versions are preserved
- mapRelay[inv] = ss;
- vRelayExpiration.push_back(make_pair(GetTime() + 15 * 60, inv));
- }
-
- RelayInventory(inv);
-}
-
-
-
-
-
-
-
-
-//
-// Templates for the publish and subscription system.
-// The object being published as T& obj needs to have:
-// a set<unsigned int> setSources member
-// specializations of AdvertInsert and AdvertErase
-// Currently implemented for CTable and CProduct.
-//
-
-template<typename T>
-void AdvertStartPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)
-{
- // Add to sources
- obj.setSources.insert(pfrom->addr.ip);
-
- if (!AdvertInsert(obj))
- return;
-
- // Relay
- CRITICAL_BLOCK(cs_vNodes)
- foreach(CNode* pnode, vNodes)
- if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel)))
- pnode->PushMessage("publish", nChannel, nHops, obj);
-}
-
-template<typename T>
-void AdvertStopPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)
-{
- uint256 hash = obj.GetHash();
-
- CRITICAL_BLOCK(cs_vNodes)
- foreach(CNode* pnode, vNodes)
- if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel)))
- pnode->PushMessage("pub-cancel", nChannel, nHops, hash);
-
- AdvertErase(obj);
-}
-
-template<typename T>
-void AdvertRemoveSource(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)
-{
- // Remove a source
- obj.setSources.erase(pfrom->addr.ip);
-
- // If no longer supported by any sources, cancel it
- if (obj.setSources.empty())
- AdvertStopPublish(pfrom, nChannel, nHops, obj);
-}
+// Copyright (c) 2009-2010 Satoshi Nakamoto +// Distributed under the MIT/X11 software license, see the accompanying +// file license.txt or http://www.opensource.org/licenses/mit-license.php. + +class CMessageHeader; +class CAddress; +class CInv; +class CRequestTracker; +class CNode; +class CBlockIndex; +extern int nBestHeight; + + + +static const unsigned short DEFAULT_PORT = 0x8d20; // htons(8333) +static const unsigned int PUBLISH_HOPS = 5; +enum +{ + NODE_NETWORK = (1 << 0), +}; + + + + +bool ConnectSocket(const CAddress& addrConnect, SOCKET& hSocketRet); +bool GetMyExternalIP(unsigned int& ipRet); +bool AddAddress(CAddress addr); +void AddressCurrentlyConnected(const CAddress& addr); +CNode* FindNode(unsigned int ip); +CNode* ConnectNode(CAddress addrConnect, int64 nTimeout=0); +void AbandonRequests(void (*fn)(void*, CDataStream&), void* param1); +bool AnySubscribed(unsigned int nChannel); +bool BindListenPort(string& strError=REF(string())); +void StartNode(void* parg); +bool StopNode(); + + + + + + + + +// +// Message header +// (4) message start +// (12) command +// (4) size +// (4) checksum + +// The message start string is designed to be unlikely to occur in normal data. +// The characters are rarely used upper ascii, not valid as UTF-8, and produce +// a large 4-byte int at any alignment. +static const char pchMessageStart[4] = { 0xf9, 0xbe, 0xb4, 0xd9 }; + +class CMessageHeader +{ +public: + enum { COMMAND_SIZE=12 }; + char pchMessageStart[sizeof(::pchMessageStart)]; + char pchCommand[COMMAND_SIZE]; + unsigned int nMessageSize; + unsigned int nChecksum; + + CMessageHeader() + { + memcpy(pchMessageStart, ::pchMessageStart, sizeof(pchMessageStart)); + memset(pchCommand, 0, sizeof(pchCommand)); + pchCommand[1] = 1; + nMessageSize = -1; + nChecksum = 0; + } + + CMessageHeader(const char* pszCommand, unsigned int nMessageSizeIn) + { + memcpy(pchMessageStart, ::pchMessageStart, sizeof(pchMessageStart)); + strncpy(pchCommand, pszCommand, COMMAND_SIZE); + nMessageSize = nMessageSizeIn; + nChecksum = 0; + } + + IMPLEMENT_SERIALIZE + ( + READWRITE(FLATDATA(pchMessageStart)); + READWRITE(FLATDATA(pchCommand)); + READWRITE(nMessageSize); + if (nVersion >= 209) + READWRITE(nChecksum); + ) + + string GetCommand() + { + if (pchCommand[COMMAND_SIZE-1] == 0) + return string(pchCommand, pchCommand + strlen(pchCommand)); + else + return string(pchCommand, pchCommand + COMMAND_SIZE); + } + + bool IsValid() + { + // Check start string + if (memcmp(pchMessageStart, ::pchMessageStart, sizeof(pchMessageStart)) != 0) + return false; + + // Check the command string for errors + for (char* p1 = pchCommand; p1 < pchCommand + COMMAND_SIZE; p1++) + { + if (*p1 == 0) + { + // Must be all zeros after the first zero + for (; p1 < pchCommand + COMMAND_SIZE; p1++) + if (*p1 != 0) + return false; + } + else if (*p1 < ' ' || *p1 > 0x7E) + return false; + } + + // Message size + if (nMessageSize > 0x10000000) + { + printf("CMessageHeader::IsValid() : nMessageSize too large %u\n", nMessageSize); + return false; + } + + return true; + } +}; + + + + + + +static const unsigned char pchIPv4[12] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff }; + +class CAddress +{ +public: + uint64 nServices; + unsigned char pchReserved[12]; + unsigned int ip; + unsigned short port; + + // disk only + unsigned int nTime; + + // memory only + unsigned int nLastTry; + + CAddress() + { + Init(); + } + + CAddress(unsigned int ipIn, unsigned short portIn=DEFAULT_PORT, uint64 nServicesIn=NODE_NETWORK) + { + Init(); + ip = ipIn; + port = portIn; + nServices = nServicesIn; + } + + explicit CAddress(const struct sockaddr_in& sockaddr, uint64 nServicesIn=NODE_NETWORK) + { + Init(); + ip = sockaddr.sin_addr.s_addr; + port = sockaddr.sin_port; + nServices = nServicesIn; + } + + explicit CAddress(const char* pszIn, uint64 nServicesIn=NODE_NETWORK) + { + Init(); + SetAddress(pszIn); + nServices = nServicesIn; + } + + explicit CAddress(string strIn, uint64 nServicesIn=NODE_NETWORK) + { + Init(); + SetAddress(strIn.c_str()); + nServices = nServicesIn; + } + + void Init() + { + nServices = NODE_NETWORK; + memcpy(pchReserved, pchIPv4, sizeof(pchReserved)); + ip = INADDR_NONE; + port = DEFAULT_PORT; + nTime = GetAdjustedTime(); + nLastTry = 0; + } + + bool SetAddress(const char* pszIn) + { + ip = INADDR_NONE; + port = DEFAULT_PORT; + char psz[100]; + strlcpy(psz, pszIn, sizeof(psz)); + unsigned int a=0, b=0, c=0, d=0, e=0; + if (sscanf(psz, "%u.%u.%u.%u:%u", &a, &b, &c, &d, &e) < 4) + return false; + char* pszPort = strchr(psz, ':'); + if (pszPort) + { + *pszPort++ = '\0'; + port = htons(atoi(pszPort)); + if (atoi(pszPort) < 0 || atoi(pszPort) > USHRT_MAX) + port = htons(USHRT_MAX); + } + ip = inet_addr(psz); + return IsValid(); + } + + bool SetAddress(string strIn) + { + return SetAddress(strIn.c_str()); + } + + IMPLEMENT_SERIALIZE + ( + if (nType & SER_DISK) + { + READWRITE(nVersion); + READWRITE(nTime); + } + READWRITE(nServices); + READWRITE(FLATDATA(pchReserved)); // for IPv6 + READWRITE(ip); + READWRITE(port); + ) + + friend inline bool operator==(const CAddress& a, const CAddress& b) + { + return (memcmp(a.pchReserved, b.pchReserved, sizeof(a.pchReserved)) == 0 && + a.ip == b.ip && + a.port == b.port); + } + + friend inline bool operator!=(const CAddress& a, const CAddress& b) + { + return (!(a == b)); + } + + friend inline bool operator<(const CAddress& a, const CAddress& b) + { + int ret = memcmp(a.pchReserved, b.pchReserved, sizeof(a.pchReserved)); + if (ret < 0) + return true; + else if (ret == 0) + { + if (ntohl(a.ip) < ntohl(b.ip)) + return true; + else if (a.ip == b.ip) + return ntohs(a.port) < ntohs(b.port); + } + return false; + } + + vector<unsigned char> GetKey() const + { + CDataStream ss; + ss.reserve(18); + ss << FLATDATA(pchReserved) << ip << port; + + #if defined(_MSC_VER) && _MSC_VER < 1300 + return vector<unsigned char>((unsigned char*)&ss.begin()[0], (unsigned char*)&ss.end()[0]); + #else + return vector<unsigned char>(ss.begin(), ss.end()); + #endif + } + + struct sockaddr_in GetSockAddr() const + { + struct sockaddr_in sockaddr; + memset(&sockaddr, 0, sizeof(sockaddr)); + sockaddr.sin_family = AF_INET; + sockaddr.sin_addr.s_addr = ip; + sockaddr.sin_port = port; + return sockaddr; + } + + bool IsIPv4() const + { + return (memcmp(pchReserved, pchIPv4, sizeof(pchIPv4)) == 0); + } + + bool IsRoutable() const + { + return IsValid() && + !(GetByte(3) == 10 || + (GetByte(3) == 192 && GetByte(2) == 168) || + GetByte(3) == 127 || + GetByte(3) == 0); + } + + bool IsValid() const + { + // Clean up 3-byte shifted addresses caused by garbage in size field + // of addr messages from versions before 0.2.9 checksum. + // Two consecutive addr messages look like this: + // header20 vectorlen3 addr26 addr26 addr26 header20 vectorlen3 addr26 addr26 addr26... + // so if the first length field is garbled, it reads the second batch + // of addr misaligned by 3 bytes. + if (memcmp(pchReserved, pchIPv4+3, sizeof(pchIPv4)-3) == 0) + return false; + + return (ip != 0 && ip != INADDR_NONE && port != htons(USHRT_MAX)); + } + + unsigned char GetByte(int n) const + { + return ((unsigned char*)&ip)[3-n]; + } + + string ToStringIPPort() const + { + return strprintf("%u.%u.%u.%u:%u", GetByte(3), GetByte(2), GetByte(1), GetByte(0), ntohs(port)); + } + + string ToStringIP() const + { + return strprintf("%u.%u.%u.%u", GetByte(3), GetByte(2), GetByte(1), GetByte(0)); + } + + string ToStringPort() const + { + return strprintf("%u", ntohs(port)); + } + + string ToStringLog() const + { + return ""; + } + + string ToString() const + { + return strprintf("%u.%u.%u.%u:%u", GetByte(3), GetByte(2), GetByte(1), GetByte(0), ntohs(port)); + } + + void print() const + { + printf("CAddress(%s)\n", ToString().c_str()); + } +}; + + + + + + + +enum +{ + MSG_TX = 1, + MSG_BLOCK, +}; + +static const char* ppszTypeName[] = +{ + "ERROR", + "tx", + "block", +}; + +class CInv +{ +public: + int type; + uint256 hash; + + CInv() + { + type = 0; + hash = 0; + } + + CInv(int typeIn, const uint256& hashIn) + { + type = typeIn; + hash = hashIn; + } + + CInv(const string& strType, const uint256& hashIn) + { + int i; + for (i = 1; i < ARRAYLEN(ppszTypeName); i++) + { + if (strType == ppszTypeName[i]) + { + type = i; + break; + } + } + if (i == ARRAYLEN(ppszTypeName)) + throw std::out_of_range(strprintf("CInv::CInv(string, uint256) : unknown type '%s'", strType.c_str())); + hash = hashIn; + } + + IMPLEMENT_SERIALIZE + ( + READWRITE(type); + READWRITE(hash); + ) + + friend inline bool operator<(const CInv& a, const CInv& b) + { + return (a.type < b.type || (a.type == b.type && a.hash < b.hash)); + } + + bool IsKnownType() const + { + return (type >= 1 && type < ARRAYLEN(ppszTypeName)); + } + + const char* GetCommand() const + { + if (!IsKnownType()) + throw std::out_of_range(strprintf("CInv::GetCommand() : type=% unknown type", type)); + return ppszTypeName[type]; + } + + string ToString() const + { + return strprintf("%s %s", GetCommand(), hash.ToString().substr(0,20).c_str()); + } + + void print() const + { + printf("CInv(%s)\n", ToString().c_str()); + } +}; + + + + + +class CRequestTracker +{ +public: + void (*fn)(void*, CDataStream&); + void* param1; + + explicit CRequestTracker(void (*fnIn)(void*, CDataStream&)=NULL, void* param1In=NULL) + { + fn = fnIn; + param1 = param1In; + } + + bool IsNull() + { + return fn == NULL; + } +}; + + + + + +extern bool fClient; +extern uint64 nLocalServices; +extern CAddress addrLocalHost; +extern CNode* pnodeLocalHost; +extern uint64 nLocalHostNonce; +extern array<int, 10> vnThreadsRunning; +extern SOCKET hListenSocket; +extern int64 nThreadSocketHandlerHeartbeat; + +extern vector<CNode*> vNodes; +extern CCriticalSection cs_vNodes; +extern map<vector<unsigned char>, CAddress> mapAddresses; +extern CCriticalSection cs_mapAddresses; +extern map<CInv, CDataStream> mapRelay; +extern deque<pair<int64, CInv> > vRelayExpiration; +extern CCriticalSection cs_mapRelay; +extern map<CInv, int64> mapAlreadyAskedFor; + +// Settings +extern int fUseProxy; +extern CAddress addrProxy; + + + + + + +class CNode +{ +public: + // socket + uint64 nServices; + SOCKET hSocket; + CDataStream vSend; + CDataStream vRecv; + CCriticalSection cs_vSend; + CCriticalSection cs_vRecv; + int64 nLastSend; + int64 nLastRecv; + int64 nLastSendEmpty; + int64 nTimeConnected; + unsigned int nHeaderStart; + unsigned int nMessageStart; + CAddress addr; + int nVersion; + string strSubVer; + bool fClient; + bool fInbound; + bool fNetworkNode; + bool fSuccessfullyConnected; + bool fDisconnect; +protected: + int nRefCount; +public: + int64 nReleaseTime; + map<uint256, CRequestTracker> mapRequests; + CCriticalSection cs_mapRequests; + uint256 hashContinue; + CBlockIndex* pindexLastGetBlocksBegin; + uint256 hashLastGetBlocksEnd; + int nStartingHeight; + + // flood relay + vector<CAddress> vAddrToSend; + set<CAddress> setAddrKnown; + bool fGetAddr; + set<uint256> setKnown; + + // inventory based relay + set<CInv> setInventoryKnown; + vector<CInv> vInventoryToSend; + CCriticalSection cs_inventory; + multimap<int64, CInv> mapAskFor; + + // publish and subscription + vector<char> vfSubscribe; + + + CNode(SOCKET hSocketIn, CAddress addrIn, bool fInboundIn=false) + { + nServices = 0; + hSocket = hSocketIn; + vSend.SetType(SER_NETWORK); + vSend.SetVersion(0); + vRecv.SetType(SER_NETWORK); + vRecv.SetVersion(0); + // Version 0.2 obsoletes 20 Feb 2012 + if (GetTime() > 1329696000) + { + vSend.SetVersion(209); + vRecv.SetVersion(209); + } + nLastSend = 0; + nLastRecv = 0; + nLastSendEmpty = GetTime(); + nTimeConnected = GetTime(); + nHeaderStart = -1; + nMessageStart = -1; + addr = addrIn; + nVersion = 0; + strSubVer = ""; + fClient = false; // set by version message + fInbound = fInboundIn; + fNetworkNode = false; + fSuccessfullyConnected = false; + fDisconnect = false; + nRefCount = 0; + nReleaseTime = 0; + hashContinue = 0; + pindexLastGetBlocksBegin = 0; + hashLastGetBlocksEnd = 0; + nStartingHeight = -1; + fGetAddr = false; + vfSubscribe.assign(256, false); + + // Push a version message + /// when NTP implemented, change to just nTime = GetAdjustedTime() + int64 nTime = (fInbound ? GetAdjustedTime() : GetTime()); + CAddress addrYou = (fUseProxy ? CAddress("0.0.0.0") : addr); + CAddress addrMe = (fUseProxy ? CAddress("0.0.0.0") : addrLocalHost); + RAND_bytes((unsigned char*)&nLocalHostNonce, sizeof(nLocalHostNonce)); + PushMessage("version", VERSION, nLocalServices, nTime, addrYou, addrMe, + nLocalHostNonce, string(pszSubVer), nBestHeight); + } + + ~CNode() + { + if (hSocket != INVALID_SOCKET) + { + closesocket(hSocket); + hSocket = INVALID_SOCKET; + } + } + +private: + CNode(const CNode&); + void operator=(const CNode&); +public: + + + int GetRefCount() + { + return max(nRefCount, 0) + (GetTime() < nReleaseTime ? 1 : 0); + } + + CNode* AddRef(int64 nTimeout=0) + { + if (nTimeout != 0) + nReleaseTime = max(nReleaseTime, GetTime() + nTimeout); + else + nRefCount++; + return this; + } + + void Release() + { + nRefCount--; + } + + + + void AddAddressKnown(const CAddress& addr) + { + setAddrKnown.insert(addr); + } + + void PushAddress(const CAddress& addr) + { + // Known checking here is only to save space from duplicates. + // SendMessages will filter it again for knowns that were added + // after addresses were pushed. + if (addr.IsValid() && !setAddrKnown.count(addr)) + vAddrToSend.push_back(addr); + } + + + void AddInventoryKnown(const CInv& inv) + { + CRITICAL_BLOCK(cs_inventory) + setInventoryKnown.insert(inv); + } + + void PushInventory(const CInv& inv) + { + CRITICAL_BLOCK(cs_inventory) + if (!setInventoryKnown.count(inv)) + vInventoryToSend.push_back(inv); + } + + void AskFor(const CInv& inv) + { + // We're using mapAskFor as a priority queue, + // the key is the earliest time the request can be sent + int64& nRequestTime = mapAlreadyAskedFor[inv]; + printf("askfor %s %"PRI64d"\n", inv.ToString().c_str(), nRequestTime); + + // Make sure not to reuse time indexes to keep things in the same order + int64 nNow = (GetTime() - 1) * 1000000; + static int64 nLastTime; + nLastTime = nNow = max(nNow, ++nLastTime); + + // Each retry is 2 minutes after the last + nRequestTime = max(nRequestTime + 2 * 60 * 1000000, nNow); + mapAskFor.insert(make_pair(nRequestTime, inv)); + } + + + + void BeginMessage(const char* pszCommand) + { + cs_vSend.Enter(); + if (nHeaderStart != -1) + AbortMessage(); + nHeaderStart = vSend.size(); + vSend << CMessageHeader(pszCommand, 0); + nMessageStart = vSend.size(); + if (fDebug) + printf("%s ", DateTimeStrFormat("%x %H:%M:%S", GetTime()).c_str()); + printf("sending: %s ", pszCommand); + } + + void AbortMessage() + { + if (nHeaderStart == -1) + return; + vSend.resize(nHeaderStart); + nHeaderStart = -1; + nMessageStart = -1; + cs_vSend.Leave(); + printf("(aborted)\n"); + } + + void EndMessage() + { + if (mapArgs.count("-dropmessagestest") && GetRand(atoi(mapArgs["-dropmessagestest"])) == 0) + { + printf("dropmessages DROPPING SEND MESSAGE\n"); + AbortMessage(); + return; + } + + if (nHeaderStart == -1) + return; + + // Set the size + unsigned int nSize = vSend.size() - nMessageStart; + memcpy((char*)&vSend[nHeaderStart] + offsetof(CMessageHeader, nMessageSize), &nSize, sizeof(nSize)); + + // Set the checksum + if (vSend.GetVersion() >= 209) + { + uint256 hash = Hash(vSend.begin() + nMessageStart, vSend.end()); + unsigned int nChecksum = 0; + memcpy(&nChecksum, &hash, sizeof(nChecksum)); + assert(nMessageStart - nHeaderStart >= offsetof(CMessageHeader, nChecksum) + sizeof(nChecksum)); + memcpy((char*)&vSend[nHeaderStart] + offsetof(CMessageHeader, nChecksum), &nChecksum, sizeof(nChecksum)); + } + + printf("(%d bytes) ", nSize); + printf("\n"); + + nHeaderStart = -1; + nMessageStart = -1; + cs_vSend.Leave(); + } + + void EndMessageAbortIfEmpty() + { + if (nHeaderStart == -1) + return; + int nSize = vSend.size() - nMessageStart; + if (nSize > 0) + EndMessage(); + else + AbortMessage(); + } + + const char* GetMessageCommand() const + { + if (nHeaderStart == -1) + return ""; + return &vSend[nHeaderStart] + offsetof(CMessageHeader, pchCommand); + } + + + + + void PushMessage(const char* pszCommand) + { + try + { + BeginMessage(pszCommand); + EndMessage(); + } + catch (...) + { + AbortMessage(); + throw; + } + } + + template<typename T1> + void PushMessage(const char* pszCommand, const T1& a1) + { + try + { + BeginMessage(pszCommand); + vSend << a1; + EndMessage(); + } + catch (...) + { + AbortMessage(); + throw; + } + } + + template<typename T1, typename T2> + void PushMessage(const char* pszCommand, const T1& a1, const T2& a2) + { + try + { + BeginMessage(pszCommand); + vSend << a1 << a2; + EndMessage(); + } + catch (...) + { + AbortMessage(); + throw; + } + } + + template<typename T1, typename T2, typename T3> + void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3) + { + try + { + BeginMessage(pszCommand); + vSend << a1 << a2 << a3; + EndMessage(); + } + catch (...) + { + AbortMessage(); + throw; + } + } + + template<typename T1, typename T2, typename T3, typename T4> + void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4) + { + try + { + BeginMessage(pszCommand); + vSend << a1 << a2 << a3 << a4; + EndMessage(); + } + catch (...) + { + AbortMessage(); + throw; + } + } + + template<typename T1, typename T2, typename T3, typename T4, typename T5> + void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5) + { + try + { + BeginMessage(pszCommand); + vSend << a1 << a2 << a3 << a4 << a5; + EndMessage(); + } + catch (...) + { + AbortMessage(); + throw; + } + } + + template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6> + void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6) + { + try + { + BeginMessage(pszCommand); + vSend << a1 << a2 << a3 << a4 << a5 << a6; + EndMessage(); + } + catch (...) + { + AbortMessage(); + throw; + } + } + + template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7> + void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6, const T7& a7) + { + try + { + BeginMessage(pszCommand); + vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7; + EndMessage(); + } + catch (...) + { + AbortMessage(); + throw; + } + } + + template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7, typename T8> + void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6, const T7& a7, const T8& a8) + { + try + { + BeginMessage(pszCommand); + vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8; + EndMessage(); + } + catch (...) + { + AbortMessage(); + throw; + } + } + + template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7, typename T8, typename T9> + void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6, const T7& a7, const T8& a8, const T9& a9) + { + try + { + BeginMessage(pszCommand); + vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9; + EndMessage(); + } + catch (...) + { + AbortMessage(); + throw; + } + } + + + void PushRequest(const char* pszCommand, + void (*fn)(void*, CDataStream&), void* param1) + { + uint256 hashReply; + RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply)); + + CRITICAL_BLOCK(cs_mapRequests) + mapRequests[hashReply] = CRequestTracker(fn, param1); + + PushMessage(pszCommand, hashReply); + } + + template<typename T1> + void PushRequest(const char* pszCommand, const T1& a1, + void (*fn)(void*, CDataStream&), void* param1) + { + uint256 hashReply; + RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply)); + + CRITICAL_BLOCK(cs_mapRequests) + mapRequests[hashReply] = CRequestTracker(fn, param1); + + PushMessage(pszCommand, hashReply, a1); + } + + template<typename T1, typename T2> + void PushRequest(const char* pszCommand, const T1& a1, const T2& a2, + void (*fn)(void*, CDataStream&), void* param1) + { + uint256 hashReply; + RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply)); + + CRITICAL_BLOCK(cs_mapRequests) + mapRequests[hashReply] = CRequestTracker(fn, param1); + + PushMessage(pszCommand, hashReply, a1, a2); + } + + + + void PushGetBlocks(CBlockIndex* pindexBegin, uint256 hashEnd); + bool IsSubscribed(unsigned int nChannel); + void Subscribe(unsigned int nChannel, unsigned int nHops=0); + void CancelSubscribe(unsigned int nChannel); + void CloseSocketDisconnect(); + void Cleanup(); +}; + + + + + + + + + + +inline void RelayInventory(const CInv& inv) +{ + // Put on lists to offer to the other nodes + CRITICAL_BLOCK(cs_vNodes) + foreach(CNode* pnode, vNodes) + pnode->PushInventory(inv); +} + +template<typename T> +void RelayMessage(const CInv& inv, const T& a) +{ + CDataStream ss(SER_NETWORK); + ss.reserve(10000); + ss << a; + RelayMessage(inv, ss); +} + +template<> +inline void RelayMessage<>(const CInv& inv, const CDataStream& ss) +{ + CRITICAL_BLOCK(cs_mapRelay) + { + // Expire old relay messages + while (!vRelayExpiration.empty() && vRelayExpiration.front().first < GetTime()) + { + mapRelay.erase(vRelayExpiration.front().second); + vRelayExpiration.pop_front(); + } + + // Save original serialized message so newer versions are preserved + mapRelay[inv] = ss; + vRelayExpiration.push_back(make_pair(GetTime() + 15 * 60, inv)); + } + + RelayInventory(inv); +} + + + + + + + + +// +// Templates for the publish and subscription system. +// The object being published as T& obj needs to have: +// a set<unsigned int> setSources member +// specializations of AdvertInsert and AdvertErase +// Currently implemented for CTable and CProduct. +// + +template<typename T> +void AdvertStartPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj) +{ + // Add to sources + obj.setSources.insert(pfrom->addr.ip); + + if (!AdvertInsert(obj)) + return; + + // Relay + CRITICAL_BLOCK(cs_vNodes) + foreach(CNode* pnode, vNodes) + if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel))) + pnode->PushMessage("publish", nChannel, nHops, obj); +} + +template<typename T> +void AdvertStopPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj) +{ + uint256 hash = obj.GetHash(); + + CRITICAL_BLOCK(cs_vNodes) + foreach(CNode* pnode, vNodes) + if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel))) + pnode->PushMessage("pub-cancel", nChannel, nHops, hash); + + AdvertErase(obj); +} + +template<typename T> +void AdvertRemoveSource(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj) +{ + // Remove a source + obj.setSources.erase(pfrom->addr.ip); + + // If no longer supported by any sources, cancel it + if (obj.setSources.empty()) + AdvertStopPublish(pfrom, nChannel, nHops, obj); +} |