author     Tathagata Das <tathagata.das1565@gmail.com>  2015-05-05 01:45:19 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-05-05 01:45:19 -0700
commit     1854ac326a9cc6014817d8df30ed0458eee5d7d1 (patch)
tree       b9fc9a06d93207ce3f99ec73d6f10367e526e5fe /core/src/main
parent     c5790a2f772168351c18bb0da51a124cee89a06f (diff)
[SPARK-7139] [STREAMING] Allow received block metadata to be saved to WAL and recovered on driver failure
- Enabled ReceivedBlockTracker WAL by default
- Stored block metadata in the WAL
- Optimized WALBackedBlockRDD by skipping the block fetch when the block is known to not exist in Spark

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5732 from tdas/SPARK-7139 and squashes the following commits:

575476e [Tathagata Das] Added more tests to get 100% coverage of the WALBackedBlockRDD
19668ba [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7139
685fab3 [Tathagata Das] Addressed comments in PR
637bc9c [Tathagata Das] Changed segment to handle
466212c [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7139
5f67a59 [Tathagata Das] Fixed HdfsUtils to handle append in local file system
1bc5bc3 [Tathagata Das] Fixed bug on unexpected recovery
d06fa21 [Tathagata Das] Enabled ReceivedBlockTracker by default, stored block metadata and optimized block fetching in WALBackedBlockRDD
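The third bullet is the behavioral change this commit's core/ diff supports: if the metadata recovered from the write-ahead log says a block never made it into (or was already dropped from) any executor's BlockManager, the RDD can skip the fetch attempt entirely and read the records from the WAL. The following is a minimal sketch of that idea only; the names (PartitionInfo, fetchFromBlockManager, readFromWAL) are hypothetical stand-ins, not the actual WriteAheadLogBackedBlockRDD API in the streaming module:

    object WALReadSketch {
      import org.apache.spark.storage.BlockId

      // Per-partition metadata recovered from the WAL: the block id plus a
      // flag saying whether the block may still exist in a BlockManager.
      case class PartitionInfo(blockId: BlockId, isBlockIdValid: Boolean)

      def readPartition[T](
          info: PartitionInfo,
          fetchFromBlockManager: BlockId => Option[Iterator[T]],
          readFromWAL: BlockId => Iterator[T]): Iterator[T] = {
        // When the block is known not to exist in Spark, skip the (possibly
        // remote, slow-to-fail) fetch and go straight to the log.
        val fromBlockManager =
          if (info.isBlockIdValid) fetchFromBlockManager(info.blockId) else None
        fromBlockManager.getOrElse(readFromWAL(info.blockId))
      }
    }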
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala | 8
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 71578d1210..9220302637 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -31,7 +31,7 @@ private[spark]
 class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId])
   extends RDD[T](sc, Nil) {
 
-  @transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
+  @transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
   @volatile private var _isValid = true
 
   override def getPartitions: Array[Partition] = {
@@ -54,7 +54,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
     assertValid()
-    locations_(split.asInstanceOf[BlockRDDPartition].blockId)
+    _locations(split.asInstanceOf[BlockRDDPartition].blockId)
   }
 
   /**
@@ -79,14 +79,14 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
 
   /** Check if this BlockRDD is valid. If not valid, exception is thrown. */
   private[spark] def assertValid() {
-    if (!_isValid) {
+    if (!isValid) {
       throw new SparkException(
         "Attempted to use %s after its blocks have been removed!".format(toString))
     }
   }
 
   protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = {
-    locations_
+    _locations
   }
 }
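For context on the `!_isValid` to `!isValid` change in the last hunk: BlockRDD keeps a @volatile _isValid flag and an isValid accessor (defined elsewhere in BlockRDD.scala, outside the hunks shown), and the edit makes assertValid() go through the accessor so validity has a single point of definition. A self-contained sketch of the same pattern, using a hypothetical class rather than Spark's:

    class InvalidatableResource(name: String) {
      @volatile private var _isValid = true

      // Single point of truth for validity; callers and internal checks
      // both read through this accessor rather than the field.
      def isValid: Boolean = _isValid

      def invalidate(): Unit = { _isValid = false }

      def assertValid(): Unit = {
        if (!isValid) {
          throw new IllegalStateException(
            "Attempted to use %s after it was invalidated!".format(name))
        }
      }
    }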