Redirecting a Python subprocess's out and err streams to several streams

Today, I ran into an issue with subprocess calls: I needed to redirect the output streams of a subprocess call to the standard outputs and, simultaneously, to log files.

Situation

Say you have the choice of executing code through a batch scheduler, through an SSH tunnel, or directly on your computer. Everything is wrapped in a launcher that uses the correct backend (LSF, SSH, local, …). The logs are created by a subprocess.call() inside the selected backend.
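For reference, a local backend of this kind might look roughly like the sketch below; run_local and the file names are hypothetical, not the actual wrapper:

#!/usr/bin/env python
# A minimal sketch of the status quo (hypothetical names): the
# streams go to the log files only, so nothing reaches the
# console while the program runs.
import subprocess

def run_local(command, log_path, err_path):
  with open(log_path, "w") as log, open(err_path, "w") as err:
    return subprocess.call(command, stdout=log, stderr=err)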

Now, a user comes along and says they want local calls and blocking batch calls to show, on the fly, what the program outputs. Of course, it's not possible to modify every single program, so I have to adapt the Python wrapper so that it writes to the standard outputs and to the log files. Besides, with a batch scheduler like LSF, I have to use the interactive mode, which redirects the program's streams back to the console.
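For the LSF case, the interactive flag does that forwarding for us; roughly (a sketch, with "my_program" as a placeholder command):

# Sketch: with LSF, bsub -I submits an interactive job whose
# output streams are forwarded back to the submitting terminal.
import subprocess
subprocess.call(["bsub", "-I", "my_program"])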

Here is a simple test program that writes to either stream at random:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import print_function

import random
import time
import sys

for i in range(20):
  time.sleep(1)

  # Pick a stream at random for each message.
  j = random.randint(0, 1)
  if j == 0:
    print("Hi! %i" % i, file=sys.stdout)
    # stdout is block-buffered when piped, so flush explicitly.
    sys.stdout.flush()
  else:
    print("Hi! %i" % i, file=sys.stderr)

A crude solution

A single thread cannot block on reads from both streams at the same time. To work around this, I launch one thread per stream to capture. Each thread then writes what it reads, line by line, to all the output file-like objects:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

class Filers(object):
  """A file-like object that fans out every write to several files."""

  def __init__(self, files_list):
    self.files_list = files_list

  def write(self, data):
    for f in self.files_list:
      f.write(data)
      f.flush()

def forward_pipe(process, process_pipe, filers):
  while process.poll() is None:
    filers.write(process_pipe.readline())
  # Drain anything still buffered in the pipe once the process
  # has exited, so the last lines are not lost.
  filers.write(process_pipe.read())

if __name__ == "__main__":
  import sys
  import subprocess
  import threading

  f_out = Filers([open("test2.log", "w"), sys.stdout])
  f_err = Filers([open("test1.err", "w"), open("test2.err", "w"), sys.stderr])

  p = subprocess.Popen(["python", "alternate.py"],
                       stdout=subprocess.PIPE, stderr=subprocess.PIPE)

  # One thread per stream: each blocks on reads from its own pipe.
  t2 = threading.Thread(target=forward_pipe, args=(p, p.stderr, f_err))
  t2.start()
  t1 = threading.Thread(target=forward_pipe, args=(p, p.stdout, f_out))
  t1.start()
  t2.join()
  t1.join()

A simple class wraps all the file-like objects in one instance per stream. The class can itself be seen as a file-like object, which simplifies the design a little.
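Since Filers quacks like a writable file, it can even be handed directly to print() as a target (a small usage sketch; f_all and combined.log are made up for the example):

from __future__ import print_function
import sys

f_all = Filers([sys.stdout, open("combined.log", "w")])
print("this line goes everywhere", file=f_all)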

One problematic aspect of this solution: I had to flush the output stream explicitly in the child program, because it wouldn't update otherwise, whereas the error stream was flushed automatically. This is in fact standard stdio behaviour rather than a quirk: stdout is block-buffered when it is not attached to a terminal (as here, where it feeds a pipe), while stderr is unbuffered, so its writes appear immediately.
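When the child is itself a Python program, an alternative to sprinkling flush() calls through it is to launch the interpreter unbuffered:

# Running the interpreter with -u forces unbuffered stdout/stderr,
# so the child no longer needs explicit flush() calls.
p = subprocess.Popen(["python", "-u", "alternate.py"],
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)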

The solution I came up with is not the most elegant ever. I think it can be improved, so if you have a better approach, I'd gladly hear about it.
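One simplification worth noting, in case separate error logs are not actually required: merging the child's stderr into its stdout leaves a single pipe to read, which removes the need for the second thread altogether (a sketch):

# With stderr merged into stdout there is only one pipe and one
# forwarding thread, but the two streams can no longer be logged
# separately.
p = subprocess.Popen(["python", "alternate.py"],
                     stdout=subprocess.PIPE, stderr=subprocess.STDOUT)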
