Django 面試準備 06-4:Channels 實戰案例

5 個真實場景的 Channels 應用與最佳實踐

06-4. Channels 實戰案例

本章通過 5 個真實場景,展示 Django Channels 在生產環境中的實際應用。


1. 案例一:聊天室系統

場景描述

構建一個多房間聊天系統,支持:

  • 多個聊天室
  • 用戶加入/離開通知
  • 在線用戶列表
  • 歷史消息加載

完整實現

步驟 1:數據模型

# models.py
from django.db import models
from django.contrib.auth.models import User

class ChatRoom(models.Model):
    """A chat room that messages are posted to."""
    # Human-readable room name; must be unique.
    name = models.CharField(max_length=100, unique=True)
    # Unique slug; ChatConsumer looks the room up by this value.
    slug = models.SlugField(unique=True)
    # Optional free-text description.
    description = models.TextField(blank=True)
    # Set once when the row is first created.
    created_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        return self.name

class Message(models.Model):
    """A single chat message posted by a user in a room."""
    # Deleting a room deletes its messages; reachable as ``room.messages``.
    room = models.ForeignKey(ChatRoom, on_delete=models.CASCADE, related_name='messages')
    # Deleting a user deletes the messages they wrote.
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    content = models.TextField()
    # Set once when the row is first created.
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        # Default ordering: oldest message first.
        ordering = ['created_at']

    def __str__(self):
        # Only the first 50 characters of the content are shown.
        return f'{self.user.username}: {self.content[:50]}'

步驟 2:Consumer 實現

# consumers.py
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from django.contrib.auth.models import User
import json
from datetime import datetime

class ChatConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer for one chat room.

    The room is identified by the ``room_name`` URL kwarg, which is matched
    against ``ChatRoom.slug``. Every connection in a room joins the channel
    layer group ``chat_<room_name>``.

    Client -> server frames (JSON objects):
        ``{"type": "chat_message", "message": str}``
        ``{"type": "typing"}``

    Server -> client frames have a ``type`` of ``history``, ``online_users``,
    ``chat_message``, ``user_join``, ``user_leave``, ``user_typing`` or
    ``error``.
    """

    async def connect(self):
        """Authenticate the user, join the room group and send initial state."""
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'
        self.user = self.scope['user']
        # Set only once the socket has joined the group, so disconnect() does
        # not announce a departure for a connection that was rejected.
        self.joined = False

        # Reject anonymous users.
        if not self.user.is_authenticated:
            await self.close()
            return

        # Reject unknown rooms up front instead of failing after accept().
        if not await self.room_exists():
            await self.close()
            return

        # Join the room group.
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        self.joined = True

        await self.accept()

        # Tell the room that someone joined.
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'user_join',
                'user': self.user.username,
                'timestamp': self.current_timestamp()
            }
        )

        # Send the recent history to the new connection only.
        messages = await self.get_room_messages()
        await self.send(text_data=json.dumps({
            'type': 'history',
            'messages': messages
        }))

        # Send the online user list to the new connection only.
        online_users = await self.get_online_users()
        await self.send(text_data=json.dumps({
            'type': 'online_users',
            'users': online_users
        }))

    async def disconnect(self, close_code):
        """Announce the departure and leave the room group."""
        # disconnect() also runs for sockets that connect() rejected; those
        # never joined, so there is nothing to announce or discard.
        if not getattr(self, 'joined', False):
            return

        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'user_leave',
                'user': self.user.username,
                'timestamp': self.current_timestamp()
            }
        )

        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    async def receive(self, text_data=None, bytes_data=None):
        """Handle one client frame.

        Malformed frames are answered with an ``error`` frame instead of
        raising, because an unhandled exception would drop the connection.
        Binary frames are ignored.
        """
        if text_data is None:
            return

        try:
            data = json.loads(text_data)
        except json.JSONDecodeError:
            await self.send_error('Invalid JSON')
            return

        if not isinstance(data, dict):
            await self.send_error('Invalid message format')
            return

        message_type = data.get('type')

        if message_type == 'chat_message':
            message = data.get('message')
            if not isinstance(message, str) or not message.strip():
                await self.send_error('Message must be a non-empty string')
                return

            # Persist first; the stored row supplies the timestamp so live
            # messages and history report the same value.
            saved = await self.save_message(message)

            # Broadcast to everyone in the room.
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    'type': 'chat_message',
                    'user': self.user.username,
                    'message': message,
                    'timestamp': saved.created_at.isoformat()
                }
            )

        elif message_type == 'typing':
            # Relay the typing notification to the room.
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    'type': 'user_typing',
                    'user': self.user.username
                }
            )

    async def send_error(self, message):
        """Send an ``error`` frame to this connection only."""
        await self.send(text_data=json.dumps({
            'type': 'error',
            'message': message
        }))

    @staticmethod
    def current_timestamp():
        """Return the current time as an ISO 8601 string.

        Uses Django's clock so the value follows the project's USE_TZ setting.
        """
        from django.utils import timezone

        return timezone.now().isoformat()

    # Group event handlers (invoked by the channel layer via the event 'type').
    async def chat_message(self, event):
        """Forward a chat message to the client."""
        await self.send(text_data=json.dumps({
            'type': 'chat_message',
            'user': event['user'],
            'message': event['message'],
            'timestamp': event['timestamp']
        }))

    async def user_join(self, event):
        """Forward a join notification to the client."""
        await self.send(text_data=json.dumps({
            'type': 'user_join',
            'user': event['user'],
            'timestamp': event['timestamp']
        }))

    async def user_leave(self, event):
        """Forward a leave notification to the client."""
        await self.send(text_data=json.dumps({
            'type': 'user_leave',
            'user': event['user'],
            'timestamp': event['timestamp']
        }))

    async def user_typing(self, event):
        """Forward a typing notification, except to the user who is typing."""
        if event['user'] != self.user.username:
            await self.send(text_data=json.dumps({
                'type': 'user_typing',
                'user': event['user']
            }))

    # Database access (runs in a worker thread).
    @database_sync_to_async
    def room_exists(self):
        """Return True if a ChatRoom with this slug exists."""
        from .models import ChatRoom

        return ChatRoom.objects.filter(slug=self.room_name).exists()

    @database_sync_to_async
    def save_message(self, message):
        """Store a message for the current user and room; return the row."""
        from .models import ChatRoom, Message

        room = ChatRoom.objects.get(slug=self.room_name)
        return Message.objects.create(
            room=room,
            user=self.user,
            content=message
        )

    @database_sync_to_async
    def get_room_messages(self):
        """Return the 50 most recent messages of the room, oldest first."""
        from .models import ChatRoom

        room = ChatRoom.objects.get(slug=self.room_name)
        # Fetch newest-first so the slice keeps the latest 50, then flip.
        recent = list(
            room.messages.select_related('user').order_by('-created_at')[:50]
        )
        recent.reverse()

        return [
            {
                'user': msg.user.username,
                'message': msg.content,
                'timestamp': msg.created_at.isoformat()
            }
            for msg in recent
        ]

    @database_sync_to_async
    def get_online_users(self):
        """Return the online user list (simplified: only the current user)."""
        # A real implementation should track presence in Redis or similar.
        return [self.user.username]

步驟 3:路由配置

# routing.py
from django.urls import re_path
from . import consumers

# Rooms are addressed by their slug, so the pattern must accept hyphens.
# It is restricted to ASCII because the captured value becomes part of a
# channel layer group name.
websocket_urlpatterns = [
    re_path(r'ws/chat/(?P<room_name>[-a-zA-Z0-9_]+)/$', consumers.ChatConsumer.as_asgi()),
]

步驟 4:前端實現

<!-- chat.html -->
<!DOCTYPE html>
<html>
<head>
    <title>Chat Room - {{ room.name }}</title>
    <style>
        #chat-log {
            height: 400px;
            overflow-y: scroll;
            border: 1px solid #ccc;
            padding: 10px;
            margin-bottom: 10px;
        }
        .message {
            margin: 5px 0;
        }
        .system-message {
            color: #666;
            font-style: italic;
        }
        .typing-indicator {
            color: #999;
            font-size: 0.9em;
        }
    </style>
</head>
<body>
    <h1>{{ room.name }}</h1>

    <div id="online-users">
        <strong>在線用戶:</strong>
        <span id="user-list"></span>
    </div>

    <div id="chat-log"></div>

    <div id="typing-indicator" class="typing-indicator"></div>

    <input id="chat-message-input" type="text" placeholder="輸入消息..." size="100">
    <button id="chat-message-submit">發送</button>

    <script>
        // Template values are escaped for use inside a JavaScript string.
        const roomName = "{{ room.slug|escapejs }}";
        const username = "{{ request.user.username|escapejs }}";

        // Open the WebSocket; use wss:// when the page itself is served over
        // HTTPS, otherwise the browser blocks the connection as mixed content.
        const wsScheme = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
        const chatSocket = new WebSocket(
            wsScheme + window.location.host +
            '/ws/chat/' + encodeURIComponent(roomName) + '/'
        );

        // Connection established
        chatSocket.onopen = function(e) {
            console.log('WebSocket connected');
        };

        // Incoming frames, dispatched on their "type" field
        chatSocket.onmessage = function(e) {
            const data = JSON.parse(e.data);

            switch(data.type) {
                case 'history':
                    // Render the message history
                    data.messages.forEach(msg => {
                        appendMessage(msg.user, msg.message, msg.timestamp);
                    });
                    break;

                case 'chat_message':
                    // New message
                    appendMessage(data.user, data.message, data.timestamp);
                    break;

                case 'user_join':
                    // A user joined
                    appendSystemMessage(`${data.user} 加入了聊天室`);
                    break;

                case 'user_leave':
                    // A user left
                    appendSystemMessage(`${data.user} 離開了聊天室`);
                    break;

                case 'user_typing':
                    // A user is typing
                    showTypingIndicator(data.user);
                    break;

                case 'online_users':
                    // Refresh the online user list
                    document.getElementById('user-list').textContent = data.users.join(', ');
                    break;
            }
        };

        // Connection closed
        chatSocket.onclose = function(e) {
            console.error('WebSocket closed');
        };

        // Send the message in the input box
        document.getElementById('chat-message-submit').onclick = function(e) {
            const messageInput = document.getElementById('chat-message-input');
            const message = messageInput.value.trim();

            if (message) {
                chatSocket.send(JSON.stringify({
                    'type': 'chat_message',
                    'message': message
                }));
                messageInput.value = '';
            }
        };

        // Enter sends the message; any other key schedules a typing notice
        document.getElementById('chat-message-input').onkeyup = function(e) {
            if (e.key === 'Enter' || e.keyCode === 13) {
                document.getElementById('chat-message-submit').click();
            } else {
                // Debounce: the notice goes out 500 ms after the last keystroke
                clearTimeout(window.typingTimeout);
                window.typingTimeout = setTimeout(() => {
                    // The socket may have closed while the timer was pending
                    if (chatSocket.readyState !== WebSocket.OPEN) {
                        return;
                    }
                    chatSocket.send(JSON.stringify({
                        'type': 'typing'
                    }));
                }, 500);
            }
        };

        // 顯示消息
        /**
         * Append one chat message to the chat log and scroll to the bottom.
         *
         * Both the user name and the message text come from other clients,
         * so both are HTML-escaped before being placed into innerHTML.
         *
         * @param {string} user      Sender's user name.
         * @param {string} message   Message text.
         * @param {string} timestamp Timestamp string parseable by Date.
         */
        function appendMessage(user, message, timestamp) {
            const chatLog = document.getElementById('chat-log');
            const messageDiv = document.createElement('div');
            messageDiv.className = 'message';

            const time = new Date(timestamp).toLocaleTimeString();
            messageDiv.innerHTML = `<strong>${escapeHtml(user)}</strong> [${time}]: ${escapeHtml(message)}`;

            chatLog.appendChild(messageDiv);
            chatLog.scrollTop = chatLog.scrollHeight;
        }

        // 顯示系統消息
        /**
         * Append a system notice (join/leave) to the chat log and scroll to
         * the bottom. The text is set via textContent, so it is not parsed
         * as HTML.
         *
         * @param {string} message Notice text.
         */
        function appendSystemMessage(message) {
            const log = document.getElementById('chat-log');
            const entry = document.createElement('div');

            entry.className = 'message system-message';
            entry.textContent = message;
            log.appendChild(entry);

            log.scrollTop = log.scrollHeight;
        }

        // 顯示正在輸入
        /**
         * Show "<user> is typing" and clear it again after 3 seconds.
         * A newer call restarts the 3-second timer.
         *
         * @param {string} user Name of the user who is typing.
         */
        function showTypingIndicator(user) {
            const typingEl = document.getElementById('typing-indicator');
            typingEl.textContent = `${user} 正在輸入...`;

            const clearIndicator = () => {
                typingEl.textContent = '';
            };

            clearTimeout(window.typingIndicatorTimeout);
            window.typingIndicatorTimeout = setTimeout(clearIndicator, 3000);
        }

        // 轉義 HTML
        /**
         * Replace the five HTML-significant characters (& < > " ') with
         * their entity references.
         *
         * @param {string} text Raw text.
         * @returns {string} Text safe to insert into HTML.
         */
        function escapeHtml(text) {
            const entityFor = (ch) => {
                switch (ch) {
                    case '&': return '&amp;';
                    case '<': return '&lt;';
                    case '>': return '&gt;';
                    case '"': return '&quot;';
                    default: return '&#039;';  // the only remaining match is '
                }
            };
            return text.replace(/[&<>"']/g, entityFor);
        }
    </script>
</body>
</html>

2. 案例二:實時通知系統

場景描述

當後端事件發生時,即時推送通知給用戶:

  • 新訂單通知
  • 系統消息
  • 好友請求
  • 評論回覆

實現

# consumers.py
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
import json

class NotificationConsumer(AsyncWebsocketConsumer):
    """Per-user notification socket.

    Each authenticated user joins the group ``user_<id>``. Server-side code
    pushes to that group with event type ``send_notification``. The client
    may send ``{"type": "mark_read", "notification_id": ...}`` to mark one of
    its own notifications as read.
    """

    async def connect(self):
        """Authenticate, join the personal group and send the unread count."""
        self.user = self.scope['user']
        # Stays None for rejected sockets so disconnect() can tell them apart.
        self.user_group_name = None

        if not self.user.is_authenticated:
            await self.close()
            return

        # Every user has a personal notification group.
        self.user_group_name = f'user_{self.user.id}'

        await self.channel_layer.group_add(
            self.user_group_name,
            self.channel_name
        )

        await self.accept()

        # Send the number of unread notifications.
        unread_count = await self.get_unread_count()
        await self.send(text_data=json.dumps({
            'type': 'unread_count',
            'count': unread_count
        }))

    async def disconnect(self, close_code):
        """Leave the personal group, if this socket ever joined it."""
        group = getattr(self, 'user_group_name', None)
        if group is None:
            # Rejected in connect(): there is no group to leave.
            return

        await self.channel_layer.group_discard(
            group,
            self.channel_name
        )

    async def receive(self, text_data=None, bytes_data=None):
        """Handle a client frame; malformed or unknown frames are ignored."""
        if text_data is None:
            return

        try:
            data = json.loads(text_data)
        except json.JSONDecodeError:
            return

        if not isinstance(data, dict):
            return

        if data.get('type') == 'mark_read':
            notification_id = data.get('notification_id')
            if notification_id is None:
                return
            await self.mark_notification_read(notification_id)

    # Group event handler.
    async def send_notification(self, event):
        """Forward a pushed notification to the client."""
        await self.send(text_data=json.dumps({
            'type': 'notification',
            'id': event['notification_id'],
            'title': event['title'],
            'message': event['message'],
            'url': event.get('url'),
            'timestamp': event['timestamp']
        }))

    # Database access (runs in a worker thread).
    @database_sync_to_async
    def get_unread_count(self):
        """Return how many unread notifications the user has."""
        from .models import Notification
        return Notification.objects.filter(
            user=self.user,
            is_read=False
        ).count()

    @database_sync_to_async
    def mark_notification_read(self, notification_id):
        """Mark one notification as read.

        The filter includes the user, so a client cannot mark another user's
        notification as read.
        """
        from .models import Notification
        try:
            Notification.objects.filter(
                id=notification_id,
                user=self.user
            ).update(is_read=True)
        except (TypeError, ValueError):
            # The client sent an id that cannot be converted to a primary key.
            pass
# models.py
class Notification(models.Model):
    """A notification addressed to a single user."""
    # Deleting the user deletes their notifications.
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    title = models.CharField(max_length=200)
    message = models.TextField()
    # Optional link; may be blank but the column is not nullable.
    url = models.URLField(blank=True)
    # New notifications start out unread.
    is_read = models.BooleanField(default=False)
    # Set once when the row is first created.
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        # Default ordering: newest notification first.
        ordering = ['-created_at']
# utils.py
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from datetime import datetime

def send_user_notification(user_id, title, message, url=None):
    """Store a notification and push it to the user's WebSocket group.

    Args:
        user_id: Primary key of the recipient.
        title: Notification title.
        message: Notification body.
        url: Optional link; ``None`` is stored and sent as an empty string.
    """
    from .models import Notification

    # Notification.url is blank=True but not nullable, so None must be
    # normalized before it reaches the database.
    link = url or ''

    # Persist the notification.
    notification = Notification.objects.create(
        user_id=user_id,
        title=title,
        message=message,
        url=link
    )

    # Push it over WebSocket. The stored creation time is reused so the
    # pushed timestamp matches the database row.
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        f'user_{user_id}',
        {
            'type': 'send_notification',
            'notification_id': notification.id,
            'title': title,
            'message': message,
            'url': link,
            'timestamp': notification.created_at.isoformat()
        }
    )

# 使用示例
# views.py
def create_order(request):
    """Create an order, notify the requesting user, and return the order id."""
    order = Order.objects.create(...)
    order_id = order.id

    # Notify the user who placed the order.
    notification = {
        'user_id': request.user.id,
        'title': '訂單已創建',
        'message': f'您的訂單 {order_id} 已成功創建',
        'url': f'/orders/{order_id}/',
    }
    send_user_notification(**notification)

    return JsonResponse({'order_id': order_id})

3. 案例三:協作編輯(簡化版)

場景描述

多個用戶同時編輯同一個文檔,實時同步。此簡化版只負責把編輯操作廣播給其他用戶,不處理並發衝突;生產環境需搭配 OT 或 CRDT 等衝突解決機制。

實現

# consumers.py
import json

from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer

class CollaborativeEditConsumer(AsyncWebsocketConsumer):
    """Relay edit and cursor operations between editors of one document.

    All connections for a document share the group ``document_<id>``. This
    consumer only broadcasts operations; it does not merge or reorder them.

    Client -> server frames (JSON objects):
        ``{"type": "edit", "position": ..., "text": ..., "action": "insert"|"delete"}``
        ``{"type": "cursor_move", "position": ...}``
    """

    async def connect(self):
        """Authenticate, join the document group and send the content."""
        self.document_id = self.scope['url_route']['kwargs']['document_id']
        self.room_group_name = f'document_{self.document_id}'
        self.user = self.scope['user']
        # Set only once the socket has joined the group, so disconnect() does
        # not announce a departure for a connection that was rejected.
        self.joined = False

        if not self.user.is_authenticated:
            await self.close()
            return

        # Reject unknown documents up front instead of failing after accept().
        if not await self.document_exists():
            await self.close()
            return

        # Join the document group.
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        self.joined = True

        await self.accept()

        # Tell the other editors that someone joined.
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'user_joined',
                'user': self.user.username,
                'user_id': self.user.id
            }
        )

        # Send the current document content to this connection.
        content = await self.get_document_content()
        await self.send(text_data=json.dumps({
            'type': 'document_content',
            'content': content
        }))

    async def disconnect(self, close_code):
        """Announce the departure and leave the document group."""
        # Rejected sockets never joined; nothing to announce or discard.
        if not getattr(self, 'joined', False):
            return

        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'user_left',
                'user': self.user.username,
                'user_id': self.user.id
            }
        )

        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    async def receive(self, text_data=None, bytes_data=None):
        """Validate one client operation and broadcast it to the group.

        Malformed frames are answered with an ``error`` frame instead of
        raising, because an unhandled exception would drop the connection.
        Binary frames are ignored.
        """
        if text_data is None:
            return

        try:
            data = json.loads(text_data)
        except json.JSONDecodeError:
            await self.send_error('Invalid JSON')
            return

        if not isinstance(data, dict):
            await self.send_error('Invalid message format')
            return

        operation_type = data.get('type')

        if operation_type == 'edit':
            missing = [key for key in ('position', 'text', 'action') if key not in data]
            if missing:
                await self.send_error(f"Missing field(s): {', '.join(missing)}")
                return

            action = data['action']
            if action not in ('insert', 'delete'):
                await self.send_error('Invalid action')
                return

            # Persisting the operation is optional and not done here.

            # Broadcast to the group; the handler skips the sender.
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    'type': 'broadcast_edit',
                    'user': self.user.username,
                    'user_id': self.user.id,
                    'position': data['position'],
                    'text': data['text'],
                    'action': action
                }
            )

        elif operation_type == 'cursor_move':
            if 'position' not in data:
                await self.send_error('Missing field(s): position')
                return

            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    'type': 'cursor_update',
                    'user': self.user.username,
                    'user_id': self.user.id,
                    'position': data['position']
                }
            )

    async def send_error(self, message):
        """Send an ``error`` frame to this connection only."""
        await self.send(text_data=json.dumps({
            'type': 'error',
            'message': message
        }))

    # Group event handlers.
    async def broadcast_edit(self, event):
        """Forward an edit to the client unless this user made it."""
        if event['user_id'] != self.user.id:
            await self.send(text_data=json.dumps({
                'type': 'edit',
                'user': event['user'],
                'position': event['position'],
                'text': event['text'],
                'action': event['action']
            }))

    async def cursor_update(self, event):
        """Forward a cursor position unless it belongs to this user."""
        if event['user_id'] != self.user.id:
            await self.send(text_data=json.dumps({
                'type': 'cursor',
                'user': event['user'],
                'position': event['position']
            }))

    async def user_joined(self, event):
        """Forward a join notification unless it is about this user."""
        if event['user_id'] != self.user.id:
            await self.send(text_data=json.dumps({
                'type': 'user_joined',
                'user': event['user']
            }))

    async def user_left(self, event):
        """Forward a leave notification to the client."""
        await self.send(text_data=json.dumps({
            'type': 'user_left',
            'user': event['user']
        }))

    # Database access (runs in a worker thread).
    @database_sync_to_async
    def document_exists(self):
        """Return True if a Document with this id exists."""
        from .models import Document
        return Document.objects.filter(id=self.document_id).exists()

    @database_sync_to_async
    def get_document_content(self):
        """Return the stored content of the document."""
        from .models import Document
        document = Document.objects.get(id=self.document_id)
        return document.content

4. 案例四:實時儀表板

場景描述

管理員監控實時數據:在線用戶數、訂單數、系統狀態等。

實現

# consumers.py
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
import json
import asyncio

class DashboardConsumer(AsyncWebsocketConsumer):
    """Staff-only dashboard socket.

    Sends a statistics snapshot on connect, a refreshed snapshot every
    5 seconds, and forwards ``new_order`` / ``system_alert`` events published
    to the ``dashboard`` group.
    """

    async def connect(self):
        """Check permissions, join the group and start periodic updates."""
        self.user = self.scope['user']
        self.room_group_name = 'dashboard'
        # Both are inspected by disconnect(), which also runs for sockets
        # rejected below.
        self.joined = False
        self.update_task = None

        # Only staff members may open the dashboard.
        if not self.user.is_authenticated or not self.user.is_staff:
            await self.close()
            return

        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        self.joined = True

        await self.accept()

        # Send the initial snapshot.
        stats = await self.get_initial_stats()
        await self.send(text_data=json.dumps({
            'type': 'initial_stats',
            'data': stats
        }))

        # Keep a reference to the task so it can be cancelled on disconnect.
        self.update_task = asyncio.create_task(self.send_periodic_updates())

    async def disconnect(self, close_code):
        """Stop the periodic task and leave the group."""
        task = getattr(self, 'update_task', None)
        if task is not None:
            # Without this the loop keeps querying and sending on a closed
            # socket after the client has gone.
            task.cancel()

        if getattr(self, 'joined', False):
            await self.channel_layer.group_discard(
                self.room_group_name,
                self.channel_name
            )

    async def send_periodic_updates(self):
        """Push a fresh statistics snapshot every 5 seconds until cancelled."""
        try:
            while True:
                await asyncio.sleep(5)

                stats = await self.get_current_stats()

                await self.send(text_data=json.dumps({
                    'type': 'stats_update',
                    'data': stats
                }))
        except asyncio.CancelledError:
            # Normal shutdown path triggered by disconnect().
            pass

    # Group event handlers.
    async def new_order(self, event):
        """Forward a new-order event to the client."""
        await self.send(text_data=json.dumps({
            'type': 'new_order',
            'order': event['order_data']
        }))

    async def system_alert(self, event):
        """Forward a system alert to the client."""
        await self.send(text_data=json.dumps({
            'type': 'alert',
            'level': event['level'],
            'message': event['message']
        }))

    # Database access (runs in a worker thread).
    @database_sync_to_async
    def get_initial_stats(self):
        """Return the statistics snapshot sent right after connecting."""
        return self._collect_stats()

    @database_sync_to_async
    def get_current_stats(self):
        """Return the statistics snapshot used for periodic updates."""
        # Calls the plain synchronous helper. Calling get_initial_stats()
        # here would return an un-awaited coroutine, because that method is
        # itself wrapped by database_sync_to_async.
        return self._collect_stats()

    def _collect_stats(self):
        """Query the statistics synchronously and return a JSON-safe dict."""
        from django.contrib.auth.models import User
        from django.db.models import Sum
        from django.utils import timezone
        from .models import Order

        # The __date lookup compares in the current time zone, so use the
        # local date rather than the UTC date.
        today = timezone.localdate()
        todays_orders = Order.objects.filter(created_at__date=today)
        revenue = todays_orders.aggregate(Sum('total'))['total__sum'] or 0

        return {
            'total_users': User.objects.count(),
            'online_users': self.get_online_user_count(),
            'today_orders': todays_orders.count(),
            # The sum may be a Decimal, which json.dumps cannot serialize.
            'today_revenue': float(revenue),
        }

    def get_online_user_count(self):
        """Return the number of online users (placeholder value)."""
        # A real implementation should track presence in Redis.
        return 42
# 在其他地方觸發事件
# views.py
def create_order(request):
    """Create an order, publish it to the dashboard group, return its id."""
    order = Order.objects.create(...)

    # Imported here, as in the rest of this example, to keep it self-contained.
    from channels.layers import get_channel_layer
    from asgiref.sync import async_to_sync

    layer = get_channel_layer()
    publish = async_to_sync(layer.group_send)

    # Summary of the order as shown on the dashboard.
    order_data = {
        'id': order.id,
        'user': order.user.username,
        'total': float(order.total),
        'status': order.status,
    }
    publish('dashboard', {'type': 'new_order', 'order_data': order_data})

    return JsonResponse({'order_id': order.id})

5. 案例五:Celery + Channels 整合

場景描述

長時間任務(Celery)完成後,通過 WebSocket(Channels)即時通知用戶。

實現

# tasks.py
from celery import shared_task
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from datetime import datetime

@shared_task(bind=True)
def generate_report_task(self, user_id, report_type):
    """Generate a report in steps and push progress to the user's group.

    Args:
        user_id: Primary key of the user to notify.
        report_type: Requested report type (not used by this simulated
            implementation).

    Returns:
        The URL of the generated report.

    Raises:
        Exception: Any failure is reported to the user with status
            ``failed`` and then re-raised so Celery records it.
    """
    # Imported locally because this module does not import ``time`` at the top.
    import time

    try:
        # Announce that the task has started.
        send_task_update(user_id, self.request.id, 'started', 0)

        # Simulated report generation, split into steps.
        steps = 10
        for i in range(steps):
            time.sleep(2)  # stands in for real work

            # Report progress as a whole percentage.
            progress = int((i + 1) / steps * 100)
            send_task_update(
                user_id,
                self.request.id,
                'progress',
                progress,
                f'正在處理步驟 {i+1}/{steps}'
            )

        # Done: tell the client where to download the report.
        report_url = f'/reports/{self.request.id}.pdf'
        send_task_update(
            user_id,
            self.request.id,
            'completed',
            100,
            '報表生成完成',
            result={'url': report_url}
        )

        return report_url

    except Exception as e:
        # Tell the client about the failure, then let Celery see it.
        send_task_update(
            user_id,
            self.request.id,
            'failed',
            0,
            str(e)
        )
        raise

def send_task_update(user_id, task_id, status, progress, message='', result=None):
    """Publish a ``task_update`` event to the group ``user_<user_id>``.

    Args:
        user_id: Primary key of the user to notify.
        task_id: Identifier of the task being reported.
        status: Status string, passed through unchanged.
        progress: Progress value, passed through unchanged.
        message: Optional text for the client.
        result: Optional result dict; a falsy value is sent as ``{}``.
    """
    layer = get_channel_layer()
    publish = async_to_sync(layer.group_send)

    event = {
        'type': 'task_update',
        'task_id': task_id,
        'status': status,
        'progress': progress,
        'message': message,
        'result': result if result else {},
        'timestamp': datetime.now().isoformat(),
    }
    publish(f'user_{user_id}', event)
# consumers.py
class TaskNotificationConsumer(AsyncWebsocketConsumer):
    """Deliver ``task_update`` events to the authenticated user's socket."""

    async def connect(self):
        """Authenticate and join the group ``user_<id>``."""
        self.user = self.scope['user']
        # Stays None for rejected sockets so disconnect() can tell them apart.
        self.user_group_name = None

        if not self.user.is_authenticated:
            await self.close()
            return

        self.user_group_name = f'user_{self.user.id}'

        await self.channel_layer.group_add(
            self.user_group_name,
            self.channel_name
        )

        await self.accept()

    async def disconnect(self, close_code):
        """Leave the personal group, if this socket ever joined it."""
        group = getattr(self, 'user_group_name', None)
        if group is None:
            # Rejected in connect(): there is no group to leave.
            return

        await self.channel_layer.group_discard(
            group,
            self.channel_name
        )

    async def task_update(self, event):
        """Forward a task progress event to the client."""
        await self.send(text_data=json.dumps({
            'type': 'task_update',
            'task_id': event['task_id'],
            'status': event['status'],
            'progress': event['progress'],
            'message': event['message'],
            'result': event['result'],
            'timestamp': event['timestamp']
        }))
# views.py
from .tasks import generate_report_task

def request_report(request):
    """Queue report generation and return the Celery task id.

    The report type is read from the form field ``type``. If the request has
    no such form field, a JSON object body with a ``type`` key is accepted as
    well. A missing type yields a 400 response instead of an unhandled
    ``KeyError``.
    """
    import json

    report_type = request.POST.get('type')

    if report_type is None and request.body:
        try:
            payload = json.loads(request.body)
        except ValueError:
            # Covers both malformed JSON and undecodable bytes.
            payload = None
        if isinstance(payload, dict):
            report_type = payload.get('type')

    if not report_type:
        return JsonResponse({'error': 'Missing report type'}, status=400)

    # Hand the work to Celery.
    task = generate_report_task.delay(request.user.id, report_type)

    return JsonResponse({
        'task_id': task.id,
        'message': '報表生成中,請稍候...'
    })
// 前端實現
// Connect to the same host that served the page, using wss:// on HTTPS pages.
const wsScheme = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
const socket = new WebSocket(wsScheme + window.location.host + '/ws/tasks/');

socket.onmessage = function(e) {
    const data = JSON.parse(e.data);

    if (data.type === 'task_update') {
        const { task_id, status, progress, message, result } = data;

        // Update the progress bar
        updateProgressBar(task_id, progress);

        // Update the status text
        updateStatusMessage(task_id, message);

        if (status === 'completed') {
            // Task finished
            showDownloadLink(result.url);
        } else if (status === 'failed') {
            // Task failed
            showError(message);
        }
    }
};

// Ask the server to generate a report
function requestReport(type) {
    // Sent form-encoded so the server can read the value from request.POST.
    // NOTE(review): a Django view with CSRF protection enabled also needs an
    // X-CSRFToken header here.
    fetch('/api/reports/request/', {
        method: 'POST',
        body: new URLSearchParams({ type: type })
    })
    .then(response => response.json())
    .then(data => {
        console.log('Task ID:', data.task_id);
        // Progress updates arrive over the WebSocket
    });
}

6. 生產環境最佳實踐

1. 認證與授權

# middleware.py
from channels.auth import AuthMiddlewareStack
from channels.db import database_sync_to_async
from django.contrib.auth.models import AnonymousUser
from urllib.parse import parse_qs

class TokenAuthMiddleware:
    """ASGI middleware that authenticates via a ``token`` query parameter.

    Sets ``scope['user']`` to the owner of the DRF auth token, or to
    ``AnonymousUser`` when the token is missing or unknown.
    """

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        # Read the token from the query string. Undecodable bytes are
        # replaced rather than raising, since this is client-controlled input.
        raw_query = scope.get('query_string', b'')
        query_string = parse_qs(raw_query.decode('utf-8', errors='replace'))
        token = query_string.get('token', [None])[0]

        if token:
            scope['user'] = await self.get_user_from_token(token)
        else:
            scope['user'] = AnonymousUser()

        return await self.app(scope, receive, send)

    @database_sync_to_async
    def get_user_from_token(self, token):
        """Return the user owning ``token``, or ``AnonymousUser`` if unknown."""
        from rest_framework.authtoken.models import Token

        # Only "no such token" means anonymous. Other failures (database
        # down, misconfiguration) must surface instead of being swallowed.
        try:
            return Token.objects.select_related('user').get(key=token).user
        except Token.DoesNotExist:
            return AnonymousUser()

# asgi.py
# WebSocket connections go through token authentication before routing.
_websocket_app = TokenAuthMiddleware(URLRouter(websocket_urlpatterns))

application = ProtocolTypeRouter({"websocket": _websocket_app})

2. 錯誤處理

class RobustChatConsumer(AsyncWebsocketConsumer):
    """Skeleton consumer showing defensive error handling."""

    async def connect(self):
        """Accept the connection; close it if setup fails."""
        try:
            # Connection setup goes here.
            await self.accept()
        except Exception as e:
            # logger.exception keeps the traceback, which logger.error drops.
            logger.exception(f"Connection error: {e}")
            await self.close()

    async def receive(self, text_data):
        """Parse one frame and report failures to the client."""
        try:
            data = json.loads(text_data)
            # Message handling goes here.
        except json.JSONDecodeError:
            await self.send(text_data=json.dumps({
                'error': 'Invalid JSON'
            }))
        except Exception as e:
            # Log full details server-side; send the client a generic message.
            logger.exception(f"Receive error: {e}")
            await self.send(text_data=json.dumps({
                'error': 'Internal server error'
            }))

3. 限流

from django.core.cache import cache

class RateLimitedConsumer(AsyncWebsocketConsumer):
    """Skeleton consumer limiting each user to 100 messages per 60 seconds."""

    async def receive(self, text_data):
        """Reject the frame with an error once the limit is exceeded."""
        from asgiref.sync import sync_to_async

        # The cache backend may do network I/O, so keep it off the event loop.
        message_count = await sync_to_async(self._count_message)()

        if message_count > 100:  # at most 100 messages per window
            await self.send(text_data=json.dumps({
                'error': 'Rate limit exceeded'
            }))
            return

        # Message handling goes here.
        ...

    def _count_message(self):
        """Count this message and return the total for the current window."""
        cache_key = f'ratelimit_{self.user.id}'

        # add() only creates the key when it is absent, so the 60-second
        # timeout starts at the first message of a window and is not
        # refreshed by later ones.
        cache.add(cache_key, 0, timeout=60)
        try:
            # incr() is a single cache operation, unlike get() followed by
            # set(), which can lose counts under concurrent messages.
            return cache.incr(cache_key)
        except ValueError:
            # The key expired between add() and incr(): start a new window.
            cache.set(cache_key, 1, timeout=60)
            return 1

