diff --git a/scapy/arch/windows/__init__.py b/scapy/arch/windows/__init__.py index a3585e0e616d40d75d5cc1d2e20613bf43882695..25c7a0dce542e41901e49c5b34c94b0b79801354 100755 --- a/scapy/arch/windows/__init__.py +++ b/scapy/arch/windows/__init__.py @@ -16,14 +16,13 @@ from scapy.error import Scapy_Exception, log_loading, log_runtime, warning from scapy.utils import atol, itom, inet_aton, inet_ntoa, PcapReader from scapy.base_classes import Gen, Net, SetGen from scapy.data import MTU, ETHER_BROADCAST, ETH_P_ARP -from scapy.consts import LOOPBACK_NAME conf.use_pcap = False conf.use_dnet = False conf.use_winpcapy = True WINDOWS = (os.name == 'nt') -NEW_RELEASE = False +NEW_RELEASE = None #hot-patching socket for missing variables on Windows import socket @@ -39,17 +38,22 @@ if not hasattr(socket, 'IPPROTO_GRE'): from scapy.arch import pcapdnet from scapy.arch.pcapdnet import * -def is_new_release(): +from scapy.consts import LOOPBACK_NAME + +def is_new_release(ignoreVBS=False): + if NEW_RELEASE and conf.prog.powershell is not None: + return True release = platform.release() + if conf.prog.powershell is None and not ignoreVBS: + return False try: - if float(release) >= 8: - return True + if float(release) >= 8: + return True except ValueError: if (release=="post2008Server"): return True return False -NEW_RELEASE = is_new_release() def _exec_query_ps(cmd, fields): """Execute a PowerShell query""" @@ -236,18 +240,6 @@ def win_find_exe(filename, installsubdir=None, env="ProgramFiles"): return path -def is_new_release(ignoreVBS=False): - release = platform.release() - if conf.prog.powershell is None and not ignoreVBS: - return False - try: - if float(release) >= 8: - return True - except ValueError: - if (release=="post2008Server"): - return True - return False - class WinProgPath(ConfClass): # This is a dict containing the name of the .exe and a keyword # that must be in the path of the file @@ -274,6 +266,8 @@ class WinProgPath(ConfClass): ) self.cscript = win_find_exe("cscript", installsubdir="System32", env="SystemRoot") + self.cmd = win_find_exe("cmd", installsubdir="System32", + env="SystemRoot") if self.wireshark != "wireshark": manu_path = load_manuf(os.path.sep.join(self.wireshark.split(os.path.sep)[:-1])+os.path.sep+"manuf") scapy.data.MANUFDB = conf.manufdb = MANUFDB = manu_path @@ -298,6 +292,9 @@ if conf.prog.tcpdump != "windump" and conf.use_npcap: warning("The installed Windump version does not work with Npcap ! Refer to 'Winpcap/Npcap conflicts' in scapy's doc", True) del windump_ok +# Auto-detect release +NEW_RELEASE = is_new_release() + class PcapNameNotFoundError(Scapy_Exception): pass @@ -311,7 +308,7 @@ def is_interface_valid(iface): def get_windows_if_list(): """Returns windows interfaces""" - if NEW_RELEASE: + if is_new_release(): # This works only starting from Windows 8/2012 and up. For older Windows another solution is needed # Careful: this is weird, but Get-NetAdaptater works like: (Name isn't the interface name) # Name InterfaceDescription ifIndex Status MacAddress LinkSpeed @@ -337,6 +334,7 @@ def get_ip_from_name(ifname, v6=False): ['Description', 'IPAddress']): if descr == ifname.strip(): return ipaddr.split(",", 1)[v6].strip('{}').strip() + return None class NetworkInterface(object): """A network interface of your local host""" @@ -371,10 +369,13 @@ class NetworkInterface(object): try: if not self.ip: self.ip=get_ip_from_name(data['name']) + if not self.ip and self.name == LOOPBACK_NAME: + self.ip = "127.0.0.1" + if not self.ip: + # No IP detected + self.invalid = True except (KeyError, AttributeError, NameError) as e: print e - if not self.ip and self.name == LOOPBACK_NAME: - self.ip = "127.0.0.1" try: self.mac = data['mac'] except KeyError: @@ -413,6 +414,14 @@ class NetworkInterfaceDict(UserDict): "You probably won't be able to send packets. " "Deactivating unneeded interfaces and restarting Scapy might help." "Check your winpcap and powershell installation, and access rights.", True) + else: + # Loading state: remove invalid interfaces + self.remove_invalid_ifaces() + # Replace LOOPBACK_INTERFACE + try: + scapy.consts.LOOPBACK_INTERFACE = self.dev_from_name(LOOPBACK_NAME) + except: + pass def dev_from_name(self, name): """Return the first pcap device name for a given Windows @@ -435,8 +444,23 @@ class NetworkInterfaceDict(UserDict): for devname, iface in self.items(): if iface.win_index == str(if_index): return iface + if str(if_index) == "1": + # Test if the loopback interface is set up + if isinstance(scapy.consts.LOOPBACK_INTERFACE, NetworkInterface): + return scapy.consts.LOOPBACK_INTERFACE raise ValueError("Unknown network interface index %r" % if_index) + def remove_invalid_ifaces(self): + """Remove all invalid interfaces""" + for devname, iface in self.items(): + if iface.is_invalid(): + self.data.pop(devname) + + def reload(self): + """Reload interface list""" + self.data.clear() + self.load_from_powershell() + def show(self, resolve_mac=True): """Print list of available network interfaces in human readable form""" print "%s %s %s %s" % ("INDEX".ljust(5), "IFACE".ljust(35), "IP".ljust(15), "MAC") @@ -489,7 +513,7 @@ pcapdnet.get_if_raw_hwaddr = lambda iface, *args, **kargs: ( ) get_if_raw_hwaddr = pcapdnet.get_if_raw_hwaddr -def read_routes_xp(): +def _read_routes_xp(): # The InterfaceIndex in Win32_IP4RouteTable does not match the # InterfaceIndex in Win32_NetworkAdapter under some platforms # (namely Windows XP): let's try an IP association @@ -515,9 +539,9 @@ def read_routes_xp(): routes.append((dst, mask, gw, iface, iface.ip)) return routes -def read_routes_7(): +def _read_routes_7(): routes=[] - for line in exec_query(['Get-WmiObject', 'win32_IP4RouteTable'], + for line in exec_query(['Get-WmiObject', 'Win32_IP4RouteTable'], ['Name', 'Mask', 'NextHop', 'InterfaceIndex']): try: iface = dev_from_index(line[3]) @@ -530,12 +554,12 @@ def read_routes(): routes = [] release = platform.release() try: - if NEW_RELEASE: - routes = read_routes_post2008() + if is_new_release(): + routes = _read_routes_post2008() elif release == "XP": - routes = read_routes_xp() + routes = _read_routes_xp() else: - routes = read_routes_7() + routes = _read_routes_7() except Exception as e: warning("Error building scapy routing table : %s" % str(e), True) else: @@ -543,7 +567,7 @@ def read_routes(): warning("No default IPv4 routes found. Your Windows release may no be supported and you have to enter your routes manually", True) return routes -def read_routes_post2008(): +def _read_routes_post2008(): routes = [] if_index = '(\d+)' dest = '(\d+\.\d+\.\d+\.\d+)/(\d+)' @@ -581,26 +605,59 @@ def in6_getifaddr(): """ Returns all IPv6 addresses found on the computer """ - ret = [] - ps = sp.Popen([conf.prog.powershell, 'Get-NetRoute', '-AddressFamily IPV6', '|', 'select ifIndex, DestinationPrefix'], stdout = sp.PIPE, universal_newlines = True) - stdout, stdin = ps.communicate() - netstat_line = '\s+'.join(['(\d+)', ''.join(['([A-z|0-9|:]+)', '(\/\d+)'])]) - pattern = re.compile(netstat_line) - for l in stdout.split('\n'): - match = re.search(pattern,l) - if match: + if is_new_release(): + ret = [] + ps = sp.Popen([conf.prog.powershell, 'Get-NetRoute', '-AddressFamily IPV6', '|', 'select ifIndex, DestinationPrefix'], stdout = sp.PIPE, universal_newlines = True) + stdout, stdin = ps.communicate() + netstat_line = '\s+'.join(['(\d+)', ''.join(['([A-z|0-9|:]+)', '(\/\d+)'])]) + pattern = re.compile(netstat_line) + for l in stdout.split('\n'): + match = re.search(pattern,l) + if match: + try: + if_index = match.group(1) + iface = dev_from_index(if_index) + except: + continue + scope = scapy.utils6.in6_getscope(match.group(2)) + ret.append((match.group(2), scope, iface)) # (addr,scope,iface) + continue + return ret + else: + ret = [] + # Get-WmiObject Win32_NetworkAdapterConfiguration | select InterfaceIndex, IpAddress + for line in exec_query(['Get-WmiObject', 'Win32_NetworkAdapterConfiguration'], ['InterfaceIndex', 'IPAddress']): try: - if_index = match.group(1) - iface = dev_from_index(if_index) + iface = dev_from_index(line[0]) except: continue - scope = scapy.utils6.in6_getscope(match.group(2)) - ret.append((match.group(2), scope, iface)) # (addr,scope,iface) - continue - return ret + _l_addresses = line[1] + _inline = [] + if _l_addresses: + _inline = _l_addresses[1:-1].split(",") + for _address in _inline: + _a = _address.strip() + if "." not in _a: + scope = scapy.utils6.in6_getscope(_a) + ret.append((_a, scope, iface)) # (addr,scope,iface) + return ret + +def _append_route6(routes, dpref, dp, nh, iface, lifaddr): + cset = [] # candidate set (possible source addresses) + if iface.name == LOOPBACK_NAME: + if dpref == '::': + return + cset = ['::1'] + else: + devaddrs = filter(lambda x: x[2] == iface, lifaddr) + cset = scapy.utils6.construct_source_candidate_set(dpref, dp, devaddrs, scapy.consts.LOOPBACK_INTERFACE) + if len(cset) == 0: + return + # APPEND (DESTINATION, NETMASK, NEXT HOP, IFACE, CANDIDATS) + routes.append((dpref, dp, nh, iface, cset)) -def read_routes6(): - routes = [] +def _read_routes6_post2008(): + routes6 = [] ipv6_r = '([A-z|0-9|:]+)' #Hope it is a valid address... # The correct IPv6 regex would be: # ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)? @@ -624,24 +681,68 @@ def read_routes6(): except: continue - d = match.group(2) + dpref = match.group(2) dp = int(match.group(3)[1:]) nh = match.group(4) - cset = [] # candidate set (possible source addresses) - if iface.name == LOOPBACK_NAME: - if d == '::': + _append_route6(routes6, dpref, dp, nh, iface, lifaddr) + return routes6 + +def _read_routes6_7(): + # Not supported in powershell, we have to use netsh + routes = [] + ps = sp.Popen([conf.prog.cmd, '/c', 'netsh interface ipv6 show route level=verbose'], stdout = sp.PIPE, universal_newlines = True) + stdout, stdin = ps.communicate() + lifaddr = in6_getifaddr() + # Define regexes + r_int = [".*:\s+(\d+)"] + r_all = ["(.*)"] + r_ipv6 = [".*:\s+([A-z|0-9|:]+(\/\d+)?)"] + # Build regex list for each object + regex_list = r_ipv6*2 + r_int + r_all*3 + r_int + r_all*3 + current_object = [] + index = 0 + for l in stdout.split('\n'): + if not l.strip(): + if len(current_object) == 0: + continue + + if len(current_object) == len(regex_list): + try: + if_index = current_object[2] + iface = dev_from_index(if_index) + except: + current_object = [] + index = 0 continue - cset = ['::1'] - else: - devaddrs = filter(lambda x: x[2] == iface, lifaddr) - cset = scapy.utils6.construct_source_candidate_set(d, dp, devaddrs, LOOPBACK_NAME) - # APPEND (DESTINATION, NETMASK, NEXT HOP, IFACE, CANDIDATS) - routes.append((d, dp, nh, iface, cset)) + _ip = current_object[0].split("/") + dpref = _ip[0] + dp = int(_ip[1]) + nh = current_object[1].split("/")[0] + # metric = current_object[6] + _append_route6(routes, dpref, dp, nh, iface, lifaddr) + + # Reset current object + current_object = [] + index = 0 + else: + pattern = re.compile(regex_list[index]) + match = re.search(pattern, l) + if match: + current_object.append(match.group(1)) + index = index + 1 return routes - - +def read_routes6(): + routes6 = [] + try: + if is_new_release(): + routes6 = _read_routes6_post2008() + else: + routes6 = _read_routes6_7() + except Exception as e: + warning("Error building scapy routing table : %s" % str(e), True) + return routes6 if conf.interactive_shell != 'ipython' and conf.interactive: try: @@ -680,12 +781,16 @@ def get_working_if(): return min(read_routes(), key=lambda x: x[1])[3] except ValueError: # no route - return LOOPBACK_NAME + return scapy.consts.LOOPBACK_INTERFACE conf.iface = get_working_if() def route_add_loopback(routes=None, ipv6=False, iflist=None): """Add a route to 127.0.0.1 and ::1 to simplify unit tests on Windows""" + if not WINDOWS: + warning("Not available") + return + warning("This will completly mess up the routes. Testing purpose only !") # Add only if some adpaters already exist if ipv6: if len(conf.route6.routes) == 0: @@ -699,10 +804,22 @@ def route_add_loopback(routes=None, ipv6=False, iflist=None): data['win_index'] = -1 data['guid'] = "{0XX00000-X000-0X0X-X00X-00XXXX000XXX}" data['invalid'] = True + data['mac'] = '00:00:00:00:00:00' adapter = NetworkInterface(data) if iflist: iflist.append(unicode("\\Device\\NPF_" + adapter.guid)) return + # Remove all LOOPBACK_NAME routes + for route in list(conf.route.routes): + iface = route[3] + if iface.name == LOOPBACK_NAME: + conf.route.routes.remove(route) + # Remove LOOPBACK_NAME interface + for devname, iface in IFACES.items(): + if iface.name == LOOPBACK_NAME: + IFACES.pop(devname) + # Inject interface + IFACES[data['guid']] = adapter # Build the packed network addresses loop_net = struct.unpack("!I", socket.inet_aton("127.0.0.0"))[0] loop_mask = struct.unpack("!I", socket.inet_aton("255.0.0.0"))[0] diff --git a/scapy/consts.py b/scapy/consts.py index cb4983337d1f1c8f3bb25a4a4f01c340834e7db0..9b31a71e9f53b2fffd038d1996a2d6a32c4fd995 100644 --- a/scapy/consts.py +++ b/scapy/consts.py @@ -72,5 +72,8 @@ else: uname = os.uname() LOOPBACK_NAME = "lo" if LINUX else "lo0" +# Will be different on Windows +LOOPBACK_INTERFACE = LOOPBACK_NAME + def parent_function(): return inspect.getouterframes(inspect.currentframe()) diff --git a/scapy/layers/l2.py b/scapy/layers/l2.py index 9b3292d0e869fec8ce3957e6e584d969d4c8f526..188e72d99b21430769de9efbfd650931f1ecf896 100644 --- a/scapy/layers/l2.py +++ b/scapy/layers/l2.py @@ -18,7 +18,7 @@ from scapy.plist import SndRcvList from scapy.fields import * from scapy.sendrecv import srp, srp1, srpflood from scapy.arch import get_if_hwaddr -from scapy.consts import LOOPBACK_NAME +from scapy.consts import LOOPBACK_INTERFACE from scapy.utils import inet_ntoa, inet_aton from scapy.error import warning if conf.route is None: @@ -63,7 +63,7 @@ def getmacbyip(ip, chainCC=0): if (tmp[0] & 0xf0) == 0xe0: # mcast @ return "01:00:5e:%.2x:%.2x:%.2x" % (tmp[1]&0x7f,tmp[2],tmp[3]) iff,a,gw = conf.route.route(ip) - if ( (iff == LOOPBACK_NAME) or (ip == conf.route.get_if_bcast(iff)) ): + if ( (iff == LOOPBACK_INTERFACE) or (ip == conf.route.get_if_bcast(iff)) ): return "ff:ff:ff:ff:ff:ff" if gw != "0.0.0.0": ip = gw diff --git a/scapy/route.py b/scapy/route.py index 017ac7b7d5c84d8191af3dbe69560c82aa6fd74c..b2e01d405ffd44ec50168eb9a1319ad424c13d34 100644 --- a/scapy/route.py +++ b/scapy/route.py @@ -8,8 +8,8 @@ Routing and handling of network interfaces. """ import socket -from scapy.consts import LOOPBACK_NAME -from scapy.utils import atol, ltoa, itom +from scapy.consts import LOOPBACK_NAME, LOOPBACK_INTERFACE +from scapy.utils import atol, ltoa, itom, pretty_routes from scapy.config import conf from scapy.error import Scapy_Exception, warning from scapy.arch import WINDOWS @@ -32,7 +32,7 @@ class Route: self.routes = read_routes() def __repr__(self): - rtlst = [("Network", "Netmask", "Gateway", "Iface", "Output IP")] + rtlst = [] for net,msk,gw,iface,addr in self.routes: rtlst.append((ltoa(net), @@ -40,11 +40,9 @@ class Route: gw, (iface.name if not isinstance(iface, basestring) else iface), addr)) - - colwidth = map(lambda x: max(map(lambda y: len(y), x)), apply(zip, rtlst)) - fmt = " ".join(map(lambda x: "%%-%ds"%x, colwidth)) - rt = "\n".join(map(lambda x: fmt % x, rtlst)) - return rt + + return pretty_routes(rtlst, + [("Network", "Netmask", "Gateway", "Iface", "Output IP")]) def make_route(self, host=None, net=None, gw=None, dev=None): from scapy.arch import get_if_addr @@ -154,13 +152,13 @@ class Route: continue aa = atol(a) if aa == dst: - pathes.append((0xffffffffL,(LOOPBACK_NAME,a,"0.0.0.0"))) + pathes.append((0xffffffffL,(LOOPBACK_INTERFACE,a,"0.0.0.0"))) if (dst & m) == (d & m): pathes.append((m,(i,a,gw))) if not pathes: if verbose: warning("No route found (no default route?)") - return LOOPBACK_NAME,"0.0.0.0","0.0.0.0" #XXX linux specific! + return LOOPBACK_INTERFACE,"0.0.0.0","0.0.0.0" # Choose the more specific route (greatest netmask). # XXX: we don't care about metrics pathes.sort() @@ -185,6 +183,6 @@ conf.route=Route() #XXX use "with" _betteriface = conf.route.route("0.0.0.0", verbose=0)[0] -if _betteriface != LOOPBACK_NAME: +if ((_betteriface if (isinstance(_betteriface, basestring) or _betteriface is None) else _betteriface.name) != LOOPBACK_NAME): conf.iface = _betteriface del(_betteriface) diff --git a/scapy/route6.py b/scapy/route6.py index 8a7cb1ad429e428062f7bb47a6434da6e1f327e1..eef80393579e7d2b86d1ea5b85dfa3d8eabf97a3 100644 --- a/scapy/route6.py +++ b/scapy/route6.py @@ -22,6 +22,7 @@ from scapy.utils6 import * from scapy.arch import * from scapy.pton_ntop import * from scapy.error import warning, log_loading +from scapy.consts import LOOPBACK_INTERFACE class Route6: @@ -46,17 +47,14 @@ class Route6: log_loading.info("No IPv6 support in kernel") def __repr__(self): - rtlst = [('Destination', 'Next Hop', "iface", "src candidates")] + rtlst = [] for net,msk,gw,iface,cset in self.routes: - rtlst.append(('%s/%i'% (net,msk), gw, (iface if isinstance(iface, basestring) else iface.name), ", ".join(cset))) - - colwidth = map(lambda x: max(map(lambda y: len(y), x)), apply(zip, rtlst)) - fmt = " ".join(map(lambda x: "%%-%ds"%x, colwidth)) - rt = "\n".join(map(lambda x: fmt % x, rtlst)) - - return rt + rtlst.append(('%s/%i'% (net,msk), gw, (iface if isinstance(iface, basestring) else iface.name), ", ".join(cset) if len(cset) > 0 else "")) + return pretty_routes(rtlst, + [('Destination', 'Next Hop', "Iface", "Src candidates")], + sortBy = 1) # Unlike Scapy's Route.make_route() function, we do not have 'host' and 'net' # parameters. We only have a 'dst' parameter that accepts 'prefix' and @@ -77,7 +75,7 @@ class Route6: # replace that unique address by the list of all addresses lifaddr = in6_getifaddr() devaddrs = filter(lambda x: x[2] == dev, lifaddr) - ifaddr = construct_source_candidate_set(prefix, plen, devaddrs, LOOPBACK_NAME) + ifaddr = construct_source_candidate_set(prefix, plen, devaddrs, LOOPBACK_INTERFACE) return (prefix, plen, gw, dev, ifaddr) @@ -220,7 +218,7 @@ class Route6: if not pathes: warning("No route found for IPv6 destination %s (no default route?)" % dst) - return (LOOPBACK_NAME, "::", "::") # XXX Linux specific + return (LOOPBACK_INTERFACE, "::", "::") # Sort with longest prefix first pathes.sort(reverse=True) @@ -237,7 +235,7 @@ class Route6: if res == []: warning("Found a route for IPv6 destination '%s', but no possible source address." % dst) - return (LOOPBACK_NAME, "::", "::") # XXX Linux specific + return (LOOPBACK_INTERFACE, "::", "::") # Symptom : 2 routes with same weight (our weight is plen) # Solution : diff --git a/scapy/tools/UTscapy.py b/scapy/tools/UTscapy.py index d19120c78e452e869b8266f2b51f857377c735ca..5cdf27b350064dd70d58750d8a48da49be463797 100755 --- a/scapy/tools/UTscapy.py +++ b/scapy/tools/UTscapy.py @@ -184,15 +184,10 @@ class UnitTest(TestClass): self.crc = None self.expand = 1 def decode(self): - self.test = self.test.decode("utf8") - self.output = self.output.decode("utf8") - self.comments = self.comments.decode("utf8") - self.result = self.result.decode("utf8") - def encode(self): - self.test = self.test.encode("utf8") - self.output = self.output.encode("utf8") - self.comments = self.comments.encode("utf8") - self.result = self.result.encode("utf8") + self.test = self.test.decode("utf8", "ignore") + self.output = self.output.decode("utf8", "ignore") + self.comments = self.comments.decode("utf8", "ignore") + self.result = self.result.decode("utf8", "ignore") def __nonzero__(self): return self.res diff --git a/scapy/utils.py b/scapy/utils.py index 1a13d2caf2349571524598cd5f19274de7115218..5f05cacea80e259c545446257965d2344102f9d7 100644 --- a/scapy/utils.py +++ b/scapy/utils.py @@ -1237,6 +1237,78 @@ def hexedit(x): os.unlink(f) return x +def get_terminal_width(): + """Get terminal width if in a window""" + if WINDOWS: + from ctypes import windll, create_string_buffer + # http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/ + h = windll.kernel32.GetStdHandle(-12) + csbi = create_string_buffer(22) + res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi) + if res: + import struct + (bufx, bufy, curx, cury, wattr, + left, top, right, bottom, maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw) + sizex = right - left + 1 + #sizey = bottom - top + 1 + return sizex + else: + return None + else: + sizex = 0 + try: + import struct, fcntl, termios + s = struct.pack('HHHH', 0, 0, 0, 0) + x = fcntl.ioctl(1, termios.TIOCGWINSZ, s) + sizex = struct.unpack('HHHH', x)[1] + except IOError: + pass + if not sizex: + try: + sizex = int(os.environ['COLUMNS']) + except: + pass + if sizex: + return sizex + else: + return None + +def pretty_routes(rtlst, header, sortBy=0): + """Pretty route list, and add header""" + _l_header = len(header[0]) + _space = " " + # Sort correctly + rtlst.sort(key=lambda x: x[sortBy]) + # Append tag + rtlst = header + rtlst + # Detect column's width + colwidth = map(lambda x: max(map(lambda y: len(y), x)), apply(zip, rtlst)) + # Make text fit in box (if exist) + # TODO: find a better and more precise way of doing this. That's currently working but very complicated + width = get_terminal_width() + if width: + if sum(colwidth) > width: + # Needs to be cropped + _med = (width // _l_header) - (1 if WINDOWS else 0) # Windows has a fat window border + # Crop biggest until size is correct + for i in range(1, len(colwidth)): # Should use while, but this is safer + if (sum(colwidth)+6) <= width: + break + _max = max(colwidth) + colwidth = [_med if x == _max else x for x in colwidth] + def _crop(x, width): + _r = x[:width] + if _r != x: + _r = x[:width-3] + return _r + "..." + return _r + rtlst = [tuple([_crop(rtlst[j][i], colwidth[i]) for i in range(0, len(rtlst[j]))]) for j in range(0, len(rtlst))] + # Recalculate column's width + colwidth = map(lambda x: max(map(lambda y: len(y), x)), apply(zip, rtlst)) + fmt = _space.join(map(lambda x: "%%-%ds"%x, colwidth)) + rt = "\n".join(map(lambda x: fmt % x, rtlst)) + return rt + def __make_table(yfmtfunc, fmtfunc, endline, list, fxyz, sortx=None, sorty=None, seplinefunc=None): vx = {} vy = {} diff --git a/scapy/utils6.py b/scapy/utils6.py index cb78d2cf2dc0a4f2ed287a1f4e11954b57c6a9bb..99292911dfdb6513e0387542848d92e8e370f323 100644 --- a/scapy/utils6.py +++ b/scapy/utils6.py @@ -21,7 +21,7 @@ from scapy.volatile import RandMAC from scapy.error import warning -def construct_source_candidate_set(addr, plen, laddr, loname): +def construct_source_candidate_set(addr, plen, laddr, loiface): """ Given all addresses assigned to a specific interface ('laddr' parameter), this function returns the "candidate set" associated with 'addr/plen'. @@ -57,7 +57,7 @@ def construct_source_candidate_set(addr, plen, laddr, loname): cset = filter(lambda x: x[1] == IPV6_ADDR_SITELOCAL, laddr) elif in6_ismaddr(addr): if in6_ismnladdr(addr): - cset = [('::1', 16, loname)] + cset = [('::1', 16, loiface)] elif in6_ismgladdr(addr): cset = filter(lambda x: x[1] == IPV6_ADDR_GLOBAL, laddr) elif in6_ismlladdr(addr): diff --git a/test/mock_windows.uts b/test/mock_windows.uts index 67b6e3ff291755f855f0985b82ae2773ca0b80d3..ce7391c5c1edf835417c4440a8bfeb68234d26bf 100644 --- a/test/mock_windows.uts +++ b/test/mock_windows.uts @@ -8,10 +8,15 @@ = Windows with fake IPv6 adapter +route_add_loopback() + import mock +from scapy.arch.windows import _read_routes6_post2008 -def check_mandatory_ipv6_routes_windows(routes6): - """Ensure that mandatory IPv6 routes are present. There is not always a loopback interface on windows !""" +def check_mandatory_ipv6_routes(routes6): + """Ensure that mandatory IPv6 routes are present.""" + if len(filter(lambda r: r[0] == "::" and r[-1] == ["::1"], routes6)) < 1: + return False if len(filter(lambda r: r[0] == "fe80::" and (r[1] == 64 or r[1] == 32), routes6)) < 1: return False if len(filter(lambda r: in6_islladdr(r[0]) and r[1] == 128, routes6)) < 1: @@ -19,23 +24,24 @@ def check_mandatory_ipv6_routes_windows(routes6): return True def dev_from_index_custom(if_index): - if_list = [{'mac': 'D0-50-99-56-DD-F9', 'win_index': '13', 'guid': '{C56DFFB3-992C-4964-B000-3E7C0F76E8BA}', 'name': 'Killer E2200 Gigabit Ethernet Controller', 'description': 'Ethernet'}, {'mac': '00-FF-0E-C7-25-37', 'win_index': '3', 'guid': '{0EC72537-B662-4F5D-B34E-48BFAE799BBE}', 'name': 'TAP-Windows Adapter V9', 'description': 'Ethernet 2'}] + if_list = [{'mac': 'D0:50:99:56:DD:F9', 'win_index': '13', 'guid': '{C56DFFB3-992C-4964-B000-3E7C0F76E8BA}', 'name': 'Killer E2200 Gigabit Ethernet Controller', 'description': 'Ethernet'}, {'mac': '00:FF:0E:C7:25:37', 'win_index': '3', 'guid': '{0EC72537-B662-4F5D-B34E-48BFAE799BBE}', 'name': 'TAP-Windows Adapter V9', 'description': 'Ethernet 2'}] values = {} for i in if_list: - try: - interface = NetworkInterface(i) - values[interface.guid] = interface - except (KeyError, PcapNameNotFoundError): - pass + try: + interface = NetworkInterface(i) + values[interface.guid] = interface + except (KeyError, PcapNameNotFoundError): + pass for devname, iface in values.items(): if iface.win_index == str(if_index): return iface raise ValueError("Unknown network interface index %r" % if_index) +@mock.patch("scapy.utils6.construct_source_candidate_set") @mock.patch("scapy.arch.windows.winpcapy_get_if_list") @mock.patch("scapy.arch.windows.dev_from_index") @mock.patch("scapy.arch.windows.sp.Popen") -def test_read_routes6_windows(mock_comm, mock_dev_from_index, mock_winpcapylist): +def test_read_routes6_windows(mock_comm, mock_dev_from_index, mock_winpcapylist, mock_utils6cset): """Test read_routes6() on Windows""" # 'Get-NetRoute -AddressFamily IPV6 | select ifIndex, DestinationPrefix, NextHop' get_net_route_output = """ @@ -60,12 +66,15 @@ ifIndex DestinationPrefix NextHop mock_winpcapylist.return_value = [u'\\Device\\NPF_{0EC72537-B662-4F5D-B34E-48BFAE799BBE}', u'\\Device\\NPF_{C56DFFB3-992C-4964-B000-3E7C0F76E8BA}'] # Mocked in6_getifaddr() output mock_dev_from_index.side_effect = dev_from_index_custom + # Random + mock_utils6cset.side_effect = lambda w,x,y,z: ["::1"] if w=="::" else ["fdbb:d995:ddd8:51fc::"] # Test the function - routes = read_routes6() + routes = _read_routes6_post2008() for r in routes: print r + print(len(routes)) assert(len(routes) == 9) - assert(check_mandatory_ipv6_routes_windows(routes)) + assert(check_mandatory_ipv6_routes(routes)) test_read_routes6_windows() @@ -201,7 +210,7 @@ def test_show_interfaces(mock_stdout): if not l.strip(): continue try: - int(l[0]) + int(l[:2]) except: sys.stderr.write(l) return False diff --git a/test/regression.uts b/test/regression.uts index 9a614020717956c1a34acf39842b7628bc2e0a7f..953f4dbb38bffc1506cf56dd2c6fb7a1bc81fdeb 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -11,6 +11,8 @@ * Dump the current configuration conf +IP().src + = List layers ~ conf command ls() @@ -110,6 +112,15 @@ else: conf.route6.ifchange(LOOPBACK_NAME, "::1/128") True += UTscapy route check +* Check that UTscapy has correctly replaced the routes. Many tests won't work otherwise + +if WINDOWS: + route_add_loopback() + +IP().src +assert _ == "127.0.0.1" + ############ ############ + Main.py tests