#!/usr/bin/env python3 # Copyright (c) 2010 ArtForz -- public domain half-a-node # Copyright (c) 2012 Jeff Garzik # Copyright (c) 2010-2020 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Bitcoin test framework primitive and message structures CBlock, CTransaction, CBlockHeader, CTxIn, CTxOut, etc....: data structures that should map to corresponding structures in bitcoin/primitives msg_block, msg_tx, msg_headers, etc.: data structures that represent network messages ser_*, deser_*: functions that handle serialization/deserialization. Classes use __slots__ to ensure extraneous attributes aren't accidentally added by tests, compromising their intended effect. """ from codecs import encode import copy import hashlib from io import BytesIO import random import socket import struct import time from test_framework.siphash import siphash256 from test_framework.util import hex_str_to_bytes, assert_equal MIN_VERSION_SUPPORTED = 60001 MY_VERSION = 70016 # past wtxid relay MY_SUBVERSION = b"/python-p2p-tester:0.0.3/" MY_RELAY = 1 # from version 70001 onwards, fRelay should be appended to version messages (BIP37) MAX_LOCATOR_SZ = 101 MAX_BLOCK_BASE_SIZE = 1000000 MAX_BLOOM_FILTER_SIZE = 36000 MAX_BLOOM_HASH_FUNCS = 50 COIN = 100000000 # 1 btc in satoshis MAX_MONEY = 21000000 * COIN BIP125_SEQUENCE_NUMBER = 0xfffffffd # Sequence number that is BIP 125 opt-in and BIP 68-opt-out MAX_PROTOCOL_MESSAGE_LENGTH = 4000000 # Maximum length of incoming protocol messages MAX_HEADERS_RESULTS = 2000 # Number of headers sent in one getheaders result MAX_INV_SIZE = 50000 # Maximum number of entries in an 'inv' protocol message NODE_NETWORK = (1 << 0) NODE_GETUTXO = (1 << 1) NODE_BLOOM = (1 << 2) NODE_WITNESS = (1 << 3) NODE_COMPACT_FILTERS = (1 << 6) NODE_NETWORK_LIMITED = (1 << 10) MSG_TX = 1 MSG_BLOCK = 2 MSG_FILTERED_BLOCK = 3 MSG_CMPCT_BLOCK = 4 MSG_WTX = 5 MSG_WITNESS_FLAG = 1 << 30 MSG_TYPE_MASK = 0xffffffff >> 2 FILTER_TYPE_BASIC = 0 # Serialization/deserialization tools def sha256(s): return hashlib.new('sha256', s).digest() def hash256(s): return sha256(sha256(s)) def ser_compact_size(l): r = b"" if l < 253: r = struct.pack("B", l) elif l < 0x10000: r = struct.pack(">= 32 return rs def uint256_from_str(s): r = 0 t = struct.unpack("> 24) & 0xFF v = (c & 0xFFFFFF) << (8 * (nbytes - 3)) return v def deser_vector(f, c): nit = deser_compact_size(f) r = [] for _ in range(nit): t = c() t.deserialize(f) r.append(t) return r # ser_function_name: Allow for an alternate serialization function on the # entries in the vector (we use this for serializing the vector of transactions # for a witness block). def ser_vector(l, ser_function_name=None): r = ser_compact_size(len(l)) for i in l: if ser_function_name: r += getattr(i, ser_function_name)() else: r += i.serialize() return r def deser_uint256_vector(f): nit = deser_compact_size(f) r = [] for _ in range(nit): t = deser_uint256(f) r.append(t) return r def ser_uint256_vector(l): r = ser_compact_size(len(l)) for i in l: r += ser_uint256(i) return r def deser_string_vector(f): nit = deser_compact_size(f) r = [] for _ in range(nit): t = deser_string(f) r.append(t) return r def ser_string_vector(l): r = ser_compact_size(len(l)) for sv in l: r += ser_string(sv) return r # Deserialize from a hex string representation (eg from RPC) def FromHex(obj, hex_string): obj.deserialize(BytesIO(hex_str_to_bytes(hex_string))) return obj # Convert a binary-serializable object to hex (eg for submission via RPC) def ToHex(obj): return obj.serialize().hex() # Objects that map to bitcoind objects, which can be serialized/deserialized class CAddress: __slots__ = ("ip", "nServices", "pchReserved", "port", "time") def __init__(self): self.time = 0 self.nServices = 1 self.pchReserved = b"\x00" * 10 + b"\xff" * 2 self.ip = "0.0.0.0" self.port = 0 def deserialize(self, f, *, with_time=True): if with_time: # VERSION messages serialize CAddress objects without time self.time = struct.unpack("H", f.read(2))[0] def serialize(self, *, with_time=True): r = b"" if with_time: # VERSION messages serialize CAddress objects without time r += struct.pack("H", self.port) return r def __repr__(self): return "CAddress(nServices=%i ip=%s port=%i)" % (self.nServices, self.ip, self.port) class CInv: __slots__ = ("hash", "type") typemap = { 0: "Error", MSG_TX: "TX", MSG_BLOCK: "Block", MSG_TX | MSG_WITNESS_FLAG: "WitnessTx", MSG_BLOCK | MSG_WITNESS_FLAG: "WitnessBlock", MSG_FILTERED_BLOCK: "filtered Block", 4: "CompactBlock", 5: "WTX", } def __init__(self, t=0, h=0): self.type = t self.hash = h def deserialize(self, f): self.type = struct.unpack(" 21000000 * COIN: return False return True def __repr__(self): return "CTransaction(nVersion=%i vin=%s vout=%s wit=%s nLockTime=%i)" \ % (self.nVersion, repr(self.vin), repr(self.vout), repr(self.wit), self.nLockTime) class CBlockHeader: __slots__ = ("hash", "hashMerkleRoot", "hashPrevBlock", "nBits", "nNonce", "nTime", "nVersion", "sha256") def __init__(self, header=None): if header is None: self.set_null() else: self.nVersion = header.nVersion self.hashPrevBlock = header.hashPrevBlock self.hashMerkleRoot = header.hashMerkleRoot self.nTime = header.nTime self.nBits = header.nBits self.nNonce = header.nNonce self.sha256 = header.sha256 self.hash = header.hash self.calc_sha256() def set_null(self): self.nVersion = 1 self.hashPrevBlock = 0 self.hashMerkleRoot = 0 self.nTime = 0 self.nBits = 0 self.nNonce = 0 self.sha256 = None self.hash = None def deserialize(self, f): self.nVersion = struct.unpack(" 1: newhashes = [] for i in range(0, len(hashes), 2): i2 = min(i+1, len(hashes)-1) newhashes.append(hash256(hashes[i] + hashes[i2])) hashes = newhashes return uint256_from_str(hashes[0]) def calc_merkle_root(self): hashes = [] for tx in self.vtx: tx.calc_sha256() hashes.append(ser_uint256(tx.sha256)) return self.get_merkle_root(hashes) def calc_witness_merkle_root(self): # For witness root purposes, the hash of the # coinbase, with witness, is defined to be 0...0 hashes = [ser_uint256(0)] for tx in self.vtx[1:]: # Calculate the hashes with witness data hashes.append(ser_uint256(tx.calc_sha256(True))) return self.get_merkle_root(hashes) def is_valid(self): self.calc_sha256() target = uint256_from_compact(self.nBits) if self.sha256 > target: return False for tx in self.vtx: if not tx.is_valid(): return False if self.calc_merkle_root() != self.hashMerkleRoot: return False return True def solve(self): self.rehash() target = uint256_from_compact(self.nBits) while self.sha256 > target: self.nNonce += 1 self.rehash() def __repr__(self): return "CBlock(nVersion=%i hashPrevBlock=%064x hashMerkleRoot=%064x nTime=%s nBits=%08x nNonce=%08x vtx=%s)" \ % (self.nVersion, self.hashPrevBlock, self.hashMerkleRoot, time.ctime(self.nTime), self.nBits, self.nNonce, repr(self.vtx)) class PrefilledTransaction: __slots__ = ("index", "tx") def __init__(self, index=0, tx = None): self.index = index self.tx = tx def deserialize(self, f): self.index = deser_compact_size(f) self.tx = CTransaction() self.tx.deserialize(f) def serialize(self, with_witness=True): r = b"" r += ser_compact_size(self.index) if with_witness: r += self.tx.serialize_with_witness() else: r += self.tx.serialize_without_witness() return r def serialize_without_witness(self): return self.serialize(with_witness=False) def serialize_with_witness(self): return self.serialize(with_witness=True) def __repr__(self): return "PrefilledTransaction(index=%d, tx=%s)" % (self.index, repr(self.tx)) # This is what we send on the wire, in a cmpctblock message. class P2PHeaderAndShortIDs: __slots__ = ("header", "nonce", "prefilled_txn", "prefilled_txn_length", "shortids", "shortids_length") def __init__(self): self.header = CBlockHeader() self.nonce = 0 self.shortids_length = 0 self.shortids = [] self.prefilled_txn_length = 0 self.prefilled_txn = [] def deserialize(self, f): self.header.deserialize(f) self.nonce = struct.unpack("= 70001: # Relay field is optional for version 70001 onwards try: self.nRelay = struct.unpack(" class msg_headers: __slots__ = ("headers",) msgtype = b"headers" def __init__(self, headers=None): self.headers = headers if headers is not None else [] def deserialize(self, f): # comment in bitcoind indicates these should be deserialized as blocks blocks = deser_vector(f, CBlock) for x in blocks: self.headers.append(CBlockHeader(x)) def serialize(self): blocks = [CBlock(x) for x in self.headers] return ser_vector(blocks) def __repr__(self): return "msg_headers(headers=%s)" % repr(self.headers) class msg_merkleblock: __slots__ = ("merkleblock",) msgtype = b"merkleblock" def __init__(self, merkleblock=None): if merkleblock is None: self.merkleblock = CMerkleBlock() else: self.merkleblock = merkleblock def deserialize(self, f): self.merkleblock.deserialize(f) def serialize(self): return self.merkleblock.serialize() def __repr__(self): return "msg_merkleblock(merkleblock=%s)" % (repr(self.merkleblock)) class msg_filterload: __slots__ = ("data", "nHashFuncs", "nTweak", "nFlags") msgtype = b"filterload" def __init__(self, data=b'00', nHashFuncs=0, nTweak=0, nFlags=0): self.data = data self.nHashFuncs = nHashFuncs self.nTweak = nTweak self.nFlags = nFlags def deserialize(self, f): self.data = deser_string(f) self.nHashFuncs = struct.unpack("