diff --git a/appveyor.yml b/appveyor.yml index 29131bf7154629e733130fb598104b35b55ede4b..d634de0a519f972df67cb014dc06197706456a3e 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -27,7 +27,7 @@ test_script: - set PATH="%APPVEYOR_BUILD_FOLDER%;C:\Program Files\Wireshark\;%PATH%" # Main unit tests - - "%PYTHON%\\python -m coverage run -a bin\\UTscapy -f text -t test\\regression.uts -F -K automaton -K mock_read_routes6_bsd -K fork || exit /b 42" + - "%PYTHON%\\python -m coverage run -a bin\\UTscapy -f text -t test\\regression.uts -F -K automaton -K mock_read_routes6_bsd || exit /b 42" - 'del test\regression.uts' # Secondary unit tests diff --git a/test/regression.uts b/test/regression.uts index cf3051ccb0995b92776e454d458d1ad6667bed48..10d09fcc58e666dc022a5a9678c299e4cd49100b 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -689,26 +689,50 @@ conf.route.route("0.0.0.0")[2] arping(_+"/24") = send() and sniff() -~ netaccess fork +~ netaccess import time import os -def send_and_sniff(pkt, timeout=2, flt=None, iface=None): - """Send a packet, sniff, and check the packet has been seen""" - pid = os.fork() +def _send_or_sniff(pkt, timeout, flt, pid, fork, t_other=None): assert pid != -1 if pid == 0: time.sleep(1) - (sendp if isinstance(pkt, (Ether, Dot3)) else send)(pkt, iface=iface) - os._exit(0) + (sendp if isinstance(pkt, (Ether, Dot3)) else send)(pkt) + if fork: + os._exit(0) + else: + return else: spkt = str(pkt) pkts = sniff( - timeout=timeout, iface=iface, filter=flt, + timeout=timeout, filter=flt, stop_filter=lambda p: pkt.__class__ in p and str(p[pkt.__class__]) == spkt ) - os.waitpid(pid, 0) + if fork: + os.waitpid(pid, 0) + else: + t_other.join() assert str(pkt) in (str(p[pkt.__class__]) for p in pkts if pkt.__class__ in p) +def send_and_sniff(pkt, timeout=2, flt=None): + """Send a packet, sniff, and check the packet has been seen""" + if hasattr(os, "fork"): + _send_or_sniff(pkt, timeout, flt, os.fork(), True) + else: + from threading import Thread + def run_function(pkt, timeout, flt, pid, thread, results): + _send_or_sniff(pkt, timeout, flt, pid, False, thread) + results.put(True) + results = Queue.Queue() + t_parent = Thread(target=run_function, args=(pkt, timeout, flt, 0, None, results)) + t_child = Thread(target=run_function, args=(pkt, timeout, flt, 1, t_parent, results)) + t_parent.start() + t_child.start() + t_parent.join() + t_child.join() + assert results.qsize() >= 2 + while not results.empty(): + assert results.get() + send_and_sniff(IP(dst="secdev.org")/ICMP()) send_and_sniff(IP(dst="secdev.org")/ICMP(), flt="icmp") send_and_sniff(Ether()/IP(dst="secdev.org")/ICMP())