In this post I’m going to share a simple method that will turn any list comprehension into a high performance parallel job with a progress bar.

tqdm

If you are a python programmer who hasn’t worked with tqdm before, I’m about to change your life. With just 6 characters, tqdm adds a helpful and non-obstrusive progress bar to any python iterator. Try this:

	from tqdm import tqdm
	for i in tqdm(range(10)):
		#do stuff

You should see a progress bar like this:

Progress Bar

You can download tqdm here.

concurrent.futures

concurrent.futures is python’s standard module for asyncronous tasks. It gives you access to executor classes that you can use to manage threads and processes. If you’re not familiar with concurrent.futures, you can find a pretty good tutorial here.

Putting them together

from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed

def parallel_process(array, function, n_jobs=16, use_kwargs=False, front_num=3):
    """
        A parallel version of the map function with a progress bar. 

        Args:
            array (array-like): An array to iterate over.
            function (function): A python function to apply to the elements of array
            n_jobs (int, default=16): The number of cores to use
            use_kwargs (boolean, default=False): Whether to consider the elements of array as dictionaries of 
                keyword arguments to function 
            front_num (int, default=3): The number of iterations to run serially before kicking off the parallel job. 
                Useful for catching bugs
        Returns:
            [function(array[0]), function(array[1]), ...]
    """
    #We run the first few iterations serially to catch bugs
    if front_num > 0:
        front = [function(**a) if use_kwargs else function(a) for a in array[:front_num]]
    #If we set n_jobs to 1, just run a list comprehension. This is useful for benchmarking and debugging.
    if n_jobs==1:
        return front + [function(**a) if use_kwargs else function(a) for a in tqdm(array[front_num:])]
    #Assemble the workers
    with ProcessPoolExecutor(max_workers=n_jobs) as pool:
        #Pass the elements of array into function
        if use_kwargs:
            futures = [pool.submit(function, **a) for a in array[front_num:]]
        else:
            futures = [pool.submit(function, a) for a in array[front_num:]]
        kwargs = {
            'total': len(futures),
            'unit': 'it',
            'unit_scale': True,
            'leave': True
        }
        #Print out the progress as tasks complete
        for f in tqdm(as_completed(futures), **kwargs):
            pass
    out = []
    #Get the results from the futures. 
    for i, future in tqdm(enumerate(futures)):
        try:
            out.append(future.result())
        except Exception as e:
            out.append(e)
    return front + out

The syntax for parallel_process is identical to the syntax for python’s map function, but it is more customizable. The most basic use is:

A basic progress bar

We can also specify additional arguments to the function

A fancy progress bar