Understanding Concurrency vs Parallelism
- Concurrency: Multiple tasks make progress (not necessarily simultaneously) — one CPU switches between tasks
- Parallelism: Multiple tasks run simultaneously — multiple CPUs
- asyncio: Best for I/O-bound tasks (network, file, database)
- threading: I/O-bound tasks with blocking libraries
- multiprocessing: CPU-bound tasks (bypasses GIL)
asyncio Basics
import asyncio
async def greet(name, delay):
    """Sleep for *delay* seconds, then print and return a greeting.

    ``await asyncio.sleep`` yields control to the event loop, so other
    coroutines can run while this one waits.
    """
    await asyncio.sleep(delay)  # yields control during wait
    print(f"Hello, {name}!")
    return f"Greeted {name}"


async def main():
    """Contrast sequential awaits with concurrent execution via gather()."""
    # Sequential: each await completes before the next coroutine starts.
    result1 = await greet("Alice", 1)
    result2 = await greet("Bob", 1)
    # Total: ~2 seconds

    # Concurrent: gather() schedules all three coroutines at once.
    results = await asyncio.gather(
        greet("Alice", 1),
        greet("Bob", 1),
        greet("Carol", 1),
    )
    # Total: ~1 second!
    print(results)


if __name__ == "__main__":  # guard so importing this module has no side effects
    asyncio.run(main())
Tasks and Task Groups
import asyncio
async def fetch(url: str, delay: float) -> str:
    """Simulate a network request: wait *delay* seconds, return a fake body."""
    await asyncio.sleep(delay)  # simulate network request
    return f"Response from {url}"


async def main():
    """Show manual task creation vs the TaskGroup context manager."""
    # Create tasks (they start running immediately, not when awaited)
    task1 = asyncio.create_task(fetch("example.com", 1.0))
    task2 = asyncio.create_task(fetch("python.org", 0.5))
    # Await them
    r1 = await task1
    r2 = await task2
    print(r1, r2)

    # TaskGroup (Python 3.11+) — preferred approach: waits for every task
    # on exit, and cancels the remaining ones if any task fails.
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(fetch("a.com", 0.3))
        t2 = tg.create_task(fetch("b.com", 0.5))
    # All tasks done here
    print(t1.result(), t2.result())


if __name__ == "__main__":  # guard so importing this module has no side effects
    asyncio.run(main())
Async Context Managers and Iterators
import asyncio
class AsyncDatabase:
    """Toy resource that demonstrates the async context-manager protocol."""

    async def __aenter__(self):
        # Entered by ``async with``: simulate an asynchronous handshake.
        print("Connecting...")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, *args):
        # Runs when the ``async with`` block exits: simulate async teardown.
        print("Disconnecting...")
        await asyncio.sleep(0.1)

    async def query(self, sql):
        # Pretend to execute *sql* remotely, then hand back fixed rows.
        await asyncio.sleep(0.05)
        rows = [{"id": 1}, {"id": 2}]
        return rows
async def main():
    """Open the demo database, run one query, and print the rows."""
    async with AsyncDatabase() as db:
        rows = await db.query("SELECT * FROM users")
        print(rows)
# Async iterators:
async def async_range(n):
    """Asynchronously yield 0..n-1, pausing briefly between values."""
    for i in range(n):
        await asyncio.sleep(0.01)
        yield i


async def consume():
    """Drain async_range with ``async for``, then with an async comprehension."""
    async for n in async_range(5):
        print(n)
    # Async comprehension — only valid inside an ``async def``; writing
    # this at module level is a SyntaxError.
    squares = [n**2 async for n in async_range(5)]
    print(squares)
Error Handling in asyncio
import asyncio
async def may_fail(n):
    """Return ``n * 10``, raising ValueError when n == 2."""
    if n == 2:
        raise ValueError(f"Failed at {n}")
    return n * 10


async def main():
    """Demonstrate gather(return_exceptions=True): errors are collected."""
    # With return_exceptions=True, gather does not raise on the first
    # failure — exceptions come back inside the results list instead.
    results = await asyncio.gather(
        may_fail(1),
        may_fail(2),
        may_fail(3),
        return_exceptions=True,
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"Error: {r}")
        else:
            print(f"Success: {r}")


if __name__ == "__main__":  # guard so importing this module has no side effects
    asyncio.run(main())
Threading
import threading
import time
def worker(name, delay):
    """Sleep for *delay* seconds, then report completion."""
    time.sleep(delay)
    print(f"Thread {name} done")


if __name__ == "__main__":  # guard so importing this module has no side effects
    # Create and start threads
    threads = []
    for i in range(5):
        t = threading.Thread(target=worker, args=(i, 0.5))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()  # wait for all to finish
# Thread-safe with Lock:
counter = 0
lock = threading.Lock()


def increment(n):
    """Add *n* to the module-level ``counter``, one locked step at a time.

    The lock makes each read-modify-write of ``counter`` atomic; without
    it, concurrent ``counter += 1`` updates from two threads can be lost.
    """
    global counter
    for _ in range(n):
        with lock:
            counter += 1


if __name__ == "__main__":  # guard so importing this module has no side effects
    t1 = threading.Thread(target=increment, args=(10000,))
    t2 = threading.Thread(target=increment, args=(10000,))
    t1.start(); t2.start()
    t1.join(); t2.join()
    print(counter)  # always 20000
Multiprocessing
from multiprocessing import Pool, cpu_count
import time
def cpu_intensive(n):
    """Return the sum of squares 0..n-1 — pure CPU work, no I/O."""
    total = 0
    for i in range(n):
        total += i**2
    return total


if __name__ == "__main__":
    numbers = [1_000_000] * 8

    # Sequential
    start = time.time()
    results = [cpu_intensive(n) for n in numbers]
    print(f"Sequential: {time.time() - start:.2f}s")

    # Parallel (uses all CPU cores)
    start = time.time()
    with Pool(cpu_count()) as pool:
        results = pool.map(cpu_intensive, numbers)
    print(f"Parallel: {time.time() - start:.2f}s")
    # Roughly 4-8x faster on multi-core machines
concurrent.futures
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import urllib.request
def fetch_url(url):
    """Download *url* and return the size of the response body in bytes."""
    with urllib.request.urlopen(url, timeout=5) as r:
        return len(r.read())


def compute(n):
    """CPU-bound stand-in: sum of the integers 0..n-1."""
    return sum(range(n))


urls = [
    "https://python.org",
    "https://example.com",
]

# The guard is required: ProcessPoolExecutor (under the spawn start method)
# re-imports this module in each worker, which would otherwise re-run the
# pool creation recursively. It also keeps network calls out of import time.
if __name__ == "__main__":
    # Thread pool for I/O-bound:
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(fetch_url, url) for url in urls]
        results = [f.result() for f in futures]

    # Process pool for CPU-bound:
    with ProcessPoolExecutor() as executor:
        result = list(executor.map(compute, [1000, 2000, 3000]))
Exercises
Exercise 1: Concurrent API Simulation
Simulate fetching data from 5 "APIs" concurrently using asyncio.
Solution:
import asyncio
import random
async def fetch_api(api_id):
    """Simulate one API call: random latency, then a dict payload."""
    delay = random.uniform(0.1, 0.5)
    await asyncio.sleep(delay)
    return {"api": api_id, "data": api_id * 100, "time": delay}


async def main():
    """Fire five fake API requests concurrently and report each result."""
    tasks = [fetch_api(i) for i in range(1, 6)]
    results = await asyncio.gather(*tasks)
    for r in results:
        print(f"API {r['api']}: {r['data']} (took {r['time']:.2f}s)")


if __name__ == "__main__":  # guard so importing this module has no side effects
    asyncio.run(main())
Exercise 2: Async Rate Limiter
Implement a simple semaphore-based rate limiter.
Solution:
import asyncio
async def limited_fetch(sem, api_id):
    """Perform one fake fetch while holding *sem*, capping concurrency."""
    async with sem:  # max N concurrent
        await asyncio.sleep(0.1)
        return f"Result from {api_id}"


async def main():
    """Run 10 fetches with at most 3 in flight at any moment."""
    sem = asyncio.Semaphore(3)  # max 3 concurrent
    tasks = [limited_fetch(sem, i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(results)


if __name__ == "__main__":  # guard so importing this module has no side effects
    asyncio.run(main())