Today, I encountered an issue with subprocess calls. I was faced with the need of redirecting output streams from a subprocess call to the standard outputs and simultaneously to log files.
Say you have the choice of executing code using a batch scheduler, using a SSH tunnel or directly on your computer. Everything is wrapped through a launcher that uses the correct backend (LSF, SSH, local, …). The logs are created by a subprocess.call() inside the selected backend.
Now, a user comes and says that he wants local calls and blocking batch calls to output on the fly what its program outputs. Of course, it’s not possible to modify every single program, I thus have to adapt the Python wrapper so that it writes on the standard outputs and in the log files. Besides, with a batch scheduler like LSF, I have to use the interactive mode which redirects the program streams to the console.
This is a simple program that may write on both streams:
#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import print_function import random import time import sys for i in range(20): time.sleep(1) j = random.randint(0, 1) if j == 0: print("Hi! %i" % i, file=sys.stdout) sys.stdout.flush() else: print("Hi! %i" % i, file=sys.stderr)
A crude solution
It’s not possible to read from both stream at the same time. So to fix this, I will launch one thread per stream to catch. Then I will write to all output file-like what I just read line by line:
#!/usr/bin/env python # -*- coding: utf-8 -*- import time class Filers(object): def __init__(self, files_list): self.files_list = files_list def write(self, buffer): for f in self.files_list: f.write(buffer) f.flush() def forward_pipe(process, process_pipe, filers): while process.poll() is None: filers.write(process_pipe.readline()) if __name__ == "__main__": import sys import subprocess import threading f_out = Filers([open("test2.log", "w"), sys.stdout]) f_err = Filers([open("test1.err", "w"), open("test2.err", "w"), sys.stderr]) p = subprocess.Popen(["python", "alternate.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) t2 = threading.Thread(target=forward_pipe, args=(p, p.stderr, f_err)) t2.start() t1 = threading.Thread(target=forward_pipe, args=(p, p.stdout, f_out)) t1.start() t2.join() t1.join()
A simple class wraps all the file-like in one instance per stream. The class may also be seen as a file-like, which simplifies the design a little bit.
In the problematic aspects of this solution, I had to flush the output stream in the subprocess call, because it wouldn’t update, whereas the error stream was flushed automatically. Perhaps it is a design decision that only one is flushed?
The solution I came up with is not really the most elegant ever. I think it may be enhanced, so if you have a solution, I’d gladly hear about it.