bridge

bridge

MIT 6.5840(2) - Lab1 MapReduce

理论#

MapReduce 主要用来解决大规模数据处理的问题,它能够将一个计算任务分解为许多小任务,这些小任务可以在多台机器上并行执行,从而大大提高处理速度和效率。

它的核心思想参考 Google 论文中的图片

image

Master 程序是主要的程序,它会将文件(N 个)分配个 worker,所有 worker 是对等的。worker 将会首先执行 Map 命令,接受一些文件,然后输出为键值对并写入 R 个(用户可以自定义 R 的值)中间文件(图中的 Intermediate files)。当所有 Map 任务完成后,Master 会给 worker 分配 Reduce 任务,对中间文件进行处理,然后生成最终的结果文件。

其中,Map 函数和 Reduce 函数可以以插件的形式提供给系统。

例如,在 Words Count 场景下(统计若干文件里每个相同单词的数量):

map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");
reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));

这个例子中,map 函数将生成一系列 key 为 word,value 为 "1" 的中间文件,然后 reduce 将同一个 key 聚合,统计 value 的数量。

这样我们就实现了数据集的分布式处理。

代码#

系统的启动流程如下:

  • 运行mrcoordinator
  • 运行若干个 mrworker
  • mrcoordinator 调用 coordinator,并定期检查 coordinator 完成情况
  • mrworker 加载 map 和 reduce 函数,并拉起 worker 进程

其中 coordinator 和 worker 之间使用 rpc 通信,我们只需要实现 coordinator 和 worker 的逻辑。

Coordinator#

几个要点:

  • 考虑维护 task 的状态,而不需要维护 worker 的状态,因为 worker 实际上是对等的。
  • Coordinator 是多线程的,需要加锁。由于不需要各线程通信,这里选用 Mutex
  • 两个 rpc 关键函数:FetchTask 和 FinishTask,一个用来给 worker 查找任务,一个用来标记任务结束

关键结构体如下:

type Coordinator struct {
	// Your definitions here.
	lock               sync.Mutex
	files              []string
	mapTaskStatuses    []int
	reduceTaskStatuses []int
	nMap               int
	nReduce            int
	mapDoneNum         int // map任务完成数量
	reduceDoneNum      int
	mapFinished        bool
	reduceFinished     bool
}

两个 rpc 函数主要逻辑:


func (c *Coordinator) FetchTask(args *FetchTaskArgs, reply *FetchTaskReply) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if !c.mapFinished {
		mapNum := xxx // 循环mapTaskStatuses找到未开始的任务
		if mapTaskStatuses全都分配了 {
			reply.TaskType = NOTASK
		} else {
            // 分配任务
			// 10s不完成任务就错误恢复
		}
	} else if !c.reduceFinished {
            // 和map类似
		}
	} else {
		reply.TaskType = NOTASK
	}
	return nil
}
func (c *Coordinator) FinishTask(args *FinishTaskArgs, reply *FinishTaskReply) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	switch args.TaskType {
	case MAPTASK:
		{
			// 由于错误恢复机制,这里可能提交同样的任务。仅接受第一个提交的任务即可
			if c.mapTaskStatuses[args.TaskNum] != DONE {
				c.mapDoneNum += 1
				c.mapTaskStatuses[args.TaskNum] = DONE
				if c.mapDoneNum == c.nMap {
					c.mapFinished = true
				}
			}
		}
	case REDUCETASK:
        // 和上面类似
	}
	return nil
}

Worker#

worker 主要是业务逻辑。大体框架如下:

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		reply := &FetchTaskReply{}
		ok := call("Coordinator.FetchTask", &FetchTaskArgs{}, &reply)
		if ok {
			switch reply.TaskType {
			case MAPTASK:
				{
                    // 根据reply指定的filename调用map处理,并输出中间文件
                    // rpc FinishTask
				}
			case REDUCETASK:
				{
                    // 根据reply的ReduceNum读取中间文件(多个),输出为一个结果文件
                    mp := make(map[string][]string)
                    // 聚合
                    // reduce
                    // rpc FinishTask
				}
			case NOTASK:
				{
					break
				}
			}
		} else {
			break
		}

	}
}

一些细节需要注意:

  • 中间文件名为mr-X-Y,X 是 MapNum(即 map 任务编号),Y 是 map 函数生成的 key 到 [0, NReduce) 整数集的单射,具体而言是通过如下哈希函数计算的:
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}
  • 每一个 reduce 函数需要处理一个 Y 对应的所有 X
  • 由于 reduce 的输入是一个 key 和它对应的所有 values,所以需要对中间文件进行聚合。这里使用的是 HashMap。在生产级别的 MapReduce 实现中,该部分需要内存 + 外存来保证不会 OOM。
  • 为了保证创建并写入文件的原子性(要么完全写入,要么完全不写入),可以先创建临时文件,再写入,再重命名:
func writeStringToFileAtomic(filename string, content string) {
	f, _ := os.CreateTemp(".", filename+"*")
	f.WriteString(content)
	os.Rename(f.Name(), filename)
	f.Close()
}
  • reduce 最后生成的文件内容需要不断拼接结果字符串,应该使用 StringBuilder 来获得更好的性能
var result strings.Builder
for key, values := range mp {
    reduceResult := reducef(key, values)
    result.WriteString(fmt.Sprintf("%v %v\n", key, reduceResult))
}
// 获得字符串:result.String()

拓展#

重读 Google 的论文可以发现 Google 用于生产环境的 MapReduce 相比于我们上面实现的 lab 还有几个改进:

  • 当 worker 运行在多个机器上时,需要使用一些分布式文件系统方案来管理文件,例如 GFS
  • reduce 生成的文件可能会被用于下一个 MapReduce 任务,从而形成某种链子
  • 在实际实现中,还是需要维护 Worker 的状态,这样可以便于获取 Worker 机器的信息、错误恢复等
  • 实际使用中,Google 发现当一些机器在即将处理完 Map 或 Reduce 任务时,性能会出现明显的下降。解决方法是 Master 在组任务即将完成时会分配剩下的任务作为 Backup Tasks,分配到其他 worker 上,以最快完成的任务为最终结果。

Google 也提出了一些改进思路

  • Partitioning Function:上面把 key 映射的 hash 函数是可以自定义的,叫做 Partitioning Function。例如 key 是 URL 的时候可以把同一个 Host 分到同一个 Reduce 任务里
  • Ordering Guarantees:在同一个 partition 里可以先做 key 的排序处理,这样可以使得结果更为友好
  • Combiner Function:注意到上面 Words Count 例子中,大量的类似 (word,"1") 会在网络上传输,而根据 key 聚合 values 发生在 reduce 函数中。如果把聚合操作写在 map 函数里则可以避免这些重复的数据传输。combiner function 和 reduce function 的唯一区别是前者输出中间文件,后者输出最终文件。
  • Input and Output Types:Google 提供了 reader 接口,使得用户在实现 Map 函数的时候可以读取更多的 Input Types,例如数据库里的数据或内存里的数据。Output 同理。
  • master 可以维护一个 HTTP panel 显示各个 worker、task 等的状态
  • Counter:可以在 map 函数调用自定义的 counter,持续返回给 master。者可以用于收集一些数据的信息,例如 Words Count 例子中
Counter* uppercase;
uppercase = GetCounter("uppercase");

map(String name, String contents):
    for each word w in contents:
        if (IsCapitalized(w)):
            uppercase->Increment();
        EmitIntermediate(w, "1");

最后,2024 年的今天 MapReduce 对于 Google 来说早已经成为了历史。但是它催生出了 Hadoop 等开源大数据处理框架。

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。