Skip to content
Snippets Groups Projects
Commit 9a4d5569 authored by gpotter2's avatar gpotter2
Browse files

Add ScapyPipes tests

parent f07cdad3
No related branches found
No related tags found
No related merge requests found
......@@ -28,6 +28,12 @@ except ImportError:
else:
THREAD_EXCEPTION = thread.error
if WINDOWS:
from scapy.arch.pcapdnet import PcapTimeoutElapsed
recv_error = PcapTimeoutElapsed
else:
recv_error = ()
""" In Windows, select.select is not available for custom objects. Here's the implementation of scapy to re-create this functionnality
# Passive way: using no-ressources locks
+---------+ +---------------+ +-------------------------+
......@@ -350,7 +356,11 @@ class _ATMT_supersocket(SuperSocket):
s = str(s)
return self.spa.send(s)
def recv(self, n=MTU):
r = self.spa.recv(n)
try:
r = self.spa.recv(n)
except recv_error:
if not WINDOWS:
raise
if self.proto is not None:
r = self.proto(r)
return r
......
......@@ -9,7 +9,7 @@ import Queue
from scapy.pipetool import Source,Drain,Sink
from scapy.config import conf
from scapy.utils import PcapReader, PcapWriter
from scapy.automaton import recv_error
class SniffSource(Source):
"""Read packets from an interface and send them to low exit.
......@@ -29,8 +29,14 @@ class SniffSource(Source):
self.s.close()
def fileno(self):
return self.s.fileno()
def check_recv(self):
return True
def deliver(self):
self._send(self.s.recv())
try:
self._send(self.s.recv())
except recv_error:
if not WINDOWS:
raise
class RdpcapSource(Source):
"""Read packets from a PCAP file send them to low exit.
......@@ -53,6 +59,8 @@ class RdpcapSource(Source):
self.f.close()
def fileno(self):
return self.f.fileno()
def check_recv(self):
return True
def deliver(self):
p = self.f.recv()
print("deliver %r" % p)
......@@ -100,6 +108,7 @@ class WrpcapSink(Sink):
self.f = PcapWriter(fname)
def stop(self):
self.f.flush()
self.f.close()
def push(self, msg):
self.f.write(msg)
......
......@@ -216,3 +216,80 @@ p.start()
p.wait_and_stop()
assert test_val == "hello"
+ Advanced ScapyPipes pipetools tests
= Test SniffSource
~ netaccess
p = PipeEngine()
s = SniffSource()
d1 = Drain(name="d1")
c = QueueSink(name="c")
s > d1 > c
p.add(s)
p.start()
sniff(count=3)
p.stop()
assert c.q.get()
= Test RdpcapSource and WrpcapSink
~ needs_root
req = Ether()/IP()/ICMP()
rpy = Ether()/IP('E\x00\x00\x1c\x00\x00\x00\x004\x01\x1d\x04\xd8:\xd0\x83\xc0\xa8\x00w\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
wrpcap("t.pcap", [req, rpy])
p = PipeEngine()
s = RdpcapSource("t.pcap")
d1 = Drain(name="d1")
c = WrpcapSink("t2.pcap", name="c")
s > d1 > c
p.add(s)
p.start()
p.wait_and_stop()
results = rdpcap("t2.pcap")
assert str(results[0]) == str(req)
assert str(results[1]) == str(rpy)
os.unlink("t.pcap")
os.unlink("t2.pcap")
= Test InjectSink and Inject3Sink
~ needs_root
a = IP(dst="192.168.0.1")/ICMP()
msgs = []
class FakeSocket(object):
def __init__(self, *arg, **karg):
pass
def close(self):
pass
def send(self, msg):
global msgs
msgs.append(msg)
@mock.patch("scapy.scapypipes.conf.L2socket", FakeSocket)
@mock.patch("scapy.scapypipes.conf.L3socket", FakeSocket)
def _inject_sink(i3):
s = CLIFeeder()
s.send(a)
s.is_exhausted = True
d1 = Drain(name="d1")
c = Inject3Sink() if i3 else InjectSink()
s > d1 > c
p = PipeEngine(s)
p.start()
p.wait_and_stop()
_inject_sink(False) # InjectSink
_inject_sink(True) # Inject3Sink
assert msgs == [a,a]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment