Advanced Python · Lesson 1

Async Programming and Concurrency

Master asyncio, async/await, concurrent programming with threads and processes

Understanding Concurrency vs Parallelism

  • Concurrency: Multiple tasks make progress (not necessarily simultaneously) — one CPU switches between tasks
  • Parallelism: Multiple tasks run simultaneously — multiple CPUs
  • asyncio: Best for I/O-bound tasks (network, file, database)
  • threading: I/O-bound tasks with blocking libraries
  • multiprocessing: CPU-bound tasks (bypasses GIL)

asyncio Basics

import asyncio

async def greet(name, delay):
    """Wait *delay* seconds, print a greeting, and return a summary string."""
    # asyncio.sleep suspends this coroutine so the event loop can run others.
    await asyncio.sleep(delay)
    print(f"Hello, {name}!")
    summary = f"Greeted {name}"
    return summary

async def main():
    """Contrast sequential awaiting with concurrent awaiting via gather()."""
    # Sequential: each await finishes before the next starts,
    # so two 1-second sleeps take about 2 seconds total.
    first = await greet("Alice", 1)
    second = await greet("Bob", 1)

    # Concurrent: gather() schedules every coroutine at once and waits
    # for all of them, so the three sleeps overlap — roughly 1 second total.
    outcomes = await asyncio.gather(
        greet("Alice", 1),
        greet("Bob", 1),
        greet("Carol", 1),
    )
    print(outcomes)

asyncio.run(main())

Tasks and Task Groups

import asyncio

async def fetch(url: str, delay: float) -> str:
    """Pretend to fetch *url*, taking *delay* seconds (simulated with sleep)."""
    body = f"Response from {url}"
    await asyncio.sleep(delay)  # stand-in for real network latency
    return body

async def main():
    """Show create_task() and the TaskGroup API side by side."""
    # create_task() schedules the coroutine immediately; both simulated
    # requests are already in flight before we await anything.
    first = asyncio.create_task(fetch("example.com", 1.0))
    second = asyncio.create_task(fetch("python.org", 0.5))

    # Awaiting collects the results (left to right, same as before).
    print(await first, await second)

    # TaskGroup (Python 3.11+) — the preferred structured approach:
    # leaving the `async with` block guarantees every task has finished.
    async with asyncio.TaskGroup() as tg:
        a = tg.create_task(fetch("a.com", 0.3))
        b = tg.create_task(fetch("b.com", 0.5))
    print(a.result(), b.result())

asyncio.run(main())

Async Context Managers and Iterators

import asyncio

class AsyncDatabase:
    """Toy async database demonstrating the async context-manager protocol."""

    async def __aenter__(self):
        # Simulate the latency of opening a connection.
        print("Connecting...")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, *args):
        # Simulate tear-down; returning None lets any exception propagate.
        print("Disconnecting...")
        await asyncio.sleep(0.1)

    async def query(self, sql):
        # Pretend to execute *sql* and hand back two fixed rows.
        rows = [{"id": 1}, {"id": 2}]
        await asyncio.sleep(0.05)
        return rows

async def main():
    """Open the database via `async with` and run a single query."""
    async with AsyncDatabase() as db:
        rows = await db.query("SELECT * FROM users")
        print(rows)


# Async iterators:
async def async_range(n):
    """Async generator yielding 0..n-1, pausing briefly between items."""
    i = 0
    while i < n:
        await asyncio.sleep(0.01)  # simulate waiting for each item
        yield i
        i += 1

async def consume():
    """Drain async_range with `async for`, then with an async comprehension."""
    async for value in async_range(5):
        print(value)

    # Async comprehensions may await inside any coroutine:
    squares = [value**2 async for value in async_range(5)]
    print(squares)

Error Handling in asyncio

import asyncio

async def may_fail(n):
    """Return n * 10 — except for n == 2, which raises ValueError."""
    if n != 2:
        return n * 10
    raise ValueError(f"Failed at {n}")

