Mastering Async Programming in Python with asyncio
Asynchronous programming in Python can significantly improve the performance of I/O-bound applications. Let’s explore asyncio and learn how to write efficient async code.
What is Async Programming?
Async programming lets a single thread handle many operations concurrently by switching between them while they wait on I/O, instead of blocking until each one finishes. It’s a great fit for:
- API calls
- Database queries
- File I/O
- Network operations
Basic Concepts
async/await Syntax
import asyncio

async def fetch_data():
    print("Start fetching...")
    await asyncio.sleep(2)  # Simulate I/O operation
    print("Done fetching!")
    return {"data": "result"}

# Run the async function
asyncio.run(fetch_data())
Running Multiple Tasks Concurrently
async def task1():
    await asyncio.sleep(1)
    return "Task 1 complete"

async def task2():
    await asyncio.sleep(2)
    return "Task 2 complete"

async def main():
    # Run tasks concurrently; total time is ~2s (the slowest task), not 3s
    results = await asyncio.gather(
        task1(),
        task2()
    )
    print(results)  # ['Task 1 complete', 'Task 2 complete']

asyncio.run(main())
Real-World Examples
Async HTTP Requests with aiohttp
import aiohttp
import asyncio

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.json()

async def fetch_multiple_urls():
    urls = [
        'https://api.example.com/users',
        'https://api.example.com/posts',
        'https://api.example.com/comments'
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Run
results = asyncio.run(fetch_multiple_urls())
Async Database Queries
import asyncpg
import asyncio

async def get_users():
    conn = await asyncpg.connect(
        user='user',
        password='password',
        database='mydb',
        host='localhost'
    )
    try:
        rows = await conn.fetch('SELECT * FROM users')
        return rows
    finally:
        await conn.close()

users = asyncio.run(get_users())
Advanced Patterns
Task Groups (Python 3.11+)
async def main():
    # Assumes a fetch_data(n) coroutine that takes an identifier
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data(1))
        task2 = tg.create_task(fetch_data(2))
    # All tasks have completed once the `async with` block exits
    print(task1.result())
    print(task2.result())
Timeout Handling
async def slow_operation():
    await asyncio.sleep(10)
    return "Done"

async def main():
    try:
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=5.0
        )
    except asyncio.TimeoutError:
        print("Operation timed out!")

asyncio.run(main())
Semaphore for Rate Limiting
async def fetch_with_limit(url, semaphore):
    async with semaphore:
        # Only N concurrent requests run at once
        return await fetch_url(url)  # assumes a fetch_url(url) helper taking just a URL

async def main():
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent
    urls = [f"https://api.example.com/item/{i}" for i in range(100)]
    tasks = [fetch_with_limit(url, semaphore) for url in urls]
    results = await asyncio.gather(*tasks)

asyncio.run(main())
Common Patterns
Producer-Consumer with Queue
async def producer(queue):
    for i in range(10):
        await asyncio.sleep(0.5)
        await queue.put(f"item-{i}")
        print(f"Produced: item-{i}")

async def consumer(queue, name):
    while True:
        item = await queue.get()
        print(f"{name} consumed: {item}")
        await asyncio.sleep(1)
        queue.task_done()

async def main():
    queue = asyncio.Queue()

    # Start producer and consumers
    producers = [asyncio.create_task(producer(queue))]
    consumers = [
        asyncio.create_task(consumer(queue, f"Consumer-{i}"))
        for i in range(3)
    ]

    # Wait for all items to be produced
    await asyncio.gather(*producers)

    # Wait for queue to be empty
    await queue.join()

    # Cancel consumers
    for c in consumers:
        c.cancel()

asyncio.run(main())
Background Tasks
import asyncio

async def background_task():
    while True:
        print("Background task running...")
        await asyncio.sleep(5)

async def main():
    # Start background task
    task = asyncio.create_task(background_task())

    # Do other work
    await asyncio.sleep(20)

    # Cancel background task
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Background task cancelled")

asyncio.run(main())
Best Practices
1. Don’t Block the Event Loop
# ❌ Bad - blocks event loop
async def bad_example():
    import time
    time.sleep(5)  # Blocks everything!

# ✅ Good - non-blocking
async def good_example():
    await asyncio.sleep(5)
2. Use asyncio-compatible Libraries
- ✅ aiohttp for HTTP
- ✅ asyncpg for PostgreSQL
- ✅ motor for MongoDB
- ❌ Avoid requests (blocking)
- ❌ Avoid psycopg2 (blocking)
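If you're stuck with a blocking library that has no async equivalent, one option is to push the blocking call onto a worker thread so the event loop stays responsive. Here is a minimal sketch, assuming Python 3.9+ (for asyncio.to_thread) and the blocking requests library; the URLs are placeholders:

import asyncio
import requests  # blocking library, used only to illustrate

async def fetch_blocking(url):
    # Run the blocking call in a worker thread so the event loop isn't blocked
    response = await asyncio.to_thread(requests.get, url, timeout=10)
    return response.status_code

async def main():
    codes = await asyncio.gather(
        fetch_blocking("https://api.example.com/users"),
        fetch_blocking("https://api.example.com/posts"),
    )
    print(codes)

asyncio.run(main())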
3. Handle Exceptions Properly
async def safe_fetch(url):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.json()
    except aiohttp.ClientError as e:
        print(f"Error fetching {url}: {e}")
        return None
4. Use Context Managers
async with aiohttp.ClientSession() as session:
    async with session.get(url) as response:
        data = await response.json()
Performance Comparison
import time
import asyncio
import aiohttp

# Synchronous version
def sync_fetch():
    import requests
    start = time.time()
    for i in range(10):
        requests.get('https://api.example.com')
    print(f"Sync: {time.time() - start:.2f}s")

# Async version
async def async_fetch():
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [
            session.get('https://api.example.com')
            for i in range(10)
        ]
        await asyncio.gather(*tasks)
    print(f"Async: {time.time() - start:.2f}s")

# Example timings: the sequential version takes roughly the sum of the
# 10 request latencies, while the concurrent version takes roughly the
# slowest single request.
# Sync: 10.5s
# Async: 1.2s (8-9x faster!)
Common Pitfalls
- Forgetting await: Results in a coroutine object instead of the actual result (see the sketch after this list)
- Blocking the event loop: Using synchronous I/O operations
- Not handling exceptions: Unhandled exceptions in tasks
- Resource leaks: Not properly closing connections
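A minimal sketch of the first pitfall: calling a coroutine without await hands you a coroutine object rather than its result (the fetch coroutine below is a hypothetical stand-in):

import asyncio

async def fetch():
    await asyncio.sleep(1)
    return 42

async def main():
    # ❌ Missing await: you get a coroutine object, not 42
    pending = fetch()
    print(type(pending))  # <class 'coroutine'>
    pending.close()       # silence the "never awaited" RuntimeWarning

    # ✅ Awaiting actually runs the coroutine and returns its value
    value = await fetch()
    print(value)          # 42

asyncio.run(main())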
Conclusion
Async programming in Python with asyncio is powerful for building high-performance, I/O-bound applications. Master the basics, understand the event loop, and use async-compatible libraries to get the most out of asynchronous programming.