bridge

bridge

MIT 6.5840(2) - ラボ1 MapReduce

理論#

MapReduce は主に大規模データ処理の問題を解決するために使用され、計算タスクを多くの小さなタスクに分解し、これらの小さなタスクを複数のマシンで並行して実行することで、処理速度と効率を大幅に向上させることができます。

その核心的な考え方は Google の論文にある画像を参考にしています。

image

Master プログラムは主要なプログラムで、ファイル(N 個)を worker に割り当てます。すべての worker は対等です。worker はまず Map コマンドを実行し、いくつかのファイルを受け取り、キーと値のペアとして出力し、R 個の中間ファイル(図中の Intermediate files)に書き込みます。すべての Map タスクが完了した後、Master は worker に Reduce タスクを割り当て、中間ファイルを処理し、最終的な結果ファイルを生成します。

ここで、Map 関数と Reduce 関数はプラグインの形式でシステムに提供できます。

例えば、Words Count のシナリオ(いくつかのファイル内の各同じ単語の数をカウントする)では:

map(String key, String value):
    // key: ドキュメント名
    // value: ドキュメントの内容
    for each word w in value:
        EmitIntermediate(w, "1");
reduce(String key, Iterator values):
    // key: 単語
    // values: カウントのリスト
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));

この例では、map 関数は key が word、value が "1" の中間ファイルを生成し、reduce は同じ key を集約して value の数をカウントします。

これにより、データセットの分散処理を実現しました。

コード#

システムの起動プロセスは以下の通りです:

  • 実行mrcoordinator
  • いくつかの mrworker を実行
  • mrcoordinator が coordinator を呼び出し、定期的に coordinator の完了状況を確認
  • mrworker が map と reduce 関数をロードし、worker プロセスを起動

coordinator と worker の間は rpc 通信を使用し、coordinator と worker のロジックを実装するだけで済みます。

Coordinator#

いくつかの要点:

  • worker の状態を維持する必要はなく、task の状態を維持することを考慮します。なぜなら、worker は実際には対等だからです。
  • Coordinator はマルチスレッドであり、ロックが必要です。各スレッド間の通信が不要なため、ここでは Mutex を選択します。
  • 2 つの rpc の重要な関数:FetchTask と FinishTask、一つは worker にタスクを探すため、もう一つはタスクの終了をマークするため

重要な構造体は以下の通りです:

type Coordinator struct {
	// ここに定義を追加します。
	lock               sync.Mutex
	files              []string
	mapTaskStatuses    []int
	reduceTaskStatuses []int
	nMap               int
	nReduce            int
	mapDoneNum         int // mapタスクの完了数
	reduceDoneNum      int
	mapFinished        bool
	reduceFinished     bool
}

2 つの rpc 関数の主要なロジック:


func (c *Coordinator) FetchTask(args *FetchTaskArgs, reply *FetchTaskReply) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if !c.mapFinished {
		mapNum := xxx // mapTaskStatusesをループして未開始のタスクを見つける
		if mapTaskStatusesがすべて割り当てられている場合 {
			reply.TaskType = NOTASK
		} else {
            // タスクを割り当てる
			// 10秒以内にタスクが完了しなければエラー復旧
		}
	} else if !c.reduceFinished {
            // mapと似ています
		}
	} else {
		reply.TaskType = NOTASK
	}
	return nil
}
func (c *Coordinator) FinishTask(args *FinishTaskArgs, reply *FinishTaskReply) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	switch args.TaskType {
	case MAPTASK:
		{
			// エラー復旧メカニズムのため、ここで同じタスクが提出される可能性があります。最初に提出されたタスクのみを受け入れます。
			if c.mapTaskStatuses[args.TaskNum] != DONE {
				c.mapDoneNum += 1
				c.mapTaskStatuses[args.TaskNum] = DONE
				if c.mapDoneNum == c.nMap {
					c.mapFinished = true
				}
			}
		}
	case REDUCETASK:
        // 上記と似ています
	}
	return nil
}

Worker#

