Toulouse, 12 September 2022

Subprocesses are the bread-and-butter of software automation. In Python, the standard library provides the subprocess module for this, with many options that make it suitable for a broad range of use cases. In this article, we’ll cover the specific topic of monitoring such subprocesses. The techniques for this are numerous and some of them a bit advanced, so it will be an opportunity to compare them. This will also be a pretext to study the subprocess module (and its asyncio counterpart) in detail.


What’s a subprocess?

A subprocess is the execution of a program or command as a process external to the one that launched it. As such, the parent program (for instance, a Python application) creates and manages a child process for this external program.

There are two main ways to use such subprocesses:

  • by running the external program until termination, or,
  • by starting the external program as a daemon.

In Python, the subprocess module of the standard library provides entry points for these main use cases:

  • the run() function which, as its name suggests, fulfills the first use case, and,
  • the Popen class, which is used to create a process in more versatile ways, in particular the daemon use case.

The name Popen comes from the popen(3) function, from the C standard library, which “opens a process by creating a pipe, forking, and invoking the shell”; we’ll get back to this later on.
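
For illustration, here is a minimal sketch of both entry points (using arbitrary commands rather than the article’s example):

import subprocess

# First use case: run a command until termination and inspect the result.
result = subprocess.run(["ls", "-l"], capture_output=True, text=True)
print(result.returncode)

# Second use case: start a longer-lived, daemon-like process and keep a handle on it.
proc = subprocess.Popen(["sleep", "60"])
# ... do other work while the child runs ...
proc.terminate()
proc.wait()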

The standard library also ships with asyncio.subprocess for Async I/O programs (we’ll demonstrate how to use this module for our monitoring topic later on).

Communication

One important aspect of subprocesses is communication, because one typically wants to be able to pass data between the parent and the child. In Python, this is possible by using the stdin, stdout and stderr parameters of subprocess.Popen (or subprocess.run(), though this function also has a convenient input parameter). As their names suggest, these are the standard I/O streams that are normally attached to any program when started: one for input, one for output and one for errors.

Looking at the Popen documentation, we can notice that these parameters can take different kinds of values:

PIPE, DEVNULL, an existing file descriptor (a positive integer), an existing file object, and None.

Amongst them, the PIPE case is particularly interesting as it provides an idiomatic way for communication between the parent (Python) and the child through nice Python stream objects (which can be either read from or written to).
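
As a small sketch (not from the article), here is how PIPE lets the parent exchange data with the child through regular Python stream objects:

import subprocess

# The child's stdin and stdout become file-like objects in the parent.
with subprocess.Popen(
    ["cat"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True
) as proc:
    out, _ = proc.communicate(input="hello\n")
print(out)  # "hello\n"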

Monitoring subprocesses (and a guiding example)

As mentioned in the introduction, the topic of this article is not only subprocesses in Python, but rather their monitoring. The reader might wonder why we would want to monitor a subprocess and what that actually means. Before introducing the motivation for this topic, let’s first define a guiding example in which we’ll use the pg_basebackup program from PostgreSQL, which takes a backup of a database cluster (this is typically used to set up a streaming-replication standby server). The command is, in general, issued from a backup (or would-be standby) host and typically looks like:

$ pg_basebackup -d "host=primary.example.com user=admin" -D pgdata -v -P
pg_basebackup: initiating base backup, waiting for checkpoint to complete
pg_basebackup: checkpoint completed
pg_basebackup: write-ahead log start point: 0/E9000028 on timeline 1
pg_basebackup: starting background WAL receiver
pg_basebackup: created temporary replication slot "pg_basebackup_12956"
1808807/1808807 kB (100%), 1/1 tablespace
pg_basebackup: write-ahead log end point: 0/E9000138
pg_basebackup: waiting for background process to finish streaming ...
pg_basebackup: syncing data to disk ...
pg_basebackup: renaming backup_manifest.tmp to backup_manifest
pg_basebackup: base backup completed

For completeness (though it does not matter much to understand the following), we used the following options: -d holds connection information to the source database cluster, -D indicates the target directory for backup files, -v triggers verbose mode and -P enables progress reporting.

In addition, this command might ask for a password to connect to the primary host.

This command might take a long time to complete when there is a lot of data to transfer, depending on the network connection or even on the actual state of the primary database cluster; that’s typically one reason why we’d like to monitor it.

There are other reasons one might want to monitor a subprocess, for instance:

  • letting the user know live whether the commands we are launching (which can possibly be long) are running okay;
  • and, even if they are, letting the user interrupt such a command if they wish (for instance, after reading a log message indicating something undesired).

More generally, when working with distributed systems, where components typically wait for others to become ready, a program may simply never end, for instance in case of a deadlock. So letting the user know live about the situation is valuable.

Interfaces for monitoring a Python subprocess

As mentioned earlier, subprocesses as represented by Popen objects expose their standard streams. In general, we are mostly interested in stderr for monitoring purposes. Getting back to the available interfaces for standard streams, we have: the parent’s streams (i.e. the child’s output gets redirected to the parent’s), pipes and files.

For many reasons, using the parent’s streams is not a very good fit because we’d be mixing the streams of several programs into one, which can get confusing. Also, there is no easy way to control the rendering of the child’s output when it is forwarded to the parent.

The file approach (i.e. creating file objects in the parent Python program and passing their file descriptors to the child) is better, but working with local files does not always fit an application’s design, which might rather rely on an external logging system (such as Sentry or Grafana Loki).

This leaves us with the pipe approach, which is very flexible because all data is processed in memory, leaving us the possibility of post-processing, filtering and forwarding based on our own logic.

Accordingly, we proceed by using either the capture_output=True parameter of subprocess.run() or stderr=subprocess.PIPE with subprocess.Popen or subprocess.run().

With all this set up, and assuming we have access to the process’s stderr, we’ll monitor it through Python logging in the form of a simple log function:

import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(message)s", datefmt="[%X]")

def log_stderr(program: str, line: str) -> None:
    logger.debug("%s: %s", program, line.rstrip())

Using subprocess.run()

Running our pg_basebackup command with subprocess.run() would look like:

import subprocess

cmd = ["pg_basebackup", "-D", "pgdata", "-v", "-P"]
result = subprocess.run(cmd, capture_output=True, check=True, text=True)

There is typically no way to monitor what happens during child process execution here. We’re only left with the result return value, a CompletedProcess instance, which we can use only after program termination:

for errline in result.stderr.splitlines():
    log_stderr(cmd[0], errline)
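
Note that, because of check=True, a failing command raises subprocess.CalledProcessError before we even reach this loop; the captured stderr is still attached to the exception, so a slightly more defensive variant (a sketch) could log it in both cases:

try:
    result = subprocess.run(cmd, capture_output=True, check=True, text=True)
except subprocess.CalledProcessError as exc:
    # capture_output=True attaches the captured streams to the exception.
    for errline in exc.stderr.splitlines():
        log_stderr(cmd[0], errline)
    raise
for errline in result.stderr.splitlines():
    log_stderr(cmd[0], errline)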

Using subprocess.Popen

When using Popen, we pass the PIPE special value as the stdout and stderr parameters. This will make Python create a “pipe” for each stream, as a unidirectional data channel for interprocess communication (quoting pipe(2)); this uses os.pipe() on POSIX systems. In addition, a Popen object can be used as a context manager, which will wait for the child process to terminate while letting us run arbitrary code under the with block. So typically:

cmd = ["pg_basebackup", "-D", "pgdata", "--verbose", "--progress"]
with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) as proc:
    for line in proc.stderr:
        log_stderr(cmd[0], line)

Here we are able to monitor the child’s stderr live while the process is running.

However, using Popen as a context manager will also close streams at exit, meaning that we need to process them within the context. Similarly, we need to handle input ourselves, which would usually need a call to Popen.communicate().

All in all, the previous example would become:

with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) as proc:
    for line in proc.stderr:
        log_stderr(cmd[0], line)
    stdout, stderr = proc.communicate()
result = subprocess.CompletedProcess(cmd, proc.returncode, stdout, stderr)

On par with subprocess.run(), we built a CompletedProcess result. However, inspecting it would reveal that its stderr attribute is an empty string. This is because proc.communicate() returned an empty string as the second value of the tuple. In turn, this is because we already consumed the stderr pipe in the for loop for monitoring. So if we’d like to keep the original stderr while also monitoring it, we need to do it ourselves:

with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) as proc:
    errs = []
    for line in proc.stderr:
        log_stderr(cmd[0], line)
        errs.append(line)
    stdout, _ = proc.communicate()
result = subprocess.CompletedProcess(cmd, proc.returncode, stdout, "".join(errs))

Also note that the communicate step comes after the monitoring loop, meaning that this will not work if we need to pass input data. This issue is in fact more general: we might want to do other things concurrently while the process is running and being monitored.
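
For illustration, one thread-based workaround (a quick sketch with a purely hypothetical input string, not the route taken in this article) is to drain stderr from a background thread while the main thread feeds stdin and reads stdout directly:

import threading

def drain_stderr(proc: subprocess.Popen, errs: list[str]) -> None:
    # Consume the child's stderr line by line from a background thread.
    for line in proc.stderr:
        log_stderr(cmd[0], line)
        errs.append(line)

with subprocess.Popen(
    cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
    stderr=subprocess.PIPE, text=True,
) as proc:
    errs = []
    t = threading.Thread(target=drain_stderr, args=(proc, errs))
    t.start()
    proc.stdin.write("some input\n")  # hypothetical input data (e.g. an answer to a prompt)
    proc.stdin.close()
    stdout = proc.stdout.read()
    t.join()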

Monitoring concurrently, with asyncio

So we want to keep our logging task while communicating with the process (or doing something else while it’s running). Threading might be an option, but as this essentially involves I/O, we’re instead heading towards the asyncio (standard) library, which also provides built-in support for subprocesses. However, we’ll see that things are not that straightforward.

Let’s try that:

import asyncio.subprocess

proc = await asyncio.subprocess.create_subprocess_exec(
    *cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)

async def log_stderr() -> bytes:
    errs = []
    async for line in proc.stderr:
        logger.debug("%s: %s", cmd[0], line.decode().rstrip())
        errs.append(line)
    return b"".join(errs)

t = asyncio.create_task(log_stderr())
stdout, stderr = await asyncio.gather(proc.stdout.read(), t)

We’re first building an asyncio Process with create_subprocess_exec(). Then we’re spawning a task to log its stderr. And finally, we gather our monitoring task with the reading of the process’s stdout. Also note that all communication involves bytes with the asyncio library.

So we have achieved concurrent monitoring of the child’s stderr, but using proc.communicate() is still not possible because we’re already reading stderr through log_stderr() and asyncio does not permit several readers to be attached to a pipe.

At this point, we have (at least) two options:

  • re-implement Process.communicate() to plug in our monitoring logic (a rough sketch of this follows below), or,
  • use low-level asyncio, to get more control over how the Process handles its streams.
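
The second option is the topic of the next section. For the first one, a rough sketch (assuming stdin=asyncio.subprocess.PIPE was also passed to create_subprocess_exec(), and reusing the log_stderr() coroutine defined above) could look like this:

async def communicate(proc: asyncio.subprocess.Process, input_data: bytes = b"") -> tuple[bytes, bytes]:
    # Hand-rolled communicate(): stderr is consumed (and kept) by our
    # log_stderr() task, while stdin and stdout are handled here.
    stderr_task = asyncio.create_task(log_stderr())
    if input_data:
        proc.stdin.write(input_data)
        await proc.stdin.drain()
        proc.stdin.close()
    stdout = await proc.stdout.read()
    stderr = await stderr_task
    await proc.wait()
    return stdout, stderr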

Low-level asyncio

In order to be able to properly communicate with our asyncio Process while monitoring its stderr, we need to dive into the low-level API of asyncio. This involves using loop.subprocess_exec() and, while we gain more flexibility, this also implies handling “protocols” and “transports” ourselves. Luckily, this is documented, including nice examples. Hold on!

Our starting point is loop.subprocess_exec(), which takes a protocol_factory argument to build a SubprocessProtocol responsible for handling communication with the child, as well as process exit. We’ll leverage this to define a custom protocol which binds an extra reader to the stderr file descriptor (fd=2) while implementing the SubprocessProtocol interface:

class MyProtocol(asyncio.subprocess.SubprocessStreamProtocol):
    def __init__(self, reader, limit, loop):
        super().__init__(limit=limit, loop=loop)
        self._reader = reader

    def pipe_data_received(self, fd, data):
        """Called when the child process writes data into its stdout
        or stderr pipe.
        """
        super().pipe_data_received(fd, data)
        if fd == 2:
            self._reader.feed_data(data)

    def pipe_connection_lost(self, fd, exc):
        """Called when one of the pipes communicating with the child
        process is closed.
        """
        super().pipe_connection_lost(fd, exc)
        if fd == 2:
            if exc:
                self._reader.set_exception(exc)
            else:
                self._reader.feed_eof()

The reader argument to our previous protocol is simply an asyncio.StreamReader instance. Thanks to our protocol, this reader will receive the same data as the process’s stderr pipe, sort of a clone. This will be our handler for monitoring.

Finally, with a bit of plumbing:

import functools

loop = asyncio.get_event_loop()

reader = asyncio.StreamReader(loop=loop)
protocol_factory = functools.partial(
    MyProtocol, reader, limit=2**16, loop=loop
)

async def log_stderr():
    async for line in reader:
        logger.debug("%s: %s", cmd[0], line.decode().rstrip())

transport, protocol = await loop.subprocess_exec(
    protocol_factory,
    *cmd,
    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
)

proc = asyncio.subprocess.Process(transport, protocol, loop)
(out, err), _ = await asyncio.gather(proc.communicate(), log_stderr())

Running this program produces the following logs:

[10:50:55] - Using selector: EpollSelector
[10:50:55] - pg_basebackup: pg_basebackup: initiating base backup, waiting for checkpoint to complete
[10:50:55] - pg_basebackup: pg_basebackup: checkpoint completed
[10:50:55] - pg_basebackup: pg_basebackup: write-ahead log start point: 1/10000028 on timeline 1
[10:50:55] - pg_basebackup: pg_basebackup: starting background WAL receiver
[10:50:55] - pg_basebackup: pg_basebackup: created temporary replication slot "pg_basebackup_11979"
[10:50:55] - pg_basebackup:       0/1808798 kB (0%), 0/1 tablespace (pgdata/backup_label                )
[10:50:56] - pg_basebackup:  109809/1808798 kB (6%), 0/1 tablespace (pgdata/base/49533/49540            )
[10:50:57] - pg_basebackup:  714149/1808798 kB (39%), 0/1 tablespace (pgdata/base/49533/49549            )
[10:50:58] - pg_basebackup: 1274532/1808798 kB (70%), 0/1 tablespace (pgdata/base/49533/49546            )
[10:50:59] - pg_basebackup: 1661093/1808798 kB (91%), 0/1 tablespace (pgdata/base/16387/17957            )
[10:50:59] - pg_basebackup: 1808807/1808807 kB (100%), 0/1 tablespace (pgdata/global/pg_control           )
[10:50:59] - pg_basebackup: 1808807/1808807 kB (100%), 1/1 tablespace
[10:50:59] - pg_basebackup: pg_basebackup: write-ahead log end point: 1/10000138
[10:50:59] - pg_basebackup: pg_basebackup: waiting for background process to finish streaming ...
[10:50:59] - pg_basebackup: pg_basebackup: syncing data to disk ...
[10:51:00] - pg_basebackup: pg_basebackup: renaming backup_manifest.tmp to backup_manifest
[10:51:00] - pg_basebackup: pg_basebackup: base backup completed
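
On par with the earlier examples, the gathered results can then be assembled into a CompletedProcess (a small sketch; the decoding assumes the child emits UTF-8 text):

result = subprocess.CompletedProcess(
    cmd, proc.returncode, out.decode(), err.decode()
)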

Fancy processing

With this approach, we can also do more interesting things than just logging. In particular, reading the previous output from pg_basebackup, we could intercept the progress messages like “ 714149/1808798 kB (39%), 0/1 tablespace […]” and use them to report progress to the user.

We’ll use the excellent rich for rendering log messages and report progress in separate panels:

import re

from rich.logging import RichHandler
from rich.progress import Progress, TaskID

logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.DEBUG,
    format="%(message)s",
    datefmt="[%X]",
    handlers=[RichHandler(show_path=False)],
)

loop = asyncio.get_event_loop()
reader = asyncio.StreamReader(loop=loop)
protocol_factory = functools.partial(MyProtocol, reader, limit=2**16, loop=loop)

async def log_stderr(progress: Progress, taskid: TaskID) -> None:
    async for line in reader:
        m = re.search(r"\((\d+)%\)", line.decode())
        if m:
            p = int(m.group(1))
            progress.update(taskid, completed=p)  # the reported percentage is cumulative
        else:
            logger.debug("%s", line.decode().rstrip())

transport, protocol = await loop.subprocess_exec(
    protocol_factory,
    *cmd,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

proc = asyncio.subprocess.Process(transport, protocol, loop)

with Progress() as progress:
    taskid = progress.add_task(str(cmd[0]), total=100)
    (out, err), _ = await asyncio.gather(
        proc.communicate(), log_stderr(progress, taskid)
    )

See the screen cast:

