[解決済み] Hadoopはブロック境界をまたいで分割されたレコードをどのように処理するのか?
質問
によると
Hadoop - The Definitive Guide
FileInputFormatsが定義する論理レコードは、通常HDFSのブロックにきちんと収まるわけではありません。例えば、TextInputFormatの論理レコードは行で、HDFSの境界を横断することが多くなります。これはプログラムの機能には関係ありません-例えば、行が欠けたり壊れたりすることはありません-が、データローカルなマップ(つまり、入力データと同じホストで実行されているマップ)がいくつかのリモートリードを実行することを意味するので、知っておく価値があります。これが引き起こすわずかなオーバーヘッドは、通常は重要ではありません。
レコード行が2つのブロック(b1とb2)に分割されているとします。最初のブロック (b1) を処理するマッパーは、最後の行に EOL セパレータがないことに気付き、次のデータ ブロック (b2) から行の残りをフェッチします。
2番目のブロック (b2) を処理するマッパーは、最初のレコードが不完全で、ブロック (b2) の2番目のレコードから処理する必要があるとどのように判断しますか?
どのように解決するのですか?
興味深い質問ですね。私は詳細のためにコードを見るのに時間を費やしましたが、以下は私の考えです。分割は、クライアント側で
InputFormat.getSplits
で、FileInputFormat を見ると、次のような情報が得られます。
-
各入力ファイルについて、ファイル長、ブロック サイズを取得し、次のように分割サイズを計算します。
max(minSize, min(maxSize, blockSize))
ここでmaxSize
はmapred.max.split.size
とminSize
はmapred.min.split.size
. -
ファイルを分割して
FileSplit
に分割します。ここで重要なのは それぞれFileSplit
が初期化されていることです。start
パラメータは入力ファイルのオフセットに対応する . この時点ではまだ行の処理は行われていません。コードの該当箇所は以下のようになります。while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(new FileSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; }
その後に
LineRecordReader
で定義されている
TextInputFormat
というように、行が処理されるところです。
-
を初期化するとき
LineRecordReader
のインスタンスを作成しようとします。LineReader
の上の行を読むことができるようにするための抽象化されたものです。FSDataInputStream
. 2つのケースがあります。 -
がある場合
CompressionCodec
が定義されている場合、このコーデックは境界を処理する責任があります。おそらく質問には関係ないでしょう。 -
しかし、コーデックがない場合は、そこが興味深いところです。
start
のInputSplit
が0より大きい場合、あなたは 1文字戻ってから、" \n" または "\r" で指定された最初の行をスキップする (Windows)。 ! バックトラックが重要なのは、行の境界が分割境界と同じである場合に、有効な行をスキップしないようにするためです。以下はそのコードです。if (codec != null) { in = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { if (start != 0) { skipFirstLine = true; --start; fileIn.seek(start); } in = new LineReader(fileIn, job); } if (skipFirstLine) { // skip first line and re-establish "start". start += in.readLine(new Text(), 0, (int)Math.min((long)Integer.MAX_VALUE, end - start)); } this.pos = start;
分割はクライアントで計算されるので、マッパーは順番に実行する必要がなく、どのマッパーも最初の行を破棄する必要があるかどうか、すでに知っています。
つまり、基本的には、同じファイルにそれぞれ 100Mb の 2 行があり、単純化するために分割サイズを 64Mb とします。そして、入力の分割が計算されるとき、次のようなシナリオになります。
- このブロックへのパスとホストを含むスプリット 1。開始 200-200=0Mb、長さ 64Mb で初期化されます。
- スプリット2は開始200-200+64=64Mb、長さ64Mbで初期化されます。
- スプリット3は開始時200-200+128=128Mb、長さ64Mbで初期化されます。
- スプリット4は、開始200-200+192=192Mb、長さ8Mbで初期化されます。
- マッパーAはスプリット1を処理し、開始が0なので最初の行をスキップせず、64Mbの制限を超えるのでリモート読み取りが必要な行をすべて読み取ります。
- マッパー B は分割 2 を処理し、開始は != 0 なので 64Mb-1 バイト後の最初の行をスキップします。これは、まだ分割 2 にある 100Mb の行 1 の終わりに相当し、分割 2 には行の 28Mb があるので、残りの 72Mb をリモート読み取りを行います。
- マッパーCは分割3を処理します。開始は!=0なので、128Mb-1byte後の最初の行をスキップします。これは、200Mbの2行目の終わりに相当し、ファイルの終わりなので何もしないでください。
- マッパー D はマッパー C と同じですが、192MB-1byte の後に改行があるかどうかを調べます。
関連
-
HDFSソースコード解析 --- デコミッション
-
コンテナがゼロ以外の終了コード1で終了しました。エラーファイル: prelaunch.err.org.apache.hadoop.mapreduce.
-
[解決済み] リストを均等な大きさの塊に分割するには?
-
[解決済み] Bashで文字列をデリミターで分割するには?
-
[解決済み] Javaで文字列を分割する方法
-
[解決済み] Bashで文字列を配列に分割する方法は?
-
Kerberosに関するFailed to find any Kerberos tgt問題を解決する。
-
[解決済み] HDFSからローカルファイルシステムにファイルをコピーする方法
-
[解決済み] Hiveでテーブルをパーティショニングすることとバケット化することの違いは何ですか?
-
[解決済み] Hadoopで複数のMapReduceジョブを連鎖させる
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
Hbase公式ドキュメント(中国語
-
HDFSソースコード解析 --- デコミッション
-
Hadoopデータディレクトリの移行
-
コンテナがゼロ以外の終了コード1で終了しました。エラーファイル: prelaunch.err.org.apache.hadoop.mapreduce.
-
Kerberosに関するFailed to find any Kerberos tgt問題を解決する。
-
[解決済み】Hadoop、HBase、Hive、Pigはいつ使う?
-
[解決済み] HDFSからローカルファイルシステムにファイルをコピーする方法
-
[解決済み] Hiveでテーブルをパーティショニングすることとバケット化することの違いは何ですか?
-
[解決済み] HBaseとHadoop/HDFSの違いについて
-
[解決済み] Hadoopで複数のMapReduceジョブを連鎖させる