checkpoint
[python-rwhoisd.git] / rwhoisd / Cidr.py
1 # This file is part of python-rwhoisd
2 #
3 # Copyright (C) 2003, 2008 David E. Blacka
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18 # USA
19
20 import socket, types, copy, bisect, struct
21
22 class Cidr:
23     """A class representing a generic CIDRized network value."""    
24
25     @staticmethod
26     def create(address, netlen = -1):
27         """Construct either a CidrV4 or CidrV6 object."""
28         if isinstance(address, int):
29             return CidrV4(address, netlen)
30         if isinstance(address, long):
31             if address <= pow(2, 32):
32                 return CidrV4(address, netlen)
33             return CidrV6(address, netlen)
34         if ":" in address:
35             return CidrV6(address, netlen)
36         return CidrV4(address, netlen)
37
38
39     def _initialize(self, address, netlen):
40         """This a common constructor that is used by the subclasses."""
41
42         if isinstance(address, int) or \
43                 isinstance(address, long) and netlen >= 0:
44             self.numaddr, self.netlen = address, netlen
45             self.addr = self._convert_ipaddr(address)
46             self.calc()
47             return
48
49         if not self.is_valid_cidr(address):
50             raise ValueError, \
51                 repr(address) + " is not a valid CIDR representation"
52
53         if netlen < 0:
54             if type(address) == types.StringType:
55                 if "/" in address:
56                     self.addr, self.netlen = address.split("/", 1)
57                 else:
58                     self.addr, self.netlen = address, self._max_netlen()
59             elif type(address) == types.TupleType:
60                 self.addr, self.netlen = address
61             else:
62                 raise TypeError, "address must be a string or a tuple"
63         else:
64             self.addr, self.netlen = address, netlen
65
66
67         # convert string network lengths to integer
68         if type(self.netlen) == types.StringType:
69             self.netlen = int(self.netlen)
70
71         self.calc()
72
73     def __str__(self):
74         return self.addr + "/" + str(self.netlen)
75
76     def __repr__(self):
77         return "<" + str(self) + ">"
78
79     def __cmp__(self, other):
80         """One CIDR network block is less than another if the start
81         address is numerically less or if the block is larger.  That
82         is, supernets will sort before subnets.  This ordering allows
83         for an efficient search for subnets of a given network."""
84
85         res = self._base_mask(self.numaddr) - other._base_mask(other.numaddr)
86         if res == 0: res = self.netlen - other.netlen
87         if res < 0: return -1
88         if res > 0: return 1
89         return 0
90
91     def calc(self):
92         """This method should be called after any change to the main
93         internal state: netlen or numaddr."""
94
95         # make sure the network length is valid
96         if not self.is_valid_netlen(self.netlen):
97             raise TypeError, "network length must be between 0 and %d" % \
98                 (self._max_netlen())
99
100         # convert the string ipv4 address to a 32bit number
101         self.numaddr = self._convert_ipstr(self.addr)
102         # calculate our netmask
103         self.mask = self._mask(self.netlen)
104         # force the cidr address into correct masked notation
105         self.numaddr &= self.mask
106
107         # convert the number back to a string to normalize the string
108         self.addr = self._convert_ipaddr(self.numaddr)
109
110     def is_supernet(self, other):
111         """returns True if the other Cidr object is a supernet (an
112         enclosing network block) of this one.  A Cidr object is a
113         supernet of itself."""
114         return other.numaddr & self.mask == self.numaddr
115
116     def is_subnet(self, other):
117         """returns True if the other Cidr object is a subnet (an
118         enclosednetwork block) of this one.  A Cidr object is a
119         subnet of itself."""
120         return self.numaddr & other.mask == other.numaddr
121
122     def netmask(self):
123         """return the netmask of this Cidr network"""
124         return self._convert_ipaddr(self.mask)
125     
126     def length(self):
127         """return the length (in number of addresses) of this network block"""
128         return 1 << (self._max_netlen() - self.netlen);
129
130     def end(self):
131         """return the last IP address in this network block"""
132         return self._convert_ipaddr(self.numaddr + self.length() - 1)
133
134     def to_netblock(self):
135         return (self.addr, self.end())
136
137     def clone(self):
138         # we can get away with a shallow copy (so far)
139         return copy.copy(self)
140     
141     def is_ipv6(self):
142         if isinstance(self, CidrV6): return True
143         return False
144
145     def is_valid_cidr(self, address):
146         if "/" in address:
147             addr, netlen = address.split("/", 1)
148             netlen = int(netlen)
149         else:
150             addr, netlen = address, 0
151         return self._is_valid_address(addr) and self.is_valid_netlen(netlen)
152
153     def is_valid_netlen(self, netlen):
154         if netlen < 0: return False
155         if netlen > self._max_netlen(): return False
156         return True
157
158
159 class CidrV4(Cidr):
160     """A class representing a CIDRized IPv4 network value.
161
162     Specifically, it is representing a contiguous IPv4 network block
163     that can be expressed as a ip-address/network-length pair."""
164
165     def __init__(self, address, netlen = -1):
166         """This takes either a formatted string in CIDR notation:
167         (e.g., "127.0.0.1/32"), A tuple consisting of an formatting
168         string IPv4 address and a numeric network length, or the same
169         as two arguments."""
170         
171         self._initialize(address, netlen)
172         
173     def _is_valid_address(self, address):
174         """Returns True if the address is a legal IPv4 address."""
175         try:
176             self._convert_ipstr(address)
177             return True
178         except socket.error:
179             return False
180
181     def _base_mask(self, numaddr):
182         return numaddr & 0xFFFFFFFFL
183
184     def _max_netlen(self):
185         return 32
186
187     def _convert_ipstr(self, addr):
188         packed_numaddr = socket.inet_aton(addr)
189         return struct.unpack("!I", packed_numaddr)[0]
190
191     def _convert_ipaddr(self, numaddr):
192         packed_numaddr = struct.pack("!I", numaddr)
193         return socket.inet_ntoa(packed_numaddr)
194
195     def _mask(self, len):
196         return self._base_mask(0xFFFFFFFF << (32 - len))
197         
198
199 class CidrV6(Cidr):
200     """A class representing a CIDRized IPv6 network value.
201
202     Specifically, it is representing a contiguous IPv6 network block
203     that can be expressed as a ipv6-address/network-length pair."""
204     
205     ip6_base_mask  = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFL # 128-bits of all ones.
206     ip6_lower_mask = 0x0000000000000000FFFFFFFFFFFFFFFFL
207     ip6_upper_mask = 0xFFFFFFFFFFFFFFFF0000000000000000L
208
209     def __init__(self, address, netlen = -1):
210         
211         self._initialize(address, netlen)
212
213     def _is_valid_address(self, address):
214         try:
215             self._convert_ipstr(address)
216             return True
217         except socket.error, e:
218             print "Failed to convert address string '%s': " + str(e) % (address)
219             return False
220
221     def _base_mask(self, numaddr):
222         return numaddr & CidrV6.ip6_base_mask
223
224     def _max_netlen(self):
225         return 128
226
227     def _convert_ipstr(self, addr):
228         packed_numaddr = socket.inet_pton(socket.AF_INET6, addr)
229         upper, lower = struct.unpack("!QQ", packed_numaddr);
230         return (upper << 64) | lower
231     
232     def _convert_ipaddr(self, numaddr):
233         upper = (numaddr & CidrV6.ip6_upper_mask) >> 64;
234         lower = numaddr & CidrV6.ip6_lower_mask;
235         packed_numaddr = struct.pack("!QQ", upper, lower)
236         return socket.inet_ntop(socket.AF_INET6, packed_numaddr)
237
238     def _mask(self, len):
239         return self._base_mask(CidrV6.ip6_base_mask << (128 - len))
240
241
242 def valid_cidr(address):
243     """Returns the converted Cidr object if 'address' is valid CIDR
244     notation, False if not.  For the purposes of this module, valid
245     CIDR notation consists of a IPv4 or IPv6 address with an optional
246     trailing "/netlen"."""
247
248     if isinstance(address, Cidr): return address
249     try:
250         c = Cidr.create(address)
251         return c
252     except (ValueError, socket.error):
253         return False
254
255 def netblock_to_cidr(start, end):
256     """Convert an arbitrary network block expressed as a start and end
257     address (inclusive) into a series of valid CIDR blocks."""
258
259     def largest_prefix_v4(length):
260         # calculates the largest network length (smallest mask length)
261         # that can fit within the block length.
262         i = 1; v = length
263         while i <= 32:
264             if v & 0x80000000: break
265             i += 1; v <<= 1
266         return i
267     def largest_prefix_v6(length):
268         i = 1; v = length
269         while i <= 128:
270             if v & 0x80000000000000000000000000000000L: break
271             i += 1; v <<= 1
272         return i
273     def netlen_to_mask_v4(n):
274         # convert the network length into its netmask
275         return ~((1 << (32 - n)) - 1)
276     def netlen_to_mask_v6(n):
277         return ~((1 << (128 -n)) - 1)
278
279     # convert the start and ending addresses of the netblock to Cidr
280     # object, mostly so we can get the numeric versions of their
281     # addresses.
282     cs = valid_cidr(start)
283     ce = valid_cidr(end)
284
285     # if either the start or ending addresses aren't valid addresses,
286     # quit now.
287     if not cs or not ce:
288         print "Invalid start or end address"
289         return None
290     # if the start and ending addresses aren't in the same family, quit now
291     if cs.is_ipv6() != ce.is_ipv6():
292         print "start and end address not same family"
293         return None
294     
295     if cs.is_ipv6():
296         largest_prefix = largest_prefix_v6
297         netlen_to_mask = netlen_to_mask_v6
298     else:
299         largest_prefix = largest_prefix_v4
300         netlen_to_mask = netlen_to_mask_v4
301
302     # calculate the number of IP address in the netblock
303     block_len = ce.numaddr - cs.numaddr
304     
305     # calcuate the largest CIDR block size that fits
306     netlen = largest_prefix(block_len + 1)
307     
308     res = []; s = cs.numaddr
309     while block_len > 0:
310         mask = netlen_to_mask(netlen)
311         # check to see if our current network length is valid
312         if (s & mask) != s:
313             # if not, shrink the network block size
314             netlen += 1
315             continue
316         # otherwise, we have a valid CIDR block, so add it to the list
317         cv = Cidr.create(s, netlen)
318         res.append(Cidr.create(s, netlen))
319         # and setup for the next round:
320         cur_len = netlen_to_length(netlen)
321         s         += cur_len
322         block_len -= cur_len
323         netlen = largest_prefix(block_len + 1)
324     return res
325
326 # test driver
327 if __name__ == "__main__":
328     import sys
329     a = Cidr.create("127.00.000.1/24")
330     b = Cidr.create("127.0.0.1", 32)
331     c = Cidr.create("24.232.119.192", 26)
332     d = Cidr.create("24.232.119.0", 24)
333     e = Cidr.create("24.224.0.0", 11)
334     f = Cidr.create("216.168.111.0/27");
335     g = Cidr.create("127.0.0.2/31");
336     h = Cidr.create("127.0.0.16/32")
337     i = Cidr.create("3ffe:4:201e:beef::0/64");
338     j = Cidr.create("2001:3c01::/32")
339
340     print f.addr
341     print j.addr
342     
343     try:
344         bad = Cidr.create("24.261.119.0", 32)
345     except ValueError, x:
346         print "error:", x
347     
348     print "cidr:", a, "num addresses:", a.length(), "ending address", \
349         a.end(), "netmask", a.netmask()
350
351     print "cidr:", j, "num addresses:", j.length(), "ending address", \
352         j.end(), "netmask", j.netmask()
353
354     clist = [a, b, c, d, e, f, g, h, i , j]
355     print "unsorted list of cidr objects:\n  ", clist
356
357
358     clist.sort()
359     print "sorted list of cidr object:\n  ", clist
360
361
362     netblocks = [ ("192.168.10.0", "192.168.10.255"),
363                   ("192.168.10.0", "192.168.10.63"),
364                   ("172.16.0.0", "172.16.127.255"),
365                   ("24.33.41.22", "24.33.41.37"),
366                   ("196.11.1.0", "196.11.30.255"),
367                   ("192.247.1.0", "192.247.10.255"),
368                   ("3ffe:4:5::", "3ffe:4:5::ffff") ]
369                   
370     for start, end in netblocks:
371         print "netblock %s - %s:" % (start, end)
372         blocks = netblock_to_cidr(start, end)
373         print blocks