worker は主にビジネスロジックです。大体のフレームワークは以下の通りです:

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		reply := &FetchTaskReply{}
		ok := call("Coordinator.FetchTask", &FetchTaskArgs{}, &reply)
		if ok {
			switch reply.TaskType {
			case MAPTASK:
				{
                    // replyで指定されたfilenameに基づいてmap処理を呼び出し、中間ファイルを出力
                    // rpc FinishTask
				}
			case REDUCETASK:
				{
                    // replyのReduceNumに基づいて中間ファイル(複数)を読み込み、結果ファイルを出力
                    mp := make(map[string][]string)
                    // 集約
                    // reduce
                    // rpc FinishTask
				}
			case NOTASK:
				{
					break
				}
			}
		} else {
			break
		}

	}
}

いくつかの詳細に注意が必要です:

  • 中間ファイル名はmr-X-Y、X は MapNum(つまり map タスク番号)、Y は map 関数が生成した key から [0, NReduce) の整数集合への単射です。具体的には、以下のハッシュ関数を使用して計算されます:
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}
  • 各 reduce 関数は Y に対応するすべての X を処理する必要があります。
  • reduce の入力は 1 つの key とそれに対応するすべての values であるため、中間ファイルを集約する必要があります。ここでは HashMap を使用します。生産レベルの MapReduce 実装では、この部分はメモリ + 外部ストレージを使用して OOM を防ぐ必要があります。
  • ファイルの作成と書き込みの原子性を保証するために(完全に書き込むか、完全に書き込まないか)、まず一時ファイルを作成し、書き込み、名前を変更することができます:
func writeStringToFileAtomic(filename string, content string) {
	f, _ := os.CreateTemp(".", filename+"*")
	f.WriteString(content)
	os.Rename(f.Name(), filename)
	f.Close()
}
  • reduce の最終生成ファイルの内容は、結果文字列を継続的に結合する必要があり、より良いパフォーマンスを得るために StringBuilder を使用すべきです。
var result strings.Builder
for key, values := range mp {
    reduceResult := reducef(key, values)
    result.WriteString(fmt.Sprintf("%v %v\n", key, reduceResult))
}
// 文字列を取得:result.String()

拡張#

Google の論文を再読すると、Google が生産環境で使用する MapReduce は、上記の実装に比べていくつかの改善点があります:

  • worker が複数のマシンで実行される場合、ファイルを管理するために分散ファイルシステムのソリューションを使用する必要があります。例えば GFS
  • reduce によって生成されたファイルは、次の MapReduce タスクで使用される可能性があり、ある種のチェーンを形成します。
  • 実際の実装では、worker の状態を維持する必要があります。これにより、worker マシンの情報やエラー復旧などが容易になります。
  • 実際の使用において、Google は一部のマシンが Map または Reduce タスクをほぼ完了しようとしているときに、パフォーマンスが明らかに低下することを発見しました。解決策は、Master がタスクがほぼ完了する際に残りのタスクを Backup Tasks として割り当て、他の worker に割り当てることで、最も早く完了したタスクを最終結果とすることです。

Google はまた、いくつかの改善のアイデアを提案しました。

  • Partitioning Function:上記の key をマッピングするハッシュ関数はカスタマイズ可能で、Partitioning Function と呼ばれます。例えば key が URL の場合、同じ Host を同じ Reduce タスクに分けることができます。
  • Ordering Guarantees:同じ partition 内で key のソート処理を先に行うことで、結果がよりフレンドリーになります。
  • Combiner Function:上記の Words Count の例では、大量の類似した (word,"1") がネットワーク上で転送され、key の集約が reduce 関数内で発生します。集約操作を map 関数に書くことで、これらの重複したデータ転送を回避できます。combiner function と reduce function の唯一の違いは、前者が中間ファイルを出力し、後者が最終ファイルを出力することです。
  • Input and Output Types:Google は reader インターフェースを提供し、ユーザーが Map 関数を実装する際に、データベース内のデータやメモリ内のデータなど、より多くの Input Types を読み取ることができます。Output も同様です。
  • master は HTTP パネルを維持し、各 worker、task などの状態を表示できます。
  • Counter:map 関数内でカスタムの counter を呼び出し、master に継続的に返すことができます。これは、Words Count の例のように、いくつかのデータ情報を収集するために使用できます。
Counter* uppercase;
uppercase = GetCounter("uppercase");

map(String name, String contents):
    for each word w in contents:
        if (IsCapitalized(w)):
            uppercase->Increment();
        EmitIntermediate(w, "1");

最後に、2024 年の今日、MapReduce は Google にとってすでに歴史となっています。しかし、それは Hadoop などのオープンソースの大規模データ処理フレームワークを生み出しました。

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。