diff options
author | Ava Chow <github@achow101.com> | 2024-05-09 11:57:30 -0400 |
---|---|---|
committer | Ava Chow <github@achow101.com> | 2024-05-09 11:57:30 -0400 |
commit | ceb1e078f8c0ae58ff72748b039184a205efe337 (patch) | |
tree | 4d3a4750e63b3b04818ddf0f852967f389512a39 /contrib/seeds | |
parent | 921c61e9a5c8592cc9e07986612d3f4f31d663fc (diff) | |
parent | 6abe772a17e09fe96e68cd3311280d5a30f6378b (diff) | |
download | bitcoin-ceb1e078f8c0ae58ff72748b039184a205efe337.tar.xz |
Merge bitcoin/bitcoin#28793: contrib: Add asmap-tool
6abe772a17e09fe96e68cd3311280d5a30f6378b contrib: Add asmap-tool (Fabian Jahr)
Pull request description:
This adds `asmap.py` and `asmap-tool.py` from sipa's `nextgen` branch: https://github.com/sipa/asmap/tree/nextgen
The motivation is that we should maintain the tooling for de- and encoding asmap files within the bitcoin core repository because it is not possible to use an asmap file that is not encoded.
We already had an earlier version of `asmap.py` within the seeds contrib tools. The newer version only had a small amount of changes and is still compatible, so the old version is removed from contrib/seeds and the new version is made available to `makeseeds.py`.
ACKs for top commit:
virtu:
ACK [6abe772](https://github.com/bitcoin/bitcoin/commit/6abe772a17e09fe96e68cd3311280d5a30f6378b)
0xB10C:
ACK 6abe772a17e09fe96e68cd3311280d5a30f6378b
achow101:
ACK 6abe772a17e09fe96e68cd3311280d5a30f6378b
brunoerg:
ACK 6abe772a17e09fe96e68cd3311280d5a30f6378b
Tree-SHA512: cc2a82ffa4eb46fa0ce4ca769dd82f8d0d2f37fc3652aa748eeb060e1142f9da4035008fe89433e2fd524a4dc153b7b9c085748944b49137b37009b0c0be8afb
Diffstat (limited to 'contrib/seeds')
-rw-r--r-- | contrib/seeds/asmap.py | 816 | ||||
-rwxr-xr-x | contrib/seeds/makeseeds.py | 5 |
2 files changed, 4 insertions, 817 deletions
diff --git a/contrib/seeds/asmap.py b/contrib/seeds/asmap.py deleted file mode 100644 index 214805b5a5..0000000000 --- a/contrib/seeds/asmap.py +++ /dev/null @@ -1,816 +0,0 @@ -# Copyright (c) 2022 Pieter Wuille -# Distributed under the MIT software license, see the accompanying -# file LICENSE or http://www.opensource.org/licenses/mit-license.php. - -""" -This module provides the ASNEntry and ASMap classes. -""" - -import copy -import ipaddress -import random -import unittest -from collections.abc import Callable, Iterable -from enum import Enum -from functools import total_ordering -from typing import Optional, Union, overload - -def net_to_prefix(net: Union[ipaddress.IPv4Network,ipaddress.IPv6Network]) -> list[bool]: - """ - Convert an IPv4 or IPv6 network to a prefix represented as a list of bits. - - IPv4 ranges are remapped to their IPv4-mapped IPv6 range (::ffff:0:0/96). - """ - num_bits = net.prefixlen - netrange = int.from_bytes(net.network_address.packed, 'big') - - # Map an IPv4 prefix into IPv6 space. - if isinstance(net, ipaddress.IPv4Network): - num_bits += 96 - netrange += 0xffff00000000 - - # Strip unused bottom bits. - assert (netrange & ((1 << (128 - num_bits)) - 1)) == 0 - return [((netrange >> (127 - i)) & 1) != 0 for i in range(num_bits)] - -def prefix_to_net(prefix: list[bool]) -> Union[ipaddress.IPv4Network,ipaddress.IPv6Network]: - """The reverse operation of net_to_prefix.""" - # Convert to number - netrange = sum(b << (127 - i) for i, b in enumerate(prefix)) - num_bits = len(prefix) - assert num_bits <= 128 - - # Return IPv4 range if in ::ffff:0:0/96 - if num_bits >= 96 and (netrange >> 32) == 0xffff: - return ipaddress.IPv4Network((netrange & 0xffffffff, num_bits - 96), True) - - # Return IPv6 range otherwise. - return ipaddress.IPv6Network((netrange, num_bits), True) - -# Shortcut for (prefix, ASN) entries. -ASNEntry = tuple[list[bool], int] - -# Shortcut for (prefix, old ASN, new ASN) entries. -ASNDiff = tuple[list[bool], int, int] - -class _VarLenCoder: - """ - A class representing a custom variable-length binary encoder/decoder for - integers. Each object represents a different coder, with different parameters - minval and clsbits. - - The encoding is easiest to describe using an example. Let's say minval=100 and - clsbits=[4,2,2,3]. In that case: - - x in [100..115]: encoded as [0] + [4-bit BE encoding of (x-100)]. - - x in [116..119]: encoded as [1,0] + [2-bit BE encoding of (x-116)]. - - x in [120..123]: encoded as [1,1,0] + [2-bit BE encoding of (x-120)]. - - x in [124..131]: encoded as [1,1,1] + [3-bit BE encoding of (x-124)]. - - In general, every number is encoded as: - - First, k "1"-bits, where k is the class the number falls in (there is one class - per element of clsbits). - - Then, a "0"-bit, unless k is the highest class, in which case there is nothing. - - Lastly, clsbits[k] bits encoding in big endian the position in its class that - number falls into. - - Every class k consists of 2^clsbits[k] consecutive integers. k=0 starts at minval, - other classes start one past the last element of the class before it. - """ - - def __init__(self, minval: int, clsbits: list[int]): - """Construct a new _VarLenCoder.""" - self._minval = minval - self._clsbits = clsbits - self._maxval = minval + sum(1 << b for b in clsbits) - 1 - - def can_encode(self, val: int) -> bool: - """Check whether value val is in the range this coder supports.""" - return self._minval <= val <= self._maxval - - def encode(self, val: int, ret: list[int]) -> None: - """Append encoding of val onto integer list ret.""" - - assert self._minval <= val <= self._maxval - val -= self._minval - bits = 0 - for k, bits in enumerate(self._clsbits): - if val >> bits: - # If the value will not fit in class k, subtract its range from v, - # emit a "1" bit and continue with the next class. - val -= 1 << bits - ret.append(1) - else: - if k + 1 < len(self._clsbits): - # Unless we're in the last class, emit a "0" bit. - ret.append(0) - break - # And then encode v (now the position within the class) in big endian. - ret.extend((val >> (bits - 1 - b)) & 1 for b in range(bits)) - - def encode_size(self, val: int) -> int: - """Compute how many bits are needed to encode val.""" - assert self._minval <= val <= self._maxval - val -= self._minval - ret = 0 - bits = 0 - for k, bits in enumerate(self._clsbits): - if val >> bits: - val -= 1 << bits - ret += 1 - else: - ret += k + 1 < len(self._clsbits) - break - return ret + bits - - def decode(self, stream, bitpos) -> tuple[int,int]: - """Decode a number starting at bitpos in stream, returning value and new bitpos.""" - val = self._minval - bits = 0 - for k, bits in enumerate(self._clsbits): - bit = 0 - if k + 1 < len(self._clsbits): - bit = stream[bitpos] - bitpos += 1 - if not bit: - break - val += 1 << bits - for i in range(bits): - bit = stream[bitpos] - bitpos += 1 - val += bit << (bits - 1 - i) - return val, bitpos - -# Variable-length encoders used in the binary asmap format. -_CODER_INS = _VarLenCoder(0, [0, 0, 1]) -_CODER_ASN = _VarLenCoder(1, list(range(15, 25))) -_CODER_MATCH = _VarLenCoder(2, list(range(1, 9))) -_CODER_JUMP = _VarLenCoder(17, list(range(5, 31))) - -class _Instruction(Enum): - """One instruction in the binary asmap format.""" - # A return instruction, encoded as [0], returns a constant ASN. It is followed by - # an integer using the ASN encoding. - RETURN = 0 - # A jump instruction, encoded as [1,0] inspects the next unused bit in the input - # and either continues execution (if 0), or skips a specified number of bits (if 1). - # It is followed by an integer, and then two subprograms. The integer uses jump encoding - # and corresponds to the length of the first subprogram (so it can be skipped). - JUMP = 1 - # A match instruction, encoded as [1,1,0] inspects 1 or more of the next unused bits - # in the input with its argument. If they all match, execution continues. If they do - # not, failure is returned. If a default instruction has been executed before, instead - # of failure the default instruction's argument is returned. It is followed by an - # integer in match encoding, and a subprogram. That value is at least 2 bits and at - # most 9 bits. An n-bit value signifies matching (n-1) bits in the input with the lower - # (n-1) bits in the match value. - MATCH = 2 - # A default instruction, encoded as [1,1,1] sets the default variable to its argument, - # and continues execution. It is followed by an integer in ASN encoding, and a subprogram. - DEFAULT = 3 - # Not an actual instruction, but a way to encode the empty program that fails. In the - # encoder, it is used more generally to represent the failure case inside MATCH instructions, - # which may (if used inside the context of a DEFAULT instruction) actually correspond to - # a successful return. In this usage, they're always converted to an actual MATCH or RETURN - # before the top level is reached (see make_default below). - END = 4 - -class _BinNode: - """A class representing a (node of) the parsed binary asmap format.""" - - @overload - def __init__(self, ins: _Instruction): ... - @overload - def __init__(self, ins: _Instruction, arg1: int): ... - @overload - def __init__(self, ins: _Instruction, arg1: "_BinNode", arg2: "_BinNode"): ... - @overload - def __init__(self, ins: _Instruction, arg1: int, arg2: "_BinNode"): ... - - def __init__(self, ins: _Instruction, arg1=None, arg2=None): - """ - Construct a new asmap node. Possibilities are: - - _BinNode(_Instruction.RETURN, asn) - - _BinNode(_Instruction.JUMP, node_0, node_1) - - _BinNode(_Instruction.MATCH, val, node) - - _BinNode(_Instruction.DEFAULT, asn, node) - - _BinNode(_Instruction.END) - """ - self.ins = ins - self.arg1 = arg1 - self.arg2 = arg2 - if ins == _Instruction.RETURN: - assert isinstance(arg1, int) - assert arg2 is None - self.size = _CODER_INS.encode_size(ins.value) + _CODER_ASN.encode_size(arg1) - elif ins == _Instruction.JUMP: - assert isinstance(arg1, _BinNode) - assert isinstance(arg2, _BinNode) - self.size = (_CODER_INS.encode_size(ins.value) + _CODER_JUMP.encode_size(arg1.size) + - arg1.size + arg2.size) - elif ins == _Instruction.DEFAULT: - assert isinstance(arg1, int) - assert isinstance(arg2, _BinNode) - self.size = _CODER_INS.encode_size(ins.value) + _CODER_ASN.encode_size(arg1) + arg2.size - elif ins == _Instruction.MATCH: - assert isinstance(arg1, int) - assert isinstance(arg2, _BinNode) - self.size = (_CODER_INS.encode_size(ins.value) + _CODER_MATCH.encode_size(arg1) - + arg2.size) - elif ins == _Instruction.END: - assert arg1 is None - assert arg2 is None - self.size = 0 - else: - assert False - - @staticmethod - def make_end() -> "_BinNode": - """Constructor for a _BinNode with just an END instruction.""" - return _BinNode(_Instruction.END) - - @staticmethod - def make_leaf(val: int) -> "_BinNode": - """Constructor for a _BinNode of just a RETURN instruction.""" - assert val is not None and val > 0 - return _BinNode(_Instruction.RETURN, val) - - @staticmethod - def make_branch(node0: "_BinNode", node1: "_BinNode") -> "_BinNode": - """ - Construct a _BinNode corresponding to running either the node0 or node1 subprogram, - based on the next input bit. It exploits shortcuts that are possible in the encoding, - and uses either a JUMP, MATCH, or END instruction. - """ - if node0.ins == _Instruction.END and node1.ins == _Instruction.END: - return node0 - if node0.ins == _Instruction.END: - if node1.ins == _Instruction.MATCH and node1.arg1 <= 0xFF: - return _BinNode(node1.ins, node1.arg1 + (1 << node1.arg1.bit_length()), node1.arg2) - return _BinNode(_Instruction.MATCH, 3, node1) - if node1.ins == _Instruction.END: - if node0.ins == _Instruction.MATCH and node0.arg1 <= 0xFF: - return _BinNode(node0.ins, node0.arg1 + (1 << (node0.arg1.bit_length() - 1)), - node0.arg2) - return _BinNode(_Instruction.MATCH, 2, node0) - return _BinNode(_Instruction.JUMP, node0, node1) - - @staticmethod - def make_default(val: int, sub: "_BinNode") -> "_BinNode": - """ - Construct a _BinNode that corresponds to the specified subprogram, with the specified - default value. It exploits shortcuts that are possible in the encoding, and will use - either a DEFAULT or a RETURN instruction.""" - assert val is not None and val > 0 - if sub.ins == _Instruction.END: - return _BinNode(_Instruction.RETURN, val) - if sub.ins in (_Instruction.RETURN, _Instruction.DEFAULT): - return sub - return _BinNode(_Instruction.DEFAULT, val, sub) - -@total_ordering -class ASMap: - """ - A class whose objects represent a mapping from subnets to ASNs. - - Internally the mapping is stored as a binary trie, but can be converted - from/to a list of ASNEntry objects, and from/to the binary asmap file format. - - In the trie representation, nodes are represented as bare lists for efficiency - and ease of manipulation: - - [0] means an unassigned subnet (no ASN mapping for it is present) - - [int] means a subnet mapped entirely to the specified ASN. - - [node,node] means a subnet whose lower half and upper half have different - - mappings, represented by new trie nodes. - """ - - def update(self, prefix: list[bool], asn: int) -> None: - """Update this ASMap object to map prefix to the specified asn.""" - assert asn == 0 or _CODER_ASN.can_encode(asn) - - def recurse(node: list, offset: int) -> None: - if offset == len(prefix): - # Reached the end of prefix; overwrite this node. - node.clear() - node.append(asn) - return - if len(node) == 1: - # Need to descend into a leaf node; split it up. - oldasn = node[0] - node.clear() - node.append([oldasn]) - node.append([oldasn]) - # Descend into the node. - recurse(node[prefix[offset]], offset + 1) - # If the result is two identical leaf children, merge them. - if len(node[0]) == 1 and len(node[1]) == 1 and node[0] == node[1]: - oldasn = node[0][0] - node.clear() - node.append(oldasn) - recurse(self._trie, 0) - - def update_multi(self, entries: list[tuple[list[bool], int]]) -> None: - """Apply multiple update operations, where longer prefixes take precedence.""" - entries.sort(key=lambda entry: len(entry[0])) - for prefix, asn in entries: - self.update(prefix, asn) - - def _set_trie(self, trie) -> None: - """Set trie directly. Internal use only.""" - def recurse(node: list) -> None: - if len(node) < 2: - return - recurse(node[0]) - recurse(node[1]) - if len(node[0]) == 2: - return - if node[0] == node[1]: - if len(node[0]) == 0: - node.clear() - else: - asn = node[0][0] - node.clear() - node.append(asn) - recurse(trie) - self._trie = trie - - def __init__(self, entries: Optional[Iterable[ASNEntry]] = None) -> None: - """Construct an ASMap object from an optional list of entries.""" - self._trie = [0] - if entries is not None: - def entry_key(entry): - """Sort function that places shorter prefixes first.""" - prefix, asn = entry - return len(prefix), prefix, asn - for prefix, asn in sorted(entries, key=entry_key): - self.update(prefix, asn) - - def lookup(self, prefix: list[bool]) -> Optional[int]: - """Look up a prefix. Returns ASN, or 0 if unassigned, or None if indeterminate.""" - node = self._trie - for bit in prefix: - if len(node) == 1: - break - node = node[bit] - if len(node) == 1: - return node[0] - return None - - def _to_entries_flat(self, fill: bool = False) -> list[ASNEntry]: - """Convert an ASMap object to a list of non-overlapping (prefix, asn) objects.""" - prefix : list[bool] = [] - - def recurse(node: list) -> list[ASNEntry]: - ret = [] - if len(node) == 1: - if node[0] > 0: - ret = [(list(prefix), node[0])] - elif len(node) == 2: - prefix.append(False) - ret = recurse(node[0]) - prefix[-1] = True - ret += recurse(node[1]) - prefix.pop() - if fill and len(ret) > 1: - asns = set(x[1] for x in ret) - if len(asns) == 1: - ret = [(list(prefix), list(asns)[0])] - return ret - return recurse(self._trie) - - def _to_entries_minimal(self, fill: bool = False) -> list[ASNEntry]: - """Convert a trie to a minimal list of ASNEntry objects, exploiting overlap.""" - prefix : list[bool] = [] - - def recurse(node: list) -> (tuple[dict[Optional[int], list[ASNEntry]], bool]): - if len(node) == 1 and node[0] == 0: - return {None if fill else 0: []}, True - if len(node) == 1: - return {node[0]: [], None: [(list(prefix), node[0])]}, False - ret: dict[Optional[int], list[ASNEntry]] = {} - prefix.append(False) - left, lhole = recurse(node[0]) - prefix[-1] = True - right, rhole = recurse(node[1]) - prefix.pop() - hole = not fill and (lhole or rhole) - def candidate(ctx: Optional[int], res0: Optional[list[ASNEntry]], - res1: Optional[list[ASNEntry]]): - if res0 is not None and res1 is not None: - if ctx not in ret or len(res0) + len(res1) < len(ret[ctx]): - ret[ctx] = res0 + res1 - for ctx in set(left) | set(right): - candidate(ctx, left.get(ctx), right.get(ctx)) - candidate(ctx, left.get(None), right.get(ctx)) - candidate(ctx, left.get(ctx), right.get(None)) - if not hole: - for ctx in list(ret): - if ctx is not None: - candidate(None, [(list(prefix), ctx)], ret[ctx]) - if None in ret: - ret = {ctx:entries for ctx, entries in ret.items() - if ctx is None or len(entries) < len(ret[None])} - if hole: - ret = {ctx:entries for ctx, entries in ret.items() if ctx is None or ctx == 0} - return ret, hole - res, _ = recurse(self._trie) - return res[0] if 0 in res else res[None] - - def __str__(self) -> str: - """Convert this ASMap object to a string containing Python code constructing it.""" - return f"ASMap({self._trie})" - - def to_entries(self, overlapping: bool = True, fill: bool = False) -> list[ASNEntry]: - """ - Convert the mappings in this ASMap object to a list of ASNEntry objects. - - Arguments: - overlapping: Permit the subnets in the resulting ASNEntry to overlap. - Setting this can result in a shorter list. - fill: Permit the resulting ASNEntry objects to cover subnets that - are unassigned in this ASMap object. Setting this can - result in a shorter list. - """ - if overlapping: - return self._to_entries_minimal(fill) - return self._to_entries_flat(fill) - - @staticmethod - def from_random(num_leaves: int = 10, max_asn: int = 6, - unassigned_prob: float = 0.5) -> "ASMap": - """ - Construct a random ASMap object, with specified: - - Number of leaves in its trie (at least 1) - - Maximum ASN value (at least 1) - - Probability for leaf nodes to be unassigned - - The number of leaves in the resulting object may be less than what is - requested. This method is mostly intended for testing. - """ - assert num_leaves >= 1 - assert max_asn >= 1 or unassigned_prob == 1 - assert _CODER_ASN.can_encode(max_asn) - assert 0.0 <= unassigned_prob <= 1.0 - trie: list = [] - leaves = [trie] - ret = ASMap() - for i in range(1, num_leaves): - idx = random.randrange(i) - leaf = leaves[idx] - lastleaf = leaves.pop() - if idx + 1 < i: - leaves[idx] = lastleaf - leaf.append([]) - leaf.append([]) - leaves.append(leaf[0]) - leaves.append(leaf[1]) - for leaf in leaves: - if random.random() >= unassigned_prob: - leaf.append(random.randrange(1, max_asn + 1)) - else: - leaf.append(0) - #pylint: disable=protected-access - ret._set_trie(trie) - return ret - - def _to_binnode(self, fill: bool = False) -> _BinNode: - """Convert a trie to a _BinNode object.""" - def recurse(node: list) -> tuple[dict[Optional[int], _BinNode], bool]: - if len(node) == 1 and node[0] == 0: - return {(None if fill else 0): _BinNode.make_end()}, True - if len(node) == 1: - return {None: _BinNode.make_leaf(node[0]), node[0]: _BinNode.make_end()}, False - ret: dict[Optional[int], _BinNode] = {} - left, lhole = recurse(node[0]) - right, rhole = recurse(node[1]) - hole = (lhole or rhole) and not fill - - def candidate(ctx: Optional[int], arg1, arg2, func: Callable): - if arg1 is not None and arg2 is not None: - cand = func(arg1, arg2) - if ctx not in ret or cand.size < ret[ctx].size: - ret[ctx] = cand - - for ctx in set(left) | set(right): - candidate(ctx, left.get(ctx), right.get(ctx), _BinNode.make_branch) - candidate(ctx, left.get(None), right.get(ctx), _BinNode.make_branch) - candidate(ctx, left.get(ctx), right.get(None), _BinNode.make_branch) - if not hole: - for ctx in set(ret) - set([None]): - candidate(None, ctx, ret[ctx], _BinNode.make_default) - if None in ret: - ret = {ctx:enc for ctx, enc in ret.items() - if ctx is None or enc.size < ret[None].size} - if hole: - ret = {ctx:enc for ctx, enc in ret.items() if ctx is None or ctx == 0} - return ret, hole - res, _ = recurse(self._trie) - return res[0] if 0 in res else res[None] - - @staticmethod - def _from_binnode(binnode: _BinNode) -> "ASMap": - """Construct an ASMap object from a _BinNode. Internal use only.""" - def recurse(node: _BinNode, default: int) -> list: - if node.ins == _Instruction.RETURN: - return [node.arg1] - if node.ins == _Instruction.JUMP: - return [recurse(node.arg1, default), recurse(node.arg2, default)] - if node.ins == _Instruction.MATCH: - val = node.arg1 - sub = recurse(node.arg2, default) - while val >= 2: - bit = val & 1 - val >>= 1 - if bit: - sub = [[default], sub] - else: - sub = [sub, [default]] - return sub - assert node.ins == _Instruction.DEFAULT - return recurse(node.arg2, node.arg1) - ret = ASMap() - if binnode.ins != _Instruction.END: - #pylint: disable=protected-access - ret._set_trie(recurse(binnode, 0)) - return ret - - def to_binary(self, fill: bool = False) -> bytes: - """ - Convert this ASMap object to binary. - - Argument: - fill: permit the resulting binary encoder to contain mappers for - unassigned subnets in this ASMap object. Doing so may - reduce the size of the encoding. - Returns: - A bytes object with the encoding of this ASMap object. - """ - bits: list[int] = [] - - def recurse(node: _BinNode) -> None: - _CODER_INS.encode(node.ins.value, bits) - if node.ins == _Instruction.RETURN: - _CODER_ASN.encode(node.arg1, bits) - elif node.ins == _Instruction.JUMP: - _CODER_JUMP.encode(node.arg1.size, bits) - recurse(node.arg1) - recurse(node.arg2) - elif node.ins == _Instruction.DEFAULT: - _CODER_ASN.encode(node.arg1, bits) - recurse(node.arg2) - else: - assert node.ins == _Instruction.MATCH - _CODER_MATCH.encode(node.arg1, bits) - recurse(node.arg2) - - binnode = self._to_binnode(fill) - if binnode.ins != _Instruction.END: - recurse(binnode) - - val = 0 - nbits = 0 - ret = [] - for bit in bits: - val += (bit << nbits) - nbits += 1 - if nbits == 8: - ret.append(val) - val = 0 - nbits = 0 - if nbits: - ret.append(val) - return bytes(ret) - - @staticmethod - def from_binary(bindata: bytes) -> Optional["ASMap"]: - """Decode an ASMap object from the provided binary encoding.""" - - bits: list[int] = [] - for byte in bindata: - bits.extend((byte >> i) & 1 for i in range(8)) - - def recurse(bitpos: int) -> tuple[_BinNode, int]: - insval, bitpos = _CODER_INS.decode(bits, bitpos) - ins = _Instruction(insval) - if ins == _Instruction.RETURN: - asn, bitpos = _CODER_ASN.decode(bits, bitpos) - return _BinNode(ins, asn), bitpos - if ins == _Instruction.JUMP: - jump, bitpos = _CODER_JUMP.decode(bits, bitpos) - left, bitpos1 = recurse(bitpos) - if bitpos1 != bitpos + jump: - raise ValueError("Inconsistent jump") - right, bitpos = recurse(bitpos1) - return _BinNode(ins, left, right), bitpos - if ins == _Instruction.MATCH: - match, bitpos = _CODER_MATCH.decode(bits, bitpos) - sub, bitpos = recurse(bitpos) - return _BinNode(ins, match, sub), bitpos - assert ins == _Instruction.DEFAULT - asn, bitpos = _CODER_ASN.decode(bits, bitpos) - sub, bitpos = recurse(bitpos) - return _BinNode(ins, asn, sub), bitpos - - if len(bits) == 0: - binnode = _BinNode(_Instruction.END) - else: - try: - binnode, bitpos = recurse(0) - except (ValueError, IndexError): - return None - if bitpos < len(bits) - 7: - return None - if not all(bit == 0 for bit in bits[bitpos:]): - return None - - return ASMap._from_binnode(binnode) - - def __lt__(self, other: "ASMap") -> bool: - return self._trie < other._trie - - def __eq__(self, other: object) -> bool: - if isinstance(other, ASMap): - return self._trie == other._trie - return False - - def extends(self, req: "ASMap") -> bool: - """Determine whether this matches req for all subranges where req is assigned.""" - def recurse(actual: list, require: list) -> bool: - if len(require) == 1 and require[0] == 0: - return True - if len(require) == 1: - if len(actual) == 1: - return bool(require[0] == actual[0]) - return recurse(actual[0], require) and recurse(actual[1], require) - if len(actual) == 2: - return recurse(actual[0], require[0]) and recurse(actual[1], require[1]) - return recurse(actual, require[0]) and recurse(actual, require[1]) - assert isinstance(req, ASMap) - #pylint: disable=protected-access - return recurse(self._trie, req._trie) - - def diff(self, other: "ASMap") -> list[ASNDiff]: - """Compute the diff from self to other.""" - prefix: list[bool] = [] - ret: list[ASNDiff] = [] - - def recurse(old_node: list, new_node: list): - if len(old_node) == 1 and len(new_node) == 1: - if old_node[0] != new_node[0]: - ret.append((list(prefix), old_node[0], new_node[0])) - else: - old_left: list = old_node if len(old_node) == 1 else old_node[0] - old_right: list = old_node if len(old_node) == 1 else old_node[1] - new_left: list = new_node if len(new_node) == 1 else new_node[0] - new_right: list = new_node if len(new_node) == 1 else new_node[1] - prefix.append(False) - recurse(old_left, new_left) - prefix[-1] = True - recurse(old_right, new_right) - prefix.pop() - assert isinstance(other, ASMap) - #pylint: disable=protected-access - recurse(self._trie, other._trie) - return ret - - def __copy__(self) -> "ASMap": - """Construct a copy of this ASMap object. Its state will not be shared.""" - ret = ASMap() - #pylint: disable=protected-access - ret._set_trie(copy.deepcopy(self._trie)) - return ret - - def __deepcopy__(self, _) -> "ASMap": - # ASMap objects do not allow sharing of the _trie member, so we don't need the memoization. - return self.__copy__() - - -class TestASMap(unittest.TestCase): - """Unit tests for this module.""" - - def test_ipv6_prefix_roundtrips(self) -> None: - """Test that random IPv6 network ranges roundtrip through prefix encoding.""" - for _ in range(20): - net_bits = random.getrandbits(128) - for prefix_len in range(0, 129): - masked_bits = (net_bits >> (128 - prefix_len)) << (128 - prefix_len) - net = ipaddress.IPv6Network((masked_bits.to_bytes(16, 'big'), prefix_len)) - prefix = net_to_prefix(net) - self.assertTrue(len(prefix) <= 128) - net2 = prefix_to_net(prefix) - self.assertEqual(net, net2) - - def test_ipv4_prefix_roundtrips(self) -> None: - """Test that random IPv4 network ranges roundtrip through prefix encoding.""" - for _ in range(100): - net_bits = random.getrandbits(32) - for prefix_len in range(0, 33): - masked_bits = (net_bits >> (32 - prefix_len)) << (32 - prefix_len) - net = ipaddress.IPv4Network((masked_bits.to_bytes(4, 'big'), prefix_len)) - prefix = net_to_prefix(net) - self.assertTrue(32 <= len(prefix) <= 128) - net2 = prefix_to_net(prefix) - self.assertEqual(net, net2) - - def test_asmap_roundtrips(self) -> None: - """Test case that verifies random ASMap objects roundtrip to/from entries/binary.""" - # Iterate over the number of leaves the random test ASMap objects have. - for leaves in range(1, 20): - # Iterate over the number of bits in the AS numbers used. - for asnbits in range(0, 24): - # Iterate over the probability that leaves are unassigned. - for pct in range(101): - # Construct a random ASMap object according to the above parameters. - asmap = ASMap.from_random(num_leaves=leaves, max_asn=1 + (1 << asnbits), - unassigned_prob=0.01 * pct) - # Run tests for to_entries and construction from those entries, both - # for overlapping and non-overlapping ones. - for overlapping in [False, True]: - entries = asmap.to_entries(overlapping=overlapping, fill=False) - random.shuffle(entries) - asmap2 = ASMap(entries) - assert asmap2 is not None - self.assertEqual(asmap2, asmap) - entries = asmap.to_entries(overlapping=overlapping, fill=True) - random.shuffle(entries) - asmap2 = ASMap(entries) - assert asmap2 is not None - self.assertTrue(asmap2.extends(asmap)) - - # Run tests for to_binary and construction from binary. - enc = asmap.to_binary(fill=False) - asmap3 = ASMap.from_binary(enc) - assert asmap3 is not None - self.assertEqual(asmap3, asmap) - enc = asmap.to_binary(fill=True) - asmap3 = ASMap.from_binary(enc) - assert asmap3 is not None - self.assertTrue(asmap3.extends(asmap)) - - def test_patching(self) -> None: - """Test behavior of update, lookup, extends, and diff.""" - #pylint: disable=too-many-locals,too-many-nested-blocks - # Iterate over the number of leaves the random test ASMap objects have. - for leaves in range(1, 20): - # Iterate over the number of bits in the AS numbers used. - for asnbits in range(0, 10): - # Iterate over the probability that leaves are unassigned. - for pct in range(0, 101): - # Construct a random ASMap object according to the above parameters. - asmap = ASMap.from_random(num_leaves=leaves, max_asn=1 + (1 << asnbits), - unassigned_prob=0.01 * pct) - # Make a copy of that asmap object to which patches will be applied. - # It starts off being equal to asmap. - patched = copy.copy(asmap) - # Keep a list of patches performed. - patches: list[ASNEntry] = [] - # Initially there cannot be any difference. - self.assertEqual(asmap.diff(patched), []) - # Make 5 patches, each building on top of the previous ones. - for _ in range(0, 5): - # Construct a random path and new ASN to assign it to, apply it to patched, - # and remember it in patches. - pathlen = random.randrange(5) - path = [random.getrandbits(1) != 0 for _ in range(pathlen)] - newasn = random.randrange(1 + (1 << asnbits)) - patched.update(path, newasn) - patches = [(path, newasn)] + patches - - # Compute the diff, and whether asmap extends patched, and the other way - # around. - diff = asmap.diff(patched) - self.assertEqual(asmap == patched, len(diff) == 0) - extends = asmap.extends(patched) - back_extends = patched.extends(asmap) - # Determine whether those extends results are consistent with the diff - # result. - self.assertEqual(extends, all(d[2] == 0 for d in diff)) - self.assertEqual(back_extends, all(d[1] == 0 for d in diff)) - # For every diff found: - for path, old_asn, new_asn in diff: - # Verify asmap and patched actually differ there. - self.assertTrue(old_asn != new_asn) - self.assertEqual(asmap.lookup(path), old_asn) - self.assertEqual(patched.lookup(path), new_asn) - for _ in range(2): - # Extend the path far enough that it's smaller than any mapped - # range, and check the lookup holds there too. - spec_path = list(path) - while len(spec_path) < 32: - spec_path.append(random.getrandbits(1) != 0) - self.assertEqual(asmap.lookup(spec_path), old_asn) - self.assertEqual(patched.lookup(spec_path), new_asn) - # Search through the list of performed patches to find the last one - # applying to the extended path (note that patches is in reverse - # order, so the first match should work). - found = False - for patch_path, patch_asn in patches: - if spec_path[:len(patch_path)] == patch_path: - # When found, it must match whatever the result was patched - # to. - self.assertEqual(new_asn, patch_asn) - found = True - break - # And such a patch must exist. - self.assertTrue(found) - -if __name__ == '__main__': - unittest.main() diff --git a/contrib/seeds/makeseeds.py b/contrib/seeds/makeseeds.py index 79bb178b21..7fd0f7158e 100755 --- a/contrib/seeds/makeseeds.py +++ b/contrib/seeds/makeseeds.py @@ -9,11 +9,14 @@ import argparse import collections import ipaddress +from pathlib import Path import re import sys from typing import Union -from asmap import ASMap, net_to_prefix +asmap_dir = Path(__file__).parent.parent / "asmap" +sys.path.append(str(asmap_dir)) +from asmap import ASMap, net_to_prefix # noqa: E402 NSEEDS=512 |