bridge

bridge

MIT 6.5840(2) - 實驗室1 MapReduce

理論#

MapReduce 主要用來解決大規模數據處理的問題,它能夠將一個計算任務分解為許多小任務,這些小任務可以在多台機器上並行執行,從而大大提高處理速度和效率。

它的核心思想參考 Google 論文中的圖片

image

Master 程序是主要的程序,它會將文件(N 個)分配給 worker,所有 worker 是對等的。worker 將會首先執行 Map 命令,接受一些文件,然後輸出為鍵值對並寫入 R 個(用戶可以自定義 R 的值)中間文件(圖中的 Intermediate files)。當所有 Map 任務完成後,Master 會給 worker 分配 Reduce 任務,對中間文件進行處理,然後生成最終的結果文件。

其中,Map 函數和 Reduce 函數可以以插件的形式提供給系統。

例如,在 Words Count 場景下(統計若干文件裡每個相同單詞的數量):

map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");
reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));

這個例子中,map 函數將生成一系列 key 為 word,value 為 "1" 的中間文件,然後 reduce 將同一個 key 聚合,統計 value 的數量。

這樣我們就實現了數據集的分佈式處理。

代碼#

系統的啟動流程如下:

  • 運行mrcoordinator
  • 運行若干個 mrworker
  • mrcoordinator 調用 coordinator,並定期檢查 coordinator 完成情況
  • mrworker 加載 map 和 reduce 函數,並拉起 worker 進程

其中 coordinator 和 worker 之間使用 rpc 通信,我們只需要實現 coordinator 和 worker 的邏輯。

Coordinator#

幾個要點:

  • 考慮維護 task 的狀態,而不需要維護 worker 的狀態,因為 worker 實際上是對等的。
  • Coordinator 是多線程的,需要加鎖。由於不需要各線程通信,這裡選用 Mutex
  • 兩個 rpc 關鍵函數:FetchTask 和 FinishTask,一個用來給 worker 查找任務,一個用來標記任務結束

關鍵結構體如下:

type Coordinator struct {
	// Your definitions here.
	lock               sync.Mutex
	files              []string
	mapTaskStatuses    []int
	reduceTaskStatuses []int
	nMap               int
	nReduce            int
	mapDoneNum         int // map任務完成數量
	reduceDoneNum      int
	mapFinished        bool
	reduceFinished     bool
}

兩個 rpc 函數主要邏輯:


func (c *Coordinator) FetchTask(args *FetchTaskArgs, reply *FetchTaskReply) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if !c.mapFinished {
		mapNum := xxx // 循環mapTaskStatuses找到未開始的任務
		if mapTaskStatuses全都分配了 {
			reply.TaskType = NOTASK
		} else {
            // 分配任務
			// 10s不完成任務就錯誤恢復
		}
	} else if !c.reduceFinished {
            // 和map類似
		}
	} else {
		reply.TaskType = NOTASK
	}
	return nil
}
func (c *Coordinator) FinishTask(args *FinishTaskArgs, reply *FinishTaskReply) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	switch args.TaskType {
	case MAPTASK:
		{
			// 由於錯誤恢復機制,這裡可能提交同樣的任務。僅接受第一個提交的任務即可
			if c.mapTaskStatuses[args.TaskNum] != DONE {
				c.mapDoneNum += 1
				c.mapTaskStatuses[args.TaskNum] = DONE
				if c.mapDoneNum == c.nMap {
					c.mapFinished = true
				}
			}
		}
	case REDUCETASK:
        // 和上面類似
	}
	return nil
}

Worker#

worker 主要是業務邏輯。大體框架如下:

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		reply := &FetchTaskReply{}
		ok := call("Coordinator.FetchTask", &FetchTaskArgs{}, &reply)
		if ok {
			switch reply.TaskType {
			case MAPTASK:
				{
                    // 根據reply指定的filename調用map處理,並輸出中間文件
                    // rpc FinishTask
				}
			case REDUCETASK:
				{
                    // 根據reply的ReduceNum讀取中間文件(多個),輸出為一個結果文件
                    mp := make(map[string][]string)
                    // 聚合
                    // reduce
                    // rpc FinishTask
				}
			case NOTASK:
				{
					break
				}
			}
		} else {
			break
		}

	}
}

