diff --git a/scapy/sendrecv.py b/scapy/sendrecv.py index 44232487f8b1bb32a65c6eda4b086e6e075fbc74..f8ae3135e6ca851fd25dbe2c247ab3a585081c5d 100644 --- a/scapy/sendrecv.py +++ b/scapy/sendrecv.py @@ -7,16 +7,19 @@ Functions to send and receive packets. """ -from __future__ import absolute_import -from __future__ import print_function +from __future__ import absolute_import, print_function import errno -import os, sys, time, subprocess import itertools +import multiprocessing +import os from select import select, error as select_error +import subprocess +import sys +import time from scapy.consts import DARWIN, FREEBSD, OPENBSD, WINDOWS -from scapy.data import * -from scapy.config import conf +from scapy.data import ETH_P_ALL, MTU +from scapy.config import conf, CacheInstance from scapy.packet import Gen from scapy.utils import get_temp_file, PcapReader, tcpdump, wrpcap from scapy import plist @@ -78,9 +81,10 @@ def _sndrcv_snd(pks, inter, verbose, tobesent, all_stimuli, rdpipe, wrpipe): if not WINDOWS: # Change process group to avoid ctrl-C os.setpgrp() - six.moves.cPickle.dump((conf.netcache, sent_times), - wrpipe) - wrpipe.close() + netcache = {cache.name: (cache.items(), cache._timetable.items()) + for cache in conf.netcache._caches_list} + wrpipe.send((netcache, sent_times)) + wrpipe.close() except: pass @@ -113,113 +117,108 @@ def sndrcv(pks, pkt, timeout=None, inter=0, verbose=None, chainCC=False, autostop = 0 while retry >= 0: - found = 0 - if timeout < 0: timeout = None - if not WINDOWS: - rdpipe, wrpipe = os.pipe() - rdpipe = os.fdopen(rdpipe) - wrpipe = os.fdopen(wrpipe,"w") - - pid = None + rdpipe, wrpipe = multiprocessing.Pipe(False) + + proc = multiprocessing.Process( + target=_sndrcv_snd, + args=(pks, inter, verbose, tobesent, all_stimuli, rdpipe, + wrpipe), + ) + proc.start() + wrpipe.close() + inmask = [rdpipe, pks] + stoptime = 0 + remaintime = None try: - if not WINDOWS: - pid = os.fork() - if not WINDOWS and pid < 0: - log_runtime.error("fork error") - elif WINDOWS or pid == 0: - _sndrcv_snd(pks, inter, verbose, tobesent, all_stimuli, - rdpipe, wrpipe) - elif WINDOWS or pid > 0: - if not WINDOWS: - wrpipe.close() - inmask = [rdpipe, pks] - stoptime = 0 - remaintime = None - try: - try: - while True: - if stoptime: - remaintime = stoptime-time.time() - if remaintime <= 0: - break - r = None - if WINDOWS: - r = pks.recv(MTU) - elif conf.use_bpf: - from scapy.arch.bpf.supersocket import bpf_select - inp = bpf_select(inmask) - if pks in inp: - r = pks.recv() - elif not isinstance(pks, StreamSocket) and (FREEBSD or DARWIN or OPENBSD): - inp, _, _ = select(inmask, [], [], 0.05) - if len(inp) == 0 or pks in inp: - r = pks.nonblock_recv() - else: - inp = [] - try: - inp, _, _ = select(inmask, [], [], remaintime) - except (IOError, select_error) as exc: - # select.error has no .errno attribute - if exc.args[0] != errno.EINTR: - raise - if len(inp) == 0: - break - if pks in inp: - r = pks.recv(MTU) - if WINDOWS and rdpipe in inp: - if timeout: - stoptime = time.time() + timeout - del(inmask[inmask.index(rdpipe)]) - if r is None: - continue - ok = 0 - h = r.hashret() - if h in hsent: - hlst = hsent[h] - for i, sentpkt in enumerate(hlst): - if r.answers(sentpkt): - ans.append((sentpkt, r)) - if verbose > 1: - os.write(1, b"*") - ok = 1 - if not multi: - del hlst[i] - notans -= 1 - else: - if not hasattr(sentpkt, '_answered'): - notans -= 1 - sentpkt._answered = 1 - break - if notans == 0 and not multi: - break - if not ok: - if verbose > 1: - os.write(1, b".") - nbrecv += 1 - if conf.debug_match: - debug.recv.append(r) - except KeyboardInterrupt: - if chainCC: - raise - finally: - if not WINDOWS: - try: - nc, sent_times = six.moves.cPickle.load(rdpipe) - except EOFError: - warning("Child died unexpectedly. " - "Packets may have not been sent.") + try: + while True: + if stoptime: + remaintime = stoptime-time.time() + if remaintime <= 0: + break + r = None + if WINDOWS: + r = pks.recv(MTU) + elif conf.use_bpf: + from scapy.arch.bpf.supersocket import bpf_select + inp = bpf_select(inmask) + if pks in inp: + r = pks.recv() + elif not isinstance(pks, StreamSocket) and ( + FREEBSD or DARWIN or OPENBSD + ): + inp, _, _ = select(inmask, [], [], 0.05) + if len(inp) == 0 or pks in inp: + r = pks.nonblock_recv() else: - if not WINDOWS: - conf.netcache.update(nc) - for p, t in zip(all_stimuli, sent_times): - p.sent_time = t - os.waitpid(pid,0) + inp = [] + try: + inp, _, _ = select(inmask, [], [], remaintime) + except (IOError, select_error) as exc: + # select.error has no .errno attribute + if exc.args[0] != errno.EINTR: + raise + if len(inp) == 0: + break + if pks in inp: + r = pks.recv(MTU) + if rdpipe in inp: + if timeout: + stoptime = time.time() + timeout + del(inmask[inmask.index(rdpipe)]) + if r is None: + continue + ok = 0 + h = r.hashret() + if h in hsent: + hlst = hsent[h] + for i, sentpkt in enumerate(hlst): + if r.answers(sentpkt): + ans.append((sentpkt, r)) + if verbose > 1: + os.write(1, "*") + ok = 1 + if not multi: + del hlst[i] + notans -= 1 + else: + if not hasattr(sentpkt, '_answered'): + notans -= 1 + sentpkt._answered = 1 + break + if notans == 0 and not multi: + break + if not ok: + if verbose > 1: + os.write(1, ".") + nbrecv += 1 + if conf.debug_match: + debug.recv.append(r) + except KeyboardInterrupt: + if chainCC: + raise finally: - if pid == 0: - os._exit(0) + try: + netcache, sent_times = rdpipe.recv() + except EOFError: + warning("Child died unexpectedly. " + "Packets may have not been sent.") + else: + for cache_name, (cache, cache_times) in netcache.iteritems(): + cache_obj = CacheInstance(name=cache_name) + for key, value in cache: + cache_obj[key] = value + cache_obj._timetable.update(cache_times) + try: + getattr(conf.netcache, cache_name).update(cache_obj) + except AttributeError: + conf.netcache.add_cache(cache_obj) + for pkt, tstamp in zip(all_stimuli, sent_times): + pkt.sent_time = tstamp + proc.join() remain = list(itertools.chain(*six.itervalues(hsent))) if multi: @@ -245,7 +244,7 @@ def sndrcv(pks, pkt, timeout=None, inter=0, verbose=None, chainCC=False, if verbose: print("\nReceived %i packets, got %i answers, remaining %i packets" % (nbrecv+len(ans), len(ans), notans)) - return plist.SndRcvList(ans),plist.PacketList(remain,"Unanswered") + return plist.SndRcvList(ans), plist.PacketList(remain, "Unanswered") def __gen_send(s, x, inter=0, loop=0, count=None, verbose=None, realtime=None, return_packets=False, *args, **kargs): @@ -371,9 +370,9 @@ iface: listen answers only on the given interface""" if "timeout" not in kargs: kargs["timeout"] = -1 s = conf.L3socket(promisc=promisc, filter=filter, iface=iface, nofilter=nofilter) - a,b=sndrcv(s,x,*args,**kargs) + result = sndrcv(s, x, *args, **kargs) s.close() - return a,b + return result @conf.commands.register def sr1(x, promisc=None, filter=None, iface=None, nofilter=0, *args,**kargs): @@ -389,10 +388,10 @@ iface: listen answers only on the given interface""" if "timeout" not in kargs: kargs["timeout"] = -1 s=conf.L3socket(promisc=promisc, filter=filter, nofilter=nofilter, iface=iface) - a,b=sndrcv(s,x,*args,**kargs) + ans, _ = sndrcv(s, x, *args, **kargs) s.close() - if len(a) > 0: - return a[0][1] + if len(ans) > 0: + return ans[0][1] else: return None @@ -412,9 +411,9 @@ iface: work only on the given interface""" if iface is None and iface_hint is not None: iface = conf.route.route(iface_hint)[0] s = conf.L2socket(promisc=promisc, iface=iface, filter=filter, nofilter=nofilter, type=type) - a,b=sndrcv(s ,x,*args,**kargs) + result = sndrcv(s, x, *args, **kargs) s.close() - return a,b + return result @conf.commands.register def srp1(*args,**kargs): @@ -429,9 +428,9 @@ filter: provide a BPF filter iface: work only on the given interface""" if "timeout" not in kargs: kargs["timeout"] = -1 - a,b=srp(*args,**kargs) - if len(a) > 0: - return a[0][1] + ans, _ = srp(*args, **kargs) + if len(ans) > 0: + return ans[0][1] else: return None