Use Multiprocessing in Python to Boost the Efficiency of Your Applications
written by Zeyu Yan, Ph.D., Head of Data Science from Nvicta AI
Data Science in Drilling is a multi-episode series written by the technical team members at Nvicta AI. Nvicta AI is a startup that helps drilling service companies increase their value offering by providing them with advanced AI and automation technologies and services. The goal of this Data Science in Drilling series is to give both data engineers and drilling engineers insight into state-of-the-art techniques that combine drilling engineering and data science.
Episode 5, here we go!

Enjoying great knowledge is just like enjoying delicious sushi.
Introduction
In Episode 2 of this series, we covered how to use AWS Lambda functions in an asynchronous pattern to parallelize CPU-bound tasks. However, in some scenarios it may not be ideal to make AWS Lambda part of the application. In that case, parallelization can still be achieved with multiprocessing, a technique that uses multiple processors (or cores) of a computer to run different portions of the same program simultaneously. Almost every programming language has its own way of harnessing multiprocessing. In Python, the Global Interpreter Lock (GIL) of the standard CPython interpreter allows only one thread at a time to execute Python bytecode, so multithreading is not suitable for CPU-bound tasks. Here is the general rule for choosing between multiprocessing, multithreading and coroutines in Python:
Use multiprocessing for CPU-bound tasks.
Use multithreading or coroutines for I/O-bound tasks.
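To make the rule above concrete, here is a minimal sketch that runs the same CPU-bound function through a thread pool and a process pool. The function count_primes, the helper timed_map and the workload size are hypothetical choices made for this illustration only. On a standard CPython build with the GIL, the thread version typically takes about as long as running the calls one after another, while the process version can spread the work over several cores; the exact timings depend on the machine.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def count_primes(limit):
    """CPU-bound toy workload (hypothetical): count primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def timed_map(executor_cls, limits):
    """Run count_primes over `limits` with the given executor; return (results, seconds)."""
    start = time.perf_counter()
    with executor_cls(max_workers=4) as executor:
        results = list(executor.map(count_primes, limits))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    limits = [60_000] * 4  # four identical CPU-bound jobs
    thread_results, thread_time = timed_map(ThreadPoolExecutor, limits)
    process_results, process_time = timed_map(ProcessPoolExecutor, limits)
    print(f"Threads:   {thread_time:.2f} s")
    print(f"Processes: {process_time:.2f} s")
    print(f"Same results: {thread_results == process_results}")
```

Both executors share the same interface, so switching between threads and processes only requires changing the executor class.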
In this article, we will cover how to use multiprocessing in Python to boost the performance of CPU-bound tasks.
What We'll Cover Today
How to use Python's multiprocessing to parallelize CPU-bound tasks.
How to create a customized process to perform specific tasks.
Using Multiprocessing to Parallelize Tasks
To simulate some computationally intensive tasks which take some time to finish, let's define the following function:
import os
import time

def long_time_task(i):
    print(f'Child process: {os.getpid()} - Task: {i}')
    time.sleep(i)
    print(f'Result: {i}')
The above function will take i seconds to finish and print out the ID of the process which takes charge of running the task. Let's first run two of the tasks in sequence to see how they perform:
print(f'Current mother process: {os.getpid()}')
start = time.perf_counter()
long_time_task(3)
long_time_task(2)
end = time.perf_counter()
print(f'Time consumed: {end - start} seconds')
The results are:
Current mother process: 17246
Child process: 17246 - Task: 3
Result: 3
Child process: 17246 - Task: 2
Result: 2
Time consumed: 5.009215759 seconds
Both tasks ran in the same process as the main program, and the total time was about 5 seconds, as expected. Now it's time to use Python's multiprocessing to boost the performance:
from multiprocessing import Process
print(f'Current mother process: {os.getpid()}')
start = time.perf_counter()
p1 = Process(target=long_time_task, args=(3, )) # Create the 1st process.
p2 = Process(target=long_time_task, args=(2, )) # Create the 2nd process.
p1.start() # Start the 1st process.
p2.start() # Start the 2nd process.
p1.join() # Make the main process wait for the 1st process to finish.
p2.join() # Make the main process wait for the 2nd process to finish.
print('All child processes finished...')
end = time.perf_counter()
print(f'Time consumed: {end - start} seconds')
One thing worth pointing out in the above code snippet is the .join method. It makes the main process wait for the two child processes to finish before it continues. Without the .join calls, the main process would move straight on to the timing code and report an elapsed time close to zero while the children were still running. The input argument is passed to the function through the args parameter of the Process constructor and should be a Python tuple, which is why a single argument is written with a trailing comma. Here are the results of the above code snippet:
Current mother process: 17246
Child process: 17270 - Task: 3
Child process: 17271 - Task: 2
Result: 2
Result: 3
All child processes finished...
Time consumed: 3.022054391000001 seconds
It only took around 3 seconds to finish both the tasks in parallel.
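The effect of .join can be observed directly. The following is a minimal sketch, with hypothetical helper names short_task and run_and_wait, that checks whether a child is alive before and after .join and reads its exit code. It also wraps the entry point in an if __name__ == "__main__" guard, which is needed when the code lives in a script and the platform starts child processes with the "spawn" method (the default on Windows and macOS).

```python
import os
import time
from multiprocessing import Process


def short_task(seconds):
    """Stand-in for real work: sleep, then report which process ran it."""
    time.sleep(seconds)
    print(f"Child {os.getpid()} finished after {seconds} s")


def run_and_wait(seconds):
    """Start one child, observe it before and after join(), return what was seen."""
    p = Process(target=short_task, args=(seconds,))
    p.start()
    alive_before = p.is_alive()  # normally True: the child is still sleeping
    p.join()                     # block until the child exits
    return alive_before, p.is_alive(), p.exitcode


if __name__ == "__main__":
    # The guard keeps child processes from re-running this block when the
    # "spawn" start method re-imports the module.
    print(run_and_wait(0.5))
```

After .join returns, is_alive() is False and exitcode is 0 for a child that ended normally.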
In some scenarios, we may not want to create each child process manually. In this case, a process pool can be used:
from multiprocessing import Pool
print(f'Current mother process: {os.getpid()}')
start = time.perf_counter()
p = Pool(5)
for i in range(5):
    p.apply_async(long_time_task, args=(i + 1, ))
p.close()
p.join()
print('All child processes finished...')
end = time.perf_counter()
print(f'Time consumed: {end - start} seconds')
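In the above code, a process pool of size 5 was created to run 5 tasks in parallel. One thing worth mentioning here is that, when using a process pool, the .close method must be called before the .join method. Calling .close tells the pool that no more tasks will be submitted, and .join raises a ValueError if the pool is still accepting tasks.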
Here are the results:
Current mother process: 17246
Child process: 17274 - Task: 1
Child process: 17277 - Task: 4
Child process: 17275 - Task: 2
Child process: 17276 - Task: 3
Child process: 17278 - Task: 5
Result: 1
Result: 2
Result: 3
Result: 4
Result: 5
All child processes finished...
Time consumed: 5.1443070209999995 seconds
It took approximately 5 seconds to finish all the tasks, which was in line with our expectations. One thing to be aware of is that the pool size needs to be greater than or equal to the number of tasks for all of them to run simultaneously; otherwise the extra tasks wait until a worker becomes free. The pool size is not capped by the number of CPU cores, but for CPU-bound work there is little to gain from creating more workers than cores, because the extra processes only compete for CPU time. The following code can be used to check the number of CPU cores:
from multiprocessing import cpu_count
print(f'Number of CPU cores: {cpu_count()}')
If no argument is passed to the Pool constructor, the pool size defaults to the number of CPU cores available on the machine.
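To see how the pool size interacts with the number of tasks, the following sketch runs the same four tasks through pools of different sizes. The names timed_sleep and run_batch are hypothetical helpers for this illustration. It uses the pool as a context manager together with the blocking .map method, so no explicit .close and .join calls are needed here.

```python
import os
import time
from multiprocessing import Pool


def timed_sleep(seconds):
    """Sleep for `seconds` and report which worker process handled the task."""
    time.sleep(seconds)
    return os.getpid(), seconds


def run_batch(pool_size, durations):
    """Run every duration through a pool of `pool_size` workers; return (results, elapsed)."""
    start = time.perf_counter()
    with Pool(pool_size) as pool:
        results = pool.map(timed_sleep, durations)  # blocks until all tasks finish
    return results, time.perf_counter() - start


if __name__ == "__main__":
    durations = [0.5] * 4
    for size in (2, 4):
        results, elapsed = run_batch(size, durations)
        workers = {pid for pid, _ in results}
        print(f"pool size {size}: {len(workers)} worker(s) used, {elapsed:.2f} s")
```

With four 0.5-second tasks, a pool of size 2 needs roughly two rounds while a pool of size 4 needs roughly one, plus the overhead of starting the workers. All tasks complete in both cases.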
Now let's define another function which receives 2 input arguments and returns a string:
def do_something(a, b):
    seconds = a + b
    print(f'Sleeping {seconds} seconds...')
    time.sleep(seconds)
    return f'Done sleeping {seconds} seconds'
The following code snippet shows how to pass multiple input arguments to a function as well as how to get the return value from the function when using multiprocessing:
start = time.perf_counter()
p = Pool(2)
results = []
results.append(p.apply_async(do_something, args=(1, 2)))
results.append(p.apply_async(do_something, args=(1, 1)))
p.close()
p.join()
for result in results:
    print(result.get())
print('All child processes finished...')
end = time.perf_counter()
print(f'Time consumed: {end - start} seconds')
The .get method is used to retrieve the return value of the function. Here are the results of the above code snippet:
Sleeping 3 seconds...
Sleeping 2 seconds...
Done sleeping 3 seconds
Done sleeping 2 seconds
All child processes finished...
Time consumed: 3.0396323649999992 seconds
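The .get method does more than return values: if the function raised an exception in the child process, .get re-raises it in the parent, and an optional timeout argument limits how long the call waits. Here is a minimal sketch of that behavior; safe_divide and collect are hypothetical names introduced only for this example.

```python
from multiprocessing import Pool


def safe_divide(a, b):
    """Hypothetical worker: fails for b == 0 so that error handling can be shown."""
    if b == 0:
        raise ValueError("b must not be zero")
    return a / b


def collect(pairs, timeout=10):
    """Submit every (a, b) pair and gather either the value or the error message."""
    outcomes = []
    with Pool(2) as pool:
        pending = [pool.apply_async(safe_divide, args=pair) for pair in pairs]
        for result in pending:
            try:
                # get() re-raises the worker's exception in the parent process
                # and raises multiprocessing.TimeoutError if `timeout` expires.
                outcomes.append(result.get(timeout=timeout))
            except ValueError as exc:
                outcomes.append(f"failed: {exc}")
    return outcomes


if __name__ == "__main__":
    print(collect([(6, 3), (1, 0)]))
```

Without a call to .get, an exception raised inside a task submitted with .apply_async goes unnoticed in the parent, so it is worth retrieving every result even when the return value itself is not needed.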
Now let's say the inputs come as an iterable of argument tuples, rather than just a few hand-written calls. In this case, the .starmap_async method can be used, which unpacks each tuple into the arguments of the function:
start = time.perf_counter()
p = Pool(5)
inputs = [(i, i + 1) for i in range(5)]
results = p.starmap_async(do_something, inputs)
p.close()
p.join()
print(results.get())
print('All child processes finished...')
end = time.perf_counter()
print(f'Time consumed: {end - start} seconds')
The results of the above code snippet are:
Sleeping 3 seconds...
Sleeping 1 seconds...
Sleeping 7 seconds...
Sleeping 5 seconds...
Sleeping 9 seconds...
['Done sleeping 1 seconds', 'Done sleeping 3 seconds', 'Done sleeping 5 seconds', 'Done sleeping 7 seconds', 'Done sleeping 9 seconds']
All child processes finished...
Time consumed: 9.102658118999997 seconds
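Notice that the "Sleeping" lines above are printed in an arbitrary order, while the list of return values follows the order of the inputs. The following sketch illustrates this with the blocking counterpart .starmap; the function sleep_and_tag and the job list are hypothetical and chosen so that the first job finishes last.

```python
import time
from multiprocessing import Pool


def sleep_and_tag(label, seconds):
    """Sleep for `seconds`, then return a string identifying the job."""
    time.sleep(seconds)
    return f"{label} slept {seconds} s"


def run_in_order(jobs):
    """Run all (label, seconds) jobs in parallel; results follow the input order."""
    with Pool(len(jobs)) as pool:
        return pool.starmap(sleep_and_tag, jobs)


if __name__ == "__main__":
    # The slowest job is listed first, yet it is still the first result.
    jobs = [("slow", 0.6), ("medium", 0.3), ("fast", 0.1)]
    for line in run_in_order(jobs):
        print(line)
```

Because the results are matched to the inputs by position, there is no need to track which task produced which value.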
Another way to leverage multiprocessing in Python is through concurrent.futures:
import concurrent.futures
start = time.perf_counter()
with concurrent.futures.ProcessPoolExecutor() as executor:
    f1 = executor.submit(do_something, 1, 2)
    f2 = executor.submit(do_something, 1, 1)
    print(f1.result())
    print(f2.result())
print('All child processes finished...')
end = time.perf_counter()
print(f'Time consumed: {end - start} seconds')
The .result method is used to retrieve the return value of the function. The output matches the earlier Pool example:
Sleeping 3 seconds...
Sleeping 2 seconds...
Done sleeping 3 seconds
Done sleeping 2 seconds
All child processes finished...
Time consumed: 3.071668759999998 seconds
Here is how we pass multiple input arguments to the function when using concurrent.futures:
start = time.perf_counter()
inputs = [(i, i + 1) for i in range(5)]
with concurrent.futures.ProcessPoolExecutor() as executor:
    results = [executor.submit(do_something, *item) for item in inputs]
    for f in results:
        print(f.result())
print('All child processes finished...')
end = time.perf_counter()
print(f'Time: {end - start}')
Here the "*" in front of item is Python's unpacking operator: it unpacks the values of the tuple into separate positional arguments of the function. Here are the results of the above code snippet:
Sleeping 1 seconds...
Sleeping 3 seconds...
Sleeping 5 seconds...
Sleeping 7 seconds...
Sleeping 9 seconds...
Done sleeping 1 seconds
Done sleeping 3 seconds
Done sleeping 5 seconds
Done sleeping 7 seconds
Done sleeping 9 seconds
All child processes finished...
Time: 9.073480680999978
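In the example above the results are read in submission order, so a slow task at the front of the list delays the printing of everything behind it. As a sketch of an alternative, concurrent.futures.as_completed yields each future as soon as it finishes, while executor.map keeps the input order. The function nap and the two helpers below are hypothetical names used only for this illustration.

```python
import time
from concurrent.futures import ProcessPoolExecutor, as_completed


def nap(seconds):
    """Sleep for `seconds` and return the same value."""
    time.sleep(seconds)
    return seconds


def completion_order(durations):
    """Collect results in the order in which the tasks finish."""
    with ProcessPoolExecutor(max_workers=len(durations)) as executor:
        futures = [executor.submit(nap, d) for d in durations]
        return [f.result() for f in as_completed(futures)]


def submission_order(durations):
    """Collect results in the order in which the tasks were submitted."""
    with ProcessPoolExecutor(max_workers=len(durations)) as executor:
        return list(executor.map(nap, durations))


if __name__ == "__main__":
    durations = [0.9, 0.1, 0.5]
    print("as_completed:", completion_order(durations))
    print("executor.map:", submission_order(durations))
```

as_completed is a good fit when each result can be processed independently as soon as it is ready; executor.map is simpler when the results must line up with the inputs.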
Customize Your Own Process
In some scenarios, one may want to create a customized process to perform some specific tasks. Here is an example of how to customize a process:
class MyProcess(Process):
    def __init__(self, value):
        super().__init__()
        self.value = value

    def step_1(self):
        print(self.value + 1)

    def step_2(self):
        print(self.value + 2)

    def run(self):
        self.step_1()
        self.step_2()
Every customized process needs to inherit from the Process class and override the run method. Calling .start launches a new process and executes run inside it. Let's test our customized process:
p = MyProcess(2)
p.start()
p.join()
The results of the above code snippet are:
3
4
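One caveat with customized processes is that run executes in the child process, so attributes assigned inside run are not visible on the object held by the parent. A common way to send a result back is a multiprocessing.Queue. The sketch below assumes a hypothetical SquareProcess class and helper square_in_child; it is one possible pattern, not the only one.

```python
from multiprocessing import Process, Queue


class SquareProcess(Process):
    """Hypothetical custom process that sends its result back through a queue."""

    def __init__(self, value, results):
        super().__init__()
        self.value = value
        self.results = results  # queue shared with the parent
        self.answer = None      # assigned in run(), but only in the child's copy

    def run(self):
        self.answer = self.value ** 2
        self.results.put(self.answer)


def square_in_child(value):
    """Square `value` in a child process; return (value from queue, parent's attribute)."""
    results = Queue()
    p = SquareProcess(value, results)
    p.start()
    from_queue = results.get()  # read before join() so the child can flush the queue
    p.join()
    return from_queue, p.answer


if __name__ == "__main__":
    print(square_in_child(7))
```

The value read from the queue is 49, while p.answer in the parent is still None, because the assignment happened in the child's copy of the object.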
Conclusion
In this article, we mainly went through how to use Python's multiprocessing to parallelize CPU-bound tasks. We also covered how to create a customized process to perform specific tasks.
Get in Touch
Thank you for reading! Please let us know if you like this series or if you have critiques. If this series was helpful to you, please follow us and share this series with your friends.
If you or your company needs any help on projects related to drilling automation and optimization, AI, and data science, please get in touch with us at Nvicta AI. We are here to help. Cheers!