理論#
MapReduce は主に大規模データ処理の問題を解決するために使用され、計算タスクを多くの小さなタスクに分解し、これらの小さなタスクを複数のマシンで並行して実行することで、処理速度と効率を大幅に向上させることができます。
その核心的な考え方は Google の論文にある画像を参考にしています。
Master プログラムは主要なプログラムで、ファイル(N 個)を worker に割り当てます。すべての worker は対等です。worker はまず Map コマンドを実行し、いくつかのファイルを受け取り、キーと値のペアとして出力し、R 個の中間ファイル(図中の Intermediate files)に書き込みます。すべての Map タスクが完了した後、Master は worker に Reduce タスクを割り当て、中間ファイルを処理し、最終的な結果ファイルを生成します。
ここで、Map 関数と Reduce 関数はプラグインの形式でシステムに提供できます。
例えば、Words Count のシナリオ(いくつかのファイル内の各同じ単語の数をカウントする)では:
map(String key, String value):
// key: ドキュメント名
// value: ドキュメントの内容
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: 単語
// values: カウントのリスト
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
この例では、map 関数は key が word、value が "1" の中間ファイルを生成し、reduce は同じ key を集約して value の数をカウントします。
これにより、データセットの分散処理を実現しました。
コード#
システムの起動プロセスは以下の通りです:
- 実行mrcoordinator
- いくつかの mrworker を実行
- mrcoordinator が coordinator を呼び出し、定期的に coordinator の完了状況を確認
- mrworker が map と reduce 関数をロードし、worker プロセスを起動
coordinator と worker の間は rpc 通信を使用し、coordinator と worker のロジックを実装するだけで済みます。
Coordinator#
いくつかの要点:
- worker の状態を維持する必要はなく、task の状態を維持することを考慮します。なぜなら、worker は実際には対等だからです。
- Coordinator はマルチスレッドであり、ロックが必要です。各スレッド間の通信が不要なため、ここでは Mutex を選択します。
- 2 つの rpc の重要な関数:FetchTask と FinishTask、一つは worker にタスクを探すため、もう一つはタスクの終了をマークするため
重要な構造体は以下の通りです:
type Coordinator struct {
// ここに定義を追加します。
lock sync.Mutex
files []string
mapTaskStatuses []int
reduceTaskStatuses []int
nMap int
nReduce int
mapDoneNum int // mapタスクの完了数
reduceDoneNum int
mapFinished bool
reduceFinished bool
}
2 つの rpc 関数の主要なロジック:
func (c *Coordinator) FetchTask(args *FetchTaskArgs, reply *FetchTaskReply) error {
c.lock.Lock()
defer c.lock.Unlock()
if !c.mapFinished {
mapNum := xxx // mapTaskStatusesをループして未開始のタスクを見つける
if mapTaskStatusesがすべて割り当てられている場合 {
reply.TaskType = NOTASK
} else {
// タスクを割り当てる
// 10秒以内にタスクが完了しなければエラー復旧
}
} else if !c.reduceFinished {
// mapと似ています
}
} else {
reply.TaskType = NOTASK
}
return nil
}
func (c *Coordinator) FinishTask(args *FinishTaskArgs, reply *FinishTaskReply) error {
c.lock.Lock()
defer c.lock.Unlock()
switch args.TaskType {
case MAPTASK:
{
// エラー復旧メカニズムのため、ここで同じタスクが提出される可能性があります。最初に提出されたタスクのみを受け入れます。
if c.mapTaskStatuses[args.TaskNum] != DONE {
c.mapDoneNum += 1
c.mapTaskStatuses[args.TaskNum] = DONE
if c.mapDoneNum == c.nMap {
c.mapFinished = true
}
}
}
case REDUCETASK:
// 上記と似ています
}
return nil
}
Worker#
worker は主にビジネスロジックです。大体のフレームワークは以下の通りです:
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
for {
reply := &FetchTaskReply{}
ok := call("Coordinator.FetchTask", &FetchTaskArgs{}, &reply)
if ok {
switch reply.TaskType {
case MAPTASK:
{
// replyで指定されたfilenameに基づいてmap処理を呼び出し、中間ファイルを出力
// rpc FinishTask
}
case REDUCETASK:
{
// replyのReduceNumに基づいて中間ファイル(複数)を読み込み、結果ファイルを出力
mp := make(map[string][]string)
// 集約
// reduce
// rpc FinishTask
}
case NOTASK:
{
break
}
}
} else {
break
}
}
}
いくつかの詳細に注意が必要です:
- 中間ファイル名はmr-X-Y、X は MapNum(つまり map タスク番号)、Y は map 関数が生成した key から [0, NReduce) の整数集合への単射です。具体的には、以下のハッシュ関数を使用して計算されます:
func ihash(key string) int {
h := fnv.New32a()
h.Write([]byte(key))
return int(h.Sum32() & 0x7fffffff)
}
- 各 reduce 関数は Y に対応するすべての X を処理する必要があります。
- reduce の入力は 1 つの key とそれに対応するすべての values であるため、中間ファイルを集約する必要があります。ここでは HashMap を使用します。生産レベルの MapReduce 実装では、この部分はメモリ + 外部ストレージを使用して OOM を防ぐ必要があります。
- ファイルの作成と書き込みの原子性を保証するために(完全に書き込むか、完全に書き込まないか)、まず一時ファイルを作成し、書き込み、名前を変更することができます:
func writeStringToFileAtomic(filename string, content string) {
f, _ := os.CreateTemp(".", filename+"*")
f.WriteString(content)
os.Rename(f.Name(), filename)
f.Close()
}
- reduce の最終生成ファイルの内容は、結果文字列を継続的に結合する必要があり、より良いパフォーマンスを得るために StringBuilder を使用すべきです。
var result strings.Builder
for key, values := range mp {
reduceResult := reducef(key, values)
result.WriteString(fmt.Sprintf("%v %v\n", key, reduceResult))
}
// 文字列を取得:result.String()
拡張#
Google の論文を再読すると、Google が生産環境で使用する MapReduce は、上記の実装に比べていくつかの改善点があります:
- worker が複数のマシンで実行される場合、ファイルを管理するために分散ファイルシステムのソリューションを使用する必要があります。例えば GFS
- reduce によって生成されたファイルは、次の MapReduce タスクで使用される可能性があり、ある種のチェーンを形成します。
- 実際の実装では、worker の状態を維持する必要があります。これにより、worker マシンの情報やエラー復旧などが容易になります。
- 実際の使用において、Google は一部のマシンが Map または Reduce タスクをほぼ完了しようとしているときに、パフォーマンスが明らかに低下することを発見しました。解決策は、Master がタスクがほぼ完了する際に残りのタスクを Backup Tasks として割り当て、他の worker に割り当てることで、最も早く完了したタスクを最終結果とすることです。
Google はまた、いくつかの改善のアイデアを提案しました。
- Partitioning Function:上記の key をマッピングするハッシュ関数はカスタマイズ可能で、Partitioning Function と呼ばれます。例えば key が URL の場合、同じ Host を同じ Reduce タスクに分けることができます。
- Ordering Guarantees:同じ partition 内で key のソート処理を先に行うことで、結果がよりフレンドリーになります。
- Combiner Function:上記の Words Count の例では、大量の類似した (word,"1") がネットワーク上で転送され、key の集約が reduce 関数内で発生します。集約操作を map 関数に書くことで、これらの重複したデータ転送を回避できます。combiner function と reduce function の唯一の違いは、前者が中間ファイルを出力し、後者が最終ファイルを出力することです。
- Input and Output Types:Google は reader インターフェースを提供し、ユーザーが Map 関数を実装する際に、データベース内のデータやメモリ内のデータなど、より多くの Input Types を読み取ることができます。Output も同様です。
- master は HTTP パネルを維持し、各 worker、task などの状態を表示できます。
- Counter:map 関数内でカスタムの counter を呼び出し、master に継続的に返すことができます。これは、Words Count の例のように、いくつかのデータ情報を収集するために使用できます。
Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");
最後に、2024 年の今日、MapReduce は Google にとってすでに歴史となっています。しかし、それは Hadoop などのオープンソースの大規模データ処理フレームワークを生み出しました。