aboutsummaryrefslogtreecommitdiff
path: root/src/net.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/net.h')
-rw-r--r--src/net.h60
1 files changed, 37 insertions, 23 deletions
diff --git a/src/net.h b/src/net.h
index ca6899a83a..e34ea590cc 100644
--- a/src/net.h
+++ b/src/net.h
@@ -259,8 +259,7 @@ public:
virtual ~Transport() {}
// 1. Receiver side functions, for decoding bytes received on the wire into transport protocol
- // agnostic CNetMessage (message type & payload) objects. Callers must guarantee that none of
- // these functions are called concurrently w.r.t. one another.
+ // agnostic CNetMessage (message type & payload) objects.
// returns true if the current deserialization is complete
virtual bool Complete() const = 0;
@@ -282,20 +281,22 @@ class V1Transport final : public Transport
private:
const CChainParams& m_chain_params;
const NodeId m_node_id; // Only for logging
- mutable CHash256 hasher;
- mutable uint256 data_hash;
- bool in_data; // parsing header (false) or data (true)
- CDataStream hdrbuf; // partially received header
- CMessageHeader hdr; // complete header
- CDataStream vRecv; // received message data
- unsigned int nHdrPos;
- unsigned int nDataPos;
-
- const uint256& GetMessageHash() const;
- int readHeader(Span<const uint8_t> msg_bytes);
- int readData(Span<const uint8_t> msg_bytes);
-
- void Reset() {
+ mutable Mutex m_recv_mutex; //!< Lock for receive state
+ mutable CHash256 hasher GUARDED_BY(m_recv_mutex);
+ mutable uint256 data_hash GUARDED_BY(m_recv_mutex);
+ bool in_data GUARDED_BY(m_recv_mutex); // parsing header (false) or data (true)
+ CDataStream hdrbuf GUARDED_BY(m_recv_mutex); // partially received header
+ CMessageHeader hdr GUARDED_BY(m_recv_mutex); // complete header
+ CDataStream vRecv GUARDED_BY(m_recv_mutex); // received message data
+ unsigned int nHdrPos GUARDED_BY(m_recv_mutex);
+ unsigned int nDataPos GUARDED_BY(m_recv_mutex);
+
+ const uint256& GetMessageHash() const EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex);
+ int readHeader(Span<const uint8_t> msg_bytes) EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex);
+ int readData(Span<const uint8_t> msg_bytes) EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex);
+
+ void Reset() EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex) {
+ AssertLockHeld(m_recv_mutex);
vRecv.clear();
hdrbuf.clear();
hdrbuf.resize(24);
@@ -306,6 +307,13 @@ private:
hasher.Reset();
}
+ bool CompleteInternal() const noexcept EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex)
+ {
+ AssertLockHeld(m_recv_mutex);
+ if (!in_data) return false;
+ return hdr.nMessageSize == nDataPos;
+ }
+
public:
V1Transport(const CChainParams& chain_params, const NodeId node_id, int nTypeIn, int nVersionIn)
: m_chain_params(chain_params),
@@ -313,22 +321,28 @@ public:
hdrbuf(nTypeIn, nVersionIn),
vRecv(nTypeIn, nVersionIn)
{
+ LOCK(m_recv_mutex);
Reset();
}
- bool Complete() const override
+ bool Complete() const override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex)
{
- if (!in_data)
- return false;
- return (hdr.nMessageSize == nDataPos);
+ AssertLockNotHeld(m_recv_mutex);
+ return WITH_LOCK(m_recv_mutex, return CompleteInternal());
}
- void SetVersion(int nVersionIn) override
+
+ void SetVersion(int nVersionIn) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex)
{
+ AssertLockNotHeld(m_recv_mutex);
+ LOCK(m_recv_mutex);
hdrbuf.SetVersion(nVersionIn);
vRecv.SetVersion(nVersionIn);
}
- int Read(Span<const uint8_t>& msg_bytes) override
+
+ int Read(Span<const uint8_t>& msg_bytes) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex)
{
+ AssertLockNotHeld(m_recv_mutex);
+ LOCK(m_recv_mutex);
int ret = in_data ? readData(msg_bytes) : readHeader(msg_bytes);
if (ret < 0) {
Reset();
@@ -337,7 +351,7 @@ public:
}
return ret;
}
- CNetMessage GetMessage(std::chrono::microseconds time, bool& reject_message) override;
+ CNetMessage GetMessage(std::chrono::microseconds time, bool& reject_message) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex);
void prepareForTransport(CSerializedNetMsg& msg, std::vector<unsigned char>& header) const override;
};