理論#
MapReduce 主要用來解決大規模數據處理的問題,它能夠將一個計算任務分解為許多小任務,這些小任務可以在多台機器上並行執行,從而大大提高處理速度和效率。
它的核心思想參考 Google 論文中的圖片
Master 程序是主要的程序,它會將文件(N 個)分配給 worker,所有 worker 是對等的。worker 將會首先執行 Map 命令,接受一些文件,然後輸出為鍵值對並寫入 R 個(用戶可以自定義 R 的值)中間文件(圖中的 Intermediate files)。當所有 Map 任務完成後,Master 會給 worker 分配 Reduce 任務,對中間文件進行處理,然後生成最終的結果文件。
其中,Map 函數和 Reduce 函數可以以插件的形式提供給系統。
例如,在 Words Count 場景下(統計若干文件裡每個相同單詞的數量):
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
這個例子中,map 函數將生成一系列 key 為 word,value 為 "1" 的中間文件,然後 reduce 將同一個 key 聚合,統計 value 的數量。
這樣我們就實現了數據集的分佈式處理。
代碼#
系統的啟動流程如下:
- 運行mrcoordinator
- 運行若干個 mrworker
- mrcoordinator 調用 coordinator,並定期檢查 coordinator 完成情況
- mrworker 加載 map 和 reduce 函數,並拉起 worker 進程
其中 coordinator 和 worker 之間使用 rpc 通信,我們只需要實現 coordinator 和 worker 的邏輯。
Coordinator#
幾個要點:
- 考慮維護 task 的狀態,而不需要維護 worker 的狀態,因為 worker 實際上是對等的。
- Coordinator 是多線程的,需要加鎖。由於不需要各線程通信,這裡選用 Mutex
- 兩個 rpc 關鍵函數:FetchTask 和 FinishTask,一個用來給 worker 查找任務,一個用來標記任務結束
關鍵結構體如下:
type Coordinator struct {
// Your definitions here.
lock sync.Mutex
files []string
mapTaskStatuses []int
reduceTaskStatuses []int
nMap int
nReduce int
mapDoneNum int // map任務完成數量
reduceDoneNum int
mapFinished bool
reduceFinished bool
}
兩個 rpc 函數主要邏輯:
func (c *Coordinator) FetchTask(args *FetchTaskArgs, reply *FetchTaskReply) error {
c.lock.Lock()
defer c.lock.Unlock()
if !c.mapFinished {
mapNum := xxx // 循環mapTaskStatuses找到未開始的任務
if mapTaskStatuses全都分配了 {
reply.TaskType = NOTASK
} else {
// 分配任務
// 10s不完成任務就錯誤恢復
}
} else if !c.reduceFinished {
// 和map類似
}
} else {
reply.TaskType = NOTASK
}
return nil
}
func (c *Coordinator) FinishTask(args *FinishTaskArgs, reply *FinishTaskReply) error {
c.lock.Lock()
defer c.lock.Unlock()
switch args.TaskType {
case MAPTASK:
{
// 由於錯誤恢復機制,這裡可能提交同樣的任務。僅接受第一個提交的任務即可
if c.mapTaskStatuses[args.TaskNum] != DONE {
c.mapDoneNum += 1
c.mapTaskStatuses[args.TaskNum] = DONE
if c.mapDoneNum == c.nMap {
c.mapFinished = true
}
}
}
case REDUCETASK:
// 和上面類似
}
return nil
}
Worker#
worker 主要是業務邏輯。大體框架如下:
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
for {
reply := &FetchTaskReply{}
ok := call("Coordinator.FetchTask", &FetchTaskArgs{}, &reply)
if ok {
switch reply.TaskType {
case MAPTASK:
{
// 根據reply指定的filename調用map處理,並輸出中間文件
// rpc FinishTask
}
case REDUCETASK:
{
// 根據reply的ReduceNum讀取中間文件(多個),輸出為一個結果文件
mp := make(map[string][]string)
// 聚合
// reduce
// rpc FinishTask
}
case NOTASK:
{
break
}
}
} else {
break
}
}
}
一些細節需要注意:
- 中間文件名為mr-X-Y,X 是 MapNum(即 map 任務編號),Y 是 map 函數生成的 key 到 [0, NReduce) 整數集的單射,具體而言是通過如下哈希函數計算的:
func ihash(key string) int {
h := fnv.New32a()
h.Write([]byte(key))
return int(h.Sum32() & 0x7fffffff)
}
- 每一個 reduce 函數需要處理一個 Y 對應的所有 X
- 由於 reduce 的輸入是一個 key 和它對應的所有 values,所以需要對中間文件進行聚合。這裡使用的是 HashMap。在生產級別的 MapReduce 實現中,該部分需要內存 + 外存來保證不會 OOM。
- 為了保證創建並寫入文件的原子性(要麼完全寫入,要麼完全不寫入),可以先創建臨時文件,再寫入,再重命名:
func writeStringToFileAtomic(filename string, content string) {
f, _ := os.CreateTemp(".", filename+"*")
f.WriteString(content)
os.Rename(f.Name(), filename)
f.Close()
}
- reduce 最後生成的文件內容需要不斷拼接結果字符串,應該使用 StringBuilder 來獲得更好的性能
var result strings.Builder
for key, values := range mp {
reduceResult := reducef(key, values)
result.WriteString(fmt.Sprintf("%v %v\n", key, reduceResult))
}
// 獲得字符串:result.String()
拓展#
重讀 Google 的論文可以發現 Google 用於生產環境的 MapReduce 相比於我們上面實現的 lab 還有幾個改進:
- 當 worker 運行在多個機器上時,需要使用一些分佈式文件系統方案來管理文件,例如 GFS
- reduce 生成的文件可能會被用於下一個 MapReduce 任務,從而形成某種鏈子
- 在實際實現中,還是需要維護 Worker 的狀態,這樣可以便於獲取 Worker 機器的信息、錯誤恢復等
- 實際使用中,Google 發現當一些機器在即將處理完 Map 或 Reduce 任務時,性能會出現明顯的下降。解決方法是 Master 在組任務即將完成時會分配剩下的任務作為 Backup Tasks,分配到其他 worker 上,以最快完成的任務為最終結果。
Google 也提出了一些改進思路
- Partitioning Function:上面把 key 映射的 hash 函數是可以自定義的,叫做 Partitioning Function。例如 key 是 URL 的時候可以把同一個 Host 分到同一個 Reduce 任務裡
- Ordering Guarantees:在同一個 partition 裡可以先做 key 的排序處理,這樣可以使得結果更為友好
- Combiner Function:注意到上面 Words Count 例子中,大量的類似 (word,"1") 會在網絡上傳輸,而根據 key 聚合 values 發生在 reduce 函數中。如果把聚合操作寫在 map 函數裡則可以避免這些重複的數據傳輸。combiner function 和 reduce function 的唯一区别是前者輸出中間文件,後者輸出最終文件。
- Input and Output Types:Google 提供了 reader 接口,使得用戶在實現 Map 函數的時候可以讀取更多的 Input Types,例如數據庫裡的數據或內存裡的數據。Output 同理。
- master 可以維護一個 HTTP panel 顯示各個 worker、task 等的狀態
- Counter:可以在 map 函數調用自定義的 counter,持續返回給 master。者可以用於收集一些數據的信息,例如 Words Count 例子中
Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");
最後,2024 年的今天 MapReduce 對於 Google 來說早已經成為了歷史。但是它催生出了 Hadoop 等開源大數據處理框架。