diff --git a/scapy/arch/windows/__init__.py b/scapy/arch/windows/__init__.py index b433a8716251c484d14f364ee583988cf66023a7..12689a1a1e71cf37997ed14a1b6989aa07ebd904 100755 --- a/scapy/arch/windows/__init__.py +++ b/scapy/arch/windows/__init__.py @@ -45,6 +45,13 @@ def is_new_release(ignoreVBS=False): release = platform.release() if conf.prog.powershell is None and not ignoreVBS: return False + try: + if float(release) >= 8: + return True + except ValueError: + if (release=="post2008Server"): + return True + return False def _exec_query_ps(cmd, fields): @@ -258,6 +265,8 @@ class WinProgPath(ConfClass): ) self.cscript = win_find_exe("cscript", installsubdir="System32", env="SystemRoot") + self.cmd = win_find_exe("cmd", installsubdir="System32", + env="SystemRoot") if self.wireshark != "wireshark": manu_path = load_manuf(os.path.sep.join(self.wireshark.split(os.path.sep)[:-1])+os.path.sep+"manuf") scapy.data.MANUFDB = conf.manufdb = MANUFDB = manu_path @@ -359,13 +368,13 @@ class NetworkInterface(object): try: if not self.ip: self.ip=get_ip_from_name(data['name']) + if not self.ip and self.name == LOOPBACK_NAME: + self.ip = "127.0.0.1" if not self.ip: # No IP detected self.invalid = True except (KeyError, AttributeError, NameError) as e: print e - if not self.ip and self.name == LOOPBACK_NAME: - self.ip = "127.0.0.1" try: self.mac = data['mac'] except KeyError: @@ -504,7 +513,7 @@ pcapdnet.get_if_raw_hwaddr = lambda iface, *args, **kargs: ( ) get_if_raw_hwaddr = pcapdnet.get_if_raw_hwaddr -def read_routes_xp(): +def _read_routes_xp(): # The InterfaceIndex in Win32_IP4RouteTable does not match the # InterfaceIndex in Win32_NetworkAdapter under some platforms # (namely Windows XP): let's try an IP association @@ -530,9 +539,9 @@ def read_routes_xp(): routes.append((dst, mask, gw, iface, iface.ip)) return routes -def read_routes_7(): +def _read_routes_7(): routes=[] - for line in exec_query(['Get-WmiObject', 'win32_IP4RouteTable'], + for line in exec_query(['Get-WmiObject', 'Win32_IP4RouteTable'], ['Name', 'Mask', 'NextHop', 'InterfaceIndex']): try: iface = dev_from_index(line[3]) @@ -546,11 +555,11 @@ def read_routes(): release = platform.release() try: if is_new_release(): - routes = read_routes_post2008() + routes = _read_routes_post2008() elif release == "XP": - routes = read_routes_xp() + routes = _read_routes_xp() else: - routes = read_routes_7() + routes = _read_routes_7() except Exception as e: warning("Error building scapy routing table : %s" % str(e), True) else: @@ -558,7 +567,7 @@ def read_routes(): warning("No default IPv4 routes found. Your Windows release may no be supported and you have to enter your routes manually", True) return routes -def read_routes_post2008(): +def _read_routes_post2008(): routes = [] if_index = '(\d+)' dest = '(\d+\.\d+\.\d+\.\d+)/(\d+)' @@ -596,26 +605,59 @@ def in6_getifaddr(): """ Returns all IPv6 addresses found on the computer """ - ret = [] - ps = sp.Popen([conf.prog.powershell, 'Get-NetRoute', '-AddressFamily IPV6', '|', 'select ifIndex, DestinationPrefix'], stdout = sp.PIPE, universal_newlines = True) - stdout, stdin = ps.communicate() - netstat_line = '\s+'.join(['(\d+)', ''.join(['([A-z|0-9|:]+)', '(\/\d+)'])]) - pattern = re.compile(netstat_line) - for l in stdout.split('\n'): - match = re.search(pattern,l) - if match: + if is_new_release(): + ret = [] + ps = sp.Popen([conf.prog.powershell, 'Get-NetRoute', '-AddressFamily IPV6', '|', 'select ifIndex, DestinationPrefix'], stdout = sp.PIPE, universal_newlines = True) + stdout, stdin = ps.communicate() + netstat_line = '\s+'.join(['(\d+)', ''.join(['([A-z|0-9|:]+)', '(\/\d+)'])]) + pattern = re.compile(netstat_line) + for l in stdout.split('\n'): + match = re.search(pattern,l) + if match: + try: + if_index = match.group(1) + iface = dev_from_index(if_index) + except: + continue + scope = scapy.utils6.in6_getscope(match.group(2)) + ret.append((match.group(2), scope, iface)) # (addr,scope,iface) + continue + return ret + else: + ret = [] + # Get-WmiObject Win32_NetworkAdapterConfiguration | select InterfaceIndex, IpAddress + for line in exec_query(['Get-WmiObject', 'Win32_NetworkAdapterConfiguration'], ['InterfaceIndex', 'IPAddress']): try: - if_index = match.group(1) - iface = dev_from_index(if_index) + iface = dev_from_index(line[0]) except: continue - scope = scapy.utils6.in6_getscope(match.group(2)) - ret.append((match.group(2), scope, iface)) # (addr,scope,iface) - continue - return ret + _l_addresses = line[1] + _inline = [] + if _l_addresses: + _inline = _l_addresses[1:-1].split(",") + for _address in _inline: + _a = _address.strip() + if "." not in _a: + scope = scapy.utils6.in6_getscope(_a) + ret.append((_a, scope, iface)) # (addr,scope,iface) + return ret + +def _append_route6(routes, dpref, dp, nh, iface, lifaddr): + cset = [] # candidate set (possible source addresses) + if iface.name == LOOPBACK_NAME: + if dpref == '::': + return + cset = ['::1'] + else: + devaddrs = filter(lambda x: x[2] == iface, lifaddr) + cset = scapy.utils6.construct_source_candidate_set(dpref, dp, devaddrs, getLoopbackInterface()) + if len(cset) == 0: + return + # APPEND (DESTINATION, NETMASK, NEXT HOP, IFACE, CANDIDATS) + routes.append((dpref, dp, nh, iface, cset)) -def read_routes6(): - routes = [] +def _read_routes6_post2008(): + routes6 = [] ipv6_r = '([A-z|0-9|:]+)' #Hope it is a valid address... # The correct IPv6 regex would be: # ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)? @@ -639,22 +681,69 @@ def read_routes6(): except: continue - d = match.group(2) + dpref = match.group(2) dp = int(match.group(3)[1:]) nh = match.group(4) - cset = [] # candidate set (possible source addresses) - if iface.name == LOOPBACK_NAME: - if d == '::': + _append_route6(routes6, dpref, dp, nh, iface, lifaddr) + return routes6 + +def _read_routes6_7(): + # Not supported in powershell, we have to use netsh + routes = [] + ps = sp.Popen([conf.prog.cmd, '/c', 'netsh interface ipv6 show route level=verbose'], stdout = sp.PIPE, universal_newlines = True) + stdout, stdin = ps.communicate() + lifaddr = in6_getifaddr() + # Define regexes + r_int = [".*:\s+(\d+)"] + r_all = ["(.*)"] + r_ipv6 = [".*:\s+([A-z|0-9|:]+(\/\d+)?)"] + # Build regex list for each object + regex_list = r_ipv6*2 + r_int + r_all*3 + r_int + r_all*3 + current_object = [] + index = 0 + for l in stdout.split('\n'): + if not l.strip(): + if len(current_object) == 0: + continue + + if len(current_object) == len(regex_list): + try: + if_index = current_object[2] + iface = dev_from_index(if_index) + except: + current_object = [] + index = 0 continue - cset = ['::1'] - else: - devaddrs = filter(lambda x: x[2] == iface, lifaddr) - cset = scapy.utils6.construct_source_candidate_set(d, dp, devaddrs, getLoopbackInterface()) - # APPEND (DESTINATION, NETMASK, NEXT HOP, IFACE, CANDIDATS) - routes.append((d, dp, nh, iface, cset)) + _ip = current_object[0].split("/") + dpref = _ip[0] + dp = int(_ip[1]) + nh = current_object[1].split("/")[0] + # metric = current_object[6] + _append_route6(routes, dpref, dp, nh, iface, lifaddr) + + # Reset current object + current_object = [] + index = 0 + else: + pattern = re.compile(regex_list[index]) + match = re.search(pattern, l) + if match: + current_object.append(match.group(1)) + index = index + 1 return routes +def read_routes6(): + routes6 = [] + try: + if is_new_release(): + routes6 = _read_routes6_post2008() + else: + routes6 = _read_routes6_7() + except Exception as e: + warning("Error building scapy routing table : %s" % str(e), True) + return routes6 + if conf.interactive_shell != 'ipython' and conf.interactive: try: __IPYTHON__ diff --git a/scapy/tools/UTscapy.py b/scapy/tools/UTscapy.py index d19120c78e452e869b8266f2b51f857377c735ca..5cdf27b350064dd70d58750d8a48da49be463797 100755 --- a/scapy/tools/UTscapy.py +++ b/scapy/tools/UTscapy.py @@ -184,15 +184,10 @@ class UnitTest(TestClass): self.crc = None self.expand = 1 def decode(self): - self.test = self.test.decode("utf8") - self.output = self.output.decode("utf8") - self.comments = self.comments.decode("utf8") - self.result = self.result.decode("utf8") - def encode(self): - self.test = self.test.encode("utf8") - self.output = self.output.encode("utf8") - self.comments = self.comments.encode("utf8") - self.result = self.result.encode("utf8") + self.test = self.test.decode("utf8", "ignore") + self.output = self.output.decode("utf8", "ignore") + self.comments = self.comments.decode("utf8", "ignore") + self.result = self.result.decode("utf8", "ignore") def __nonzero__(self): return self.res diff --git a/test/mock_windows.uts b/test/mock_windows.uts index 2a2197a74df195f34a9901aa83429764a8558d4e..ce7391c5c1edf835417c4440a8bfeb68234d26bf 100644 --- a/test/mock_windows.uts +++ b/test/mock_windows.uts @@ -8,10 +8,15 @@ = Windows with fake IPv6 adapter +route_add_loopback() + import mock +from scapy.arch.windows import _read_routes6_post2008 -def check_mandatory_ipv6_routes_windows(routes6): - """Ensure that mandatory IPv6 routes are present. There is not always a loopback interface on windows !""" +def check_mandatory_ipv6_routes(routes6): + """Ensure that mandatory IPv6 routes are present.""" + if len(filter(lambda r: r[0] == "::" and r[-1] == ["::1"], routes6)) < 1: + return False if len(filter(lambda r: r[0] == "fe80::" and (r[1] == 64 or r[1] == 32), routes6)) < 1: return False if len(filter(lambda r: in6_islladdr(r[0]) and r[1] == 128, routes6)) < 1: @@ -19,23 +24,24 @@ def check_mandatory_ipv6_routes_windows(routes6): return True def dev_from_index_custom(if_index): - if_list = [{'mac': 'D0-50-99-56-DD-F9', 'win_index': '13', 'guid': '{C56DFFB3-992C-4964-B000-3E7C0F76E8BA}', 'name': 'Killer E2200 Gigabit Ethernet Controller', 'description': 'Ethernet'}, {'mac': '00-FF-0E-C7-25-37', 'win_index': '3', 'guid': '{0EC72537-B662-4F5D-B34E-48BFAE799BBE}', 'name': 'TAP-Windows Adapter V9', 'description': 'Ethernet 2'}] + if_list = [{'mac': 'D0:50:99:56:DD:F9', 'win_index': '13', 'guid': '{C56DFFB3-992C-4964-B000-3E7C0F76E8BA}', 'name': 'Killer E2200 Gigabit Ethernet Controller', 'description': 'Ethernet'}, {'mac': '00:FF:0E:C7:25:37', 'win_index': '3', 'guid': '{0EC72537-B662-4F5D-B34E-48BFAE799BBE}', 'name': 'TAP-Windows Adapter V9', 'description': 'Ethernet 2'}] values = {} for i in if_list: - try: - interface = NetworkInterface(i) - values[interface.guid] = interface - except (KeyError, PcapNameNotFoundError): - pass + try: + interface = NetworkInterface(i) + values[interface.guid] = interface + except (KeyError, PcapNameNotFoundError): + pass for devname, iface in values.items(): if iface.win_index == str(if_index): return iface raise ValueError("Unknown network interface index %r" % if_index) +@mock.patch("scapy.utils6.construct_source_candidate_set") @mock.patch("scapy.arch.windows.winpcapy_get_if_list") @mock.patch("scapy.arch.windows.dev_from_index") @mock.patch("scapy.arch.windows.sp.Popen") -def test_read_routes6_windows(mock_comm, mock_dev_from_index, mock_winpcapylist): +def test_read_routes6_windows(mock_comm, mock_dev_from_index, mock_winpcapylist, mock_utils6cset): """Test read_routes6() on Windows""" # 'Get-NetRoute -AddressFamily IPV6 | select ifIndex, DestinationPrefix, NextHop' get_net_route_output = """ @@ -60,12 +66,15 @@ ifIndex DestinationPrefix NextHop mock_winpcapylist.return_value = [u'\\Device\\NPF_{0EC72537-B662-4F5D-B34E-48BFAE799BBE}', u'\\Device\\NPF_{C56DFFB3-992C-4964-B000-3E7C0F76E8BA}'] # Mocked in6_getifaddr() output mock_dev_from_index.side_effect = dev_from_index_custom + # Random + mock_utils6cset.side_effect = lambda w,x,y,z: ["::1"] if w=="::" else ["fdbb:d995:ddd8:51fc::"] # Test the function - routes = read_routes6() + routes = _read_routes6_post2008() for r in routes: print r + print(len(routes)) assert(len(routes) == 9) - assert(check_mandatory_ipv6_routes_windows(routes)) + assert(check_mandatory_ipv6_routes(routes)) test_read_routes6_windows()