gipc: child processes and IPC for gevent
gipc (pronunciation “gipsy”) provides reliable child process management and inter-process communication (IPC) in the context of gevent. The current version of gipc has been tested on CPython 2.6/2.7/3.3/3.4. It requires gevent 1.1 and supports both Linux and Windows.
This documentation applies to gipc 0.6.0. It was built on July 22, 2015.
Overview
Direct usage of Python’s multiprocessing package in the context of a gevent-powered application may cause problems and most likely breaks the application in various subtle ways. gipc is developed with the motivation to solve many of these issues transparently. With gipc, multiprocessing.Process-based child processes can safely be created anywhere within your gevent-powered application. The API of multiprocessing.Process objects is provided in a gevent-cooperative fashion. Furthermore, gipc provides a pipe-based transport layer for gevent-cooperative inter-process communication as well as useful helper constructs. gipc is lightweight and simple to integrate.
gipc is happily used by, among others, Quantopian’s remote Python debugger, Ajenti, Chronology, gipcrpc, NetCall, and GDriveFS. Are you successfully applying gipc in your project? That is always great to hear, so please drop me a line!
Usage
gipc’s interface is clear and slim. All you will probably ever interact with are gipc.start_process(), gipc.pipe(), and their returned objects. Make yourself comfortable with gipc’s behavior by going through the API section as well as through the code examples.
Quick start example
The following code snippet uses gipc for spawning a child process and for creating a pipe, and then sends a Python object from a greenlet in the main (parent) process through the pipe to the child process:
import gevent
import gipc


def writelet(w):
    # This function runs as a greenlet in the parent process.
    # Put a Python object into the write end of the transport channel.
    w.put(0)


def readchild(r):
    # This function runs in a child process.
    # Read and validate object from the read end of the transport channel.
    assert r.get() == 0


def main():
    with gipc.pipe() as (readend, writeend):
        # Start 'writer' greenlet. Provide it with the pipe write end.
        g = gevent.spawn(writelet, writeend)
        # Start 'reader' child process. Provide it with the pipe read end.
        p = gipc.start_process(target=readchild, args=(readend,))
        # Wait for both to finish.
        g.join()
        p.join()


# Protect entry point from being executed upon import; crucial on Windows.
if __name__ == "__main__":
    main()
Although quite simple, this code would have various unwanted side-effects if used with the canonical multiprocessing API instead of gipc.start_process() and gipc.pipe(), as outlined in the Challenges paragraph.
Which problem does gipc address, specifically?
There is plenty of motivation for using multiple processes in event-driven architectures. The assumption behind gipc is that applying multiple processes that communicate among each other (whereas each process has its own event loop) can be a decent solution for many types of problems: first of all, it helps decouple system components by making each process responsible for only one part of the architecture. Furthermore, even a generally I/O-intensive application can at some point become CPU-bound, in which case distributing tasks among processes is a great way to make efficient use of multi-core machines and to easily increase application performance.
The standard way of using multiple processes in a Python application is to use multiprocessing from Python’s standard library. However, canonical usage of this package within a gevent-powered application usually breaks the application in various non-obvious ways (see below). gipc is developed with the motivation to solve these issues transparently and to make using gevent in combination with multiprocessing-based child processes and inter-process communication (IPC) a no-brainer again:
- With gipc, multiprocessing.Process-based child processes can safely be created and monitored anywhere within your gevent-powered application. Negative side-effects of child process creation in the context of gevent are prevented.
- The API of multiprocessing.Process objects is provided in a gevent-cooperative fashion.
- gevent works natively in child processes.
- gipc provides a pipe-based transport layer for gevent-cooperative IPC so that application developers can easily make the processes talk to each other.
- gipc is lightweight and simple to integrate, really!
What are the challenges and what is gipc’s solution?
Challenges:
Depending on the operating system in use, the creation of child processes via Python’s multiprocessing in the context of a gevent application requires special treatment. Most care is required on POSIX-compliant systems: greenlets spawned in the current process before forking are cloned by fork() and haunt the child, which usually is undesired behavior. The following code snippet clarifies this behavior by implementing the example from above, but this time by directly using multiprocessing instead of gipc (this has been tested on Linux with Python 3.4 & gevent 1.1):
import gevent
import multiprocessing


def writelet(c):
    c.send(0)


def readchild(c):
    gevent.sleep(0)
    assert c.recv() == 0
    assert c.recv() == 0


if __name__ == "__main__":
    c1, c2 = multiprocessing.Pipe()
    g = gevent.spawn(writelet, c1)
    p = multiprocessing.Process(target=readchild, args=(c2,))
    p.start()
    g.join()
    p.join()
It runs without error. Although the code intends to send only one message to the child through a multiprocessing Pipe, the two assert statements verify that the child actually receives the same message twice. One message is sent, as intended, from the writelet in the parent through the c1 end of the pipe. It is retrieved at the c2 end of the pipe in the child.
The other message is sent from the spooky writelet clone in the child. It is also written to the c1 end of the pipe, which has implicitly been duplicated during forking. Greenlet clones in the child of course only run when a context switch is triggered; in this case via gevent.sleep(0). As you can imagine, this behavior may lead to a wide range of side-effects including race conditions, and therefore almost guarantees especially tedious debugging sessions.
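The same effect can be reproduced without gevent. The following minimal sketch assumes Python 3 on a POSIX system, and duplicated_by_fork is a hypothetical name. It uses a buffered file object as a stand-in for pending work. Bytes that sit in the user-space buffer at fork time exist in both processes afterwards, so they reach the pipe twice, much like the message from the cloned writelet.

```python
import os


def duplicated_by_fork():
    """Return everything that reaches a pipe when a buffered write straddles fork()."""
    r, w = os.pipe()
    writer = os.fdopen(w, "wb")  # buffered writer on the pipe's write end
    writer.write(b"hello")       # still in the user-space buffer, not yet in the pipe
    pid = os.fork()
    if pid == 0:
        # Child: flush its inherited copy of the buffer, then exit without cleanup.
        writer.flush()
        os._exit(0)
    os.waitpid(pid, 0)
    writer.close()               # parent flushes the original buffer and closes w
    with os.fdopen(r, "rb") as reader:
        # EOF arrives once both processes have closed their copy of the write end.
        return reader.read()


print(duplicated_by_fork())  # b'hellohello'
```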
The second class of serious issues in the code above is that it contains several non-cooperatively blocking function calls: p.join() as well as the send()/recv() calls (of multiprocessing.Connection objects) block the calling greenlet non-cooperatively, i.e. they do not allow for a context switch into other greenlets. While this does not lead to an error in the simple example code above, this behavior is not tolerable in real-world gevent applications.
Solution:
gipc overcomes these and other issues for you transparently and in a straightforward fashion.
First of all, the most basic design assumption behind gipc is that application developers never actually want to duplicate all running greenlets during fork. This leads to the rationale of first destroying the inherited “gevent state” in the child and then creating a fresh gevent context, before invoking the target function.
So, the goal is that children start off with a fresh gevent state before entering the user-given target function. Consequently, as one of the first actions, children created via gipc destroy the inherited gevent hub as well as the inherited libev event loop and create their own fresh versions of these entities. This way, inherited greenlets as well as libev watchers become orphaned: the fresh hub and event loop are not connected to them anymore. As a result, execution of code related to these inherited greenlets and watchers is efficiently prevented without the need to deactivate or kill them actively, one by one.
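The “fresh state in the child” idea can be illustrated with the standard library alone. The sketch below is an analogy and not gipc’s implementation. It assumes Python 3.7+ on POSIX and uses os.register_at_fork() to discard a hypothetical module-level list of pending callbacks in every forked child, so that work scheduled by the parent cannot run a second time there.

```python
import os

# Hypothetical stand-in for per-process event-loop state (e.g. scheduled callbacks).
pending_callbacks = []


def _reset_inherited_state():
    # Runs in every child right after fork(): forget work that belongs to the parent.
    pending_callbacks.clear()


os.register_at_fork(after_in_child=_reset_inherited_state)


def child_starts_clean():
    """Fork a child and report whether it saw an empty callback list."""
    pending_callbacks.append("parent-only task")
    pid = os.fork()
    if pid == 0:
        # Child: report the result via the exit status and skip interpreter cleanup.
        os._exit(0 if not pending_callbacks else 1)
    _, status = os.waitpid(pid, 0)
    return os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0


print(child_starts_clean())  # True
```

According to the description above, gipc goes further than this analogy. Instead of clearing individual data structures, it replaces the whole gevent hub and libev event loop in the child.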
Furthermore, on POSIX-compliant systems, gipc entirely avoids multiprocessing’s child monitoring implementation (which is based on the class of wait system calls) and instead uses libev’s wonderful child watcher system (based on SIGCHLD signal transmission), enabling gevent-cooperative waiting for child termination. That’s how p.join() from the example above can be made cooperative.
For implementing gevent-cooperative inter-process communication, gipc uses efficient pipe-based data transport channels with non-blocking I/O system calls. gipc’s transport channel system has been carefully designed (for instance, it takes care of closing dispensable file descriptors in the parent as well as in the child after forking) and also abstracts away the difficulties of passing pipe handles among processes on Windows. gipc also abstracts away the implementation differences of the multiprocessing package between Python 2 and 3.
Overall, gipc’s main goal is to allow for the integration of child processes in your gevent-powered application via a simple API – on POSIX-compliant systems as well as on Windows, and on Python 2 and 3.
Condensed notes on gipc’s architecture
- Child process creation and invocation is done via a thin wrapper around multiprocessing.Process. On Unix, the inherited gevent hub as well as the inherited libev event loop are destroyed and re-initialized in the child before execution of the user-given target function.
- On POSIX-compliant systems, gevent-cooperative child process monitoring is based on libev child watchers (this affects the is_alive() and join() methods).
- gipc uses classical anonymous pipes as transport layer for gevent-cooperative communication between greenlets and/or processes. By default, a binary pickle protocol is used for transmitting arbitrary objects. Reading and writing on pipes is done with gevent’s cooperative versions of os.read() and os.write() (on POSIX-compliant systems they use non-blocking I/O, on Windows a thread pool is used). On Linux, my test system (Xeon E5630) achieved a payload transfer rate of 1200 MB/s and a message transmission rate of 100,000 messages/s through one pipe between two processes.
- gipc automatically closes handles in the parent if provided to the child, and also closes those in the child that were not explicitly transferred to it. This auto-close behavior might be a limitation in certain special cases. However, it automatically prevents file descriptor leakage and forces developers to make deliberate choices about which handles should be transferred explicitly.
- gipc provides convenience features such as a context manager for pipe handles or timeout controls based on gevent.Timeout.
- Read/write operations on a pipe are gevent.lock.Semaphore-protected and therefore greenthread-safe.
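Here is a minimal sketch that makes the transport-layer idea concrete. It is blocking, runs in a single process, and sends pickled objects through an anonymous pipe. A pipe is a plain byte stream without message boundaries, so each message needs some framing. The 8-byte length prefix and the function names put_message/get_message are assumptions for illustration. The sketch does not reproduce gipc’s actual wire format or its cooperative, gevent-based I/O.

```python
import os
import pickle
import struct

# Assumed framing: 8-byte big-endian payload length, followed by the pickle bytes.
_HEADER = struct.Struct("!Q")


def put_message(fd, obj):
    payload = pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)
    view = memoryview(_HEADER.pack(len(payload)) + payload)
    while view:  # os.write() may write fewer bytes than requested
        view = view[os.write(fd, view):]


def _read_exactly(fd, n):
    chunks = []
    while n:
        chunk = os.read(fd, n)
        if not chunk:
            raise EOFError("pipe closed in the middle of a message")
        chunks.append(chunk)
        n -= len(chunk)
    return b"".join(chunks)


def get_message(fd):
    (size,) = _HEADER.unpack(_read_exactly(fd, _HEADER.size))
    return pickle.loads(_read_exactly(fd, size))


r, w = os.pipe()
put_message(w, {"answer": 42})
put_message(w, [1, 2, 3])
print(get_message(r), get_message(r))  # {'answer': 42} [1, 2, 3]
os.close(r)
os.close(w)
```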
Is gipc reliable?
gipc is developed with a strong focus on reliability and with best intentions in mind. Although gipc handles a delicate combination of signals, threads, and forking, I have observed it to work reliably. The unit test suite covers all of gipc’s features within a clean gevent environment, but also covers scenarios of medium complexity. To my knowledge, gipc is being deployed in serious production scenarios.
But still, generally, you should be aware of the fact that mixing any of fork, threads, greenlets and an event loop library such as libev bears the potential for various kinds of corner-case disasters. One could argue that fork() in the context of libev without doing a clean exec in the child already is broken design. However, many people would like to do exactly this and gipc’s basic approach has proven to work in such cases. Now it is up to you to evaluate gipc in the context of your project; please share your experience.
Requirements, download & installation
gipc supports Linux and Windows and requires:
- gevent >= 1.1 (currently, gipc is developed and tested against gevent 1.1).
- CPython 2.6, 2.7, 3.3, or 3.4.
The latest gipc release from PyPI can be downloaded and installed via pip:
$ pip install gipc
pip can also install the current development version of gipc:
$ pip install hg+https://bitbucket.org/jgehrcke/gipc
gipc obeys semantic versioning.
Notes for Windows users
- The _GIPCReader.get() timeout feature is not available.
- “Non-blocking I/O” is imitated by outsourcing blocking I/O calls to threads in a gevent thread pool. Compared to native non-blocking I/O as is available on POSIX-compliant systems, this leads to a significant messaging performance drop.
Windows I/O Completion Ports (IOCP) could solve both issues in an elegant way. Currently, gevent is built on top of libev, which does not support IOCP. In the future, however, gevent might become libuv-backed. libuv supports IOCP and would allow for running the same gevent code on Windows as on POSIX-compliant systems. Furthermore, if gevent went with libuv, the strengths of both the node.js and the gevent worlds would be merged. Denis Bilenko, the maintainer of gevent, seems to be open to such a transition and the first steps are already done.
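The thread-pool approach from the Windows notes can be sketched with the standard library. A blocking os.read() is handed to a worker thread, and the caller receives a future instead of blocking. The sketch uses concurrent.futures as a stand-in for gevent’s thread pool, and read_in_pool is a hypothetical helper that is not part of gipc.

```python
import os
from concurrent.futures import ThreadPoolExecutor

_pool = ThreadPoolExecutor(max_workers=2)


def read_in_pool(fd, n):
    """Submit a blocking os.read() to a worker thread and return a Future."""
    return _pool.submit(os.read, fd, n)


r, w = os.pipe()
future = read_in_pool(r, 5)      # returns immediately, although no data is available yet
os.write(w, b"hello")            # meanwhile, the calling thread is free to do other work
print(future.result(timeout=5))  # b'hello'
os.close(r)
os.close(w)
```

Every I/O call in this scheme pays for a hand-off between threads, which helps explain why it is slower than native non-blocking I/O.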
Author, license, contact
gipc is written and maintained by Jan-Philip Gehrcke and licensed under an MIT license (see LICENSE file for details). Your feedback is highly appreciated. You can contact me at jgehrcke@googlemail.com or use the Bitbucket issue tracker.
Examples
Note that the following examples are designed with the motivation to demonstrate the API and capabilities of gipc, rather than showing interesting use cases.
- Example 1: gipc.pipe()-based messaging from greenlet in parent to child
- Example 2: serving multiple clients (in child) from one server (in parent)
- Example 3: time-synchronization between processes
Example 1: gipc.pipe()-based messaging from greenlet in parent to child
Very basic gevent and gipc concepts are explained by means of the following simple messaging example:
import gevent
import gipc


def main():
    with gipc.pipe() as (r, w):
        p = gipc.start_process(target=child_process, args=(r, ))
        wg = gevent.spawn(writegreenlet, w)
        try:
            p.join()
        except KeyboardInterrupt:
            wg.kill(block=True)
            p.terminate()
            p.join()


def writegreenlet(writer):
    while True:
        writer.put("written to pipe from a greenlet running in the main process")
        gevent.sleep(1)


def child_process(reader):
    while True:
        # print() with a single argument works on Python 2 and 3.
        print("Child process got message from pipe:\n\t'%s'" % reader.get())


if __name__ == "__main__":
    main()
The context manager with gipc.pipe() as (r, w) creates a pipe with read handle r and write handle w. On context exit at the latest, both pipe ends are closed properly.
After creating the pipe context, the above code spawns a child process via gipc.start_process(). The child process is instructed to execute the target function named child_process, and the pipe read handle r is provided as an argument to this target function. Within child_process(), an endless loop waits for objects on the read end of the pipe via the cooperatively blocking call to reader.get(). Upon retrieval, it immediately writes each object’s string representation to stdout.
After invocation of the child process (represented by an object bound to name p), a greenlet wg is spawned within the main process. This greenlet executes the function writegreenlet, and the pipe write handle w is provided as an argument. Within this greenlet, one string per second is written to the write end of the pipe.
After spawning wg, p.join() is called immediately in the parent process. p.join() blocks cooperatively, i.e. it allows for a context switch into the write greenlet (this actually is the magic behind gevent/greenlets). Hence, the write greenlet is ‘running’ while p.join() cooperatively waits for the child process to terminate. The write greenlet spends most of its time in gevent.sleep(), which also blocks cooperatively, allowing for context switches back to the main greenlet in the parent process, which is executing p.join(). In this state, one message per second is passed between parent and child until a KeyboardInterrupt exception is raised in the parent.
Upon KeyboardInterrupt, the parent first kills the write greenlet and blocks cooperatively until it has stopped. Then it terminates the child process (via SIGTERM on Unix) and waits for it to exit via p.join().
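For reference, this is how plain multiprocessing reports a child that was stopped with terminate(). On POSIX, exitcode becomes the negated signal number. The sketch assumes Linux or another POSIX system and explicitly selects the fork start method. gipc._GProcess is documented as compatible with the multiprocessing.Process API, so its exitcode is presumably interpreted the same way. Treat that as an assumption worth checking in your setup.

```python
import multiprocessing
import signal
import time


def terminated_exitcode():
    # The "fork" start method is POSIX-only; it avoids re-importing __main__ in the child.
    ctx = multiprocessing.get_context("fork")
    p = ctx.Process(target=time.sleep, args=(60,))
    p.start()
    p.terminate()  # sends SIGTERM on POSIX
    p.join()
    return p.exitcode


code = terminated_exitcode()
print(code)  # -15 on Linux
print(code == -signal.SIGTERM)  # True
```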
Example 2: serving multiple clients (in child) from one server (in parent)
For pure API and reliability demonstration purposes, this example implements TCP communication between a server in the parent process and multiple clients in one child process:
- gevent’s StreamServer is started in a greenlet within the initial (parent) process. For each connecting client, it receives one newline-terminated message and echoes it back.
- A child process is started using gipc. Its starting point is the function clientprocess. There, N TCP clients are started concurrently from N greenlets.
- Each client sends one message, validates the echo response and terminates.
- The child process terminates.
- After the child process is joined in the parent, the server is killed.
- The server greenlet is joined.
import gevent
from gevent.server import StreamServer
from gevent import socket
import gipc
import time

PORT = 1337
N_CLIENTS = 1000
MSG = "HELLO\n"


def serve(sock, addr):
    # A file object for reading *and* writing ("rw" is required on Python 3).
    f = sock.makefile(mode="rw")
    f.write(f.readline())
    f.flush()
    f.close()


def server():
    StreamServer(('localhost', PORT), serve).serve_forever()


def clientprocess():
    t1 = time.time()
    clients = [gevent.spawn(client) for _ in range(N_CLIENTS)]
    gevent.joinall(clients)
    duration = time.time() - t1
    print("%s clients served within %.2f s." % (N_CLIENTS, duration))


def client():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(('localhost', PORT))
    f = sock.makefile(mode="rw")
    f.write(MSG)
    f.flush()
    assert f.readline() == MSG
    f.close()
    # Closing the file object does not close the underlying socket on Python 3.
    sock.close()


if __name__ == "__main__":
    s = gevent.spawn(server)
    c = gipc.start_process(clientprocess)
    c.join()
    s.kill()
    s.join()
Output on my test machine: 1000 clients served within 0.54 s.
Example 3: time-synchronization between processes
Child process creation may take a significant amount of time, especially on Windows. The exact amount of time is not predictable.
When code in the parent should only proceed in the moment the code in the child has reached a certain state, the proper way to tackle this is a bidirectional synchronization handshake:
- Process A sends a synchronization request to process B and waits for an acknowledgment response. It proceeds upon retrieval.
- Process B sends the acknowledgment in the moment it retrieves the sync request and proceeds.
This concept can easily be implemented using a bidirectional gipc.pipe():
import gevent
import gipc
import time


def main():
    with gipc.pipe(duplex=True) as (cend, pend):
        # `cend` is the channel end for the child, `pend` for the parent.
        p = gipc.start_process(writer_process, args=(cend,))
        # Synchronize with child process.
        pend.put("SYN")
        assert pend.get() == "ACK"
        # Now in sync with child.
        ptime = time.time()
        ctime = pend.get()
        p.join()
        print("Time delta: %.8f s." % abs(ptime - ctime))


def writer_process(cend):
    with cend:
        assert cend.get() == "SYN"
        cend.put("ACK")
        # Now in sync with parent.
        cend.put(time.time())


if __name__ == "__main__":
    main()
The code following the “Now in sync” comments in parent and child is entered quasi-simultaneously. Example output on my test machine (Linux): Time delta: 0.00005388 s. On Windows, the precision of time.time() is not sufficient to resolve the time delta (and time.clock() is not applicable for comparing times across processes).
gipc API
- Spawning child processes
- Creating a pipe and its handle pair
- Handling handles
- Controlling child processes
- Exception types
Spawning child processes
gipc.start_process(target, args=(), kwargs={}, daemon=None, name=None)
Start child process and execute function target(*args, **kwargs). Any existing instance of gipc._GIPCHandle or gipc._GIPCDuplexHandle can be passed to the child process via args and/or kwargs. If any such instance is passed to the child, gipc automatically closes the corresponding file descriptor(s) in the parent.
Note
Compared to the canonical multiprocessing.Process() constructor, this function
- returns a gipc._GProcess instance which is compatible with the multiprocessing.Process API.
- just as well takes the target, args=(), and kwargs={} arguments.
- introduces the daemon=None argument.
- does not accept the group argument (being an artifact from multiprocessing’s compatibility with threading).
- starts the process, i.e. a subsequent call to the start() method of the returned object is not required.
Parameters:
- target – Function to be called in the child process. Signature: target(*args, **kwargs).
- args – Tuple defining the positional arguments provided to target.
- kwargs – Dictionary defining the keyword arguments provided to target.
- name – Forwarded to multiprocessing.Process.name.
- daemon – Forwarded to multiprocessing.Process.daemon.
Returns: gipc._GProcess instance (inherits from multiprocessing.Process and re-implements some of its methods in a gevent-cooperative fashion).
start_process() triggers most of the magic in gipc. Process creation is based on multiprocessing.Process(), i.e. fork() on POSIX-compliant systems and CreateProcess() on Windows.
Warning
Please note that in order to provide reliable signal handling in the context of libev, the default disposition (action) is restored for all signals in the child before executing the user-given target function. You can (re)install any signal handler within target. The notable exception is the SIGPIPE signal, whose handler is not reset to its default handler in child processes created by gipc. That is, the SIGPIPE action in children is inherited from the parent. CPython sets the SIGPIPE action to SIG_IGN at interpreter startup, i.e. the signal is ignored by default.
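The SIGPIPE action is inherited, and CPython ignores the signal. As a consequence, writing to a pipe whose read end has been closed does not kill the process. The write fails with an exception instead. The following minimal check uses only os and signal. It assumes Python 3 on a POSIX system, and the helper names are hypothetical.

```python
import os
import signal


def sigpipe_ignored():
    return signal.getsignal(signal.SIGPIPE) == signal.SIG_IGN


def write_after_reader_closed():
    r, w = os.pipe()
    os.close(r)  # no reader is left on this pipe
    try:
        os.write(w, b"x")
    except BrokenPipeError:  # EPIPE surfaces as an exception because SIGPIPE is ignored
        return "BrokenPipeError"
    finally:
        os.close(w)
    return "no error"


print(sigpipe_ignored(), write_after_reader_closed())  # True BrokenPipeError
```

A target function may prefer the classic behavior of Unix command-line tools, which is to die silently on SIGPIPE. In that case, it can restore the default action itself by calling signal.signal(signal.SIGPIPE, signal.SIG_DFL) at its beginning.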
Creating a pipe and its handle pair
gipc.pipe(duplex=False, encoder='default', decoder='default')
Create a pipe-based message transport channel and return two corresponding handles for reading and writing data.
Allows for gevent-cooperative transmission of data between greenlets within one process or across processes (created via start_process()). The default behavior allows for transmission of any picklable Python object.
The transport layer is based on os.pipe() (i.e. CreatePipe() on Windows and pipe() on POSIX-compliant systems).
Parameters:
- duplex –
  - If False (default), create a unidirectional pipe-based message transport channel and return the corresponding handle pair, a 2-tuple with the first element of type _GIPCReader and the second element of type _GIPCWriter.
  - If True, create a bidirectional message transport channel (using two pipes internally) and return the corresponding 2-tuple with both elements being of type _GIPCDuplexHandle.
- encoder – Defines the entity used for object serialization before writing object o to the pipe via put(o). Must be either a callable returning a byte string, None, or 'default'. 'default' translates to pickle.dumps (in this mode, any picklable Python object can be provided to put() and transmitted through the pipe). When setting this to None, no automatic object serialization is performed. In that case only byte strings are allowed to be provided to put(), and a TypeError is raised otherwise. A TypeError will also be raised if the encoder callable does not return a byte string.
- decoder – Defines the entity used for data deserialization after reading raw binary data from the pipe. Must be a callable accepting a byte string as first and only argument, None, or 'default'. 'default' translates to pickle.loads. When setting this to None, no data decoding is performed, and a raw byte string is returned.
Returns:
- duplex=False: (reader, writer) 2-tuple. The first element is of type gipc._GIPCReader, the second of type gipc._GIPCWriter. Both inherit from gipc._GIPCHandle.
- duplex=True: (handle, handle) 2-tuple. Both elements are of type gipc._GIPCDuplexHandle.
gipc._GIPCHandle and gipc._GIPCDuplexHandle instances are recommended to be used with a context manager as indicated in the following examples:
with pipe() as (r, w):
    do_something(r, w)

reader, writer = pipe()
with reader:
    do_something(reader)
with writer as w:
    do_something(w)

with pipe(duplex=True) as (h1, h2):
    h1.put(1)
    assert h2.get() == 1
    h2.put(2)
    assert h1.get() == 2
An example for using the encoder/decoder arguments for implementing JSON (de)serialization:
import json
enc = lambda o: json.dumps(o).encode("ascii")
dec = lambda b: json.loads(b.decode("ascii"))
with pipe(encoder=enc, decoder=dec) as (r, w):
    ...
Note that JSON representation is text, whereas the encoder/decoder callables must return/accept byte strings, as ensured here by ASCII en/decoding. Also note that in practice JSON serialization normally has no advantage over pickling, so this is just an educational example.
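An encoder/decoder pair may also wrap pickle with zlib compression. This can pay off for large, highly redundant payloads, at the cost of CPU time on both ends. This is a sketch under that assumption and not a gipc feature; the function names are hypothetical. As the encoder/decoder contract above requires, the encoder returns a byte string and the decoder accepts one. The pair would be passed as gipc.pipe(encoder=compressing_encoder, decoder=compressing_decoder).

```python
import pickle
import zlib


def compressing_encoder(obj):
    # Must return a byte string (per the docs above, gipc raises TypeError otherwise).
    return zlib.compress(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL))


def compressing_decoder(data):
    # Receives the raw byte string read from the pipe.
    return pickle.loads(zlib.decompress(data))


payload = {"rows": ["row %d: %s" % (i, "x" * 50) for i in range(200)]}
blob = compressing_encoder(payload)
assert compressing_decoder(blob) == payload
print(len(pickle.dumps(payload, pickle.HIGHEST_PROTOCOL)), "->", len(blob))
```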
Handling handles
class gipc.gipc._GIPCHandle
The _GIPCHandle class implements common features of read and write handles. _GIPCHandle instances are created via pipe().
close()
Close underlying file descriptor and de-register handle from further usage. Is called on context exit.
Raises:
class gipc.gipc._GIPCWriter
Bases: gipc.gipc._GIPCHandle
A _GIPCWriter instance manages the write end of a pipe. It is created via pipe().
put(o)
Encode object o and write it to the pipe. Block gevent-cooperatively until all data is written. The default encoder is pickle.dumps.
Parameters: o – a Python object that is encodable with the encoder of choice.
Raises:
- GIPCError
- GIPCClosed
- pickle.PicklingError
class gipc.gipc._GIPCReader
Bases: gipc.gipc._GIPCHandle
A _GIPCReader instance manages the read end of a pipe. It is created via pipe().
get(timeout=None)
Receive, decode and return data from the pipe. Block gevent-cooperatively until data is available or timeout expires. The default decoder is pickle.loads.
Parameters: timeout – None (default) or a gevent.Timeout instance. The timeout must be started to take effect and is canceled when the first byte of a new message arrives (i.e. providing a timeout does not guarantee that the method completes within the timeout interval).
Returns: a Python object.
Raises:
- gevent.Timeout (if provided)
- GIPCError
- GIPCClosed
- pickle.UnpicklingError
Recommended usage for silent timeout control:
with gevent.Timeout(TIME_SECONDS, False) as t:
    reader.get(timeout=t)
Warning
The timeout control is currently not available on Windows, because Windows can’t apply select() to pipe handles. An OSError is expected to be raised in case you set a timeout.
class gipc.gipc._GIPCDuplexHandle
A _GIPCDuplexHandle instance manages one end of a bidirectional pipe-based message transport created via pipe() with duplex=True. It provides put(), get(), and close() methods which are forwarded to the corresponding methods of gipc._GIPCWriter and gipc._GIPCReader.
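The “two pipes internally” arrangement behind duplex=True can be pictured as crosswise wiring, where each end reads from one pipe and writes to the other. The following sketch shows only this file-descriptor wiring with raw bytes. duplex_pipe_fds is a hypothetical helper, and the sketch makes no claim about gipc’s internal layout.

```python
import os


def duplex_pipe_fds():
    """Return two (read_fd, write_fd) ends that form a bidirectional channel."""
    a_read, a_write = os.pipe()  # carries data from end 2 to end 1
    b_read, b_write = os.pipe()  # carries data from end 1 to end 2
    end1 = (a_read, b_write)
    end2 = (b_read, a_write)
    return end1, end2


end1, end2 = duplex_pipe_fds()
os.write(end1[1], b"ping")
os.write(end2[1], b"pong")
print(os.read(end2[0], 4), os.read(end1[0], 4))  # b'ping' b'pong'
for fd in end1 + end2:
    os.close(fd)
```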
Controlling child processes
class gipc.gipc._GProcess
Bases: multiprocessing.context.Process
Compatible with the multiprocessing.Process API.
For cooperativeness with gevent and compatibility with libev, it currently re-implements start(), is_alive(), exitcode on Unix and join() on Windows as well as on Unix.
Note
On Unix, child monitoring is implemented via libev child watchers. To that end, libev installs its own SIGCHLD signal handler. Any call to os.waitpid() would compete with that handler, so it is not recommended to call it in the context of this module. gipc prevents multiprocessing from calling os.waitpid() by monkey-patching multiprocessing’s Popen.poll to be a no-op that always returns None. Calling gipc._GProcess.join() is not required for cleaning up after zombies (libev does that). It just waits for the process to terminate.