Counting stuff in Python and assembling a histogram: analyze thread communication via os.pipe()

Recently I set up inter-thread communication via os.pipe(). One thread only writes to the “write end” of the pipe, while the other thread only reads from the “read end”. The reading side uses select.select(), which watches the read end and reports it as readable as soon as there is data in the pipe; the reader then calls os.read(). Hence, whenever something is written to the pipe on one side, it is read almost immediately on the other side. However, does this mean that one string written by one call to os.write() is read entirely by the following call to os.read()? How does the behavior change with increasing data rate?
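To make the setup concrete, here is a minimal sketch of such a channel. It is not my original test code: it is written for Python 3 on a Unix-like system (select.select() does not work on pipes under Windows), the reader runs in the main thread, and the name run_pipe_demo as well as the line count, line length and buffer size are made up for illustration.

```python
import os
import select
import threading


def run_pipe_demo(n_lines=1000, line_length=20):
    """Push n_lines fixed-length lines through a pipe; return the chunks read."""
    read_fd, write_fd = os.pipe()
    payload = b"x" * line_length + b"\n"

    def writer():
        for _ in range(n_lines):
            os.write(write_fd, payload)
        os.close(write_fd)  # closing the write end signals EOF to the reader

    thread = threading.Thread(target=writer)
    thread.start()

    chunks = []
    while True:
        # Block until the read end is reported as readable.
        readable, _, _ = select.select([read_fd], [], [])
        if read_fd in readable:
            chunk = os.read(read_fd, 65536)
            if not chunk:  # empty result: the writer closed its end
                break
            chunks.append(chunk)

    thread.join()
    os.close(read_fd)
    return chunks


chunks = run_pipe_demo()
total = sum(len(c) for c in chunks)
print(len(chunks), "reads,", total, "bytes")
```

The number of reads will usually differ from the number of writes and can change from run to run, while the total byte count stays the same.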

Analysis

From the writing thread, I pushed a string of fixed length L (terminated by a newline) through the pipe N times, one after the other, within a loop. In the other thread, I called os.read() on the pipe whenever select.select() reported it as readable, and checked the length of each line in the data returned by a single os.read(). To evaluate the data, a basic histogram is convenient. Here is a neat way to count events in Python:

import collections
histogram = collections.defaultdict(int)
for event in events:
    histogram[event] += 1

When accessing a non-existing key in a defaultdict(callable), it is automatically initialized by calling callable() without arguments. For the histogram we can take advantage of this behavior by creating a defaultdict of type int. Accessing histogram[non-existing-key] first initializes the value with int(), which returns zero. So histogram[event] += 1 counts events without the need to pre-define them in the dictionary. This is the way I used it:

lines = os.read(pipe_read, 9999999).splitlines()
for line in lines:
    histogram[len(line)] += 1

Visualization of the histogram is not required; a sorted listing is enough:

for key in sorted(histogram, key=histogram.get, reverse=True):
    print('%7d %s' % (histogram[key], key))

This sorts the dictionary keys by their values in descending order (largest count first) and prints the value/key pairs with a bit of formatting.
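As an alternative, the counting and the sorted listing can be sketched with collections.Counter from the standard library, whose most_common() method returns the pairs ordered by count. The sample data below is invented for illustration and is not a measurement.

```python
import collections

# Hypothetical sample data standing in for the measured line lengths.
line_lengths = [150, 150, 150, 42, 150, 108, 150]

histogram = collections.Counter(line_lengths)

# most_common() yields (key, count) pairs, highest count first.
for length, count in histogram.most_common():
    print('%7d %s' % (count, length))
```

Whether defaultdict(int) or Counter reads better is mostly a matter of taste; both count without pre-defining the keys.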

If no lines were chopped, there would be only one datapoint in the histogram: N times L. This is what I got for short strings. However, with increasing string length L, I observed chopped lines with a very small probability. For L = 150, for example, I got ~100000 times 150 chars, and 1-3 times 1-149 chars.

This means that in some cases the string returned by os.read() looked like "thisisaline\nthisisaline\nthisis": the last few characters of the last line are chopped. The missing characters are returned by the next os.read(): "aline\nthisisaline\nthisisaline\n", and so on. As expected, the total number of received characters matches the number of characters written to the pipe.
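The effect can be reproduced deterministically with a small sketch (Python 3, Unix-like system assumed). Note that it forces the cut with a deliberately small read size of 30 bytes, which I chose for illustration; in my measurement the cuts appeared on their own at high data rates, despite the large read size.

```python
import os

read_fd, write_fd = os.pipe()

# Three complete 12-byte lines are written to the pipe.
for _ in range(3):
    os.write(write_fd, b"thisisaline\n")
os.close(write_fd)

# A 30-byte read limit cannot hold all 36 bytes, so the third line is cut.
first = os.read(read_fd, 30)
second = os.read(read_fd, 30)
os.close(read_fd)

print(repr(first))
print(repr(second))
```

No data is lost: the two reads together contain exactly the bytes of the three writes, only the boundaries differ.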

Result and solution

Relying on the “one os.write(string), one os.read() returns entire string” principle is a major flaw. The communication in such code is likely to break at some point. The solution is to do some post-processing in order to reassemble chopped lines:

  1. new_data = prefix + os.read(pipe_read,9999999)
  2. lines = new_data.splitlines(True)
  3. prefix = ''
  4. if lines and not lines[-1].endswith("\n"):
  5.     prefix = lines[-1]
  6.     del lines[-1]
  7. for line in lines:
  8.     histogram[len(line.rstrip())] += 1

In line 2, trailing newline characters are kept in the split parts. Lines 4-6 check whether the last part has a trailing newline. If not, it is a chopped line: it is deleted from the list of lines and prepended to the next string returned by os.read() (see line 1). Before the resulting lines go into the histogram, trailing newlines are removed with .rstrip(). Can you imagine what the histogram says now? :-)

For example, "6000000 150", which means that a string of length 150 was received 6000000 times. That’s the N times L datapoint you want.
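The reassembling step can also be packaged as a small helper. The following is a sketch for Python 3, where os.read() returns bytes; the class name LineAssembler and its feed() method are made up for illustration. It uses split(b"\n") instead of splitlines(True), so the element after the last newline is always the leftover, which may be empty.

```python
import collections


class LineAssembler:
    """Collects raw chunks and hands back only complete lines."""

    def __init__(self):
        self._prefix = b""

    def feed(self, chunk):
        data = self._prefix + chunk
        lines = data.split(b"\n")
        # The last element is whatever followed the final newline:
        # empty if the chunk ended on a line boundary, otherwise a partial line.
        self._prefix = lines.pop()
        return lines


assembler = LineAssembler()
histogram = collections.defaultdict(int)
for chunk in (b"thisisaline\nthisisaline\nthisis", b"aline\n"):
    for line in assembler.feed(chunk):
        histogram[len(line)] += 1

print(dict(histogram))
```

In a real reader loop, each chunk passed to feed() would be the return value of one os.read() call.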

By the way, the post-processing could be done in a simpler way using os.fdopen() on the read end of the pipe. It returns a file-like object providing the readline() method. However, accessing os.read() directly could be more efficient as you have direct control over the buffer size. It has to be measured which approach is faster.
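A minimal sketch of the os.fdopen() variant, for Python 3 on a Unix-like system, could look like this. The line count of 1000 and the length of 150 are chosen for illustration, and nothing here is measured for speed.

```python
import collections
import os
import threading

read_fd, write_fd = os.pipe()


def writer():
    for _ in range(1000):
        os.write(write_fd, b"x" * 150 + b"\n")
    os.close(write_fd)  # lets the reader's loop end at EOF


thread = threading.Thread(target=writer)
thread.start()

histogram = collections.defaultdict(int)
# os.fdopen() wraps the descriptor in a buffered file object;
# iterating over it yields one complete line at a time.
with os.fdopen(read_fd, "rb") as pipe_reader:
    for line in pipe_reader:
        histogram[len(line.rstrip(b"\n"))] += 1

thread.join()
print(dict(histogram))
```

One caveat with this approach: the file object keeps its own buffer, so select.select() on the raw descriptor may report nothing to read while complete lines are still waiting in that buffer. Combining the two therefore needs some care.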

Conclusion

This shows that one should not rely on simple os.write() and os.read() calls on an os.pipe() as a communication channel between two threads. A “communication protocol” has to be applied. Then, with a small piece of post-processing and reassembling code, the communication between the two threads is reliable and predictable: every newline-terminated line written to the pipe by Thread 1 is read and evaluated entirely by Thread 2.