一些細節需要注意:

  • 中間文件名為mr-X-Y,X 是 MapNum(即 map 任務編號),Y 是 map 函數生成的 key 到 [0, NReduce) 整數集的單射,具體而言是通過如下哈希函數計算的:
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}
  • 每一個 reduce 函數需要處理一個 Y 對應的所有 X
  • 由於 reduce 的輸入是一個 key 和它對應的所有 values,所以需要對中間文件進行聚合。這裡使用的是 HashMap。在生產級別的 MapReduce 實現中,該部分需要內存 + 外存來保證不會 OOM。
  • 為了保證創建並寫入文件的原子性(要麼完全寫入,要麼完全不寫入),可以先創建臨時文件,再寫入,再重命名:
func writeStringToFileAtomic(filename string, content string) {
	f, _ := os.CreateTemp(".", filename+"*")
	f.WriteString(content)
	os.Rename(f.Name(), filename)
	f.Close()
}
  • reduce 最後生成的文件內容需要不斷拼接結果字符串,應該使用 StringBuilder 來獲得更好的性能
var result strings.Builder
for key, values := range mp {
    reduceResult := reducef(key, values)
    result.WriteString(fmt.Sprintf("%v %v\n", key, reduceResult))
}
// 獲得字符串:result.String()

拓展#

重讀 Google 的論文可以發現 Google 用於生產環境的 MapReduce 相比於我們上面實現的 lab 還有幾個改進:

  • 當 worker 運行在多個機器上時,需要使用一些分佈式文件系統方案來管理文件,例如 GFS
  • reduce 生成的文件可能會被用於下一個 MapReduce 任務,從而形成某種鏈子
  • 在實際實現中,還是需要維護 Worker 的狀態,這樣可以便於獲取 Worker 機器的信息、錯誤恢復等
  • 實際使用中,Google 發現當一些機器在即將處理完 Map 或 Reduce 任務時,性能會出現明顯的下降。解決方法是 Master 在組任務即將完成時會分配剩下的任務作為 Backup Tasks,分配到其他 worker 上,以最快完成的任務為最終結果。

Google 也提出了一些改進思路

  • Partitioning Function:上面把 key 映射的 hash 函數是可以自定義的,叫做 Partitioning Function。例如 key 是 URL 的時候可以把同一個 Host 分到同一個 Reduce 任務裡
  • Ordering Guarantees:在同一個 partition 裡可以先做 key 的排序處理,這樣可以使得結果更為友好
  • Combiner Function:注意到上面 Words Count 例子中,大量的類似 (word,"1") 會在網絡上傳輸,而根據 key 聚合 values 發生在 reduce 函數中。如果把聚合操作寫在 map 函數裡則可以避免這些重複的數據傳輸。combiner function 和 reduce function 的唯一区别是前者輸出中間文件,後者輸出最終文件。
  • Input and Output Types:Google 提供了 reader 接口,使得用戶在實現 Map 函數的時候可以讀取更多的 Input Types,例如數據庫裡的數據或內存裡的數據。Output 同理。
  • master 可以維護一個 HTTP panel 顯示各個 worker、task 等的狀態
  • Counter:可以在 map 函數調用自定義的 counter,持續返回給 master。者可以用於收集一些數據的信息,例如 Words Count 例子中
Counter* uppercase;
uppercase = GetCounter("uppercase");

map(String name, String contents):
    for each word w in contents:
        if (IsCapitalized(w)):
            uppercase->Increment();
        EmitIntermediate(w, "1");

最後,2024 年的今天 MapReduce 對於 Google 來說早已經成為了歷史。但是它催生出了 Hadoop 等開源大數據處理框架。

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。