Skip to content
Snippets Groups Projects
Commit 0bfed5c5 authored by gpotter2's avatar gpotter2
Browse files

Use passive form to select.select

parent a28da9f7
No related branches found
No related tags found
No related merge requests found
......@@ -19,17 +19,119 @@ from scapy.data import MTU
from scapy.supersocket import SuperSocket
from scapy.consts import WINDOWS
class ObjectPipe:
class SelectableObject:
trigger = threading.Lock()
was_ended = False
def check_recv(self):
"""DEV: will be called only once (at beggining) to check if the object is ready."""
return False
def _wait_non_ressources(self, callback):
self.call_release()
self.trigger.acquire()
self.trigger.acquire()
if not self.was_ended:
callback(self)
def wait_return(self, callback):
if self.check_recv():
return callback(self)
threading.Thread(target=self._wait_non_ressources, args=(callback,)).start()
def call_release(self, arborted=False):
"""DEV: Must be call when the object is ready to read."""
self.was_ended = arborted
try:
self.trigger.release()
except:
pass
class SelectableSelector(object):
"""
Select SelectableObject objects.
inputs: objects to process
remain: timeout. If 0, return [].
customTypes: types of the objects that have the checkRecv function.
"""
results = None
inputs = None
available_lock = None
_ended = False
def _release_all(self):
for i in self.inputs:
i.call_release(True)
self.available_lock.release()
def _timeout_thread(self, remain):
time.sleep(remain)
if not self._ended:
self._ended = True
self._release_all()
def _exit_door(self,_input):
self.results.append(_input)
if self._ended:
return
self._ended = True
self._release_all()
def __init__(self, inputs, remain):
self.results = []
self.inputs = list(inputs)
self.remain = remain
self.available_lock = threading.Lock()
self.available_lock.acquire()
self._ended = False
def process(self):
if WINDOWS:
if not self.remain:
for i in self.inputs:
if not isinstance(i, SelectableObject):
warning("Unknown ignored object type: " + type(i))
else:
if i.check_recv():
self.results.append(i)
return self.results
for i in self.inputs:
if not isinstance(i, SelectableObject):
warning("Unknown ignored object type: " + type(i))
else:
i.wait_return(self._exit_door)
threading.Thread(target=self._timeout_thread, args=(self.remain,)).start()
if not self._ended:
self.available_lock.acquire()
return self.results
else:
r,_,_ = select(self.inputs,[],[],self.remain)
return r
def select_objects(inputs, remain):
"""
Select SelectableObject objects.
inputs: objects to process
remain: timeout. If 0, return [].
customTypes: types of the objects that have the checkRecv function.
"""
handler = SelectableSelector(inputs, remain)
return handler.process()
class ObjectPipe(SelectableObject):
def __init__(self):
self.rd,self.wr = os.pipe()
self.queue = deque()
def fileno(self):
return self.rd
def checkRecv(self):
def check_recv(self):
return len(self.queue) > 0
def send(self, obj):
self.queue.append(obj)
os.write(self.wr,"X")
self.call_release()
def write(self, obj):
self.send(obj)
def recv(self, n=0):
......@@ -323,37 +425,6 @@ class Automaton_metaclass(type):
s += '\t"%s" -> "%s" [label="%s",color=blue];\n' % (k,n,l)
s += "}\n"
return do_graph(s, **kargs)
def select_objects(inputs, remain, customTypes=()):
"""
Select object that have checkRecv function.
inputs: objects to process
remain: timeout. If 0, return [].
customTypes: types of the objects that have the checkRecv function.
"""
if WINDOWS:
r = []
def look_for_select():
for fd in list(inputs):
if isinstance(fd, (ObjectPipe, Automaton._IO_fdwrapper) + customTypes):
if fd.checkRecv():
r.append(fd)
else:
raise OSError("Not supported type of socket:" + str(type(fd)))
break
def search_select():
while len(r) == 0:
look_for_select()
if remain == 0:
look_for_select()
return r
t_select = threading.Thread(target=search_select)
t_select.start()
t_select.join(remain)
return r
else:
r,_,_ = select(inputs,[],[],remain)
return r
class Automaton:
__metaclass__ = Automaton_metaclass
......@@ -372,7 +443,7 @@ class Automaton:
## Utility classes and exceptions
class _IO_fdwrapper:
class _IO_fdwrapper(SelectableObject):
def __init__(self,rd,wr):
if WINDOWS:
# rd will be used for reading and sending
......@@ -389,7 +460,7 @@ class Automaton:
self.wr = wr
def fileno(self):
return self.rd
def checkRecv(self):
def check_recv(self):
return self.rd.checkRecv()
def read(self, n=65535):
if WINDOWS:
......@@ -397,7 +468,8 @@ class Automaton:
return os.read(self.rd, n)
def write(self, msg):
if WINDOWS:
return self.rd.send(msg)
self.rd.send(msg)
return self.call_release()
return os.write(self.wr,msg)
def recv(self, n=65535):
return self.read(n)
......
......@@ -13,7 +13,7 @@ import time
import Queue
from threading import Lock, Thread
from scapy.automaton import Message, select_objects
from scapy.automaton import Message, select_objects, SelectableObject
from scapy.consts import WINDOWS
from scapy.error import log_interactive, warning
from scapy.config import conf
......@@ -21,7 +21,7 @@ from scapy.utils import get_temp_file, do_graph
import scapy.arch
class PipeEngine:
class PipeEngine(SelectableObject):
pipes = {}
@classmethod
def list_pipes(cls):
......@@ -46,9 +46,9 @@ class PipeEngine:
self._add_pipes(*pipes)
self.thread_lock = Lock()
self.command_lock = Lock()
self.__fd_queue = []
self.__fd_queue = collections.deque()
self.__fdr,self.__fdw = os.pipe()
self.threadid = None
self.thread = None
def __getattr__(self, attr):
if attr.startswith("spawn_"):
dname = attr[6:]
......@@ -61,7 +61,7 @@ class PipeEngine:
return f
raise AttributeError(attr)
def checkRecv(self):
def check_recv(self):
"""As select.select is not available, we check if there
is some data to read by using a list that stores pointers."""
return len(self.__fd_queue) > 0
......@@ -70,12 +70,13 @@ class PipeEngine:
return self.__fdr
def _read_cmd(self):
self.__fd_queue.pop()
return os.read(self.__fdr,1)
os.read(self.__fdr,1)
return self.__fd_queue.popleft()
def _write_cmd(self, _cmd):
os.write(self.__fdw, _cmd)
self.__fd_queue.append("X")
self.__fd_queue.append(_cmd)
os.write(self.__fdw, "X")
self.call_release()
def add_one_pipe(self, pipe):
self.active_pipes.add(pipe)
......@@ -117,7 +118,7 @@ class PipeEngine:
RUN=True
STOP_IF_EXHAUSTED = False
while RUN and (not STOP_IF_EXHAUSTED or len(sources) > 1):
fds = select_objects(sources, 2, customTypes=(AutoSource, PipeEngine))
fds = select_objects(sources, 2)
for fd in fds:
if fd is self:
cmd = self._read_cmd()
......@@ -154,7 +155,7 @@ class PipeEngine:
if self.thread_lock.acquire(0):
_t = Thread(target=self.run)
_t.start()
self.threadid = _t.ident
self.thread = _t
else:
warning("Pipe engine already running")
def wait_and_stop(self):
......@@ -162,11 +163,13 @@ class PipeEngine:
def stop(self, _cmd="X"):
try:
with self.command_lock:
if self.threadid is not None:
if self.thread is not None:
self._write_cmd(_cmd)
while not self.thread_lock.acquire(0):
time.sleep(0.01) # interruptible wait for thread to terminate
self.thread_lock.release() # (not using .join() because it needs 'threading' module)
self.thread.join()
try:
self.thread_lock.release()
except:
pass
else:
warning("Pipe engine thread not running")
except KeyboardInterrupt:
......@@ -175,7 +178,7 @@ class PipeEngine:
def add(self, *pipes):
pipes = self._add_pipes(*pipes)
with self.command_lock:
if self.threadid is not None:
if self.thread is not None:
for p in pipes:
p.start()
self._write_cmd("A")
......@@ -305,7 +308,7 @@ class Pipe(_ConnectorLogic):
s += ct.punct(">")
return s
class Source(Pipe):
class Source(Pipe, SelectableObject):
def __init__(self, name=None):
Pipe.__init__(self, name=name)
self.is_exhausted = False
......@@ -316,7 +319,7 @@ class Source(Pipe):
self._send(msg)
def fileno(self):
return None
def checkRecv(self):
def check_recv(self):
return False
def exhausted(self):
return self.is_exhausted
......@@ -353,14 +356,14 @@ class Sink(Pipe):
pass
class AutoSource(Source):
class AutoSource(Source, SelectableObject):
def __init__(self, name=None):
Source.__init__(self, name=name)
self.__fdr,self.__fdw = os.pipe()
self._queue = collections.deque()
def fileno(self):
return self.__fdr
def checkRecv(self):
def check_recv(self):
return len(self._queue) > 0
def _gen_data(self, msg):
self._queue.append((msg,False))
......@@ -369,7 +372,7 @@ class AutoSource(Source):
self._queue.append((msg,True))
self._wake_up()
def _wake_up(self):
os.write(self.__fdw,"x")
os.write(self.__fdw,"X")
def deliver(self):
os.read(self.__fdr,1)
try:
......@@ -377,6 +380,7 @@ class AutoSource(Source):
except IndexError: #empty queue. Exhausted source
pass
else:
self.call_release()
if high:
self._high_send(msg)
else:
......@@ -502,6 +506,7 @@ class TermSink(Sink):
if not self.opened:
self.opened = True
self.__f = get_temp_file()
open(self.__f, "a").close()
self.name = "Scapy" if self.name is None else self.name
# Start a powershell in a new window and print the PID
cmd = "$app = Start-Process PowerShell -ArgumentList '-command &{$host.ui.RawUI.WindowTitle=\\\"%s\\\";Get-Content \\\"%s\\\" -wait}' -passthru; echo $app.Id" % (self.name, self.__f.replace("\\", "\\\\"))
......
......@@ -65,7 +65,6 @@ c = TestSink(name="c")
s > d1 > c
p.add(s)
time.sleep(1)
p.wait_and_stop()
assert test_val == "hello"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment