From 2066af02d4144547e3531c16a5ede1340b97938b Mon Sep 17 00:00:00 2001
From: Guillaume Valadon <guillaume.valadon@ssi.gouv.fr>
Date: Fri, 8 Apr 2016 16:07:40 +0200
Subject: [PATCH] New read_routes6() function and unit tests

---
 scapy/arch/unix.py       | 152 +++++++++++++------
 scapy/utils6.py          |  10 ++
 test/mocked_functions.py | 315 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 428 insertions(+), 49 deletions(-)
 create mode 100644 test/mocked_functions.py

diff --git a/scapy/arch/unix.py b/scapy/arch/unix.py
index 949ff1ea..1449df37 100644
--- a/scapy/arch/unix.py
+++ b/scapy/arch/unix.py
@@ -12,10 +12,13 @@ from fcntl import ioctl
 from scapy.error import warning
 import scapy.config
 import scapy.utils
-import scapy.utils6
+from scapy.utils6 import in6_getscope, construct_source_candidate_set
+from scapy.utils6 import in6_isvalid, in6_ismlladdr, in6_ismnladdr
 import scapy.arch
 from scapy.config import conf
 
+import socket
+
 
 ##################
 ## Routes stuff ##
@@ -136,7 +139,7 @@ def _in6_getifaddr(ifname):
             continue
 
         # Get the scope and keep the address
-        scope = scapy.utils6.in6_getscope(addr)
+        scope = in6_getscope(addr)
         ret.append((addr, scope, ifname))
 
     return ret
@@ -181,65 +184,116 @@ def in6_getifaddr():
 	ret += _in6_getifaddr(i)
     return ret	    
 
+
 def read_routes6():
-    f = os.popen("netstat -rn -f inet6")
-    ok = False
+    """Return a list of IPv6 routes than can be used by Scapy."""
+
+    # Call netstat to retrieve IPv6 routes
+    fd_netstat = os.popen("netstat -rn -f inet6")
+
+    # List interfaces IPv6 addresses 
+    lifaddr = in6_getifaddr()
+    if not lifaddr:
+        return []
+
+    # Routes header information
+    got_header = False
     mtu_present = False
     prio_present = False
+
+    # Parse the routes
     routes = []
-    lifaddr = in6_getifaddr()
-    for l in f.readlines():
-        if not l:
-            break
-        l = l.strip()
-        if not ok:
-            if l.find("Destination") >= 0:
-                ok = 1
-                mtu_present = l.find("Mtu") >= 0
-                prio_present = l.find("Prio") >= 0
+    for line in fd_netstat.readlines():
+
+        # Parse the routes header and try to identify extra columns
+        if not got_header:
+            if "Destination" == line[:11]:
+                got_header = True
+                mtu_present = "Mtu" in line
+                prio_present = "Prio" in line
             continue
-        # gv 12/12/06: under debugging      
-        if scapy.arch.NETBSD or scapy.arch.OPENBSD:
-            lspl = l.split()
-            d,nh,fl = lspl[:3]
-            dev = lspl[5+mtu_present+prio_present]
-        else:       # FREEBSD or DARWIN 
-            d,nh,fl,dev = l.split()[:4]
-        if filter(lambda x: x[2] == dev, lifaddr) == []:
+
+        # Parse a route entry according to the operating system
+        splitted_line = line.split()
+        if scapy.arch.OPENBSD or scapy.arch.NETBSD:
+            index = 5 + mtu_present + prio_present
+            if len(splitted_line) < index:
+                warning("Not enough columns in route entry !")
+                continue
+            destination, next_hop, flags = splitted_line[:3]
+            dev = splitted_line[index]
+        else:
+            # FREEBSD or DARWIN 
+            if len(splitted_line) < 4:
+                warning("Not enough columns in route entry !")
+                continue
+            destination, next_hop, flags, dev = splitted_line[:4]
+
+        # Check flags
+        if not "U" in flags:  # usable route
+            continue
+        if "R" in flags:  # Host or net unrechable
             continue
-        if 'L' in fl: # drop MAC addresses
+        if "m" in flags:  # multicast address
+            # Note: multicast routing is handled in Route6.route()
             continue
 
-        if 'link' in nh:
-            nh = '::'
-
-        cset = [] # candidate set (possible source addresses)
-        dp = 128
-        if d == 'default':
-            d = '::'
-            dp = 0
-        if '/' in d:
-            d,dp = d.split("/")
-            dp = int(dp)
-        if '%' in d:
-            d,dev = d.split('%')
-        if '%' in nh:
-            nh,dev = nh.split('%')
-        if scapy.arch.LOOPBACK_NAME in dev:
-            cset = ['::1']
-            nh = '::'
-        else:
-            devaddrs = filter(lambda x: x[2] == dev, lifaddr)
-            cset = scapy.utils6.construct_source_candidate_set(d, dp, devaddrs, scapy.arch.LOOPBACK_NAME)
+        # Replace link with the default route in next_hop
+        if "link" in next_hop:
+            next_hop = "::"
 
