Non-blocking Python subprocess

I am working on a pet project to compress a terabyte of video into a slimmer format. While I have been able to automate working with ffmpeg, I didn’t like the fact that I couldn’t follow along with the subprocess running ffmpeg.

I tried a few different ideas for watching ffmpeg while keeping the script from blocking, because I wanted to be able to time and monitor its progress.

import subprocess

process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# communicate() blocks until the process is finished
stdout, stderr = process.communicate()

process.stdout.readline() and process.stderr.readline() will both block until there is a line of output to read. In ffmpeg's case there is never any stdout output, so reading stdout will block indefinitely.
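For illustration, a minimal sketch of that second attempt; the ffmpeg command here is made up for the example:

import subprocess

# Naive line-by-line read. With ffmpeg this blocks on the first
# readline(), because ffmpeg writes its progress to stderr, not stdout.
process = subprocess.Popen(
    ["ffmpeg", "-i", "input.mkv", "output.mkv"],  # illustrative command
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    universal_newlines=True,
)
for line in iter(process.stdout.readline, ""):
    print(line.rstrip())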

https://gist.github.com/devdave/9b8553d63e24ef19eea7e56f7cb95c78

import subprocess
import threading
import queue
import os
import time


class Runner(object):
    def __init__(self, cmd: list, cwd: str = None):
        self.cmd = cmd
        self.cwd = cwd  # optional working directory for the subprocess
        self.return_code = None
        self.process = None  # type: subprocess.Popen
        self.run_time = 0

    @property
    def _default_popen_kwargs(self):
        return {
            "env": os.environ.copy(),
            "stdout": subprocess.PIPE,
            "stderr": subprocess.PIPE,
            "shell": False,  # cmd is a list, so let Popen exec it directly
            "universal_newlines": True,
            "bufsize": 1,  # line-buffered in text mode
        }

    def _watch_output(self, process: subprocess.Popen, out_queue: queue.Queue):
        # Runs in a daemon thread: readline() can block here without
        # stalling the main thread.
        for line in iter(process.stderr.readline, ""):
            out_queue.put(line)
            if process.poll() is not None:
                return

    @property
    def stdout(self):
        return self.process.stdout

    @property
    def stderr(self):
        return self.process.stderr

    def start(self, wait_limit=15):
        start_time = time.time()
        pargs = self._default_popen_kwargs
        if self.cwd is not None:
            pargs["cwd"] = self.cwd
        self.process = subprocess.Popen(self.cmd, **pargs)
        self.return_code = None
        last_output = time.time()
        q = queue.Queue()
        t = threading.Thread(target=self._watch_output, args=(self.process, q))
        t.daemon = True
        t.start()
        while self.return_code is None:
            self.return_code = self.process.poll()
            delay = time.time() - last_output
            if self.return_code is None:
                stdout = f"waited {delay:.1f}s"
                try:
                    stderr = q.get_nowait()
                except queue.Empty:
                    time.sleep(1)
                else:
                    yield stdout, stderr
                    last_output = time.time()
                if delay > wait_limit:
                    print(f"Waited {wait_limit} seconds with no output, breaking")
                    break
        self.run_time = time.time() - start_time

By pushing the blocking readline() calls into a thread that feeds a queue.Queue, and constantly polling the process from the main loop, I can watch the output as fast as it comes in without worrying about the main process blocking; only the watcher thread blocks.

A further improvement on the idea would be to have two watcher threads (one for stdout and one for stderr) and to tag each queue item with a sentinel for its source, e.g. queue.put((STDERR, line_from_stderr)) and likewise a STDOUT sentinel, as sketched below.
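Here is a minimal sketch of that two-thread variant; STDOUT, STDERR, _pump, and watch are all names made up for this example, and it is a standalone sketch rather than a drop-in change to Runner:

import subprocess
import threading
import queue

STDOUT, STDERR = "stdout", "stderr"  # tags invented for this sketch


def _pump(pipe, tag, out_queue):
    # One thread per pipe; each can block on its own readline() safely.
    for line in iter(pipe.readline, ""):
        out_queue.put((tag, line))
    out_queue.put((tag, None))  # per-pipe sentinel: this stream is exhausted


def watch(cmd):
    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
        bufsize=1,
    )
    q = queue.Queue()
    threads = [
        threading.Thread(target=_pump, args=(process.stdout, STDOUT, q), daemon=True),
        threading.Thread(target=_pump, args=(process.stderr, STDERR, q), daemon=True),
    ]
    for t in threads:
        t.start()
    finished = 0
    while finished < 2:
        # get() blocks only on the queue, never on a pipe, so both
        # streams keep draining in the background.
        tag, line = q.get()
        if line is None:
            finished += 1
            continue
        yield tag, line
    process.wait()

Putting None on the queue when a pipe hits EOF gives the consumer a per-stream sentinel, so it knows when both streams are exhausted.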

To use it:

r = Runner(["some_long_running_process", "-arg1", "arg1 value"])

for stdout, stderr in r.start():
    print("STDOUT", stdout)
    print("STDERR", stderr)