Concurrent connections to Redis with gevent and redis-py

Redis is a powerful, lightning-fast key-value store. Gevent (1.0) is an event-driven concurrency framework for Python (based on greenlet and libev) that allows for highly concurrent networking and tackles the C10K problem. Redis-py, the well-established Redis client for Python, makes Python talk to Redis. The communication takes place through sockets and is request-response-based, so a typical Redis-based Python application is I/O-bound rather than CPU-bound. Furthermore, various features of Redis mean that a request is not immediately followed by a response, so certain requests block the calling thread for an arbitrary amount of time. This is where gevent comes into play: it allows these blocking requests to be executed concurrently within coroutines. In this blog post, I present a short code example integrating gevent with redis-py.

Preconditions

  • Python 2.7.3
  • gevent 1.0
  • Redis server 2.6.9 (default settings)
  • redis-py 2.7.2

(Long-)blocking calls

All requests to Redis are blocking I/O operations: the communication round trip through the TCP or UNIX domain socket takes time. Requests that are answered immediately block for microseconds to milliseconds (depending on the network connection), while others may block for much longer, such as this one:

import redis

r = redis.StrictRedis()
r.brpop("list", timeout=10)

If the list "list" does not exist or is empty, this call will not return for at least 10 seconds (according to this issue it may block for up to 11 seconds).
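
For illustration, here is a minimal sketch (my addition, not from the original post) contrasting the two cases, assuming the list "list" starts out empty or absent:

import time
import redis

r = redis.StrictRedis()

# Case 1: "list" is empty/absent -- BRPOP blocks for the full timeout and returns None.
t0 = time.time()
assert r.brpop("list", timeout=1) is None
print("empty list: blocked for %.2f s" % (time.time() - t0))

# Case 2: "list" holds an element -- BRPOP returns a (key, value) tuple right away.
r.lpush("list", "hello")
t0 = time.time()
print(r.brpop("list", timeout=1))  # ('list', 'hello')
print("non-empty list: blocked for %.2f s" % (time.time() - t0))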

Concurrency: threads and greenlets

In fact, what’s blocked by a “blocking call” is the calling thread. While blocked on I/O, it does nothing but wait. A concurrency mechanism is required in order to perform other tasks during this otherwise wasted time. The classical approach to introducing concurrency is thread-based: N threads might execute N blocking calls simultaneously. In this scenario, each thread has its own connection to the remote end. Threading, however, imposes certain limits on the concurrency: an increasing N requires increasing computing resources for managing the threads, and this does not scale well. At some point the thread management itself becomes more expensive than the actual tasks to be executed by the threads (this is an oversimplification; more details here and elsewhere).
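
For comparison, a thread-based variant of such a setup could look roughly like the following sketch (again an illustration, not the original benchmark); the redis-py client is thread-safe, and its connection pool hands each thread its own connection:

import threading
import redis

r = redis.StrictRedis()

def ask_redis():
    # Blocks the calling OS thread for up to one second.
    r.brpop("list", timeout=1)

# One OS thread per blocking request: this works, but the per-thread cost
# (memory, scheduling overhead) limits how far the number of threads can grow.
threads = [threading.Thread(target=ask_redis) for _ in range(50)]
for t in threads:
    t.start()
for t in threads:
    t.join()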

Greenlets, and coroutines in general, are very cheap to create, terminate, and switch between. They create much less overhead than threads, which is why they easily overcome the limits imposed by thread-based concurrency. Coroutines shift the limits for networking concurrency by orders of magnitude.
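
A small sketch (my addition) makes this tangible: spawning 10,000 greenlets that each cooperatively sleep for one second takes barely more than one second in total, all within a single OS thread:

import time
import gevent

def wait():
    # Cooperatively block for one second.
    gevent.sleep(1)

t0 = time.time()
greenlets = [gevent.spawn(wait) for _ in xrange(10000)]
gevent.joinall(greenlets)
print("10000 greenlets finished after %.2f s" % (time.time() - t0))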

Gevent-based concurrency for redis-py: code example

Let’s again consider the blocking call to Redis introduced above. In the following piece of code (which you can actually run yourself, given the preconditions listed above), 500 of these requests (each of them blocking for 1 to 2 seconds) are fired against the Redis server simultaneously. Each request is executed in its own greenlet, and the redis-py connection pool provides each greenlet with its own connection to the Redis server. While these 500 greenlets wait for their responses, another greenlet runs concurrently. It logs a message every 0.2 seconds in order to visualize the simultaneity of events. Everything runs within one process and one thread.

import logging
logging.basicConfig(
    format='%(asctime)s,%(msecs)05.1f (%(funcName)s) %(message)s',
    datefmt='%H:%M:%S')
log = logging.getLogger()
log.setLevel(logging.INFO)
 
import threading
import os
import time
import gevent
import redis
import redis.connection
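# Make redis-py use gevent's cooperative socket module instead of the
# standard library's blocking one.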
redis.connection.socket = gevent.socket
 
 
r = redis.StrictRedis()
p = r.connection_pool
 
 
def main():
    crongreenlet = gevent.spawn(cron)
    log.info("Spawning 500 greenlets connecting to Redis...")
    redisgreenlets = [gevent.spawn(ask_redis) for _ in xrange(500)]
    starttime = time.time()
    # Wait until all greenlets have started and connected.
    gevent.sleep(1)
    log.info("# active `threading` threads: %s" % threading.active_count())
    log.info("# Redis connections created: %s" % p._created_connections)
    log.info("# Redis connections in use: %s" % len(p._in_use_connections))
    log.info("# Redis connections available: %s" % len(p._available_connections))
    log.info("Waiting for Redis connection greenlets to terminate...")
    gevent.joinall(redisgreenlets)
    d = time.time() - starttime
    log.info("All Redis connection greenlets terminated. Duration: %.2f s." % d)
    crongreenlet.kill()
 
 
def ask_redis():
    # BRPOP returns (key, value) tuple or None
    assert r.brpop("list", timeout=1) is None
 
 
def cron():
    while True:
        log.info("Hello from cron greenlet.")
        gevent.sleep(0.2)
 
 
if __name__ == "__main__":
    main()

This is the output on my system:

$ python redis_gevent_concurrency_test.py
18:29:24,153.9 (main) Spawning 500 greenlets connecting to Redis...
18:29:24,160.8 (cron) Hello from cron greenlet.
18:29:24,362.3 (cron) Hello from cron greenlet.
18:29:24,571.4 (cron) Hello from cron greenlet.
18:29:24,783.7 (cron) Hello from cron greenlet.
18:29:24,993.7 (cron) Hello from cron greenlet.
18:29:25,161.0 (main) # active `threading` threads: 1
18:29:25,161.1 (main) # Redis connections created: 500
18:29:25,161.2 (main) # Redis connections in use: 500
18:29:25,161.3 (main) # Redis connections available: 0
18:29:25,161.4 (main) Waiting for Redis connection greenlets to terminate...
18:29:25,201.0 (cron) Hello from cron greenlet.
18:29:25,403.7 (cron) Hello from cron greenlet.
18:29:25,613.7 (cron) Hello from cron greenlet.
18:29:25,823.7 (cron) Hello from cron greenlet.
18:29:26,032.1 (cron) Hello from cron greenlet.
18:29:26,058.3 (main) All Redis connection greenlets terminated. Duration: 1.90 s.

The “Hello…” message appears every ~ 0.2 seconds. As expected, all 500 requests are answered within 2 seconds.

How it works

If you are new to the coroutine concept and would like to understand it step by step, I can recommend this excellent write-up. In short, redis.connection.socket = gevent.socket makes redis-py use gevent’s socket module rather than Python’s standard socket module. Gevent’s socket module basically is the standard one with all blocking I/O calls (i.e. read/write operations on file descriptors) changed into “gevent-aware”/“cooperative” calls. Both terms mean the same thing: these calls no longer block the calling thread and allow for so-called context switches. Each greenlet defines and runs in its own context. A greenlet can stop/pause its execution at a certain point (e.g. a blocking socket read) without losing its context. It tells a controlling entity (the gevent “hub”) upon which event exactly it wants to resume execution (here, it is clearly waiting for a ready-to-read event on its socket). In the meantime, the hub can switch context to a different coroutine, and the program continues executing there. So the hub is aware of all greenlets and runs only one greenlet at any given time. The magic of switching context is really performed by switching the stack. This is awesome and yet quite a simple idea. The other part of the magic is performed by an underlying event loop, which evaluates incoming I/O events and executes event handlers previously registered by the hub.
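
To observe such a context switch in isolation (without Redis), consider this tiny sketch (my addition): a cooperative call such as gevent.sleep(0) hands control back to the hub, which then runs the next greenlet before the first one is resumed:

import gevent

def first():
    print("first: start")
    gevent.sleep(0)   # cooperative call: yield control to the hub
    print("first: resumed")

def second():
    print("second: runs while 'first' is paused")

# The hub runs one greenlet at a time and switches on cooperative calls;
# the output order is "first: start", "second: ...", "first: resumed".
gevent.joinall([gevent.spawn(first), gevent.spawn(second)])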

Hence, in the example code above, 500 greenlets pause their execution and simultaneously wait for ready-to-read events on their corresponding sockets. The greenlets (coroutines) then resume execution in the order of incoming ready-to-read events. Note that everything runs in one OS-level thread in one process, meaning that different code parts do not actually run at the same time. The only thing that really happens simultaneously is waiting. That is why this type of concurrency mechanism is relatively easy to deal with (no true simultaneous access to data structures by e.g. multiple OS threads). But it also means that it can only help in I/O-bound applications, where the program usually spends a significant amount of time waiting for I/O events. Concurrency and performance improvements in CPU-bound applications can only be achieved by distributing tasks onto multiple threads and/or processes, which can then run on multiple CPU cores (with actual, true simultaneity).
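
This limitation is easy to demonstrate with another small sketch (my addition): a CPU-bound greenlet never yields to the hub and therefore starves all other greenlets until it is done, because greenlets are not preempted:

import gevent

def busy():
    # CPU-bound loop without any cooperative call: it never yields to the hub.
    total = 0
    for i in xrange(10 ** 7):
        total += i
    return total

def chatty():
    for _ in xrange(3):
        print("chatty greenlet got a turn")
        gevent.sleep(0.1)

# All "chatty" messages appear only after busy() has finished.
gevent.joinall([gevent.spawn(busy), gevent.spawn(chatty)])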

Once again: the super-simple idea here is that while one context is waiting (cooperatively blocking), a different context can proceed with its execution.