-        if len(cset) != 0:
-            routes.append((d, dp, nh, dev, cset))
+        # Default prefix length
+        destination_plen = 128
 
-    f.close()
-    return routes
+        # Extract network interface from the zone id
+        if '%' in destination:
+            destination, dev = destination.split('%')
+            if '/' in dev:
+                # Example: fe80::%lo0/64 ; dev = "lo0/64"
+                dev, destination_plen = dev.split('/')
+        if '%' in next_hop:
+            next_hop, dev = next_hop.split('%')
 
+        # Ensure that the next hop is a valid IPv6 address
+        if not in6_isvalid(next_hop):
+            # Note: the 'Gateway' column might contain a MAC address
+            next_hop = "::"
 
-            
+        # Modify parsed routing entries
+        # Note: these rules are OS specific and may evolve over time
+        if destination == "default":
+            destination, destination_plen = "::", 0
+        elif '/' in destination:
+            # Example: fe80::/10
+            destination, destination_plen = destination.split('/')
+        if '/' in dev:
+            # Example: ff02::%lo0/32 ; dev = "lo0/32"
+            dev, destination_plen = dev.split('/')
 
+        # Check route entries parameters consistency
+        if not in6_isvalid(destination):
+            warning("Invalid destination IPv6 address in route entry !")
+            continue
+        try:
+            destination_plen = int(destination_plen)
+        except:
+            warning("Invalid IPv6 prefix length in route entry !")
+            continue
+        if in6_ismlladdr(destination) or in6_ismnladdr(destination):
+            # Note: multicast routing is handled in Route6.route()
+            continue
 
+        if scapy.arch.LOOPBACK_NAME in dev:
+            # Handle ::1 separately
+            cset = ["::1"]
+            next_hop = "::"
+        else:
+            # Get possible IPv6 source addresses
+            devaddrs = filter(lambda x: x[2] == dev, lifaddr)
+            cset = construct_source_candidate_set(destination, destination_plen, devaddrs, scapy.arch.LOOPBACK_NAME)
 
+        if len(cset):
+            routes.append((destination, destination_plen, next_hop, dev, cset))
+
+    fd_netstat.close()
+    return routes
diff --git a/scapy/utils6.py b/scapy/utils6.py
index 8e08872a..6a2ae83d 100644
--- a/scapy/utils6.py
+++ b/scapy/utils6.py
@@ -800,3 +800,13 @@ def in6_get_common_plen(a, b):
         if mbits != 8:
             return 8*i + mbits
     return 128
