aboutsummaryrefslogtreecommitdiff
path: root/net.cpp
diff options
context:
space:
mode:
authors_nakamoto <s_nakamoto@1a98c847-1fd6-4fd8-948a-caf3550aa51b>2009-11-13 01:23:08 +0000
committers_nakamoto <s_nakamoto@1a98c847-1fd6-4fd8-948a-caf3550aa51b>2009-11-13 01:23:08 +0000
commit31e6ea7f5d5b3d73f4084261465142c9fe0fb894 (patch)
tree7427ce9b17468ed4a9dec82b380c2dd40a5332ca /net.cpp
parente9c2b5c84d7bbb7f173cbe6035f6299248e14769 (diff)
monitor ThreadSocketHandler and terminate and restart if hung, convert _beginthread to CreateThread wrapper, disconnect inactive connections, ping, break up long messages to speed up initial download, better priorities for initiating connections, track how many nodes have requested our blocks and transactions, status #/offline and warning message on unsent blocks, minimize on close as separate option -- linux-test5
Diffstat (limited to 'net.cpp')
-rw-r--r--net.cpp342
1 files changed, 220 insertions, 122 deletions
diff --git a/net.cpp b/net.cpp
index 0569604bab..71295d5dee 100644
--- a/net.cpp
+++ b/net.cpp
@@ -13,7 +13,6 @@ bool OpenNetworkConnection(const CAddress& addrConnect);
-
//
// Global state variables
//
@@ -25,6 +24,7 @@ uint64 nLocalHostNonce = 0;
bool fShutdown = false;
array<int, 10> vnThreadsRunning;
SOCKET hListenSocket = INVALID_SOCKET;
+int64 nThreadSocketHandlerHeartbeat = INT64_MAX;
vector<CNode*> vNodes;
CCriticalSection cs_vNodes;
@@ -65,7 +65,7 @@ bool ConnectSocket(const CAddress& addrConnect, SOCKET& hSocketRet)
if (fProxy)
{
- printf("Proxy connecting %s\n", addrConnect.ToStringLog().c_str());
+ printf("proxy connecting %s\n", addrConnect.ToStringLog().c_str());
char pszSocks4IP[] = "\4\1\0\0\0\0\0\0user";
memcpy(pszSocks4IP + 2, &addrConnect.port, 2);
memcpy(pszSocks4IP + 4, &addrConnect.ip, 4);
@@ -87,9 +87,11 @@ bool ConnectSocket(const CAddress& addrConnect, SOCKET& hSocketRet)
if (pchRet[1] != 0x5a)
{
closesocket(hSocket);
- return error("Proxy returned error %d", pchRet[1]);
+ if (pchRet[1] != 0x5b)
+ printf("ERROR: Proxy returned error %d\n", pchRet[1]);
+ return false;
}
- printf("Proxy connection established %s\n", addrConnect.ToStringLog().c_str());
+ printf("proxy connected %s\n", addrConnect.ToStringLog().c_str());
}
hSocketRet = hSocket;
@@ -219,6 +221,7 @@ bool AddAddress(CAddrDB& addrdb, CAddress addr, bool fCurrentlyOnline)
if (it == mapAddresses.end())
{
// New address
+ printf("AddAddress(%s)\n", addr.ToStringLog().c_str());
mapAddresses.insert(make_pair(addr.GetKey(), addr));
addrdb.WriteAddress(addr);
return true;
@@ -256,7 +259,7 @@ void AddressCurrentlyConnected(const CAddress& addr)
if (it != mapAddresses.end())
{
CAddress& addrFound = (*it).second;
- int64 nUpdateInterval = 60 * 60;
+ int64 nUpdateInterval = 20 * 60;
if (addrFound.nTime < GetAdjustedTime() - nUpdateInterval)
{
// Periodically update most recently seen time
@@ -417,7 +420,13 @@ CNode* ConnectNode(CAddress addrConnect, int64 nTimeout)
}
/// debug print
- printf("trying connection %s\n", addrConnect.ToStringLog().c_str());
+ printf("trying connection %s lastseen=%.1fhrs lasttry=%.1fhrs\n",
+ addrConnect.ToStringLog().c_str(),
+ (double)(addrConnect.nTime - GetAdjustedTime())/3600.0,
+ (double)(addrConnect.nLastTry - GetAdjustedTime())/3600.0);
+
+ CRITICAL_BLOCK(cs_mapAddresses)
+ mapAddresses[addrConnect.GetKey()].nLastTry = GetAdjustedTime();
// Connect
SOCKET hSocket;
@@ -428,7 +437,7 @@ CNode* ConnectNode(CAddress addrConnect, int64 nTimeout)
// Set to nonblocking
#ifdef __WXMSW__
- u_long nOne = 1;
+ u_long nOne = 1;
if (ioctlsocket(hSocket, FIONBIO, &nOne) == SOCKET_ERROR)
printf("ConnectSocket() : ioctlsocket nonblocking setting failed, error %d\n", WSAGetLastError());
#else
@@ -445,29 +454,23 @@ CNode* ConnectNode(CAddress addrConnect, int64 nTimeout)
CRITICAL_BLOCK(cs_vNodes)
vNodes.push_back(pnode);
- CRITICAL_BLOCK(cs_mapAddresses)
- mapAddresses[addrConnect.GetKey()].nLastFailed = 0;
+ pnode->nTimeConnected = GetTime();
return pnode;
}
else
{
- CRITICAL_BLOCK(cs_mapAddresses)
- mapAddresses[addrConnect.GetKey()].nLastFailed = GetAdjustedTime();
return NULL;
}
}
void CNode::DoDisconnect()
{
+ if (fDebug)
+ printf("%s ", DateTimeStrFormat("%x %H:%M:%S", GetTime()).c_str());
printf("disconnecting node %s\n", addr.ToStringLog().c_str());
closesocket(hSocket);
- // If outbound and never got version message, mark address as failed
- if (!fInbound && !fSuccessfullyConnected)
- CRITICAL_BLOCK(cs_mapAddresses)
- mapAddresses[addr.GetKey()].nLastFailed = GetAdjustedTime();
-
// All of a nodes broadcasts and subscriptions are automatically torn down
// when it goes down, so a node has to stay up to keep its broadcast going.
@@ -508,7 +511,7 @@ void ThreadSocketHandler(void* parg)
PrintException(&e, "ThreadSocketHandler()");
} catch (...) {
vnThreadsRunning[0]--;
- PrintException(NULL, "ThreadSocketHandler()");
+ throw; // support pthread_cancel()
}
printf("ThreadSocketHandler exiting\n");
@@ -531,15 +534,18 @@ void ThreadSocketHandler2(void* parg)
vector<CNode*> vNodesCopy = vNodes;
foreach(CNode* pnode, vNodesCopy)
{
- if (pnode->ReadyToDisconnect() && pnode->vRecv.empty() && pnode->vSend.empty())
+ if (pnode->fDisconnect ||
+ (pnode->GetRefCount() <= 0 && pnode->vRecv.empty() && pnode->vSend.empty()))
{
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
+
+ // close socket
pnode->DoDisconnect();
// hold in disconnected pool until all refs are released
pnode->nReleaseTime = max(pnode->nReleaseTime, GetTime() + 5 * 60);
- if (pnode->fNetworkNode)
+ if (pnode->fNetworkNode || pnode->fInbound)
pnode->Release();
vNodesDisconnected.push_back(pnode);
}
@@ -582,8 +588,10 @@ void ThreadSocketHandler2(void* parg)
fd_set fdsetRecv;
fd_set fdsetSend;
+ fd_set fdsetError;
FD_ZERO(&fdsetRecv);
FD_ZERO(&fdsetSend);
+ FD_ZERO(&fdsetError);
SOCKET hSocketMax = 0;
FD_SET(hListenSocket, &fdsetRecv);
hSocketMax = max(hSocketMax, hListenSocket);
@@ -592,6 +600,7 @@ void ThreadSocketHandler2(void* parg)
foreach(CNode* pnode, vNodes)
{
FD_SET(pnode->hSocket, &fdsetRecv);
+ FD_SET(pnode->hSocket, &fdsetError);
hSocketMax = max(hSocketMax, pnode->hSocket);
TRY_CRITICAL_BLOCK(pnode->cs_vSend)
if (!pnode->vSend.empty())
@@ -600,30 +609,21 @@ void ThreadSocketHandler2(void* parg)
}
vnThreadsRunning[0]--;
- int nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, NULL, &timeout);
+ int nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout);
vnThreadsRunning[0]++;
if (fShutdown)
return;
if (nSelect == SOCKET_ERROR)
{
int nErr = WSAGetLastError();
- printf("select failed: %d\n", nErr);
+ printf("socket select error %d\n", nErr);
for (int i = 0; i <= hSocketMax; i++)
- {
FD_SET(i, &fdsetRecv);
- FD_SET(i, &fdsetSend);
- }
+ FD_ZERO(&fdsetSend);
+ FD_ZERO(&fdsetError);
Sleep(timeout.tv_usec/1000);
}
- //// debug print
- //foreach(CNode* pnode, vNodes)
- //{
- // printf("vRecv = %-5d ", pnode->vRecv.size());
- // printf("vSend = %-5d ", pnode->vSend.size());
- //}
- //printf("\n");
-
//
// Accept new connections
@@ -641,7 +641,7 @@ void ThreadSocketHandler2(void* parg)
if (hSocket == INVALID_SOCKET)
{
if (WSAGetLastError() != WSAEWOULDBLOCK)
- printf("ERROR ThreadSocketHandler accept failed: %d\n", WSAGetLastError());
+ printf("socket error accept failed: %d\n", WSAGetLastError());
}
else
{
@@ -669,7 +669,7 @@ void ThreadSocketHandler2(void* parg)
//
// Receive
//
- if (FD_ISSET(hSocket, &fdsetRecv))
+ if (FD_ISSET(hSocket, &fdsetRecv) || FD_ISSET(hSocket, &fdsetError))
{
TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
{
@@ -677,25 +677,29 @@ void ThreadSocketHandler2(void* parg)
unsigned int nPos = vRecv.size();
// typical socket buffer is 8K-64K
- const unsigned int nBufSize = 0x10000;
- vRecv.resize(nPos + nBufSize);
- int nBytes = recv(hSocket, &vRecv[nPos], nBufSize, 0);
- vRecv.resize(nPos + max(nBytes, 0));
- if (nBytes == 0)
+ char pchBuf[0x10000];
+ int nBytes = recv(hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
+ if (nBytes > 0)
+ {
+ vRecv.resize(nPos + nBytes);
+ memcpy(&vRecv[nPos], pchBuf, nBytes);
+ pnode->nLastRecv = GetTime();
+ }
+ else if (nBytes == 0)
{
// socket closed gracefully
if (!pnode->fDisconnect)
- printf("recv: socket closed\n");
+ printf("socket closed\n");
pnode->fDisconnect = true;
}
else if (nBytes < 0)
{
- // socket error
+ // error
int nErr = WSAGetLastError();
if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
{
if (!pnode->fDisconnect)
- printf("recv failed: %d\n", nErr);
+ printf("socket recv error %d\n", nErr);
pnode->fDisconnect = true;
}
}
@@ -712,28 +716,63 @@ void ThreadSocketHandler2(void* parg)
CDataStream& vSend = pnode->vSend;
if (!vSend.empty())
{
- int nBytes = send(hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL);
+ int nBytes = send(hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL | MSG_DONTWAIT);
if (nBytes > 0)
{
vSend.erase(vSend.begin(), vSend.begin() + nBytes);
+ pnode->nLastSend = GetTime();
}
- else if (nBytes == 0)
- {
- if (pnode->ReadyToDisconnect())
- pnode->vSend.clear();
- }
- else
+ else if (nBytes < 0)
{
- printf("send error %d\n", nBytes);
- if (pnode->ReadyToDisconnect())
- pnode->vSend.clear();
+ // error
+ int nErr = WSAGetLastError();
+ if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
+ {
+ printf("socket send error %d\n", nErr);
+ pnode->fDisconnect = true;
+ }
}
}
}
}
+
+ //
+ // Inactivity checking
+ //
+ if (pnode->vSend.empty())
+ pnode->nLastSendEmpty = GetTime();
+ if (GetTime() - pnode->nTimeConnected > 60)
+ {
+ if (pnode->nLastRecv == 0 || pnode->nLastSend == 0)
+ {
+ printf("socket no message in first 60 seconds, %d %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0);
+ pnode->fDisconnect = true;
+ }
+ else if (GetTime() - pnode->nLastSend > 10 * 60 && GetTime() - pnode->nLastSendEmpty > 10 * 60)
+ {
+ printf("socket not sending\n");
+ pnode->fDisconnect = true;
+ }
+ else if (GetTime() - pnode->nLastRecv > (pnode->nVersion >= 107 ? 15*60 : 90*60))
+ {
+ printf("socket inactivity timeout\n");
+ pnode->fDisconnect = true;
+ }
+ }
+ }
+
+
+ //// debug heartbeat
+ static int64 nHeartbeat1;
+ if (GetTime() - nHeartbeat1 >= 5 * 60)
+ {
+ printf("%s sendrecv\n", DateTimeStrFormat("%x %H:%M:%S", GetTime()).c_str());
+ nHeartbeat1 = GetTime();
+ fDebug = true;
}
+ nThreadSocketHandlerHeartbeat = GetTime();
Sleep(10);
}
}
@@ -772,15 +811,20 @@ void ThreadOpenConnections2(void* parg)
{
printf("ThreadOpenConnections started\n");
- // Connect to one specified address
+ // Connect to specific addresses
while (mapArgs.count("-connect"))
{
- OpenNetworkConnection(CAddress(mapArgs["-connect"]));
- for (int i = 0; i < 10; i++)
+ foreach(string strAddr, mapMultiArgs["-connect"])
{
- Sleep(1000);
- if (fShutdown)
- return;
+ CAddress addr(strAddr, NODE_NETWORK);
+ if (addr.IsValid())
+ OpenNetworkConnection(addr);
+ for (int i = 0; i < 10; i++)
+ {
+ Sleep(1000);
+ if (fShutdown)
+ return;
+ }
}
}
@@ -821,12 +865,7 @@ void ThreadOpenConnections2(void* parg)
// Choose an address to connect to based on most recently seen
//
CAddress addrConnect;
- int64 nBestTime = 0;
- int64 nDelay = ((60 * 60) << vNodes.size());
- if (vNodes.size() >= 3)
- nDelay *= 4;
- if (nGotIRCAddresses > 0)
- nDelay *= 100;
+ int64 nBest = INT64_MIN;
// Do this here so we don't have to critsect vNodes inside mapAddresses critsect
set<unsigned int> setConnected;
@@ -841,24 +880,51 @@ void ThreadOpenConnections2(void* parg)
const CAddress& addr = item.second;
if (!addr.IsIPv4() || !addr.IsValid() || setConnected.count(addr.ip))
continue;
+ int64 nSinceLastSeen = GetAdjustedTime() - addr.nTime;
+ int64 nSinceLastTry = GetAdjustedTime() - addr.nLastTry;
// Randomize the order in a deterministic way, putting the standard port first
- int64 nRandomizer = (uint64)(addr.nLastFailed * 9567851 + addr.ip * 7789) % (1 * 60 * 60);
+ int64 nRandomizer = (uint64)(addr.nLastTry * 9567851 + addr.ip * 7789) % (30 * 60);
if (addr.port != DEFAULT_PORT)
- nRandomizer += 1 * 60 * 60;
+ nRandomizer += 30 * 60;
+
+ // Last seen Base retry frequency
+ // <1 hour 10 min
+ // 1 hour 1 hour
+ // 4 hours 2 hours
+ // 24 hours 5 hours
+ // 48 hours 7 hours
+ // 7 days 13 hours
+ // 30 days 27 hours
+ // 90 days 46 hours
+ // 365 days 93 hours
+ int64 nDelay = 3600.0 * sqrt(fabs(nSinceLastSeen) / 3600.0) + nRandomizer;
+
+ // Fast reconnect for one hour after last seen
+ if (nSinceLastSeen < 60 * 60)
+ nDelay = 10 * 60;
// Limit retry frequency
- if (GetAdjustedTime() < addr.nLastFailed + nDelay + nRandomizer)
+ if (nSinceLastTry < nDelay)
continue;
- // Try again only after all addresses had a first attempt
- int64 nTime = addr.nTime - nRandomizer;
- if (addr.nLastFailed > addr.nTime)
- nTime -= 365 * 24 * 60 * 60;
+ // If we have IRC, we'll be notified when they first come online,
+ // and again every 24 hours by the refresh broadcast.
+ if (nGotIRCAddresses > 0 && vNodes.size() >= 2 && nSinceLastSeen > 24 * 60 * 60)
+ continue;
- if (nTime > nBestTime)
+ // Only try the old stuff if we don't have enough connections
+ if (vNodes.size() >= 2 && nSinceLastSeen > 7 * 24 * 60 * 60)
+ continue;
+ if (vNodes.size() >= 4 && nSinceLastSeen > 24 * 60 * 60)
+ continue;
+
+ // If multiple addresses are ready, prioritize by time since
+ // last seen and time since last tried.
+ int64 nScore = min(nSinceLastTry, (int64)24 * 60 * 60) - nSinceLastSeen - nRandomizer;
+ if (nScore > nBest)
{
- nBestTime = nTime;
+ nBest = nScore;
addrConnect = addr;
}
}
@@ -941,7 +1007,7 @@ void ThreadMessageHandler(void* parg)
void ThreadMessageHandler2(void* parg)
{
printf("ThreadMessageHandler started\n");
- SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL);
+ SetThreadPriority(THREAD_PRIORITY_BELOW_NORMAL);
loop
{
// Poll the connected nodes for messages
@@ -1063,39 +1129,31 @@ bool BindListenPort(string& strError)
return true;
}
-bool StartNode(string& strError)
+void StartNode(void* parg)
{
- strError = "";
if (pnodeLocalHost == NULL)
pnodeLocalHost = new CNode(INVALID_SOCKET, CAddress("127.0.0.1", nLocalServices));
#ifdef __WXMSW__
// Get local host ip
- char pszHostName[255];
- if (gethostname(pszHostName, sizeof(pszHostName)) == SOCKET_ERROR)
+ char pszHostName[1000] = "";
+ if (gethostname(pszHostName, sizeof(pszHostName)) != SOCKET_ERROR)
{
- strError = strprintf("Error: Unable to get IP address of this computer (gethostname returned error %d)", WSAGetLastError());
- printf("%s\n", strError.c_str());
- return false;
- }
- struct hostent* phostent = gethostbyname(pszHostName);
- if (!phostent)
- {
- strError = strprintf("Error: Unable to get IP address of this computer (gethostbyname returned error %d)", WSAGetLastError());
- printf("%s\n", strError.c_str());
- return false;
- }
-
- // Take the first IP that isn't loopback 127.x.x.x
- for (int i = 0; phostent->h_addr_list[i] != NULL; i++)
- printf("host ip %d: %s\n", i, CAddress(*(unsigned int*)phostent->h_addr_list[i]).ToStringIP().c_str());
- for (int i = 0; phostent->h_addr_list[i] != NULL; i++)
- {
- CAddress addr(*(unsigned int*)phostent->h_addr_list[i], DEFAULT_PORT, nLocalServices);
- if (addr.IsValid() && addr.GetByte(3) != 127)
+ struct hostent* phostent = gethostbyname(pszHostName);
+ if (phostent)
{
- addrLocalHost = addr;
- break;
+ // Take the first IP that isn't loopback 127.x.x.x
+ for (int i = 0; phostent->h_addr_list[i] != NULL; i++)
+ printf("host ip %d: %s\n", i, CAddress(*(unsigned int*)phostent->h_addr_list[i]).ToStringIP().c_str());
+ for (int i = 0; phostent->h_addr_list[i] != NULL; i++)
+ {
+ CAddress addr(*(unsigned int*)phostent->h_addr_list[i], DEFAULT_PORT, nLocalServices);
+ if (addr.IsValid() && addr.GetByte(3) != 127)
+ {
+ addrLocalHost = addr;
+ break;
+ }
+ }
}
}
#else
@@ -1145,45 +1203,85 @@ bool StartNode(string& strError)
}
else
{
- if (addrIncoming.ip)
+ if (addrIncoming.IsValid())
addrLocalHost.ip = addrIncoming.ip;
if (GetMyExternalIP(addrLocalHost.ip))
{
addrIncoming = addrLocalHost;
CWalletDB().WriteSetting("addrIncoming", addrIncoming);
+ printf("addrLocalHost = %s\n", addrLocalHost.ToString().c_str());
}
}
- // Get addresses from IRC and advertise ours
- if (_beginthread(ThreadIRCSeed, 0, NULL) == -1)
- printf("Error: _beginthread(ThreadIRCSeed) failed\n");
-
//
// Start threads
//
- if (_beginthread(ThreadSocketHandler, 0, NULL) == -1)
- {
- strError = "Error: _beginthread(ThreadSocketHandler) failed";
- printf("%s\n", strError.c_str());
- return false;
- }
- if (_beginthread(ThreadOpenConnections, 0, NULL) == -1)
- {
- strError = "Error: _beginthread(ThreadOpenConnections) failed";
- printf("%s\n", strError.c_str());
- return false;
- }
+ // Get addresses from IRC and advertise ours
+ if (!CreateThread(ThreadIRCSeed, NULL))
+ printf("Error: CreateThread(ThreadIRCSeed) failed\n");
+
+ // Send and receive from sockets, accept connections
+ pthread_t hThreadSocketHandler = CreateThread(ThreadSocketHandler, NULL, true);
+
+ // Initiate outbound connections
+ if (!CreateThread(ThreadOpenConnections, NULL))
+ printf("Error: CreateThread(ThreadOpenConnections) failed\n");
+
+ // Process messages
+ if (!CreateThread(ThreadMessageHandler, NULL))
+ printf("Error: CreateThread(ThreadMessageHandler) failed\n");
- if (_beginthread(ThreadMessageHandler, 0, NULL) == -1)
+ // Generate coins in the background
+ GenerateBitcoins(fGenerateBitcoins);
+
+ //
+ // Thread monitoring
+ //
+ loop
{
- strError = "Error: _beginthread(ThreadMessageHandler) failed";
- printf("%s\n", strError.c_str());
- return false;
+ Sleep(15000);
+ if (GetTime() - nThreadSocketHandlerHeartbeat > 4 * 60)
+ {
+ // First see if closing sockets will free it
+ printf("*** ThreadSocketHandler is stopped ***\n");
+ CRITICAL_BLOCK(cs_vNodes)
+ {
+ foreach(CNode* pnode, vNodes)
+ {
+ bool fGot = false;
+ TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
+ TRY_CRITICAL_BLOCK(pnode->cs_vSend)
+ fGot = true;
+ if (!fGot)
+ {
+ printf("*** closing socket\n");
+ closesocket(pnode->hSocket);
+ pnode->fDisconnect = true;
+ }
+ }
+ }
+ Sleep(10000);
+ if (GetTime() - nThreadSocketHandlerHeartbeat < 60)
+ continue;
+
+ // Hopefully it never comes to this.
+ // We know it'll always be hung in the recv or send call.
+ // cs_vRecv or cs_vSend may be left permanently unreleased,
+ // but we always only use TRY_CRITICAL_SECTION on them.
+ printf("*** Restarting ThreadSocketHandler ***\n");
+ TerminateThread(hThreadSocketHandler, 0);
+ #ifdef __WXMSW__
+ CloseHandle(hThreadSocketHandler);
+ #endif
+ vnThreadsRunning[0] = 0;
+
+ // Restart
+ hThreadSocketHandler = CreateThread(ThreadSocketHandler, NULL, true);
+ nThreadSocketHandlerHeartbeat = GetTime();
+ }
}
-
- return true;
}
bool StopNode()