aboutsummaryrefslogtreecommitdiff
path: root/net.h
diff options
context:
space:
mode:
Diffstat (limited to 'net.h')
-rw-r--r--net.h2110
1 files changed, 1055 insertions, 1055 deletions
diff --git a/net.h b/net.h
index cf6bc94eb5..e12a8e1129 100644
--- a/net.h
+++ b/net.h
@@ -1,1055 +1,1055 @@
-// Copyright (c) 2009-2010 Satoshi Nakamoto
-// Distributed under the MIT/X11 software license, see the accompanying
-// file license.txt or http://www.opensource.org/licenses/mit-license.php.
-
-class CMessageHeader;
-class CAddress;
-class CInv;
-class CRequestTracker;
-class CNode;
-class CBlockIndex;
-extern int nBestHeight;
-
-
-
-static const unsigned short DEFAULT_PORT = 0x8d20; // htons(8333)
-static const unsigned int PUBLISH_HOPS = 5;
-enum
-{
- NODE_NETWORK = (1 << 0),
-};
-
-
-
-
-bool ConnectSocket(const CAddress& addrConnect, SOCKET& hSocketRet);
-bool GetMyExternalIP(unsigned int& ipRet);
-bool AddAddress(CAddress addr);
-void AddressCurrentlyConnected(const CAddress& addr);
-CNode* FindNode(unsigned int ip);
-CNode* ConnectNode(CAddress addrConnect, int64 nTimeout=0);
-void AbandonRequests(void (*fn)(void*, CDataStream&), void* param1);
-bool AnySubscribed(unsigned int nChannel);
-bool BindListenPort(string& strError=REF(string()));
-void StartNode(void* parg);
-bool StopNode();
-
-
-
-
-
-
-
-
-//
-// Message header
-// (4) message start
-// (12) command
-// (4) size
-// (4) checksum
-
-// The message start string is designed to be unlikely to occur in normal data.
-// The characters are rarely used upper ascii, not valid as UTF-8, and produce
-// a large 4-byte int at any alignment.
-static const char pchMessageStart[4] = { 0xf9, 0xbe, 0xb4, 0xd9 };
-
-class CMessageHeader
-{
-public:
- enum { COMMAND_SIZE=12 };
- char pchMessageStart[sizeof(::pchMessageStart)];
- char pchCommand[COMMAND_SIZE];
- unsigned int nMessageSize;
- unsigned int nChecksum;
-
- CMessageHeader()
- {
- memcpy(pchMessageStart, ::pchMessageStart, sizeof(pchMessageStart));
- memset(pchCommand, 0, sizeof(pchCommand));
- pchCommand[1] = 1;
- nMessageSize = -1;
- nChecksum = 0;
- }
-
- CMessageHeader(const char* pszCommand, unsigned int nMessageSizeIn)
- {
- memcpy(pchMessageStart, ::pchMessageStart, sizeof(pchMessageStart));
- strncpy(pchCommand, pszCommand, COMMAND_SIZE);
- nMessageSize = nMessageSizeIn;
- nChecksum = 0;
- }
-
- IMPLEMENT_SERIALIZE
- (
- READWRITE(FLATDATA(pchMessageStart));
- READWRITE(FLATDATA(pchCommand));
- READWRITE(nMessageSize);
- if (nVersion >= 209)
- READWRITE(nChecksum);
- )
-
- string GetCommand()
- {
- if (pchCommand[COMMAND_SIZE-1] == 0)
- return string(pchCommand, pchCommand + strlen(pchCommand));
- else
- return string(pchCommand, pchCommand + COMMAND_SIZE);
- }
-
- bool IsValid()
- {
- // Check start string
- if (memcmp(pchMessageStart, ::pchMessageStart, sizeof(pchMessageStart)) != 0)
- return false;
-
- // Check the command string for errors
- for (char* p1 = pchCommand; p1 < pchCommand + COMMAND_SIZE; p1++)
- {
- if (*p1 == 0)
- {
- // Must be all zeros after the first zero
- for (; p1 < pchCommand + COMMAND_SIZE; p1++)
- if (*p1 != 0)
- return false;
- }
- else if (*p1 < ' ' || *p1 > 0x7E)
- return false;
- }
-
- // Message size
- if (nMessageSize > 0x10000000)
- {
- printf("CMessageHeader::IsValid() : nMessageSize too large %u\n", nMessageSize);
- return false;
- }
-
- return true;
- }
-};
-
-
-
-
-
-
-static const unsigned char pchIPv4[12] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff };
-
-class CAddress
-{
-public:
- uint64 nServices;
- unsigned char pchReserved[12];
- unsigned int ip;
- unsigned short port;
-
- // disk only
- unsigned int nTime;
-
- // memory only
- unsigned int nLastTry;
-
- CAddress()
- {
- Init();
- }
-
- CAddress(unsigned int ipIn, unsigned short portIn=DEFAULT_PORT, uint64 nServicesIn=NODE_NETWORK)
- {
- Init();
- ip = ipIn;
- port = portIn;
- nServices = nServicesIn;
- }
-
- explicit CAddress(const struct sockaddr_in& sockaddr, uint64 nServicesIn=NODE_NETWORK)
- {
- Init();
- ip = sockaddr.sin_addr.s_addr;
- port = sockaddr.sin_port;
- nServices = nServicesIn;
- }
-
- explicit CAddress(const char* pszIn, uint64 nServicesIn=NODE_NETWORK)
- {
- Init();
- SetAddress(pszIn);
- nServices = nServicesIn;
- }
-
- explicit CAddress(string strIn, uint64 nServicesIn=NODE_NETWORK)
- {
- Init();
- SetAddress(strIn.c_str());
- nServices = nServicesIn;
- }
-
- void Init()
- {
- nServices = NODE_NETWORK;
- memcpy(pchReserved, pchIPv4, sizeof(pchReserved));
- ip = INADDR_NONE;
- port = DEFAULT_PORT;
- nTime = GetAdjustedTime();
- nLastTry = 0;
- }
-
- bool SetAddress(const char* pszIn)
- {
- ip = INADDR_NONE;
- port = DEFAULT_PORT;
- char psz[100];
- strlcpy(psz, pszIn, sizeof(psz));
- unsigned int a=0, b=0, c=0, d=0, e=0;
- if (sscanf(psz, "%u.%u.%u.%u:%u", &a, &b, &c, &d, &e) < 4)
- return false;
- char* pszPort = strchr(psz, ':');
- if (pszPort)
- {
- *pszPort++ = '\0';
- port = htons(atoi(pszPort));
- if (atoi(pszPort) < 0 || atoi(pszPort) > USHRT_MAX)
- port = htons(USHRT_MAX);
- }
- ip = inet_addr(psz);
- return IsValid();
- }
-
- bool SetAddress(string strIn)
- {
- return SetAddress(strIn.c_str());
- }
-
- IMPLEMENT_SERIALIZE
- (
- if (nType & SER_DISK)
- {
- READWRITE(nVersion);
- READWRITE(nTime);
- }
- READWRITE(nServices);
- READWRITE(FLATDATA(pchReserved)); // for IPv6
- READWRITE(ip);
- READWRITE(port);
- )
-
- friend inline bool operator==(const CAddress& a, const CAddress& b)
- {
- return (memcmp(a.pchReserved, b.pchReserved, sizeof(a.pchReserved)) == 0 &&
- a.ip == b.ip &&
- a.port == b.port);
- }
-
- friend inline bool operator!=(const CAddress& a, const CAddress& b)
- {
- return (!(a == b));
- }
-
- friend inline bool operator<(const CAddress& a, const CAddress& b)
- {
- int ret = memcmp(a.pchReserved, b.pchReserved, sizeof(a.pchReserved));
- if (ret < 0)
- return true;
- else if (ret == 0)
- {
- if (ntohl(a.ip) < ntohl(b.ip))
- return true;
- else if (a.ip == b.ip)
- return ntohs(a.port) < ntohs(b.port);
- }
- return false;
- }
-
- vector<unsigned char> GetKey() const
- {
- CDataStream ss;
- ss.reserve(18);
- ss << FLATDATA(pchReserved) << ip << port;
-
- #if defined(_MSC_VER) && _MSC_VER < 1300
- return vector<unsigned char>((unsigned char*)&ss.begin()[0], (unsigned char*)&ss.end()[0]);
- #else
- return vector<unsigned char>(ss.begin(), ss.end());
- #endif
- }
-
- struct sockaddr_in GetSockAddr() const
- {
- struct sockaddr_in sockaddr;
- memset(&sockaddr, 0, sizeof(sockaddr));
- sockaddr.sin_family = AF_INET;
- sockaddr.sin_addr.s_addr = ip;
- sockaddr.sin_port = port;
- return sockaddr;
- }
-
- bool IsIPv4() const
- {
- return (memcmp(pchReserved, pchIPv4, sizeof(pchIPv4)) == 0);
- }
-
- bool IsRoutable() const
- {
- return IsValid() &&
- !(GetByte(3) == 10 ||
- (GetByte(3) == 192 && GetByte(2) == 168) ||
- GetByte(3) == 127 ||
- GetByte(3) == 0);
- }
-
- bool IsValid() const
- {
- // Clean up 3-byte shifted addresses caused by garbage in size field
- // of addr messages from versions before 0.2.9 checksum.
- // Two consecutive addr messages look like this:
- // header20 vectorlen3 addr26 addr26 addr26 header20 vectorlen3 addr26 addr26 addr26...
- // so if the first length field is garbled, it reads the second batch
- // of addr misaligned by 3 bytes.
- if (memcmp(pchReserved, pchIPv4+3, sizeof(pchIPv4)-3) == 0)
- return false;
-
- return (ip != 0 && ip != INADDR_NONE && port != htons(USHRT_MAX));
- }
-
- unsigned char GetByte(int n) const
- {
- return ((unsigned char*)&ip)[3-n];
- }
-
- string ToStringIPPort() const
- {
- return strprintf("%u.%u.%u.%u:%u", GetByte(3), GetByte(2), GetByte(1), GetByte(0), ntohs(port));
- }
-
- string ToStringIP() const
- {
- return strprintf("%u.%u.%u.%u", GetByte(3), GetByte(2), GetByte(1), GetByte(0));
- }
-
- string ToStringPort() const
- {
- return strprintf("%u", ntohs(port));
- }
-
- string ToStringLog() const
- {
- return "";
- }
-
- string ToString() const
- {
- return strprintf("%u.%u.%u.%u:%u", GetByte(3), GetByte(2), GetByte(1), GetByte(0), ntohs(port));
- }
-
- void print() const
- {
- printf("CAddress(%s)\n", ToString().c_str());
- }
-};
-
-
-
-
-
-
-
-enum
-{
- MSG_TX = 1,
- MSG_BLOCK,
-};
-
-static const char* ppszTypeName[] =
-{
- "ERROR",
- "tx",
- "block",
-};
-
-class CInv
-{
-public:
- int type;
- uint256 hash;
-
- CInv()
- {
- type = 0;
- hash = 0;
- }
-
- CInv(int typeIn, const uint256& hashIn)
- {
- type = typeIn;
- hash = hashIn;
- }
-
- CInv(const string& strType, const uint256& hashIn)
- {
- int i;
- for (i = 1; i < ARRAYLEN(ppszTypeName); i++)
- {
- if (strType == ppszTypeName[i])
- {
- type = i;
- break;
- }
- }
- if (i == ARRAYLEN(ppszTypeName))
- throw std::out_of_range(strprintf("CInv::CInv(string, uint256) : unknown type '%s'", strType.c_str()));
- hash = hashIn;
- }
-
- IMPLEMENT_SERIALIZE
- (
- READWRITE(type);
- READWRITE(hash);
- )
-
- friend inline bool operator<(const CInv& a, const CInv& b)
- {
- return (a.type < b.type || (a.type == b.type && a.hash < b.hash));
- }
-
- bool IsKnownType() const
- {
- return (type >= 1 && type < ARRAYLEN(ppszTypeName));
- }
-
- const char* GetCommand() const
- {
- if (!IsKnownType())
- throw std::out_of_range(strprintf("CInv::GetCommand() : type=% unknown type", type));
- return ppszTypeName[type];
- }
-
- string ToString() const
- {
- return strprintf("%s %s", GetCommand(), hash.ToString().substr(0,20).c_str());
- }
-
- void print() const
- {
- printf("CInv(%s)\n", ToString().c_str());
- }
-};
-
-
-
-
-
-class CRequestTracker
-{
-public:
- void (*fn)(void*, CDataStream&);
- void* param1;
-
- explicit CRequestTracker(void (*fnIn)(void*, CDataStream&)=NULL, void* param1In=NULL)
- {
- fn = fnIn;
- param1 = param1In;
- }
-
- bool IsNull()
- {
- return fn == NULL;
- }
-};
-
-
-
-
-
-extern bool fClient;
-extern uint64 nLocalServices;
-extern CAddress addrLocalHost;
-extern CNode* pnodeLocalHost;
-extern uint64 nLocalHostNonce;
-extern array<int, 10> vnThreadsRunning;
-extern SOCKET hListenSocket;
-extern int64 nThreadSocketHandlerHeartbeat;
-
-extern vector<CNode*> vNodes;
-extern CCriticalSection cs_vNodes;
-extern map<vector<unsigned char>, CAddress> mapAddresses;
-extern CCriticalSection cs_mapAddresses;
-extern map<CInv, CDataStream> mapRelay;
-extern deque<pair<int64, CInv> > vRelayExpiration;
-extern CCriticalSection cs_mapRelay;
-extern map<CInv, int64> mapAlreadyAskedFor;
-
-// Settings
-extern int fUseProxy;
-extern CAddress addrProxy;
-
-
-
-
-
-
-class CNode
-{
-public:
- // socket
- uint64 nServices;
- SOCKET hSocket;
- CDataStream vSend;
- CDataStream vRecv;
- CCriticalSection cs_vSend;
- CCriticalSection cs_vRecv;
- int64 nLastSend;
- int64 nLastRecv;
- int64 nLastSendEmpty;
- int64 nTimeConnected;
- unsigned int nHeaderStart;
- unsigned int nMessageStart;
- CAddress addr;
- int nVersion;
- string strSubVer;
- bool fClient;
- bool fInbound;
- bool fNetworkNode;
- bool fSuccessfullyConnected;
- bool fDisconnect;
-protected:
- int nRefCount;
-public:
- int64 nReleaseTime;
- map<uint256, CRequestTracker> mapRequests;
- CCriticalSection cs_mapRequests;
- uint256 hashContinue;
- CBlockIndex* pindexLastGetBlocksBegin;
- uint256 hashLastGetBlocksEnd;
- int nStartingHeight;
-
- // flood relay
- vector<CAddress> vAddrToSend;
- set<CAddress> setAddrKnown;
- bool fGetAddr;
- set<uint256> setKnown;
-
- // inventory based relay
- set<CInv> setInventoryKnown;
- vector<CInv> vInventoryToSend;
- CCriticalSection cs_inventory;
- multimap<int64, CInv> mapAskFor;
-
- // publish and subscription
- vector<char> vfSubscribe;
-
-
- CNode(SOCKET hSocketIn, CAddress addrIn, bool fInboundIn=false)
- {
- nServices = 0;
- hSocket = hSocketIn;
- vSend.SetType(SER_NETWORK);
- vSend.SetVersion(0);
- vRecv.SetType(SER_NETWORK);
- vRecv.SetVersion(0);
- // Version 0.2 obsoletes 20 Feb 2012
- if (GetTime() > 1329696000)
- {
- vSend.SetVersion(209);
- vRecv.SetVersion(209);
- }
- nLastSend = 0;
- nLastRecv = 0;
- nLastSendEmpty = GetTime();
- nTimeConnected = GetTime();
- nHeaderStart = -1;
- nMessageStart = -1;
- addr = addrIn;
- nVersion = 0;
- strSubVer = "";
- fClient = false; // set by version message
- fInbound = fInboundIn;
- fNetworkNode = false;
- fSuccessfullyConnected = false;
- fDisconnect = false;
- nRefCount = 0;
- nReleaseTime = 0;
- hashContinue = 0;
- pindexLastGetBlocksBegin = 0;
- hashLastGetBlocksEnd = 0;
- nStartingHeight = -1;
- fGetAddr = false;
- vfSubscribe.assign(256, false);
-
- // Push a version message
- /// when NTP implemented, change to just nTime = GetAdjustedTime()
- int64 nTime = (fInbound ? GetAdjustedTime() : GetTime());
- CAddress addrYou = (fUseProxy ? CAddress("0.0.0.0") : addr);
- CAddress addrMe = (fUseProxy ? CAddress("0.0.0.0") : addrLocalHost);
- RAND_bytes((unsigned char*)&nLocalHostNonce, sizeof(nLocalHostNonce));
- PushMessage("version", VERSION, nLocalServices, nTime, addrYou, addrMe,
- nLocalHostNonce, string(pszSubVer), nBestHeight);
- }
-
- ~CNode()
- {
- if (hSocket != INVALID_SOCKET)
- {
- closesocket(hSocket);
- hSocket = INVALID_SOCKET;
- }
- }
-
-private:
- CNode(const CNode&);
- void operator=(const CNode&);
-public:
-
-
- int GetRefCount()
- {
- return max(nRefCount, 0) + (GetTime() < nReleaseTime ? 1 : 0);
- }
-
- CNode* AddRef(int64 nTimeout=0)
- {
- if (nTimeout != 0)
- nReleaseTime = max(nReleaseTime, GetTime() + nTimeout);
- else
- nRefCount++;
- return this;
- }
-
- void Release()
- {
- nRefCount--;
- }
-
-
-
- void AddAddressKnown(const CAddress& addr)
- {
- setAddrKnown.insert(addr);
- }
-
- void PushAddress(const CAddress& addr)
- {
- // Known checking here is only to save space from duplicates.
- // SendMessages will filter it again for knowns that were added
- // after addresses were pushed.
- if (addr.IsValid() && !setAddrKnown.count(addr))
- vAddrToSend.push_back(addr);
- }
-
-
- void AddInventoryKnown(const CInv& inv)
- {
- CRITICAL_BLOCK(cs_inventory)
- setInventoryKnown.insert(inv);
- }
-
- void PushInventory(const CInv& inv)
- {
- CRITICAL_BLOCK(cs_inventory)
- if (!setInventoryKnown.count(inv))
- vInventoryToSend.push_back(inv);
- }
-
- void AskFor(const CInv& inv)
- {
- // We're using mapAskFor as a priority queue,
- // the key is the earliest time the request can be sent
- int64& nRequestTime = mapAlreadyAskedFor[inv];
- printf("askfor %s %"PRI64d"\n", inv.ToString().c_str(), nRequestTime);
-
- // Make sure not to reuse time indexes to keep things in the same order
- int64 nNow = (GetTime() - 1) * 1000000;
- static int64 nLastTime;
- nLastTime = nNow = max(nNow, ++nLastTime);
-
- // Each retry is 2 minutes after the last
- nRequestTime = max(nRequestTime + 2 * 60 * 1000000, nNow);
- mapAskFor.insert(make_pair(nRequestTime, inv));
- }
-
-
-
- void BeginMessage(const char* pszCommand)
- {
- cs_vSend.Enter();
- if (nHeaderStart != -1)
- AbortMessage();
- nHeaderStart = vSend.size();
- vSend << CMessageHeader(pszCommand, 0);
- nMessageStart = vSend.size();
- if (fDebug)
- printf("%s ", DateTimeStrFormat("%x %H:%M:%S", GetTime()).c_str());
- printf("sending: %s ", pszCommand);
- }
-
- void AbortMessage()
- {
- if (nHeaderStart == -1)
- return;
- vSend.resize(nHeaderStart);
- nHeaderStart = -1;
- nMessageStart = -1;
- cs_vSend.Leave();
- printf("(aborted)\n");
- }
-
- void EndMessage()
- {
- if (mapArgs.count("-dropmessagestest") && GetRand(atoi(mapArgs["-dropmessagestest"])) == 0)
- {
- printf("dropmessages DROPPING SEND MESSAGE\n");
- AbortMessage();
- return;
- }
-
- if (nHeaderStart == -1)
- return;
-
- // Set the size
- unsigned int nSize = vSend.size() - nMessageStart;
- memcpy((char*)&vSend[nHeaderStart] + offsetof(CMessageHeader, nMessageSize), &nSize, sizeof(nSize));
-
- // Set the checksum
- if (vSend.GetVersion() >= 209)
- {
- uint256 hash = Hash(vSend.begin() + nMessageStart, vSend.end());
- unsigned int nChecksum = 0;
- memcpy(&nChecksum, &hash, sizeof(nChecksum));
- assert(nMessageStart - nHeaderStart >= offsetof(CMessageHeader, nChecksum) + sizeof(nChecksum));
- memcpy((char*)&vSend[nHeaderStart] + offsetof(CMessageHeader, nChecksum), &nChecksum, sizeof(nChecksum));
- }
-
- printf("(%d bytes) ", nSize);
- printf("\n");
-
- nHeaderStart = -1;
- nMessageStart = -1;
- cs_vSend.Leave();
- }
-
- void EndMessageAbortIfEmpty()
- {
- if (nHeaderStart == -1)
- return;
- int nSize = vSend.size() - nMessageStart;
- if (nSize > 0)
- EndMessage();
- else
- AbortMessage();
- }
-
- const char* GetMessageCommand() const
- {
- if (nHeaderStart == -1)
- return "";
- return &vSend[nHeaderStart] + offsetof(CMessageHeader, pchCommand);
- }
-
-
-
-
- void PushMessage(const char* pszCommand)
- {
- try
- {
- BeginMessage(pszCommand);
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
- template<typename T1>
- void PushMessage(const char* pszCommand, const T1& a1)
- {
- try
- {
- BeginMessage(pszCommand);
- vSend << a1;
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
- template<typename T1, typename T2>
- void PushMessage(const char* pszCommand, const T1& a1, const T2& a2)
- {
- try
- {
- BeginMessage(pszCommand);
- vSend << a1 << a2;
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
- template<typename T1, typename T2, typename T3>
- void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3)
- {
- try
- {
- BeginMessage(pszCommand);
- vSend << a1 << a2 << a3;
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
- template<typename T1, typename T2, typename T3, typename T4>
- void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4)
- {
- try
- {
- BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4;
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
- template<typename T1, typename T2, typename T3, typename T4, typename T5>
- void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5)
- {
- try
- {
- BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5;
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
- template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6>
- void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6)
- {
- try
- {
- BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5 << a6;
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
- template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7>
- void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6, const T7& a7)
- {
- try
- {
- BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7;
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
- template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7, typename T8>
- void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6, const T7& a7, const T8& a8)
- {
- try
- {
- BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8;
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
- template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7, typename T8, typename T9>
- void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6, const T7& a7, const T8& a8, const T9& a9)
- {
- try
- {
- BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9;
- EndMessage();
- }
- catch (...)
- {
- AbortMessage();
- throw;
- }
- }
-
-
- void PushRequest(const char* pszCommand,
- void (*fn)(void*, CDataStream&), void* param1)
- {
- uint256 hashReply;
- RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply));
-
- CRITICAL_BLOCK(cs_mapRequests)
- mapRequests[hashReply] = CRequestTracker(fn, param1);
-
- PushMessage(pszCommand, hashReply);
- }
-
- template<typename T1>
- void PushRequest(const char* pszCommand, const T1& a1,
- void (*fn)(void*, CDataStream&), void* param1)
- {
- uint256 hashReply;
- RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply));
-
- CRITICAL_BLOCK(cs_mapRequests)
- mapRequests[hashReply] = CRequestTracker(fn, param1);
-
- PushMessage(pszCommand, hashReply, a1);
- }
-
- template<typename T1, typename T2>
- void PushRequest(const char* pszCommand, const T1& a1, const T2& a2,
- void (*fn)(void*, CDataStream&), void* param1)
- {
- uint256 hashReply;
- RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply));
-
- CRITICAL_BLOCK(cs_mapRequests)
- mapRequests[hashReply] = CRequestTracker(fn, param1);
-
- PushMessage(pszCommand, hashReply, a1, a2);
- }
-
-
-
- void PushGetBlocks(CBlockIndex* pindexBegin, uint256 hashEnd);
- bool IsSubscribed(unsigned int nChannel);
- void Subscribe(unsigned int nChannel, unsigned int nHops=0);
- void CancelSubscribe(unsigned int nChannel);
- void CloseSocketDisconnect();
- void Cleanup();
-};
-
-
-
-
-
-
-
-
-
-
-inline void RelayInventory(const CInv& inv)
-{
- // Put on lists to offer to the other nodes
- CRITICAL_BLOCK(cs_vNodes)
- foreach(CNode* pnode, vNodes)
- pnode->PushInventory(inv);
-}
-
-template<typename T>
-void RelayMessage(const CInv& inv, const T& a)
-{
- CDataStream ss(SER_NETWORK);
- ss.reserve(10000);
- ss << a;
- RelayMessage(inv, ss);
-}
-
-template<>
-inline void RelayMessage<>(const CInv& inv, const CDataStream& ss)
-{
- CRITICAL_BLOCK(cs_mapRelay)
- {
- // Expire old relay messages
- while (!vRelayExpiration.empty() && vRelayExpiration.front().first < GetTime())
- {
- mapRelay.erase(vRelayExpiration.front().second);
- vRelayExpiration.pop_front();
- }
-
- // Save original serialized message so newer versions are preserved
- mapRelay[inv] = ss;
- vRelayExpiration.push_back(make_pair(GetTime() + 15 * 60, inv));
- }
-
- RelayInventory(inv);
-}
-
-
-
-
-
-
-
-
-//
-// Templates for the publish and subscription system.
-// The object being published as T& obj needs to have:
-// a set<unsigned int> setSources member
-// specializations of AdvertInsert and AdvertErase
-// Currently implemented for CTable and CProduct.
-//
-
-template<typename T>
-void AdvertStartPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)
-{
- // Add to sources
- obj.setSources.insert(pfrom->addr.ip);
-
- if (!AdvertInsert(obj))
- return;
-
- // Relay
- CRITICAL_BLOCK(cs_vNodes)
- foreach(CNode* pnode, vNodes)
- if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel)))
- pnode->PushMessage("publish", nChannel, nHops, obj);
-}
-
-template<typename T>
-void AdvertStopPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)
-{
- uint256 hash = obj.GetHash();
-
- CRITICAL_BLOCK(cs_vNodes)
- foreach(CNode* pnode, vNodes)
- if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel)))
- pnode->PushMessage("pub-cancel", nChannel, nHops, hash);
-
- AdvertErase(obj);
-}
-
-template<typename T>
-void AdvertRemoveSource(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)
-{
- // Remove a source
- obj.setSources.erase(pfrom->addr.ip);
-
- // If no longer supported by any sources, cancel it
- if (obj.setSources.empty())
- AdvertStopPublish(pfrom, nChannel, nHops, obj);
-}
+// Copyright (c) 2009-2010 Satoshi Nakamoto
+// Distributed under the MIT/X11 software license, see the accompanying
+// file license.txt or http://www.opensource.org/licenses/mit-license.php.
+
+class CMessageHeader;
+class CAddress;
+class CInv;
+class CRequestTracker;
+class CNode;
+class CBlockIndex;
+extern int nBestHeight;
+
+
+
+static const unsigned short DEFAULT_PORT = 0x8d20; // htons(8333)
+static const unsigned int PUBLISH_HOPS = 5;
+enum
+{
+ NODE_NETWORK = (1 << 0),
+};
+
+
+
+
+bool ConnectSocket(const CAddress& addrConnect, SOCKET& hSocketRet);
+bool GetMyExternalIP(unsigned int& ipRet);
+bool AddAddress(CAddress addr);
+void AddressCurrentlyConnected(const CAddress& addr);
+CNode* FindNode(unsigned int ip);
+CNode* ConnectNode(CAddress addrConnect, int64 nTimeout=0);
+void AbandonRequests(void (*fn)(void*, CDataStream&), void* param1);
+bool AnySubscribed(unsigned int nChannel);
+bool BindListenPort(string& strError=REF(string()));
+void StartNode(void* parg);
+bool StopNode();
+
+
+
+
+
+
+
+
+//
+// Message header
+// (4) message start
+// (12) command
+// (4) size
+// (4) checksum
+
+// The message start string is designed to be unlikely to occur in normal data.
+// The characters are rarely used upper ascii, not valid as UTF-8, and produce
+// a large 4-byte int at any alignment.
+static const char pchMessageStart[4] = { 0xf9, 0xbe, 0xb4, 0xd9 };
+
+class CMessageHeader
+{
+public:
+ enum { COMMAND_SIZE=12 };
+ char pchMessageStart[sizeof(::pchMessageStart)];
+ char pchCommand[COMMAND_SIZE];
+ unsigned int nMessageSize;
+ unsigned int nChecksum;
+
+ CMessageHeader()
+ {
+ memcpy(pchMessageStart, ::pchMessageStart, sizeof(pchMessageStart));
+ memset(pchCommand, 0, sizeof(pchCommand));
+ pchCommand[1] = 1;
+ nMessageSize = -1;
+ nChecksum = 0;
+ }
+
+ CMessageHeader(const char* pszCommand, unsigned int nMessageSizeIn)
+ {
+ memcpy(pchMessageStart, ::pchMessageStart, sizeof(pchMessageStart));
+ strncpy(pchCommand, pszCommand, COMMAND_SIZE);
+ nMessageSize = nMessageSizeIn;
+ nChecksum = 0;
+ }
+
+ IMPLEMENT_SERIALIZE
+ (
+ READWRITE(FLATDATA(pchMessageStart));
+ READWRITE(FLATDATA(pchCommand));
+ READWRITE(nMessageSize);
+ if (nVersion >= 209)
+ READWRITE(nChecksum);
+ )
+
+ string GetCommand()
+ {
+ if (pchCommand[COMMAND_SIZE-1] == 0)
+ return string(pchCommand, pchCommand + strlen(pchCommand));
+ else
+ return string(pchCommand, pchCommand + COMMAND_SIZE);
+ }
+
+ bool IsValid()
+ {
+ // Check start string
+ if (memcmp(pchMessageStart, ::pchMessageStart, sizeof(pchMessageStart)) != 0)
+ return false;
+
+ // Check the command string for errors
+ for (char* p1 = pchCommand; p1 < pchCommand + COMMAND_SIZE; p1++)
+ {
+ if (*p1 == 0)
+ {
+ // Must be all zeros after the first zero
+ for (; p1 < pchCommand + COMMAND_SIZE; p1++)
+ if (*p1 != 0)
+ return false;
+ }
+ else if (*p1 < ' ' || *p1 > 0x7E)
+ return false;
+ }
+
+ // Message size
+ if (nMessageSize > 0x10000000)
+ {
+ printf("CMessageHeader::IsValid() : nMessageSize too large %u\n", nMessageSize);
+ return false;
+ }
+
+ return true;
+ }
+};
+
+
+
+
+
+
+static const unsigned char pchIPv4[12] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff };
+
+class CAddress
+{
+public:
+ uint64 nServices;
+ unsigned char pchReserved[12];
+ unsigned int ip;
+ unsigned short port;
+
+ // disk only
+ unsigned int nTime;
+
+ // memory only
+ unsigned int nLastTry;
+
+ CAddress()
+ {
+ Init();
+ }
+
+ CAddress(unsigned int ipIn, unsigned short portIn=DEFAULT_PORT, uint64 nServicesIn=NODE_NETWORK)
+ {
+ Init();
+ ip = ipIn;
+ port = portIn;
+ nServices = nServicesIn;
+ }
+
+ explicit CAddress(const struct sockaddr_in& sockaddr, uint64 nServicesIn=NODE_NETWORK)
+ {
+ Init();
+ ip = sockaddr.sin_addr.s_addr;
+ port = sockaddr.sin_port;
+ nServices = nServicesIn;
+ }
+
+ explicit CAddress(const char* pszIn, uint64 nServicesIn=NODE_NETWORK)
+ {
+ Init();
+ SetAddress(pszIn);
+ nServices = nServicesIn;
+ }
+
+ explicit CAddress(string strIn, uint64 nServicesIn=NODE_NETWORK)
+ {
+ Init();
+ SetAddress(strIn.c_str());
+ nServices = nServicesIn;
+ }
+
+ void Init()
+ {
+ nServices = NODE_NETWORK;
+ memcpy(pchReserved, pchIPv4, sizeof(pchReserved));
+ ip = INADDR_NONE;
+ port = DEFAULT_PORT;
+ nTime = GetAdjustedTime();
+ nLastTry = 0;
+ }
+
+ bool SetAddress(const char* pszIn)
+ {
+ ip = INADDR_NONE;
+ port = DEFAULT_PORT;
+ char psz[100];
+ strlcpy(psz, pszIn, sizeof(psz));
+ unsigned int a=0, b=0, c=0, d=0, e=0;
+ if (sscanf(psz, "%u.%u.%u.%u:%u", &a, &b, &c, &d, &e) < 4)
+ return false;
+ char* pszPort = strchr(psz, ':');
+ if (pszPort)
+ {
+ *pszPort++ = '\0';
+ port = htons(atoi(pszPort));
+ if (atoi(pszPort) < 0 || atoi(pszPort) > USHRT_MAX)
+ port = htons(USHRT_MAX);
+ }
+ ip = inet_addr(psz);
+ return IsValid();
+ }
+
+ bool SetAddress(string strIn)
+ {
+ return SetAddress(strIn.c_str());
+ }
+
+ IMPLEMENT_SERIALIZE
+ (
+ if (nType & SER_DISK)
+ {
+ READWRITE(nVersion);
+ READWRITE(nTime);
+ }
+ READWRITE(nServices);
+ READWRITE(FLATDATA(pchReserved)); // for IPv6
+ READWRITE(ip);
+ READWRITE(port);
+ )
+
+ friend inline bool operator==(const CAddress& a, const CAddress& b)
+ {
+ return (memcmp(a.pchReserved, b.pchReserved, sizeof(a.pchReserved)) == 0 &&
+ a.ip == b.ip &&
+ a.port == b.port);
+ }
+
+ friend inline bool operator!=(const CAddress& a, const CAddress& b)
+ {
+ return (!(a == b));
+ }
+
+ friend inline bool operator<(const CAddress& a, const CAddress& b)
+ {
+ int ret = memcmp(a.pchReserved, b.pchReserved, sizeof(a.pchReserved));
+ if (ret < 0)
+ return true;
+ else if (ret == 0)
+ {
+ if (ntohl(a.ip) < ntohl(b.ip))
+ return true;
+ else if (a.ip == b.ip)
+ return ntohs(a.port) < ntohs(b.port);
+ }
+ return false;
+ }
+
+ vector<unsigned char> GetKey() const
+ {
+ CDataStream ss;
+ ss.reserve(18);
+ ss << FLATDATA(pchReserved) << ip << port;
+
+ #if defined(_MSC_VER) && _MSC_VER < 1300
+ return vector<unsigned char>((unsigned char*)&ss.begin()[0], (unsigned char*)&ss.end()[0]);
+ #else
+ return vector<unsigned char>(ss.begin(), ss.end());
+ #endif
+ }
+
+ struct sockaddr_in GetSockAddr() const
+ {
+ struct sockaddr_in sockaddr;
+ memset(&sockaddr, 0, sizeof(sockaddr));
+ sockaddr.sin_family = AF_INET;
+ sockaddr.sin_addr.s_addr = ip;
+ sockaddr.sin_port = port;
+ return sockaddr;
+ }
+
+ bool IsIPv4() const
+ {
+ return (memcmp(pchReserved, pchIPv4, sizeof(pchIPv4)) == 0);
+ }
+
+ bool IsRoutable() const
+ {
+ return IsValid() &&
+ !(GetByte(3) == 10 ||
+ (GetByte(3) == 192 && GetByte(2) == 168) ||
+ GetByte(3) == 127 ||
+ GetByte(3) == 0);
+ }
+
+ bool IsValid() const
+ {
+ // Clean up 3-byte shifted addresses caused by garbage in size field
+ // of addr messages from versions before 0.2.9 checksum.
+ // Two consecutive addr messages look like this:
+ // header20 vectorlen3 addr26 addr26 addr26 header20 vectorlen3 addr26 addr26 addr26...
+ // so if the first length field is garbled, it reads the second batch
+ // of addr misaligned by 3 bytes.
+ if (memcmp(pchReserved, pchIPv4+3, sizeof(pchIPv4)-3) == 0)
+ return false;
+
+ return (ip != 0 && ip != INADDR_NONE && port != htons(USHRT_MAX));
+ }
+
+ unsigned char GetByte(int n) const
+ {
+ return ((unsigned char*)&ip)[3-n];
+ }
+
+ string ToStringIPPort() const
+ {
+ return strprintf("%u.%u.%u.%u:%u", GetByte(3), GetByte(2), GetByte(1), GetByte(0), ntohs(port));
+ }
+
+ string ToStringIP() const
+ {
+ return strprintf("%u.%u.%u.%u", GetByte(3), GetByte(2), GetByte(1), GetByte(0));
+ }
+
+ string ToStringPort() const
+ {
+ return strprintf("%u", ntohs(port));
+ }
+
+ string ToStringLog() const
+ {
+ return "";
+ }
+
+ string ToString() const
+ {
+ return strprintf("%u.%u.%u.%u:%u", GetByte(3), GetByte(2), GetByte(1), GetByte(0), ntohs(port));
+ }
+
+ void print() const
+ {
+ printf("CAddress(%s)\n", ToString().c_str());
+ }
+};
+
+
+
+
+
+
+
+enum
+{
+ MSG_TX = 1,
+ MSG_BLOCK,
+};
+
+static const char* ppszTypeName[] =
+{
+ "ERROR",
+ "tx",
+ "block",
+};
+
+class CInv
+{
+public:
+ int type;
+ uint256 hash;
+
+ CInv()
+ {
+ type = 0;
+ hash = 0;
+ }
+
+ CInv(int typeIn, const uint256& hashIn)
+ {
+ type = typeIn;
+ hash = hashIn;
+ }
+
+ CInv(const string& strType, const uint256& hashIn)
+ {
+ int i;
+ for (i = 1; i < ARRAYLEN(ppszTypeName); i++)
+ {
+ if (strType == ppszTypeName[i])
+ {
+ type = i;
+ break;
+ }
+ }
+ if (i == ARRAYLEN(ppszTypeName))
+ throw std::out_of_range(strprintf("CInv::CInv(string, uint256) : unknown type '%s'", strType.c_str()));
+ hash = hashIn;
+ }
+
+ IMPLEMENT_SERIALIZE
+ (
+ READWRITE(type);
+ READWRITE(hash);
+ )
+
+ friend inline bool operator<(const CInv& a, const CInv& b)
+ {
+ return (a.type < b.type || (a.type == b.type && a.hash < b.hash));
+ }
+
+ bool IsKnownType() const
+ {
+ return (type >= 1 && type < ARRAYLEN(ppszTypeName));
+ }
+
+ const char* GetCommand() const
+ {
+ if (!IsKnownType())
+ throw std::out_of_range(strprintf("CInv::GetCommand() : type=% unknown type", type));
+ return ppszTypeName[type];
+ }
+
+ string ToString() const
+ {
+ return strprintf("%s %s", GetCommand(), hash.ToString().substr(0,20).c_str());
+ }
+
+ void print() const
+ {
+ printf("CInv(%s)\n", ToString().c_str());
+ }
+};
+
+
+
+
+
+class CRequestTracker
+{
+public:
+ void (*fn)(void*, CDataStream&);
+ void* param1;
+
+ explicit CRequestTracker(void (*fnIn)(void*, CDataStream&)=NULL, void* param1In=NULL)
+ {
+ fn = fnIn;
+ param1 = param1In;
+ }
+
+ bool IsNull()
+ {
+ return fn == NULL;
+ }
+};
+
+
+
+
+
+extern bool fClient;
+extern uint64 nLocalServices;
+extern CAddress addrLocalHost;
+extern CNode* pnodeLocalHost;
+extern uint64 nLocalHostNonce;
+extern array<int, 10> vnThreadsRunning;
+extern SOCKET hListenSocket;
+extern int64 nThreadSocketHandlerHeartbeat;
+
+extern vector<CNode*> vNodes;
+extern CCriticalSection cs_vNodes;
+extern map<vector<unsigned char>, CAddress> mapAddresses;
+extern CCriticalSection cs_mapAddresses;
+extern map<CInv, CDataStream> mapRelay;
+extern deque<pair<int64, CInv> > vRelayExpiration;
+extern CCriticalSection cs_mapRelay;
+extern map<CInv, int64> mapAlreadyAskedFor;
+
+// Settings
+extern int fUseProxy;
+extern CAddress addrProxy;
+
+
+
+
+
+
+class CNode
+{
+public:
+ // socket
+ uint64 nServices;
+ SOCKET hSocket;
+ CDataStream vSend;
+ CDataStream vRecv;
+ CCriticalSection cs_vSend;
+ CCriticalSection cs_vRecv;
+ int64 nLastSend;
+ int64 nLastRecv;
+ int64 nLastSendEmpty;
+ int64 nTimeConnected;
+ unsigned int nHeaderStart;
+ unsigned int nMessageStart;
+ CAddress addr;
+ int nVersion;
+ string strSubVer;
+ bool fClient;
+ bool fInbound;
+ bool fNetworkNode;
+ bool fSuccessfullyConnected;
+ bool fDisconnect;
+protected:
+ int nRefCount;
+public:
+ int64 nReleaseTime;
+ map<uint256, CRequestTracker> mapRequests;
+ CCriticalSection cs_mapRequests;
+ uint256 hashContinue;
+ CBlockIndex* pindexLastGetBlocksBegin;
+ uint256 hashLastGetBlocksEnd;
+ int nStartingHeight;
+
+ // flood relay
+ vector<CAddress> vAddrToSend;
+ set<CAddress> setAddrKnown;
+ bool fGetAddr;
+ set<uint256> setKnown;
+
+ // inventory based relay
+ set<CInv> setInventoryKnown;
+ vector<CInv> vInventoryToSend;
+ CCriticalSection cs_inventory;
+ multimap<int64, CInv> mapAskFor;
+
+ // publish and subscription
+ vector<char> vfSubscribe;
+
+
+ CNode(SOCKET hSocketIn, CAddress addrIn, bool fInboundIn=false)
+ {
+ nServices = 0;
+ hSocket = hSocketIn;
+ vSend.SetType(SER_NETWORK);
+ vSend.SetVersion(0);
+ vRecv.SetType(SER_NETWORK);
+ vRecv.SetVersion(0);
+ // Version 0.2 obsoletes 20 Feb 2012
+ if (GetTime() > 1329696000)
+ {
+ vSend.SetVersion(209);
+ vRecv.SetVersion(209);
+ }
+ nLastSend = 0;
+ nLastRecv = 0;
+ nLastSendEmpty = GetTime();
+ nTimeConnected = GetTime();
+ nHeaderStart = -1;
+ nMessageStart = -1;
+ addr = addrIn;
+ nVersion = 0;
+ strSubVer = "";
+ fClient = false; // set by version message
+ fInbound = fInboundIn;
+ fNetworkNode = false;
+ fSuccessfullyConnected = false;
+ fDisconnect = false;
+ nRefCount = 0;
+ nReleaseTime = 0;
+ hashContinue = 0;
+ pindexLastGetBlocksBegin = 0;
+ hashLastGetBlocksEnd = 0;
+ nStartingHeight = -1;
+ fGetAddr = false;
+ vfSubscribe.assign(256, false);
+
+ // Push a version message
+ /// when NTP implemented, change to just nTime = GetAdjustedTime()
+ int64 nTime = (fInbound ? GetAdjustedTime() : GetTime());
+ CAddress addrYou = (fUseProxy ? CAddress("0.0.0.0") : addr);
+ CAddress addrMe = (fUseProxy ? CAddress("0.0.0.0") : addrLocalHost);
+ RAND_bytes((unsigned char*)&nLocalHostNonce, sizeof(nLocalHostNonce));
+ PushMessage("version", VERSION, nLocalServices, nTime, addrYou, addrMe,
+ nLocalHostNonce, string(pszSubVer), nBestHeight);
+ }
+
+ ~CNode()
+ {
+ if (hSocket != INVALID_SOCKET)
+ {
+ closesocket(hSocket);
+ hSocket = INVALID_SOCKET;
+ }
+ }
+
+private:
+ CNode(const CNode&);
+ void operator=(const CNode&);
+public:
+
+
+ int GetRefCount()
+ {
+ return max(nRefCount, 0) + (GetTime() < nReleaseTime ? 1 : 0);
+ }
+
+ CNode* AddRef(int64 nTimeout=0)
+ {
+ if (nTimeout != 0)
+ nReleaseTime = max(nReleaseTime, GetTime() + nTimeout);
+ else
+ nRefCount++;
+ return this;
+ }
+
+ void Release()
+ {
+ nRefCount--;
+ }
+
+
+
+ void AddAddressKnown(const CAddress& addr)
+ {
+ setAddrKnown.insert(addr);
+ }
+
+ void PushAddress(const CAddress& addr)
+ {
+ // Known checking here is only to save space from duplicates.
+ // SendMessages will filter it again for knowns that were added
+ // after addresses were pushed.
+ if (addr.IsValid() && !setAddrKnown.count(addr))
+ vAddrToSend.push_back(addr);
+ }
+
+
+ void AddInventoryKnown(const CInv& inv)
+ {
+ CRITICAL_BLOCK(cs_inventory)
+ setInventoryKnown.insert(inv);
+ }
+
+ void PushInventory(const CInv& inv)
+ {
+ CRITICAL_BLOCK(cs_inventory)
+ if (!setInventoryKnown.count(inv))
+ vInventoryToSend.push_back(inv);
+ }
+
+ void AskFor(const CInv& inv)
+ {
+ // We're using mapAskFor as a priority queue,
+ // the key is the earliest time the request can be sent
+ int64& nRequestTime = mapAlreadyAskedFor[inv];
+ printf("askfor %s %"PRI64d"\n", inv.ToString().c_str(), nRequestTime);
+
+ // Make sure not to reuse time indexes to keep things in the same order
+ int64 nNow = (GetTime() - 1) * 1000000;
+ static int64 nLastTime;
+ nLastTime = nNow = max(nNow, ++nLastTime);
+
+ // Each retry is 2 minutes after the last
+ nRequestTime = max(nRequestTime + 2 * 60 * 1000000, nNow);
+ mapAskFor.insert(make_pair(nRequestTime, inv));
+ }
+
+
+
+ void BeginMessage(const char* pszCommand)
+ {
+ cs_vSend.Enter();
+ if (nHeaderStart != -1)
+ AbortMessage();
+ nHeaderStart = vSend.size();
+ vSend << CMessageHeader(pszCommand, 0);
+ nMessageStart = vSend.size();
+ if (fDebug)
+ printf("%s ", DateTimeStrFormat("%x %H:%M:%S", GetTime()).c_str());
+ printf("sending: %s ", pszCommand);
+ }
+
+ void AbortMessage()
+ {
+ if (nHeaderStart == -1)
+ return;
+ vSend.resize(nHeaderStart);
+ nHeaderStart = -1;
+ nMessageStart = -1;
+ cs_vSend.Leave();
+ printf("(aborted)\n");
+ }
+
+ void EndMessage()
+ {
+ if (mapArgs.count("-dropmessagestest") && GetRand(atoi(mapArgs["-dropmessagestest"])) == 0)
+ {
+ printf("dropmessages DROPPING SEND MESSAGE\n");
+ AbortMessage();
+ return;
+ }
+
+ if (nHeaderStart == -1)
+ return;
+
+ // Set the size
+ unsigned int nSize = vSend.size() - nMessageStart;
+ memcpy((char*)&vSend[nHeaderStart] + offsetof(CMessageHeader, nMessageSize), &nSize, sizeof(nSize));
+
+ // Set the checksum
+ if (vSend.GetVersion() >= 209)
+ {
+ uint256 hash = Hash(vSend.begin() + nMessageStart, vSend.end());
+ unsigned int nChecksum = 0;
+ memcpy(&nChecksum, &hash, sizeof(nChecksum));
+ assert(nMessageStart - nHeaderStart >= offsetof(CMessageHeader, nChecksum) + sizeof(nChecksum));
+ memcpy((char*)&vSend[nHeaderStart] + offsetof(CMessageHeader, nChecksum), &nChecksum, sizeof(nChecksum));
+ }
+
+ printf("(%d bytes) ", nSize);
+ printf("\n");
+
+ nHeaderStart = -1;
+ nMessageStart = -1;
+ cs_vSend.Leave();
+ }
+
+ void EndMessageAbortIfEmpty()
+ {
+ if (nHeaderStart == -1)
+ return;
+ int nSize = vSend.size() - nMessageStart;
+ if (nSize > 0)
+ EndMessage();
+ else
+ AbortMessage();
+ }
+
+ const char* GetMessageCommand() const
+ {
+ if (nHeaderStart == -1)
+ return "";
+ return &vSend[nHeaderStart] + offsetof(CMessageHeader, pchCommand);
+ }
+
+
+
+
+ void PushMessage(const char* pszCommand)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+ template<typename T1>
+ void PushMessage(const char* pszCommand, const T1& a1)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ vSend << a1;
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+ template<typename T1, typename T2>
+ void PushMessage(const char* pszCommand, const T1& a1, const T2& a2)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ vSend << a1 << a2;
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+ template<typename T1, typename T2, typename T3>
+ void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ vSend << a1 << a2 << a3;
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+ template<typename T1, typename T2, typename T3, typename T4>
+ void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ vSend << a1 << a2 << a3 << a4;
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+ template<typename T1, typename T2, typename T3, typename T4, typename T5>
+ void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ vSend << a1 << a2 << a3 << a4 << a5;
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+ template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6>
+ void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ vSend << a1 << a2 << a3 << a4 << a5 << a6;
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+ template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7>
+ void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6, const T7& a7)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7;
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+ template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7, typename T8>
+ void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6, const T7& a7, const T8& a8)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8;
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+ template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7, typename T8, typename T9>
+ void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6, const T7& a7, const T8& a8, const T9& a9)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9;
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+
+ void PushRequest(const char* pszCommand,
+ void (*fn)(void*, CDataStream&), void* param1)
+ {
+ uint256 hashReply;
+ RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply));
+
+ CRITICAL_BLOCK(cs_mapRequests)
+ mapRequests[hashReply] = CRequestTracker(fn, param1);
+
+ PushMessage(pszCommand, hashReply);
+ }
+
+ template<typename T1>
+ void PushRequest(const char* pszCommand, const T1& a1,
+ void (*fn)(void*, CDataStream&), void* param1)
+ {
+ uint256 hashReply;
+ RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply));
+
+ CRITICAL_BLOCK(cs_mapRequests)
+ mapRequests[hashReply] = CRequestTracker(fn, param1);
+
+ PushMessage(pszCommand, hashReply, a1);
+ }
+
+ template<typename T1, typename T2>
+ void PushRequest(const char* pszCommand, const T1& a1, const T2& a2,
+ void (*fn)(void*, CDataStream&), void* param1)
+ {
+ uint256 hashReply;
+ RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply));
+
+ CRITICAL_BLOCK(cs_mapRequests)
+ mapRequests[hashReply] = CRequestTracker(fn, param1);
+
+ PushMessage(pszCommand, hashReply, a1, a2);
+ }
+
+
+
+ void PushGetBlocks(CBlockIndex* pindexBegin, uint256 hashEnd);
+ bool IsSubscribed(unsigned int nChannel);
+ void Subscribe(unsigned int nChannel, unsigned int nHops=0);
+ void CancelSubscribe(unsigned int nChannel);
+ void CloseSocketDisconnect();
+ void Cleanup();
+};
+
+
+
+
+
+
+
+
+
+
+inline void RelayInventory(const CInv& inv)
+{
+ // Put on lists to offer to the other nodes
+ CRITICAL_BLOCK(cs_vNodes)
+ foreach(CNode* pnode, vNodes)
+ pnode->PushInventory(inv);
+}
+
+template<typename T>
+void RelayMessage(const CInv& inv, const T& a)
+{
+ CDataStream ss(SER_NETWORK);
+ ss.reserve(10000);
+ ss << a;
+ RelayMessage(inv, ss);
+}
+
+template<>
+inline void RelayMessage<>(const CInv& inv, const CDataStream& ss)
+{
+ CRITICAL_BLOCK(cs_mapRelay)
+ {
+ // Expire old relay messages
+ while (!vRelayExpiration.empty() && vRelayExpiration.front().first < GetTime())
+ {
+ mapRelay.erase(vRelayExpiration.front().second);
+ vRelayExpiration.pop_front();
+ }
+
+ // Save original serialized message so newer versions are preserved
+ mapRelay[inv] = ss;
+ vRelayExpiration.push_back(make_pair(GetTime() + 15 * 60, inv));
+ }
+
+ RelayInventory(inv);
+}
+
+
+
+
+
+
+
+
+//
+// Templates for the publish and subscription system.
+// The object being published as T& obj needs to have:
+// a set<unsigned int> setSources member
+// specializations of AdvertInsert and AdvertErase
+// Currently implemented for CTable and CProduct.
+//
+
+template<typename T>
+void AdvertStartPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)
+{
+ // Add to sources
+ obj.setSources.insert(pfrom->addr.ip);
+
+ if (!AdvertInsert(obj))
+ return;
+
+ // Relay
+ CRITICAL_BLOCK(cs_vNodes)
+ foreach(CNode* pnode, vNodes)
+ if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel)))
+ pnode->PushMessage("publish", nChannel, nHops, obj);
+}
+
+template<typename T>
+void AdvertStopPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)
+{
+ uint256 hash = obj.GetHash();
+
+ CRITICAL_BLOCK(cs_vNodes)
+ foreach(CNode* pnode, vNodes)
+ if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel)))
+ pnode->PushMessage("pub-cancel", nChannel, nHops, hash);
+
+ AdvertErase(obj);
+}
+
+template<typename T>
+void AdvertRemoveSource(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)
+{
+ // Remove a source
+ obj.setSources.erase(pfrom->addr.ip);
+
+ // If no longer supported by any sources, cancel it
+ if (obj.setSources.empty())
+ AdvertStopPublish(pfrom, nChannel, nHops, obj);
+}