05-5. asyncio 基礎概念

⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐⭐ (中等)


🎯 本篇重點

理解 asyncio 的核心概念,包含協程(Coroutine)、事件循環(Event Loop)、非同步 I/O,以及如何使用 async/await 語法。


🤔 什麼是 asyncio?

asyncio = Asynchronous I/O(非同步輸入輸出)

一句話解釋: asyncio 是 Python 的非同步程式設計框架,使用單 Thread 處理大量 I/O 操作,無需多執行緒或多進程。


🏢 餐廳比喻

傳統同步(Synchronous)

一個服務生(Thread)
├─ 接待客人 A → 點餐 → 等廚房做菜(阻塞)→ 送餐
└─ 接待客人 B → ...(要等客人 A 完成)

問題:服務生在等廚房時閒置,效率低

多執行緒(Multi-threading)

三個服務生(3 個 Thread)
├─ 服務生 1 → 客人 A(等廚房時阻塞)
├─ 服務生 2 → 客人 B(等廚房時阻塞)
└─ 服務生 3 → 客人 C(等廚房時阻塞)

優點:可同時服務多位客人
缺點:服務生多了成本高(記憶體)

asyncio(非同步)

一個服務生(1 個 Thread)
├─ 客人 A 點餐 → 交給廚房 → 不等待,去服務客人 B
├─ 客人 B 點餐 → 交給廚房 → 不等待,去服務客人 C
├─ 客人 C 點餐 → 交給廚房 → 回去檢查客人 A 的餐好了沒
└─ 客人 A 的餐好了 → 送餐 → 繼續服務其他客人

優點:一個服務生高效服務多位客人
關鍵:不等待,持續切換任務

💻 asyncio vs Thread vs Process

特性asyncioThreadProcess
並發模型協程(單 Thread)多 Thread多 Process
適用場景I/O 密集型I/O 密集型CPU 密集型
記憶體佔用極低(KB)低(MB)高(MB)
創建速度極快
最大數量數萬+數百數十
GIL 影響無(單 Thread)
切換成本極低(無上下文切換)

1️⃣ 基礎:async 和 await

同步 vs 非同步

import time

# 同步版本
def fetch_data():
    print("開始獲取資料...")
    time.sleep(2)  # 模擬 I/O 等待
    print("資料獲取完成")
    return "data"

def main_sync():
    print("=== 同步執行 ===")
    data1 = fetch_data()  # 阻塞 2 秒
    data2 = fetch_data()  # 再阻塞 2 秒
    print(f"完成: {data1}, {data2}")

start = time.time()
main_sync()
print(f"耗時: {time.time() - start:.2f}s")
# 輸出:耗時: 4.00s

非同步版本

import asyncio

# 非同步版本
async def fetch_data():
    """協程函式(Coroutine Function)"""
    print("開始獲取資料...")
    await asyncio.sleep(2)  # 非阻塞等待
    print("資料獲取完成")
    return "data"

async def main_async():
    print("=== 非同步執行 ===")
    # 並發執行兩個協程
    data1, data2 = await asyncio.gather(
        fetch_data(),
        fetch_data()
    )
    print(f"完成: {data1}, {data2}")

import time
start = time.time()
asyncio.run(main_async())
print(f"耗時: {time.time() - start:.2f}s")
# 輸出:耗時: 2.00s(並發執行)

async 和 await 關鍵字

import asyncio

# async:定義協程函式
async def my_coroutine():
    print("協程開始")
    # await:等待另一個協程完成(交出控制權)
    await asyncio.sleep(1)
    print("協程結束")
    return "結果"

# 執行協程
result = asyncio.run(my_coroutine())
print(result)

關鍵理解:

  • async def - 定義協程函式
  • await - 等待協程完成(期間可執行其他協程)
  • asyncio.run() - 啟動事件循環並執行協程

2️⃣ 核心概念

協程(Coroutine)

import asyncio

async def coroutine_example():
    """協程函式"""
    print("協程執行中")
    await asyncio.sleep(1)
    return "完成"

# 呼叫協程函式返回協程物件
coro = coroutine_example()
print(type(coro))  # <class 'coroutine'>

# 需要用 asyncio.run() 執行
result = asyncio.run(coro)
print(result)  # 完成

事件循環(Event Loop)

import asyncio

async def task1():
    print("Task 1 開始")
    await asyncio.sleep(2)
    print("Task 1 完成")

async def task2():
    print("Task 2 開始")
    await asyncio.sleep(1)
    print("Task 2 完成")

async def main():
    # 並發執行兩個協程
    await asyncio.gather(task1(), task2())

# 事件循環執行流程:
# 1. 啟動 task1 → 遇到 await → 切換
# 2. 啟動 task2 → 遇到 await → 切換
# 3. task2 sleep 1 秒完成 → 繼續執行 task2
# 4. task1 sleep 2 秒完成 → 繼續執行 task1
asyncio.run(main())

