author     Tathagata Das <tathagata.das1565@gmail.com>   2015-05-05 01:45:19 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2015-05-05 01:45:19 -0700
commit     1854ac326a9cc6014817d8df30ed0458eee5d7d1 (patch)
tree       b9fc9a06d93207ce3f99ec73d6f10367e526e5fe /core/src/main
parent     c5790a2f772168351c18bb0da51a124cee89a06f (diff)
[SPARK-7139] [STREAMING] Allow received block metadata to be saved to WAL and recovered on driver failure
- Enabled the ReceivedBlockTracker WAL by default (see the configuration sketch after the commit log below)
- Stored block metadata in the WAL
- Optimized WALBackedBlockRDD by skipping the block fetch when the block is known not to exist in Spark
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #5732 from tdas/SPARK-7139 and squashes the following commits:
575476e [Tathagata Das] Added more tests to get 100% coverage of the WALBackedBlockRDD
19668ba [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7139
685fab3 [Tathagata Das] Addressed comments in PR
637bc9c [Tathagata Das] Changed segment to handle
466212c [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7139
5f67a59 [Tathagata Das] Fixed HdfsUtils to handle append in local file system
1bc5bc3 [Tathagata Das] Fixed bug on unexpected recovery
d06fa21 [Tathagata Das] Enabled ReceivedBlockTracker by default, stored block metadata and optimized block fetching in WALBackedBlockRDD
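
For context on the first bullet above: the receiver write-ahead log mechanism that the ReceivedBlockTracker builds on is controlled by the spark.streaming.receiver.writeAheadLog.enable flag and keeps its files under the streaming checkpoint directory. Below is a minimal sketch of a driver that turns it on; this is not part of the commit, and the app name, master, and checkpoint path are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WalEnabledDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("wal-example")    // placeholder name
      .setMaster("local[2]")        // at least 2 cores so a receiver can run locally
      // Log received blocks to the WAL before acknowledging them to the source.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(1))
    // WAL files (including block metadata, after this change) live under the
    // checkpoint directory, so it must be on a fault-tolerant file system.
    ssc.checkpoint("hdfs:///tmp/checkpoints") // placeholder path
    // ... create receiver-based input streams, then ssc.start() and ssc.awaitTermination()
  }
}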
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 71578d1210..9220302637 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -31,7 +31,7 @@ private[spark] class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId])
   extends RDD[T](sc, Nil) {
 
-  @transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
+  @transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
   @volatile private var _isValid = true
 
   override def getPartitions: Array[Partition] = {
@@ -54,7 +54,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
   override def getPreferredLocations(split: Partition): Seq[String] = {
     assertValid()
-    locations_(split.asInstanceOf[BlockRDDPartition].blockId)
+    _locations(split.asInstanceOf[BlockRDDPartition].blockId)
   }
 
   /**
@@ -79,14 +79,14 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
   /** Check if this BlockRDD is valid. If not valid, exception is thrown. */
   private[spark] def assertValid() {
-    if (!_isValid) {
+    if (!isValid) {
       throw new SparkException(
         "Attempted to use %s after its blocks have been removed!".format(toString))
     }
   }
 
   protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = {
-    locations_
+    _locations
  }
 }
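
The change above is a rename (locations_ becomes _locations, matching the _isValid naming convention) plus a switch in assertValid() from reading the private flag directly to going through the isValid accessor. As a minimal sketch of the pattern the diff touches, with hypothetical names rather than Spark's actual classes, preferred locations are resolved lazily once per RDD and then looked up per partition:

object LocalityPatternSketch {
  // Stand-in for BlockManager.blockIdsToHosts: resolve each block ID to hosts.
  def blockIdsToHosts(blockIds: Array[String]): Map[String, Seq[String]] =
    blockIds.map(id => id -> Seq("host-" + (id.hashCode.abs % 3))).toMap

  class SketchRDD(blockIds: Array[String]) {
    // Lazy and @transient, as in BlockRDD: computed once on first access on
    // the driver, and recomputed rather than serialized if the object is shipped.
    @transient lazy val _locations: Map[String, Seq[String]] = blockIdsToHosts(blockIds)

    // Per-partition lookup, analogous to BlockRDD.getPreferredLocations.
    def getPreferredLocations(blockId: String): Seq[String] =
      _locations.getOrElse(blockId, Seq.empty)
  }

  def main(args: Array[String]): Unit = {
    val rdd = new SketchRDD(Array("input-0", "input-1"))
    println(rdd.getPreferredLocations("input-0")) // e.g. List(host-2)
  }
}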