Queue(佇列)資料結構完整教學

理解 FIFO 原理與 Python 實作技巧

Queue(佇列)資料結構完整教學

什麼是 Queue?

Queue(佇列)是一種遵循 FIFO(First In First Out,先進先出) 原則的線性資料結構。就像現實生活中的排隊一樣,第一個進入佇列的元素會第一個離開。

視覺化概念

排隊買票的例子:
[出口] ← 小明 ← 小華 ← 小美 ← [入口]
         ↑                      ↑
       先到的人              後到的人
      (先離開)              (後離開)

資料結構表示:
Front [10 | 20 | 30 | 40] Rear
出隊 ←                    ← 入隊

Queue 的基本操作

  1. Enqueue(入隊):在佇列尾端加入元素
  2. Dequeue(出隊):從佇列前端移除元素
  3. Front/Peek:查看佇列前端元素(不移除)
  4. IsEmpty:檢查佇列是否為空
  5. Size:取得佇列中的元素數量

為什麼需要 Queue?

實際應用場景

# 1. 印表機工作佇列
print_queue = Queue()
print_queue.enqueue("文件1.pdf")
print_queue.enqueue("報告.docx")
print_queue.enqueue("圖片.jpg")
# 按照順序列印

# 2. 作業系統的程序排程
process_queue = Queue()
process_queue.enqueue("Process A")
process_queue.enqueue("Process B")
# CPU 按順序處理

# 3. 網頁伺服器處理請求
request_queue = Queue()
request_queue.enqueue("GET /index.html")
request_queue.enqueue("POST /api/data")
# 按順序處理請求

實作方式比較

1. 使用 Python List(簡單但效率較低)

class QueueWithList:
    """使用 Python list 實作的 Queue"""
    
    def __init__(self):
        self.items = []
    
    def enqueue(self, item):
        """入隊 - O(1)"""
        self.items.append(item)
    
    def dequeue(self):
        """出隊 - O(n) 因為需要移動所有元素"""
        if self.is_empty():
            raise IndexError("Queue is empty")
        return self.items.pop(0)  # 移除第一個元素
    
    def front(self):
        """查看隊首元素 - O(1)"""
        if self.is_empty():
            raise IndexError("Queue is empty")
        return self.items[0]
    
    def is_empty(self):
        """檢查是否為空 - O(1)"""
        return len(self.items) == 0
    
    def size(self):
        """取得大小 - O(1)"""
        return len(self.items)
    
    def __str__(self):
        return f"Queue({self.items})"

# 使用範例
queue = QueueWithList()
queue.enqueue(10)
queue.enqueue(20)
queue.enqueue(30)
print(queue)  # Queue([10, 20, 30])
print(queue.dequeue())  # 10
print(queue)  # Queue([20, 30])

2. 使用 collections.deque(推薦)

from collections import deque

class QueueWithDeque:
    """使用 collections.deque 實作的 Queue - 效率最高"""
    
    def __init__(self):
        self.items = deque()
    
    def enqueue(self, item):
        """入隊 - O(1)"""
        self.items.append(item)
    
    def dequeue(self):
        """出隊 - O(1)"""
        if self.is_empty():
            raise IndexError("Queue is empty")
        return self.items.popleft()  # 從左邊移除
    
    def front(self):
        """查看隊首元素 - O(1)"""
        if self.is_empty():
            raise IndexError("Queue is empty")
        return self.items[0]
    
    def is_empty(self):
        """檢查是否為空 - O(1)"""
        return len(self.items) == 0
    
    def size(self):
        """取得大小 - O(1)"""
        return len(self.items)
    
    def __str__(self):
        return f"Queue({list(self.items)})"

# 使用範例
queue = QueueWithDeque()
for i in [10, 20, 30]:
    queue.enqueue(i)
    print(f"Enqueued {i}: {queue}")

while not queue.is_empty():
    item = queue.dequeue()
    print(f"Dequeued {item}: {queue}")

3. 使用 Linked List 實作

class Node:
    """節點類別"""
    def __init__(self, data):
        self.data = data
        self.next = None

class QueueWithLinkedList:
    """使用 Linked List 實作的 Queue"""
    
    def __init__(self):
        self.front_node = None  # 隊首
        self.rear_node = None   # 隊尾
        self._size = 0
    
    def enqueue(self, item):
        """入隊 - O(1)"""
        new_node = Node(item)
        
        if self.is_empty():
            self.front_node = self.rear_node = new_node
        else:
            self.rear_node.next = new_node
            self.rear_node = new_node
        
        self._size += 1
    
    def dequeue(self):
        """出隊 - O(1)"""
        if self.is_empty():
            raise IndexError("Queue is empty")
        
        data = self.front_node.data
        self.front_node = self.front_node.next
        
        if self.front_node is None:  # 佇列變空了
            self.rear_node = None
        
        self._size -= 1
        return data
    
    def front(self):
        """查看隊首元素 - O(1)"""
        if self.is_empty():
            raise IndexError("Queue is empty")
        return self.front_node.data
    
    def is_empty(self):
        """檢查是否為空 - O(1)"""
        return self.front_node is None
    
    def size(self):
        """取得大小 - O(1)"""
        return self._size
    
    def __str__(self):
        if self.is_empty():
            return "Queue([])"
        
        result = []
        current = self.front_node
        while current:
            result.append(str(current.data))
            current = current.next
        return f"Queue([{', '.join(result)}])"

# 使用範例
queue = QueueWithLinkedList()
print(f"空佇列: {queue}")

for i in range(1, 4):
    queue.enqueue(i * 10)
    print(f"Enqueue {i * 10}: {queue}")

print(f"Front element: {queue.front()}")
print(f"Dequeue: {queue.dequeue()}")
print(f"After dequeue: {queue}")

4. 使用固定大小的陣列(Circular Queue)

class CircularQueue:
    """環形佇列 - 使用固定大小的陣列"""
    
    def __init__(self, capacity):
        self.capacity = capacity
        self.items = [None] * capacity
        self.front = -1
        self.rear = -1
        self._size = 0
    
    def enqueue(self, item):
        """入隊 - O(1)"""
        if self.is_full():
            raise OverflowError("Queue is full")
        
        if self.is_empty():
            self.front = self.rear = 0
        else:
            self.rear = (self.rear + 1) % self.capacity
        
        self.items[self.rear] = item
        self._size += 1
    
    def dequeue(self):
        """出隊 - O(1)"""
        if self.is_empty():
            raise IndexError("Queue is empty")
        
        data = self.items[self.front]
        
        if self.front == self.rear:  # 只有一個元素
            self.front = self.rear = -1
        else:
            self.front = (self.front + 1) % self.capacity
        
        self._size -= 1
        return data
    
    def front_element(self):
        """查看隊首元素 - O(1)"""
        if self.is_empty():
            raise IndexError("Queue is empty")
        return self.items[self.front]
    
    def is_empty(self):
        """檢查是否為空 - O(1)"""
        return self.front == -1
    
    def is_full(self):
        """檢查是否已滿 - O(1)"""
        return self._size == self.capacity
    
    def size(self):
        """取得大小 - O(1)"""
        return self._size
    
    def __str__(self):
        if self.is_empty():
            return "CircularQueue([])"
        
        result = []
        i = self.front
        while True:
            result.append(str(self.items[i]))
            if i == self.rear:
                break
            i = (i + 1) % self.capacity
        
        return f"CircularQueue({result})"

# 使用範例
cq = CircularQueue(5)
print(f"容量: {cq.capacity}")

# 測試環形特性
for i in range(1, 6):
    cq.enqueue(i)
    print(f"Enqueue {i}: {cq}")

print(f"\n佇列已滿: {cq.is_full()}")

