aboutsummaryrefslogtreecommitdiff
path: root/src/net.h
diff options
context:
space:
mode:
authorJeff Garzik <jeff@garzik.org>2011-05-09 14:00:14 -0400
committerJeff Garzik <jgarzik@pobox.com>2011-05-09 14:00:14 -0400
commitb17be7e14b4a62b7935c75fe7e7645e736fd68d2 (patch)
treebe44c7310683df72aeb5781ec74a85451206cfbc /src/net.h
parentc0430f42b868dbf996a416d3a9d81d1c8c07e47c (diff)
parent84c3fb07b0b8199c7f85c5de280e7100bad0786f (diff)
Manual merge of jaromil's source tree reorg commit.
Conflicts: src/sha256.cpp
Diffstat (limited to 'src/net.h')
-rw-r--r--src/net.h1055
1 files changed, 1055 insertions, 0 deletions
diff --git a/src/net.h b/src/net.h
new file mode 100644
index 0000000000..b3bd74da47
--- /dev/null
+++ b/src/net.h
@@ -0,0 +1,1055 @@
+// Copyright (c) 2009-2010 Satoshi Nakamoto
+// Distributed under the MIT/X11 software license, see the accompanying
+// file license.txt or http://www.opensource.org/licenses/mit-license.php.
+
+class CMessageHeader;
+class CAddress;
+class CInv;
+class CRequestTracker;
+class CNode;
+class CBlockIndex;
+extern int nBestHeight;
+
+
+
+inline unsigned short GetDefaultPort() { return fTestNet ? htons(18333) : htons(8333); }
+static const unsigned int PUBLISH_HOPS = 5;
+enum
+{
+ NODE_NETWORK = (1 << 0),
+};
+
+
+
+
+bool ConnectSocket(const CAddress& addrConnect, SOCKET& hSocketRet);
+bool GetMyExternalIP(unsigned int& ipRet);
+bool AddAddress(CAddress addr, int64 nTimePenalty=0);
+void AddressCurrentlyConnected(const CAddress& addr);
+CNode* FindNode(unsigned int ip);
+CNode* ConnectNode(CAddress addrConnect, int64 nTimeout=0);
+void AbandonRequests(void (*fn)(void*, CDataStream&), void* param1);
+bool AnySubscribed(unsigned int nChannel);
+void MapPort(bool fMapPort);
+void DNSAddressSeed();
+bool BindListenPort(string& strError=REF(string()));
+void StartNode(void* parg);
+bool StopNode();
+
+
+
+
+
+
+
+
+//
+// Message header
+// (4) message start
+// (12) command
+// (4) size
+// (4) checksum
+
+extern char pchMessageStart[4];
+
+class CMessageHeader
+{
+public:
+ enum { COMMAND_SIZE=12 };
+ char pchMessageStart[sizeof(::pchMessageStart)];
+ char pchCommand[COMMAND_SIZE];
+ unsigned int nMessageSize;
+ unsigned int nChecksum;
+
+ CMessageHeader()
+ {
+ memcpy(pchMessageStart, ::pchMessageStart, sizeof(pchMessageStart));
+ memset(pchCommand, 0, sizeof(pchCommand));
+ pchCommand[1] = 1;
+ nMessageSize = -1;
+ nChecksum = 0;
+ }
+
+ CMessageHeader(const char* pszCommand, unsigned int nMessageSizeIn)
+ {
+ memcpy(pchMessageStart, ::pchMessageStart, sizeof(pchMessageStart));
+ strncpy(pchCommand, pszCommand, COMMAND_SIZE);
+ nMessageSize = nMessageSizeIn;
+ nChecksum = 0;
+ }
+
+ IMPLEMENT_SERIALIZE
+ (
+ READWRITE(FLATDATA(pchMessageStart));
+ READWRITE(FLATDATA(pchCommand));
+ READWRITE(nMessageSize);
+ if (nVersion >= 209)
+ READWRITE(nChecksum);
+ )
+
+ string GetCommand()
+ {
+ if (pchCommand[COMMAND_SIZE-1] == 0)
+ return string(pchCommand, pchCommand + strlen(pchCommand));
+ else
+ return string(pchCommand, pchCommand + COMMAND_SIZE);
+ }
+
+ bool IsValid()
+ {
+ // Check start string
+ if (memcmp(pchMessageStart, ::pchMessageStart, sizeof(pchMessageStart)) != 0)
+ return false;
+
+ // Check the command string for errors
+ for (char* p1 = pchCommand; p1 < pchCommand + COMMAND_SIZE; p1++)
+ {
+ if (*p1 == 0)
+ {
+ // Must be all zeros after the first zero
+ for (; p1 < pchCommand + COMMAND_SIZE; p1++)
+ if (*p1 != 0)
+ return false;
+ }
+ else if (*p1 < ' ' || *p1 > 0x7E)
+ return false;
+ }
+
+ // Message size
+ if (nMessageSize > MAX_SIZE)
+ {
+ printf("CMessageHeader::IsValid() : (%s, %u bytes) nMessageSize > MAX_SIZE\n", GetCommand().c_str(), nMessageSize);
+ return false;
+ }
+
+ return true;
+ }
+};
+
+
+
+
+
+
+static const unsigned char pchIPv4[12] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff };
+
+class CAddress
+{
+public:
+ uint64 nServices;
+ unsigned char pchReserved[12];
+ unsigned int ip;
+ unsigned short port;
+
+ // disk and network only
+ unsigned int nTime;
+
+ // memory only
+ unsigned int nLastTry;
+
+ CAddress()
+ {
+ Init();
+ }
+
+ CAddress(unsigned int ipIn, unsigned short portIn=0, uint64 nServicesIn=NODE_NETWORK)
+ {
+ Init();
+ ip = ipIn;
+ port = (portIn == 0 ? GetDefaultPort() : portIn);
+ nServices = nServicesIn;
+ }
+
+ explicit CAddress(const struct sockaddr_in& sockaddr, uint64 nServicesIn=NODE_NETWORK)
+ {
+ Init();
+ ip = sockaddr.sin_addr.s_addr;
+ port = sockaddr.sin_port;
+ nServices = nServicesIn;
+ }
+
+ explicit CAddress(const char* pszIn, uint64 nServicesIn=NODE_NETWORK)
+ {
+ Init();
+ SetAddress(pszIn);
+ nServices = nServicesIn;
+ }
+
+ explicit CAddress(string strIn, uint64 nServicesIn=NODE_NETWORK)
+ {
+ Init();
+ SetAddress(strIn.c_str());
+ nServices = nServicesIn;
+ }
+
+ void Init()
+ {
+ nServices = NODE_NETWORK;
+ memcpy(pchReserved, pchIPv4, sizeof(pchReserved));
+ ip = INADDR_NONE;
+ port = GetDefaultPort();
+ nTime = 100000000;
+ nLastTry = 0;
+ }
+
+ bool SetAddress(const char* pszIn)
+ {
+ ip = INADDR_NONE;
+ port = GetDefaultPort();
+ char psz[100];
+ strlcpy(psz, pszIn, sizeof(psz));
+ unsigned int a=0, b=0, c=0, d=0, e=0;
+ if (sscanf(psz, "%u.%u.%u.%u:%u", &a, &b, &c, &d, &e) < 4)
+ return false;
+ char* pszPort = strchr(psz, ':');
+ if (pszPort)
+ {
+ *pszPort++ = '\0';
+ port = htons(atoi(pszPort));
+ if (atoi(pszPort) < 0 || atoi(pszPort) > USHRT_MAX)
+ port = htons(USHRT_MAX);
+ }
+ ip = inet_addr(psz);
+ return IsValid();
+ }
+
+ bool SetAddress(string strIn)
+ {
+ return SetAddress(strIn.c_str());
+ }
+
+ IMPLEMENT_SERIALIZE
+ (
+ if (fRead)
+ const_cast<CAddress*>(this)->Init();
+ if (nType & SER_DISK)
+ READWRITE(nVersion);
+ if ((nType & SER_DISK) || (nVersion >= 31402 && !(nType & SER_GETHASH)))
+ READWRITE(nTime);
+ READWRITE(nServices);
+ READWRITE(FLATDATA(pchReserved)); // for IPv6
+ READWRITE(ip);
+ READWRITE(port);
+ )
+
+ friend inline bool operator==(const CAddress& a, const CAddress& b)
+ {
+ return (memcmp(a.pchReserved, b.pchReserved, sizeof(a.pchReserved)) == 0 &&
+ a.ip == b.ip &&
+ a.port == b.port);
+ }
+
+ friend inline bool operator!=(const CAddress& a, const CAddress& b)
+ {
+ return (!(a == b));
+ }
+
+ friend inline bool operator<(const CAddress& a, const CAddress& b)
+ {
+ int ret = memcmp(a.pchReserved, b.pchReserved, sizeof(a.pchReserved));
+ if (ret < 0)
+ return true;
+ else if (ret == 0)
+ {
+ if (ntohl(a.ip) < ntohl(b.ip))
+ return true;
+ else if (a.ip == b.ip)
+ return ntohs(a.port) < ntohs(b.port);
+ }
+ return false;
+ }
+
+ vector<unsigned char> GetKey() const
+ {
+ CDataStream ss;
+ ss.reserve(18);
+ ss << FLATDATA(pchReserved) << ip << port;
+
+ #if defined(_MSC_VER) && _MSC_VER < 1300
+ return vector<unsigned char>((unsigned char*)&ss.begin()[0], (unsigned char*)&ss.end()[0]);
+ #else
+ return vector<unsigned char>(ss.begin(), ss.end());
+ #endif
+ }
+
+ struct sockaddr_in GetSockAddr() const
+ {
+ struct sockaddr_in sockaddr;
+ memset(&sockaddr, 0, sizeof(sockaddr));
+ sockaddr.sin_family = AF_INET;
+ sockaddr.sin_addr.s_addr = ip;
+ sockaddr.sin_port = port;
+ return sockaddr;
+ }
+
+ bool IsIPv4() const
+ {
+ return (memcmp(pchReserved, pchIPv4, sizeof(pchIPv4)) == 0);
+ }
+
+ bool IsRoutable() const
+ {
+ return IsValid() &&
+ !(GetByte(3) == 10 ||
+ (GetByte(3) == 192 && GetByte(2) == 168) ||
+ GetByte(3) == 127 ||
+ GetByte(3) == 0);
+ }
+
+ bool IsValid() const
+ {
+ // Clean up 3-byte shifted addresses caused by garbage in size field
+ // of addr messages from versions before 0.2.9 checksum.
+ // Two consecutive addr messages look like this:
+ // header20 vectorlen3 addr26 addr26 addr26 header20 vectorlen3 addr26 addr26 addr26...
+ // so if the first length field is garbled, it reads the second batch
+ // of addr misaligned by 3 bytes.
+ if (memcmp(pchReserved, pchIPv4+3, sizeof(pchIPv4)-3) == 0)
+ return false;
+
+ return (ip != 0 && ip != INADDR_NONE && port != htons(USHRT_MAX));
+ }
+
+ unsigned char GetByte(int n) const
+ {
+ return ((unsigned char*)&ip)[3-n];
+ }
+
+ string ToStringIPPort() const
+ {
+ return strprintf("%u.%u.%u.%u:%u", GetByte(3), GetByte(2), GetByte(1), GetByte(0), ntohs(port));
+ }
+
+ string ToStringIP() const
+ {
+ return strprintf("%u.%u.%u.%u", GetByte(3), GetByte(2), GetByte(1), GetByte(0));
+ }
+
+ string ToStringPort() const
+ {
+ return strprintf("%u", ntohs(port));
+ }
+
+ string ToStringLog() const
+ {
+ return "";
+ }
+
+ string ToString() const
+ {
+ return strprintf("%u.%u.%u.%u:%u", GetByte(3), GetByte(2), GetByte(1), GetByte(0), ntohs(port));
+ }
+
+ void print() const
+ {
+ printf("CAddress(%s)\n", ToString().c_str());
+ }
+};
+
+
+
+
+
+
+
+enum
+{
+ MSG_TX = 1,
+ MSG_BLOCK,
+};
+
+static const char* ppszTypeName[] =
+{
+ "ERROR",
+ "tx",
+ "block",
+};
+
+class CInv
+{
+public:
+ int type;
+ uint256 hash;
+
+ CInv()
+ {
+ type = 0;
+ hash = 0;
+ }
+
+ CInv(int typeIn, const uint256& hashIn)
+ {
+ type = typeIn;
+ hash = hashIn;
+ }
+
+ CInv(const string& strType, const uint256& hashIn)
+ {
+ int i;
+ for (i = 1; i < ARRAYLEN(ppszTypeName); i++)
+ {
+ if (strType == ppszTypeName[i])
+ {
+ type = i;
+ break;
+ }
+ }
+ if (i == ARRAYLEN(ppszTypeName))
+ throw std::out_of_range(strprintf("CInv::CInv(string, uint256) : unknown type '%s'", strType.c_str()));
+ hash = hashIn;
+ }
+
+ IMPLEMENT_SERIALIZE
+ (
+ READWRITE(type);
+ READWRITE(hash);
+ )
+
+ friend inline bool operator<(const CInv& a, const CInv& b)
+ {
+ return (a.type < b.type || (a.type == b.type && a.hash < b.hash));
+ }
+
+ bool IsKnownType() const
+ {
+ return (type >= 1 && type < ARRAYLEN(ppszTypeName));
+ }
+
+ const char* GetCommand() const
+ {
+ if (!IsKnownType())
+ throw std::out_of_range(strprintf("CInv::GetCommand() : type=%d unknown type", type));
+ return ppszTypeName[type];
+ }
+
+ string ToString() const
+ {
+ return strprintf("%s %s", GetCommand(), hash.ToString().substr(0,20).c_str());
+ }
+
+ void print() const
+ {
+ printf("CInv(%s)\n", ToString().c_str());
+ }
+};
+
+
+
+
+
+class CRequestTracker
+{
+public:
+ void (*fn)(void*, CDataStream&);
+ void* param1;
+
+ explicit CRequestTracker(void (*fnIn)(void*, CDataStream&)=NULL, void* param1In=NULL)
+ {
+ fn = fnIn;
+ param1 = param1In;
+ }
+
+ bool IsNull()
+ {
+ return fn == NULL;
+ }
+};
+
+
+
+
+
+extern bool fClient;
+extern uint64 nLocalServices;
+extern CAddress addrLocalHost;
+extern CNode* pnodeLocalHost;
+extern uint64 nLocalHostNonce;
+extern array<int, 10> vnThreadsRunning;
+extern SOCKET hListenSocket;
+
+extern vector<CNode*> vNodes;
+extern CCriticalSection cs_vNodes;
+extern map<vector<unsigned char>, CAddress> mapAddresses;
+extern CCriticalSection cs_mapAddresses;
+extern map<CInv, CDataStream> mapRelay;
+extern deque<pair<int64, CInv> > vRelayExpiration;
+extern CCriticalSection cs_mapRelay;
+extern map<CInv, int64> mapAlreadyAskedFor;
+
+// Settings
+extern int fUseProxy;
+extern CAddress addrProxy;
+
+
+
+
+
+
+class CNode
+{
+public:
+ // socket
+ uint64 nServices;
+ SOCKET hSocket;
+ CDataStream vSend;
+ CDataStream vRecv;
+ CCriticalSection cs_vSend;
+ CCriticalSection cs_vRecv;
+ int64 nLastSend;
+ int64 nLastRecv;
+ int64 nLastSendEmpty;
+ int64 nTimeConnected;
+ unsigned int nHeaderStart;
+ unsigned int nMessageStart;
+ CAddress addr;
+ int nVersion;
+ string strSubVer;
+ bool fClient;
+ bool fInbound;
+ bool fNetworkNode;
+ bool fSuccessfullyConnected;
+ bool fDisconnect;
+protected:
+ int nRefCount;
+public:
+ int64 nReleaseTime;
+ map<uint256, CRequestTracker> mapRequests;
+ CCriticalSection cs_mapRequests;
+ uint256 hashContinue;
+ CBlockIndex* pindexLastGetBlocksBegin;
+ uint256 hashLastGetBlocksEnd;
+ int nStartingHeight;
+
+ // flood relay
+ vector<CAddress> vAddrToSend;
+ set<CAddress> setAddrKnown;
+ bool fGetAddr;
+ set<uint256> setKnown;
+
+ // inventory based relay
+ set<CInv> setInventoryKnown;
+ vector<CInv> vInventoryToSend;
+ CCriticalSection cs_inventory;
+ multimap<int64, CInv> mapAskFor;
+
+ // publish and subscription
+ vector<char> vfSubscribe;
+
+
+ CNode(SOCKET hSocketIn, CAddress addrIn, bool fInboundIn=false)
+ {
+ nServices = 0;
+ hSocket = hSocketIn;
+ vSend.SetType(SER_NETWORK);
+ vSend.SetVersion(0);
+ vRecv.SetType(SER_NETWORK);
+ vRecv.SetVersion(0);
+ // Version 0.2 obsoletes 20 Feb 2012
+ if (GetTime() > 1329696000)
+ {
+ vSend.SetVersion(209);
+ vRecv.SetVersion(209);
+ }
+ nLastSend = 0;
+ nLastRecv = 0;
+ nLastSendEmpty = GetTime();
+ nTimeConnected = GetTime();
+ nHeaderStart = -1;
+ nMessageStart = -1;
+ addr = addrIn;
+ nVersion = 0;
+ strSubVer = "";
+ fClient = false; // set by version message
+ fInbound = fInboundIn;
+ fNetworkNode = false;
+ fSuccessfullyConnected = false;
+ fDisconnect = false;
+ nRefCount = 0;
+ nReleaseTime = 0;
+ hashContinue = 0;
+ pindexLastGetBlocksBegin = 0;
+ hashLastGetBlocksEnd = 0;
+ nStartingHeight = -1;
+ fGetAddr = false;
+ vfSubscribe.assign(256, false);
+
+ // Be shy and don't send version until we hear
+ if (!fInbound)
+ PushVersion();
+ }
+
+ ~CNode()
+ {
+ if (hSocket != INVALID_SOCKET)
+ {
+ closesocket(hSocket);
+ hSocket = INVALID_SOCKET;
+ }
+ }
+
+private:
+ CNode(const CNode&);
+ void operator=(const CNode&);
+public:
+
+
+ int GetRefCount()
+ {
+ return max(nRefCount, 0) + (GetTime() < nReleaseTime ? 1 : 0);
+ }
+
+ CNode* AddRef(int64 nTimeout=0)
+ {
+ if (nTimeout != 0)
+ nReleaseTime = max(nReleaseTime, GetTime() + nTimeout);
+ else
+ nRefCount++;
+ return this;
+ }
+
+ void Release()
+ {
+ nRefCount--;
+ }
+
+
+
+ void AddAddressKnown(const CAddress& addr)
+ {
+ setAddrKnown.insert(addr);
+ }
+
+ void PushAddress(const CAddress& addr)
+ {
+ // Known checking here is only to save space from duplicates.
+ // SendMessages will filter it again for knowns that were added
+ // after addresses were pushed.
+ if (addr.IsValid() && !setAddrKnown.count(addr))
+ vAddrToSend.push_back(addr);
+ }
+
+
+ void AddInventoryKnown(const CInv& inv)
+ {
+ CRITICAL_BLOCK(cs_inventory)
+ setInventoryKnown.insert(inv);
+ }
+
+ void PushInventory(const CInv& inv)
+ {
+ CRITICAL_BLOCK(cs_inventory)
+ if (!setInventoryKnown.count(inv))
+ vInventoryToSend.push_back(inv);
+ }
+
+ void AskFor(const CInv& inv)
+ {
+ // We're using mapAskFor as a priority queue,
+ // the key is the earliest time the request can be sent
+ int64& nRequestTime = mapAlreadyAskedFor[inv];
+ printf("askfor %s %"PRI64d"\n", inv.ToString().c_str(), nRequestTime);
+
+ // Make sure not to reuse time indexes to keep things in the same order
+ int64 nNow = (GetTime() - 1) * 1000000;
+ static int64 nLastTime;
+ nLastTime = nNow = max(nNow, ++nLastTime);
+
+ // Each retry is 2 minutes after the last
+ nRequestTime = max(nRequestTime + 2 * 60 * 1000000, nNow);
+ mapAskFor.insert(make_pair(nRequestTime, inv));
+ }
+
+
+
+ void BeginMessage(const char* pszCommand)
+ {
+ cs_vSend.Enter();
+ if (nHeaderStart != -1)
+ AbortMessage();
+ nHeaderStart = vSend.size();
+ vSend << CMessageHeader(pszCommand, 0);
+ nMessageStart = vSend.size();
+ if (fDebug)
+ printf("%s ", DateTimeStrFormat("%x %H:%M:%S", GetTime()).c_str());
+ printf("sending: %s ", pszCommand);
+ }
+
+ void AbortMessage()
+ {
+ if (nHeaderStart == -1)
+ return;
+ vSend.resize(nHeaderStart);
+ nHeaderStart = -1;
+ nMessageStart = -1;
+ cs_vSend.Leave();
+ printf("(aborted)\n");
+ }
+
+ void EndMessage()
+ {
+ if (mapArgs.count("-dropmessagestest") && GetRand(atoi(mapArgs["-dropmessagestest"])) == 0)
+ {
+ printf("dropmessages DROPPING SEND MESSAGE\n");
+ AbortMessage();
+ return;
+ }
+
+ if (nHeaderStart == -1)
+ return;
+
+ // Set the size
+ unsigned int nSize = vSend.size() - nMessageStart;
+ memcpy((char*)&vSend[nHeaderStart] + offsetof(CMessageHeader, nMessageSize), &nSize, sizeof(nSize));
+
+ // Set the checksum
+ if (vSend.GetVersion() >= 209)
+ {
+ uint256 hash = Hash(vSend.begin() + nMessageStart, vSend.end());
+ unsigned int nChecksum = 0;
+ memcpy(&nChecksum, &hash, sizeof(nChecksum));
+ assert(nMessageStart - nHeaderStart >= offsetof(CMessageHeader, nChecksum) + sizeof(nChecksum));
+ memcpy((char*)&vSend[nHeaderStart] + offsetof(CMessageHeader, nChecksum), &nChecksum, sizeof(nChecksum));
+ }
+
+ printf("(%d bytes) ", nSize);
+ printf("\n");
+
+ nHeaderStart = -1;
+ nMessageStart = -1;
+ cs_vSend.Leave();
+ }
+
+ void EndMessageAbortIfEmpty()
+ {
+ if (nHeaderStart == -1)
+ return;
+ int nSize = vSend.size() - nMessageStart;
+ if (nSize > 0)
+ EndMessage();
+ else
+ AbortMessage();
+ }
+
+
+
+ void PushVersion()
+ {
+ /// when NTP implemented, change to just nTime = GetAdjustedTime()
+ int64 nTime = (fInbound ? GetAdjustedTime() : GetTime());
+ CAddress addrYou = (fUseProxy ? CAddress("0.0.0.0") : addr);
+ CAddress addrMe = (fUseProxy ? CAddress("0.0.0.0") : addrLocalHost);
+ RAND_bytes((unsigned char*)&nLocalHostNonce, sizeof(nLocalHostNonce));
+ PushMessage("version", VERSION, nLocalServices, nTime, addrYou, addrMe,
+ nLocalHostNonce, string(pszSubVer), nBestHeight);
+ }
+
+
+
+
+ void PushMessage(const char* pszCommand)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+ template<typename T1>
+ void PushMessage(const char* pszCommand, const T1& a1)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ vSend << a1;
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+ template<typename T1, typename T2>
+ void PushMessage(const char* pszCommand, const T1& a1, const T2& a2)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ vSend << a1 << a2;
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+ template<typename T1, typename T2, typename T3>
+ void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ vSend << a1 << a2 << a3;
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+ template<typename T1, typename T2, typename T3, typename T4>
+ void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ vSend << a1 << a2 << a3 << a4;
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+ template<typename T1, typename T2, typename T3, typename T4, typename T5>
+ void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ vSend << a1 << a2 << a3 << a4 << a5;
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+ template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6>
+ void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ vSend << a1 << a2 << a3 << a4 << a5 << a6;
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+ template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7>
+ void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6, const T7& a7)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7;
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+ template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7, typename T8>
+ void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6, const T7& a7, const T8& a8)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8;
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+ template<typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7, typename T8, typename T9>
+ void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4, const T5& a5, const T6& a6, const T7& a7, const T8& a8, const T9& a9)
+ {
+ try
+ {
+ BeginMessage(pszCommand);
+ vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9;
+ EndMessage();
+ }
+ catch (...)
+ {
+ AbortMessage();
+ throw;
+ }
+ }
+
+
+ void PushRequest(const char* pszCommand,
+ void (*fn)(void*, CDataStream&), void* param1)
+ {
+ uint256 hashReply;
+ RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply));
+
+ CRITICAL_BLOCK(cs_mapRequests)
+ mapRequests[hashReply] = CRequestTracker(fn, param1);
+
+ PushMessage(pszCommand, hashReply);
+ }
+
+ template<typename T1>
+ void PushRequest(const char* pszCommand, const T1& a1,
+ void (*fn)(void*, CDataStream&), void* param1)
+ {
+ uint256 hashReply;
+ RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply));
+
+ CRITICAL_BLOCK(cs_mapRequests)
+ mapRequests[hashReply] = CRequestTracker(fn, param1);
+
+ PushMessage(pszCommand, hashReply, a1);
+ }
+
+ template<typename T1, typename T2>
+ void PushRequest(const char* pszCommand, const T1& a1, const T2& a2,
+ void (*fn)(void*, CDataStream&), void* param1)
+ {
+ uint256 hashReply;
+ RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply));
+
+ CRITICAL_BLOCK(cs_mapRequests)
+ mapRequests[hashReply] = CRequestTracker(fn, param1);
+
+ PushMessage(pszCommand, hashReply, a1, a2);
+ }
+
+
+
+ void PushGetBlocks(CBlockIndex* pindexBegin, uint256 hashEnd);
+ bool IsSubscribed(unsigned int nChannel);
+ void Subscribe(unsigned int nChannel, unsigned int nHops=0);
+ void CancelSubscribe(unsigned int nChannel);
+ void CloseSocketDisconnect();
+ void Cleanup();
+};
+
+
+
+
+
+
+
+
+
+
+inline void RelayInventory(const CInv& inv)
+{
+ // Put on lists to offer to the other nodes
+ CRITICAL_BLOCK(cs_vNodes)
+ foreach(CNode* pnode, vNodes)
+ pnode->PushInventory(inv);
+}
+
+template<typename T>
+void RelayMessage(const CInv& inv, const T& a)
+{
+ CDataStream ss(SER_NETWORK);
+ ss.reserve(10000);
+ ss << a;
+ RelayMessage(inv, ss);
+}
+
+template<>
+inline void RelayMessage<>(const CInv& inv, const CDataStream& ss)
+{
+ CRITICAL_BLOCK(cs_mapRelay)
+ {
+ // Expire old relay messages
+ while (!vRelayExpiration.empty() && vRelayExpiration.front().first < GetTime())
+ {
+ mapRelay.erase(vRelayExpiration.front().second);
+ vRelayExpiration.pop_front();
+ }
+
+ // Save original serialized message so newer versions are preserved
+ mapRelay[inv] = ss;
+ vRelayExpiration.push_back(make_pair(GetTime() + 15 * 60, inv));
+ }
+
+ RelayInventory(inv);
+}
+
+
+
+
+
+
+
+
+//
+// Templates for the publish and subscription system.
+// The object being published as T& obj needs to have:
+// a set<unsigned int> setSources member
+// specializations of AdvertInsert and AdvertErase
+// Currently implemented for CTable and CProduct.
+//
+
+template<typename T>
+void AdvertStartPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)
+{
+ // Add to sources
+ obj.setSources.insert(pfrom->addr.ip);
+
+ if (!AdvertInsert(obj))
+ return;
+
+ // Relay
+ CRITICAL_BLOCK(cs_vNodes)
+ foreach(CNode* pnode, vNodes)
+ if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel)))
+ pnode->PushMessage("publish", nChannel, nHops, obj);
+}
+
+template<typename T>
+void AdvertStopPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)
+{
+ uint256 hash = obj.GetHash();
+
+ CRITICAL_BLOCK(cs_vNodes)
+ foreach(CNode* pnode, vNodes)
+ if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel)))
+ pnode->PushMessage("pub-cancel", nChannel, nHops, hash);
+
+ AdvertErase(obj);
+}
+
+template<typename T>
+void AdvertRemoveSource(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)
+{
+ // Remove a source
+ obj.setSources.erase(pfrom->addr.ip);
+
+ // If no longer supported by any sources, cancel it
+ if (obj.setSources.empty())
+ AdvertStopPublish(pfrom, nChannel, nHops, obj);
+}