These days I built up an inter-thread communication via
os.pipe(). While one thread is only writing to the “write end” of the pipe, the other thread is only reading from the “read end”. The latter is realized with
select.select(), which observes the read end and raises an
os.read() event whenever it recognizes data in the pipe. Hence, whenever something is written to the pipe on the one side, it is immediately read on the other side. However, does this mean that one string written by one call to
os.write() is read entirely by the following call to
os.read()? How does the behavior change with increasing data rate?
From the writing thread, I pushed a string with fixed length L (and terminated by a newline) through the pipe; N times — one after the other, within a loop. In the other thread, I
os.read() from the pipe, based on
select.select() events and checked the length of each string returned by one
os.read(). To evaluate the data, a basic histogram is convenient. Here is a neat way to count events in Python:
import collections histogram = collections.defaultdict(int) for event in events: histogram[event] +=1
When accessing a non-existing key in a
defaultdict(callable), it is automatically initialized by calling
callable() without arguments. For the histogram we can take advantage of this behavior by creating a
defaultdict of type
histogram[non-existing-key] first initializes the value with
int(), which returns zero. So
histogram[event] += 1 counts events without the need to pre-define them in the dictionary. This is the way I used it:
lines = os.read(pipe_read, 9999999).splitlines() for line in lines: histogram[len(line)] += 1
Visualization of the histogram is not required; a sorted listing is enough:
for key in sorted(histogram, key=histogram.get, reverse=1): print '%7d %s' % (histogram[key], key)
This sorts the dictionary by its values in reversed order (small first) and prints the key/value pairs with a bit of formatting.
If no lines are chopped, there would be only one datapoint in the histogram: N times L. This is what I got for short strings. However, with increasing string length L, I’ve observed chopped lines with a very small propability. For L = 150, I e.g. got ~100000 times 150 chars, and 1-3 times 1-149 chars.
This means that in some cases the string returned by
os.read() looked like
"thisisaline\nthisisaline\nthisis": The last few characters of the last line are chopped. Missing character will be be returned with the next
os.read(): “aline\nthisisaline\nthisisaline\n” — and so on. As expected, the number of all received characters matches the number of chars written to the pipe.
Result and solution
Relying on the “one os.write(string), one os.read() returns entire string” principle is a major flaw. The communication in such code is likely to break at some point. The solution is to accomplish some post-processing in order to reassamble chopped lines:
new_data = prefix + os.read(pipe_read,9999999)
lines = new_data.splitlines(True)
prefix = ''
if not lines[-1].endswith("\n"):
prefix = lines[-1]
for line in lines:
histogram[len(line.rstrip())] += 1
line 2, trailing newline characters are kept in the splitted parts. In
lines 4-6, it is checked if the last part has a trailing newline. If not, then this is a chopped line. It is deleted from the list of lines and prepended to the next string returned by
line 1). Before putting resulting lines into the histogram, trailing newlines are
.rstrip()ped. Can you imagine what the histogram says now? :-)
"6000000 150", which means 6000000 times received a string of length 150. That’s the N times L datapoint you want.
By the way, the post-processing could be done in a simpler way using
os.fdopen() on the read end of the pipe. It returns a file-like object providing the
readline() method. However, accessing
os.read() directly could be more efficient as you have direct control over the buffer size. It has to be measured which approach is faster.
It was shown that one should not rely on simple
os.read() calls on an
os.pipe() to accomplish a communication channel between two threads. A “communication protocol” has to be applied. Then, with a small post-processing and reassembling piece of code, the communication between the two treads is reliable and foreseeable: every trailing-newline-labeled-line written to the pipe by Thread 1 is read and evaluated entirely by Thread 2.
Leave a Reply