diff --git a/scapy/automaton.py b/scapy/automaton.py
index b6a60764d7783124f3d03b4defcc195540c5c45d..a0db7eeba02a0341061aa410ac135b28a16c1138 100644
--- a/scapy/automaton.py
+++ b/scapy/automaton.py
@@ -28,6 +28,12 @@ except ImportError:
 else:
     THREAD_EXCEPTION = thread.error
 
+if WINDOWS:
+    from scapy.arch.pcapdnet import PcapTimeoutElapsed
+    recv_error = PcapTimeoutElapsed
+else:
+    recv_error = ()
+
 """ In Windows, select.select is not available for custom objects. Here's the implementation of scapy to re-create this functionnality
 # Passive way: using no-ressources locks
                +---------+             +---------------+      +-------------------------+
@@ -350,7 +356,11 @@ class _ATMT_supersocket(SuperSocket):
             s = str(s)
         return self.spa.send(s)
     def recv(self, n=MTU):
-        r = self.spa.recv(n)
+        try:
+            r = self.spa.recv(n)
+        except recv_error:
+            if not WINDOWS:
+                raise
         if self.proto is not None:
             r = self.proto(r)
         return r
diff --git a/scapy/scapypipes.py b/scapy/scapypipes.py
index 4d37f5e1ef7653a62a109cc7f1b5ba77e25cde44..84cb5c45d3e8c83294705e167bee4cc132f60e12 100644
--- a/scapy/scapypipes.py
+++ b/scapy/scapypipes.py
@@ -9,7 +9,7 @@ import Queue
 from scapy.pipetool import Source,Drain,Sink
 from scapy.config import conf
 from scapy.utils import PcapReader, PcapWriter
-
+from scapy.automaton import recv_error
 
 class SniffSource(Source):
     """Read packets from an interface and send them to low exit.
@@ -29,8 +29,14 @@ class SniffSource(Source):
         self.s.close()
     def fileno(self):
         return self.s.fileno()
+    def check_recv(self):
+        return True
     def deliver(self):
-        self._send(self.s.recv())
+        try:
+            self._send(self.s.recv())
+        except recv_error:
+            if not WINDOWS:
+                raise
 
 class RdpcapSource(Source):
     """Read packets from a PCAP file send them to low exit.
@@ -53,6 +59,8 @@ class RdpcapSource(Source):
         self.f.close()
     def fileno(self):
         return self.f.fileno()
+    def check_recv(self):
+        return True
     def deliver(self):    
         p = self.f.recv()
         print("deliver %r" % p)
@@ -100,6 +108,7 @@ class WrpcapSink(Sink):
         self.f = PcapWriter(fname)
     def stop(self):
         self.f.flush()
+        self.f.close()
     def push(self, msg):
         self.f.write(msg)
         
diff --git a/test/pipetool.uts b/test/pipetool.uts
index 06a87ce33440946dc78e3479f9052deb9fa5c9ce..e371491efdcdf6363bf3a00db0f4290108c823d4 100644
--- a/test/pipetool.uts
+++ b/test/pipetool.uts
@@ -216,3 +216,80 @@ p.start()
 
 p.wait_and_stop()
 assert test_val == "hello"
+
++ Advanced ScapyPipes pipetools tests
+
+= Test SniffSource
+~ netaccess
+
+p = PipeEngine()
+
+s = SniffSource()
+d1 = Drain(name="d1")
+c = QueueSink(name="c")
+s > d1 > c
+
+p.add(s)
+p.start()
+sniff(count=3)
+p.stop()
+assert c.q.get()
+
+= Test RdpcapSource and WrpcapSink
+~ needs_root
+
+req = Ether()/IP()/ICMP()
+rpy = Ether()/IP('E\x00\x00\x1c\x00\x00\x00\x004\x01\x1d\x04\xd8:\xd0\x83\xc0\xa8\x00w\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
+
+wrpcap("t.pcap", [req, rpy])
+
+p = PipeEngine()
+
+s = RdpcapSource("t.pcap")
+d1 = Drain(name="d1")
+c = WrpcapSink("t2.pcap", name="c")
+s > d1 > c
+p.add(s)
+p.start()
+p.wait_and_stop()
+
+results = rdpcap("t2.pcap")
+
+assert str(results[0]) == str(req)
+assert str(results[1]) == str(rpy)
+
+os.unlink("t.pcap")
+os.unlink("t2.pcap")
+
+= Test InjectSink and Inject3Sink
+~ needs_root
+
+a = IP(dst="192.168.0.1")/ICMP()
+msgs = []
+
+class FakeSocket(object):
+    def __init__(self, *arg, **karg):
+        pass
+    def close(self):
+        pass
+    def send(self, msg):
+        global msgs
+        msgs.append(msg)
+
+@mock.patch("scapy.scapypipes.conf.L2socket", FakeSocket)
+@mock.patch("scapy.scapypipes.conf.L3socket", FakeSocket)
+def _inject_sink(i3):
+    s = CLIFeeder()
+    s.send(a)
+    s.is_exhausted = True
+    d1 = Drain(name="d1")
+    c = Inject3Sink() if i3 else InjectSink()
+    s > d1 > c
+    p = PipeEngine(s)
+    p.start()
+    p.wait_and_stop()
+
+_inject_sink(False) # InjectSink
+_inject_sink(True) # Inject3Sink
+
+assert msgs == [a,a]