
Data Science in Drilling - Episode 12

Writer: Zeyu Yan

Intro to Python's Async IO - Part I


Written by Zeyu Yan, Ph.D., Head of Data Science at Nvicta AI


Data Science in Drilling is a multi-episode series written by the technical team members at Nvicta AI. Nvicta AI is a startup company that helps drilling service companies increase their value offering by providing them with advanced AI and automation technologies and services. The goal of this Data Science in Drilling series is to give both data engineers and drilling engineers insight into state-of-the-art techniques that combine drilling engineering and data science.


This episode is Part I of an introductory series on Python’s asyncio library. Enjoy! :)


Enjoying great knowledge is just like enjoying delicious Takoyaki.


Introduction

In the first, second and third episodes of the Data Science in Drilling series, we used the asyncio library to run I/O-bound tasks concurrently and improve their performance. In today’s episode, we will cover in more detail the various ways to handle tasks using asyncio.


One thing to mention is that the asyncio APIs have changed across Python versions. The APIs used in this episode are based on the latest official asyncio documentation at the time of writing this blog post (Python 3.10.5). For more detailed information, please refer to the official documentation.


Task Handling Using Async IO


The first example is as follows:

import asyncio
import time

async def my_func(t):
    await asyncio.sleep(t)
    print(f'Slept for {t} seconds...')

async def main():
    await my_func(2)
    await my_func(1)
    print('All finished...')

if __name__ == '__main__':
    start = time.time()
    asyncio.run(main())
    end = time.time()
    print(f'Total time consumption: {end - start} seconds.')


An awaitable is an object which can be used in an await expression. There are three main types of awaitable objects: coroutines, Tasks, and Futures. In the above code snippet, calling my_func returns a coroutine. The execution results of the above code snippet are:

Slept for 2 seconds...
Slept for 1 seconds...
All finished...
Total time consumption: 3.0075151920318604 seconds.

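The output shows that when two coroutines are awaited one after another, they also execute one after another: the second coroutine does not start until the first has finished, so the total time is about 3 seconds. Let’s modify the code a bit to wrap the coroutines as tasks using the asyncio.create_task function: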

import asyncio
import time

async def my_func(t):
    print(f'Task started with t = {t}')
    await asyncio.sleep(t)
    print(f'Slept for {t} seconds...')

async def main():
    task_1 = asyncio.create_task(my_func(2))
    task_2 = asyncio.create_task(my_func(1))
    await task_1
    await task_2
    print('All finished...')

if __name__ == '__main__':
    start = time.time()
    asyncio.run(main())
    end = time.time()
    print(f'Total time consumption: {end - start} seconds.')

The execution results of the above code snippet are:

Task started with t = 2
Task started with t = 1
Slept for 1 seconds...
Slept for 2 seconds...
All finished...
Total time consumption: 2.006274700164795 seconds.

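The output shows that although the tasks are awaited one after another, they run concurrently: asyncio.create_task schedules each coroutine on the event loop as soon as it is called, so the two sleeps overlap and the total time is about 2 seconds rather than 3.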
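To make the difference between a coroutine and a Task more concrete, the following is a minimal sketch that inspects both objects. The helper name inspect_awaitables and the short 0.1 second delay are illustrative choices, not part of the original examples.

```python
import asyncio
import inspect

async def my_func(t):
    await asyncio.sleep(t)
    return t

async def inspect_awaitables():
    # Calling a coroutine function only creates a coroutine object;
    # none of its body has run yet.
    coro = my_func(0.1)
    is_coroutine = inspect.iscoroutine(coro)

    # create_task wraps the coroutine in a Task and schedules it on
    # the running event loop right away.
    task = asyncio.create_task(coro)
    is_task = isinstance(task, asyncio.Task)
    # Task is a subclass of Future, so a Task is also a Future.
    is_future = isinstance(task, asyncio.Future)

    done_before = task.done()  # still pending: control has not been yielded yet
    result = await task        # suspend here until the task finishes
    done_after = task.done()

    return is_coroutine, is_task, is_future, done_before, result, done_after

if __name__ == '__main__':
    print(asyncio.run(inspect_awaitables()))
```

The task only gets a chance to run once the current coroutine reaches an await, which is why it is still pending immediately after create_task returns.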


Let’s say my_func now has a return value. We want to run two calls to my_func concurrently and collect their return values in the order in which the calls were made. This can be achieved using the asyncio.gather function as follows:

import asyncio
import time

async def my_func(t):
    print(f'Task started with t = {t}')
    await asyncio.sleep(t)
    print(f'Slept for {t} seconds...')
    return t

async def main():
    results = await asyncio.gather(*[
        my_func(2),
        my_func(1)
    ])
    print(f'results: {results}')

if __name__ == '__main__':
    start = time.time()
    asyncio.run(main())
    end = time.time()
    print(f'Total time consumption: {end - start} seconds.')


The execution results of the above code snippet are:

Task started with t = 2
Task started with t = 1
Slept for 1 seconds...
Slept for 2 seconds...
results: [2, 1]
Total time consumption: 2.0041427612304688 seconds.

The elements in results are in the same order as the awaitables passed to asyncio.gather, regardless of which one finished first. Note that asyncio.gather returns only once all of the awaitables have finished, and it returns the results as a Python list.
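It may also help to see what happens when one of the gathered coroutines fails. The sketch below uses the return_exceptions=True argument of asyncio.gather so that an exception is returned in the results list instead of being raised. The fail parameter of my_func is a hypothetical addition for illustration, and the delays are shortened.

```python
import asyncio

async def my_func(t, fail=False):
    await asyncio.sleep(t)
    if fail:
        # Hypothetical failure, only here to demonstrate error handling.
        raise ValueError(f'task with t = {t} failed')
    return t

async def main():
    # With return_exceptions=True, an exception raised by one awaitable
    # is placed in the results list at that awaitable's position.
    results = await asyncio.gather(
        my_func(0.2),
        my_func(0.1, fail=True),
        return_exceptions=True,
    )
    for item in results:
        if isinstance(item, Exception):
            print(f'Failed: {item!r}')
        else:
            print(f'Result: {item}')
    return results

if __name__ == '__main__':
    asyncio.run(main())
```

Without return_exceptions=True, the first exception would propagate to the coroutine awaiting asyncio.gather, so this form is mainly useful when every outcome needs to be examined individually.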


The asyncio.wait function can also be used to run tasks concurrently and collect the results. The following is an example using this function:

import asyncio
import time

async def my_func(t):
    try:
        print(f'Task started with t = {t}')
        await asyncio.sleep(t)
        print(f'Slept for {t} seconds...')
        return t
    except asyncio.CancelledError:
        print(f't = {t} has been canceled.')

async def main():
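    # Note: passing bare coroutines to asyncio.wait works on Python 3.10
    # (with a DeprecationWarning) but raises TypeError on Python 3.11+.
    tasks = [my_func(i) for i in range(1, 6)]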
    complete, pending = await asyncio.wait(tasks, timeout=3.5)
    for item in complete:
        print(f'Result: {item.result()}')
    if pending:
        print('Canceling unfinished tasks...')
        for item in pending:
            item.cancel()

if __name__ == '__main__':
    start = time.time()
    asyncio.run(main())
    end = time.time()
    print(f'Total time consumption: {end - start} seconds.')


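asyncio.wait itself does not cancel anything when the timeout expires; it simply returns the tasks that are still running in pending, and the code above cancels them explicitly. The execution results of the above code snippet are: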

Task started with t = 2
Task started with t = 3
Task started with t = 1
Task started with t = 4
Task started with t = 5
Slept for 1 seconds...
Slept for 2 seconds...
Slept for 3 seconds...
Result: 3
Result: 1
Result: 2
Canceling unfinished tasks...
t = 5 has been canceled.
t = 4 has been canceled.
Total time consumption: 3.5058798789978027 seconds.

One thing to note about the asyncio.wait function is that complete is a set, so when iterating through it the results come back in no particular order.
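For newer Python versions, a variant along the following lines may be preferable. It is a sketch that assumes the coroutines are wrapped with asyncio.create_task before being handed to asyncio.wait, and it iterates over the original list to read results in a predictable order. The delays and timeout are arbitrary illustrative values.

```python
import asyncio

async def my_func(t):
    await asyncio.sleep(t)
    return t

async def main():
    delays = [0.05, 0.1, 0.15, 1.0, 1.5]
    # Wrap each coroutine in a Task explicitly before calling asyncio.wait.
    tasks = [asyncio.create_task(my_func(t)) for t in delays]
    done, pending = await asyncio.wait(tasks, timeout=0.5)

    # asyncio.wait does not cancel on timeout, so cancel the leftovers.
    for task in pending:
        task.cancel()
    # Give the cancelled tasks a chance to finish unwinding.
    await asyncio.gather(*pending, return_exceptions=True)

    # done is a set; walking the original list keeps creation order.
    results = [task.result() for task in tasks if task in done]
    print(f'Results: {results}, canceled: {len(pending)}')
    return results, len(pending)

if __name__ == '__main__':
    asyncio.run(main())
```

Keeping the original list around is one simple way to recover ordering; asyncio.gather remains the more direct choice when ordered results are the main goal and no timeout handling is needed.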


Another function which can be used to run tasks concurrently and collect the results is asyncio.as_completed. The following is an example using this function:

import asyncio
import time

async def my_func(t):
    print(f'Task started with t = {t}')
    await asyncio.sleep(t)
    print(f'Slept for {t} seconds...')
    return t

async def main():
    tasks = [my_func(i) for i in range(1, 4)]
    for task in asyncio.as_completed(tasks):
        result = await task
        print(f'Result: {result}')

if __name__ == '__main__':
    start = time.time()
    asyncio.run(main())
    end = time.time()
    print(f'Total time consumption: {end - start} seconds.')

The execution results of the above code snippet are:

Task started with t = 2
Task started with t = 3
Task started with t = 1
Slept for 1 seconds...
Result: 1
Slept for 2 seconds...
Result: 2
Slept for 3 seconds...
Result: 3
Total time consumption: 3.0058958530426025 seconds.

In this case, the result is retrieved as soon as the corresponding task has been finished.
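asyncio.as_completed also accepts a timeout argument. The sketch below shows one possible way to collect whatever has finished within the time limit and cancel the rest; the delays and the 0.5 second timeout are illustrative assumptions.

```python
import asyncio

async def my_func(t):
    await asyncio.sleep(t)
    return t

async def main():
    tasks = [asyncio.create_task(my_func(t)) for t in (0.05, 0.1, 1.0)]
    results = []
    try:
        # Each item yielded by as_completed is an awaitable that resolves
        # to the result of the next task to finish.
        for next_done in asyncio.as_completed(tasks, timeout=0.5):
            results.append(await next_done)
    except asyncio.TimeoutError:
        # Raised by the await above once the overall timeout has expired.
        for task in tasks:
            task.cancel()  # no effect on tasks that already finished
    print(f'Results before timeout: {results}')
    return results

if __name__ == '__main__':
    asyncio.run(main())
```

Here the timeout applies to the whole iteration rather than to each task individually.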


Lastly, there are cases when we only want to start the tasks and don’t want the calling coroutine to wait for their results. Take a look at the following example:

import asyncio

async def my_func(t):
    print(f'Task started with t = {t}')
    await asyncio.sleep(t)
    print(f'Slept for {t} seconds...')

async def main():
    asyncio.create_task(my_func(2))
    asyncio.create_task(my_func(1))
    print('All tasks have been started...')
    while True:
        await asyncio.sleep(1)

if __name__ == '__main__':
    asyncio.run(main())


Because there is an infinite loop, we need to use Ctrl + C to end the execution of the program. The execution results of the above code snippet are:

All tasks have been started...
Task started with t = 2
Task started with t = 1
Slept for 1 seconds...
Slept for 2 seconds...

The output shows that main was not held up waiting for the tasks: it printed its message first, and the tasks still ran to completion in the background.
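One caveat with this fire-and-forget pattern is that the event loop only keeps weak references to tasks, so the asyncio documentation recommends holding a reference to each background task until it finishes. The following is a minimal sketch of that idea; the helper start_in_background, the log list, and the bounded sleep (used in place of the infinite loop) are illustrative assumptions.

```python
import asyncio

# Strong references to running background tasks.
background_tasks = set()

async def my_func(t, log):
    await asyncio.sleep(t)
    log.append(t)

def start_in_background(coro):
    task = asyncio.create_task(coro)
    # Keep a reference so the task cannot be garbage collected mid-run.
    background_tasks.add(task)
    # Drop the reference automatically once the task is done.
    task.add_done_callback(background_tasks.discard)
    return task

async def main():
    log = []
    start_in_background(my_func(0.2, log))
    start_in_background(my_func(0.1, log))
    print('All tasks have been started...')
    # Stand-in for other work the program does while the tasks run.
    await asyncio.sleep(0.5)
    print(f'Finished: {log}, still tracked: {len(background_tasks)}')
    return log, len(background_tasks)

if __name__ == '__main__':
    asyncio.run(main())
```

The done callback keeps the set from growing without bound in a long-running program.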


Conclusions


In this article, we went through various ways of handling tasks using Python's asyncio library. We hope you enjoyed it! More asyncio techniques will be covered in future episodes. Stay tuned!


Get in Touch


Thank you for reading! Please let us know if you like this series or if you have critiques. If this series was helpful to you, please follow us and share this series with your friends.


If you or your company needs any help with projects related to drilling automation and optimization, AI, and data science, please get in touch with us at Nvicta AI. We are here to help. Cheers!


