I am working on a pet project to compress a terabyte of video into a slimmer format. While I have been able to automate working with ffmpeg, I didn’t like the fact that I couldn’t follow along with the subprocess running ffmpeg.
I tried a few different ideas for watching ffmpeg while keeping the script from blocking, because I wanted to be able to time and monitor its progress.
import subprocess
process = subprocess.Popen(...)
stdout, stderr = process.communicate() blocks until the process is finished.
process.stdout.readline() and process.stderr.readline() will both block until there is sufficient data. In ffmpeg's case there is never stdout output, so reading stdout will block indefinitely.
https://gist.github.com/devdave/9b8553d63e24ef19eea7e56f7cb95c78
import subprocess
import threading
import queue
import os
import time


class Runner(object):
    """Run a long-lived subprocess (e.g. ffmpeg) without blocking the caller.

    A daemon thread drains the child's stderr into a queue, and ``start()``
    is a generator that polls the child and yields output lines as they
    arrive, so the main thread stays free to time and monitor progress.
    """

    def __init__(self, cmd: list, cwd: str = None):
        # Command to execute, as an argv list (e.g. ["ffmpeg", "-i", ...]).
        self.cmd = cmd
        # Optional working directory for the child.
        # BUG FIX: start() read self.cwd, but it was never initialized,
        # which raised AttributeError on every run.
        self.cwd = cwd
        # Exit status of the child; None until it terminates.
        self.return_code = None
        self.process = None  # type: subprocess.Popen
        # Wall-clock seconds the child ran; set once start() is exhausted.
        self.run_time = 0

    @property
    def _default_popen_kwargs(self):
        """Baseline Popen keyword arguments: line-buffered text pipes."""
        return {
            "env": os.environ.copy(),
            "stdout": subprocess.PIPE,
            "stderr": subprocess.PIPE,
            # BUG FIX: the original used shell=True with a list command;
            # on POSIX that executes only cmd[0] and drops the arguments.
            # An argv list must be executed directly (shell=False).
            "shell": False,
            "universal_newlines": True,  # text mode, so readline returns str
            "bufsize": 1,  # line buffered so lines arrive promptly
        }

    def _watch_output(self, process: subprocess.Popen, out_queue):
        """Thread target: pump the child's stderr lines into *out_queue*.

        ffmpeg writes its progress to stderr, so stderr is what we watch.
        iter(..., "") stops at EOF, i.e. when the pipe closes.
        """
        for line in iter(process.stderr.readline, ""):
            out_queue.put(line)
            if process.poll() is not None:
                return

    @property
    def stdout(self):
        """The child's stdout pipe (unused by ffmpeg; exposed for callers)."""
        return self.process.stdout

    @property
    def stderr(self):
        """The child's stderr pipe."""
        return self.process.stderr

    def start(self, wait_limit=15):
        """Launch the command and yield (status, stderr_line) pairs.

        Generator: each item is a pair of (time-since-last-output message,
        one stderr line).  Gives up after *wait_limit* seconds without any
        output, and returns once the child exits.  ``self.run_time`` is set
        when the generator is exhausted.
        """
        start_time = time.time()
        pargs = dict(self._default_popen_kwargs)
        if self.cwd is not None:
            pargs["cwd"] = self.cwd
        self.process = subprocess.Popen(self.cmd, **pargs)
        # BUG FIX: the original set/polled a separate, undeclared attribute
        # `self.returned`; use the attribute declared in __init__.
        self.return_code = None
        last_output = time.time()
        q = queue.Queue()
        t = threading.Thread(target=self._watch_output, args=(self.process, q))
        t.daemon = True  # don't keep the interpreter alive for the watcher
        t.start()
        while self.return_code is None:
            self.return_code = self.process.poll()
            # BUG FIX: the original computed last_output - time.time(),
            # which is always <= 0, so the wait_limit timeout never fired
            # and the yielded "waited" value was negative.
            delay = time.time() - last_output
            if self.return_code is None:
                stdout = f"{delay:.1f}s since last output"
                try:
                    stderr = q.get_nowait()
                except queue.Empty:
                    time.sleep(1)
                else:
                    yield stdout, stderr
                    last_output = time.time()
                if delay > wait_limit:
                    # BUG FIX: message hard-coded "15"; report the actual limit.
                    print(f"Waited {wait_limit} seconds, breaking")
                    break
        # Let the watcher hit EOF, then drain anything it queued right
        # before the child exited — the original silently dropped it.
        t.join(timeout=1)
        while not q.empty():
            yield "", q.get_nowait()
        self.run_time = time.time() - start_time
By using a thread feeding a Queue and constantly polling the process, I can watch the output as fast as it comes in without the main process blocking — only the watcher thread blocks.
A further improvement on the idea would be to have two threads (for stdout and stderr respectively) with the queue items put with sentinels like queue.put((STDERR, line_from_stderr)) and a sentinel for STDOUT.
To use -
r = Runner(["some_long_running_process", "-arg1", "arg1 value"])
for stdout, stderr in r.start():
print("STDOUT", stdout)
print("STDERR", stderr)