4. 監控

import logging
import time

logger = logging.getLogger(__name__)

class MonitoredConsumer(AsyncWebsocketConsumer):
    """Skeleton consumer that logs connection lifetime and slow messages.

    Durations use ``time.monotonic()`` so that wall-clock adjustments cannot
    produce negative or inflated values.
    """

    async def connect(self):
        """Record the connect time, log it and accept."""
        self.connect_time = time.monotonic()
        logger.info(f"User {self.scope['user'].id} connected")
        await self.accept()

    async def disconnect(self, close_code):
        """Log how long the connection lasted and its close code."""
        # Fall back to "now" if connect() never got as far as recording it.
        started = getattr(self, 'connect_time', None)
        now = time.monotonic()
        duration = now - (started if started is not None else now)
        logger.info(
            f"User {self.scope['user'].id} disconnected "
            f"after {duration:.2f}s, code={close_code}"
        )

    async def receive(self, text_data):
        """Process one message and warn if it took longer than 1 second."""
        start_time = time.monotonic()

        # Message handling is delegated to process_message.
        await self.process_message(text_data)

        duration = time.monotonic() - start_time
        if duration > 1:  # log anything slower than 1 second
            logger.warning(
                f"Slow message processing: {duration:.2f}s"
            )

小結

本章展示了 5 個 Channels 實戰案例:

  1. 聊天室系統:多房間、歷史消息、在線用戶、正在輸入
  2. 實時通知系統:個人通知頻道、未讀數量、標記已讀
  3. 協作編輯:多人同步編輯、光標位置、實時更新
  4. 實時儀表板:定期推送數據、實時事件、系統監控
  5. Celery + Channels:長任務進度推送、完成通知

關鍵要點:

  • ✅ 使用異步 Consumer 獲得最佳性能
  • ✅ 合理使用 Channel Layer(Redis)
  • ✅ 實現認證、錯誤處理、限流
  • ✅ 監控連接數和消息處理時間
  • ✅ Celery 處理耗時任務 + Channels 推送通知

下一章將對比 Celery 和 Channels,幫助你選擇合適的方案!

0%