Skip to content
Snippets Groups Projects
Commit be152458 authored by Pierre LALET's avatar Pierre LALET
Browse files

Adapt send_and_sniff() test function to work under Windows

Thanks @gpotter2
parent 4da93c9a
No related branches found
No related tags found
No related merge requests found
......@@ -27,7 +27,7 @@ test_script:
- set PATH="%APPVEYOR_BUILD_FOLDER%;C:\Program Files\Wireshark\;%PATH%"
# Main unit tests
- "%PYTHON%\\python -m coverage run -a bin\\UTscapy -f text -t test\\regression.uts -F -K automaton -K mock_read_routes6_bsd -K fork || exit /b 42"
- "%PYTHON%\\python -m coverage run -a bin\\UTscapy -f text -t test\\regression.uts -F -K automaton -K mock_read_routes6_bsd || exit /b 42"
- 'del test\regression.uts'
# Secondary unit tests
......
......@@ -689,26 +689,50 @@ conf.route.route("0.0.0.0")[2]
arping(_+"/24")
= send() and sniff()
~ netaccess fork
~ netaccess
import time
import os
def send_and_sniff(pkt, timeout=2, flt=None, iface=None):
"""Send a packet, sniff, and check the packet has been seen"""
pid = os.fork()
def _send_or_sniff(pkt, timeout, flt, pid, fork, t_other=None):
assert pid != -1
if pid == 0:
time.sleep(1)
(sendp if isinstance(pkt, (Ether, Dot3)) else send)(pkt, iface=iface)
os._exit(0)
(sendp if isinstance(pkt, (Ether, Dot3)) else send)(pkt)
if fork:
os._exit(0)
else:
return
else:
spkt = str(pkt)
pkts = sniff(
timeout=timeout, iface=iface, filter=flt,
timeout=timeout, filter=flt,
stop_filter=lambda p: pkt.__class__ in p and str(p[pkt.__class__]) == spkt
)
os.waitpid(pid, 0)
if fork:
os.waitpid(pid, 0)
else:
t_other.join()
assert str(pkt) in (str(p[pkt.__class__]) for p in pkts if pkt.__class__ in p)
def send_and_sniff(pkt, timeout=2, flt=None):
"""Send a packet, sniff, and check the packet has been seen"""
if hasattr(os, "fork"):
_send_or_sniff(pkt, timeout, flt, os.fork(), True)
else:
from threading import Thread
def run_function(pkt, timeout, flt, pid, thread, results):
_send_or_sniff(pkt, timeout, flt, pid, False, thread)
results.put(True)
results = Queue.Queue()
t_parent = Thread(target=run_function, args=(pkt, timeout, flt, 0, None, results))
t_child = Thread(target=run_function, args=(pkt, timeout, flt, 1, t_parent, results))
t_parent.start()
t_child.start()
t_parent.join()
t_child.join()
assert results.qsize() >= 2
while not results.empty():
assert results.get()
send_and_sniff(IP(dst="secdev.org")/ICMP())
send_and_sniff(IP(dst="secdev.org")/ICMP(), flt="icmp")
send_and_sniff(Ether()/IP(dst="secdev.org")/ICMP())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment