目錄
05-5. asyncio 基礎概念
⏱️ 閱讀時間: 15 分鐘 🎯 難度: ⭐⭐⭐ (中等)
🎯 本篇重點
理解 asyncio 的核心概念,包含協程(Coroutine)、事件循環(Event Loop)、非同步 I/O,以及如何使用 async/await 語法。
🤔 什麼是 asyncio?
asyncio = Asynchronous I/O(非同步輸入輸出)
一句話解釋: asyncio 是 Python 的非同步程式設計框架,使用單 Thread 處理大量 I/O 操作,無需多執行緒或多進程。
🏢 餐廳比喻
傳統同步(Synchronous)
一個服務生(Thread)
├─ 接待客人 A → 點餐 → 等廚房做菜(阻塞)→ 送餐
└─ 接待客人 B → ...(要等客人 A 完成)
問題:服務生在等廚房時閒置,效率低多執行緒(Multi-threading)
三個服務生(3 個 Thread)
├─ 服務生 1 → 客人 A(等廚房時阻塞)
├─ 服務生 2 → 客人 B(等廚房時阻塞)
└─ 服務生 3 → 客人 C(等廚房時阻塞)
優點:可同時服務多位客人
缺點:服務生多了成本高(記憶體)asyncio(非同步)
一個服務生(1 個 Thread)
├─ 客人 A 點餐 → 交給廚房 → 不等待,去服務客人 B
├─ 客人 B 點餐 → 交給廚房 → 不等待,去服務客人 C
├─ 客人 C 點餐 → 交給廚房 → 回去檢查客人 A 的餐好了沒
└─ 客人 A 的餐好了 → 送餐 → 繼續服務其他客人
優點:一個服務生高效服務多位客人
關鍵:不等待,持續切換任務💻 asyncio vs Thread vs Process
| 特性 | asyncio | Thread | Process |
|---|---|---|---|
| 並發模型 | 協程(單 Thread) | 多 Thread | 多 Process |
| 適用場景 | I/O 密集型 | I/O 密集型 | CPU 密集型 |
| 記憶體佔用 | 極低(KB) | 低(MB) | 高(MB) |
| 創建速度 | 極快 | 快 | 慢 |
| 最大數量 | 數萬+ | 數百 | 數十 |
| GIL 影響 | 無(單 Thread) | 有 | 無 |
| 切換成本 | 極低(無上下文切換) | 中 | 高 |
1️⃣ 基礎:async 和 await
同步 vs 非同步
import time
# 同步版本
def fetch_data():
print("開始獲取資料...")
time.sleep(2) # 模擬 I/O 等待
print("資料獲取完成")
return "data"
def main_sync():
print("=== 同步執行 ===")
data1 = fetch_data() # 阻塞 2 秒
data2 = fetch_data() # 再阻塞 2 秒
print(f"完成: {data1}, {data2}")
start = time.time()
main_sync()
print(f"耗時: {time.time() - start:.2f}s")
# 輸出:耗時: 4.00s非同步版本
import asyncio
# 非同步版本
async def fetch_data():
"""協程函式(Coroutine Function)"""
print("開始獲取資料...")
await asyncio.sleep(2) # 非阻塞等待
print("資料獲取完成")
return "data"
async def main_async():
print("=== 非同步執行 ===")
# 並發執行兩個協程
data1, data2 = await asyncio.gather(
fetch_data(),
fetch_data()
)
print(f"完成: {data1}, {data2}")
import time
start = time.time()
asyncio.run(main_async())
print(f"耗時: {time.time() - start:.2f}s")
# 輸出:耗時: 2.00s(並發執行)async 和 await 關鍵字
import asyncio
# async:定義協程函式
async def my_coroutine():
print("協程開始")
# await:等待另一個協程完成(交出控制權)
await asyncio.sleep(1)
print("協程結束")
return "結果"
# 執行協程
result = asyncio.run(my_coroutine())
print(result)關鍵理解:
async def- 定義協程函式await- 等待協程完成(期間可執行其他協程)asyncio.run()- 啟動事件循環並執行協程
2️⃣ 核心概念
協程(Coroutine)
import asyncio
async def coroutine_example():
"""協程函式"""
print("協程執行中")
await asyncio.sleep(1)
return "完成"
# 呼叫協程函式返回協程物件
coro = coroutine_example()
print(type(coro)) # <class 'coroutine'>
# 需要用 asyncio.run() 執行
result = asyncio.run(coro)
print(result) # 完成事件循環(Event Loop)
import asyncio
async def task1():
print("Task 1 開始")
await asyncio.sleep(2)
print("Task 1 完成")
async def task2():
print("Task 2 開始")
await asyncio.sleep(1)
print("Task 2 完成")
async def main():
# 並發執行兩個協程
await asyncio.gather(task1(), task2())
# 事件循環執行流程:
# 1. 啟動 task1 → 遇到 await → 切換
# 2. 啟動 task2 → 遇到 await → 切換
# 3. task2 sleep 1 秒完成 → 繼續執行 task2
# 4. task1 sleep 2 秒完成 → 繼續執行 task1
asyncio.run(main())輸出:
Task 1 開始
Task 2 開始
Task 2 完成 ← 1 秒後
Task 1 完成 ← 2 秒後Task(任務)
import asyncio
async def say_hello(name, delay):
await asyncio.sleep(delay)
print(f"Hello, {name}")
async def main():
# 創建 Task
task1 = asyncio.create_task(say_hello("Alice", 2))
task2 = asyncio.create_task(say_hello("Bob", 1))
print("Task 已創建,繼續其他工作...")
# 等待 Task 完成
await task1
await task2
asyncio.run(main())輸出:
Task 已創建,繼續其他工作...
Hello, Bob ← 1 秒後
Hello, Alice ← 2 秒後3️⃣ 並發執行
gather():並發執行多個協程
import asyncio
import time
async def fetch(url, delay):
print(f"開始獲取 {url}")
await asyncio.sleep(delay)
print(f"完成 {url}")
return f"資料來自 {url}"
async def main():
# 並發執行
results = await asyncio.gather(
fetch("url1", 2),
fetch("url2", 1),
fetch("url3", 3)
)
print(f"結果: {results}")
start = time.time()
asyncio.run(main())
print(f"總耗時: {time.time() - start:.2f}s")輸出:
開始獲取 url1
開始獲取 url2
開始獲取 url3
完成 url2 ← 1 秒後
完成 url1 ← 2 秒後
完成 url3 ← 3 秒後
結果: ['資料來自 url1', '資料來自 url2', '資料來自 url3']
總耗時: 3.00s ← 不是 2+1+3=6sas_completed():按完成順序
import asyncio
async def fetch(url, delay):
await asyncio.sleep(delay)
return f"{url} 完成({delay}s)"
async def main():
tasks = [
fetch("url1", 3),
fetch("url2", 1),
fetch("url3", 2)
]
# 按完成順序處理
for coro in asyncio.as_completed(tasks):
result = await coro
print(result)
asyncio.run(main())輸出:
url2 完成(1s) ← 最先完成
url3 完成(2s)
url1 完成(3s) ← 最後完成wait():等待 Task
import asyncio
async def task(n):
await asyncio.sleep(n)
return n
async def main():
tasks = [asyncio.create_task(task(i)) for i in [3, 1, 2]]
# 等待第一個完成
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
print(f"完成: {len(done)}, 待完成: {len(pending)}")
for t in done:
print(f"結果: {t.result()}")
# 等待剩餘的
for t in pending:
await t
asyncio.run(main())4️⃣ 實戰案例
案例 1:非同步 HTTP 請求
import asyncio
import aiohttp # pip install aiohttp
import time
async def fetch_url(session, url):
"""非同步獲取 URL"""
async with session.get(url) as response:
content = await response.text()
return url, len(content)
async def main():
urls = [
'https://www.python.org',
'https://www.github.com',
'https://www.stackoverflow.com',
'https://www.reddit.com',
'https://www.wikipedia.org'
]
async with aiohttp.ClientSession() as session:
# 並發發送所有請求
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for url, size in results:
print(f"{url}: {size} bytes")
start = time.time()
asyncio.run(main())
print(f"總耗時: {time.time() - start:.2f}s")
# 輸出:總耗時: 1.5s(並發執行)案例 2:非同步檔案操作
import asyncio
import aiofiles # pip install aiofiles
async def write_file(filename, content):
"""非同步寫入檔案"""
async with aiofiles.open(filename, 'w') as f:
await f.write(content)
print(f"寫入完成: {filename}")
async def read_file(filename):
"""非同步讀取檔案"""
async with aiofiles.open(filename, 'r') as f:
content = await f.read()
print(f"讀取完成: {filename}, 長度: {len(content)}")
return content
async def main():
# 並發寫入多個檔案
await asyncio.gather(
write_file('file1.txt', 'Content 1' * 1000),
write_file('file2.txt', 'Content 2' * 1000),
write_file('file3.txt', 'Content 3' * 1000)
)
# 並發讀取
contents = await asyncio.gather(
read_file('file1.txt'),
read_file('file2.txt'),
read_file('file3.txt')
)
asyncio.run(main())案例 3:生產者-消費者模式
import asyncio
import random
async def producer(queue, producer_id):
"""生產者"""
for i in range(5):
item = f"P{producer_id}-Item{i}"
await queue.put(item)
print(f"生產者 {producer_id} 生產: {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
async def consumer(queue, consumer_id):
"""消費者"""
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
print(f"消費者 {consumer_id} 消費: {item}")
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def main():
queue = asyncio.Queue()
# 創建生產者和消費者
producers = [producer(queue, i) for i in range(3)]
consumers = [consumer(queue, i) for i in range(2)]
# 啟動生產者
await asyncio.gather(*producers)
# 等待隊列清空
await queue.join()
# 停止消費者
for _ in consumers:
await queue.put(None)
await asyncio.gather(*consumers)
asyncio.run(main())案例 4:非同步爬蟲
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def crawl_page(session, url):
"""爬取單個頁面"""
try:
async with session.get(url, timeout=10) as response:
html = await response.text()
soup = BeautifulSoup(html, 'html.parser')
title = soup.find('title')
print(f"✓ {url}")
print(f" 標題: {title.text if title else 'N/A'}")
return url, title.text if title else None
except Exception as e:
print(f"✗ {url}: {e}")
return url, None
async def crawl_multiple(urls, max_concurrent=10):
"""並發爬取多個 URL"""
# 使用 Semaphore 限制並發數
semaphore = asyncio.Semaphore(max_concurrent)
async def crawl_with_semaphore(session, url):
async with semaphore:
return await crawl_page(session, url)
async with aiohttp.ClientSession() as session:
tasks = [crawl_with_semaphore(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# 使用
urls = [
'https://www.python.org',
'https://www.github.com',
'https://www.stackoverflow.com',
]
asyncio.run(crawl_multiple(urls))5️⃣ 同步與非同步混用
run_in_executor():執行同步函式
import asyncio
import time
def blocking_io():
"""同步阻塞函式"""
print("開始阻塞 I/O")
time.sleep(3)
print("阻塞 I/O 完成")
return "結果"
async def main():
loop = asyncio.get_running_loop()
# 在執行緒池中執行同步函式(不阻塞事件循環)
result = await loop.run_in_executor(None, blocking_io)
print(f"結果: {result}")
asyncio.run(main())包裝同步庫為非同步
import asyncio
import requests
async def fetch_sync_api(url):
"""包裝 requests(同步)為非同步"""
loop = asyncio.get_running_loop()
# 在執行緒中執行同步請求
response = await loop.run_in_executor(
None,
requests.get,
url
)
return response.text
async def main():
# 並發執行多個同步請求
results = await asyncio.gather(
fetch_sync_api('https://httpbin.org/html'),
fetch_sync_api('https://httpbin.org/html'),
fetch_sync_api('https://httpbin.org/html')
)
print(f"獲取了 {len(results)} 個結果")
asyncio.run(main())6️⃣ 常見錯誤與陷阱
錯誤 1:忘記 await
import asyncio
async def fetch_data():
await asyncio.sleep(1)
return "data"
async def main():
# ❌ 錯誤:忘記 await
result = fetch_data() # 返回協程物件,不是結果
print(result) # <coroutine object fetch_data>
# ✅ 正確:使用 await
result = await fetch_data()
print(result) # data
asyncio.run(main())錯誤 2:在同步函式中使用 await
# ❌ 錯誤:不能在普通函式中使用 await
def sync_function():
await asyncio.sleep(1) # SyntaxError
# ✅ 正確:必須在 async 函式中
async def async_function():
await asyncio.sleep(1)錯誤 3:阻塞事件循環
import asyncio
import time
async def bad_example():
# ❌ 錯誤:使用 time.sleep 阻塞整個事件循環
time.sleep(3) # 其他協程無法執行
async def good_example():
# ✅ 正確:使用 asyncio.sleep(非阻塞)
await asyncio.sleep(3) # 其他協程可以執行✅ 重點回顧
asyncio 核心概念:
- 協程(Coroutine):使用
async def定義 - await:等待協程完成,交出控制權
- 事件循環(Event Loop):協程調度器
- Task:包裝協程,並發執行
並發執行:
asyncio.gather()- 並發執行多個協程asyncio.as_completed()- 按完成順序asyncio.wait()- 等待 Taskasyncio.create_task()- 創建 Task
適用場景:
- ✅ 大量 I/O 操作(網路、檔案)
- ✅ 需要高並發(數千個連線)
- ✅ WebSocket、聊天伺服器
- ❌ CPU 密集型(用 multiprocessing)
關鍵優勢:
- ✅ 單 Thread,無 GIL 問題
- ✅ 記憶體佔用極低
- ✅ 可處理數萬個並發連線
- ✅ 無上下文切換成本
常用庫:
aiohttp- 非同步 HTTPaiofiles- 非同步檔案asyncpg- 非同步 PostgreSQLmotor- 非同步 MongoDB
上一篇: 05-4. concurrent.futures 使用指南
最後更新:2025-01-06