輸出:

Task 1 開始
Task 2 開始
Task 2 完成  ← 1 秒後
Task 1 完成  ← 2 秒後

Task(任務)

import asyncio

async def say_hello(name, delay):
    await asyncio.sleep(delay)
    print(f"Hello, {name}")

async def main():
    # 創建 Task
    task1 = asyncio.create_task(say_hello("Alice", 2))
    task2 = asyncio.create_task(say_hello("Bob", 1))

    print("Task 已創建,繼續其他工作...")

    # 等待 Task 完成
    await task1
    await task2

asyncio.run(main())

輸出:

Task 已創建,繼續其他工作...
Hello, Bob   ← 1 秒後
Hello, Alice ← 2 秒後

3️⃣ 並發執行

gather():並發執行多個協程

import asyncio
import time

async def fetch(url, delay):
    print(f"開始獲取 {url}")
    await asyncio.sleep(delay)
    print(f"完成 {url}")
    return f"資料來自 {url}"

async def main():
    # 並發執行
    results = await asyncio.gather(
        fetch("url1", 2),
        fetch("url2", 1),
        fetch("url3", 3)
    )
    print(f"結果: {results}")

start = time.time()
asyncio.run(main())
print(f"總耗時: {time.time() - start:.2f}s")

輸出:

開始獲取 url1
開始獲取 url2
開始獲取 url3
完成 url2      ← 1 秒後
完成 url1      ← 2 秒後
完成 url3      ← 3 秒後
結果: ['資料來自 url1', '資料來自 url2', '資料來自 url3']
總耗時: 3.00s  ← 不是 2+1+3=6s

as_completed():按完成順序

import asyncio

async def fetch(url, delay):
    await asyncio.sleep(delay)
    return f"{url} 完成({delay}s)"

async def main():
    tasks = [
        fetch("url1", 3),
        fetch("url2", 1),
        fetch("url3", 2)
    ]

    # 按完成順序處理
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(result)

asyncio.run(main())

輸出:

url2 完成(1s)  ← 最先完成
url3 完成(2s)
url1 完成(3s)  ← 最後完成

wait():等待 Task

import asyncio

async def task(n):
    await asyncio.sleep(n)
    return n

async def main():
    tasks = [asyncio.create_task(task(i)) for i in [3, 1, 2]]

    # 等待第一個完成
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    print(f"完成: {len(done)}, 待完成: {len(pending)}")
    for t in done:
        print(f"結果: {t.result()}")

    # 等待剩餘的
    for t in pending:
        await t

asyncio.run(main())

4️⃣ 實戰案例

案例 1:非同步 HTTP 請求

import asyncio
import aiohttp  # pip install aiohttp
import time

async def fetch_url(session, url):
    """非同步獲取 URL"""
    async with session.get(url) as response:
        content = await response.text()
        return url, len(content)

async def main():
    urls = [
        'https://www.python.org',
        'https://www.github.com',
        'https://www.stackoverflow.com',
        'https://www.reddit.com',
        'https://www.wikipedia.org'
    ]

    async with aiohttp.ClientSession() as session:
        # 並發發送所有請求
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

        for url, size in results:
            print(f"{url}: {size} bytes")

start = time.time()
asyncio.run(main())
print(f"總耗時: {time.time() - start:.2f}s")
# 輸出:總耗時: 1.5s(並發執行)

案例 2:非同步檔案操作

import asyncio
import aiofiles  # pip install aiofiles

async def write_file(filename, content):
    """非同步寫入檔案"""
    async with aiofiles.open(filename, 'w') as f:
        await f.write(content)
    print(f"寫入完成: {filename}")

async def read_file(filename):
    """非同步讀取檔案"""
    async with aiofiles.open(filename, 'r') as f:
        content = await f.read()
    print(f"讀取完成: {filename}, 長度: {len(content)}")
    return content

async def main():
    # 並發寫入多個檔案
    await asyncio.gather(
        write_file('file1.txt', 'Content 1' * 1000),
        write_file('file2.txt', 'Content 2' * 1000),
        write_file('file3.txt', 'Content 3' * 1000)
    )

    # 並發讀取
    contents = await asyncio.gather(
        read_file('file1.txt'),
        read_file('file2.txt'),
        read_file('file3.txt')
    )

asyncio.run(main())

案例 3:生產者-消費者模式

import asyncio
import random

