
Data Science in Drilling - Episode 6

Writer: Zeyu Yan

Use Multithreading in Python to Boost the Performance of IO-bound Tasks


written by Zeyu Yan, Ph.D., Head of Data Science from Nvicta AI


Data Science in Drilling is a multi-episode series written by the technical team members at Nvicta AI. Nvicta AI is a startup company that helps drilling service companies increase their value offering by providing them with advanced AI and automation technologies and services. The goal of this Data Science in Drilling series is to give both data engineers and drilling engineers insight into state-of-the-art techniques that combine drilling engineering and data science.


If you enjoyed Episode 5, I am sure you will also enjoy this one!


Enjoying great knowledge is just like enjoying delicious mochi ice cream.


Introduction


In the previous episode of this series, we covered how to use Python's multiprocessing to parallelize CPU-bound tasks. Generally, threads are lighter than processes and incur less overhead. However, because the Global Interpreter Lock (GIL) in CPython allows only one thread at a time to execute Python bytecode, Python's multithreading is not suitable for CPU-bound tasks. It is well suited to IO-bound tasks, where threads spend most of their time waiting and release the GIL while they wait. Recall the general rule for choosing between the two in Python: use multiprocessing for CPU-bound tasks and multithreading for IO-bound tasks.


In this article, we will cover how to use multithreading in Python to boost the performance of IO-bound tasks.


What We'll Cover Today
  1. How to use Python's multithreading to parallelize IO-bound tasks.

  2. How to create a customized thread to perform specific tasks.

  3. How to use a thread lock to guarantee thread safety.


Use Multithreading to Parallelize Tasks


To simulate IO-bound tasks that take some time to finish, let's define the following function:

import threading
import time

def long_time_task(i):
    print(f'Current child thread: {threading.current_thread().name} - task: {i}')
    time.sleep(i)
    print(f'Result: {i}')

The function above prints the name of the thread that runs the task, sleeps for i seconds, and then prints the result. Let's first run two of the tasks in sequence to see how they perform:

print(f'Current main thread: {threading.current_thread().name}')
start = time.perf_counter()
long_time_task(3)
long_time_task(2)
end = time.perf_counter()
print(f'Time consumed: {end - start} seconds')

The results are:

Current main thread: MainThread
Current child thread: MainThread - task: 3
Result: 3
Current child thread: MainThread - task: 2
Result: 2
Time consumed: 5.003756629999998 seconds

Both tasks ran in the same thread, the main thread. Without any parallelization, the total time was the sum of the two tasks, about 5 seconds. Now let's try to use Python's multithreading to boost the performance:

start = time.perf_counter()
print(f'This is the main thread: {threading.current_thread().name}')
t1 = threading.Thread(target=long_time_task, args=(3, ))
t2 = threading.Thread(target=long_time_task, args=(2, ))
t1.start()
t2.start()
t1.join()
t2.join()
end = time.perf_counter()
print(f'Total time consumed: {end - start} seconds')

Similar to multiprocessing, the .join method in the snippet above makes the main thread wait for the two child threads to finish before it continues. Without the .join calls, the main thread would move straight on to the timing code and report an elapsed time close to zero, although the Python interpreter itself still waits for non-daemon threads before it exits. The input argument is passed to the function through the args parameter of the Thread constructor, which must be a tuple (hence the trailing comma in (3, )). Here are the results of the above code snippet:

This is the main thread: MainThread
Current child thread: Thread-6 - task: 3
Current child thread: Thread-7 - task: 2
Result: 2
Result: 3
Total time consumed: 3.005277941000031 seconds
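
As a small aside on the Thread constructor, the sketch below shows positional and keyword arguments being passed together, and checks is_alive() before and after .join. The function fetch, its parameters, and the 'sensor-log' label are hypothetical names used only for illustration; the sleep stands in for a real IO wait.

```python
import threading
import time

calls = []  # records the arguments each task actually received

def fetch(name, delay, retries=1):
    # Hypothetical stand-in for an IO-bound call; the sleep simulates the wait.
    time.sleep(delay)
    calls.append((name, delay, retries))

# Positional arguments go in the args tuple, keyword arguments in the kwargs dict.
t = threading.Thread(target=fetch, args=('sensor-log', 0.3), kwargs={'retries': 3})
t.start()

alive_before_join = t.is_alive()  # the task is still sleeping at this point
t.join()                          # block until the child thread has finished
alive_after_join = t.is_alive()

print(f'Alive before join: {alive_before_join}, after join: {alive_after_join}')
print(f'Recorded calls: {calls}')
```

Checking is_alive() around .join is only a way to make the blocking behavior visible; it is not needed in normal code.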

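With multithreading, the 3-second and 2-second tasks finished in around 3 seconds in total, because the two sleeps overlap and the total time is set by the longest task rather than by the sum. The following code snippet shows how to use multithreading to parallelize tasks through a loop: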

start = time.perf_counter()
print(f'This is the main thread: {threading.current_thread().name}')
thread_list = []
for i in range(1, 5):
    t = threading.Thread(target=long_time_task, args=(i, ))
    t.start()
    thread_list.append(t)

for t in thread_list:
    t.join()

end = time.perf_counter()
print(f'Total time consumed: {end - start} seconds')

The results of the parallelized tasks are:

This is the main thread: MainThread
Current child thread: Thread-8 - task: 1
Current child thread: Thread-9 - task: 2
Current child thread: Thread-10 - task: 3
Current child thread: Thread-11 - task: 4
Result: 1
Result: 2
Result: 3
Result: 4
Total time consumed: 4.004850304999991 seconds
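
One thing the loop above does not show is how to get values back: threading.Thread discards whatever the target function returns. A minimal sketch of one possible workaround is to let each thread write into its own slot of a pre-allocated list. The names square_after_wait and results are assumptions made for this example, and the delays are shortened so that it runs quickly.

```python
import threading
import time

inputs = [1, 2, 3, 4]
results = [None] * len(inputs)  # one slot per thread, so no two threads share a slot

def square_after_wait(index, value):
    time.sleep(value * 0.05)         # simulate an IO wait proportional to the input
    results[index] = value * value   # each thread writes only to its own slot

threads = []
for index, value in enumerate(inputs):
    t = threading.Thread(target=square_after_wait, args=(index, value))
    t.start()
    threads.append(t)

for t in threads:
    t.join()  # results are complete only after every thread has been joined

print(results)  # [1, 4, 9, 16]
```

Because every thread touches a different list element, no lock is needed here. As soon as threads update the same value, a lock becomes necessary, as discussed in the thread safety section.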

Sometimes one may want the child threads to finish their tasks in the background without blocking the main thread. In this case, the child threads can be turned into daemon threads by setting their daemon flag before they are started (the .setDaemon method seen in older code has been deprecated since Python 3.10 in favor of the daemon attribute):

start = time.perf_counter()
print(f'This is the main thread: {threading.current_thread().name}')
for i in range(1, 5):
    t = threading.Thread(target=long_time_task, args=(i, ))
    t.daemon = True  # preferred over t.setDaemon(True), which is deprecated since Python 3.10
    t.start()

end = time.perf_counter()
print(f'Total time consumed: {end - start} seconds')

The results of the above code snippet are:

This is the main thread: MainThread
Current child thread: Thread-12 - task: 1
Current child thread: Thread-13 - task: 2
Current child thread: Thread-14 - task: 3
Current child thread: Thread-15 - task: 4
Total time consumed: 0.0020358709999754865 seconds
Result: 1
Result: 2
Result: 3
Result: 4

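It can be seen that the main thread did not wait: it reported the elapsed time immediately, while the child threads finished in the background. Note that this output assumes the interpreter keeps running after the main code ends, as in a notebook or an interactive session. In a standalone script, daemon threads are stopped abruptly when the program exits, so the Result lines may never be printed.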
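
The daemon flag can also be passed directly to the Thread constructor. The following sketch assumes a hypothetical background_task function and uses .join with a timeout, which lets the main thread wait for a bounded time instead of either blocking indefinitely or not waiting at all.

```python
import threading
import time

def background_task(seconds):
    # Hypothetical background job; the sleep simulates an IO wait.
    time.sleep(seconds)

# daemon=True in the constructor has the same effect as setting
# worker.daemon = True before calling start().
worker = threading.Thread(target=background_task, args=(0.2,), daemon=True)
worker.start()

is_daemon = worker.daemon

# Wait at most 2 seconds; join returns earlier if the thread finishes sooner.
worker.join(timeout=2)
finished_in_time = not worker.is_alive()

print(f'Daemon thread: {is_daemon}, finished within timeout: {finished_in_time}')
```

A bounded wait like this is one way to give background work a chance to complete before a script exits, without risking a hang if the task never returns.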


Similar to a process pool, a thread pool can also be created through concurrent.futures:

import concurrent.futures

start = time.perf_counter()
print(f'This is the main thread: {threading.current_thread().name}')
with concurrent.futures.ThreadPoolExecutor() as executor:
    executor.submit(long_time_task, 3)
    executor.submit(long_time_task, 2)
end = time.perf_counter()
print(f'Total time consumed: {end - start} seconds')

The results of the above code snippet are:

This is the main thread: MainThread
Current child thread: ThreadPoolExecutor-1_0 - task: 3
Current child thread: ThreadPoolExecutor-1_1 - task: 2
Result: 2
Result: 3
Total time consumed: 3.001323351999872 seconds

Now let's define another function that receives 2 input arguments and returns a string:

def do_something(a, b):
    seconds = a + b
    print(f'Sleeping {seconds} seconds...')
    time.sleep(seconds)
    return f'Done sleeping {seconds} seconds'

The following code snippet shows how to pass multiple input arguments to a function as well as how to get the return value from the function when using multithreading:

start = time.perf_counter()
print(f'This is the main thread: {threading.current_thread().name}')
with concurrent.futures.ThreadPoolExecutor() as executor:
    future_1 = executor.submit(do_something, 1, 2)
    future_2 = executor.submit(do_something, 1, 1)
    print(future_1.result())
    print(future_2.result())
end = time.perf_counter()
print(f'Total time consumed: {end - start} seconds')

The .result method is used to retrieve the return value of the function. Here are the results of the above code snippet:

This is the main thread: MainThread
Sleeping 3 seconds...
Sleeping 2 seconds...
Done sleeping 3 seconds
Done sleeping 2 seconds
Total time consumed: 3.0016597430001184 seconds
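
In the output above, the 3-second result is printed before the 2-second one because .result blocks until its own future is done, and future_1.result() was called first. If results should be handled as soon as each task finishes, concurrent.futures.as_completed can be used instead. The sketch below is a minimal illustration; wait_and_report is a hypothetical helper, and the delays are shortened.

```python
import concurrent.futures
import time

def wait_and_report(seconds):
    time.sleep(seconds)  # simulate an IO wait
    return f'Done sleeping {seconds} seconds'

completed = []
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # The slower task is submitted first on purpose.
    futures = [executor.submit(wait_and_report, s) for s in (0.6, 0.1)]

    # as_completed yields each future as it finishes, not in submission order.
    for future in concurrent.futures.as_completed(futures):
        completed.append(future.result())
        print(completed[-1])
```

Here the 0.1-second task is reported first even though it was submitted second.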

Here is how to submit a whole list of tasks, each with its own tuple of input arguments, when using concurrent.futures:

start = time.perf_counter()
inputs = [(i, i + 1) for i in range(5)]
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(do_something, *item) for item in inputs]
    
    for f in futures:
        print(f.result())

end = time.perf_counter()
print(f'Time: {end - start}')

Here the "*" in front of item is Python's unpacking operator: it unpacks the values of the tuple into separate positional arguments of the function. Here are the results of the above code snippet:

Sleeping 1 seconds...
Sleeping 3 seconds...
Sleeping 5 seconds...
Sleeping 7 seconds...
Sleeping 9 seconds...
Done sleeping 1 seconds
Done sleeping 3 seconds
Done sleeping 5 seconds
Done sleeping 7 seconds
Done sleeping 9 seconds
Time: 9.008446461999938
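
An alternative to the submit loop is executor.map, which accepts one iterable per function parameter and returns the results in input order. The sketch below assumes a hypothetical add_after_delay function and an explicit max_workers value chosen for the example.

```python
import concurrent.futures
import time

def add_after_delay(a, b):
    time.sleep(0.1)  # simulate an IO wait
    return a + b

first = [0, 1, 2, 3, 4]
second = [1, 2, 3, 4, 5]

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # map pairs up first[i] with second[i] and yields results in input order,
    # regardless of which task finishes first.
    sums = list(executor.map(add_after_delay, first, second))

print(sums)  # [1, 3, 5, 7, 9]
```

map is convenient when only the return values matter. submit is the better fit when the individual future objects are needed, for example to handle each task's exception separately.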

Customize Your Own Thread


In some scenarios one may want to create a customized thread to perform some specific tasks. Customizing a thread is quite similar to customizing a process, which was covered in the previous episode:

class MyThread(threading.Thread):
    def __init__(self, value):
        super().__init__()
        self.value = value
    
    def step_1(self):
        print(self.value + 1)
    
    def step_2(self):
        print(self.value + 2)
    
    def run(self):
        self.step_1()
        self.step_2()

Every customized thread needs to inherit from the threading.Thread class and override the run method. The run method is invoked in the new thread when .start is called. Let's test our customized thread:

t = MyThread(2)
t.start()
t.join()

The results of the above code snippet are:

3
4
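
A customized thread is also a convenient place to keep a task's result, since the instance outlives the run method. The following is a minimal sketch under that assumption; the class SquareThread and its attributes result and ran_in are hypothetical names introduced for this example.

```python
import threading

class SquareThread(threading.Thread):
    def __init__(self, value):
        super().__init__()
        self.value = value
        self.result = None   # filled in by run()
        self.ran_in = None   # name of the thread that executed run()

    def run(self):
        # run() executes in the new thread once start() is called.
        self.ran_in = threading.current_thread().name
        self.result = self.value ** 2

t = SquareThread(7)
t.start()
t.join()  # the result is safe to read only after the thread has finished

print(t.result)  # 49
print(t.ran_in == t.name)  # True
```

Calling t.run() directly instead of t.start() would execute the method in the calling thread, so no parallelism would be gained.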

Use Thread Lock to Guarantee Thread Safety


Thread safety is an important topic when designing multithreading programs. Whenever threads are introduced in a program, the shared state amongst the threads becomes vulnerable to corruption. Failing to guarantee thread safety may introduce unexpected problems to the programs. Please refer to this link for more details about thread safety.


Here is an example of a thread-safety problem. Let's define the following class, which simulates a bank account with an initial balance of 0. When the self.save and self.withdraw methods are called, they increase and decrease self.balance by 1, respectively, 100000 times each.

class Account:
    def __init__(self):
        self.balance = 0
    
    def save(self):
        for _ in range(100000):
            self.balance += 1
    
    def withdraw(self):
        for _ in range(100000):
            self.balance -= 1

Let's create an account instance and try to run the self.save and self.withdraw methods using two separate threads and check the total balance of the account:

account = Account()
t_save = threading.Thread(target=account.save, name='Save')
t_withdraw = threading.Thread(target=account.withdraw, name='Withdraw')
t_save.start()
t_withdraw.start()
t_save.join()
t_withdraw.join()
print(f'Current balance: {account.balance}')

The result of the above code snippet is:

Current balance: -44136

The result is not in line with our expectations: after 100000 deposits and 100000 withdrawals, the balance should be 0. If we run the above program multiple times, the result may differ from run to run. The cause is that self.balance += 1 is not a single atomic step. The thread reads the balance, computes the new value, and writes it back, and the other thread can run between those steps, so one of the two updates is lost. How often this happens depends on timing and on the Python version, but the program is not thread-safe either way. Let's modify the program to use a thread lock to make it thread-safe:

# Add lock.
class Account:
    def __init__(self):
        self.balance = 0
    
    def save(self, lock):
        lock.acquire()
        for _ in range(100000):
            self.balance += 1
        lock.release()
    
    def withdraw(self, lock):
        lock.acquire()
        for _ in range(100000):
            self.balance -= 1
        lock.release()

The thread lock is acquired before a thread modifies the shared state and released only after the modification has finished, so only one thread at a time can change the balance. Let's run the test again:

account = Account()
lock = threading.Lock()
t_save = threading.Thread(target=account.save, args=(lock, ), name='Save')
t_withdraw = threading.Thread(target=account.withdraw, args=(lock, ), name='Withdraw')
t_save.start()
t_withdraw.start()
t_save.join()
t_withdraw.join()
print(f'Current balance: {account.balance}')

The result of the above code snippet is:

Current balance: 0

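No matter how many times we run the above program, the outcome will always be 0, because the lock ensures that only one thread at a time updates the balance. Note that in this version each method holds the lock for its entire loop, so the two threads effectively run one after the other.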
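
A common variation, sketched here under the assumption that the account should own its lock, is to create the lock inside the class and use it as a context manager. The with statement releases the lock even if an exception is raised inside the block, which a bare acquire/release pair does not do. The class name SafeAccount and the times parameter are hypothetical names for this example.

```python
import threading

class SafeAccount:
    def __init__(self):
        self.balance = 0
        self._lock = threading.Lock()  # owned by the account, not passed in by callers

    def save(self, times=100000):
        for _ in range(times):
            # Lock only the read-modify-write step; released automatically on exit.
            with self._lock:
                self.balance += 1

    def withdraw(self, times=100000):
        for _ in range(times):
            with self._lock:
                self.balance -= 1

account = SafeAccount()
threads = [
    threading.Thread(target=account.save, name='Save'),
    threading.Thread(target=account.withdraw, name='Withdraw'),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f'Current balance: {account.balance}')  # Current balance: 0
```

Locking each update separately lets the two threads interleave while still protecting the balance. The trade-off is more lock operations, so for a tight loop like this one, holding the lock around the whole loop can be faster.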


Conclusions


In this article, we went through how to use Python's multithreading to parallelize IO-bound tasks. We also covered how to create a customized thread to perform specific tasks and how to use a thread lock to guarantee thread safety.


Get in Touch


Thank you for reading! Please let us know if you like this series or if you have critiques. If this series was helpful to you, please follow us and share it with your friends.

If you or your company needs any help with projects related to drilling automation and optimization, AI, and data science, please get in touch with us at Nvicta AI. We are here to help. Cheers!

