Django 面試精華:asyncio 在 Django 中的使用
掌握 Python 異步編程在 Django 中的實戰技巧
前言
asyncio 是 Python 3.4+ 引入的異步 I/O 框架,Django 3.0+ 開始原生支持 asyncio。掌握 asyncio 在 Django 中的使用,可以顯著提升應用性能。
傳統同步代碼的問題:
def get_user_data(user_id):
# 串行執行,總耗時累加
user = fetch_user_service(user_id) # 100ms
orders = fetch_order_service(user_id) # 150ms
notifications = fetch_notification_service(user_id) # 120ms
# 總耗時:370ms
return {
'user': user,
'orders': orders,
'notifications': notifications,
}使用 asyncio 的優勢:
async def get_user_data(user_id):
# 並行執行,總耗時取最慢的那個
user, orders, notifications = await asyncio.gather(
fetch_user_service(user_id), # 100ms
fetch_order_service(user_id), # 150ms
fetch_notification_service(user_id), # 120ms
)
# 總耗時:150ms(取最慢的那個)
# 性能提升:59%
return {
'user': user,
'orders': orders,
'notifications': notifications,
}這篇文章將深入探討 asyncio 在 Django 中的實戰應用,包括任務調度、超時控制、錯誤處理等高級技巧。
1. asyncio 基礎回顧
1.1 核心概念
| 概念 | 說明 | 示例 |
|---|---|---|
| 協程 (Coroutine) | 使用 async def 定義的函數 | async def foo() |
| await | 掛起協程,等待另一個協程完成 | await foo() |
| 事件循環 (Event Loop) | 管理和調度協程執行 | asyncio.get_event_loop() |
| 任務 (Task) | 封裝協程的可調度單元 | asyncio.create_task() |
| Future | 代表未來的結果 | asyncio.Future() |
1.2 基本語法
import asyncio
# 定義協程
async def fetch_data(url):
print(f"開始獲取: {url}")
await asyncio.sleep(1) # 模擬 I/O 操作
print(f"完成獲取: {url}")
return f"Data from {url}"
# 執行協程(在 Django 異步視圖中不需要手動運行事件循環)
async def main():
result = await fetch_data("https://api.example.com")
print(result)
# 在普通 Python 腳本中運行
if __name__ == "__main__":
asyncio.run(main())1.3 在 Django 中使用 asyncio
Django 異步視圖自動運行在事件循環中:
# Django 異步視圖
from django.http import JsonResponse
import asyncio
async def my_async_view(request):
# 已經在事件循環中,可以直接使用 await
result = await fetch_data("https://api.example.com")
return JsonResponse({'result': result})
# ❌ 不要這樣做
# asyncio.run(fetch_data(...)) # 會創建新的事件循環,導致錯誤2. asyncio.gather() - 並行執行
2.1 基本用法
asyncio.gather() 是最常用的並行執行工具:
import asyncio
from django.http import JsonResponse
async def fetch_user_dashboard(request, user_id):
"""並行獲取用戶儀表板數據"""
# 方式 1:傳遞協程
results = await asyncio.gather(
fetch_user(user_id),
fetch_orders(user_id),
fetch_notifications(user_id),
)
user, orders, notifications = results
# 方式 2:解包結果
user, orders, notifications = await asyncio.gather(
fetch_user(user_id),
fetch_orders(user_id),
fetch_notifications(user_id),
)
return JsonResponse({
'user': user,
'orders': orders,
'notifications': notifications,
})
async def fetch_user(user_id):
"""獲取用戶信息(模擬)"""
await asyncio.sleep(0.1) # 模擬 I/O 延遲
return {'id': user_id, 'name': 'Alice'}
async def fetch_orders(user_id):
"""獲取訂單列表(模擬)"""
await asyncio.sleep(0.15)
return [{'id': 1, 'total': 100}, {'id': 2, 'total': 200}]
async def fetch_notifications(user_id):
"""獲取通知列表(模擬)"""
await asyncio.sleep(0.12)
return [{'id': 1, 'message': 'New order'}]執行時序:
時間軸 (ms)
0 ─┬─ fetch_user() 開始
├─ fetch_orders() 開始
└─ fetch_notifications() 開始
100 ─ fetch_user() 完成 ✓
120 ─ fetch_notifications() 完成 ✓
150 ─ fetch_orders() 完成 ✓
└─ 所有任務完成,返回結果
總耗時:150ms(不是 100 + 150 + 120 = 370ms)2.2 錯誤處理
return_exceptions=True:讓單個任務失敗不影響其他任務
async def fetch_dashboard_safe(request, user_id):
"""容錯的並行請求"""
results = await asyncio.gather(
fetch_user(user_id),
fetch_orders(user_id),
fetch_notifications(user_id),
fetch_external_api(user_id), # 可能失敗
return_exceptions=True, # 關鍵參數
)
user, orders, notifications, external = results
# 檢查每個結果
if isinstance(user, Exception):
return JsonResponse({'error': 'User not found'}, status=404)
if isinstance(orders, Exception):
orders = [] # 使用默認值
if isinstance(notifications, Exception):
notifications = []
if isinstance(external, Exception):
external = {'error': str(external)} # 記錄錯誤,不中斷
return JsonResponse({
'user': user,
'orders': orders,
'notifications': notifications,
'external': external,
})
async def fetch_external_api(user_id):
"""可能失敗的外部 API 調用"""
await asyncio.sleep(0.5)
raise Exception("External API timeout") # 模擬錯誤3. asyncio.create_task() - 後台任務
3.1 創建後台任務
使用 create_task() 創建不阻塞主流程的後台任務:
import asyncio
from django.http import JsonResponse
async def create_order(request):
"""創建訂單,同時觸發後台任務"""
# 創建訂單(主要業務邏輯)
order = await Order.objects.acreate(
user=request.user,
total=100,
)
# 創建後台任務(不等待完成)
asyncio.create_task(send_order_email(order.id))
asyncio.create_task(update_inventory(order.id))
asyncio.create_task(notify_seller(order.id))
# 立即返回響應,不等待後台任務完成
return JsonResponse({
'order_id': order.id,
'status': 'created',
})
async def send_order_email(order_id):
"""發送訂單確認郵件(後台任務)"""
await asyncio.sleep(2) # 模擬郵件發送
print(f"Email sent for order {order_id}")
async def update_inventory(order_id):
"""更新庫存(後台任務)"""
await asyncio.sleep(1)
print(f"Inventory updated for order {order_id}")
async def notify_seller(order_id):
"""通知賣家(後台任務)"""
await asyncio.sleep(0.5)
print(f"Seller notified for order {order_id}")執行時序:
請求到達
↓
創建訂單(50ms)
↓
啟動 3 個後台任務
├─ send_order_email() → 2 秒後完成
├─ update_inventory() → 1 秒後完成
└─ notify_seller() → 0.5 秒後完成
↓
立即返回響應(不等待後台任務) ✓
用戶感知響應時間:50ms
實際任務執行時間:2 秒(在後台進行)3.2 等待後台任務完成
如果需要等待後台任務完成:
async def create_order_and_wait(request):
"""創建訂單並等待關鍵後台任務完成"""
# 創建訂單
order = await Order.objects.acreate(
user=request.user,
total=100,
)
# 創建任務
task1 = asyncio.create_task(send_order_email(order.id))
task2 = asyncio.create_task(update_inventory(order.id))
task3 = asyncio.create_task(notify_seller(order.id))
# 等待關鍵任務完成(庫存更新)
await task2
# 其他任務在後台繼續執行
# task1 和 task3 不等待
return JsonResponse({
'order_id': order.id,
'status': 'created',
'inventory_updated': True,
})4. asyncio.wait_for() - 超時控制
4.1 設置超時時間
使用 asyncio.wait_for() 為異步操作設置超時:
import asyncio
from django.http import JsonResponse
async def fetch_with_timeout(request):
"""帶超時的外部 API 調用"""
try:
# 設置 5 秒超時
result = await asyncio.wait_for(
fetch_external_api(),
timeout=5.0
)
return JsonResponse({'data': result})
except asyncio.TimeoutError:
return JsonResponse({
'error': 'Request timeout after 5 seconds'
}, status=504)
async def fetch_external_api():
"""外部 API 調用(可能很慢)"""
await asyncio.sleep(10) # 模擬慢速 API
return {'status': 'ok'}4.2 部分超時控制
對並行任務設置不同的超時時間:
async def fetch_dashboard_with_timeouts(request, user_id):
"""為不同任務設置不同超時"""
async def fetch_user_with_timeout():
return await asyncio.wait_for(
fetch_user(user_id),
timeout=2.0 # 用戶信息:2 秒超時
)
async def fetch_orders_with_timeout():
return await asyncio.wait_for(
fetch_orders(user_id),
timeout=5.0 # 訂單列表:5 秒超時
)
async def fetch_notifications_with_timeout():
return await asyncio.wait_for(
fetch_notifications(user_id),
timeout=3.0 # 通知列表:3 秒超時
)
# 並行執行,各自有超時控制
results = await asyncio.gather(
fetch_user_with_timeout(),
fetch_orders_with_timeout(),
fetch_notifications_with_timeout(),
return_exceptions=True, # 超時不影響其他任務
)
user, orders, notifications = results
# 處理超時
if isinstance(user, asyncio.TimeoutError):
return JsonResponse({'error': 'User data timeout'}, status=504)
if isinstance(orders, asyncio.TimeoutError):
orders = [] # 使用默認值
if isinstance(notifications, asyncio.TimeoutError):
notifications = []
return JsonResponse({
'user': user,
'orders': orders,
'notifications': notifications,
})5. asyncio.as_completed() - 按完成順序處理
5.1 基本用法
as_completed() 按完成順序返回結果,適合需要快速響應的場景:
import asyncio
async def fetch_multiple_apis(request):
"""調用多個 API,按完成順序處理結果"""
# 創建多個任務
tasks = [
fetch_api('https://api1.example.com'),
fetch_api('https://api2.example.com'),
fetch_api('https://api3.example.com'),
]
results = []
# 按完成順序處理
for coro in asyncio.as_completed(tasks):
result = await coro
results.append(result)
print(f"收到結果: {result}")
return JsonResponse({'results': results})
async def fetch_api(url):
"""獲取 API 數據"""
import random
delay = random.uniform(0.1, 0.5)
await asyncio.sleep(delay)
return {'url': url, 'delay': delay}執行過程:
啟動 3 個任務
↓
api2 完成(0.15s) → 處理結果 #1
↓
api1 完成(0.23s) → 處理結果 #2
↓
api3 完成(0.41s) → 處理結果 #3
↓
返回所有結果5.2 實戰案例:多數據源搜索
async def search_multiple_sources(request):
"""從多個數據源搜索,返回最快的結果"""
query = request.GET.get('q', '')
# 創建多個搜索任務
tasks = [
search_database(query),
search_elasticsearch(query),
search_external_api(query),
]
results = []
timeout = 2.0 # 總超時 2 秒
try:
# 按完成順序收集結果,總超時 2 秒
for coro in asyncio.as_completed(tasks, timeout=timeout):
try:
result = await coro
results.extend(result)
# 如果已經有足夠結果,可以提前返回
if len(results) >= 10:
break
except Exception as e:
print(f"搜索源出錯: {e}")
continue
except asyncio.TimeoutError:
print("搜索超時,返回已有結果")
return JsonResponse({
'query': query,
'results': results,
'count': len(results),
})
async def search_database(query):
"""從數據庫搜索"""
await asyncio.sleep(0.5)
return [{'source': 'db', 'title': f'DB result for {query}'}]
async def search_elasticsearch(query):
"""從 Elasticsearch 搜索"""
await asyncio.sleep(0.3)
return [{'source': 'es', 'title': f'ES result for {query}'}]
async def search_external_api(query):
"""從外部 API 搜索"""
await asyncio.sleep(0.8)
return [{'source': 'api', 'title': f'API result for {query}'}]6. asyncio.Queue - 生產者消費者模式
6.1 基本隊列操作
import asyncio
from django.http import JsonResponse
async def process_batch_orders(request):
"""批量處理訂單(生產者消費者模式)"""
# 創建異步隊列
queue = asyncio.Queue(maxsize=10)
# 啟動消費者
consumers = [
asyncio.create_task(order_processor(queue, i))
for i in range(3) # 3 個消費者
]
# 生產者:將訂單放入隊列
order_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
for order_id in order_ids:
await queue.put(order_id)
# 等待所有訂單處理完成
await queue.join()
# 停止消費者
for _ in consumers:
await queue.put(None) # 發送停止信號
await asyncio.gather(*consumers)
return JsonResponse({
'status': 'All orders processed',
'count': len(order_ids),
})
async def order_processor(queue, worker_id):
"""訂單處理器(消費者)"""
while True:
order_id = await queue.get()
if order_id is None: # 停止信號
queue.task_done()
break
try:
# 處理訂單
await process_order(order_id)
print(f"Worker {worker_id} processed order {order_id}")
except Exception as e:
print(f"Worker {worker_id} error: {e}")
finally:
queue.task_done()
async def process_order(order_id):
"""處理單個訂單"""
await asyncio.sleep(1) # 模擬處理時間執行過程:
隊列: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Worker 0: 處理訂單 1 → 1 秒
Worker 1: 處理訂單 2 → 1 秒
Worker 2: 處理訂單 3 → 1 秒
↓ (1 秒後)
Worker 0: 處理訂單 4 → 1 秒
Worker 1: 處理訂單 5 → 1 秒
Worker 2: 處理訂單 6 → 1 秒
↓ (1 秒後)
...
總耗時:約 4 秒(10 個訂單 / 3 個 Worker)
vs 串行:10 秒7. 與第三方庫集成
7.1 aiohttp - 異步 HTTP 客戶端
import aiohttp
from django.http import JsonResponse
async def fetch_multiple_urls(request):
"""並行獲取多個 URL"""
urls = [
'https://api.github.com/users/django',
'https://api.github.com/users/python',
'https://api.github.com/users/microsoft',
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return JsonResponse({'results': results})
async def fetch_url(session, url):
"""獲取單個 URL"""
try:
async with session.get(url, timeout=5) as response:
return await response.json()
except Exception as e:
return {'error': str(e), 'url': url}7.2 aiofiles - 異步文件操作
import aiofiles
from django.http import JsonResponse
async def read_large_file(request, filename):
"""異步讀取大文件"""
content = []
# 異步讀取文件
async with aiofiles.open(f'/tmp/{filename}', 'r') as f:
async for line in f:
content.append(line.strip())
# 限制行數
if len(content) >= 1000:
break
return JsonResponse({
'filename': filename,
'lines': len(content),
'preview': content[:10],
})
async def write_logs(request):
"""異步寫入日誌文件"""
logs = request.POST.getlist('logs')
# 異步寫入文件
async with aiofiles.open('/var/log/app.log', 'a') as f:
for log in logs:
await f.write(f"{log}\n")
return JsonResponse({'status': 'Logs written', 'count': len(logs)})7.3 aiomysql / asyncpg - 異步數據庫驅動
# 使用 asyncpg(PostgreSQL)
import asyncpg
async def raw_sql_query(request):
"""使用原始異步 SQL 查詢"""
# 創建連接池
pool = await asyncpg.create_pool(
user='myuser',
password='mypassword',
database='mydb',
host='localhost',
)
# 執行查詢
async with pool.acquire() as conn:
rows = await conn.fetch('SELECT * FROM users WHERE age > $1', 18)
await pool.close()
return JsonResponse({
'count': len(rows),
'users': [dict(row) for row in rows],
})8. 實戰案例
8.1 案例 1:聚合多個微服務
import aiohttp
from django.http import JsonResponse
import asyncio
async def user_profile_aggregation(request, user_id):
"""
聚合多個微服務的用戶數據
- 用戶服務
- 訂單服務
- 支付服務
- 推薦服務
"""
async with aiohttp.ClientSession() as session:
# 設置超時和重試
results = await asyncio.gather(
fetch_with_retry(
session,
f'http://user-service/api/users/{user_id}',
timeout=2.0,
retries=2
),
fetch_with_retry(
session,
f'http://order-service/api/orders?user_id={user_id}',
timeout=3.0,
retries=2
),
fetch_with_retry(
session,
f'http://payment-service/api/payments?user_id={user_id}',
timeout=3.0,
retries=2
),
fetch_with_retry(
session,
f'http://recommendation-service/api/recommendations/{user_id}',
timeout=5.0,
retries=1
),
return_exceptions=True,
)
user, orders, payments, recommendations = results
# 處理錯誤
response_data = {}
if isinstance(user, Exception):
return JsonResponse({'error': 'User service unavailable'}, status=503)
response_data['user'] = user
response_data['orders'] = orders if not isinstance(orders, Exception) else []
response_data['payments'] = payments if not isinstance(payments, Exception) else []
response_data['recommendations'] = recommendations if not isinstance(recommendations, Exception) else []
return JsonResponse(response_data)
async def fetch_with_retry(session, url, timeout=5.0, retries=2):
"""帶重試的 HTTP 請求"""
for attempt in range(retries + 1):
try:
async with session.get(url, timeout=timeout) as response:
return await response.json()
except asyncio.TimeoutError:
if attempt == retries:
raise
await asyncio.sleep(0.1 * (2 ** attempt)) # 指數退避
except Exception as e:
if attempt == retries:
raise
await asyncio.sleep(0.1 * (2 ** attempt))8.2 案例 2:實時數據流處理
import asyncio
from django.http import StreamingHttpResponse
async def stream_live_data(request):
"""流式返回實時數據(Server-Sent Events)"""
async def event_stream():
"""生成事件流"""
for i in range(100):
# 並行獲取多個數據源
data1, data2, data3 = await asyncio.gather(
fetch_sensor_data(1),
fetch_sensor_data(2),
fetch_sensor_data(3),
)
# 格式化為 SSE
yield f"data: {{'sensor1': {data1}, 'sensor2': {data2}, 'sensor3': {data3}}}\n\n"
# 等待 1 秒
await asyncio.sleep(1)
response = StreamingHttpResponse(
event_stream(),
content_type='text/event-stream'
)
response['Cache-Control'] = 'no-cache'
response['X-Accel-Buffering'] = 'no'
return response
async def fetch_sensor_data(sensor_id):
"""獲取傳感器數據"""
await asyncio.sleep(0.1)
import random
return round(random.uniform(20, 30), 2)8.3 案例 3:批量數據導入
import asyncio
from django.http import JsonResponse
async def batch_import_users(request):
"""批量導入用戶(分批並行處理)"""
# 假設從文件讀取了 1000 個用戶
users_data = [
{'name': f'User{i}', 'email': f'user{i}@example.com'}
for i in range(1000)
]
# 分批處理(每批 100 個)
batch_size = 100
batches = [
users_data[i:i + batch_size]
for i in range(0, len(users_data), batch_size)
]
# 並行處理所有批次
results = await asyncio.gather(
*[process_user_batch(batch, idx) for idx, batch in enumerate(batches)],
return_exceptions=True,
)
# 統計結果
total_success = sum(r['success'] for r in results if isinstance(r, dict))
total_failed = sum(r['failed'] for r in results if isinstance(r, dict))
return JsonResponse({
'total': len(users_data),
'success': total_success,
'failed': total_failed,
'batches': len(batches),
})
async def process_user_batch(batch, batch_id):
"""處理一批用戶"""
success = 0
failed = 0
for user_data in batch:
try:
await User.objects.acreate(**user_data)
success += 1
except Exception as e:
print(f"Batch {batch_id} error: {e}")
failed += 1
# 避免過載
await asyncio.sleep(0.01)
return {'batch_id': batch_id, 'success': success, 'failed': failed}9. 最佳實踐與陷阱
9.1 最佳實踐
✅ 1. 使用 asyncio.gather() 並行執行獨立任務
# ✅ 好
user, orders, notifications = await asyncio.gather(
fetch_user(user_id),
fetch_orders(user_id),
fetch_notifications(user_id),
)
# ❌ 差
user = await fetch_user(user_id)
orders = await fetch_orders(user_id)
notifications = await fetch_notifications(user_id)✅ 2. 為所有外部調用設置超時
# ✅ 好
result = await asyncio.wait_for(
fetch_external_api(),
timeout=5.0
)
# ❌ 差(可能永久阻塞)
result = await fetch_external_api()✅ 3. 使用 return_exceptions=True 容錯
# ✅ 好
results = await asyncio.gather(
task1(),
task2(),
task3(),
return_exceptions=True, # 一個失敗不影響其他
)
# ❌ 差(一個失敗全部失敗)
results = await asyncio.gather(
task1(),
task2(),
task3(),
)✅ 4. 限制並發數量
# ✅ 好(使用信號量限制並發)
semaphore = asyncio.Semaphore(10) # 最多 10 個並發
async def fetch_with_limit(url):
async with semaphore:
return await fetch_url(url)
tasks = [fetch_with_limit(url) for url in urls]
results = await asyncio.gather(*tasks)
# ❌ 差(無限並發,可能壓垮服務器)
tasks = [fetch_url(url) for url in urls] # 可能有 10000 個!
results = await asyncio.gather(*tasks)9.2 常見陷阱
❌ 陷阱 1:忘記 await
# ❌ 錯誤
async def bad_view(request):
user = User.objects.aget(id=1) # 忘記 await!
# user 是協程對象,不是 User 實例
print(user.name) # AttributeError!
# ✅ 正確
async def good_view(request):
user = await User.objects.aget(id=1)
print(user.name) # OK❌ 陷阱 2:在異步視圖中使用同步代碼
# ❌ 錯誤(阻塞事件循環)
async def bad_view(request):
user = User.objects.get(id=1) # 同步查詢,阻塞!
time.sleep(1) # 阻塞整個事件循環!
# ✅ 正確
async def good_view(request):
user = await User.objects.aget(id=1) # 異步查詢
await asyncio.sleep(1) # 異步睡眠❌ 陷阱 3:過度使用 create_task 導致資源耗盡
# ❌ 錯誤(可能創建數千個任務)
async def bad_view(request):
for i in range(10000):
asyncio.create_task(process_item(i)) # 資源耗盡!
# ✅ 正確(使用隊列和有限 Worker)
async def good_view(request):
queue = asyncio.Queue(maxsize=100)
workers = [
asyncio.create_task(worker(queue))
for _ in range(10) # 只有 10 個 Worker
]
for i in range(10000):
await queue.put(i)
await queue.join()10. 面試常見問題
Q1: asyncio.gather() 和 asyncio.wait() 有什麼區別?
答案:
| 特性 | gather() | wait() |
|---|---|---|
| 返回值 | 按順序返回結果列表 | 返回兩個集合:done, pending |
| 錯誤處理 | 可選 return_exceptions | 需要手動檢查每個 task |
| 使用場景 | 簡單並行執行 | 需要細粒度控制 |
| 易用性 | 簡單 | 複雜 |
推薦:大多數情況使用 gather(),除非需要細粒度控制。
Q2: 如何限制並發數量?
答案:
使用 asyncio.Semaphore:
semaphore = asyncio.Semaphore(10) # 最多 10 個並發
async def fetch_with_limit(url):
async with semaphore:
return await fetch_url(url)
# 使用
results = await asyncio.gather(*[fetch_with_limit(url) for url in urls])Q3: 異步視圖中如何處理 CPU 密集型任務?
答案:
使用 loop.run_in_executor() 在線程池中執行:
import asyncio
async def cpu_intensive_view(request):
loop = asyncio.get_event_loop()
# 在線程池中執行 CPU 密集型任務
result = await loop.run_in_executor(None, complex_calculation, data)
return JsonResponse({'result': result})
def complex_calculation(data):
# CPU 密集型計算
return sum(data) ** 2Q4: 如何在 Django 信號中使用異步代碼?
答案:
Django 信號是同步的,需要使用 async_to_sync:
from django.db.models.signals import post_save
from django.dispatch import receiver
from asgiref.sync import async_to_sync
@receiver(post_save, sender=Order)
def order_created(sender, instance, created, **kwargs):
if created:
# 將異步函數轉換為同步
async_to_sync(send_order_email)(instance.id)
async def send_order_email(order_id):
# 異步發送郵件
await asyncio.sleep(1)
print(f"Email sent for order {order_id}")Q5: asyncio 在 Django 中的性能提升有多大?
答案:
取決於應用類型:
I/O 密集型(大量 API 調用、數據庫查詢):
- 響應時間:減少 50-80%
- 吞吐量:提升 10-50 倍
CPU 密集型:
- 性能提升:幾乎沒有(甚至更差)
混合型:
- 性能提升:取決於 I/O 占比
11. 總結
11.1 核心要點
asyncio 工具:
gather(): 並行執行create_task(): 後台任務wait_for(): 超時控制as_completed(): 按完成順序處理Queue: 生產者消費者
最佳實踐:
- ✅ 並行執行獨立任務
- ✅ 設置超時
- ✅ 使用
return_exceptions=True容錯 - ✅ 限制並發數量
常見陷阱:
- ❌ 忘記
await - ❌ 在異步中使用同步代碼
- ❌ 無限並發
- ❌ 忘記
性能:
- I/O 密集型:巨大提升
- CPU 密集型:沒有提升
11.2 決策表
| 場景 | 推薦方案 |
|---|---|
| 並行執行多個任務 | asyncio.gather() |
| 後台任務 | asyncio.create_task() |
| 設置超時 | asyncio.wait_for() |
| 限制並發 | asyncio.Semaphore |
| CPU 密集型 | loop.run_in_executor() |
參考資料
官方文檔:
第三方庫:
深入閱讀:
本章完結:Django 異步編程系列到此結束!