03-4. Shared Memory

⏱️ Reading time: 12 minutes 🎯 Difficulty: ⭐⭐⭐ (Intermediate)


🎯 What This Article Covers

A deep dive into Shared Memory, the fastest form of IPC, and how to share data safely between multiple processes.


🤔 What Is Shared Memory?

Shared Memory = multiple processes using the same region of memory

In one sentence: several independent processes access the same block of memory directly, avoiding data copies. It is the fastest form of IPC.


🏢 An Office Analogy

Regular IPC (Pipe, Queue)

Process A's office          Process B's office
├─ Document A              ├─ Document B (a copy)
│                          │
└──→ fax machine ──→ fax machine ──┘

Every exchange copies the document (slow, resource-hungry)

Shared Memory

Process A's office          Process B's office
├─ Staff                   ├─ Staff
│                          │
└──→ shared cloud drive ←──┘
     (the same file)

Both sides access the same file directly (fast, efficient),
but a locking mechanism is needed to avoid conflicts

💻 How Shared Memory Works

Memory layout comparison

Regular processes (separate memory)

Process A (PID 1001)          Process B (PID 1002)
├─ Code Segment              ├─ Code Segment
├─ Data: counter = 0         ├─ Data: counter = 0 (independent copy)
├─ Heap                      ├─ Heap
└─ Stack                     └─ Stack

No way to share data directly

Shared Memory

Process A (PID 1001)          Process B (PID 1002)
├─ Code Segment              ├─ Code Segment
├─ Data                      ├─ Data
├─ Heap                      ├─ Heap
│  └─ points to shared mem   │  └─ points to shared mem
└─ Stack                     └─ Stack
     ↓                            ↓
     └──→ Shared Memory ←─────────┘
          counter = 100

Both processes access the same block of memory
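The layout above can be demonstrated directly: an ordinary module-level variable lives in each process's private memory, while a multiprocessing.Value is backed by a shared segment. A minimal sketch (the variable names here are our own):

```python
from multiprocessing import Process, Value

plain_counter = 0  # lives in each process's own data segment

def child(shared_counter):
    global plain_counter
    plain_counter = 999         # changes only the child's private copy
    shared_counter.value = 100  # written through the shared segment

if __name__ == '__main__':
    shared_counter = Value('i', 0)
    p = Process(target=child, args=(shared_counter,))
    p.start()
    p.join()

    print(plain_counter)         # 0   (the parent's copy is untouched)
    print(shared_counter.value)  # 100 (the child's write is visible)
```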

1️⃣ Python Shared Memory Basics

Value: sharing a single value

from multiprocessing import Process, Value
import time

def increment(shared_value, name):
    """Increment the shared value"""
    for _ in range(5):
        shared_value.value += 1
        print(f"{name}: {shared_value.value}")
        time.sleep(0.1)

if __name__ == '__main__':
    # Create a shared integer (initial value 0)
    counter = Value('i', 0)  # 'i' = int

    # Two processes modify it concurrently
    p1 = Process(target=increment, args=(counter, 'P1'))
    p2 = Process(target=increment, args=(counter, 'P2'))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print(f"Final value: {counter.value}")
    # Possible output: Final value: 8 (a race condition; 10 was expected!)

Type codes supported by Value

from multiprocessing import Value

# Integer types
i_value = Value('i', 0)      # int
l_value = Value('l', 0)      # long

# Floating point
f_value = Value('f', 0.0)    # float
d_value = Value('d', 0.0)    # double

# Character
c_value = Value('c', b'A')   # char

# Boolean
b_value = Value('b', True)   # signed char (used as a bool)

# Usage
d_value.value = 3.14
print(d_value.value)  # 3.14
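Besides the single-character type codes, Value also accepts a ctypes type, which lets processes share a small fixed-layout record; a sketch (the Point structure is our own example, not from the original text):

```python
from ctypes import Structure, c_double
from multiprocessing import Value

class Point(Structure):
    # a hypothetical 2-D point record to share between processes
    _fields_ = [('x', c_double), ('y', c_double)]

point = Value(Point, 1.0, 2.0)  # extra arguments initialize the fields

# for a Structure, fields are accessed directly (there is no .value attribute)
point.x = 3.5
print(point.x, point.y)  # 3.5 2.0
```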

Array: sharing an array

from multiprocessing import Process, Array

def modify_array(shared_array, process_id):
    """Modify the shared array"""
    for i in range(len(shared_array)):
        # Note: each element access is guarded by the array's built-in lock,
        # but += is a separate read and write, so it is not atomic
        shared_array[i] += process_id
        print(f"P{process_id}: {list(shared_array)}")

if __name__ == '__main__':
    # Create a shared array
    arr = Array('i', [0, 0, 0, 0, 0])  # 5 integers

    p1 = Process(target=modify_array, args=(arr, 1))
    p2 = Process(target=modify_array, args=(arr, 2))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print(f"Final array: {list(arr)}")

2️⃣ Synchronization: Avoiding Race Conditions

The problem: race conditions

from multiprocessing import Process, Value

def unsafe_increment(counter):
    """Unsafe increment (subject to race conditions)"""
    for _ in range(100000):
        # 1. Read the value
        temp = counter.value
        # 2. Increment
        temp += 1
        # 3. Write back (may overwrite another process's update)
        counter.value = temp

if __name__ == '__main__':
    counter = Value('i', 0)

    p1 = Process(target=unsafe_increment, args=(counter,))
    p2 = Process(target=unsafe_increment, args=(counter,))

    p1.start(); p2.start()
    p1.join(); p2.join()

    print(f"Counter: {counter.value}")
    # Expected: 200000
    # Actual: possibly 150000 (wrong!)

The fix: use a Lock

from multiprocessing import Process, Value, Lock

def safe_increment(counter, lock):
    """Safe increment (using a Lock)"""
    for _ in range(100000):
        lock.acquire()
        try:
            counter.value += 1
        finally:
            lock.release()

# Or use a with block (recommended)
def safe_increment_with(counter, lock):
    for _ in range(100000):
        with lock:
            counter.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)
    lock = Lock()

    p1 = Process(target=safe_increment_with, args=(counter, lock))
    p2 = Process(target=safe_increment_with, args=(counter, lock))

    p1.start(); p2.start()
    p1.join(); p2.join()

    print(f"Counter: {counter.value}")
    # Output: 200000 (correct!)

Value's built-in lock

from multiprocessing import Process, Value

def increment_builtin_lock(counter):
    """Use the Lock built into Value"""
    for _ in range(100000):
        with counter.get_lock():  # Value's built-in Lock
            counter.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)

    p1 = Process(target=increment_builtin_lock, args=(counter,))
    p2 = Process(target=increment_builtin_lock, args=(counter,))

    p1.start(); p2.start()
    p1.join(); p2.join()

    print(f"Counter: {counter.value}")  # 200000

3️⃣ Manager: Sharing Complex Data Structures

Types supported by Manager

from multiprocessing import Manager

if __name__ == '__main__':
    with Manager() as manager:
        # List
        shared_list = manager.list([1, 2, 3])

        # Dictionary
        shared_dict = manager.dict({'key': 'value'})

        # Queue
        shared_queue = manager.Queue()

        # Namespace (the most flexible)
        shared_ns = manager.Namespace()
        shared_ns.counter = 0
        shared_ns.name = "Test"

        print(f"List: {shared_list}")
        print(f"Dict: {shared_dict}")
        print(f"Namespace: {shared_ns.counter}, {shared_ns.name}")

Manager.list example

from multiprocessing import Process, Manager

def add_items(shared_list, start, end):
    """Append items to the shared list"""
    for i in range(start, end):
        shared_list.append(i)
        print(f"Appended: {i}")

if __name__ == '__main__':
    with Manager() as manager:
        shared_list = manager.list()

        # Two processes appending concurrently
        p1 = Process(target=add_items, args=(shared_list, 0, 5))
        p2 = Process(target=add_items, args=(shared_list, 5, 10))

        p1.start(); p2.start()
        p1.join(); p2.join()

        print(f"Shared list: {list(shared_list)}")
        # Output: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] (order may vary)
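One caveat worth knowing, noted in the multiprocessing documentation: in-place changes to a plain mutable object stored inside a managed list or dict do not propagate through the proxy; you must reassign the item. A sketch:

```python
from multiprocessing import Manager

if __name__ == '__main__':
    with Manager() as manager:
        lst = manager.list([[1, 2]])  # a plain list stored inside the proxy

        lst[0].append(3)   # mutates a local copy; the change is lost
        print(lst[0])      # [1, 2]

        inner = lst[0]     # fetch a copy
        inner.append(3)    # modify it locally
        lst[0] = inner     # reassign to push the change through the proxy
        print(lst[0])      # [1, 2, 3]
```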

Manager.dict example

from multiprocessing import Process, Manager
import time

def worker(shared_dict, worker_id):
    """Worker updates the shared dict"""
    shared_dict[worker_id] = f"Worker {worker_id} done"
    # Note: get-then-set is not atomic, so concurrent workers can lose
    # increments here; guard it with a Lock if the count matters
    shared_dict['counter'] = shared_dict.get('counter', 0) + 1
    time.sleep(1)

if __name__ == '__main__':
    with Manager() as manager:
        shared_dict = manager.dict()
        shared_dict['counter'] = 0

        processes = [Process(target=worker, args=(shared_dict, i)) for i in range(5)]

        for p in processes:
            p.start()
        for p in processes:
            p.join()

        print("Shared dict contents:")
        for key, value in shared_dict.items():
            print(f"  {key}: {value}")

Manager.Namespace example

from multiprocessing import Process, Manager
import time

def update_status(namespace, worker_id):
    """Update shared status"""
    # Note: += on a Namespace attribute is also a non-atomic
    # read-modify-write, so these counters can race
    namespace.active_workers += 1
    namespace.status = f"Worker {worker_id} is working"

    # Do the work
    time.sleep(2)

    namespace.completed_tasks += 1
    namespace.active_workers -= 1

if __name__ == '__main__':
    with Manager() as manager:
        # Create a Namespace
        ns = manager.Namespace()
        ns.active_workers = 0
        ns.completed_tasks = 0
        ns.status = "Waiting"

        processes = [Process(target=update_status, args=(ns, i)) for i in range(3)]

        for p in processes:
            p.start()

        # Monitor the status
        for _ in range(5):
            print(f"Active workers: {ns.active_workers}, completed: {ns.completed_tasks}, status: {ns.status}")
            time.sleep(0.5)

        for p in processes:
            p.join()

4️⃣ The Python 3.8+ shared_memory Module

The modern Shared Memory API

from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
import numpy as np

def worker(shm_name, shape):
    """Worker process accesses the shared memory"""
    # Attach to the existing shared memory block
    existing_shm = SharedMemory(name=shm_name)

    # Create a NumPy array view over the buffer
    arr = np.ndarray(shape, dtype=np.int64, buffer=existing_shm.buf)

    # Modify the data in place
    arr[0] = 100
    arr[1] = 200

    print(f"After worker's changes: {arr}")

    # Close (but do not delete) the block
    existing_shm.close()

if __name__ == '__main__':
    # Create the shared memory block
    shm = SharedMemory(create=True, size=1000)

    # Create a NumPy array view
    arr = np.ndarray((10,), dtype=np.int64, buffer=shm.buf)
    arr[:] = 0  # initialize

    print(f"Initial array: {arr}")

    # Start the worker
    p = Process(target=worker, args=(shm.name, arr.shape))
    p.start()
    p.join()

    print(f"Main process sees: {arr}")

    # Clean up
    shm.close()
    shm.unlink()  # delete the shared memory block
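If you would rather not call close()/unlink() by hand, the standard library (also 3.8+) provides SharedMemoryManager, which unlinks every segment it created when its with block exits. A minimal sketch of the same worker pattern (the add_one helper is our own name):

```python
from multiprocessing import Process
from multiprocessing.managers import SharedMemoryManager
from multiprocessing.shared_memory import SharedMemory
import numpy as np

def add_one(shm_name, shape):
    """Attach to the segment and bump every element"""
    shm = SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=np.int64, buffer=shm.buf)
    arr[:] += 1
    shm.close()

if __name__ == '__main__':
    with SharedMemoryManager() as smm:
        shm = smm.SharedMemory(size=10 * 8)  # the manager owns this segment
        arr = np.ndarray((10,), dtype=np.int64, buffer=shm.buf)
        arr[:] = 5

        p = Process(target=add_one, args=(shm.name, arr.shape))
        p.start()
        p.join()

        print(arr[0])  # 6: the worker's write is visible
        del arr        # drop the view before the segment goes away
    # leaving the with block unlinks the segment automatically
```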

5️⃣ Worked Examples

Example 1: a multi-process counter

from multiprocessing import Process, Value, Lock
import time

class SharedCounter:
    def __init__(self):
        self.count = Value('i', 0)
        self.lock = Lock()

    def increment(self):
        with self.lock:
            self.count.value += 1

    def get_value(self):
        return self.count.value

def worker(counter, worker_id, num_tasks):
    """Worker runs tasks and updates the counter"""
    for i in range(num_tasks):
        # Simulate work
        time.sleep(0.01)
        counter.increment()

        if (i + 1) % 10 == 0:
            print(f"Worker {worker_id}: finished {i+1} tasks, total {counter.get_value()}")

if __name__ == '__main__':
    counter = SharedCounter()
    num_workers = 4
    tasks_per_worker = 25

    processes = [
        Process(target=worker, args=(counter, i, tasks_per_worker))
        for i in range(num_workers)
    ]

    start = time.time()
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print(f"\nTotal tasks completed: {counter.get_value()}")
    print(f"Elapsed: {time.time() - start:.2f}s")

Example 2: shared state monitoring

from multiprocessing import Process, Manager
import time
import random

def data_processor(shared_state, worker_id):
    """Data-processing worker"""
    for i in range(10):
        # Update the state
        with shared_state['lock']:
            shared_state['active_workers'][worker_id] = True
            shared_state['total_processed'] += 1

        # Simulate processing
        time.sleep(random.uniform(0.1, 0.5))

        # Update progress
        progress = (i + 1) * 10
        shared_state['worker_progress'][worker_id] = progress

    # Done
    with shared_state['lock']:
        del shared_state['active_workers'][worker_id]
        shared_state['completed_workers'][worker_id] = True

def monitor(shared_state, num_workers):
    """Monitor worker status"""
    while len(shared_state['completed_workers']) < num_workers:
        print("\n=== Status monitor ===")
        print(f"Active workers: {len(shared_state['active_workers'])}")
        print(f"Completed workers: {len(shared_state['completed_workers'])}")
        print(f"Total processed: {shared_state['total_processed']}")

        print("Worker progress:")
        for worker_id, progress in shared_state['worker_progress'].items():
            print(f"  Worker {worker_id}: {progress}%")

        time.sleep(1)

    print("\nAll workers finished!")

if __name__ == '__main__':
    with Manager() as manager:
        # Shared state; managed dicts double as sets here, since
        # SyncManager has no set type before Python 3.13
        shared_state = manager.dict()
        shared_state['active_workers'] = manager.dict()
        shared_state['completed_workers'] = manager.dict()
        shared_state['worker_progress'] = manager.dict()
        shared_state['total_processed'] = 0
        shared_state['lock'] = manager.Lock()

        num_workers = 5

        # Start the workers
        workers = [
            Process(target=data_processor, args=(shared_state, i))
            for i in range(num_workers)
        ]

        # Start the monitor
        monitor_process = Process(target=monitor, args=(shared_state, num_workers))

        for w in workers:
            w.start()
        monitor_process.start()

        for w in workers:
            w.join()
        monitor_process.join()

Example 3: sharing a large array (NumPy)

from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
import numpy as np
import time

def parallel_compute(shm_name, shape, start_idx, end_idx):
    """Compute one slice of the array in parallel"""
    # Attach to the shared memory
    shm = SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=np.float64, buffer=shm.buf)

    # Compute the assigned range
    for i in range(start_idx, end_idx):
        arr[i] = np.sin(i) ** 2 + np.cos(i) ** 2

    shm.close()

if __name__ == '__main__':
    # Create a large array (1 million elements)
    size = 1000000
    shm = SharedMemory(create=True, size=size * 8)  # 8 bytes per float64

    arr = np.ndarray((size,), dtype=np.float64, buffer=shm.buf)
    arr[:] = 0  # initialize

    # Split the work across 4 processes
    num_workers = 4
    chunk_size = size // num_workers

    start = time.time()

    processes = []
    for i in range(num_workers):
        start_idx = i * chunk_size
        end_idx = (i + 1) * chunk_size if i < num_workers - 1 else size

        p = Process(
            target=parallel_compute,
            args=(shm.name, arr.shape, start_idx, end_idx)
        )
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    print(f"Parallel computation took: {time.time() - start:.2f}s")
    print(f"First 10 results: {arr[:10]}")
    print(f"Sanity-check sum: {arr.sum():.2f}")  # sin² + cos² = 1, so ≈ size

    # Clean up
    shm.close()
    shm.unlink()

6️⃣ Value/Array vs Manager

Performance comparison

from multiprocessing import Process, Value, Manager
import time

def test_value(shared_value):
    """Measure Value performance"""
    for _ in range(100000):
        with shared_value.get_lock():
            shared_value.value += 1

def test_manager(shared_dict):
    """Measure Manager performance"""
    for _ in range(100000):
        shared_dict['counter'] = shared_dict.get('counter', 0) + 1

if __name__ == '__main__':
    # Value test
    value = Value('i', 0)
    start = time.time()
    p = Process(target=test_value, args=(value,))
    p.start()
    p.join()
    print(f"Value took: {time.time() - start:.2f}s")

    # Manager test
    with Manager() as manager:
        shared_dict = manager.dict()
        shared_dict['counter'] = 0

        start = time.time()
        p = Process(target=test_manager, args=(shared_dict,))
        p.start()
        p.join()
        print(f"Manager took: {time.time() - start:.2f}s")

Representative results (exact timings vary by machine):

Value took: 0.5s     ← fast
Manager took: 2.5s   ← slow (about 5x; every access is a round trip to the manager process)

Which to choose

| Feature      | Value/Array                      | Manager                          |
|--------------|----------------------------------|----------------------------------|
| Performance  | ⚡ fast                          | 🐢 slow (proxy overhead)         |
| Type support | basic types                      | complex types                    |
| Ease of use  | simple                           | simple                           |
| Best for     | simple sharing, frequent access  | complex structures, flexibility  |

Recommendations:

  • Simple types, high performance → Value/Array
  • Complex structures, flexibility → Manager
  • Large arrays, NumPy → shared_memory

✅ Recap

Strengths of Shared Memory:

  • ✅ The fastest form of IPC (no data copying)
  • ✅ Well suited to exchanging large amounts of data
  • ✅ Low latency

Three implementations:

  1. Value/Array - basic types, high performance
  2. Manager - complex types, flexible
  3. shared_memory - large data, NumPy

Synchronization:

  • ✅ A Lock is required to avoid race conditions
  • ✅ Value/Array have a built-in Lock
  • ✅ Manager handles some synchronization automatically

Key caveats:

  • ⚠️ Synchronization is manual (Lock)
  • ⚠️ Manager has proxy overhead (slower)
  • ⚠️ Cleanup is manual (shared_memory.unlink)
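The cleanup caveat bites hardest when an exception fires between creating a segment and unlinking it; a common defensive pattern is try/finally (a sketch):

```python
from multiprocessing.shared_memory import SharedMemory

shm = SharedMemory(create=True, size=1024)
try:
    # ... use the segment; any exception still reaches the cleanup below ...
    shm.buf[:5] = b'hello'
    print(bytes(shm.buf[:5]))  # b'hello'
finally:
    shm.close()   # release this process's mapping
    shm.unlink()  # remove the segment itself (the creator does this once)
```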

When to use it:

  • ✅ Sharing large datasets (images, arrays)
  • ✅ High-frequency data exchange
  • ✅ When top performance is required
  • ❌ Simple messaging (a Queue is simpler)
Previous: 03-3. Message Queue Next: 03-5. Socket Communication


Last updated: 2025-01-06
