Django 面試準備 04-1:Worker 超時問題
深入理解 Gunicorn Worker Timeout 的成因與解決方案
目錄
04-1. Worker 超時問題
Worker 超時是 Gunicorn 生產環境中最常見的問題之一。本章將深入探討超時的成因、診斷方法與解決方案。
1. 什麼是 Worker 超時?
超時機制
Gunicorn 使用 主進程監控機制 來管理 Worker:
# Gunicorn 的超時檢查機制(簡化版)
class Arbiter:
def murder_workers(self):
"""主進程定期檢查 Worker 是否超時"""
now = time.time()
for worker_pid, worker in self.workers.items():
if (now - worker.last_notif) > self.timeout:
# Worker 在 timeout 秒內沒有回應
os.kill(worker_pid, signal.SIGKILL) # 強制殺死
self.log.critical("Worker %s timeout, killed", worker_pid)預設超時時間
# gunicorn.conf.py
timeout = 30 # 預設 30 秒
# 超時後會發生什麼:
# 1. 主進程發現 Worker 超過 30 秒沒有回應
# 2. 發送 SIGKILL 信號強制殺死 Worker
# 3. 啟動新的 Worker 替代
# 4. 記錄錯誤日誌2. 為什麼會發生超時?
常見原因
原因 1:請求處理時間過長
# ❌ 錯誤:長時間運算沒有分拆
def process_large_file(request):
file = request.FILES['data']
# 處理 1GB 的文件,耗時 60 秒
result = process_data(file.read()) # 超過 30 秒超時
return JsonResponse({'result': result})原因 2:外部 API 呼叫慢
# ❌ 錯誤:沒有設置外部 API 的超時
def fetch_user_data(request):
user_id = request.GET['user_id']
# 外部 API 響應慢,沒有超時設定
response = requests.get(
f'https://api.external.com/users/{user_id}'
# 缺少 timeout 參數!
)
return JsonResponse(response.json())原因 3:資料庫查詢慢
# ❌ 錯誤:N+1 查詢問題
def get_orders(request):
orders = Order.objects.all() # 10,000 筆
result = []
for order in orders:
# 每次都查詢資料庫!
result.append({
'order_id': order.id,
'user': order.user.name, # 查詢 1 次
'items': order.items.count(), # 查詢 1 次
})
# 總共查詢:1 + 10,000 + 10,000 = 20,001 次
# 耗時可能超過 30 秒
return JsonResponse({'orders': result})原因 4:死鎖(Deadlock)
# ❌ 錯誤:資料庫死鎖
from django.db import transaction
@transaction.atomic
def transfer_money(request):
# Transaction 1: A → B
account_a = Account.objects.select_for_update().get(id=1)
account_b = Account.objects.select_for_update().get(id=2)
# 同時有另一個 transaction: B → A
# 兩個 transaction 互相等待,形成死鎖
# Worker 永遠等待,直到超時被殺死原因 5:記憶體不足
# ❌ 錯誤:記憶體占用過高導致 swap
def process_images(request):
images = []
# 載入 1000 張圖片到記憶體
for i in range(1000):
img = Image.open(f'image_{i}.jpg')
images.append(img) # 每張 10MB = 10GB
# 記憶體不足,開始 swap
# 系統變慢,處理時間超過 30 秒3. 如何診斷超時問題?
步驟 1:檢查日誌
# Gunicorn 日誌
tail -f /var/log/gunicorn/error.log
# 典型的超時日誌:
[2025-01-31 14:23:45 +0800] [12345] [CRITICAL] WORKER TIMEOUT (pid:12346)
[2025-01-31 14:23:45 +0800] [12345] [WARNING] Worker with pid 12346 was terminated due to signal 9步驟 2:找出慢請求
# middleware.py - 記錄請求時間
import time
import logging
logger = logging.getLogger(__name__)
class SlowRequestMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
start_time = time.time()
response = self.get_response(request)
duration = time.time() - start_time
# 記錄超過 10 秒的請求
if duration > 10:
logger.warning(
f"Slow request: {request.path} "
f"took {duration:.2f}s"
)
return response# settings.py
MIDDLEWARE = [
'myapp.middleware.SlowRequestMiddleware', # 加在最前面
# ...
]步驟 3:使用 Django Debug Toolbar
# settings.py(開發環境)
if DEBUG:
INSTALLED_APPS += ['debug_toolbar']
MIDDLEWARE += ['debug_toolbar.middleware.DebugToolbarMiddleware']
INTERNAL_IPS = ['127.0.0.1']
# 可以看到:
# - SQL 查詢數量和時間
# - 每個查詢的 EXPLAIN
# - 模板渲染時間步驟 4:使用 py-spy 分析
# 安裝 py-spy
pip install py-spy
# 找出 Worker PID
ps aux | grep gunicorn
# 分析 Worker 在做什麼
sudo py-spy top --pid 12346
# 輸出:
# %Own %Total OwnTime TotalTime Function (filename:line)
# 35.00% 35.00% 3.50s 3.50s process_data (views.py:45)
# 25.00% 25.00% 2.50s 2.50s database_query (models.py:123)步驟 5:檢查資料庫慢查詢
# PostgreSQL:開啟慢查詢日誌
# postgresql.conf
log_min_duration_statement = 1000 # 記錄超過 1 秒的查詢
# 查看慢查詢
tail -f /var/log/postgresql/postgresql-*.log4. 解決方案
方案 1:增加超時時間
# gunicorn.conf.py
# ⚠️ 治標不治本!只在特殊情況使用
# 情況 1:合理的長請求(報表生成)
timeout = 120 # 2 分鐘
# 情況 2:啟動時需要載入大量資料
graceful_timeout = 120 # Worker 優雅關閉的時間何時不該增加超時:
- ❌ 程式碼有 N+1 查詢問題 → 應該修復查詢
- ❌ 外部 API 慢 → 應該加入超時或改用異步
- ❌ 資料庫查詢慢 → 應該優化查詢
方案 2:優化資料庫查詢
# ❌ 錯誤:N+1 查詢
def get_orders_slow(request):
orders = Order.objects.all()
result = []
for order in orders:
result.append({
'order_id': order.id,
'user': order.user.name, # N+1
'items': order.items.count(), # N+1
})
return JsonResponse({'orders': result})
# ✅ 正確:使用 select_related 和 prefetch_related
def get_orders_fast(request):
orders = Order.objects.select_related('user') \
.prefetch_related('items') \
.all()
result = []
for order in orders:
result.append({
'order_id': order.id,
'user': order.user.name, # 不會額外查詢
'items': order.items.count(), # 不會額外查詢
})
return JsonResponse({'orders': result})
# 查詢從 20,001 次降為 3 次!方案 3:設置外部 API 超時
# ✅ 正確:設置合理的超時時間
import requests
def fetch_user_data(request):
user_id = request.GET['user_id']
try:
response = requests.get(
f'https://api.external.com/users/{user_id}',
timeout=(3, 10) # (連接超時, 讀取超時)
)
response.raise_for_status()
return JsonResponse(response.json())
except requests.Timeout:
# 外部 API 超時,返回錯誤
return JsonResponse({'error': 'External API timeout'}, status=504)
except requests.RequestException as e:
return JsonResponse({'error': str(e)}, status=500)方案 4:改用異步處理
# ✅ 正確:長時間任務使用 Celery
from celery import shared_task
from django.http import JsonResponse
@shared_task
def process_large_file_task(file_path):
"""異步處理大文件"""
with open(file_path, 'rb') as f:
data = f.read()
result = process_data(data)
return result
def upload_file(request):
if request.method == 'POST':
file = request.FILES['data']
# 先儲存文件
file_path = f'/tmp/{file.name}'
with open(file_path, 'wb') as f:
f.write(file.read())
# 異步處理
task = process_large_file_task.delay(file_path)
# 立即返回
return JsonResponse({
'task_id': task.id,
'status': 'processing'
})方案 5:使用分頁
# ✅ 正確:大量資料使用分頁
from django.core.paginator import Paginator
def get_orders(request):
page_num = int(request.GET.get('page', 1))
page_size = int(request.GET.get('size', 100))
# 只查詢當前頁的資料
orders = Order.objects.select_related('user') \
.prefetch_related('items') \
.all()
paginator = Paginator(orders, page_size)
page = paginator.get_page(page_num)
result = []
for order in page:
result.append({
'order_id': order.id,
'user': order.user.name,
'items': order.items.count(),
})
return JsonResponse({
'orders': result,
'total': paginator.count,
'page': page_num,
'pages': paginator.num_pages,
})方案 6:添加快取
# ✅ 正確:經常訪問的資料使用快取
from django.core.cache import cache
def get_user_profile(request, user_id):
# 先檢查快取
cache_key = f'user_profile_{user_id}'
profile = cache.get(cache_key)
if profile is None:
# 快取未命中,查詢資料庫
user = User.objects.select_related('profile') \
.get(id=user_id)
profile = {
'name': user.name,
'email': user.email,
'bio': user.profile.bio,
}
# 儲存到快取,TTL 5 分鐘
cache.set(cache_key, profile, timeout=300)
return JsonResponse(profile)5. 最佳實踐
原則 1:設置合理的超時時間
# gunicorn.conf.py
# 根據應用類型設置
# API 服務(快速回應)
timeout = 30 # 預設值
# 報表系統(可能需要較長時間)
timeout = 120
# 即時服務(WebSocket)
timeout = 300
keepalive = 5原則 2:所有外部呼叫都要有超時
# ✅ 資料庫查詢超時
from django.db import connection
with connection.cursor() as cursor:
cursor.execute("SET statement_timeout TO 5000") # 5 秒
cursor.execute("SELECT * FROM large_table")
# ✅ HTTP 請求超時
import requests
response = requests.get(url, timeout=(3, 10))
# ✅ Redis 操作超時
import redis
r = redis.Redis(host='localhost', port=6379, socket_timeout=5)原則 3:監控和告警
# middleware.py - 記錄慢請求並告警
import time
import logging
from django.conf import settings
logger = logging.getLogger(__name__)
class PerformanceMonitorMiddleware:
def __init__(self, get_response):
self.get_response = get_response
self.warning_threshold = settings.SLOW_REQUEST_WARNING # 10 秒
self.critical_threshold = settings.SLOW_REQUEST_CRITICAL # 20 秒
def __call__(self, request):
start_time = time.time()
response = self.get_response(request)
duration = time.time() - start_time
if duration > self.critical_threshold:
# 發送告警(例如:發送到 Sentry)
logger.critical(
f"CRITICAL: Request {request.path} took {duration:.2f}s"
)
elif duration > self.warning_threshold:
logger.warning(
f"WARNING: Request {request.path} took {duration:.2f}s"
)
# 記錄到 response header(方便除錯)
response['X-Request-Duration'] = f'{duration:.3f}'
return response原則 4:使用健康檢查
# views.py
from django.http import JsonResponse
from django.db import connection
from django.core.cache import cache
import redis
def health_check(request):
"""健康檢查端點"""
checks = {}
# 檢查資料庫
try:
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
checks['database'] = 'ok'
except Exception as e:
checks['database'] = f'error: {str(e)}'
# 檢查快取
try:
cache.set('health_check', 'ok', timeout=10)
result = cache.get('health_check')
checks['cache'] = 'ok' if result == 'ok' else 'error'
except Exception as e:
checks['cache'] = f'error: {str(e)}'
# 判斷整體健康狀態
all_ok = all(v == 'ok' for v in checks.values())
return JsonResponse({
'status': 'healthy' if all_ok else 'unhealthy',
'checks': checks,
}, status=200 if all_ok else 503)6. 實戰案例:報表生成超時
問題描述
# ❌ 問題:生成大型報表超時
def generate_sales_report(request):
start_date = request.GET['start_date']
end_date = request.GET['end_date']
# 查詢 3 個月的訂單資料(100,000 筆)
orders = Order.objects.filter(
created_at__range=[start_date, end_date]
)
# 計算統計資料(耗時 45 秒)
stats = calculate_stats(orders)
# 生成 Excel(耗時 30 秒)
excel_file = generate_excel(stats)
# 總共 75 秒,超過 30 秒超時!
return FileResponse(excel_file)解決方案:改用異步
# ✅ 方案 1:使用 Celery 異步生成
from celery import shared_task
from django.core.mail import send_mail
@shared_task
def generate_report_task(user_email, start_date, end_date):
"""異步生成報表"""
orders = Order.objects.filter(
created_at__range=[start_date, end_date]
)
stats = calculate_stats(orders)
excel_file = generate_excel(stats)
# 儲存到 S3
file_url = upload_to_s3(excel_file)
# 發送郵件通知
send_mail(
subject='您的報表已生成',
message=f'請點擊下載:{file_url}',
from_email='noreply@example.com',
recipient_list=[user_email],
)
return file_url
def request_sales_report(request):
"""提交報表生成請求"""
start_date = request.GET['start_date']
end_date = request.GET['end_date']
user_email = request.user.email
# 異步生成
task = generate_report_task.delay(user_email, start_date, end_date)
return JsonResponse({
'task_id': task.id,
'message': '報表生成中,完成後將發送郵件通知',
})面試常見問題
Q1:Gunicorn Worker 超時的預設時間是多少?如何修改?
答案:
預設是 30 秒。可以在配置文件中修改:
# gunicorn.conf.py
timeout = 60 # 改為 60 秒或啟動時指定:
gunicorn --timeout 60 myapp.wsgi:application注意: 增加超時只是治標,應該找出根本原因並優化。
Q2:如何診斷哪個請求導致超時?
答案:
有三種方法:
記錄請求時間的 Middleware:
class SlowRequestMiddleware: def __call__(self, request): start = time.time() response = self.get_response(request) duration = time.time() - start if duration > 10: logger.warning(f"Slow: {request.path} took {duration}s") return response使用 Django Debug Toolbar(開發環境)
使用 py-spy 分析 Worker:
sudo py-spy top --pid <worker_pid>
Q3:外部 API 呼叫應該如何設置超時?
答案:
使用 requests.get() 的 timeout 參數:
response = requests.get(
url,
timeout=(3, 10) # (連接超時, 讀取超時)
)
# 3 秒建立連接,10 秒完成讀取
# 總共最多 13 秒為什麼要分開設置?
- 連接超時:網路問題、DNS 解析慢
- 讀取超時:伺服器處理慢
Q4:Worker 超時被殺死後,正在處理的請求會怎樣?
答案:
- Gunicorn 發送 SIGKILL:Worker 立即被殺死
- 處理中的請求丟失:用戶會收到 502 Bad Gateway
- 資料庫事務可能未提交:導致資料不一致
- 主進程啟動新 Worker:替代被殺死的 Worker
如何避免:
- 使用異步任務處理長時間操作(Celery)
- 確保資料庫事務正確使用
@transaction.atomic - 實作冪等性(重試時不會產生副作用)
小結
Worker 超時問題的處理原則:
- 找出根本原因:不要只是增加超時時間
- 優化查詢:修復 N+1 查詢、添加索引
- 設置超時:所有外部呼叫都要有超時限制
- 使用異步:長時間任務改用 Celery
- 監控告警:及時發現慢請求
- 分頁和快取:減少單次請求的資料量
記住:超時是症狀,不是病因。 找出真正的性能瓶頸才能徹底解決問題!