async def producer(queue, producer_id):
    """生產者"""
    for i in range(5):
        item = f"P{producer_id}-Item{i}"
        await queue.put(item)
        print(f"生產者 {producer_id} 生產: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

async def consumer(queue, consumer_id):
    """消費者"""
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break

        print(f"消費者 {consumer_id} 消費: {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def main():
    queue = asyncio.Queue()

    # 創建生產者和消費者
    producers = [producer(queue, i) for i in range(3)]
    consumers = [consumer(queue, i) for i in range(2)]

    # 啟動生產者
    await asyncio.gather(*producers)

    # 等待隊列清空
    await queue.join()

    # 停止消費者
    for _ in consumers:
        await queue.put(None)

    await asyncio.gather(*consumers)

asyncio.run(main())

案例 4:非同步爬蟲

import asyncio
import aiohttp
from bs4 import BeautifulSoup

async def crawl_page(session, url):
    """爬取單個頁面"""
    try:
        async with session.get(url, timeout=10) as response:
            html = await response.text()
            soup = BeautifulSoup(html, 'html.parser')
            title = soup.find('title')
            print(f"✓ {url}")
            print(f"  標題: {title.text if title else 'N/A'}")
            return url, title.text if title else None
    except Exception as e:
        print(f"✗ {url}: {e}")
        return url, None

async def crawl_multiple(urls, max_concurrent=10):
    """並發爬取多個 URL"""
    # 使用 Semaphore 限制並發數
    semaphore = asyncio.Semaphore(max_concurrent)

    async def crawl_with_semaphore(session, url):
        async with semaphore:
            return await crawl_page(session, url)

    async with aiohttp.ClientSession() as session:
        tasks = [crawl_with_semaphore(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    return results

# 使用
urls = [
    'https://www.python.org',
    'https://www.github.com',
    'https://www.stackoverflow.com',
]
asyncio.run(crawl_multiple(urls))

5️⃣ 同步與非同步混用

run_in_executor():執行同步函式

import asyncio
import time

def blocking_io():
    """同步阻塞函式"""
    print("開始阻塞 I/O")
    time.sleep(3)
    print("阻塞 I/O 完成")
    return "結果"

async def main():
    loop = asyncio.get_running_loop()

    # 在執行緒池中執行同步函式(不阻塞事件循環)
    result = await loop.run_in_executor(None, blocking_io)
    print(f"結果: {result}")

asyncio.run(main())

包裝同步庫為非同步

import asyncio
import requests

async def fetch_sync_api(url):
    """包裝 requests(同步)為非同步"""
    loop = asyncio.get_running_loop()
    # 在執行緒中執行同步請求
    response = await loop.run_in_executor(
        None,
        requests.get,
        url
    )
    return response.text

async def main():
    # 並發執行多個同步請求
    results = await asyncio.gather(
        fetch_sync_api('https://httpbin.org/html'),
        fetch_sync_api('https://httpbin.org/html'),
        fetch_sync_api('https://httpbin.org/html')
    )
    print(f"獲取了 {len(results)} 個結果")

asyncio.run(main())

6️⃣ 常見錯誤與陷阱

錯誤 1:忘記 await

import asyncio

async def fetch_data():
    await asyncio.sleep(1)
    return "data"

async def main():
    # ❌ 錯誤:忘記 await
    result = fetch_data()  # 返回協程物件,不是結果
    print(result)  # <coroutine object fetch_data>

    # ✅ 正確:使用 await
    result = await fetch_data()
    print(result)  # data

asyncio.run(main())

錯誤 2:在同步函式中使用 await

# ❌ 錯誤:不能在普通函式中使用 await
def sync_function():
    await asyncio.sleep(1)  # SyntaxError

# ✅ 正確:必須在 async 函式中
async def async_function():
    await asyncio.sleep(1)

錯誤 3:阻塞事件循環

import asyncio
import time

async def bad_example():
    # ❌ 錯誤:使用 time.sleep 阻塞整個事件循環
    time.sleep(3)  # 其他協程無法執行

async def good_example():
    # ✅ 正確:使用 asyncio.sleep(非阻塞)
    await asyncio.sleep(3)  # 其他協程可以執行

✅ 重點回顧

asyncio 核心概念:

  • 協程(Coroutine):使用 async def 定義
  • await:等待協程完成,交出控制權
  • 事件循環(Event Loop):協程調度器
  • Task:包裝協程,並發執行

並發執行:

  • asyncio.gather() - 並發執行多個協程
  • asyncio.as_completed() - 按完成順序
  • asyncio.wait() - 等待 Task
  • asyncio.create_task() - 創建 Task

適用場景:

  • ✅ 大量 I/O 操作(網路、檔案)
  • ✅ 需要高並發(數千個連線)
  • ✅ WebSocket、聊天伺服器
  • ❌ CPU 密集型(用 multiprocessing)

關鍵優勢:

  • ✅ 單 Thread,無 GIL 問題
  • ✅ 記憶體佔用極低
  • ✅ 可處理數萬個並發連線
  • ✅ 無上下文切換成本

常用庫:

  • aiohttp - 非同步 HTTP
  • aiofiles - 非同步檔案
  • asyncpg - 非同步 PostgreSQL
  • motor - 非同步 MongoDB

上一篇: 05-4. concurrent.futures 使用指南


最後更新:2025-01-06

0%