大人の事情で古い CDH 5.1.2 を素の設定で使うと Reducer で OOME が出たり出なかったりする。
2016-02-12 00:58:44,134 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#1 at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:134) at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1554) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162) Caused by: java.lang.OutOfMemoryError: Java heap space at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:56) at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:46) at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.<init>(InMemoryMapOutput.java:63) at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.unconditionalReserve(MergeManagerImpl.java:297) at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:287) at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:411) at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:341) at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:165)
OutOfMemoryError: Java heap space
という文字を見て脊髄反射的に Reducer のヒープメモリを増強すると却って傷が広がるので要注意。
事象
MapReduce は以下のようなイメージで動く。象本に載っているのと同じ図がインターネットに転がっていたので拝借。
問題が起こっているのは reduce task
の "Sort" phase
のところ。
mapper が分割したデータのうち、自分が集計するものを集めてマージするわけだけど、mapper が分割したデータは上図のように全部同じ大きさではなくて、あるものは小さいし、あるものは大きい。
そんな状況において限りあるリソースを最大限に活用してできるだけ速くマージするために、小さいデータはメモリに載せて、大きいデータはディスクに吐く。
このうち、ディスクに吐いたデータは当然メモリを使わないので今回の問題には関係なくて、小さいデータが積もりに積もってヒープを食い尽くしてしまったという話。
以下、原因と対応。
原因
結論から先に書くと、CDH 5.1.2 (に同梱されている Apache Hadoop 2.3.0) のデフォルト設定値と実装バグが原因。
設定値
マージに使えるメモリ量 (memoryLimit) は、下記の通りデフォルトでコンテナが使えるメモリの 90% に設定される。
一方で、データをメモリに載せるかディスクに吐くかの閾値 (maxSingleShuffleLimit) は、下記の通りデフォルトで memoryLimit の 25% に設定される。
仮にコンテナのヒープメモリを 1GB に設定した場合、225MB (=1,000 * 0.9 * 0.25) 以下のデータはメモリに載せて、それより大きいとディスクに吐く。
これだけだと小さいデータをどんどん拾ってきた場合に memoryLimit に到達してしまうので、ここにも閾値 (mergeThreshold) が設けられていて、デフォルトで memoryLimit の 90% に設定される。
つまり、メモリでマージするデータ量が 810MB (=1,000 * 0.9 * 0.9) を超えるとディスクに書き出してメモリを解放する。これはスピルレコードとしてレポートに出力されて、チューニングの指標になる。
実装
牧歌的な逐次実行脳で考えると上記の設定で問題ないように思えるけど、現実は甘くないので mapper からデータを拾う処理はマルチスレッドで動いていて、スピルレコードを書き出している最中にもデータがどんどん飛んでくる。
このため、最後の砦としてメモリ使用状況をチェックする処理が入っているんだけど、残念ながらこれがバグっている。
具体的には以下の箇所で、
if (usedMemory > memoryLimit) {
現状のメモリ使用量だけではなく、これから扱おうとしているデータ量も考慮して
if (usedMemory + requestedSize > memoryLimit) {
と判定しなければならない。
これができていないので、例えば
- コンテナのヒープメモリが 1GB
- すでにマージ用メモリに 890MB 載っている
- スピルレコードの書き出しは終わっていない
という状態で、mapper から 200MB のデータを拾ってきたとすると、上記チェック処理は
- メモリ使用量は memoryLimit に到達していない
- 200MB は maxSingleShuffleLimit より小さい
と判定してデータをメモリに載せようとするので、メモリ使用量は 1090MB (=890 + 200) となり、ヒープメモリを食い尽くして Reducer が死亡する。
対応
実装はまだ直っていない。2.8.0 で直るらしい。
自分で実装を直してビルドするような豪傑でない場合は設定値をいじって対応する。最新の Hadoop では memoryLimit のデフォルト値が 70% に変更されているので、これに倣えばよい。
2016/04/20 修正
CDH 5.1.2 では設定ファイルで memoryLimit のデフォルト値を 70% に設定していた。Mapper の数に対して Reducer が少な過ぎたためにスピルレコードの書き出しが間に合わなかったと考えて、Reducer 数を増強した。Reducer 数は Mapper の総出力ファイルサイズをブロックサイズで割って算出すればい。
まとめ
以上のとおり、処理タイミングに依存した問題だったため再現性が低くて苦戦したが、原因が分かってすっきりした。
それから、(たぶんヒープサイズの問題ではないとぼくが止めるのも聞かずに) 脊髄反射的にヒープメモリを増強したら、これまではディスクでマージしていたサイズのデータもメモリに載るようになり、却って再現性が高まってウケた。
当たり前だけど、問題は論理的に原因を切り分けて対処しましょうと再認識。システム開発にオカルトは存在しない。
参考

- 作者: Tom White,Sky株式会社玉川竜司,兼田聖士
- 出版社/メーカー: オライリージャパン
- 発売日: 2013/07/26
- メディア: 大型本
- この商品を含むブログ (4件) を見る