diff --git a/scapy/automaton.py b/scapy/automaton.py index ca1dbac7071c990554fb2cacec46343448496fbf..7419aa9151287fc9dd0cb1529cef6917bf976aac 100644 --- a/scapy/automaton.py +++ b/scapy/automaton.py @@ -11,7 +11,7 @@ from __future__ import absolute_import import types,itertools,time,os,sys,socket,traceback from select import select from collections import deque -import threading +import threading, thread from scapy.config import conf from scapy.utils import do_graph from scapy.error import log_interactive @@ -21,17 +21,162 @@ from scapy.supersocket import SuperSocket from scapy.consts import WINDOWS import scapy.modules.six as six -class ObjectPipe: +""" In Windows, select.select is not available for custom objects. Here's the implementation of scapy to re-create this functionnality +# Passive way: using no-ressources locks + +---------+ +---------------+ +-------------------------+ + | Start +------------->Select_objects +----->+Linux: call select.select| + +---------+ |(select.select)| +-------------------------+ + +-------+-------+ + | + +----v----+ +--------+ + | Windows | |Time Out+----------------------------------+ + +----+----+ +----+---+ | + | ^ | + Event | | | + + | | | + | +-------v-------+ | | + | +------+Selectable Sel.+-----+-----------------+-----------+ | + | | +-------+-------+ | | | v +-----v-----+ ++-------v----------+ | | | | | Passive lock<-----+release_all<------+ +|Data added to list| +----v-----+ +-----v-----+ +----v-----+ v v + +-----------+ | ++--------+---------+ |Selectable| |Selectable | |Selectable| ............ | | + | +----+-----+ +-----------+ +----------+ | | + | v | | + v +----+------+ +------------------+ +-------------v-------------------+ | + +-----+------+ |wait_return+-->+ check_recv: | | | | + |call_release| +----+------+ |If data is in list| | END state: selectable returned | +---+--------+ + +-----+-------- v +-------+----------+ | | | exit door | + | else | +---------------------------------+ +---+--------+ + | + | | + | +----v-------+ | | + +--------->free -->Passive lock| | | + +----+-------+ | | + | | | + | v | + +------------------Selectable-Selector-is-advertised-that-the-selectable-is-readable---------+ +""" + +class SelectableObject: + """DEV: to implement one of those, you need to add 2 things to your object: + - add "check_recv" function + - call "self.call_release" once you are ready to be read""" + trigger = threading.Lock() + was_ended = False + def check_recv(self): + """DEV: will be called only once (at beginning) to check if the object is ready.""" + raise OSError("This method must be overwriten.") + + def _wait_non_ressources(self, callback): + """This get started as a thread, and waits for the data lock to be freed then advertise itself to the SelectableSelector using the callback""" + self.call_release() + self.trigger.acquire() + self.trigger.acquire() + if not self.was_ended: + callback(self) + + def wait_return(self, callback): + """Entry point of SelectableObject: register the callback""" + if self.check_recv(): + return callback(self) + threading.Thread(target=self._wait_non_ressources, args=(callback,)).start() + + def call_release(self, arborted=False): + """DEV: Must be call when the object becomes ready to read. + Relesases the lock of _wait_non_ressources""" + self.was_ended = arborted + try: + self.trigger.release() + except thread.error: + pass + +class SelectableSelector(object): + """ + Select SelectableObject objects. + + inputs: objects to process + remain: timeout. If 0, return []. + customTypes: types of the objects that have the checkRecv function. + """ + results = None + inputs = None + available_lock = None + _ended = False + def _release_all(self): + """Releases all locks to kill all threads""" + for i in self.inputs: + i.call_release(True) + self.available_lock.release() + + def _timeout_thread(self, remain): + """Timeout before releasing every thing, if nothing was returned""" + time.sleep(remain) + if not self._ended: + self._ended = True + self._release_all() + + def _exit_door(self,_input): + """This function is passed to each SelectableObject as a callback + The SelectableObjects have to call it once there are ready""" + self.results.append(_input) + if self._ended: + return + self._ended = True + self._release_all() + + def __init__(self, inputs, remain): + self.results = [] + self.inputs = list(inputs) + self.remain = remain + self.available_lock = threading.Lock() + self.available_lock.acquire() + self._ended = False + + def process(self): + """Entry point of SelectableSelector""" + if WINDOWS: + for i in self.inputs: + if not isinstance(i, SelectableObject): + warning("Unknown ignored object type: " + type(i)) + elif not self.remain and i.check_recv(): + self.results.append(i) + else: + i.wait_return(self._exit_door) + if not self.remain: + return self.results + + threading.Thread(target=self._timeout_thread, args=(self.remain,)).start() + if not self._ended: + self.available_lock.acquire() + return self.results + else: + r,_,_ = select(self.inputs,[],[],self.remain) + return r + +def select_objects(inputs, remain): + """ + Select SelectableObject objects. Same than: + select.select([inputs], [], [], remain) + But also works on Windows, only on SelectableObject. + + inputs: objects to process + remain: timeout. If 0, return []. + customTypes: types of the objects that have the checkRecv function. + """ + handler = SelectableSelector(inputs, remain) + return handler.process() + +class ObjectPipe(SelectableObject): def __init__(self): self.rd,self.wr = os.pipe() self.queue = deque() def fileno(self): return self.rd - def checkRecv(self): + def check_recv(self): return len(self.queue) > 0 def send(self, obj): self.queue.append(obj) os.write(self.wr,"X") + self.call_release() def write(self, obj): self.send(obj) def recv(self, n=0): @@ -325,37 +470,6 @@ class Automaton_metaclass(type): s += '\t"%s" -> "%s" [label="%s",color=blue];\n' % (k,n,l) s += "}\n" return do_graph(s, **kargs) - -def select_objects(inputs, remain, customTypes=()): - """ - Select object that have checkRecv function. - inputs: objects to process - remain: timeout. If 0, return []. - customTypes: types of the objects that have the checkRecv function. - """ - if WINDOWS: - r = [] - def look_for_select(): - for fd in list(inputs): - if isinstance(fd, (ObjectPipe, Automaton._IO_fdwrapper) + customTypes): - if fd.checkRecv(): - r.append(fd) - else: - raise OSError("Not supported type of socket:" + str(type(fd))) - break - def search_select(): - while len(r) == 0: - look_for_select() - if remain == 0: - look_for_select() - return r - t_select = threading.Thread(target=search_select) - t_select.start() - t_select.join(remain) - return r - else: - r,_,_ = select(inputs,[],[],remain) - return r class Automaton(six.with_metaclass(Automaton_metaclass)): def parse_args(self, debug=0, store=1, **kargs): @@ -371,7 +485,7 @@ class Automaton(six.with_metaclass(Automaton_metaclass)): ## Utility classes and exceptions - class _IO_fdwrapper: + class _IO_fdwrapper(SelectableObject): def __init__(self,rd,wr): if WINDOWS: # rd will be used for reading and sending @@ -388,7 +502,7 @@ class Automaton(six.with_metaclass(Automaton_metaclass)): self.wr = wr def fileno(self): return self.rd - def checkRecv(self): + def check_recv(self): return self.rd.checkRecv() def read(self, n=65535): if WINDOWS: @@ -396,7 +510,8 @@ class Automaton(six.with_metaclass(Automaton_metaclass)): return os.read(self.rd, n) def write(self, msg): if WINDOWS: - return self.rd.send(msg) + self.rd.send(msg) + return self.call_release() return os.write(self.wr,msg) def recv(self, n=65535): return self.read(n) diff --git a/scapy/pipetool.py b/scapy/pipetool.py index 3da17afde9dbfb4d3412edd248a3d849fd8db176..ba4a0f0329a1f53fb4248ad01f5bc8cff66ae96e 100644 --- a/scapy/pipetool.py +++ b/scapy/pipetool.py @@ -15,7 +15,7 @@ import scapy.modules.six as six from threading import Lock, Thread import scapy.utils -from scapy.automaton import Message, select_objects +from scapy.automaton import Message, select_objects, SelectableObject from scapy.consts import WINDOWS from scapy.error import log_interactive, warning from scapy.config import conf @@ -23,7 +23,7 @@ from scapy.utils import get_temp_file, do_graph import scapy.arch -class PipeEngine: +class PipeEngine(SelectableObject): pipes = {} @classmethod def list_pipes(cls): @@ -48,9 +48,9 @@ class PipeEngine: self._add_pipes(*pipes) self.thread_lock = Lock() self.command_lock = Lock() - self.__fd_queue = [] + self.__fd_queue = collections.deque() self.__fdr,self.__fdw = os.pipe() - self.threadid = None + self.thread = None def __getattr__(self, attr): if attr.startswith("spawn_"): dname = attr[6:] @@ -63,7 +63,7 @@ class PipeEngine: return f raise AttributeError(attr) - def checkRecv(self): + def check_recv(self): """As select.select is not available, we check if there is some data to read by using a list that stores pointers.""" return len(self.__fd_queue) > 0 @@ -72,12 +72,13 @@ class PipeEngine: return self.__fdr def _read_cmd(self): - self.__fd_queue.pop() - return os.read(self.__fdr,1) + os.read(self.__fdr,1) + return self.__fd_queue.popleft() def _write_cmd(self, _cmd): - os.write(self.__fdw, _cmd) - self.__fd_queue.append("X") + self.__fd_queue.append(_cmd) + os.write(self.__fdw, "X") + self.call_release() def add_one_pipe(self, pipe): self.active_pipes.add(pipe) @@ -119,7 +120,7 @@ class PipeEngine: RUN=True STOP_IF_EXHAUSTED = False while RUN and (not STOP_IF_EXHAUSTED or len(sources) > 1): - fds = select_objects(sources, 2, customTypes=(AutoSource, PipeEngine)) + fds = select_objects(sources, 2) for fd in fds: if fd is self: cmd = self._read_cmd() @@ -156,7 +157,7 @@ class PipeEngine: if self.thread_lock.acquire(0): _t = Thread(target=self.run) _t.start() - self.threadid = _t.ident + self.thread = _t else: warning("Pipe engine already running") def wait_and_stop(self): @@ -164,11 +165,13 @@ class PipeEngine: def stop(self, _cmd="X"): try: with self.command_lock: - if self.threadid is not None: + if self.thread is not None: self._write_cmd(_cmd) - while not self.thread_lock.acquire(0): - time.sleep(0.01) # interruptible wait for thread to terminate - self.thread_lock.release() # (not using .join() because it needs 'threading' module) + self.thread.join() + try: + self.thread_lock.release() + except: + pass else: warning("Pipe engine thread not running") except KeyboardInterrupt: @@ -177,7 +180,7 @@ class PipeEngine: def add(self, *pipes): pipes = self._add_pipes(*pipes) with self.command_lock: - if self.threadid is not None: + if self.thread is not None: for p in pipes: p.start() self._write_cmd("A") @@ -307,7 +310,7 @@ class Pipe(_ConnectorLogic): s += ct.punct(">") return s -class Source(Pipe): +class Source(Pipe, SelectableObject): def __init__(self, name=None): Pipe.__init__(self, name=name) self.is_exhausted = False @@ -318,7 +321,7 @@ class Source(Pipe): self._send(msg) def fileno(self): return None - def checkRecv(self): + def check_recv(self): return False def exhausted(self): return self.is_exhausted @@ -355,14 +358,14 @@ class Sink(Pipe): pass -class AutoSource(Source): +class AutoSource(Source, SelectableObject): def __init__(self, name=None): Source.__init__(self, name=name) self.__fdr,self.__fdw = os.pipe() self._queue = collections.deque() def fileno(self): return self.__fdr - def checkRecv(self): + def check_recv(self): return len(self._queue) > 0 def _gen_data(self, msg): self._queue.append((msg,False)) @@ -371,7 +374,7 @@ class AutoSource(Source): self._queue.append((msg,True)) self._wake_up() def _wake_up(self): - os.write(self.__fdw,"x") + os.write(self.__fdw,"X") def deliver(self): os.read(self.__fdr,1) try: @@ -379,6 +382,7 @@ class AutoSource(Source): except IndexError: #empty queue. Exhausted source pass else: + self.call_release() if high: self._high_send(msg) else: @@ -504,6 +508,7 @@ class TermSink(Sink): if not self.opened: self.opened = True self.__f = get_temp_file() + open(self.__f, "a").close() self.name = "Scapy" if self.name is None else self.name # Start a powershell in a new window and print the PID cmd = "$app = Start-Process PowerShell -ArgumentList '-command &{$host.ui.RawUI.WindowTitle=\\\"%s\\\";Get-Content \\\"%s\\\" -wait}' -passthru; echo $app.Id" % (self.name, self.__f.replace("\\", "\\\\")) diff --git a/test/pipetool.uts b/test/pipetool.uts index 9e0c67c53e9e5389883a5784f0fee089bb622efd..06a87ce33440946dc78e3479f9052deb9fa5c9ce 100644 --- a/test/pipetool.uts +++ b/test/pipetool.uts @@ -65,7 +65,6 @@ c = TestSink(name="c") s > d1 > c p.add(s) -time.sleep(1) p.wait_and_stop() assert test_val == "hello"