diff --git a/scapy/supersocket.py b/scapy/supersocket.py index 81feec212db0d9e752a6614270630dc5e579d974..a0cf3796081561219e2c3b69417d3a8fd78a1b66 100644 --- a/scapy/supersocket.py +++ b/scapy/supersocket.py @@ -8,14 +8,18 @@ SuperSocket. """ from __future__ import absolute_import -import socket,time +import socket +import subprocess +import struct +import time from scapy.config import conf +from scapy.consts import LINUX, OPENBSD, BSD from scapy.data import * from scapy.error import warning, log_runtime +import scapy.modules.six as six import scapy.packet from scapy.utils import PcapReader, tcpdump -import scapy.modules.six as six class _SuperSocket_metaclass(type): def __repr__(self): @@ -201,5 +205,123 @@ class L2ListenTcpdump(SuperSocket): return self.ins.recv(x) +class TunTapInterface(SuperSocket): + """A socket to act as the host's peer of a tun / tap interface. The +interface can also be created, using `create=True`. + + """ + desc = "Act as the host's peer of a tun / tap interface" + + def __init__(self, iface=None, create=False, mode_tun=None, *arg, **karg): + self.iface = conf.iface if iface is None else iface + self.mode_tun = ("tun" in iface) if mode_tun is None else mode_tun + self.created = False + if create: + self.create() + self.closed = True + self.open() + + def __enter__(self): + return self + + def __del__(self): + self.close() + self.delete() + + def __exit__(self, *_): + pass + + def create(self): + """Create the TUN or TAP interface, using system commands (ip on +Linux, ifconfig on *BSD & Darwin). + + """ + if self.created: + return + if LINUX: + for cmd in [ + ["ip", "tuntap", "add", "mode", + "tun" if self.mode_tun else "tap", "name", self.iface], + ["ip", "link", "set", self.iface, "up"], + ]: + subprocess.check_call(cmd) + elif OPENBSD and not self.mode_tun: + subprocess.check_call(["ifconfig", self.iface, "create", "link0"]) + elif BSD: + subprocess.check_call(["ifconfig", self.iface, "create"]) + else: + raise RuntimeError("Don't know how to create %s" % self.iface) + self.created = True + + def delete(self): + """Destroy the TUN or TAP interface, using system commands (ip on +Linux, ifconfig on *BSD & Darwin). + + """ + if not self.created: + return + if LINUX: + subprocess.check_call(["ip", "tuntap", "del", "mode", + "tun" if self.mode_tun else "tap", "name", + self.iface]) + elif BSD: + subprocess.check_call(["ifconfig", self.iface, "destroy"]) + self.created = False + + def open(self): + """Open the TUN or TAP device.""" + if not self.closed: + return + self.outs = self.ins = open( + "/dev/net/tun" if LINUX else ("/dev/%s" % self.iface), "r+b", + ) + if LINUX: + from fcntl import ioctl + # TUNSETIFF = 0x400454ca + # IFF_TUN = 0x0001 + # IFF_TAP = 0x0002 + # IFF_NO_PI = 0x1000 + ioctl(self.ins, 0x400454ca, struct.pack( + "16sH", self.iface, 0x0001 if self.mode_tun else 0x1002, + )) + self.closed = False + + def __call__(self, *arg, **karg): + """Needed when using an instantiated TunTapInterface object for +conf.L2listen, conf.L2socket or conf.L3socket. + + """ + return self + + def recv(self, x=MTU): + if self.mode_tun: + data = os.read(self.ins.fileno(), x + 4) + proto = struct.unpack('!H', data[2:4])[0] + return conf.l3types.get(proto, conf.raw_layer)(data[4:]) + return conf.l2types.get(1, conf.raw_layer)( + os.read(self.ins.fileno(), x) + ) + + def send(self, x): + sx = str(x) + if hasattr(x, "sent_time"): + x.sent_time = time.time() + if self.mode_tun: + try: + proto = conf.l3types[type(x)] + except KeyError: + log_runtime.warning( + "Cannot find layer 3 protocol value to send %s in " + "conf.l3types, using 0", + x.name if hasattr(x, "name") else type(x).__name__ + ) + proto = 0 + sx = struct.pack('!HH', 0, proto) + sx + try: + os.write(self.outs.fileno(), sx) + except socket.error: + log_runtime.error("%s send", self.__class__.__name__, exc_info=True) + + if conf.L3socket is None: conf.L3socket = L3RawSocket