aboutsummaryrefslogtreecommitdiff
path: root/test/functional
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional')
-rwxr-xr-xtest/functional/feature_block.py6
-rwxr-xr-xtest/functional/interface_http.py12
-rwxr-xr-xtest/functional/mempool_accept.py23
-rwxr-xr-xtest/functional/mempool_persist.py14
-rwxr-xr-xtest/functional/mining_prioritisetransaction.py2
-rwxr-xr-xtest/functional/p2p_invalid_messages.py2
-rwxr-xr-xtest/functional/test_framework/messages.py10
-rw-r--r--test/functional/test_framework/script.py4
-rwxr-xr-xtest/functional/test_framework/test_framework.py19
-rwxr-xr-xtest/functional/test_framework/test_node.py2
-rwxr-xr-xtest/functional/test_runner.py1
-rwxr-xr-xtest/functional/wallet_coinbase_category.py59
-rwxr-xr-xtest/functional/wallet_importmulti.py902
-rwxr-xr-xtest/functional/wallet_keypool_topup.py63
14 files changed, 612 insertions, 507 deletions
diff --git a/test/functional/feature_block.py b/test/functional/feature_block.py
index e50f67a345..b741339351 100755
--- a/test/functional/feature_block.py
+++ b/test/functional/feature_block.py
@@ -1181,7 +1181,7 @@ class FullBlockTest(BitcoinTestFramework):
self.save_spendable_output()
spend = self.get_spendable_output()
- self.sync_blocks(blocks, True, timeout=180)
+ self.sync_blocks(blocks, True, timeout=480)
chain1_tip = i
# now create alt chain of same length
@@ -1193,14 +1193,14 @@ class FullBlockTest(BitcoinTestFramework):
# extend alt chain to trigger re-org
block = self.next_block("alt" + str(chain1_tip + 1))
- self.sync_blocks([block], True, timeout=180)
+ self.sync_blocks([block], True, timeout=480)
# ... and re-org back to the first chain
self.move_tip(chain1_tip)
block = self.next_block(chain1_tip + 1)
self.sync_blocks([block], False, force_send=True)
block = self.next_block(chain1_tip + 2)
- self.sync_blocks([block], True, timeout=180)
+ self.sync_blocks([block], True, timeout=480)
# Helper methods
################
diff --git a/test/functional/interface_http.py b/test/functional/interface_http.py
index e4b86f9e1e..20889366e5 100755
--- a/test/functional/interface_http.py
+++ b/test/functional/interface_http.py
@@ -31,13 +31,13 @@ class HTTPBasicsTest (BitcoinTestFramework):
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
out1 = conn.getresponse().read()
assert(b'"error":null' in out1)
- assert(conn.sock!=None) #according to http/1.1 connection must still be open!
+ assert(conn.sock is not None) #according to http/1.1 connection must still be open!
#send 2nd request without closing connection
conn.request('POST', '/', '{"method": "getchaintips"}', headers)
out1 = conn.getresponse().read()
assert(b'"error":null' in out1) #must also response with a correct json-rpc message
- assert(conn.sock!=None) #according to http/1.1 connection must still be open!
+ assert(conn.sock is not None) #according to http/1.1 connection must still be open!
conn.close()
#same should be if we add keep-alive because this should be the std. behaviour
@@ -48,13 +48,13 @@ class HTTPBasicsTest (BitcoinTestFramework):
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
out1 = conn.getresponse().read()
assert(b'"error":null' in out1)
- assert(conn.sock!=None) #according to http/1.1 connection must still be open!
+ assert(conn.sock is not None) #according to http/1.1 connection must still be open!
#send 2nd request without closing connection
conn.request('POST', '/', '{"method": "getchaintips"}', headers)
out1 = conn.getresponse().read()
assert(b'"error":null' in out1) #must also response with a correct json-rpc message
- assert(conn.sock!=None) #according to http/1.1 connection must still be open!
+ assert(conn.sock is not None) #according to http/1.1 connection must still be open!
conn.close()
#now do the same with "Connection: close"
@@ -65,7 +65,7 @@ class HTTPBasicsTest (BitcoinTestFramework):
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
out1 = conn.getresponse().read()
assert(b'"error":null' in out1)
- assert(conn.sock==None) #now the connection must be closed after the response
+ assert(conn.sock is None) #now the connection must be closed after the response
#node1 (2nd node) is running with disabled keep-alive option
urlNode1 = urllib.parse.urlparse(self.nodes[1].url)
@@ -88,7 +88,7 @@ class HTTPBasicsTest (BitcoinTestFramework):
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
out1 = conn.getresponse().read()
assert(b'"error":null' in out1)
- assert(conn.sock!=None) #connection must be closed because bitcoind should use keep-alive by default
+ assert(conn.sock is not None) #connection must be closed because bitcoind should use keep-alive by default
# Check excessive request size
conn = http.client.HTTPConnection(urlNode2.hostname, urlNode2.port)
diff --git a/test/functional/mempool_accept.py b/test/functional/mempool_accept.py
index bec6a0050a..02f1cc4391 100755
--- a/test/functional/mempool_accept.py
+++ b/test/functional/mempool_accept.py
@@ -6,6 +6,7 @@
from io import BytesIO
import math
+
from test_framework.test_framework import BitcoinTestFramework
from test_framework.messages import (
BIP125_SEQUENCE_NUMBER,
@@ -57,6 +58,7 @@ class MempoolAcceptanceTest(BitcoinTestFramework):
self.mempool_size = 0
wait_until(lambda: node.getblockcount() == 200)
assert_equal(node.getmempoolinfo()['size'], self.mempool_size)
+ coins = node.listunspent()
self.log.info('Should not accept garbage to testmempoolaccept')
assert_raises_rpc_error(-3, 'Expected type array, got string', lambda: node.testmempoolaccept(rawtxs='ff00baar'))
@@ -64,13 +66,14 @@ class MempoolAcceptanceTest(BitcoinTestFramework):
assert_raises_rpc_error(-22, 'TX decode failed', lambda: node.testmempoolaccept(rawtxs=['ff00baar']))
self.log.info('A transaction already in the blockchain')
- coin = node.listunspent()[0] # Pick a random coin(base) to spend
+ coin = coins.pop() # Pick a random coin(base) to spend
raw_tx_in_block = node.signrawtransactionwithwallet(node.createrawtransaction(
inputs=[{'txid': coin['txid'], 'vout': coin['vout']}],
outputs=[{node.getnewaddress(): 0.3}, {node.getnewaddress(): 49}],
))['hex']
txid_in_block = node.sendrawtransaction(hexstring=raw_tx_in_block, allowhighfees=True)
node.generate(1)
+ self.mempool_size = 0
self.check_mempool_result(
result_expected=[{'txid': txid_in_block, 'allowed': False, 'reject-reason': '18: txn-already-known'}],
rawtxs=[raw_tx_in_block],
@@ -90,9 +93,25 @@ class MempoolAcceptanceTest(BitcoinTestFramework):
rawtxs=[raw_tx_0],
)
+ self.log.info('A final transaction not in the mempool')
+ coin = coins.pop() # Pick a random coin(base) to spend
+ raw_tx_final = node.signrawtransactionwithwallet(node.createrawtransaction(
+ inputs=[{'txid': coin['txid'], 'vout': coin['vout'], "sequence": 0xffffffff}], # SEQUENCE_FINAL
+ outputs=[{node.getnewaddress(): 0.025}],
+ locktime=node.getblockcount() + 2000, # Can be anything
+ ))['hex']
+ tx.deserialize(BytesIO(hex_str_to_bytes(raw_tx_final)))
+ self.check_mempool_result(
+ result_expected=[{'txid': tx.rehash(), 'allowed': True}],
+ rawtxs=[bytes_to_hex_str(tx.serialize())],
+ allowhighfees=True,
+ )
+ node.sendrawtransaction(hexstring=raw_tx_final, allowhighfees=True)
+ self.mempool_size += 1
+
self.log.info('A transaction in the mempool')
node.sendrawtransaction(hexstring=raw_tx_0)
- self.mempool_size = 1
+ self.mempool_size += 1
self.check_mempool_result(
result_expected=[{'txid': txid_0, 'allowed': False, 'reject-reason': '18: txn-already-in-mempool'}],
rawtxs=[raw_tx_0],
diff --git a/test/functional/mempool_persist.py b/test/functional/mempool_persist.py
index b4e9d967fd..d74d4eaaf1 100755
--- a/test/functional/mempool_persist.py
+++ b/test/functional/mempool_persist.py
@@ -42,6 +42,7 @@ import time
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal, assert_raises_rpc_error, wait_until
+
class MempoolPersistTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 3
@@ -60,7 +61,7 @@ class MempoolPersistTest(BitcoinTestFramework):
self.log.debug("Send 5 transactions from node2 (to its own address)")
for i in range(5):
- self.nodes[2].sendtoaddress(self.nodes[2].getnewaddress(), Decimal("10"))
+ last_txid = self.nodes[2].sendtoaddress(self.nodes[2].getnewaddress(), Decimal("10"))
node2_balance = self.nodes[2].getbalance()
self.sync_all()
@@ -68,6 +69,13 @@ class MempoolPersistTest(BitcoinTestFramework):
assert_equal(len(self.nodes[0].getrawmempool()), 5)
assert_equal(len(self.nodes[1].getrawmempool()), 5)
+ self.log.debug("Prioritize a transaction on node0")
+ fees = self.nodes[0].getmempoolentry(txid=last_txid)['fees']
+ assert_equal(fees['base'], fees['modified'])
+ self.nodes[0].prioritisetransaction(txid=last_txid, fee_delta=1000)
+ fees = self.nodes[0].getmempoolentry(txid=last_txid)['fees']
+ assert_equal(fees['base'] + Decimal('0.00001000'), fees['modified'])
+
self.log.debug("Stop-start the nodes. Verify that node0 has the transactions in its mempool and node1 does not. Verify that node2 calculates its balance correctly after loading wallet transactions.")
self.stop_nodes()
# Give this node a head-start, so we can be "extra-sure" that it didn't load anything later
@@ -81,6 +89,10 @@ class MempoolPersistTest(BitcoinTestFramework):
# The others have loaded their mempool. If node_1 loaded anything, we'd probably notice by now:
assert_equal(len(self.nodes[1].getrawmempool()), 0)
+ self.log.debug('Verify prioritization is loaded correctly')
+ fees = self.nodes[0].getmempoolentry(txid=last_txid)['fees']
+ assert_equal(fees['base'] + Decimal('0.00001000'), fees['modified'])
+
# Verify accounting of mempool transactions after restart is correct
self.nodes[2].syncwithvalidationinterfacequeue() # Flush mempool to wallet
assert_equal(node2_balance, self.nodes[2].getbalance())
diff --git a/test/functional/mining_prioritisetransaction.py b/test/functional/mining_prioritisetransaction.py
index ceb5990f0e..ca4b621a78 100755
--- a/test/functional/mining_prioritisetransaction.py
+++ b/test/functional/mining_prioritisetransaction.py
@@ -84,7 +84,7 @@ class PrioritiseTransactionTest(BitcoinTestFramework):
high_fee_tx = x
# Something high-fee should have been mined!
- assert(high_fee_tx != None)
+ assert(high_fee_tx is not None)
# Add a prioritisation before a tx is in the mempool (de-prioritising a
# high-fee transaction so that it's now low fee).
diff --git a/test/functional/p2p_invalid_messages.py b/test/functional/p2p_invalid_messages.py
index 65997a5f9d..dbc5c5fff6 100755
--- a/test/functional/p2p_invalid_messages.py
+++ b/test/functional/p2p_invalid_messages.py
@@ -86,7 +86,7 @@ class InvalidMessagesTest(BitcoinTestFramework):
# Peer 1, despite serving up a bunch of nonsense, should still be connected.
self.log.info("Waiting for node to drop junk messages.")
- node.p2p.sync_with_ping(timeout=30)
+ node.p2p.sync_with_ping(timeout=120)
assert node.p2p.is_connected
#
diff --git a/test/functional/test_framework/messages.py b/test/functional/test_framework/messages.py
index c72cb8835c..356a45d6d0 100755
--- a/test/functional/test_framework/messages.py
+++ b/test/functional/test_framework/messages.py
@@ -450,6 +450,8 @@ class CTransaction:
if flags != 0:
self.wit.vtxinwit = [CTxInWitness() for i in range(len(self.vin))]
self.wit.deserialize(f)
+ else:
+ self.wit = CTxWitness()
self.nLockTime = struct.unpack("<I", f.read(4))[0]
self.sha256 = None
self.hash = None
@@ -764,7 +766,7 @@ class HeaderAndShortIDs:
self.prefilled_txn = []
self.use_witness = False
- if p2pheaders_and_shortids != None:
+ if p2pheaders_and_shortids is not None:
self.header = p2pheaders_and_shortids.header
self.nonce = p2pheaders_and_shortids.nonce
self.shortids = p2pheaders_and_shortids.shortids
@@ -822,7 +824,7 @@ class BlockTransactionsRequest:
def __init__(self, blockhash=0, indexes = None):
self.blockhash = blockhash
- self.indexes = indexes if indexes != None else []
+ self.indexes = indexes if indexes is not None else []
def deserialize(self, f):
self.blockhash = deser_uint256(f)
@@ -863,7 +865,7 @@ class BlockTransactions:
def __init__(self, blockhash=0, transactions = None):
self.blockhash = blockhash
- self.transactions = transactions if transactions != None else []
+ self.transactions = transactions if transactions is not None else []
def deserialize(self, f):
self.blockhash = deser_uint256(f)
@@ -1052,7 +1054,7 @@ class msg_getdata:
command = b"getdata"
def __init__(self, inv=None):
- self.inv = inv if inv != None else []
+ self.inv = inv if inv is not None else []
def deserialize(self, f):
self.inv = deser_vector(f, CInv)
diff --git a/test/functional/test_framework/script.py b/test/functional/test_framework/script.py
index 2c5ba24a6a..012c80a1be 100644
--- a/test/functional/test_framework/script.py
+++ b/test/functional/test_framework/script.py
@@ -450,6 +450,10 @@ class CScript(bytes):
# join makes no sense for a CScript()
raise NotImplementedError
+ # Python 3.4 compatibility
+ def hex(self):
+ return hexlify(self).decode('ascii')
+
def __new__(cls, value=b''):
if isinstance(value, bytes) or isinstance(value, bytearray):
return super(CScript, cls).__new__(cls, value)
diff --git a/test/functional/test_framework/test_framework.py b/test/functional/test_framework/test_framework.py
index 10d5c659ad..21bf35597e 100755
--- a/test/functional/test_framework/test_framework.py
+++ b/test/functional/test_framework/test_framework.py
@@ -281,7 +281,10 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
# Public helper methods. These can be accessed by the subclass test scripts.
def add_nodes(self, num_nodes, extra_args=None, *, rpchost=None, binary=None):
- """Instantiate TestNode objects"""
+ """Instantiate TestNode objects.
+
+ Should only be called once after the nodes have been specified in
+ set_test_params()."""
if self.bind_to_localhost_only:
extra_confs = [["bind=127.0.0.1"]] * num_nodes
else:
@@ -294,7 +297,19 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
assert_equal(len(extra_args), num_nodes)
assert_equal(len(binary), num_nodes)
for i in range(num_nodes):
- self.nodes.append(TestNode(i, get_datadir_path(self.options.tmpdir, i), rpchost=rpchost, timewait=self.rpc_timewait, bitcoind=binary[i], bitcoin_cli=self.options.bitcoincli, mocktime=self.mocktime, coverage_dir=self.options.coveragedir, extra_conf=extra_confs[i], extra_args=extra_args[i], use_cli=self.options.usecli))
+ self.nodes.append(TestNode(
+ i,
+ get_datadir_path(self.options.tmpdir, i),
+ rpchost=rpchost,
+ timewait=self.rpc_timewait,
+ bitcoind=binary[i],
+ bitcoin_cli=self.options.bitcoincli,
+ mocktime=self.mocktime,
+ coverage_dir=self.options.coveragedir,
+ extra_conf=extra_confs[i],
+ extra_args=extra_args[i],
+ use_cli=self.options.usecli,
+ ))
def start_node(self, i, *args, **kwargs):
"""Start a bitcoind"""
diff --git a/test/functional/test_framework/test_node.py b/test/functional/test_framework/test_node.py
index ebae3faffc..031a8824b1 100755
--- a/test/functional/test_framework/test_node.py
+++ b/test/functional/test_framework/test_node.py
@@ -68,7 +68,7 @@ class TestNode():
self.rpc_timeout = timewait
self.binary = bitcoind
self.coverage_dir = coverage_dir
- if extra_conf != None:
+ if extra_conf is not None:
append_config(datadir, extra_conf)
# Most callers will just need to add extra args to the standard list below.
# For those callers that need more flexibility, they can just set the args property directly.
diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py
index a68b544738..a094433942 100755
--- a/test/functional/test_runner.py
+++ b/test/functional/test_runner.py
@@ -176,6 +176,7 @@ BASE_SCRIPTS = [
'rpc_getblockstats.py',
'p2p_fingerprint.py',
'feature_uacomment.py',
+ 'wallet_coinbase_category.py',
'feature_filelock.py',
'p2p_unrequested_blocks.py',
'feature_includeconf.py',
diff --git a/test/functional/wallet_coinbase_category.py b/test/functional/wallet_coinbase_category.py
new file mode 100755
index 0000000000..7aa8b44ebd
--- /dev/null
+++ b/test/functional/wallet_coinbase_category.py
@@ -0,0 +1,59 @@
+#!/usr/bin/env python3
+# Copyright (c) 2014-2018 The Bitcoin Core developers
+# Distributed under the MIT software license, see the accompanying
+# file COPYING or http://www.opensource.org/licenses/mit-license.php.
+"""Test coinbase transactions return the correct categories.
+
+Tests listtransactions, listsinceblock, and gettransaction.
+"""
+
+from test_framework.test_framework import BitcoinTestFramework
+from test_framework.util import (
+ assert_array_result
+)
+
+class CoinbaseCategoryTest(BitcoinTestFramework):
+ def set_test_params(self):
+ self.num_nodes = 1
+
+ def skip_test_if_missing_module(self):
+ self.skip_if_no_wallet()
+
+ def assert_category(self, category, address, txid, skip):
+ assert_array_result(self.nodes[0].listtransactions(skip=skip),
+ {"address": address},
+ {"category": category})
+ assert_array_result(self.nodes[0].listsinceblock()["transactions"],
+ {"address": address},
+ {"category": category})
+ assert_array_result(self.nodes[0].gettransaction(txid)["details"],
+ {"address": address},
+ {"category": category})
+
+ def run_test(self):
+ # Generate one block to an address
+ address = self.nodes[0].getnewaddress()
+ self.nodes[0].generatetoaddress(1, address)
+ hash = self.nodes[0].getbestblockhash()
+ txid = self.nodes[0].getblock(hash)["tx"][0]
+
+ # Coinbase transaction is immature after 1 confirmation
+ self.assert_category("immature", address, txid, 0)
+
+ # Mine another 99 blocks on top
+ self.nodes[0].generate(99)
+ # Coinbase transaction is still immature after 100 confirmations
+ self.assert_category("immature", address, txid, 99)
+
+ # Mine one more block
+ self.nodes[0].generate(1)
+ # Coinbase transaction is now matured, so category is "generate"
+ self.assert_category("generate", address, txid, 100)
+
+ # Orphan block that paid to address
+ self.nodes[0].invalidateblock(hash)
+ # Coinbase transaction is now orphaned
+ self.assert_category("orphan", address, txid, 100)
+
+if __name__ == '__main__':
+ CoinbaseCategoryTest().main()
diff --git a/test/functional/wallet_importmulti.py b/test/functional/wallet_importmulti.py
index 5c789b1c03..a163f7018c 100755
--- a/test/functional/wallet_importmulti.py
+++ b/test/functional/wallet_importmulti.py
@@ -2,9 +2,43 @@
# Copyright (c) 2014-2018 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
-"""Test the importmulti RPC."""
-
-from test_framework import script
+"""Test the importmulti RPC.
+
+Test importmulti by generating keys on node0, importing the scriptPubKeys and
+addresses on node1 and then testing the address info for the different address
+variants.
+
+- `get_key()` and `get_multisig()` are called to generate keys on node0 and
+ return the privkeys, pubkeys and all variants of scriptPubKey and address.
+- `test_importmulti()` is called to send an importmulti call to node1, test
+ success, and (if unsuccessful) test the error code and error message returned.
+- `test_address()` is called to call getaddressinfo for an address on node1
+ and test the values returned."""
+from collections import namedtuple
+
+from test_framework.address import (
+ key_to_p2pkh,
+ key_to_p2sh_p2wpkh,
+ key_to_p2wpkh,
+ script_to_p2sh,
+ script_to_p2sh_p2wsh,
+ script_to_p2wsh,
+)
+from test_framework.script import (
+ CScript,
+ OP_0,
+ OP_2,
+ OP_3,
+ OP_CHECKMULTISIG,
+ OP_CHECKSIG,
+ OP_DUP,
+ OP_EQUAL,
+ OP_EQUALVERIFY,
+ OP_HASH160,
+ OP_NOP,
+ hash160,
+ sha256,
+)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
@@ -13,12 +47,26 @@ from test_framework.util import (
bytes_to_hex_str,
hex_str_to_bytes
)
-from test_framework.script import (
- CScript,
- OP_0,
- hash160
-)
-from test_framework.messages import sha256
+
+Key = namedtuple('Key', ['privkey',
+ 'pubkey',
+ 'p2pkh_script',
+ 'p2pkh_addr',
+ 'p2wpkh_script',
+ 'p2wpkh_addr',
+ 'p2sh_p2wpkh_script',
+ 'p2sh_p2wpkh_redeem_script',
+ 'p2sh_p2wpkh_addr'])
+
+Multisig = namedtuple('Multisig', ['privkeys',
+ 'pubkeys',
+ 'p2sh_script',
+ 'p2sh_addr',
+ 'redeem_script',
+ 'p2wsh_script',
+ 'p2wsh_addr',
+ 'p2sh_p2wsh_script',
+ 'p2sh_p2wsh_addr'])
class ImportMultiTest(BitcoinTestFramework):
def set_test_params(self):
@@ -32,7 +80,64 @@ class ImportMultiTest(BitcoinTestFramework):
def setup_network(self):
self.setup_nodes()
- def run_test (self):
+ def get_key(self):
+ """Generate a fresh key on node0
+
+ Returns a named tuple of privkey, pubkey and all address and scripts."""
+ addr = self.nodes[0].getnewaddress()
+ pubkey = self.nodes[0].getaddressinfo(addr)['pubkey']
+ pkh = hash160(hex_str_to_bytes(pubkey))
+ return Key(self.nodes[0].dumpprivkey(addr),
+ pubkey,
+ CScript([OP_DUP, OP_HASH160, pkh, OP_EQUALVERIFY, OP_CHECKSIG]).hex(), # p2pkh
+ key_to_p2pkh(pubkey), # p2pkh addr
+ CScript([OP_0, pkh]).hex(), # p2wpkh
+ key_to_p2wpkh(pubkey), # p2wpkh addr
+ CScript([OP_HASH160, hash160(CScript([OP_0, pkh])), OP_EQUAL]).hex(), # p2sh-p2wpkh
+ CScript([OP_0, pkh]).hex(), # p2sh-p2wpkh redeem script
+ key_to_p2sh_p2wpkh(pubkey)) # p2sh-p2wpkh addr
+
+ def get_multisig(self):
+ """Generate a fresh multisig on node0
+
+ Returns a named tuple of privkeys, pubkeys and all address and scripts."""
+ addrs = []
+ pubkeys = []
+ for _ in range(3):
+ addr = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
+ addrs.append(addr['address'])
+ pubkeys.append(addr['pubkey'])
+ script_code = CScript([OP_2] + [hex_str_to_bytes(pubkey) for pubkey in pubkeys] + [OP_3, OP_CHECKMULTISIG])
+ witness_script = CScript([OP_0, sha256(script_code)])
+ return Multisig([self.nodes[0].dumpprivkey(addr) for addr in addrs],
+ pubkeys,
+ CScript([OP_HASH160, hash160(script_code), OP_EQUAL]).hex(), # p2sh
+ script_to_p2sh(script_code), # p2sh addr
+ script_code.hex(), # redeem script
+ witness_script.hex(), # p2wsh
+ script_to_p2wsh(script_code), # p2wsh addr
+ CScript([OP_HASH160, witness_script, OP_EQUAL]).hex(), # p2sh-p2wsh
+ script_to_p2sh_p2wsh(script_code)) # p2sh-p2wsh addr
+
+ def test_importmulti(self, req, success, error_code=None, error_message=None):
+ """Run importmulti and assert success"""
+ result = self.nodes[1].importmulti([req])
+ assert_equal(result[0]['success'], success)
+ if error_code is not None:
+ assert_equal(result[0]['error']['code'], error_code)
+ assert_equal(result[0]['error']['message'], error_message)
+
+ def test_address(self, address, **kwargs):
+ """Get address info for `address` and test whether the returned values are as expected."""
+ addr_info = self.nodes[1].getaddressinfo(address)
+ for key, value in kwargs.items():
+ if value is None:
+ if key in addr_info.keys():
+ raise AssertionError("key {} unexpectedly returned in getaddressinfo.".format(key))
+ elif addr_info[key] != value:
+ raise AssertionError("key {} value {} did not match expected value {}".format(key, addr_info[key], value))
+
+ def run_test(self):
self.log.info("Mining blocks...")
self.nodes[0].generate(1)
self.nodes[1].generate(1)
@@ -40,587 +145,460 @@ class ImportMultiTest(BitcoinTestFramework):
node0_address1 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- #Check only one address
+ # Check only one address
assert_equal(node0_address1['ismine'], True)
- #Node 1 sync test
- assert_equal(self.nodes[1].getblockcount(),1)
+ # Node 1 sync test
+ assert_equal(self.nodes[1].getblockcount(), 1)
- #Address Test - before import
+ # Address Test - before import
address_info = self.nodes[1].getaddressinfo(node0_address1['address'])
assert_equal(address_info['iswatchonly'], False)
assert_equal(address_info['ismine'], False)
-
# RPC importmulti -----------------------------------------------
# Bitcoin Address (implicit non-internal)
self.log.info("Should import an address")
- address = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": address['address']
- },
- "timestamp": "now",
- }])
- assert_equal(result[0]['success'], True)
- address_assert = self.nodes[1].getaddressinfo(address['address'])
- assert_equal(address_assert['iswatchonly'], True)
- assert_equal(address_assert['ismine'], False)
- assert_equal(address_assert['timestamp'], timestamp)
- assert_equal(address_assert['ischange'], False)
- watchonly_address = address['address']
+ key = self.get_key()
+ address = key.p2pkh_addr
+ self.test_importmulti({"scriptPubKey": {"address": address},
+ "timestamp": "now"},
+ True)
+ self.test_address(address,
+ iswatchonly=True,
+ ismine=False,
+ timestamp=timestamp,
+ ischange=False)
+ watchonly_address = address
watchonly_timestamp = timestamp
self.log.info("Should not import an invalid address")
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": "not valid address",
- },
- "timestamp": "now",
- }])
- assert_equal(result[0]['success'], False)
- assert_equal(result[0]['error']['code'], -5)
- assert_equal(result[0]['error']['message'], 'Invalid address')
+ self.test_importmulti({"scriptPubKey": {"address": "not valid address"},
+ "timestamp": "now"},
+ False,
+ error_code=-5,
+ error_message='Invalid address')
# ScriptPubKey + internal
self.log.info("Should import a scriptPubKey with internal flag")
- address = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- result = self.nodes[1].importmulti([{
- "scriptPubKey": address['scriptPubKey'],
- "timestamp": "now",
- "internal": True
- }])
- assert_equal(result[0]['success'], True)
- address_assert = self.nodes[1].getaddressinfo(address['address'])
- assert_equal(address_assert['iswatchonly'], True)
- assert_equal(address_assert['ismine'], False)
- assert_equal(address_assert['timestamp'], timestamp)
- assert_equal(address_assert['ischange'], True)
+ key = self.get_key()
+ self.test_importmulti({"scriptPubKey": key.p2pkh_script,
+ "timestamp": "now",
+ "internal": True},
+ True)
+ self.test_address(key.p2pkh_addr,
+ iswatchonly=True,
+ ismine=False,
+ timestamp=timestamp,
+ ischange=True)
# ScriptPubKey + internal + label
self.log.info("Should not allow a label to be specified when internal is true")
- address = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- result = self.nodes[1].importmulti([{
- "scriptPubKey": address['scriptPubKey'],
- "timestamp": "now",
- "internal": True,
- "label": "Example label"
- }])
- assert_equal(result[0]['success'], False)
- assert_equal(result[0]['error']['code'], -8)
- assert_equal(result[0]['error']['message'], 'Internal addresses should not have a label')
+ key = self.get_key()
+ self.test_importmulti({"scriptPubKey": key.p2pkh_script,
+ "timestamp": "now",
+ "internal": True,
+ "label": "Example label"},
+ False,
+ error_code=-8,
+ error_message='Internal addresses should not have a label')
# Nonstandard scriptPubKey + !internal
self.log.info("Should not import a nonstandard scriptPubKey without internal flag")
- nonstandardScriptPubKey = address['scriptPubKey'] + bytes_to_hex_str(script.CScript([script.OP_NOP]))
- address = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- result = self.nodes[1].importmulti([{
- "scriptPubKey": nonstandardScriptPubKey,
- "timestamp": "now",
- }])
- assert_equal(result[0]['success'], False)
- assert_equal(result[0]['error']['code'], -8)
- assert_equal(result[0]['error']['message'], 'Internal must be set to true for nonstandard scriptPubKey imports.')
- address_assert = self.nodes[1].getaddressinfo(address['address'])
- assert_equal(address_assert['iswatchonly'], False)
- assert_equal(address_assert['ismine'], False)
- assert_equal('timestamp' in address_assert, False)
-
+ nonstandardScriptPubKey = key.p2pkh_script + bytes_to_hex_str(CScript([OP_NOP]))
+ key = self.get_key()
+ address = key.p2pkh_addr
+ self.test_importmulti({"scriptPubKey": nonstandardScriptPubKey,
+ "timestamp": "now"},
+ False,
+ error_code=-8,
+ error_message='Internal must be set to true for nonstandard scriptPubKey imports.')
+ self.test_address(address,
+ iswatchonly=False,
+ ismine=False,
+ timestamp=None)
# Address + Public key + !Internal(explicit)
self.log.info("Should import an address with public key")
- address = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": address['address']
- },
- "timestamp": "now",
- "pubkeys": [ address['pubkey'] ],
- "internal": False
- }])
- assert_equal(result[0]['success'], True)
- address_assert = self.nodes[1].getaddressinfo(address['address'])
- assert_equal(address_assert['iswatchonly'], True)
- assert_equal(address_assert['ismine'], False)
- assert_equal(address_assert['timestamp'], timestamp)
-
+ key = self.get_key()
+ address = key.p2pkh_addr
+ self.test_importmulti({"scriptPubKey": {"address": address},
+ "timestamp": "now",
+ "pubkeys": [key.pubkey],
+ "internal": False},
+ True)
+ self.test_address(address,
+ iswatchonly=True,
+ ismine=False,
+ timestamp=timestamp)
# ScriptPubKey + Public key + internal
self.log.info("Should import a scriptPubKey with internal and with public key")
- address = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- request = [{
- "scriptPubKey": address['scriptPubKey'],
- "timestamp": "now",
- "pubkeys": [ address['pubkey'] ],
- "internal": True
- }]
- result = self.nodes[1].importmulti(requests=request)
- assert_equal(result[0]['success'], True)
- address_assert = self.nodes[1].getaddressinfo(address['address'])
- assert_equal(address_assert['iswatchonly'], True)
- assert_equal(address_assert['ismine'], False)
- assert_equal(address_assert['timestamp'], timestamp)
+ key = self.get_key()
+ address = key.p2pkh_addr
+ self.test_importmulti({"scriptPubKey": key.p2pkh_script,
+ "timestamp": "now",
+ "pubkeys": [key.pubkey],
+ "internal": True},
+ True)
+ self.test_address(address,
+ iswatchonly=True,
+ ismine=False,
+ timestamp=timestamp)
# Nonstandard scriptPubKey + Public key + !internal
self.log.info("Should not import a nonstandard scriptPubKey without internal and with public key")
- address = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- request = [{
- "scriptPubKey": nonstandardScriptPubKey,
- "timestamp": "now",
- "pubkeys": [ address['pubkey'] ]
- }]
- result = self.nodes[1].importmulti(requests=request)
- assert_equal(result[0]['success'], False)
- assert_equal(result[0]['error']['code'], -8)
- assert_equal(result[0]['error']['message'], 'Internal must be set to true for nonstandard scriptPubKey imports.')
- address_assert = self.nodes[1].getaddressinfo(address['address'])
- assert_equal(address_assert['iswatchonly'], False)
- assert_equal(address_assert['ismine'], False)
- assert_equal('timestamp' in address_assert, False)
+ key = self.get_key()
+ address = key.p2pkh_addr
+ self.test_importmulti({"scriptPubKey": nonstandardScriptPubKey,
+ "timestamp": "now",
+ "pubkeys": [key.pubkey]},
+ False,
+ error_code=-8,
+ error_message='Internal must be set to true for nonstandard scriptPubKey imports.')
+ self.test_address(address,
+ iswatchonly=False,
+ ismine=False,
+ timestamp=None)
# Address + Private key + !watchonly
self.log.info("Should import an address with private key")
- address = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": address['address']
- },
- "timestamp": "now",
- "keys": [ self.nodes[0].dumpprivkey(address['address']) ]
- }])
- assert_equal(result[0]['success'], True)
- address_assert = self.nodes[1].getaddressinfo(address['address'])
- assert_equal(address_assert['iswatchonly'], False)
- assert_equal(address_assert['ismine'], True)
- assert_equal(address_assert['timestamp'], timestamp)
+ key = self.get_key()
+ address = key.p2pkh_addr
+ self.test_importmulti({"scriptPubKey": {"address": address},
+ "timestamp": "now",
+ "keys": [key.privkey]},
+ True)
+ self.test_address(address,
+ iswatchonly=False,
+ ismine=True,
+ timestamp=timestamp)
self.log.info("Should not import an address with private key if is already imported")
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": address['address']
- },
- "timestamp": "now",
- "keys": [ self.nodes[0].dumpprivkey(address['address']) ]
- }])
- assert_equal(result[0]['success'], False)
- assert_equal(result[0]['error']['code'], -4)
- assert_equal(result[0]['error']['message'], 'The wallet already contains the private key for this address or script')
+ self.test_importmulti({"scriptPubKey": {"address": address},
+ "timestamp": "now",
+ "keys": [key.privkey]},
+ False,
+ error_code=-4,
+ error_message='The wallet already contains the private key for this address or script')
# Address + Private key + watchonly
self.log.info("Should not import an address with private key and with watchonly")
- address = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": address['address']
- },
- "timestamp": "now",
- "keys": [ self.nodes[0].dumpprivkey(address['address']) ],
- "watchonly": True
- }])
- assert_equal(result[0]['success'], False)
- assert_equal(result[0]['error']['code'], -8)
- assert_equal(result[0]['error']['message'], 'Watch-only addresses should not include private keys')
- address_assert = self.nodes[1].getaddressinfo(address['address'])
- assert_equal(address_assert['iswatchonly'], False)
- assert_equal(address_assert['ismine'], False)
- assert_equal('timestamp' in address_assert, False)
+ key = self.get_key()
+ address = key.p2pkh_addr
+ self.test_importmulti({"scriptPubKey": {"address": address},
+ "timestamp": "now",
+ "keys": [key.privkey],
+ "watchonly": True},
+ False,
+ error_code=-8,
+ error_message='Watch-only addresses should not include private keys')
+ self.test_address(address,
+ iswatchonly=False,
+ ismine=False,
+ timestamp=None)
# ScriptPubKey + Private key + internal
self.log.info("Should import a scriptPubKey with internal and with private key")
- address = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- result = self.nodes[1].importmulti([{
- "scriptPubKey": address['scriptPubKey'],
- "timestamp": "now",
- "keys": [ self.nodes[0].dumpprivkey(address['address']) ],
- "internal": True
- }])
- assert_equal(result[0]['success'], True)
- address_assert = self.nodes[1].getaddressinfo(address['address'])
- assert_equal(address_assert['iswatchonly'], False)
- assert_equal(address_assert['ismine'], True)
- assert_equal(address_assert['timestamp'], timestamp)
+ key = self.get_key()
+ address = key.p2pkh_addr
+ self.test_importmulti({"scriptPubKey": key.p2pkh_script,
+ "timestamp": "now",
+ "keys": [key.privkey],
+ "internal": True},
+ True)
+ self.test_address(address,
+ iswatchonly=False,
+ ismine=True,
+ timestamp=timestamp)
# Nonstandard scriptPubKey + Private key + !internal
self.log.info("Should not import a nonstandard scriptPubKey without internal and with private key")
- address = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- result = self.nodes[1].importmulti([{
- "scriptPubKey": nonstandardScriptPubKey,
- "timestamp": "now",
- "keys": [ self.nodes[0].dumpprivkey(address['address']) ]
- }])
- assert_equal(result[0]['success'], False)
- assert_equal(result[0]['error']['code'], -8)
- assert_equal(result[0]['error']['message'], 'Internal must be set to true for nonstandard scriptPubKey imports.')
- address_assert = self.nodes[1].getaddressinfo(address['address'])
- assert_equal(address_assert['iswatchonly'], False)
- assert_equal(address_assert['ismine'], False)
- assert_equal('timestamp' in address_assert, False)
-
+ key = self.get_key()
+ address = key.p2pkh_addr
+ self.test_importmulti({"scriptPubKey": nonstandardScriptPubKey,
+ "timestamp": "now",
+ "keys": [key.privkey]},
+ False,
+ error_code=-8,
+ error_message='Internal must be set to true for nonstandard scriptPubKey imports.')
+ self.test_address(address,
+ iswatchonly=False,
+ ismine=False,
+ timestamp=None)
# P2SH address
- sig_address_1 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- sig_address_2 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- sig_address_3 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- multi_sig_script = self.nodes[0].createmultisig(2, [sig_address_1['pubkey'], sig_address_2['pubkey'], sig_address_3['pubkey']])
+ multisig = self.get_multisig()
self.nodes[1].generate(100)
- self.nodes[1].sendtoaddress(multi_sig_script['address'], 10.00)
+ self.nodes[1].sendtoaddress(multisig.p2sh_addr, 10.00)
self.nodes[1].generate(1)
timestamp = self.nodes[1].getblock(self.nodes[1].getbestblockhash())['mediantime']
self.log.info("Should import a p2sh")
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": multi_sig_script['address']
- },
- "timestamp": "now",
- }])
- assert_equal(result[0]['success'], True)
- address_assert = self.nodes[1].getaddressinfo(multi_sig_script['address'])
- assert_equal(address_assert['isscript'], True)
- assert_equal(address_assert['iswatchonly'], True)
- assert_equal(address_assert['timestamp'], timestamp)
- p2shunspent = self.nodes[1].listunspent(0,999999, [multi_sig_script['address']])[0]
+ self.test_importmulti({"scriptPubKey": {"address": multisig.p2sh_addr},
+ "timestamp": "now"},
+ True)
+ self.test_address(multisig.p2sh_addr,
+ isscript=True,
+ iswatchonly=True,
+ timestamp=timestamp)
+ p2shunspent = self.nodes[1].listunspent(0, 999999, [multisig.p2sh_addr])[0]
assert_equal(p2shunspent['spendable'], False)
assert_equal(p2shunspent['solvable'], False)
-
# P2SH + Redeem script
- sig_address_1 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- sig_address_2 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- sig_address_3 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- multi_sig_script = self.nodes[0].createmultisig(2, [sig_address_1['pubkey'], sig_address_2['pubkey'], sig_address_3['pubkey']])
+ multisig = self.get_multisig()
self.nodes[1].generate(100)
- self.nodes[1].sendtoaddress(multi_sig_script['address'], 10.00)
+ self.nodes[1].sendtoaddress(multisig.p2sh_addr, 10.00)
self.nodes[1].generate(1)
timestamp = self.nodes[1].getblock(self.nodes[1].getbestblockhash())['mediantime']
self.log.info("Should import a p2sh with respective redeem script")
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": multi_sig_script['address']
- },
- "timestamp": "now",
- "redeemscript": multi_sig_script['redeemScript']
- }])
- assert_equal(result[0]['success'], True)
- address_assert = self.nodes[1].getaddressinfo(multi_sig_script['address'])
- assert_equal(address_assert['timestamp'], timestamp)
-
- p2shunspent = self.nodes[1].listunspent(0,999999, [multi_sig_script['address']])[0]
+ self.test_importmulti({"scriptPubKey": {"address": multisig.p2sh_addr},
+ "timestamp": "now",
+ "redeemscript": multisig.redeem_script},
+ True)
+ self.test_address(multisig.p2sh_addr, timestamp=timestamp)
+
+ p2shunspent = self.nodes[1].listunspent(0, 999999, [multisig.p2sh_addr])[0]
assert_equal(p2shunspent['spendable'], False)
assert_equal(p2shunspent['solvable'], True)
-
# P2SH + Redeem script + Private Keys + !Watchonly
- sig_address_1 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- sig_address_2 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- sig_address_3 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- multi_sig_script = self.nodes[0].createmultisig(2, [sig_address_1['pubkey'], sig_address_2['pubkey'], sig_address_3['pubkey']])
+ multisig = self.get_multisig()
self.nodes[1].generate(100)
- self.nodes[1].sendtoaddress(multi_sig_script['address'], 10.00)
+ self.nodes[1].sendtoaddress(multisig.p2sh_addr, 10.00)
self.nodes[1].generate(1)
timestamp = self.nodes[1].getblock(self.nodes[1].getbestblockhash())['mediantime']
self.log.info("Should import a p2sh with respective redeem script and private keys")
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": multi_sig_script['address']
- },
- "timestamp": "now",
- "redeemscript": multi_sig_script['redeemScript'],
- "keys": [ self.nodes[0].dumpprivkey(sig_address_1['address']), self.nodes[0].dumpprivkey(sig_address_2['address'])]
- }])
- assert_equal(result[0]['success'], True)
- address_assert = self.nodes[1].getaddressinfo(multi_sig_script['address'])
- assert_equal(address_assert['timestamp'], timestamp)
-
- p2shunspent = self.nodes[1].listunspent(0,999999, [multi_sig_script['address']])[0]
+ self.test_importmulti({"scriptPubKey": {"address": multisig.p2sh_addr},
+ "timestamp": "now",
+ "redeemscript": multisig.redeem_script,
+ "keys": multisig.privkeys[0:2]},
+ True)
+ self.test_address(multisig.p2sh_addr,
+ timestamp=timestamp)
+
+ p2shunspent = self.nodes[1].listunspent(0, 999999, [multisig.p2sh_addr])[0]
assert_equal(p2shunspent['spendable'], False)
assert_equal(p2shunspent['solvable'], True)
# P2SH + Redeem script + Private Keys + Watchonly
- sig_address_1 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- sig_address_2 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- sig_address_3 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- multi_sig_script = self.nodes[0].createmultisig(2, [sig_address_1['pubkey'], sig_address_2['pubkey'], sig_address_3['pubkey']])
+ multisig = self.get_multisig()
self.nodes[1].generate(100)
- self.nodes[1].sendtoaddress(multi_sig_script['address'], 10.00)
+ self.nodes[1].sendtoaddress(multisig.p2sh_addr, 10.00)
self.nodes[1].generate(1)
timestamp = self.nodes[1].getblock(self.nodes[1].getbestblockhash())['mediantime']
self.log.info("Should import a p2sh with respective redeem script and private keys")
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": multi_sig_script['address']
- },
- "timestamp": "now",
- "redeemscript": multi_sig_script['redeemScript'],
- "keys": [ self.nodes[0].dumpprivkey(sig_address_1['address']), self.nodes[0].dumpprivkey(sig_address_2['address'])],
- "watchonly": True
- }])
- assert_equal(result[0]['success'], False)
- assert_equal(result[0]['error']['code'], -8)
- assert_equal(result[0]['error']['message'], 'Watch-only addresses should not include private keys')
-
+ self.test_importmulti({"scriptPubKey": {"address": multisig.p2sh_addr},
+ "timestamp": "now",
+ "redeemscript": multisig.redeem_script,
+ "keys": multisig.privkeys[0:2],
+ "watchonly": True},
+ False,
+ error_code=-8,
+ error_message='Watch-only addresses should not include private keys')
# Address + Public key + !Internal + Wrong pubkey
self.log.info("Should not import an address with a wrong public key")
- address = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- address2 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": address['address']
- },
- "timestamp": "now",
- "pubkeys": [ address2['pubkey'] ]
- }])
- assert_equal(result[0]['success'], False)
- assert_equal(result[0]['error']['code'], -5)
- assert_equal(result[0]['error']['message'], 'Key does not match address destination')
- address_assert = self.nodes[1].getaddressinfo(address['address'])
- assert_equal(address_assert['iswatchonly'], False)
- assert_equal(address_assert['ismine'], False)
- assert_equal('timestamp' in address_assert, False)
-
+ key = self.get_key()
+ address = key.p2pkh_addr
+ wrong_key = self.get_key().pubkey
+ self.test_importmulti({"scriptPubKey": {"address": address},
+ "timestamp": "now",
+ "pubkeys": [wrong_key]},
+ False,
+ error_code=-5,
+ error_message='Key does not match address destination')
+ self.test_address(address,
+ iswatchonly=False,
+ ismine=False,
+ timestamp=None)
# ScriptPubKey + Public key + internal + Wrong pubkey
self.log.info("Should not import a scriptPubKey with internal and with a wrong public key")
- address = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- address2 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- request = [{
- "scriptPubKey": address['scriptPubKey'],
- "timestamp": "now",
- "pubkeys": [ address2['pubkey'] ],
- "internal": True
- }]
- result = self.nodes[1].importmulti(request)
- assert_equal(result[0]['success'], False)
- assert_equal(result[0]['error']['code'], -5)
- assert_equal(result[0]['error']['message'], 'Key does not match address destination')
- address_assert = self.nodes[1].getaddressinfo(address['address'])
- assert_equal(address_assert['iswatchonly'], False)
- assert_equal(address_assert['ismine'], False)
- assert_equal('timestamp' in address_assert, False)
-
+ key = self.get_key()
+ address = key.p2pkh_addr
+ wrong_key = self.get_key().pubkey
+ self.test_importmulti({"scriptPubKey": key.p2pkh_script,
+ "timestamp": "now",
+ "pubkeys": [wrong_key],
+ "internal": True},
+ False,
+ error_code=-5,
+ error_message='Key does not match address destination')
+ self.test_address(address,
+ iswatchonly=False,
+ ismine=False,
+ timestamp=None)
# Address + Private key + !watchonly + Wrong private key
self.log.info("Should not import an address with a wrong private key")
- address = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- address2 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": address['address']
- },
- "timestamp": "now",
- "keys": [ self.nodes[0].dumpprivkey(address2['address']) ]
- }])
- assert_equal(result[0]['success'], False)
- assert_equal(result[0]['error']['code'], -5)
- assert_equal(result[0]['error']['message'], 'Key does not match address destination')
- address_assert = self.nodes[1].getaddressinfo(address['address'])
- assert_equal(address_assert['iswatchonly'], False)
- assert_equal(address_assert['ismine'], False)
- assert_equal('timestamp' in address_assert, False)
-
+ key = self.get_key()
+ address = key.p2pkh_addr
+ wrong_privkey = self.get_key().privkey
+ self.test_importmulti({"scriptPubKey": {"address": address},
+ "timestamp": "now",
+ "keys": [wrong_privkey]},
+ False,
+ error_code=-5,
+ error_message='Key does not match address destination')
+ self.test_address(address,
+ iswatchonly=False,
+ ismine=False,
+ timestamp=None)
# ScriptPubKey + Private key + internal + Wrong private key
self.log.info("Should not import a scriptPubKey with internal and with a wrong private key")
- address = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- address2 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- result = self.nodes[1].importmulti([{
- "scriptPubKey": address['scriptPubKey'],
- "timestamp": "now",
- "keys": [ self.nodes[0].dumpprivkey(address2['address']) ],
- "internal": True
- }])
- assert_equal(result[0]['success'], False)
- assert_equal(result[0]['error']['code'], -5)
- assert_equal(result[0]['error']['message'], 'Key does not match address destination')
- address_assert = self.nodes[1].getaddressinfo(address['address'])
- assert_equal(address_assert['iswatchonly'], False)
- assert_equal(address_assert['ismine'], False)
- assert_equal('timestamp' in address_assert, False)
-
+ key = self.get_key()
+ address = key.p2pkh_addr
+ wrong_privkey = self.get_key().privkey
+ self.test_importmulti({"scriptPubKey": key.p2pkh_script,
+ "timestamp": "now",
+ "keys": [wrong_privkey],
+ "internal": True},
+ False,
+ error_code=-5,
+ error_message='Key does not match address destination')
+ self.test_address(address,
+ iswatchonly=False,
+ ismine=False,
+ timestamp=None)
# Importing existing watch only address with new timestamp should replace saved timestamp.
assert_greater_than(timestamp, watchonly_timestamp)
self.log.info("Should replace previously saved watch only timestamp.")
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": watchonly_address,
- },
- "timestamp": "now",
- }])
- assert_equal(result[0]['success'], True)
- address_assert = self.nodes[1].getaddressinfo(watchonly_address)
- assert_equal(address_assert['iswatchonly'], True)
- assert_equal(address_assert['ismine'], False)
- assert_equal(address_assert['timestamp'], timestamp)
+ self.test_importmulti({"scriptPubKey": {"address": watchonly_address},
+ "timestamp": "now"},
+ True)
+ self.test_address(watchonly_address,
+ iswatchonly=True,
+ ismine=False,
+ timestamp=timestamp)
watchonly_timestamp = timestamp
-
# restart nodes to check for proper serialization/deserialization of watch only address
self.stop_nodes()
self.start_nodes()
- address_assert = self.nodes[1].getaddressinfo(watchonly_address)
- assert_equal(address_assert['iswatchonly'], True)
- assert_equal(address_assert['ismine'], False)
- assert_equal(address_assert['timestamp'], watchonly_timestamp)
+ self.test_address(watchonly_address,
+ iswatchonly=True,
+ ismine=False,
+ timestamp=watchonly_timestamp)
# Bad or missing timestamps
self.log.info("Should throw on invalid or missing timestamp values")
assert_raises_rpc_error(-3, 'Missing required timestamp field for key',
- self.nodes[1].importmulti, [{
- "scriptPubKey": address['scriptPubKey'],
- }])
+ self.nodes[1].importmulti, [{"scriptPubKey": key.p2pkh_script}])
assert_raises_rpc_error(-3, 'Expected number or "now" timestamp value for key. got type string',
- self.nodes[1].importmulti, [{
- "scriptPubKey": address['scriptPubKey'],
- "timestamp": "",
- }])
+ self.nodes[1].importmulti, [{
+ "scriptPubKey": key.p2pkh_script,
+ "timestamp": ""
+ }])
# Import P2WPKH address as watch only
self.log.info("Should import a P2WPKH address as watch only")
- address = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress(address_type="bech32"))
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": address['address']
- },
- "timestamp": "now",
- }])
- assert_equal(result[0]['success'], True)
- address_assert = self.nodes[1].getaddressinfo(address['address'])
- assert_equal(address_assert['iswatchonly'], True)
- assert_equal(address_assert['solvable'], False)
+ key = self.get_key()
+ address = key.p2wpkh_addr
+ self.test_importmulti({"scriptPubKey": {"address": address},
+ "timestamp": "now"},
+ True)
+ self.test_address(address,
+ iswatchonly=True,
+ solvable=False)
# Import P2WPKH address with public key but no private key
self.log.info("Should import a P2WPKH address and public key as solvable but not spendable")
- address = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress(address_type="bech32"))
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": address['address']
- },
- "timestamp": "now",
- "pubkeys": [ address['pubkey'] ]
- }])
- assert_equal(result[0]['success'], True)
- address_assert = self.nodes[1].getaddressinfo(address['address'])
- assert_equal(address_assert['ismine'], False)
- assert_equal(address_assert['solvable'], True)
+ key = self.get_key()
+ address = key.p2wpkh_addr
+ self.test_importmulti({"scriptPubKey": {"address": address},
+ "timestamp": "now",
+ "pubkeys": [key.pubkey]},
+ True)
+ self.test_address(address,
+ ismine=False,
+ solvable=True)
# Import P2WPKH address with key and check it is spendable
self.log.info("Should import a P2WPKH address with key")
- address = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress(address_type="bech32"))
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": address['address']
- },
- "timestamp": "now",
- "keys": [self.nodes[0].dumpprivkey(address['address'])]
- }])
- assert_equal(result[0]['success'], True)
- address_assert = self.nodes[1].getaddressinfo(address['address'])
- assert_equal(address_assert['iswatchonly'], False)
- assert_equal(address_assert['ismine'], True)
+ key = self.get_key()
+ address = key.p2wpkh_addr
+ self.test_importmulti({"scriptPubKey": {"address": address},
+ "timestamp": "now",
+ "keys": [key.privkey]},
+ True)
+ self.test_address(address,
+ iswatchonly=False,
+ ismine=True)
# P2WSH multisig address without scripts or keys
- sig_address_1 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- sig_address_2 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- multi_sig_script = self.nodes[0].addmultisigaddress(2, [sig_address_1['pubkey'], sig_address_2['pubkey']], "", "bech32")
+ multisig = self.get_multisig()
self.log.info("Should import a p2wsh multisig as watch only without respective redeem script and private keys")
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": multi_sig_script['address']
- },
- "timestamp": "now"
- }])
- assert_equal(result[0]['success'], True)
- address_assert = self.nodes[1].getaddressinfo(multi_sig_script['address'])
- assert_equal(address_assert['solvable'], False)
+ self.test_importmulti({"scriptPubKey": {"address": multisig.p2wsh_addr},
+ "timestamp": "now"},
+ True)
+ self.test_address(multisig.p2sh_addr,
+ solvable=False)
# Same P2WSH multisig address as above, but now with witnessscript + private keys
- self.log.info("Should import a p2wsh with respective redeem script and private keys")
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": multi_sig_script['address']
- },
- "timestamp": "now",
- "witnessscript": multi_sig_script['redeemScript'],
- "keys": [ self.nodes[0].dumpprivkey(sig_address_1['address']), self.nodes[0].dumpprivkey(sig_address_2['address']) ]
- }])
- assert_equal(result[0]['success'], True)
- address_assert = self.nodes[1].getaddressinfo(multi_sig_script['address'])
- assert_equal(address_assert['solvable'], True)
- assert_equal(address_assert['ismine'], True)
- assert_equal(address_assert['sigsrequired'], 2)
+ self.log.info("Should import a p2wsh with respective witness script and private keys")
+ self.test_importmulti({"scriptPubKey": {"address": multisig.p2wsh_addr},
+ "timestamp": "now",
+ "witnessscript": multisig.redeem_script,
+ "keys": multisig.privkeys},
+ True)
+ self.test_address(multisig.p2sh_addr,
+ solvable=True,
+ ismine=True,
+ sigsrequired=2)
# P2SH-P2WPKH address with no redeemscript or public or private key
- sig_address_1 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress(address_type="p2sh-segwit"))
- pubkeyhash = hash160(hex_str_to_bytes(sig_address_1['pubkey']))
- pkscript = CScript([OP_0, pubkeyhash])
+ key = self.get_key()
+ address = key.p2sh_p2wpkh_addr
self.log.info("Should import a p2sh-p2wpkh without redeem script or keys")
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": sig_address_1['address']
- },
- "timestamp": "now"
- }])
- assert_equal(result[0]['success'], True)
- address_assert = self.nodes[1].getaddressinfo(sig_address_1['address'])
- assert_equal(address_assert['solvable'], False)
- assert_equal(address_assert['ismine'], False)
+ self.test_importmulti({"scriptPubKey": {"address": address},
+ "timestamp": "now"},
+ True)
+ self.test_address(address,
+ solvable=False,
+ ismine=False)
# P2SH-P2WPKH address + redeemscript + public key with no private key
self.log.info("Should import a p2sh-p2wpkh with respective redeem script and pubkey as solvable")
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": sig_address_1['address']
- },
- "timestamp": "now",
- "redeemscript": bytes_to_hex_str(pkscript),
- "pubkeys": [ sig_address_1['pubkey'] ]
- }])
- assert_equal(result[0]['success'], True)
- address_assert = self.nodes[1].getaddressinfo(sig_address_1['address'])
- assert_equal(address_assert['solvable'], True)
- assert_equal(address_assert['ismine'], False)
+ self.test_importmulti({"scriptPubKey": {"address": address},
+ "timestamp": "now",
+ "redeemscript": key.p2sh_p2wpkh_redeem_script,
+ "pubkeys": [key.pubkey]},
+ True)
+ self.test_address(address,
+ solvable=True,
+ ismine=False)
# P2SH-P2WPKH address + redeemscript + private key
- sig_address_1 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress(address_type="p2sh-segwit"))
- pubkeyhash = hash160(hex_str_to_bytes(sig_address_1['pubkey']))
- pkscript = CScript([OP_0, pubkeyhash])
+ key = self.get_key()
+ address = key.p2sh_p2wpkh_addr
self.log.info("Should import a p2sh-p2wpkh with respective redeem script and private keys")
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": sig_address_1['address']
- },
- "timestamp": "now",
- "redeemscript": bytes_to_hex_str(pkscript),
- "keys": [ self.nodes[0].dumpprivkey(sig_address_1['address'])]
- }])
- assert_equal(result[0]['success'], True)
- address_assert = self.nodes[1].getaddressinfo(sig_address_1['address'])
- assert_equal(address_assert['solvable'], True)
- assert_equal(address_assert['ismine'], True)
-
- # P2SH-P2WSH 1-of-1 multisig + redeemscript with no private key
- sig_address_1 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())
- multi_sig_script = self.nodes[0].addmultisigaddress(1, [sig_address_1['pubkey']], "", "p2sh-segwit")
- scripthash = sha256(hex_str_to_bytes(multi_sig_script['redeemScript']))
- redeem_script = CScript([OP_0, scripthash])
+ self.test_importmulti({"scriptPubKey": {"address": address},
+ "timestamp": "now",
+ "redeemscript": key.p2sh_p2wpkh_redeem_script,
+ "keys": [key.privkey]},
+ True)
+ self.test_address(address,
+ solvable=True,
+ ismine=True)
+
+ # P2SH-P2WSH multisig + redeemscript with no private key
+ multisig = self.get_multisig()
self.log.info("Should import a p2sh-p2wsh with respective redeem script but no private key")
- result = self.nodes[1].importmulti([{
- "scriptPubKey": {
- "address": multi_sig_script['address']
- },
- "timestamp": "now",
- "redeemscript": bytes_to_hex_str(redeem_script),
- "witnessscript": multi_sig_script['redeemScript']
- }])
- assert_equal(result[0]['success'], True)
- address_assert = self.nodes[1].getaddressinfo(multi_sig_script['address'])
- assert_equal(address_assert['solvable'], True)
+ self.test_importmulti({"scriptPubKey": {"address": multisig.p2sh_p2wsh_addr},
+ "timestamp": "now",
+ "redeemscript": multisig.p2wsh_script,
+ "witnessscript": multisig.redeem_script},
+ True)
+ self.test_address(address,
+ solvable=True)
if __name__ == '__main__':
- ImportMultiTest ().main ()
+ ImportMultiTest().main()
diff --git a/test/functional/wallet_keypool_topup.py b/test/functional/wallet_keypool_topup.py
index f1a441c399..b7c8d3098d 100755
--- a/test/functional/wallet_keypool_topup.py
+++ b/test/functional/wallet_keypool_topup.py
@@ -24,8 +24,8 @@ from test_framework.util import (
class KeypoolRestoreTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
- self.num_nodes = 2
- self.extra_args = [[], ['-keypool=100']]
+ self.num_nodes = 4
+ self.extra_args = [[], ['-keypool=100'], ['-keypool=100'], ['-keypool=100']]
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
@@ -40,32 +40,47 @@ class KeypoolRestoreTest(BitcoinTestFramework):
shutil.copyfile(wallet_path, wallet_backup_path)
self.start_node(1, self.extra_args[1])
connect_nodes_bi(self.nodes, 0, 1)
+ connect_nodes_bi(self.nodes, 0, 2)
+ connect_nodes_bi(self.nodes, 0, 3)
- self.log.info("Generate keys for wallet")
- for _ in range(90):
- addr_oldpool = self.nodes[1].getnewaddress()
- for _ in range(20):
- addr_extpool = self.nodes[1].getnewaddress()
+ for i, output_type in enumerate(["legacy", "p2sh-segwit", "bech32"]):
- self.log.info("Send funds to wallet")
- self.nodes[0].sendtoaddress(addr_oldpool, 10)
- self.nodes[0].generate(1)
- self.nodes[0].sendtoaddress(addr_extpool, 5)
- self.nodes[0].generate(1)
- sync_blocks(self.nodes)
+ self.log.info("Generate keys for wallet with address type: {}".format(output_type))
+ idx = i+1
+ for _ in range(90):
+ addr_oldpool = self.nodes[idx].getnewaddress(address_type=output_type)
+ for _ in range(20):
+ addr_extpool = self.nodes[idx].getnewaddress(address_type=output_type)
- self.log.info("Restart node with wallet backup")
- self.stop_node(1)
- shutil.copyfile(wallet_backup_path, wallet_path)
- self.start_node(1, self.extra_args[1])
- connect_nodes_bi(self.nodes, 0, 1)
- self.sync_all()
+ # Make sure we're creating the outputs we expect
+ address_details = self.nodes[idx].validateaddress(addr_extpool)
+ if i == 0:
+ assert(not address_details["isscript"] and not address_details["iswitness"])
+ elif i == 1:
+ assert(address_details["isscript"] and not address_details["iswitness"])
+ else:
+ assert(not address_details["isscript"] and address_details["iswitness"])
+
+
+ self.log.info("Send funds to wallet")
+ self.nodes[0].sendtoaddress(addr_oldpool, 10)
+ self.nodes[0].generate(1)
+ self.nodes[0].sendtoaddress(addr_extpool, 5)
+ self.nodes[0].generate(1)
+ sync_blocks(self.nodes)
+
+ self.log.info("Restart node with wallet backup")
+ self.stop_node(idx)
+ shutil.copyfile(wallet_backup_path, wallet_path)
+ self.start_node(idx, self.extra_args[idx])
+ connect_nodes_bi(self.nodes, 0, idx)
+ self.sync_all()
- self.log.info("Verify keypool is restored and balance is correct")
- assert_equal(self.nodes[1].getbalance(), 15)
- assert_equal(self.nodes[1].listtransactions()[0]['category'], "receive")
- # Check that we have marked all keys up to the used keypool key as used
- assert_equal(self.nodes[1].getaddressinfo(self.nodes[1].getnewaddress())['hdkeypath'], "m/0'/0'/110'")
+ self.log.info("Verify keypool is restored and balance is correct")
+ assert_equal(self.nodes[idx].getbalance(), 15)
+ assert_equal(self.nodes[idx].listtransactions()[0]['category'], "receive")
+ # Check that we have marked all keys up to the used keypool key as used
+ assert_equal(self.nodes[idx].getaddressinfo(self.nodes[idx].getnewaddress())['hdkeypath'], "m/0'/0'/110'")
if __name__ == '__main__':