async def main():
    """Collect successes and failures together via return_exceptions=True."""
    # With return_exceptions=True, gather() never raises; a failed
    # coroutine delivers its exception object as its result instead.
    outcomes = await asyncio.gather(
        may_fail(1),
        may_fail(2),
        may_fail(3),
        return_exceptions=True,
    )
    for outcome in outcomes:
        if isinstance(outcome, Exception):
            print(f"Error: {outcome}")
        else:
            print(f"Success: {outcome}")

asyncio.run(main())

Threading

import threading
import time

def worker(name, delay):
    """Block the calling thread for *delay* seconds, then report completion."""
    time.sleep(delay)
    print(f"Thread {name} done")

# Spawn five worker threads; start() returns immediately while join() blocks
# the main thread until each worker has finished.
threads = [threading.Thread(target=worker, args=(i, 0.5)) for i in range(5)]
for t in threads:
    t.start()

for t in threads:
    t.join()

# Shared counter guarded by a Lock: without it, the read-modify-write on
# `counter` could interleave between threads and lose updates.
counter = 0
lock = threading.Lock()

def increment(n):
    """Add 1 to the shared counter n times, holding the lock per update."""
    global counter
    for _ in range(n):
        with lock:
            counter += 1

workers = [threading.Thread(target=increment, args=(10000,)) for _ in range(2)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(counter)  # deterministically 20000 thanks to the lock

Multiprocessing

from multiprocessing import Pool, cpu_count
import time

def cpu_intensive(n):
    """Pure CPU work: sum of squares of all integers below *n*."""
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    workload = [1_000_000] * 8

    # Baseline: one process, one core.
    t0 = time.time()
    results = [cpu_intensive(n) for n in workload]
    print(f"Sequential: {time.time() - t0:.2f}s")

    # Pool.map fans the same work out across every core; each worker
    # process has its own interpreter, so the GIL is no bottleneck.
    t0 = time.time()
    with Pool(cpu_count()) as pool:
        results = pool.map(cpu_intensive, workload)
    print(f"Parallel: {time.time() - t0:.2f}s")
    # Expect a multiple-times speedup on multi-core machines.

concurrent.futures

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import urllib.request

def fetch_url(url):
    """Download *url* and return the response body size in bytes."""
    with urllib.request.urlopen(url, timeout=5) as resp:
        body = resp.read()
    return len(body)

urls = [
    "https://python.org",
    "https://example.com",
]

# Threads suit I/O-bound fan-out: the GIL is released while blocked on the network.
with ThreadPoolExecutor(max_workers=4) as executor:
    pending = [executor.submit(fetch_url, u) for u in urls]
    results = [p.result() for p in pending]

# Process pool for CPU-bound:
def compute(n):
    """Toy CPU-bound task: sum of all integers below *n*."""
    return sum(range(n))

# Each worker is a separate process with its own interpreter and GIL.
with ProcessPoolExecutor() as executor:
    result = list(executor.map(compute, [1000, 2000, 3000]))

Exercises

Exercise 1: Concurrent API Simulation

Simulate fetching data from 5 "APIs" concurrently using asyncio.

Solution:

import asyncio
import random

async def fetch_api(api_id):
    """Simulate one API call: random latency, then a small result payload."""
    latency = random.uniform(0.1, 0.5)
    await asyncio.sleep(latency)
    return {"api": api_id, "data": api_id * 100, "time": latency}

async def main():
    """Fire all five simulated API calls at once and report each result."""
    pending = [fetch_api(i) for i in range(1, 6)]
    for r in await asyncio.gather(*pending):
        print(f"API {r['api']}: {r['data']} (took {r['time']:.2f}s)")

asyncio.run(main())

Exercise 2: Async Rate Limiter

Implement a simple semaphore-based rate limiter.

Solution:

import asyncio

async def limited_fetch(sem, api_id):
    """Perform a simulated fetch while holding one slot of semaphore *sem*."""
    # The semaphore caps how many of these coroutines run at the same time.
    async with sem:
        await asyncio.sleep(0.1)
        return f"Result from {api_id}"

async def main():
    """Run ten fetches with at most three in flight at any moment."""
    gate = asyncio.Semaphore(3)  # at most 3 coroutines inside at once
    jobs = [limited_fetch(gate, i) for i in range(10)]
    outcomes = await asyncio.gather(*jobs)
    print(outcomes)

asyncio.run(main())