提醒:本文已翻新

本文已於 2026-06-23 翻新,原文可見 🔗 LeetCode 🔴 295. Find Median from Data Stream (Old)

🔗 🔴 295. Find Median from Data Stream

Problem Statement

題目簡述

中位數(Median) 是有序整數列表的中間值。若元素數量為偶數,則中位數是兩個中間值的平均。

  • 例如 [2,3,4][2, 3, 4] 的中位數是 33
  • 例如 [2,3][2, 3] 的中位數是 2+32=2.5\frac{2+3}{2} = 2.5

請實作 MedianFinder 類別:

  • MedianFinder() 初始化 MedianFinder 物件。
  • void addNum(int num) 從資料流中加入整數 num 到資料結構中。
  • double findMedian() 返回目前所有元素的中位數。與實際答案誤差在 10510^{-5} 之內將被接受。

Constraints

約束條件

  • 105num105-10^5 \leq \text{num} \leq 10^5
  • 在呼叫 findMedian 前,資料結構中至少會有一個元素。
  • 最多會呼叫 5×1045 \times 10^4addNumfindMedian

思路:對頂堆積

首先考慮最直觀的做法:每次 addNum 後將所有元素排序求中位數,或是在 findMedian 時才排序。前者的查詢可以做到 O(1)\mathcal{O}(1),但每次插入需要 O(nlogn)\mathcal{O}(n \log n);後者雖然插入是 O(1)\mathcal{O}(1),但查詢又需要 O(nlogn)\mathcal{O}(n \log n)。由於無法預先知道操作的分布,最壞情況下總時間複雜度會達到 O(n2logn)\mathcal{O}(n^2 \log n),在資料規模 5×1045 \times 10^4 下無法接受。

因此目標很明確:需要找到插入和查詢都在 O(logn)\mathcal{O}(\log n) 或更優的解法。

1. 從中位數的定義出發

中位數是有序列表的中間值——左邊有 n2\frac{n}{2} 個元素,右邊也有 n2\frac{n}{2} 個元素。這啟發我們用兩個容器分別維護較小的一半與較大的一半,並在插入新元素時維持兩者的大小關係。

雖然可以用有序容器(如 C++ multiset 或 Python SortedList)直接做到,但更經典的做法是使用 對頂堆積(Two Heaps)

  • 最大堆:保存 ≤ 中位數的一半數字,堆頂是這半邊的最大值。
  • 最小堆:保存 > 中位數的一半數字,堆頂是這半邊的最小值。
核心套路:對頂堆

Python 3.14 的 heapq 新增了 heappush_maxheappushpop_max,可以直接操作最大堆,不再需要靠取相反數來模擬。下面的程式碼使用的就是這組新 API。
當兩個堆的元素數量相同時,中位數等於兩個堆頂的平均值;當總數為奇數時,可以約定讓最大堆多一個元素,此時中位數就是最大堆的堆頂。

2. 插入操作:維護兩個不變量

每次插入都需要同時維護兩個性質:

  1. 大小平衡:最大堆的元素數量不少於最小堆,且最多多一個。
  2. 順序正確:最大堆的所有元素都不大於最小堆的所有元素。

大小平衡可以由目前元素總數的奇偶性來決定新元素歸屬:

  • 偶數時:當前兩堆大小相等,插入後應讓最大堆多一個元素。
  • 奇數時:當前最大堆多一個,插入後應讓兩堆大小相等。

但新元素不一定真的屬於目標那一邊,需要按實際值分情況處理:

  • 要放到最大堆時:如果新元素比最小堆的堆頂還大,它應該先進入最小堆,再把最小堆的堆頂移到最大堆。
  • 要放到最小堆時:如果新元素比最大堆的堆頂還小,它應該先進入最大堆,再把最大堆的堆頂移到最小堆。

上述兩種情況可以統一用「推入對側後彈出堆頂」來實作,這樣一次插入至多進行兩次 O(logn)\mathcal{O}(\log n) 的堆操作。

關鍵觀察:邊界會自動修正

插入操作本質上是在調整兩個堆的分界線。即使新元素落在錯誤的那一側,用一次「推入對側再彈出堆頂」就能自然修正,不需要多餘的判斷。

3. 查詢中位數

由於上述不變量始終成立,查詢時:

  • 若兩堆大小相等,中位數是兩個堆頂的平均值。
  • 若最大堆多一個,中位數就是最大堆的堆頂。

兩者都只需要讀取堆頂,時間複雜度為 O(1)\mathcal{O}(1)

易錯點:溢出處理

雖然本題測資範圍內不會發生,但如果數字值域很大,兩個整數相加可能溢位。此時可以用 maxHeap.top() + (minHeap.top() - maxHeap.top()) / 2 來避免直接相加。

題型總結

對頂堆適合處理動態資料流中的中位數、滑動窗口中位數、排名追蹤等問題。核心思路是只維護答案附近的邊界資訊,而非維護完整排序。後續遇到「動態第 kk 大/小」或「兩組資料之間的邊界排名」問題時,都可優先考慮用對頂堆建模:一個堆存放較小的 kk 個元素,另一個存放其餘元素,堆頂即為邊界。

複雜度分析

  • 時間複雜度:初始化為 O(1)\mathcal{O}(1);每次 addNum 至多需要一次堆插入與一次堆彈出,時間為 O(logn)\mathcal{O}(\log n);每次 findMedian 只讀取堆頂,時間為 O(1)\mathcal{O}(1)
  • 空間複雜度:O(n)\mathcal{O}(n),需要儲存資料流中的所有元素。

Follow-up

Q1. 如果資料流中的所有整數都在範圍 [0,100][0, 100] 內,如何優化?

固定值域:計數陣列

如果所有數字都落在 [0,100][0, 100],就不需要用堆維護相對大小了。可以使用長度為 101101 的計數陣列,記錄每個數字出現了幾次。

查詢中位數時,設目前共有 nn 個元素,只要找到排序後第 n+12\frac{n+1}{2} 個與第 n+22\frac{n+2}{2} 個元素即可。做法是從 00 掃到 100100,累加出現次數,找到這兩個排名所在的值。

複雜度

插入只需更新一次計數,時間複雜度為 O(1)\mathcal{O}(1)
查詢時最多掃描 101101 個位置,因為值域固定,所以也可視為 O(1)\mathcal{O}(1)
空間複雜度為 O(1)\mathcal{O}(1)

Q2. 如果資料流中 99%99\% 的整數都在範圍 [0,100][0, 100] 內,如何優化?

混合做法:主體值域計數 + 離群值有序維護

仍然可以把 [0,100][0, 100] 作為主體值域,用計數陣列統計這部分的數字。對於小於 00 或大於 100100 的少量離群值,則需要額外用有序結構維護,例如 SortedList、平衡樹,或其他能查第 kk 小元素的資料結構。

查詢中位數時,先計算目標排名。若排名落在 [0,100][0, 100] 的累積計數範圍內,就直接掃描計數陣列;若排名落在小於 00 或大於 100100 的離群值區間,才到額外的有序結構中查找。

易錯點:不能只用普通堆保存離群值

這個 Follow-up 的重點是利用「大多數值落在小範圍」來加速查詢,但離群值仍然需要按大小定位。若只把所有離群值丟進普通堆,通常只能快速取得最小值或最大值,不能直接找到任意排名的元素;因此更合適的是有序集合或支援 order statistic 的結構。

類題

類似題目

Code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from heapq import heappush, heappushpop, heappush_max, heappushpop_max
# heappush_max / heappushpop_max 是 Python 3.14 新增的 heapq API,
# 可直接對最大堆操作,無需再手動取相反數。
# https://docs.python.org/zh-tw/3.14/library/heapq.html#heapq.heappush_max


class MedianFinder:
def __init__(self):
# max heap,保存 <= median 的數字
self.L = []
# min heap,保存 > median 的數字
self.R = []

def addNum(self, num: int) -> None:
# 當前數量為偶數,加入到左邊
if len(self.L) == len(self.R):
# 如果新加入的數字比右邊的最小值還要大,則先加入到右邊,再將右邊的最小值加入到左邊
if self.R and num > self.R[0]:
heappush_max(self.L, heappushpop(self.R, num))
else:
heappush_max(self.L, num)
# 當前數量為奇數,加入到右邊
else:
# 如果新加入的數字比左邊的最大值還要小,則先加入到左邊,再將左邊的最大值加入到右邊
if num < self.L[0]:
heappush(self.R, heappushpop_max(self.L, num))
else:
heappush(self.R, num)

def findMedian(self) -> float:
return (self.L[0] + self.R[0]) / 2 if len(self.L) == len(self.R) else self.L[0]