# 出隊後再入隊,展示環形特性
for _ in range(2):
    print(f"Dequeue: {cq.dequeue()}")

for i in range(6, 8):
    cq.enqueue(i)
    print(f"Enqueue {i}: {cq}")

Python 內建的 Queue 模組

Python 提供了 queue 模組,包含多種佇列實作:

import queue

# 1. 基本的 FIFO Queue
fifo_queue = queue.Queue(maxsize=5)  # maxsize=0 表示無限大

# 入隊
fifo_queue.put(10)
fifo_queue.put(20)
fifo_queue.put(30)

# 出隊
print(fifo_queue.get())  # 10
print(fifo_queue.get())  # 20

# 檢查狀態
print(f"Queue size: {fifo_queue.qsize()}")
print(f"Is empty: {fifo_queue.empty()}")
print(f"Is full: {fifo_queue.full()}")

# 2. LIFO Queue(其實就是 Stack)
lifo_queue = queue.LifoQueue()
lifo_queue.put(10)
lifo_queue.put(20)
print(lifo_queue.get())  # 20 (後進先出)

# 3. Priority Queue(優先權佇列)
pq = queue.PriorityQueue()
pq.put((2, "中優先"))
pq.put((1, "高優先"))
pq.put((3, "低優先"))

while not pq.empty():
    priority, item = pq.get()
    print(f"Priority {priority}: {item}")
# 輸出順序:高優先 → 中優先 → 低優先

實際應用範例

1. 任務排程系統

class TaskScheduler:
    """簡單的任務排程器"""
    
    def __init__(self):
        self.task_queue = QueueWithDeque()
        self.completed_tasks = []
    
    def add_task(self, task_name):
        """新增任務"""
        self.task_queue.enqueue({
            'name': task_name,
            'added_time': __import__('time').time()
        })
        print(f"Task '{task_name}' added to queue")
    
    def process_next_task(self):
        """處理下一個任務"""
        if self.task_queue.is_empty():
            print("No tasks to process")
            return
        
        task = self.task_queue.dequeue()
        print(f"Processing task: {task['name']}")
        
        # 模擬處理時間
        __import__('time').sleep(1)
        
        task['completed_time'] = __import__('time').time()
        self.completed_tasks.append(task)
        print(f"Task '{task['name']}' completed")
    
    def process_all_tasks(self):
        """處理所有任務"""
        while not self.task_queue.is_empty():
            self.process_next_task()
    
    def show_status(self):
        """顯示狀態"""
        print(f"\nPending tasks: {self.task_queue.size()}")
        print(f"Completed tasks: {len(self.completed_tasks)}")

# 使用範例
scheduler = TaskScheduler()
scheduler.add_task("發送郵件")
scheduler.add_task("生成報告")
scheduler.add_task("備份資料")

scheduler.show_status()
scheduler.process_all_tasks()
scheduler.show_status()

2. 廣度優先搜尋(BFS)

from collections import deque

def bfs_shortest_path(graph, start, target):
    """使用 BFS 找最短路徑"""
    queue = deque([(start, [start])])
    visited = {start}
    
    while queue:
        node, path = queue.popleft()
        
        if node == target:
            return path
        
        for neighbor in graph[node]:
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append((neighbor, path + [neighbor]))
    
    return None

# 使用範例
graph = {
    'A': ['B', 'C'],
    'B': ['A', 'D', 'E'],
    'C': ['A', 'F'],
    'D': ['B'],
    'E': ['B', 'F'],
    'F': ['C', 'E']
}

path = bfs_shortest_path(graph, 'A', 'F')
print(f"Shortest path from A to F: {' -> '.join(path)}")
# 輸出: A -> C -> F

3. 生產者-消費者模式

import threading
import queue
import time
import random

class ProducerConsumer:
    """生產者-消費者模式範例"""
    
    def __init__(self, buffer_size=5):
        self.buffer = queue.Queue(maxsize=buffer_size)
        self.running = True
    
    def producer(self, producer_id):
        """生產者執行緒"""
        while self.running:
            item = f"Item_{producer_id}_{random.randint(1, 100)}"
            self.buffer.put(item)
            print(f"Producer {producer_id} produced: {item}")
            time.sleep(random.uniform(0.5, 1.5))
    
    def consumer(self, consumer_id):
        """消費者執行緒"""
        while self.running or not self.buffer.empty():
            try:
                item = self.buffer.get(timeout=1)
                print(f"Consumer {consumer_id} consumed: {item}")
                time.sleep(random.uniform(0.5, 1.0))
            except queue.Empty:
                continue
    
    def run(self, duration=5):
        """執行模擬"""
        # 建立執行緒
        producers = [
            threading.Thread(target=self.producer, args=(i,))
            for i in range(2)
        ]
        consumers = [
            threading.Thread(target=self.consumer, args=(i,))
            for i in range(3)
        ]
        
        # 啟動執行緒
        for p in producers:
            p.start()
        for c in consumers:
            c.start()
        
        # 執行一段時間
        time.sleep(duration)
        self.running = False
        
        # 等待所有執行緒結束
        for p in producers:
            p.join()
        for c in consumers:
            c.join()
        
        print("\nSimulation completed!")

# 使用範例(簡化版,實際執行需要處理執行緒)
# pc = ProducerConsumer()
# pc.run(duration=5)

4. 網頁爬蟲的 URL 佇列

class WebCrawlerQueue:
    """網頁爬蟲的 URL 管理佇列"""
    
    def __init__(self):
        self.url_queue = QueueWithDeque()
        self.visited_urls = set()
        self.max_depth = 3
    
    def add_url(self, url, depth=0):
        """新增 URL 到佇列"""
        if url not in self.visited_urls and depth <= self.max_depth:
            self.url_queue.enqueue({'url': url, 'depth': depth})
    
    def crawl_next(self):
        """爬取下一個 URL"""
        if self.url_queue.is_empty():
            return None
        
        url_info = self.url_queue.dequeue()
        url = url_info['url']
        depth = url_info['depth']
        
        if url in self.visited_urls:
            return self.crawl_next()
        
        self.visited_urls.add(url)
        print(f"Crawling: {url} (depth: {depth})")
        
        # 模擬找到新的連結
        if depth < self.max_depth:
            for i in range(random.randint(0, 3)):
                new_url = f"{url}/page{i}"
                self.add_url(new_url, depth + 1)
        
        return url
    
    def crawl_all(self):
        """爬取所有 URL"""
        while not self.url_queue.is_empty():
            self.crawl_next()
        
        print(f"\nTotal URLs crawled: {len(self.visited_urls)}")

# 使用範例
crawler = WebCrawlerQueue()
crawler.add_url("https://example.com")
crawler.crawl_all()

效能分析

時間複雜度比較

操作List 實作Deque 實作Linked ListCircular Array
enqueue()O(1)O(1)O(1)O(1)
dequeue()O(n)O(1)O(1)O(1)
front()O(1)O(1)O(1)O(1)
is_empty()O(1)O(1)O(1)O(1)
size()O(1)O(1)O(1)O(1)

空間複雜度

  • List/Deque:O(n),可能有額外的空間浪費
  • Linked List:O(n),每個節點需要額外的指標空間
  • Circular Array:O(capacity),固定大小

選擇適合的 Queue 實作

def choose_queue_implementation():
    """如何選擇適合的 Queue 實作"""
    
    print("Queue 實作選擇指南:")
    print("\n1. collections.deque(大多數情況的最佳選擇)")
    print("   - 優點:效能優秀、內建支援、功能完整")
    print("   - 適用:一般用途、不需要執行緒安全")
    
    print("\n2. queue.Queue(多執行緒環境)")
    print("   - 優點:執行緒安全、支援阻塞操作")
    print("   - 適用:生產者-消費者模式、多執行緒應用")
    
    print("\n3. 自定義 Linked List Queue")
    print("   - 優點:教學用途、可自定義行為")
    print("   - 適用:學習資料結構、特殊需求")
    
    print("\n4. Circular Queue")
    print("   - 優點:固定記憶體使用、適合嵌入式系統")
    print("   - 適用:緩衝區、資源受限環境")

