In this post I’m going to share a simple method that will turn any list comprehension into a high-performance parallel job with a progress bar.
tqdm
If you are a python programmer who hasn’t worked with tqdm before, I’m about to change your life. With just 6 characters, tqdm adds a helpful and non-obtrusive progress bar to any python iterator. Try this:
from tqdm import tqdm

for i in tqdm(range(10)):
    # do stuff
    pass
You should see a progress bar like this:
You can download tqdm here.
concurrent.futures
concurrent.futures is python’s standard module for asynchronous tasks. It gives you access to executor classes that you can use to manage threads and processes. If you’re not familiar with concurrent.futures, you can find a pretty good tutorial here.
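Before combining it with tqdm, here is a minimal sketch of the pattern the function below relies on (the square function and worker count are placeholders, not part of the post): submit work to an executor, get back futures, and collect results as they finish.

from concurrent.futures import ProcessPoolExecutor, as_completed

def square(x):
    return x * x

if __name__ == '__main__':
    # The executor manages a pool of worker processes for us
    with ProcessPoolExecutor(max_workers=4) as pool:
        # submit() schedules the call and returns a Future immediately
        futures = [pool.submit(square, i) for i in range(10)]
        # as_completed() yields each future as soon as its result is ready,
        # so results arrive in completion order, not submission order
        results = [f.result() for f in as_completed(futures)]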
Putting them together
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed

def parallel_process(array, function, n_jobs=16, use_kwargs=False, front_num=3):
    """
    A parallel version of the map function with a progress bar.

    Args:
        array (array-like): An array to iterate over.
        function (function): A python function to apply to the elements of array
        n_jobs (int, default=16): The number of cores to use
        use_kwargs (boolean, default=False): Whether to consider the elements of array as dictionaries of
            keyword arguments to function
        front_num (int, default=3): The number of iterations to run serially before kicking off the parallel job.
            Useful for catching bugs
    Returns:
        [function(array[0]), function(array[1]), ...]
    """
    # We run the first few iterations serially to catch bugs
    if front_num > 0:
        front = [function(**a) if use_kwargs else function(a) for a in array[:front_num]]
    else:
        front = []
    # If we set n_jobs to 1, just run a list comprehension. This is useful for benchmarking and debugging.
    if n_jobs == 1:
        return front + [function(**a) if use_kwargs else function(a) for a in tqdm(array[front_num:])]
    # Assemble the workers
    with ProcessPoolExecutor(max_workers=n_jobs) as pool:
        # Pass the elements of array into function
        if use_kwargs:
            futures = [pool.submit(function, **a) for a in array[front_num:]]
        else:
            futures = [pool.submit(function, a) for a in array[front_num:]]
        kwargs = {
            'total': len(futures),
            'unit': 'it',
            'unit_scale': True,
            'leave': True
        }
        # Print out the progress as tasks complete
        for f in tqdm(as_completed(futures), **kwargs):
            pass
    out = []
    # Get the results from the futures.
    for i, future in tqdm(enumerate(futures)):
        try:
            out.append(future.result())
        except Exception as e:
            out.append(e)
    return front + out
The syntax for parallel_process is similar to the syntax for python’s map function (the array comes first, followed by the function), but it is more customizable. The most basic use is:
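A minimal sketch, assuming parallel_process is defined as above; some_function and my_list are just placeholder names:

def some_function(x):
    return x ** 2

my_list = list(range(1000))

# Run some_function over every element of my_list in parallel, with a progress bar
results = parallel_process(my_list, some_function)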
We can also specify additional keyword arguments to the function by setting use_kwargs=True and passing an array of dictionaries:
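A sketch under the same assumptions, where each dictionary in the array is unpacked into keyword arguments for the function:

def add(x, y=1):
    return x + y

arg_dicts = [{'x': i, 'y': 2 * i} for i in range(1000)]

# With use_kwargs=True, each dict is unpacked as add(x=..., y=...)
results = parallel_process(arg_dicts, add, use_kwargs=True)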