+
+def in6_isvalid(address):
+    """Return True if 'address' is a valid IPv6 address string, False
+       otherwise."""
+
+    try:
+        socket.inet_pton(socket.AF_INET6, address)
+        return True
+    except:
+        return False
diff --git a/test/mocked_functions.py b/test/mocked_functions.py
new file mode 100644
index 00000000..25e137e5
--- /dev/null
+++ b/test/mocked_functions.py
@@ -0,0 +1,315 @@
+"""
+Test functions mocking outputs
+"""
+
+import mock
+import StringIO
+
+from scapy.arch.unix import read_routes6, in6_getifaddr
+from scapy.data import IPV6_ADDR_LOOPBACK, IPV6_ADDR_LINKLOCAL
+from scapy.utils6 import in6_isvalid, in6_islladdr
+
+
+def valid_output_read_routes6(routes):
+    """"Return True if 'routes' contains correctly formatted entries, False
+        otherwise"""
+
+    for destination, plen, next_hop, dev, cset  in routes:
+        if not in6_isvalid(destination) or not type(plen) == int:
+            return False
+        if not in6_isvalid(next_hop) or not type(dev) == str:
+            return False
+        for address in cset:
+            if not in6_isvalid(address):
+                return False
+
+    return True
+
+
+def check_mandatory_ipv6_routes(routes6):
+    """Ensure that mandatory IPv6 routes are present"""
+
+    if len(filter(lambda r: r[0] == "::1" and r[-1] == ["::1"], routes6)) < 1:
+        return False
+
+    if len(filter(lambda r: r[0] == "fe80::" and r[1] == 64, routes6)) < 1:
+        return False
+
+    if len(filter(lambda r: in6_islladdr(r[0]) and r[1] == 128 and \
+            r[-1] == ["::1"], routes6)) < 1:
+        return False
+
+    return True
+
+
+@mock.patch("scapy.arch.unix.in6_getifaddr")
+@mock.patch("scapy.arch.unix.os")
+def test_osx_10_10_4(mock_os, mock_in6_getifaddr):
+    """Test read_routes6() on OS X 10.10.4"""
+
+    # 'netstat -rn -f inet6' output
+    netstat_output = """
+Routing tables
+
+Internet6:
+Destination                             Gateway                         Flags         Netif Expire
+::1                                     ::1                             UHL             lo0
+fe80::%lo0/64                           fe80::1%lo0                     UcI             lo0
+fe80::1%lo0                             link#1                          UHLI            lo0
+fe80::%en0/64                           link#4                          UCI             en0
+fe80::a00:27ff:fe9b:c965%en0            8:0:27:9b:c9:65                 UHLI            lo0
+ff01::%lo0/32                           ::1                             UmCI            lo0
+ff01::%en0/32                           link#4                          UmCI            en0
+ff02::%lo0/32                           ::1                             UmCI            lo0
+ff02::%en0/32                           link#4                          UmCI            en0
+"""
+
+    # Mocked file descriptor
+    strio = StringIO.StringIO(netstat_output)
+    mock_os.popen = mock.MagicMock(return_value=strio)
+    
+    # Mocked in6_getifaddr() output
+    mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"),
+                                       ("fe80::a00:27ff:fe9b:c965", IPV6_ADDR_LINKLOCAL, "en0")]
+
+    # Test the function
+    routes = read_routes6()
+    print netstat_output
+    for r in routes:
+        print r
+    assert(len(routes) == 5)
+    assert(check_mandatory_ipv6_routes(routes))
+
+
+@mock.patch("scapy.arch.unix.in6_getifaddr")
+@mock.patch("scapy.arch.unix.os")
+def test_osx_10_9_5(mock_os, mock_in6_getifaddr):
+    """Test read_routes6() on OS X 10.9.5"""
+
+    # 'netstat -rn -f inet6' output
+    netstat_output = """
+Routing tables
+
+Internet6:
+Destination                             Gateway                         Flags         Netif Expire
+::1                                     ::1                             UHL             lo0
+fe80::%lo0/64                           fe80::1%lo0                     UcI             lo0
+fe80::1%lo0                             link#1                          UHLI            lo0
+fe80::%en0/64                           link#4                          UCI             en0
+fe80::ba26:6cff:fe5f:4eee%en0           b8:26:6c:5f:4e:ee               UHLWIi          en0
+fe80::bae8:56ff:fe45:8ce6%en0           b8:e8:56:45:8c:e6               UHLI            lo0
+ff01::%lo0/32                           ::1                             UmCI            lo0
+ff01::%en0/32                           link#4                          UmCI            en0
+ff02::%lo0/32                           ::1                             UmCI            lo0
+ff02::%en0/32                           link#4                          UmCI            en0
+"""
+
+    # Mocked file descriptor
+    strio = StringIO.StringIO(netstat_output)
+    mock_os.popen = mock.MagicMock(return_value=strio)
+    
+    # Mocked in6_getifaddr() output
+    mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"),
+                                       ("fe80::ba26:6cff:fe5f:4eee", IPV6_ADDR_LINKLOCAL, "en0")]
+
+    # Test the function
+    routes = read_routes6()
+    print netstat_output
+    for r in routes:
+        print r
+    assert(len(routes) == 6)
+    assert(check_mandatory_ipv6_routes(routes))
+
+
+@mock.patch("scapy.arch.unix.in6_getifaddr")
+@mock.patch("scapy.arch.unix.os")
+def test_osx_10_9_5_global(mock_os, mock_in6_getifaddr):
+    """Test read_routes6() on OS X 10.9.5 with an IPv6 connectivity"""
+
+    # 'netstat -rn -f inet6' output
+    netstat_output = """
+Routing tables
+
+Internet6:
+Destination                             Gateway                         Flags         Netif Expire
+default                                 fe80::ba26:8aff:fe5f:4eef%en0   UGc             en0
+::1                                     ::1                             UHL             lo0
+2a01:ab09:7d:1f01::/64                  link#4                          UC              en0
+2a01:ab09:7d:1f01:420:205c:9fab:5be7    b8:e9:55:44:7c:e5               UHL             lo0
+2a01:ab09:7d:1f01:ba26:8aff:fe5f:4eef   b8:26:8a:5f:4e:ef               UHLWI           en0
+2a01:ab09:7d:1f01:bae9:55ff:fe44:7ce5   b8:e9:55:44:7c:e5               UHL             lo0
+fe80::%lo0/64                           fe80::1%lo0                     UcI             lo0
+fe80::1%lo0                             link#1                          UHLI            lo0
+fe80::%en0/64                           link#4                          UCI             en0
+fe80::5664:d9ff:fe79:4e00%en0           54:64:d9:79:4e:0                UHLWI           en0
+fe80::6ead:f8ff:fe74:945a%en0           6c:ad:f8:74:94:5a               UHLWI           en0
+fe80::a2f3:c1ff:fec4:5b50%en0           a0:f3:c1:c4:5b:50               UHLWI           en0
+fe80::ba26:8aff:fe5f:4eef%en0           b8:26:8a:5f:4e:ef               UHLWIir         en0
+fe80::bae9:55ff:fe44:7ce5%en0           b8:e9:55:44:7c:e5               UHLI            lo0
+ff01::%lo0/32                           ::1                             UmCI            lo0
+ff01::%en0/32                           link#4                          UmCI            en0
+ff02::%lo0/32                           ::1                             UmCI            lo
+"""
+
+    # Mocked file descriptor
+    strio = StringIO.StringIO(netstat_output)
+    mock_os.popen = mock.MagicMock(return_value=strio)
+
+    # Mocked in6_getifaddr() output
+    mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"),
+                                       ("fe80::ba26:6cff:fe5f:4eee", IPV6_ADDR_LINKLOCAL, "en0")]
+
+    # Test the function
+    routes = read_routes6()
+    assert(valid_output_read_routes6(routes))
+
+    for r in routes:
+        print r
+    assert(len(routes) == 11)
+    assert(check_mandatory_ipv6_routes(routes))
+
+
+@mock.patch("scapy.arch.unix.in6_getifaddr")
+@mock.patch("scapy.arch.unix.os")
+def test_freebsd_10_2(mock_os, mock_in6_getifaddr):
+    """Test read_routes6() on FreeBSD 10.2"""
+
+    # 'netstat -rn -f inet6' output
+    netstat_output = """
+Routing tables
+
+Internet6:
+Destination                       Gateway                       Flags      Netif Expire
+::/96                             ::1                           UGRS        lo0
+::1                               link#2                        UH          lo0
+::ffff:0.0.0.0/96                 ::1                           UGRS        lo0
+fe80::/10                         ::1                           UGRS        lo0
+fe80::%lo0/64                     link#2                        U           lo0
+fe80::1%lo0                       link#2                        UHS         lo0
+ff01::%lo0/32                     ::1                           U           lo0
+ff02::/16                         ::1                           UGRS        lo0
+ff02::%lo0/32                     ::1                           U           lo0
+"""
+
+    # Mocked file descriptor
+    strio = StringIO.StringIO(netstat_output)
+    mock_os.popen = mock.MagicMock(return_value=strio)
+    
+    # Mocked in6_getifaddr() output
+    mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0")]
+
+    # Test the function
+    routes = read_routes6()
+    print netstat_output
+    for r in routes:
+        print r
+    assert(len(routes) == 3)
+    assert(check_mandatory_ipv6_routes(routes))
+
+
+@mock.patch("scapy.arch.OPENBSD")
+@mock.patch("scapy.arch.unix.in6_getifaddr")
+@mock.patch("scapy.arch.unix.os")
+def test_openbsd_5_5(mock_os, mock_in6_getifaddr, mock_openbsd):
+    """Test read_routes6() on OpenBSD 5.5"""
+
+    # 'netstat -rn -f inet6' output
+    netstat_output = """
+Routing tables
+
+Internet6:
+Destination                        Gateway                        Flags   Refs      Use   Mtu  Prio Iface
+::/104                             ::1                            UGRS       0        0     -     8 lo0  
+::/96                              ::1                            UGRS       0        0     -     8 lo0  
+::1                                ::1                            UH        14        0 33144     4 lo0  
+::127.0.0.0/104                    ::1                            UGRS       0        0     -     8 lo0  
+::224.0.0.0/100                    ::1                            UGRS       0        0     -     8 lo0  
+::255.0.0.0/104                    ::1                            UGRS       0        0     -     8 lo0  
+::ffff:0.0.0.0/96                  ::1                            UGRS       0        0     -     8 lo0  
+2002::/24                          ::1                            UGRS       0        0     -     8 lo0  
+2002:7f00::/24                     ::1                            UGRS       0        0     -     8 lo0  
+2002:e000::/20                     ::1                            UGRS       0        0     -     8 lo0  
+2002:ff00::/24                     ::1                            UGRS       0        0     -     8 lo0  
+fe80::/10                          ::1                            UGRS       0        0     -     8 lo0  
+fe80::%em0/64                      link#1                         UC         0        0     -     4 em0  
+fe80::a00:27ff:fe04:59bf%em0       08:00:27:04:59:bf              UHL        0        0     -     4 lo0  
+fe80::%lo0/64                      fe80::1%lo0                    U          0        0     -     4 lo0  
+fe80::1%lo0                        link#3                         UHL        0        0     -     4 lo0  
+fec0::/10                          ::1                            UGRS       0        0     -     8 lo0  
+ff01::/16                          ::1                            UGRS       0        0     -     8 lo0  
+ff01::%em0/32                      link#1                         UC         0        0     -     4 em0  
+ff01::%lo0/32                      fe80::1%lo0                    UC         0        0     -     4 lo0  
+ff02::/16                          ::1                            UGRS       0        0     -     8 lo0  
+ff02::%em0/32                      link#1                         UC         0        0     -     4 em0  
+ff02::%lo0/32                      fe80::1%lo0                    UC         0        0     -     4 lo0 
+"""
+
+    # Mocked file descriptor
+    strio = StringIO.StringIO(netstat_output)
+    mock_os.popen = mock.MagicMock(return_value=strio)
+    
+    # Mocked in6_getifaddr() output
+    mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"),
+                                       ("fe80::a00:27ff:fe04:59bf", IPV6_ADDR_LINKLOCAL, "em0")]
+
+    # Mocked OpenBSD parsing behavior
+    mock_openbsd = True
+
+    # Test the function
+    routes = read_routes6()
+    for r in routes:
+        print r
+    assert(len(routes) == 5)
+    assert(check_mandatory_ipv6_routes(routes))
+
+
+@mock.patch("scapy.arch.NETBSD")
+@mock.patch("scapy.arch.unix.in6_getifaddr")
+@mock.patch("scapy.arch.unix.os")
+def test_netbsd_7_0(mock_os, mock_in6_getifaddr, mock_netbsd):
+    """Test read_routes6() on NetBSD 7.0"""
+
+    # 'netstat -rn -f inet6' output
+    netstat_output = """
+Routing tables
+
+Internet6:
+Destination                        Gateway                        Flags    Refs      Use    Mtu Interface
+::/104                             ::1                            UGRS        -        -      -  lo0
+::/96                              ::1                            UGRS        -        -      -  lo0
+::1                                ::1                            UH          -        -  33648  lo0
+::127.0.0.0/104                    ::1                            UGRS        -        -      -  lo0
+::224.0.0.0/100                    ::1                            UGRS        -        -      -  lo0
+::255.0.0.0/104                    ::1                            UGRS        -        -      -  lo0
+::ffff:0.0.0.0/96                  ::1                            UGRS        -        -      -  lo0
+2001:db8::/32                      ::1                            UGRS        -        -      -  lo0
+2002::/24                          ::1                            UGRS        -        -      -  lo0
+2002:7f00::/24                     ::1                            UGRS        -        -      -  lo0
+2002:e000::/20                     ::1                            UGRS        -        -      -  lo0
+2002:ff00::/24                     ::1                            UGRS        -        -      -  lo0
+fe80::/10                          ::1                            UGRS        -        -      -  lo0
+fe80::%wm0/64                      link#1                         UC          -        -      -  wm0
+fe80::acd1:3989:180e:fde0          08:00:27:a1:64:d8              UHL         -        -      -  lo0
+fe80::%lo0/64                      fe80::1                        U           -        -      -  lo0
+fe80::1                            link#2                         UHL         -        -      -  lo0
+ff01:1::/32                        link#1                         UC          -        -      -  wm0
+ff01:2::/32                        ::1                            UC          -        -      -  lo0
+ff02::%wm0/32                      link#1                         UC          -        -      -  wm0
+ff02::%lo0/32                      ::1                            UC          -        -      -  lo0
+"""
+
+    # Mocked file descriptor
+    strio = StringIO.StringIO(netstat_output)
+    mock_os.popen = mock.MagicMock(return_value=strio)
+    
+    # Mocked in6_getifaddr() output
+    mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"),
+                                       ("fe80::acd1:3989:180e:fde0", IPV6_ADDR_LINKLOCAL, "wm0")]
+
+    # Test the function
+    routes = read_routes6()
+    print netstat_output
+    for r in routes:
+        print r
+    assert(len(routes) == 5)
+    assert(check_mandatory_ipv6_routes(routes))
-- 
GitLab