choose_queue_implementation()

常見面試問題

1. 使用兩個 Stack 實作 Queue

class QueueUsingStacks:
    """使用兩個 Stack 實作 Queue"""
    
    def __init__(self):
        self.in_stack = []   # 用於入隊
        self.out_stack = []  # 用於出隊
    
    def enqueue(self, item):
        """入隊 - O(1)"""
        self.in_stack.append(item)
    
    def dequeue(self):
        """出隊 - 平均 O(1),最壞 O(n)"""
        if not self.out_stack:
            # 將 in_stack 的元素轉移到 out_stack
            while self.in_stack:
                self.out_stack.append(self.in_stack.pop())
        
        if not self.out_stack:
            raise IndexError("Queue is empty")
        
        return self.out_stack.pop()
    
    def is_empty(self):
        return len(self.in_stack) == 0 and len(self.out_stack) == 0
    
    def __str__(self):
        # 顯示實際的佇列順序
        temp = self.out_stack[::-1] + self.in_stack
        return f"Queue({temp})"

# 測試
q = QueueUsingStacks()
for i in [1, 2, 3]:
    q.enqueue(i)
print(q)  # Queue([1, 2, 3])

print(q.dequeue())  # 1
q.enqueue(4)
print(q)  # Queue([2, 3, 4])

2. 實作具有 getMin() 的 Queue

class MinQueue:
    """支援 O(1) 取得最小值的 Queue"""
    
    def __init__(self):
        self.queue = deque()
        self.min_deque = deque()  # 儲存最小值
    
    def enqueue(self, item):
        """入隊 - O(1)"""
        self.queue.append(item)
        
        # 維護最小值佇列
        while self.min_deque and self.min_deque[-1] > item:
            self.min_deque.pop()
        self.min_deque.append(item)
    
    def dequeue(self):
        """出隊 - O(1)"""
        if not self.queue:
            raise IndexError("Queue is empty")
        
        item = self.queue.popleft()
        
        # 如果移除的是最小值,也要從 min_deque 移除
        if self.min_deque and self.min_deque[0] == item:
            self.min_deque.popleft()
        
        return item
    
    def get_min(self):
        """取得最小值 - O(1)"""
        if not self.min_deque:
            raise ValueError("Queue is empty")
        return self.min_deque[0]
    
    def __str__(self):
        return f"Queue({list(self.queue)}), Min: {self.get_min() if self.queue else 'N/A'}"

# 測試
mq = MinQueue()
for val in [3, 1, 4, 1, 5]:
    mq.enqueue(val)
    print(f"Enqueue {val}: {mq}")

while len(mq.queue) > 0:
    val = mq.dequeue()
    print(f"Dequeue {val}: Queue({list(mq.queue)}), Min: {mq.get_min() if mq.queue else 'N/A'}")

總結

Queue 是一個基礎但極其重要的資料結構:

核心概念

  1. FIFO 原則:先進先出
  2. 基本操作:enqueue、dequeue、front、is_empty
  3. 多種實作:list、deque、linked list、circular array

選擇建議

  • 一般用途:使用 collections.deque
  • 多執行緒:使用 queue.Queue
  • 學習目的:實作自定義 Queue
  • 特殊需求:根據具體情況選擇

應用場景

  • 任務排程
  • BFS 演算法
  • 生產者-消費者模式
  • 訊息佇列系統
  • 印表機排程

理解 Queue 的原理和應用,是掌握更複雜系統設計的重要基礎!

0%