diff options
author | Gavin Andresen <gavinandresen@gmail.com> | 2013-03-06 22:31:26 -0500 |
---|---|---|
committer | Gavin Andresen <gavinandresen@gmail.com> | 2013-04-03 19:57:13 -0400 |
commit | 21eb5adadbe3110a8708f2570185566e1f137a49 (patch) | |
tree | 706908964d4a02c6d2fc2fe8968a65c449a3c38a /src/net.cpp | |
parent | 72f14d26ecc67a210a29d7914e580b8e67e45d8e (diff) |
Port Thread* methods to boost::thread_group
Diffstat (limited to 'src/net.cpp')
-rw-r--r-- | src/net.cpp | 354 |
1 files changed, 57 insertions, 297 deletions
diff --git a/src/net.cpp b/src/net.cpp index fe0dcd41b3..557bd6f878 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -26,14 +26,6 @@ using namespace boost; static const int MAX_OUTBOUND_CONNECTIONS = 8; -void ThreadMessageHandler2(void* parg); -void ThreadSocketHandler2(void* parg); -void ThreadOpenConnections2(void* parg); -void ThreadOpenAddedConnections2(void* parg); -#ifdef USE_UPNP -void ThreadMapPort2(void* parg); -#endif -void ThreadDNSAddressSeed2(void* parg); bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOutbound = NULL, const char *strDest = NULL, bool fOneShot = false); @@ -46,7 +38,6 @@ struct LocalServiceInfo { // Global state variables // bool fDiscover = true; -bool fUseUPnP = false; uint64 nLocalServices = NODE_NETWORK; static CCriticalSection cs_mapLocalHost; static map<CNetAddr, LocalServiceInfo> mapLocalHost; @@ -754,32 +745,10 @@ void SocketSendData(CNode *pnode) pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it); } -void ThreadSocketHandler(void* parg) -{ - // Make this thread recognisable as the networking thread - RenameThread("bitcoin-net"); - - try - { - vnThreadsRunning[THREAD_SOCKETHANDLER]++; - ThreadSocketHandler2(parg); - vnThreadsRunning[THREAD_SOCKETHANDLER]--; - } - catch (std::exception& e) { - vnThreadsRunning[THREAD_SOCKETHANDLER]--; - PrintException(&e, "ThreadSocketHandler()"); - } catch (...) { - vnThreadsRunning[THREAD_SOCKETHANDLER]--; - throw; // support pthread_cancel() - } - printf("ThreadSocketHandler exited\n"); -} - static list<CNode*> vNodesDisconnected; -void ThreadSocketHandler2(void* parg) +void ThreadSocketHandler() { - printf("ThreadSocketHandler started\n"); unsigned int nPrevNodeCount = 0; loop { @@ -892,12 +861,10 @@ void ThreadSocketHandler2(void* parg) } } - vnThreadsRunning[THREAD_SOCKETHANDLER]--; int nSelect = select(have_fds ? hSocketMax + 1 : 0, &fdsetRecv, &fdsetSend, &fdsetError, &timeout); - vnThreadsRunning[THREAD_SOCKETHANDLER]++; - if (fShutdown) - return; + boost::this_thread::interruption_point(); + if (nSelect == SOCKET_ERROR) { if (have_fds) @@ -984,8 +951,7 @@ void ThreadSocketHandler2(void* parg) } BOOST_FOREACH(CNode* pnode, vNodesCopy) { - if (fShutdown) - return; + boost::this_thread::interruption_point(); // // Receive @@ -1089,31 +1055,8 @@ void ThreadSocketHandler2(void* parg) #ifdef USE_UPNP -void ThreadMapPort(void* parg) +void ThreadMapPort() { - // Make this thread recognisable as the UPnP thread - RenameThread("bitcoin-UPnP"); - - try - { - vnThreadsRunning[THREAD_UPNP]++; - ThreadMapPort2(parg); - vnThreadsRunning[THREAD_UPNP]--; - } - catch (std::exception& e) { - vnThreadsRunning[THREAD_UPNP]--; - PrintException(&e, "ThreadMapPort()"); - } catch (...) { - vnThreadsRunning[THREAD_UPNP]--; - PrintException(NULL, "ThreadMapPort()"); - } - printf("ThreadMapPort exited\n"); -} - -void ThreadMapPort2(void* parg) -{ - printf("ThreadMapPort started\n"); - std::string port = strprintf("%u", GetListenPort()); const char * multicastif = 0; const char * minissdpdpath = 0; @@ -1154,33 +1097,9 @@ void ThreadMapPort2(void* parg) } string strDesc = "Bitcoin " + FormatFullVersion(); -#ifndef UPNPDISCOVER_SUCCESS - /* miniupnpc 1.5 */ - r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype, - port.c_str(), port.c_str(), lanaddr, strDesc.c_str(), "TCP", 0); -#else - /* miniupnpc 1.6 */ - r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype, - port.c_str(), port.c_str(), lanaddr, strDesc.c_str(), "TCP", 0, "0"); -#endif - if(r!=UPNPCOMMAND_SUCCESS) - printf("AddPortMapping(%s, %s, %s) failed with code %d (%s)\n", - port.c_str(), port.c_str(), lanaddr, r, strupnperror(r)); - else - printf("UPnP Port Mapping successful.\n"); - int i = 1; - loop { - if (fShutdown || !fUseUPnP) - { - r = UPNP_DeletePortMapping(urls.controlURL, data.first.servicetype, port.c_str(), "TCP", 0); - printf("UPNP_DeletePortMapping() returned : %d\n", r); - freeUPNPDevlist(devlist); devlist = 0; - FreeUPNPUrls(&urls); - return; - } - if (i % 600 == 0) // Refresh every 20 minutes - { + try { + loop { #ifndef UPNPDISCOVER_SUCCESS /* miniupnpc 1.5 */ r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype, @@ -1196,33 +1115,49 @@ void ThreadMapPort2(void* parg) port.c_str(), port.c_str(), lanaddr, r, strupnperror(r)); else printf("UPnP Port Mapping successful.\n");; + + MilliSleep(20*60*1000); // Refresh every 20 minutes } - MilliSleep(2000); - i++; + } + catch (boost::thread_interrupted) + { + r = UPNP_DeletePortMapping(urls.controlURL, data.first.servicetype, port.c_str(), "TCP", 0); + printf("UPNP_DeletePortMapping() returned : %d\n", r); + freeUPNPDevlist(devlist); devlist = 0; + FreeUPNPUrls(&urls); + throw; } } else { printf("No valid UPnP IGDs found\n"); freeUPNPDevlist(devlist); devlist = 0; if (r != 0) FreeUPNPUrls(&urls); - loop { - if (fShutdown || !fUseUPnP) - return; - MilliSleep(2000); - } } } -void MapPort() +void MapPort(bool fUseUPnP) { - if (fUseUPnP && vnThreadsRunning[THREAD_UPNP] < 1) + static boost::thread* upnp_thread = NULL; + + if (fUseUPnP) { - if (!NewThread(ThreadMapPort, NULL)) - printf("Error: ThreadMapPort(ThreadMapPort) failed\n"); + if (upnp_thread) { + upnp_thread->interrupt(); + upnp_thread->join(); + delete upnp_thread; + } + upnp_thread = new boost::thread(boost::bind(&TraceThread<boost::function<void()> >, "upnp", &ThreadMapPort)); + } + else if (upnp_thread) { + upnp_thread->interrupt(); + upnp_thread->join(); + delete upnp_thread; + upnp_thread = NULL; } } + #else -void MapPort() +void MapPort(bool) { // Intentionally left blank. } @@ -1254,32 +1189,10 @@ static const char *strTestNetDNSSeed[][2] = { {NULL, NULL} }; -void ThreadDNSAddressSeed(void* parg) -{ - // Make this thread recognisable as the DNS seeding thread - RenameThread("bitcoin-dnsseed"); - - try - { - vnThreadsRunning[THREAD_DNSSEED]++; - ThreadDNSAddressSeed2(parg); - vnThreadsRunning[THREAD_DNSSEED]--; - } - catch (std::exception& e) { - vnThreadsRunning[THREAD_DNSSEED]--; - PrintException(&e, "ThreadDNSAddressSeed()"); - } catch (...) { - vnThreadsRunning[THREAD_DNSSEED]--; - throw; // support pthread_cancel() - } - printf("ThreadDNSAddressSeed exited\n"); -} - -void ThreadDNSAddressSeed2(void* parg) +void ThreadDNSAddressSeed() { static const char *(*strDNSSeed)[2] = fTestNet ? strTestNetDNSSeed : strMainNetDNSSeed; - printf("ThreadDNSAddressSeed started\n"); int found = 0; printf("Loading addresses from DNS seeds (could take a while)\n"); @@ -1409,57 +1322,6 @@ void DumpAddresses() addrman.size(), GetTimeMillis() - nStart); } -void ThreadDumpAddress2(void* parg) -{ - printf("ThreadDumpAddress started\n"); - - vnThreadsRunning[THREAD_DUMPADDRESS]++; - while (!fShutdown) - { - DumpAddresses(); - vnThreadsRunning[THREAD_DUMPADDRESS]--; - MilliSleep(100000); - vnThreadsRunning[THREAD_DUMPADDRESS]++; - } - vnThreadsRunning[THREAD_DUMPADDRESS]--; -} - -void ThreadDumpAddress(void* parg) -{ - // Make this thread recognisable as the address dumping thread - RenameThread("bitcoin-adrdump"); - - try - { - ThreadDumpAddress2(parg); - } - catch (std::exception& e) { - PrintException(&e, "ThreadDumpAddress()"); - } - printf("ThreadDumpAddress exited\n"); -} - -void ThreadOpenConnections(void* parg) -{ - // Make this thread recognisable as the connection opening thread - RenameThread("bitcoin-opencon"); - - try - { - vnThreadsRunning[THREAD_OPENCONNECTIONS]++; - ThreadOpenConnections2(parg); - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; - } - catch (std::exception& e) { - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; - PrintException(&e, "ThreadOpenConnections()"); - } catch (...) { - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; - PrintException(NULL, "ThreadOpenConnections()"); - } - printf("ThreadOpenConnections exited\n"); -} - void static ProcessOneShot() { string strDest; @@ -1478,10 +1340,8 @@ void static ProcessOneShot() } } -void ThreadOpenConnections2(void* parg) +void ThreadOpenConnections() { - printf("ThreadOpenConnections started\n"); - // Connect to specific addresses if (mapArgs.count("-connect") && mapMultiArgs["-connect"].size() > 0) { @@ -1495,8 +1355,6 @@ void ThreadOpenConnections2(void* parg) for (int i = 0; i < 10 && i < nLoop; i++) { MilliSleep(500); - if (fShutdown) - return; } } MilliSleep(500); @@ -1509,18 +1367,10 @@ void ThreadOpenConnections2(void* parg) { ProcessOneShot(); - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; MilliSleep(500); - vnThreadsRunning[THREAD_OPENCONNECTIONS]++; - if (fShutdown) - return; - - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; CSemaphoreGrant grant(*semOutbound); - vnThreadsRunning[THREAD_OPENCONNECTIONS]++; - if (fShutdown) - return; + boost::this_thread::interruption_point(); // Add seed nodes if IRC isn't working if (addrman.size()==0 && (GetTime() - nStart > 60) && !fTestNet) @@ -1600,38 +1450,15 @@ void ThreadOpenConnections2(void* parg) } } -void ThreadOpenAddedConnections(void* parg) +void ThreadOpenAddedConnections() { - // Make this thread recognisable as the connection opening thread - RenameThread("bitcoin-opencon"); - - try - { - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]++; - ThreadOpenAddedConnections2(parg); - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--; - } - catch (std::exception& e) { - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--; - PrintException(&e, "ThreadOpenAddedConnections()"); - } catch (...) { - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--; - PrintException(NULL, "ThreadOpenAddedConnections()"); - } - printf("ThreadOpenAddedConnections exited\n"); -} - -void ThreadOpenAddedConnections2(void* parg) -{ - printf("ThreadOpenAddedConnections started\n"); - { LOCK(cs_vAddedNodes); vAddedNodes = mapMultiArgs["-addnode"]; } if (HaveNameProxy()) { - while(!fShutdown) { + while(true) { list<string> lAddresses(0); { LOCK(cs_vAddedNodes); @@ -1643,14 +1470,9 @@ void ThreadOpenAddedConnections2(void* parg) CSemaphoreGrant grant(*semOutbound); OpenNetworkConnection(addr, &grant, strAddNode.c_str()); MilliSleep(500); - if (fShutdown) - return; } - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--; MilliSleep(120000); // Retry every 2 minutes - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]++; } - return; } for (unsigned int i = 0; true; i++) @@ -1695,16 +1517,8 @@ void ThreadOpenAddedConnections2(void* parg) CSemaphoreGrant grant(*semOutbound); OpenNetworkConnection(CAddress(vserv[i % vserv.size()]), &grant); MilliSleep(500); - if (fShutdown) - return; } - if (fShutdown) - return; - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--; MilliSleep(120000); // Retry every 2 minutes - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]++; - if (fShutdown) - return; } } @@ -1714,8 +1528,7 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu // // Initiate outbound network connection // - if (fShutdown) - return false; + boost::this_thread::interruption_point(); if (!strDest) if (IsLocal(addrConnect) || FindNode((CNetAddr)addrConnect) || CNode::IsBanned(addrConnect) || @@ -1724,11 +1537,9 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu if (strDest && FindNode(strDest)) return false; - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; CNode* pnode = ConnectNode(addrConnect, strDest); - vnThreadsRunning[THREAD_OPENCONNECTIONS]++; - if (fShutdown) - return false; + boost::this_thread::interruption_point(); + if (!pnode) return false; if (grantOutbound) @@ -1746,33 +1557,10 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu - -void ThreadMessageHandler(void* parg) -{ - // Make this thread recognisable as the message handling thread - RenameThread("bitcoin-msghand"); - - try - { - vnThreadsRunning[THREAD_MESSAGEHANDLER]++; - ThreadMessageHandler2(parg); - vnThreadsRunning[THREAD_MESSAGEHANDLER]--; - } - catch (std::exception& e) { - vnThreadsRunning[THREAD_MESSAGEHANDLER]--; - PrintException(&e, "ThreadMessageHandler()"); - } catch (...) { - vnThreadsRunning[THREAD_MESSAGEHANDLER]--; - PrintException(NULL, "ThreadMessageHandler()"); - } - printf("ThreadMessageHandler exited\n"); -} - -void ThreadMessageHandler2(void* parg) +void ThreadMessageHandler() { - printf("ThreadMessageHandler started\n"); SetThreadPriority(THREAD_PRIORITY_BELOW_NORMAL); - while (!fShutdown) + while (true) { vector<CNode*> vNodesCopy; { @@ -1798,8 +1586,7 @@ void ThreadMessageHandler2(void* parg) if (!ProcessMessages(pnode)) pnode->CloseSocketDisconnect(); } - if (fShutdown) - return; + boost::this_thread::interruption_point(); // Send messages { @@ -1807,8 +1594,7 @@ void ThreadMessageHandler2(void* parg) if (lockSend) SendMessages(pnode, pnode == pnodeTrickle); } - if (fShutdown) - return; + boost::this_thread::interruption_point(); } { @@ -1817,16 +1603,7 @@ void ThreadMessageHandler2(void* parg) pnode->Release(); } - // Wait and allow messages to bunch up. - // Reduce vnThreadsRunning so StopNode has permission to exit while - // we're sleeping, but we must always check fShutdown after doing this. - vnThreadsRunning[THREAD_MESSAGEHANDLER]--; MilliSleep(100); - if (fRequestShutdown) - StartShutdown(); - vnThreadsRunning[THREAD_MESSAGEHANDLER]++; - if (fShutdown) - return; } } @@ -2000,6 +1777,8 @@ void static Discover() void StartNode(void* parg) { + boost::thread_group* threadGroup = (boost::thread_group*)parg; + // Make this thread recognisable as the startup thread RenameThread("bitcoin-start"); @@ -2021,38 +1800,32 @@ void StartNode(void* parg) if (!GetBoolArg("-dnsseed", true)) printf("DNS seeding disabled\n"); else - if (!NewThread(ThreadDNSAddressSeed, NULL)) - printf("Error: NewThread(ThreadDNSAddressSeed) failed\n"); + threadGroup->create_thread(boost::bind(&TraceThread<boost::function<void()> >, "dnsseed", &ThreadDNSAddressSeed)); // Map ports with UPnP - if (fUseUPnP) - MapPort(); + MapPort(GetBoolArg("-upnp", USE_UPNP)); // Send and receive from sockets, accept connections - if (!NewThread(ThreadSocketHandler, NULL)) - printf("Error: NewThread(ThreadSocketHandler) failed\n"); + threadGroup->create_thread(boost::bind(&TraceThread<void (*)()>, "net", &ThreadSocketHandler)); // Initiate outbound connections from -addnode - if (!NewThread(ThreadOpenAddedConnections, NULL)) - printf("Error: NewThread(ThreadOpenAddedConnections) failed\n"); + threadGroup->create_thread(boost::bind(&TraceThread<void (*)()>, "addcon", &ThreadOpenAddedConnections)); // Initiate outbound connections - if (!NewThread(ThreadOpenConnections, NULL)) - printf("Error: NewThread(ThreadOpenConnections) failed\n"); + threadGroup->create_thread(boost::bind(&TraceThread<void (*)()>, "opencon", &ThreadOpenConnections)); // Process messages - if (!NewThread(ThreadMessageHandler, NULL)) - printf("Error: NewThread(ThreadMessageHandler) failed\n"); + threadGroup->create_thread(boost::bind(&TraceThread<void (*)()>, "msghand", &ThreadMessageHandler)); // Dump network addresses - if (!NewThread(ThreadDumpAddress, NULL)) - printf("Error; NewThread(ThreadDumpAddress) failed\n"); + threadGroup->create_thread(boost::bind(&LoopForever<void (*)()>, "dumpaddr", &DumpAddresses, 10000)); } bool StopNode() { printf("StopNode()\n"); GenerateBitcoins(false, NULL); + MapPort(false); fShutdown = true; nTransactionsUpdated++; int64 nStart = GetTime(); @@ -2070,19 +1843,6 @@ bool StopNode() break; MilliSleep(20); } while(true); - if (vnThreadsRunning[THREAD_SOCKETHANDLER] > 0) printf("ThreadSocketHandler still running\n"); - if (vnThreadsRunning[THREAD_OPENCONNECTIONS] > 0) printf("ThreadOpenConnections still running\n"); - if (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0) printf("ThreadMessageHandler still running\n"); - if (vnThreadsRunning[THREAD_RPCLISTENER] > 0) printf("ThreadRPCListener still running\n"); - if (vnThreadsRunning[THREAD_RPCHANDLER] > 0) printf("ThreadsRPCServer still running\n"); -#ifdef USE_UPNP - if (vnThreadsRunning[THREAD_UPNP] > 0) printf("ThreadMapPort still running\n"); -#endif - if (vnThreadsRunning[THREAD_DNSSEED] > 0) printf("ThreadDNSAddressSeed still running\n"); - if (vnThreadsRunning[THREAD_ADDEDCONNECTIONS] > 0) printf("ThreadOpenAddedConnections still running\n"); - if (vnThreadsRunning[THREAD_DUMPADDRESS] > 0) printf("ThreadDumpAddresses still running\n"); - while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCHANDLER] > 0) - MilliSleep(20); MilliSleep(50); DumpAddresses(); |