Skip to content

Commit

Permalink
Allow to run multiple io.process.Process objects simultaneously
Browse files Browse the repository at this point in the history
Currently, Process incorrectly handles stdout/stderr channels which rely on

global state:

  - static methods used to handle read/close events are instantiated on

    module import. Thus, they are being mocked by addHandler() which sets

    .channel on them. Starting a new process resets that field

    on the global method objects, so all processes receive output

    from the last started process. To mitigate this, static methods are

    replaced with closure-like lambdas.

  - Process incorrectly raises the stopped() event to close stdout/stderr. This

    event, however, is used to signal the Manager's death. Thus, when a process

    completes, all sockets/pipes in the entire program are closed. This is why the

    stopped() event has to be replaced with a process-specific terminated() event,

    and we need to explicitly send close() to all process pipes.
  • Loading branch information
myaut authored and prologic committed Apr 2, 2016
1 parent 056e15f commit 884b071
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 28 deletions.
56 changes: 29 additions & 27 deletions circuits/io/process.py
Expand Up @@ -7,10 +7,23 @@
from subprocess import Popen, PIPE

from circuits.core.manager import TIMEOUT
from circuits import handler, BaseComponent
from circuits import handler, BaseComponent, Event

from .file import File
from .events import started, stopped, write
from .events import started, write, close


class terminated(Event):
    """Event sent when a process has completed.

    :param args: positional event arguments; the emitter passes the
                 process object that finished.
    :type args: tuple
    """

    def __init__(self, *args):
        # Only positional arguments are accepted; they are handed to the
        # Event base class unchanged.
        super(terminated, self).__init__(*args)


class Process(BaseComponent):
Expand Down Expand Up @@ -73,39 +86,31 @@ def start(self):
).register(self)

self._stderr_read_handler = self.addHandler(
handler("read", channel="{0:d}.stderr".format(self.p.pid))(
self.__class__._on_stderr_read
handler("read", channel=self._stderr.channel)(
lambda self, data: self.stderr.write(data)
)
)

self._stdout_read_handler = self.addHandler(
handler("read", channel="{0:d}.stdout".format(self.p.pid))(
self.__class__._on_stdout_read
handler("read", channel=self._stdout.channel)(
lambda self, data: self.stdout.write(data)
)
)

self._stderr_closed_handler = self.addHandler(
handler("closed", channel="{0:d}.stderr".format(self.p.pid))(
self.__class__._on_stderr_closed
handler("closed", channel=self._stderr.channel)(
lambda self: setattr(self, '_stderr_closed', True)
)
)

self._stdout_closed_handler = self.addHandler(
handler("closed", channel="{0:d}.stdout".format(self.p.pid))(
self.__class__._on_stdout_closed
handler("closed", channel=self._stdout.channel)(
lambda self: setattr(self, '_stdout_closed', True)
)
)

self.fire(started(self))

@staticmethod
def _on_stdout_closed(self):
self._stdout_closed = True

@staticmethod
def _on_stderr_closed(self):
self._stderr_closed = True

def stop(self):
if self.p is not None:
self.p.terminate()
Expand All @@ -127,14 +132,6 @@ def status(self):
if getattr(self, "p", None) is not None:
return self.p.poll()

@staticmethod
def _on_stderr_read(self, data):
self.stderr.write(data)

@staticmethod
def _on_stdout_read(self, data):
self.stdout.write(data)

@handler("generate_events")
def _on_generate_events(self, event):
if self.p is not None and self._status is None:
Expand All @@ -147,7 +144,12 @@ def _on_generate_events(self, event):
self.removeHandler(self._stdout_read_handler)
self.removeHandler(self._stderr_closed_handler)
self.removeHandler(self._stdout_closed_handler)
self.fire(stopped(self))

self.fire(terminated(self))
self.fire(close(), self._stdin.channel,
self._stdout.channel,
self._stderr.channel)

event.reduce_time_left(0)
event.stop()
else:
Expand Down
28 changes: 27 additions & 1 deletion tests/io/test_process.py
Expand Up @@ -14,7 +14,7 @@ def test(manager, watcher):
p.start()
assert watcher.wait("started", p.channel)

assert watcher.wait("stopped", p.channel)
assert watcher.wait("terminated", p.channel)

s = p.stdout.getvalue()
assert s == b"Hello World!\n"
Expand All @@ -39,3 +39,29 @@ def test2(manager, watcher, tmpdir):

with foo.open("r") as f:
assert f.read() == "Hello World!"

def test_two_procs(manager, watcher):
    """Run two processes side by side and check they do not interfere.

    The first command exits right away while the second one sleeps for a
    second, so when the first ``terminated`` event arrives the second
    process must still be running with its stdout/stderr pipes open, and
    each process must end up with only its own output.
    """
    fast = Process(["echo", "1"]).register(manager)
    slow = Process("echo 2 ; sleep 1", shell=True).register(manager)

    fast.start()
    slow.start()

    # Only the short-lived process may have finished at this point.
    assert watcher.wait("terminated", fast.channel)
    assert fast._terminated
    assert not slow._terminated
    assert not slow._stdout_closed
    assert not slow._stderr_closed

    watcher.clear()  # Get rid of first terminated()

    assert fast.stdout.getvalue() == b"1\n"

    # Now wait for the second process and verify its pipes were closed.
    assert watcher.wait("terminated", slow.channel)
    assert slow._terminated
    assert slow._stdout_closed
    assert slow._stderr_closed

    assert slow.stdout.getvalue() == b"2\n"

0 comments on commit 884b071

Please sign in to comment.