1. ホーム
  2. hadoop

[解決済み] Hadoopはブロック境界をまたいで分割されたレコードをどのように処理するのか?

2022-08-09 07:49:41

質問

によると Hadoop - The Definitive Guide

FileInputFormatsが定義する論理レコードは、通常HDFSのブロックにきちんと収まるわけではありません。例えば、TextInputFormatの論理レコードは行で、HDFSの境界を横断することが多くなります。これはプログラムの機能には関係ありません-例えば、行が欠けたり壊れたりすることはありません-が、データローカルなマップ(つまり、入力データと同じホストで実行されているマップ)がいくつかのリモートリードを実行することを意味するので、知っておく価値があります。これが引き起こすわずかなオーバーヘッドは、通常は重要ではありません。

レコード行が2つのブロック(b1とb2)に分割されているとします。最初のブロック (b1) を処理するマッパーは、最後の行に EOL セパレータがないことに気付き、次のデータ ブロック (b2) から行の残りをフェッチします。

2番目のブロック (b2) を処理するマッパーは、最初のレコードが不完全で、ブロック (b2) の2番目のレコードから処理する必要があるとどのように判断しますか?

どのように解決するのですか?

興味深い質問ですね。私は詳細のためにコードを見るのに時間を費やしましたが、以下は私の考えです。分割は、クライアント側で InputFormat.getSplits で、FileInputFormat を見ると、次のような情報が得られます。

  • 各入力ファイルについて、ファイル長、ブロック サイズを取得し、次のように分割サイズを計算します。 max(minSize, min(maxSize, blockSize)) ここで maxSizemapred.max.split.sizeminSizemapred.min.split.size .
  • ファイルを分割して FileSplit に分割します。ここで重要なのは それぞれ FileSplit が初期化されていることです。 start パラメータは入力ファイルのオフセットに対応する . この時点ではまだ行の処理は行われていません。コードの該当箇所は以下のようになります。

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    
    

その後に LineRecordReader で定義されている TextInputFormat というように、行が処理されるところです。

  • を初期化するとき LineRecordReader のインスタンスを作成しようとします。 LineReader の上の行を読むことができるようにするための抽象化されたものです。 FSDataInputStream . 2つのケースがあります。
  • がある場合 CompressionCodec が定義されている場合、このコーデックは境界を処理する責任があります。おそらく質問には関係ないでしょう。
  • しかし、コーデックがない場合は、そこが興味深いところです。 startInputSplit が0より大きい場合、あなたは 1文字戻ってから、" \n" または "\r" で指定された最初の行をスキップする (Windows)。 ! バックトラックが重要なのは、行の境界が分割境界と同じである場合に、有効な行をスキップしないようにするためです。以下はそのコードです。

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
    
    

分割はクライアントで計算されるので、マッパーは順番に実行する必要がなく、どのマッパーも最初の行を破棄する必要があるかどうか、すでに知っています。

つまり、基本的には、同じファイルにそれぞれ 100Mb の 2 行があり、単純化するために分割サイズを 64Mb とします。そして、入力の分割が計算されるとき、次のようなシナリオになります。

  • このブロックへのパスとホストを含むスプリット 1。開始 200-200=0Mb、長さ 64Mb で初期化されます。
  • スプリット2は開始200-200+64=64Mb、長さ64Mbで初期化されます。
  • スプリット3は開始時200-200+128=128Mb、長さ64Mbで初期化されます。
  • スプリット4は、開始200-200+192=192Mb、長さ8Mbで初期化されます。
  • マッパーAはスプリット1を処理し、開始が0なので最初の行をスキップせず、64Mbの制限を超えるのでリモート読み取りが必要な行をすべて読み取ります。
  • マッパー B は分割 2 を処理し、開始は != 0 なので 64Mb-1 バイト後の最初の行をスキップします。これは、まだ分割 2 にある 100Mb の行 1 の終わりに相当し、分割 2 には行の 28Mb があるので、残りの 72Mb をリモート読み取りを行います。
  • マッパーCは分割3を処理します。開始は!=0なので、128Mb-1byte後の最初の行をスキップします。これは、200Mbの2行目の終わりに相当し、ファイルの終わりなので何もしないでください。
  • マッパー D はマッパー C と同じですが、192MB-1byte の後に改行があるかどうかを調べます。