X-Git-Url: https://blacka.com/cgi-bin/gitweb.cgi?p=python-rwhoisd.git;a=blobdiff_plain;f=rwhoisd%2FCidr.py;h=88d46d3692ffa33d477b50634716b5bdd65d1832;hp=fcf4dcd7bfe6a3beea19a39670976add9b8c3b7d;hb=HEAD;hpb=c7f8bbea77865d92b6e7924b7de436831c0cf14a diff --git a/rwhoisd/Cidr.py b/rwhoisd/Cidr.py index fcf4dcd..88d46d3 100644 --- a/rwhoisd/Cidr.py +++ b/rwhoisd/Cidr.py @@ -18,23 +18,27 @@ # USA import socket, types, copy, bisect, struct +import v6addr -class Cidr: - """A class representing a generic CIDRized network value.""" +def new(address, netlen = -1): + """Construct either a CidrV4 or CidrV6 object.""" - @staticmethod - def create(address, netlen = -1): - """Construct either a CidrV4 or CidrV6 object.""" - if isinstance(address, int): - return CidrV4(address, netlen) - if isinstance(address, long): - if address <= pow(2, 32): - return CidrV4(address, netlen) - return CidrV6(address, netlen) - if ":" in address: - return CidrV6(address, netlen) + # ints are probably v4 addresses. + if isinstance(address, int): return CidrV4(address, netlen) + # longs could be v4 addresses, but we will only assume so if the + # value is small. + if isinstance(address, long): + if address <= pow(2, 32): + return CidrV4(address, netlen) + return CidrV6(address, netlen) + # otherwise, a colon in the address is a dead giveaway. + if ":" in address: + return CidrV6(address, netlen) + return CidrV4(address, netlen) +class Cidr: + """A class representing a generic CIDRized network value.""" def _initialize(self, address, netlen): """This a common constructor that is used by the subclasses.""" @@ -122,7 +126,7 @@ class Cidr: def netmask(self): """return the netmask of this Cidr network""" return self._convert_ipaddr(self.mask) - + def length(self): """return the length (in number of addresses) of this network block""" return 1 << (self._max_netlen() - self.netlen); @@ -137,7 +141,7 @@ class Cidr: def clone(self): # we can get away with a shallow copy (so far) return copy.copy(self) - + def is_ipv6(self): if isinstance(self, CidrV6): return True return False @@ -162,14 +166,17 @@ class CidrV4(Cidr): Specifically, it is representing a contiguous IPv4 network block that can be expressed as a ip-address/network-length pair.""" + base_mask = 0xFFFFFFFF + msb_mask = 0x80000000 + def __init__(self, address, netlen = -1): """This takes either a formatted string in CIDR notation: (e.g., "127.0.0.1/32"), A tuple consisting of an formatting string IPv4 address and a numeric network length, or the same as two arguments.""" - + self._initialize(address, netlen) - + def _is_valid_address(self, address): """Returns True if the address is a legal IPv4 address.""" try: @@ -179,7 +186,7 @@ class CidrV4(Cidr): return False def _base_mask(self, numaddr): - return numaddr & 0xFFFFFFFFL + return numaddr & CidrV4.base_mask def _max_netlen(self): return 32 @@ -193,21 +200,21 @@ class CidrV4(Cidr): return socket.inet_ntoa(packed_numaddr) def _mask(self, len): - return self._base_mask(0xFFFFFFFF << (32 - len)) - + return self._base_mask(CidrV4.base_mask << (32 - len)) class CidrV6(Cidr): """A class representing a CIDRized IPv6 network value. Specifically, it is representing a contiguous IPv6 network block that can be expressed as a ipv6-address/network-length pair.""" - - ip6_base_mask = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFL # 128-bits of all ones. - ip6_lower_mask = 0x0000000000000000FFFFFFFFFFFFFFFFL - ip6_upper_mask = 0xFFFFFFFFFFFFFFFF0000000000000000L + + base_mask = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFL # 128-bits of all ones. + msb_mask = 0x80000000000000000000000000000000L + lower_mask = 0x0000000000000000FFFFFFFFFFFFFFFFL + upper_mask = 0xFFFFFFFFFFFFFFFF0000000000000000L def __init__(self, address, netlen = -1): - + self._initialize(address, netlen) def _is_valid_address(self, address): @@ -219,7 +226,7 @@ class CidrV6(Cidr): return False def _base_mask(self, numaddr): - return numaddr & CidrV6.ip6_base_mask + return numaddr & CidrV6.base_mask def _max_netlen(self): return 128 @@ -228,15 +235,15 @@ class CidrV6(Cidr): packed_numaddr = socket.inet_pton(socket.AF_INET6, addr) upper, lower = struct.unpack("!QQ", packed_numaddr); return (upper << 64) | lower - + def _convert_ipaddr(self, numaddr): - upper = (numaddr & CidrV6.ip6_upper_mask) >> 64; - lower = numaddr & CidrV6.ip6_lower_mask; + upper = (numaddr & CidrV6.upper_mask) >> 64; + lower = numaddr & CidrV6.lower_mask; packed_numaddr = struct.pack("!QQ", upper, lower) return socket.inet_ntop(socket.AF_INET6, packed_numaddr) def _mask(self, len): - return self._base_mask(CidrV6.ip6_base_mask << (128 - len)) + return self._base_mask(CidrV6.base_mask << (128 - len)) def valid_cidr(address): @@ -247,7 +254,7 @@ def valid_cidr(address): if isinstance(address, Cidr): return address try: - c = Cidr.create(address) + c = new(address) return c except (ValueError, socket.error): return False @@ -256,25 +263,19 @@ def netblock_to_cidr(start, end): """Convert an arbitrary network block expressed as a start and end address (inclusive) into a series of valid CIDR blocks.""" - def largest_prefix_v4(length): + def largest_prefix(length, max_netlen, msb_mask): # calculates the largest network length (smallest mask length) # that can fit within the block length. i = 1; v = length - while i <= 32: - if v & 0x80000000: break - i += 1; v <<= 1 - return i - def largest_prefix_v6(length): - i = 1; v = length - while i <= 128: - if v & 0x80000000000000000000000000000000L: break + while i <= max_netlen: + if v & msb_mask: break i += 1; v <<= 1 return i - def netlen_to_mask_v4(n): + def netlen_to_mask(n, max_netlen, base_mask): # convert the network length into its netmask - return ~((1 << (32 - n)) - 1) - def netlen_to_mask_v6(n): - return ~((1 << (128 -n)) - 1) + return ~((1 << (max_netlen - n)) - 1) & base_mask + def netlen_to_length(n, max_netlen, base_mask): + return 1 << (max_netlen - n) & base_mask # convert the start and ending addresses of the netblock to Cidr # object, mostly so we can get the numeric versions of their @@ -285,66 +286,59 @@ def netblock_to_cidr(start, end): # if either the start or ending addresses aren't valid addresses, # quit now. if not cs or not ce: - print "Invalid start or end address" return None # if the start and ending addresses aren't in the same family, quit now if cs.is_ipv6() != ce.is_ipv6(): - print "start and end address not same family" return None - - if cs.is_ipv6(): - largest_prefix = largest_prefix_v6 - netlen_to_mask = netlen_to_mask_v6 - else: - largest_prefix = largest_prefix_v4 - netlen_to_mask = netlen_to_mask_v4 + + max_netlen = cs._max_netlen() + msb_mask = cs.msb_mask + base_mask = cs.base_mask # calculate the number of IP address in the netblock block_len = ce.numaddr - cs.numaddr - # calcuate the largest CIDR block size that fits - netlen = largest_prefix(block_len + 1) - + netlen = largest_prefix(block_len + 1, max_netlen, msb_mask) + res = []; s = cs.numaddr while block_len > 0: - mask = netlen_to_mask(netlen) + mask = netlen_to_mask(netlen, max_netlen, base_mask) # check to see if our current network length is valid if (s & mask) != s: # if not, shrink the network block size netlen += 1 continue # otherwise, we have a valid CIDR block, so add it to the list - cv = Cidr.create(s, netlen) - res.append(Cidr.create(s, netlen)) + res.append(new(s, netlen)) # and setup for the next round: - cur_len = netlen_to_length(netlen) + cur_len = netlen_to_length(netlen, max_netlen, base_mask) s += cur_len block_len -= cur_len - netlen = largest_prefix(block_len + 1) + netlen = largest_prefix(block_len + 1, max_netlen, msb_mask) return res # test driver if __name__ == "__main__": import sys - a = Cidr.create("127.00.000.1/24") - b = Cidr.create("127.0.0.1", 32) - c = Cidr.create("24.232.119.192", 26) - d = Cidr.create("24.232.119.0", 24) - e = Cidr.create("24.224.0.0", 11) - f = Cidr.create("216.168.111.0/27"); - g = Cidr.create("127.0.0.2/31"); - h = Cidr.create("127.0.0.16/32") - i = Cidr.create("3ffe:4:201e:beef::0/64"); - j = Cidr.create("2001:3c01::/32") + a = new("127.00.000.1/24") + b = new("127.0.0.1", 32) + c = new("24.232.119.192", 26) + d = new("24.232.119.0", 24) + e = new("24.224.0.0", 11) + f = new("216.168.111.0/27"); + g = new("127.0.0.2/31"); + h = new("127.0.0.16/32") + i = new("3ffe:4:201e:beef::0/64"); + j = new("2001:3c01::/32") print f.addr print j.addr - + try: - bad = Cidr.create("24.261.119.0", 32) + bad = new("24.261.119.0", 32) except ValueError, x: print "error:", x - + print "cidr:", a, "num addresses:", a.length(), "ending address", \ a.end(), "netmask", a.netmask() @@ -358,6 +352,15 @@ if __name__ == "__main__": clist.sort() print "sorted list of cidr object:\n ", clist + k = new("2001:3c01::1:0", 120) + print "supernet: ", str(j), " supernet of ", str(k), "? ", \ + str(j.is_supernet(k)) + print "supernet: ", str(k), " supernet of ", str(j), "? ", \ + str(k.is_supernet(j)) + print "subnet: ", str(j), " subnet of ", str(k), "? ", \ + str(j.is_subnet(k)) + print "subnet: ", str(k), " subnet of ", str(j), "? ", \ + str(k.is_subnet(j)) netblocks = [ ("192.168.10.0", "192.168.10.255"), ("192.168.10.0", "192.168.10.63"), @@ -365,8 +368,10 @@ if __name__ == "__main__": ("24.33.41.22", "24.33.41.37"), ("196.11.1.0", "196.11.30.255"), ("192.247.1.0", "192.247.10.255"), - ("3ffe:4:5::", "3ffe:4:5::ffff") ] - + ("10.131.43.3", "10.131.44.7"), + ("3ffe:4:5::", "3ffe:4:5::ffff"), + ("3ffe:4:5::", "3ffe:4:6::1")] + for start, end in netblocks: print "netblock %s - %s:" % (start, end) blocks = netblock_to_cidr(start, end)