From c7f8bbea77865d92b6e7924b7de436831c0cf14a Mon Sep 17 00:00:00 2001 From: David Blacka Date: Tue, 10 Jun 2008 23:04:36 -0400 Subject: [PATCH 1/1] checkpoint --- rwhoisd/Cidr.py | 285 +++++++++++++++++++++++++++--------------------- 1 file changed, 162 insertions(+), 123 deletions(-) diff --git a/rwhoisd/Cidr.py b/rwhoisd/Cidr.py index 68f0054..fcf4dcd 100644 --- a/rwhoisd/Cidr.py +++ b/rwhoisd/Cidr.py @@ -17,11 +17,59 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 # USA -import socket, types, copy, bisect, re, struct +import socket, types, copy, bisect, struct class Cidr: """A class representing a generic CIDRized network value.""" + @staticmethod + def create(address, netlen = -1): + """Construct either a CidrV4 or CidrV6 object.""" + if isinstance(address, int): + return CidrV4(address, netlen) + if isinstance(address, long): + if address <= pow(2, 32): + return CidrV4(address, netlen) + return CidrV6(address, netlen) + if ":" in address: + return CidrV6(address, netlen) + return CidrV4(address, netlen) + + + def _initialize(self, address, netlen): + """This a common constructor that is used by the subclasses.""" + + if isinstance(address, int) or \ + isinstance(address, long) and netlen >= 0: + self.numaddr, self.netlen = address, netlen + self.addr = self._convert_ipaddr(address) + self.calc() + return + + if not self.is_valid_cidr(address): + raise ValueError, \ + repr(address) + " is not a valid CIDR representation" + + if netlen < 0: + if type(address) == types.StringType: + if "/" in address: + self.addr, self.netlen = address.split("/", 1) + else: + self.addr, self.netlen = address, self._max_netlen() + elif type(address) == types.TupleType: + self.addr, self.netlen = address + else: + raise TypeError, "address must be a string or a tuple" + else: + self.addr, self.netlen = address, netlen + + + # convert string network lengths to integer + if type(self.netlen) == types.StringType: + self.netlen = int(self.netlen) + + self.calc() + def __str__(self): return self.addr + "/" + str(self.netlen) @@ -34,7 +82,7 @@ class Cidr: is, supernets will sort before subnets. This ordering allows for an efficient search for subnets of a given network.""" - res = self._base_mask(self.numaddr) - self._base_mask(other.numaddr) + res = self._base_mask(self.numaddr) - other._base_mask(other.numaddr) if res == 0: res = self.netlen - other.netlen if res < 0: return -1 if res > 0: return 1 @@ -45,8 +93,9 @@ class Cidr: internal state: netlen or numaddr.""" # make sure the network length is valid - if not self._is_valid_netlen(netlen): - raise TypeError, "network length must be between 0 and %d" % (_max_netlen()) + if not self.is_valid_netlen(self.netlen): + raise TypeError, "network length must be between 0 and %d" % \ + (self._max_netlen()) # convert the string ipv4 address to a 32bit number self.numaddr = self._convert_ipstr(self.addr) @@ -76,7 +125,7 @@ class Cidr: def length(self): """return the length (in number of addresses) of this network block""" - return netlen_to_length(self.netlen) + return 1 << (self._max_netlen() - self.netlen); def end(self): """return the last IP address in this network block""" @@ -88,6 +137,24 @@ class Cidr: def clone(self): # we can get away with a shallow copy (so far) return copy.copy(self) + + def is_ipv6(self): + if isinstance(self, CidrV6): return True + return False + + def is_valid_cidr(self, address): + if "/" in address: + addr, netlen = address.split("/", 1) + netlen = int(netlen) + else: + addr, netlen = address, 0 + return self._is_valid_address(addr) and self.is_valid_netlen(netlen) + + def is_valid_netlen(self, netlen): + if netlen < 0: return False + if netlen > self._max_netlen(): return False + return True + class CidrV4(Cidr): """A class representing a CIDRized IPv4 network value. @@ -95,50 +162,21 @@ class CidrV4(Cidr): Specifically, it is representing a contiguous IPv4 network block that can be expressed as a ip-address/network-length pair.""" - # FIXME: we should probably actually make this class immutable and - # add methods that generate copies of this class with different - # netlens or whatever. - - ip4addr_re = re.compile("^\d{1,3}(\.\d{1,3}){0,3}(/\d{1,2})?$") - def __init__(self, address, netlen = -1): """This takes either a formatted string in CIDR notation: (e.g., "127.0.0.1/32"), A tuple consisting of an formatting string IPv4 address and a numeric network length, or the same as two arguments.""" - - # if we are handing a numerical address and netlen, convert - # them directly. - if isinstance(address, int) and netlen >= 0: - self.netlen = netlen - self.numaddr = address - self.addr = self._convert_ipaddr(self.numaddr); - self.calc() - return - if not CidrV4.ip4addr_re.search(address): - raise ValueError, repr(address) + \ - " is not a valid CIDR representation" + self._initialize(address, netlen) - if netlen < 0: - if type(address) == types.StringType: - if "/" in address: - self.addr, self.netlen = address.split("/", 1) - else: - self.addr, self.netlen = address, 32 - elif type(address) == types.TupleType: - self.addr, self.netlen = address - else: - raise TypeError, "address must be a string or a tuple" - else: - self.addr = address - self.netlen = netlen - - # convert string network lengths to integer - if type(self.netlen) == types.StringType: - self.netlen = int(self.netlen) - - self.calc() + def _is_valid_address(self, address): + """Returns True if the address is a legal IPv4 address.""" + try: + self._convert_ipstr(address) + return True + except socket.error: + return False def _base_mask(self, numaddr): return numaddr & 0xFFFFFFFFL @@ -146,23 +184,17 @@ class CidrV4(Cidr): def _max_netlen(self): return 32 - def _is_valid_netlen(self, netlen): - if self.netlen < 0: return False - if self.netlen > _max_netlen(): return False - return True - def _convert_ipstr(self, addr): - return socket.inet_aton(addr) + packed_numaddr = socket.inet_aton(addr) + return struct.unpack("!I", packed_numaddr)[0] def _convert_ipaddr(self, numaddr): - res = str((numaddr & 0xFF000000) >> 24 & 0xFF) + "." + \ - str((numaddr & 0x00FF0000) >> 16) + "." + \ - str((numaddr & 0x0000FF00) >> 8) + "." + \ - str(numaddr & 0x000000FF) - return res + packed_numaddr = struct.pack("!I", numaddr) + return socket.inet_ntoa(packed_numaddr) def _mask(self, len): - return 0xFFFFFFFF << (32 - len) + return self._base_mask(0xFFFFFFFF << (32 - len)) + class CidrV6(Cidr): """A class representing a CIDRized IPv6 network value. @@ -170,84 +202,61 @@ class CidrV6(Cidr): Specifically, it is representing a contiguous IPv6 network block that can be expressed as a ipv6-address/network-length pair.""" - ip6addr_re = re.compile("^[\da-f]{1,4}(:[\da-f]{1,4}){0,7}(::[\da-f])?(/\d{1,3})?$", re.I) - ip6_base_mask = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFL # 128-bits of all ones. + ip6_base_mask = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFL # 128-bits of all ones. + ip6_lower_mask = 0x0000000000000000FFFFFFFFFFFFFFFFL + ip6_upper_mask = 0xFFFFFFFFFFFFFFFF0000000000000000L def __init__(self, address, netlen = -1): - if isinstance(address, long) and netlen >= 0: - self.netlen = netlen - self.numaddr = address - self.addr = self._convert_ipaddr(address) - self.calc() - return - - if not CidrV6.ip6addr_re.search(address): - raise ValueError, repr(address) + \ - "is not a valid CIDR representation" - - if netlen < 0: - if type(address) == types.StringType: - if "/" in address: - self.addr, self.netlen = address.split("/", 1) - else: - self.addr, self.netlen = address, 128 - elif type(address) == types.TupleType: - self.addr, self.netlen = address - else: - raise TypeError, "address must be a string or a tuple" - else: - self.addr = address - self.netlen = netlen + self._initialize(address, netlen) - if type(self.netlen) == type.StringType: - self.netlen = int(self.netlen) - - self.calc() + def _is_valid_address(self, address): + try: + self._convert_ipstr(address) + return True + except socket.error, e: + print "Failed to convert address string '%s': " + str(e) % (address) + return False def _base_mask(self, numaddr): return numaddr & CidrV6.ip6_base_mask + def _max_netlen(self): + return 128 + def _convert_ipstr(self, addr): packed_numaddr = socket.inet_pton(socket.AF_INET6, addr) upper, lower = struct.unpack("!QQ", packed_numaddr); - numaddr = (upper << 64) | lower + return (upper << 64) | lower def _convert_ipaddr(self, numaddr): - upper = (numaddr & (ip6_base_mask << 64)) >> 64; - lower = numaddr & (ip6_base_mask >> 64) + upper = (numaddr & CidrV6.ip6_upper_mask) >> 64; + lower = numaddr & CidrV6.ip6_lower_mask; packed_numaddr = struct.pack("!QQ", upper, lower) return socket.inet_ntop(socket.AF_INET6, packed_numaddr) def _mask(self, len): - return ip6_base_mask << (128 - len) + return self._base_mask(CidrV6.ip6_base_mask << (128 - len)) def valid_cidr(address): - """Returns the converted Cidr object if 'address' is valid - CIDR notation, False if not. For the purposes of this module, - valid CIDR notation consists of 1 to 4 octets separated by '.' - characters, with an optional trailing "/netlen".""" + """Returns the converted Cidr object if 'address' is valid CIDR + notation, False if not. For the purposes of this module, valid + CIDR notation consists of a IPv4 or IPv6 address with an optional + trailing "/netlen".""" if isinstance(address, Cidr): return address try: - c = Cidr(address) + c = Cidr.create(address) return c - except: + except (ValueError, socket.error): return False - -def netlen_to_length(netlen): - """Convert a network-length to the length of the block in ip - addresses.""" - - return 1 << (32 - netlen); - def netblock_to_cidr(start, end): """Convert an arbitrary network block expressed as a start and end address (inclusive) into a series of valid CIDR blocks.""" - def largest_prefix(length): + def largest_prefix_v4(length): # calculates the largest network length (smallest mask length) # that can fit within the block length. i = 1; v = length @@ -255,10 +264,17 @@ def netblock_to_cidr(start, end): if v & 0x80000000: break i += 1; v <<= 1 return i - def netlen_to_mask(n): + def largest_prefix_v6(length): + i = 1; v = length + while i <= 128: + if v & 0x80000000000000000000000000000000L: break + i += 1; v <<= 1 + return i + def netlen_to_mask_v4(n): # convert the network length into its netmask return ~((1 << (32 - n)) - 1) - + def netlen_to_mask_v6(n): + return ~((1 << (128 -n)) - 1) # convert the start and ending addresses of the netblock to Cidr # object, mostly so we can get the numeric versions of their @@ -266,10 +282,22 @@ def netblock_to_cidr(start, end): cs = valid_cidr(start) ce = valid_cidr(end) - # if either the start or ending addresses aren't valid ipv4 - # address, quit now. + # if either the start or ending addresses aren't valid addresses, + # quit now. if not cs or not ce: + print "Invalid start or end address" return None + # if the start and ending addresses aren't in the same family, quit now + if cs.is_ipv6() != ce.is_ipv6(): + print "start and end address not same family" + return None + + if cs.is_ipv6(): + largest_prefix = largest_prefix_v6 + netlen_to_mask = netlen_to_mask_v6 + else: + largest_prefix = largest_prefix_v4 + netlen_to_mask = netlen_to_mask_v4 # calculate the number of IP address in the netblock block_len = ce.numaddr - cs.numaddr @@ -286,8 +314,8 @@ def netblock_to_cidr(start, end): netlen += 1 continue # otherwise, we have a valid CIDR block, so add it to the list - cv = Cidr(s, netlen) - res.append(Cidr(s, netlen)) + cv = Cidr.create(s, netlen) + res.append(Cidr.create(s, netlen)) # and setup for the next round: cur_len = netlen_to_length(netlen) s += cur_len @@ -297,36 +325,47 @@ def netblock_to_cidr(start, end): # test driver if __name__ == "__main__": - a = Cidr("127.00.000.1/24") - b = Cidr("127.0.0.1", 32) - c = Cidr("24.232.119.192", 26) - d = Cidr("24.232.119.0", 24) - e = Cidr("24.224.0.0", 11) - f = Cidr("216.168.111.0/27"); - g = Cidr("127.0.0.2/31"); - h = Cidr("127.0.0.16/32") + import sys + a = Cidr.create("127.00.000.1/24") + b = Cidr.create("127.0.0.1", 32) + c = Cidr.create("24.232.119.192", 26) + d = Cidr.create("24.232.119.0", 24) + e = Cidr.create("24.224.0.0", 11) + f = Cidr.create("216.168.111.0/27"); + g = Cidr.create("127.0.0.2/31"); + h = Cidr.create("127.0.0.16/32") + i = Cidr.create("3ffe:4:201e:beef::0/64"); + j = Cidr.create("2001:3c01::/32") + print f.addr + print j.addr try: - bad = Cidr("24.261.119.0", 32) - except Warning, x: - print "warning:", x + bad = Cidr.create("24.261.119.0", 32) + except ValueError, x: + print "error:", x print "cidr:", a, "num addresses:", a.length(), "ending address", \ - a.end(), "netmask", a.netmask() - - clist = [a, b, c, d, e, f, g, h] + a.end(), "netmask", a.netmask() + + print "cidr:", j, "num addresses:", j.length(), "ending address", \ + j.end(), "netmask", j.netmask() + + clist = [a, b, c, d, e, f, g, h, i , j] print "unsorted list of cidr objects:\n ", clist + clist.sort() print "sorted list of cidr object:\n ", clist + netblocks = [ ("192.168.10.0", "192.168.10.255"), ("192.168.10.0", "192.168.10.63"), ("172.16.0.0", "172.16.127.255"), ("24.33.41.22", "24.33.41.37"), ("196.11.1.0", "196.11.30.255"), - ("192.247.1.0", "192.247.10.255")] + ("192.247.1.0", "192.247.10.255"), + ("3ffe:4:5::", "3ffe:4:5::ffff") ] for start, end in netblocks: print "netblock %s - %s:" % (start, end) -- 2.36.6