diff --git a/scapy/automaton.py b/scapy/automaton.py index a10f02292f7b7dbd8a083764be06a961d8fe27d7..5cf02f56f0396cdbecebad1ccae7b3269dc1f6b1 100644 --- a/scapy/automaton.py +++ b/scapy/automaton.py @@ -324,12 +324,18 @@ class Automaton_metaclass(type): s += "}\n" return do_graph(s, **kargs) -def select_objects(inputs, remain): +def select_objects(inputs, remain, customTypes=()): + """ + Select object that have checkRecv function. + inputs: objects to process + remain: timeout. If 0, return []. + customTypes: types of the objects that have the checkRecv function. + """ if WINDOWS: r = [] def look_for_select(): - for fd in inputs: - if isinstance(fd, ObjectPipe) or isinstance(fd, Automaton._IO_fdwrapper): + for fd in list(inputs): + if isinstance(fd, (ObjectPipe, Automaton._IO_fdwrapper) + customTypes): if fd.checkRecv(): r.append(fd) else: diff --git a/scapy/pipetool.py b/scapy/pipetool.py index 03d452e987a5c84c1e03523f12cd0b1b89656a00..5f86d73d60f7777ca4e8e782a4ebd3408e2443ef 100644 --- a/scapy/pipetool.py +++ b/scapy/pipetool.py @@ -5,16 +5,21 @@ ## Copyright (C) Philippe Biondi <phil@secdev.org> ## This program is published under a GPLv2 license -import os, thread, select +import os import subprocess import itertools import collections import time import Queue -import scapy.utils +from threading import Lock, Thread +from scapy.automaton import Message, select_objects +from scapy.consts import WINDOWS from scapy.error import log_interactive, warning from scapy.config import conf +from scapy.utils import get_temp_file, do_graph + +import scapy.arch class PipeEngine: pipes = {} @@ -39,8 +44,9 @@ class PipeEngine: self.active_drains = set() self.active_sinks = set() self._add_pipes(*pipes) - self.thread_lock = thread.allocate_lock() - self.command_lock = thread.allocate_lock() + self.thread_lock = Lock() + self.command_lock = Lock() + self.__fd_queue = [] self.__fdr,self.__fdw = os.pipe() self.threadid = None def __getattr__(self, attr): @@ -55,6 +61,22 @@ class PipeEngine: return f raise AttributeError(attr) + def checkRecv(self): + """As select.select is not available, we check if there + is some data to read by using a list that stores pointers.""" + return len(self.__fd_queue) > 0 + + def fileno(self): + return self.__fdr + + def _read_cmd(self): + self.__fd_queue.pop() + return os.read(self.__fdr,1) + + def _write_cmd(self, _cmd): + os.write(self.__fdw, _cmd) + self.__fd_queue.append("X") + def add_one_pipe(self, pipe): self.active_pipes.add(pipe) if isinstance(pipe, Source): @@ -90,15 +112,15 @@ class PipeEngine: for p in self.active_pipes: p.start() sources = self.active_sources - sources.add(self.__fdr) + sources.add(self) exhausted = set([]) RUN=True STOP_IF_EXHAUSTED = False while RUN and (not STOP_IF_EXHAUSTED or len(sources) > 1): - fds,fdo,fde=select.select(sources,[],[]) + fds = select_objects(sources, 2, customTypes=(AutoSource, PipeEngine)) for fd in fds: - if fd is self.__fdr: - cmd = os.read(self.__fdr,1) + if fd is self: + cmd = self._read_cmd() if cmd == "X": RUN=False break @@ -106,7 +128,7 @@ class PipeEngine: STOP_IF_EXHAUSTED = True elif cmd == "A": sources = self.active_sources-exhausted - sources.add(self.__fdr) + sources.add(self) else: warning("Unknown internal pipe engine command: %r. Ignoring." % cmd) elif fd in sources: @@ -130,7 +152,9 @@ class PipeEngine: def start(self): if self.thread_lock.acquire(0): - self.threadid = thread.start_new_thread(self.run,()) + _t = Thread(target=self.run) + _t.start() + self.threadid = _t.ident else: warning("Pipe engine already running") def wait_and_stop(self): @@ -139,7 +163,7 @@ class PipeEngine: try: with self.command_lock: if self.threadid is not None: - os.write(self.__fdw, _cmd) + self._write_cmd(_cmd) while not self.thread_lock.acquire(0): time.sleep(0.01) # interruptible wait for thread to terminate self.thread_lock.release() # (not using .join() because it needs 'threading' module) @@ -154,7 +178,7 @@ class PipeEngine: if self.threadid is not None: for p in pipes: p.start() - os.write(self.__fdw, "A") + self._write_cmd("A") def graph(self,**kargs): g=['digraph "pipe" {',"\tnode [shape=rectangle];",] @@ -177,7 +201,7 @@ class PipeEngine: g.append('\t"%i" -> "%i";' % (id(p), id(q))) g.append('}') graph = "\n".join(g) - scapy.utils.do_graph(graph, **kargs) + do_graph(graph, **kargs) class _ConnectorLogic(object): @@ -286,13 +310,14 @@ class Source(Pipe): Pipe.__init__(self, name=name) self.is_exhausted = False def _read_message(self): - from scapy.automaton import Message return Message() def deliver(self): msg = self._read_message self._send(msg) def fileno(self): return None + def checkRecv(self): + return False def exhausted(self): return self.is_exhausted def start(self): @@ -335,6 +360,8 @@ class AutoSource(Source): self._queue = collections.deque() def fileno(self): return self.__fdr + def checkRecv(self): + return len(self._queue) > 0 def _gen_data(self, msg): self._queue.append((msg,False)) self._wake_up() @@ -363,7 +390,7 @@ class ThreadGenSource(AutoSource): pass def start(self): self.RUN = True - thread.start_new_thread(self.generate,()) + Thread(target=self.generate).start() def stop(self): self.RUN = False @@ -393,14 +420,15 @@ class RawConsoleSink(Sink): def __init__(self, name=None, newlines=True): Sink.__init__(self, name=name) self.newlines = newlines + self._write_pipe = 1 def push(self, msg): if self.newlines: msg += "\n" - os.write(1, str(msg)) + os.write(self._write_pipe, str(msg)) def high_push(self, msg): if self.newlines: msg += "\n" - os.write(1, str(msg)) + os.write(self._write_pipe, str(msg)) class CLIFeeder(AutoSource): """Send messages from python command line @@ -470,8 +498,19 @@ class TermSink(Sink): self.opened = False if self.openearly: self.start() - - def start(self): + def _start_windows(self): + if not self.opened: + self.opened = True + self.__f = get_temp_file() + self.name = "Scapy" if self.name is None else self.name + # Start a powershell in a new window and print the PID + cmd = "$app = Start-Process PowerShell -ArgumentList '-command &{$host.ui.RawUI.WindowTitle=\\\"%s\\\";Get-Content \\\"%s\\\" -wait}' -passthru; echo $app.Id" % (self.name, self.__f.replace("\\", "\\\\")) + _p = subprocess.Popen([conf.prog.powershell, cmd], stdout=subprocess.PIPE) + _output, _stderr = _p.communicate() + # This is the process PID + self.__p = int(_output) + print("PID:" + str(self.__p)) + def _start_unix(self): if not self.opened: self.opened = True self.__r,self.__w = os.pipe() @@ -481,19 +520,43 @@ class TermSink(Sink): if self.keepterm: cmd.append("-hold") cmd.extend(["-e", "cat 0<&%i" % self.__r]) - self.__p = subprocess.Popen(cmd) + self.__p = subprocess.Popen(cmd, shell=True, executable="/bin/bash") os.close(self.__r) - def stop(self): + def start(self): + if WINDOWS: + return self._start_windows() + else: + return self._start_unix() + def _stop_windows(self): + if not self.keepterm: + self.opened = False + # Recipe to kill process with PID + # http://code.activestate.com/recipes/347462-terminating-a-subprocess-on-windows/ + import ctypes + PROCESS_TERMINATE = 1 + handle = ctypes.windll.kernel32.OpenProcess(PROCESS_TERMINATE, False, self.__p) + ctypes.windll.kernel32.TerminateProcess(handle, -1) + ctypes.windll.kernel32.CloseHandle(handle) + def _stop_unix(self): if not self.keepterm: self.opened = False os.close(self.__w) self.__p.kill() self.__p.wait() + def stop(self): + if WINDOWS: + return self._stop_windows() + else: + return self._stop_unix() def _print(self, s): if self.newlines: s+="\n" - os.write(self.__w, s) - + if WINDOWS: + self.__w = open(self.__f, "a") + self.__w.write(s) + self.__w.close() + else: + os.write(self.__w, s) def push(self, msg): self._print(str(msg)) def high_push(self, msg): @@ -564,27 +627,3 @@ class DownDrain(Drain): pass def high_push(self, msg): self._send(msg) - - -def _testmain(): - s = PeriodicSource("hello", 1, name="src") - d1 = Drain(name="d1") - c = ConsoleSink(name="c") - tf = TransformDrain(lambda x:"Got %r" % x) - t = TermSink(name="t", keepterm=False) - - s > d1 > c - d1 > tf > t - - p = PipeEngine(s) - - p.graph(type="png",target="> /tmp/pipe.png") - - p.start() - print p.threadid - time.sleep(5) - p.stop() - - -if __name__ == "__main__": - _testmain() diff --git a/scapy/utils.py b/scapy/utils.py index f20888642a90388d1070d826b58e47e3aa9a5eb5..d24a2523ad7b2040a135960eef173005e4eb641c 100644 --- a/scapy/utils.py +++ b/scapy/utils.py @@ -436,7 +436,10 @@ def do_graph(graph,prog=None,format=None,target=None,type=None,string=None,optio format = "-T %s" % format w,r = os.popen2("%s %s %s %s" % (prog,options or "", format or "", target)) w.write(graph) - w.close() + try: + w.close() + except IOError: + pass if start_viewer: # Workaround for file not found error: We wait until tempfile is written. waiting_start = time.time() diff --git a/test/pipetool.uts b/test/pipetool.uts new file mode 100644 index 0000000000000000000000000000000000000000..5b0a7103babbc9dc4f158028d7862bea00d121eb --- /dev/null +++ b/test/pipetool.uts @@ -0,0 +1,219 @@ +######################## +% Pipetool related tests +######################## + ++ Basic tests + += Test default test case + +s = PeriodicSource("hello", 1, name="src") +d1 = Drain(name="d1") +c = ConsoleSink(name="c") +tf = TransformDrain(lambda x:"Got %r" % x) +t = TermSink(name="t", keepterm=False) +s > d1 > c +d1 > tf > t + +p = PipeEngine(s) +p.graph(type="png",target="> /tmp/pipe.png") +p.start() +time.sleep(3) +p.stop() + += Test add_pipe + +s = AutoSource() +p = PipeEngine(s) +p.add(Pipe()) +assert len(p.active_pipes) == 2 + +x = p.spawn_Pipe() +assert len(p.active_pipes) == 3 +assert isinstance(x, Pipe) + += Test exhausted source + +s = AutoSource() +s._gen_data("hello") +s.is_exhausted = True +d1 = Drain(name="d1") +c = ConsoleSink(name="c") +s > d1 > c + +p = PipeEngine(s) +p.start() +time.sleep(1) +p.wait_and_stop() + += Test add_pipe on running instance + +test_val = None + +class TestSink(Sink): + def push(self, msg): + global test_val + test_val = msg + +p = PipeEngine() +p.start() + +s = AutoSource() +s._gen_data("hello") +s.is_exhausted = True + +d1 = Drain(name="d1") +c = TestSink(name="c") +s > d1 > c + +p.add(s) +time.sleep(1) +p.wait_and_stop() +assert test_val == "hello" + += Test Operators + +s = AutoSource() +p = PipeEngine(s) +assert p == p +assert not p < p +assert not p > p + +a = AutoSource() +b = AutoSource() +a >> b +assert len(a.high_sinks) == 1 +assert len(a.high_sources) == 0 +assert len(b.high_sinks) == 0 +assert len(b.high_sources) == 1 +a +b + +a = AutoSource() +b = AutoSource() +a << b +assert len(a.high_sinks) == 0 +assert len(a.high_sources) == 1 +assert len(b.high_sinks) == 1 +assert len(b.high_sources) == 0 +a +b + +a = AutoSource() +b = AutoSource() +a == b +assert len(a.sinks) == 1 +assert len(a.sources) == 1 +assert len(b.sinks) == 1 +assert len(b.sources) == 1 + +a = AutoSource() +b = AutoSource() +a//b +assert len(a.high_sinks) == 1 +assert len(a.high_sources) == 1 +assert len(b.high_sinks) == 1 +assert len(b.high_sources) == 1 + +a = AutoSource() +b = AutoSource() +a^b +assert len(b.trigger_sources) == 1 +assert len(a.trigger_sinks) == 1 + += Test doc + +s = AutoSource() +p = PipeEngine(s) +p.list_pipes() +p.list_pipes_detailed() + += Test RawConsoleSink with CLIFeeder + +p = PipeEngine() +p.start() + +s = CLIFeeder() +s.send("hello") +s.is_exhausted = True + +r, w = os.pipe() + +d1 = Drain(name="d1") +c = RawConsoleSink(name="c") +c._write_pipe = w +s > d1 > c + +p.add(s) +time.sleep(1) +p.wait_and_stop() +assert os.read(r, 20) == "hello\n" + += Test QueueSink with CLIFeeder + +p = PipeEngine() +p.start() + +s = CLIFeeder() +s.send("hello") +s.is_exhausted = True + +d1 = Drain(name="d1") +c = QueueSink(name="c") +s > d1 > c + +p.add(s) +time.sleep(1) +p.wait_and_stop() +assert c.recv() == "hello" + += Test UpDrain + +test_val = None + +class TestSink(Sink): + def high_push(self, msg): + global test_val + test_val = msg + +p = PipeEngine() +p.start() + +s = CLIFeeder() +s.send("hello") +s.is_exhausted = True + +d1 = UpDrain(name="d1") +c = TestSink(name="c") +s > d1 +d1 >> c + +p.add(s) +time.sleep(1) +p.wait_and_stop() +assert test_val == "hello" + += Test DownDrain + +test_val = None + +class TestSink(Sink): + def push(self, msg): + global test_val + test_val = msg + +p = PipeEngine() +p.start() + +s = CLIHighFeeder() +s.send("hello") +s.is_exhausted = True + +d1 = DownDrain(name="d1") +c = TestSink(name="c") +s >> d1 +d1 > c + +p.add(s) +time.sleep(1) +p.wait_and_stop() +assert test_val == "hello"