1. ホーム
  2. apache-spark

[解決済み] spark checkpointとpersist to a diskの違いは何ですか?

2022-03-05 03:53:49

質問

spark checkpointとpersist to a diskの違いは何ですか? どちらもローカルディスクに保存されるのでしょうか?

どのように解決するのですか?

いくつかの重要な違いがありますが、根本的な違いは、リネージがどうなるかということです。 Persist / cache は、系統を維持したまま checkpoint は系統を断ち切ります。次のような例を考えてみましょう。

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _)

  • cache / persist :

    val indCache  = rdd.mapValues(_ > 4)
    indCache.persist(StorageLevel.DISK_ONLY)
    
    indCache.toDebugString
    // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
    
    indCache.count
    // 3
    
    indCache.toDebugString
    // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
    //  |       CachedPartitions: 8; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 587.0 B
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
    
    
  • checkpoint :

    val indChk  = rdd.mapValues(_ > 4)
    indChk.checkpoint
    
    indChk.toDebugString
    // (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 []
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 []
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 []
    
    indChk.count
    // 3
    
    indChk.toDebugString
    // (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
    //  |  ReliableCheckpointRDD[12] at count at <console>:27 []
    
    

ご覧のように、最初のケースでは、データがキャッシュから取り出されたとしても、系統は維持されます。の一部のパーティションが削除されても、データはゼロから再計算できることを意味する。 indCache が失われます。2つ目のケースでは、チェックポイントの後、リネージは完全に失われ indChk は、もう再構築に必要な情報を持っていない。

checkpoint とは異なり cache / persist は他のジョブとは別に計算されます。そのため、チェックポイントの対象となるRDDはキャッシュされるべきなのです。

このRDDはメモリ上に保存することを強く推奨します。そうしないと、ファイルに保存した場合、再計算が必要になります。

最後に checkpointed の後に削除されない永続的なデータです。 SparkContext が破棄される。

データの保存について SparkContext.setCheckpointDir で使用される RDD.checkpoint が必要です。 DFS 非ローカルモードで動作している場合はパスを指定します。それ以外の場合は、ローカルのファイルシステムでもかまいません。 localCheckpointpersist レプリケーションを使用しない場合は、ローカルファイルシステムを使用する必要があります。

重要な注意事項 :

RDDのチェックポイントは、Spark Streamingのチェックポイントとは異なる概念です。前者はリネージの問題に対処するためのもので、後者はストリーミングの信頼性と障害回復のためのものです。