aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-05-05 01:45:19 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-05-05 01:45:19 -0700
commit1854ac326a9cc6014817d8df30ed0458eee5d7d1 (patch)
treeb9fc9a06d93207ce3f99ec73d6f10367e526e5fe /streaming
parentc5790a2f772168351c18bb0da51a124cee89a06f (diff)
downloadspark-1854ac326a9cc6014817d8df30ed0458eee5d7d1.tar.gz
spark-1854ac326a9cc6014817d8df30ed0458eee5d7d1.tar.bz2
spark-1854ac326a9cc6014817d8df30ed0458eee5d7d1.zip
[SPARK-7139] [STREAMING] Allow received block metadata to be saved to WAL and recovered on driver failure
- Enabled ReceivedBlockTracker WAL by default - Stored block metadata in the WAL - Optimized WALBackedBlockRDD by skipping block fetch when the block is known to not exist in Spark Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #5732 from tdas/SPARK-7139 and squashes the following commits: 575476e [Tathagata Das] Added more tests to get 100% coverage of the WALBackedBlockRDD 19668ba [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7139 685fab3 [Tathagata Das] Addressed comments in PR 637bc9c [Tathagata Das] Changed segment to handle 466212c [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7139 5f67a59 [Tathagata Das] Fixed HdfsUtils to handle append in local file system 1bc5bc3 [Tathagata Das] Fixed bug on unexpected recovery d06fa21 [Tathagata Das] Enabled ReceivedBlockTracker by default, stored block metadata and optimized block fetching in WALBackedBlockRDD
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala49
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala156
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala30
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala24
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala1
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala2
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala55
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala107
9 files changed, 277 insertions, 149 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index ba88416ef4..15d9710d37 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -20,11 +20,11 @@ package org.apache.spark.streaming.dstream
import scala.reflect.ClassTag
import org.apache.spark.rdd.{BlockRDD, RDD}
-import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.storage.BlockId
import org.apache.spark.streaming._
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
-import org.apache.spark.streaming.receiver.{Receiver, WriteAheadLogBasedStoreResult}
-import org.apache.spark.streaming.scheduler.{InputInfo, ReceivedBlockInfo}
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.util.WriteAheadLogUtils
/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -64,31 +64,30 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
} else {
// Otherwise, ask the tracker for all the blocks that have been allocated to this stream
// for this batch
- val blockInfos =
- ssc.scheduler.receiverTracker.getBlocksOfBatch(validTime).get(id).getOrElse(Seq.empty)
- val blockStoreResults = blockInfos.map { _.blockStoreResult }
- val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
+ val receiverTracker = ssc.scheduler.receiverTracker
+ val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
+ val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
- // Register the input blocks information into InputInfoTracker
- val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
- ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
+ // Are WAL record handles present with all the blocks
+ val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
- // Check whether all the results are of the same type
- val resultTypes = blockStoreResults.map { _.getClass }.distinct
- if (resultTypes.size > 1) {
- logWarning("Multiple result types in block information, WAL information will be ignored.")
- }
-
- // If all the results are of type WriteAheadLogBasedStoreResult, then create
- // WriteAheadLogBackedBlockRDD else create simple BlockRDD.
- if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) {
- val logSegments = blockStoreResults.map {
- _.asInstanceOf[WriteAheadLogBasedStoreResult].walRecordHandle
- }.toArray
- // Since storeInBlockManager = false, the storage level does not matter.
- new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
- blockIds, logSegments, storeInBlockManager = false, StorageLevel.MEMORY_ONLY_SER)
+ if (areWALRecordHandlesPresent) {
+ // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
+ val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
+ val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
+ new WriteAheadLogBackedBlockRDD[T](
+ ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
} else {
+ // Else, create a BlockRDD. However, if there are some blocks with WAL info but not others
+ // then that is unexpected and log a warning accordingly.
+ if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
+ if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
+ logError("Some blocks do not have Write Ahead Log information; " +
+ "this is unexpected and data may not be recoverable after driver failures")
+ } else {
+ logWarning("Some blocks have Write Ahead Log information; this is unexpected")
+ }
+ }
new BlockRDD[T](ssc.sc, blockIds)
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index f4c8046e8a..ffce6a4c3c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -23,6 +23,8 @@ import java.util.UUID
import scala.reflect.ClassTag
import scala.util.control.NonFatal
+import org.apache.commons.io.FileUtils
+
import org.apache.spark._
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{BlockId, StorageLevel}
@@ -31,30 +33,42 @@ import org.apache.spark.streaming.util._
/**
* Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
* It contains information about the id of the blocks having this partition's data and
- * the segment of the write ahead log that backs the partition.
+ * the corresponding record handle in the write ahead log that backs the partition.
* @param index index of the partition
* @param blockId id of the block having the partition data
+ * @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark
+ * executors). If not, then block lookups by the block ids will be skipped.
+ * By default, this is an empty array signifying true for all the blocks.
* @param walRecordHandle Handle of the record in a write ahead log having the partition data
*/
private[streaming]
class WriteAheadLogBackedBlockRDDPartition(
val index: Int,
val blockId: BlockId,
- val walRecordHandle: WriteAheadLogRecordHandle)
- extends Partition
+ val isBlockIdValid: Boolean,
+ val walRecordHandle: WriteAheadLogRecordHandle
+ ) extends Partition
/**
* This class represents a special case of the BlockRDD where the data blocks in
* the block manager are also backed by data in write ahead logs. For reading
* the data, this RDD first looks up the blocks by their ids in the block manager.
- * If it does not find them, it looks up the corresponding data in the write ahead log.
+ * If it does not find them, it looks up the WAL using the corresponding record handle.
+ * The lookup of the blocks from the block manager can be skipped by setting the corresponding
+ * element in isBlockIdValid to false. This is a performance optimization which does not affect
+ * correctness, and it can be used in situations where it is known that the block
+ * does not exist in the Spark executors (e.g. after a failed driver is restarted).
+ *
*
* @param sc SparkContext
* @param blockIds Ids of the blocks that contains this RDD's data
* @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
- * @param storeInBlockManager Whether to store in the block manager after reading
- * from the WAL record
+ * @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark
+ * executors). If not, then block lookups by the block ids will be skipped.
+ * By default, this is an empty array signifying true for all the blocks.
+ * @param storeInBlockManager Whether to store a block in the block manager
+ * after reading it from the WAL
* @param storageLevel storage level to store when storing in block manager
* (applicable when storeInBlockManager = true)
*/
@@ -63,23 +77,32 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
@transient sc: SparkContext,
@transient blockIds: Array[BlockId],
@transient walRecordHandles: Array[WriteAheadLogRecordHandle],
- storeInBlockManager: Boolean,
- storageLevel: StorageLevel)
+ @transient isBlockIdValid: Array[Boolean] = Array.empty,
+ storeInBlockManager: Boolean = false,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER)
extends BlockRDD[T](sc, blockIds) {
require(
blockIds.length == walRecordHandles.length,
- s"Number of block ids (${blockIds.length}) must be " +
- s"the same as number of WAL record handles (${walRecordHandles.length}})!")
+ s"Number of block Ids (${blockIds.length}) must be " +
+ s" same as number of WAL record handles (${walRecordHandles.length}})")
+
+ require(
+ isBlockIdValid.isEmpty || isBlockIdValid.length == blockIds.length,
+ s"Number of elements in isBlockIdValid (${isBlockIdValid.length}) must be " +
+ s" same as number of block Ids (${blockIds.length})")
// Hadoop configuration is not serializable, so broadcast it as a serializable.
@transient private val hadoopConfig = sc.hadoopConfiguration
private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig)
+ override def isValid(): Boolean = true
+
override def getPartitions: Array[Partition] = {
assertValid()
- Array.tabulate(blockIds.size) { i =>
- new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), walRecordHandles(i))
+ Array.tabulate(blockIds.length) { i =>
+ val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i)
+ new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), isValid, walRecordHandles(i))
}
}
@@ -94,51 +117,57 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
val blockManager = SparkEnv.get.blockManager
val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
val blockId = partition.blockId
- blockManager.get(blockId) match {
- case Some(block) => // Data is in Block Manager
- val iterator = block.data.asInstanceOf[Iterator[T]]
- logDebug(s"Read partition data of $this from block manager, block $blockId")
- iterator
- case None => // Data not found in Block Manager, grab it from write ahead log file
- var dataRead: ByteBuffer = null
- var writeAheadLog: WriteAheadLog = null
- try {
- // The WriteAheadLogUtils.createLog*** method needs a directory to create a
- // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
- // writing log data. However, the directory is not needed if data needs to be read, hence
- // a dummy path is provided to satisfy the method parameter requirements.
- // FileBasedWriteAheadLog will not create any file or directory at that path.
- // FileBasedWriteAheadLog will not create any file or directory at that path. Also,
- // this dummy directory should not already exist otherwise the WAL will try to recover
- // past events from the directory and throw errors.
- val nonExistentDirectory = new File(
- System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
- writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
- SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
- dataRead = writeAheadLog.read(partition.walRecordHandle)
- } catch {
- case NonFatal(e) =>
- throw new SparkException(
- s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
- } finally {
- if (writeAheadLog != null) {
- writeAheadLog.close()
- writeAheadLog = null
- }
- }
- if (dataRead == null) {
+
+ def getBlockFromBlockManager(): Option[Iterator[T]] = {
+ blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
+ }
+
+ def getBlockFromWriteAheadLog(): Iterator[T] = {
+ var dataRead: ByteBuffer = null
+ var writeAheadLog: WriteAheadLog = null
+ try {
+ // The WriteAheadLogUtils.createLog*** method needs a directory to create a
+ // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
+ // writing log data. However, the directory is not needed if data needs to be read, hence
+ // a dummy path is provided to satisfy the method parameter requirements.
+ // FileBasedWriteAheadLog will not create any file or directory at that path.
+ // FileBasedWriteAheadLog will not create any file or directory at that path. Also,
+ // this dummy directory should not already exist otherwise the WAL will try to recover
+ // past events from the directory and throw errors.
+ val nonExistentDirectory = new File(
+ System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
+ writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
+ SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
+ dataRead = writeAheadLog.read(partition.walRecordHandle)
+ } catch {
+ case NonFatal(e) =>
throw new SparkException(
- s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
- s"read returned null")
+ s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
+ } finally {
+ if (writeAheadLog != null) {
+ writeAheadLog.close()
+ writeAheadLog = null
}
- logInfo(s"Read partition data of $this from write ahead log, record handle " +
- partition.walRecordHandle)
- if (storeInBlockManager) {
- blockManager.putBytes(blockId, dataRead, storageLevel)
- logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
- dataRead.rewind()
- }
- blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
+ }
+ if (dataRead == null) {
+ throw new SparkException(
+ s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
+ s"read returned null")
+ }
+ logInfo(s"Read partition data of $this from write ahead log, record handle " +
+ partition.walRecordHandle)
+ if (storeInBlockManager) {
+ blockManager.putBytes(blockId, dataRead, storageLevel)
+ logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
+ dataRead.rewind()
+ }
+ blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
+ }
+
+ if (partition.isBlockIdValid) {
+ getBlockFromBlockManager().getOrElse { getBlockFromWriteAheadLog() }
+ } else {
+ getBlockFromWriteAheadLog()
}
}
@@ -149,12 +178,23 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
*/
override def getPreferredLocations(split: Partition): Seq[String] = {
val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
- val blockLocations = getBlockIdLocations().get(partition.blockId)
+ val blockLocations = if (partition.isBlockIdValid) {
+ getBlockIdLocations().get(partition.blockId)
+ } else {
+ None
+ }
+
blockLocations.getOrElse {
partition.walRecordHandle match {
case fileSegment: FileBasedWriteAheadLogSegment =>
- HdfsUtils.getFileSegmentLocations(
- fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig)
+ try {
+ HdfsUtils.getFileSegmentLocations(
+ fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig)
+ } catch {
+ case NonFatal(e) =>
+ logError("Error getting WAL file segment locations", e)
+ Seq.empty
+ }
case _ =>
Seq.empty
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 93f047b910..92938379b9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -146,7 +146,7 @@ private[streaming] class ReceiverSupervisorImpl(
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
- val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
+ val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala
index 94beb590f5..dc11e84f29 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala
@@ -17,12 +17,38 @@
package org.apache.spark.streaming.scheduler
-import org.apache.spark.streaming.receiver.ReceivedBlockStoreResult
+import org.apache.spark.storage.StreamBlockId
+import org.apache.spark.streaming.receiver.{ReceivedBlockStoreResult, WriteAheadLogBasedStoreResult}
+import org.apache.spark.streaming.util.WriteAheadLogRecordHandle
/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
streamId: Int,
numRecords: Long,
+ metadataOption: Option[Any],
blockStoreResult: ReceivedBlockStoreResult
- )
+ ) {
+
+ @volatile private var _isBlockIdValid = true
+
+ def blockId: StreamBlockId = blockStoreResult.blockId
+
+ def walRecordHandleOption: Option[WriteAheadLogRecordHandle] = {
+ blockStoreResult match {
+ case walStoreResult: WriteAheadLogBasedStoreResult => Some(walStoreResult.walRecordHandle)
+ case _ => None
+ }
+ }
+
+ /** Is the block ID valid, that is, is the block present in the Spark executors. */
+ def isBlockIdValid(): Boolean = _isBlockIdValid
+
+ /**
+ * Set the block ID as invalid. This is useful when it is known that the block is not present
+ * in the Spark executors.
+ */
+ def setBlockIdInvalid(): Unit = {
+ _isBlockIdValid = false
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 14e769a281..a9f4147a5f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -45,7 +45,7 @@ private[streaming] case class BatchCleanupEvent(times: Seq[Time])
private[streaming]
case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
def getBlocksOfStream(streamId: Int): Seq[ReceivedBlockInfo] = {
- streamIdToAllocatedBlocks.get(streamId).getOrElse(Seq.empty)
+ streamIdToAllocatedBlocks.getOrElse(streamId, Seq.empty)
}
}
@@ -63,6 +63,7 @@ private[streaming] class ReceivedBlockTracker(
hadoopConf: Configuration,
streamIds: Seq[Int],
clock: Clock,
+ recoverFromWriteAheadLog: Boolean,
checkpointDirOption: Option[String])
extends Logging {
@@ -75,7 +76,9 @@ private[streaming] class ReceivedBlockTracker(
private var lastAllocatedBatchTime: Time = null
// Recover block information from write ahead logs
- recoverFromWriteAheadLogs()
+ if (recoverFromWriteAheadLog) {
+ recoverPastEvents()
+ }
/** Add received block. This event will get written to the write ahead log (if enabled). */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
@@ -167,10 +170,11 @@ private[streaming] class ReceivedBlockTracker(
* Recover all the tracker actions from the write ahead logs to recover the state (unallocated
* and allocated block info) prior to failure.
*/
- private def recoverFromWriteAheadLogs(): Unit = synchronized {
+ private def recoverPastEvents(): Unit = synchronized {
// Insert the recovered block information
def insertAddedBlock(receivedBlockInfo: ReceivedBlockInfo) {
logTrace(s"Recovery: Inserting added block $receivedBlockInfo")
+ receivedBlockInfo.setBlockIdInvalid()
getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
}
@@ -224,19 +228,9 @@ private[streaming] class ReceivedBlockTracker(
/** Optionally create the write ahead log manager only if the feature is enabled */
private def createWriteAheadLog(): Option[WriteAheadLog] = {
- if (WriteAheadLogUtils.enableReceiverLog(conf)) {
- if (checkpointDirOption.isEmpty) {
- throw new SparkException(
- "Cannot enable receiver write-ahead log without checkpoint directory set. " +
- "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
- "See documentation for more details.")
- }
+ checkpointDirOption.map { checkpointDir =>
val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
-
- val log = WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
- Some(log)
- } else {
- None
+ WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 1af65716d3..3c341390ed 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -62,6 +62,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
ssc.sparkContext.hadoopConfiguration,
receiverInputStreamIds,
ssc.scheduler.clock,
+ ssc.isCheckpointPresent,
Option(ssc.checkpointDir)
)
private val listenerBus = ssc.scheduler.listenerBus
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index 858ba3c9eb..f60688f173 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -27,7 +27,7 @@ private[streaming] object HdfsUtils {
// If the file exists and we have append support, append instead of creating a new file
val stream: FSDataOutputStream = {
if (dfs.isFile(dfsPath)) {
- if (conf.getBoolean("hdfs.append.support", false)) {
+ if (conf.getBoolean("hdfs.append.support", false) || dfs.isInstanceOf[RawLocalFileSystem]) {
dfs.append(dfsPath)
} else {
throw new IllegalStateException("File exists and there is no append support!")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 8317fb9720..b1af8d5eaa 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -67,15 +67,20 @@ class ReceivedBlockTrackerSuite
// Verify added blocks are unallocated blocks
receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
+ receivedBlockTracker.hasUnallocatedReceivedBlocks should be (true)
+
// Allocate the blocks to a batch and verify that all of them have been allocated
receivedBlockTracker.allocateBlocksToBatch(1)
receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos
+ receivedBlockTracker.getBlocksOfBatch(1) shouldEqual Map(streamId -> blockInfos)
receivedBlockTracker.getUnallocatedBlocks(streamId) shouldBe empty
+ receivedBlockTracker.hasUnallocatedReceivedBlocks should be (false)
// Allocate no blocks to another batch
receivedBlockTracker.allocateBlocksToBatch(2)
receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty
+ receivedBlockTracker.getBlocksOfBatch(2) shouldEqual Map(streamId -> Seq.empty)
// Verify that older batches have no operation on batch allocation,
// will return the same blocks as previously allocated.
@@ -88,7 +93,7 @@ class ReceivedBlockTrackerSuite
receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
}
- test("block addition, block to batch allocation and clean up with write ahead log") {
+ test("recovery and cleanup with write ahead logs") {
val manualClock = new ManualClock
// Set the time increment level to twice the rotation interval so that every increment creates
// a new log file
@@ -114,9 +119,7 @@ class ReceivedBlockTrackerSuite
}
// Set WAL configuration
- conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1")
- require(WriteAheadLogUtils.enableReceiverLog(conf))
require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1)
// Start tracker and add blocks
@@ -131,15 +134,27 @@ class ReceivedBlockTrackerSuite
getWrittenLogData() shouldEqual expectedWrittenData1
getWriteAheadLogFiles() should have size 1
- // Restart tracker and verify recovered list of unallocated blocks
incrementTime()
- val tracker2 = createTracker(clock = manualClock)
- tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
+
+ // Recovery without recovery from WAL and verify list of unallocated blocks is empty
+ val tracker1_ = createTracker(clock = manualClock, recoverFromWriteAheadLog = false)
+ tracker1_.getUnallocatedBlocks(streamId) shouldBe empty
+ tracker1_.hasUnallocatedReceivedBlocks should be (false)
+
+ // Restart tracker and verify recovered list of unallocated blocks
+ val tracker2 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
+ val unallocatedBlocks = tracker2.getUnallocatedBlocks(streamId).toList
+ unallocatedBlocks shouldEqual blockInfos1
+ unallocatedBlocks.foreach { block =>
+ block.isBlockIdValid() should be (false)
+ }
+
// Allocate blocks to batch and verify whether the unallocated blocks got allocated
val batchTime1 = manualClock.getTimeMillis()
tracker2.allocateBlocksToBatch(batchTime1)
tracker2.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
+ tracker2.getBlocksOfBatch(batchTime1) shouldEqual Map(streamId -> blockInfos1)
// Add more blocks and allocate to another batch
incrementTime()
@@ -157,7 +172,7 @@ class ReceivedBlockTrackerSuite
// Restart tracker and verify recovered state
incrementTime()
- val tracker3 = createTracker(clock = manualClock)
+ val tracker3 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
tracker3.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
tracker3.getUnallocatedBlocks(streamId) shouldBe empty
@@ -180,28 +195,16 @@ class ReceivedBlockTrackerSuite
// Restart tracker and verify recovered state, specifically whether info about the first
// batch has been removed, but not the second batch
incrementTime()
- val tracker4 = createTracker(clock = manualClock)
+ val tracker4 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
tracker4.getUnallocatedBlocks(streamId) shouldBe empty
tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned
tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
}
- test("enabling write ahead log but not setting checkpoint dir") {
- conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
- intercept[SparkException] {
- createTracker(setCheckpointDir = false)
- }
- }
-
- test("setting checkpoint dir but not enabling write ahead log") {
- // When WAL config is not set, log manager should not be enabled
- val tracker1 = createTracker(setCheckpointDir = true)
+ test("disable write ahead log when checkpoint directory is not set") {
+ // When checkpoint is disabled, then the write ahead log is disabled
+ val tracker1 = createTracker(setCheckpointDir = false)
tracker1.isWriteAheadLogEnabled should be (false)
-
- // When WAL is explicitly disabled, log manager should not be enabled
- conf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
- val tracker2 = createTracker(setCheckpointDir = true)
- tracker2.isWriteAheadLogEnabled should be(false)
}
/**
@@ -210,16 +213,18 @@ class ReceivedBlockTrackerSuite
*/
def createTracker(
setCheckpointDir: Boolean = true,
+ recoverFromWriteAheadLog: Boolean = false,
clock: Clock = new SystemClock): ReceivedBlockTracker = {
val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
- val tracker = new ReceivedBlockTracker(conf, hadoopConf, Seq(streamId), clock, cpDirOption)
+ val tracker = new ReceivedBlockTracker(
+ conf, hadoopConf, Seq(streamId), clock, recoverFromWriteAheadLog, cpDirOption)
allReceivedBlockTrackers += tracker
tracker
}
/** Generate blocks infos using random ids */
def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
- List.fill(5)(ReceivedBlockInfo(streamId, 0,
+ List.fill(5)(ReceivedBlockInfo(streamId, 0, None,
BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)))))
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index 8b300d8dd3..6859b65c71 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.util.{FileBasedWriteAheadLogSegment, FileBasedWriteAheadLogWriter}
import org.apache.spark.util.Utils
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
class WriteAheadLogBackedBlockRDDSuite
extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
@@ -60,24 +60,35 @@ class WriteAheadLogBackedBlockRDDSuite
System.clearProperty("spark.driver.port")
}
- test("Read data available in block manager and write ahead log") {
- testRDD(5, 5)
+ test("Read data available in both block manager and write ahead log") {
+ testRDD(numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 5)
}
test("Read data available only in block manager, not in write ahead log") {
- testRDD(5, 0)
+ testRDD(numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 0)
}
test("Read data available only in write ahead log, not in block manager") {
- testRDD(0, 5)
+ testRDD(numPartitions = 5, numPartitionsInBM = 0, numPartitionsInWAL = 5)
}
- test("Read data available only in write ahead log, and test storing in block manager") {
- testRDD(0, 5, testStoreInBM = true)
+ test("Read data with partially available in block manager, and rest in write ahead log") {
+ testRDD(numPartitions = 5, numPartitionsInBM = 3, numPartitionsInWAL = 2)
}
- test("Read data with partially available in block manager, and rest in write ahead log") {
- testRDD(3, 2)
+ test("Test isBlockValid skips block fetching from BlockManager") {
+ testRDD(
+ numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 0, testIsBlockValid = true)
+ }
+
+ test("Test whether RDD is valid after removing blocks from block manager") {
+ testRDD(
+ numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 5, testBlockRemove = true)
+ }
+
+ test("Test storing of blocks recovered from write ahead log back into block manager") {
+ testRDD(
+ numPartitions = 5, numPartitionsInBM = 0, numPartitionsInWAL = 5, testStoreInBM = true)
}
/**
@@ -85,23 +96,52 @@ class WriteAheadLogBackedBlockRDDSuite
* and the rest to a write ahead log, and then reading reading it all back using the RDD.
* It can also test if the partitions that were read from the log were again stored in
* block manager.
- * @param numPartitionsInBM Number of partitions to write to the Block Manager
- * @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log
- * @param testStoreInBM Test whether blocks read from log are stored back into block manager
+ *
+ *
+ *
+ * @param numPartitions Number of partitions in RDD
+ * @param numPartitionsInBM Number of partitions to write to the BlockManager.
+ * Partitions 0 to (numPartitionsInBM-1) will be written to BlockManager
+ * @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log.
+ * Partitions (numPartitions - 1 - numPartitionsInWAL) to
+ * (numPartitions - 1) will be written to WAL
+ * @param testIsBlockValid Test whether setting isBlockValid to false skips block fetching
+ * @param testBlockRemove Test whether calling rdd.removeBlock() makes the RDD still usable with
+ * reads falling back to the WAL
+ * @param testStoreInBM Test whether blocks read from log are stored back into block manager
+ *
+ * Example with numPartitions = 5, numPartitionsInBM = 3, and numPartitionsInWAL = 4
+ *
+ * numPartitionsInBM = 3
+ * |------------------|
+ * | |
+ * 0 1 2 3 4
+ * | |
+ * |-------------------------|
+ * numPartitionsInWAL = 4
*/
private def testRDD(
- numPartitionsInBM: Int, numPartitionsInWAL: Int, testStoreInBM: Boolean = false) {
- val numBlocks = numPartitionsInBM + numPartitionsInWAL
- val data = Seq.fill(numBlocks, 10)(scala.util.Random.nextString(50))
+ numPartitions: Int,
+ numPartitionsInBM: Int,
+ numPartitionsInWAL: Int,
+ testIsBlockValid: Boolean = false,
+ testBlockRemove: Boolean = false,
+ testStoreInBM: Boolean = false
+ ) {
+ require(numPartitionsInBM <= numPartitions,
+ "Can't put more partitions in BlockManager than that in RDD")
+ require(numPartitionsInWAL <= numPartitions,
+ "Can't put more partitions in write ahead log than that in RDD")
+ val data = Seq.fill(numPartitions, 10)(scala.util.Random.nextString(50))
// Put the necessary blocks in the block manager
- val blockIds = Array.fill(numBlocks)(StreamBlockId(Random.nextInt(), Random.nextInt()))
+ val blockIds = Array.fill(numPartitions)(StreamBlockId(Random.nextInt(), Random.nextInt()))
data.zip(blockIds).take(numPartitionsInBM).foreach { case(block, blockId) =>
blockManager.putIterator(blockId, block.iterator, StorageLevel.MEMORY_ONLY_SER)
}
- // Generate write ahead log file segments
- val recordHandles = generateFakeRecordHandles(numPartitionsInBM) ++
+ // Generate write ahead log record handles
+ val recordHandles = generateFakeRecordHandles(numPartitions - numPartitionsInWAL) ++
generateWALRecordHandles(data.takeRight(numPartitionsInWAL),
blockIds.takeRight(numPartitionsInWAL))
@@ -111,7 +151,7 @@ class WriteAheadLogBackedBlockRDDSuite
"Expected blocks not in BlockManager"
)
require(
- blockIds.takeRight(numPartitionsInWAL).forall(blockManager.get(_).isEmpty),
+ blockIds.takeRight(numPartitions - numPartitionsInBM).forall(blockManager.get(_).isEmpty),
"Unexpected blocks in BlockManager"
)
@@ -122,19 +162,42 @@ class WriteAheadLogBackedBlockRDDSuite
"Expected blocks not in write ahead log"
)
require(
- recordHandles.take(numPartitionsInBM).forall(s =>
+ recordHandles.take(numPartitions - numPartitionsInWAL).forall(s =>
!new File(s.path.stripPrefix("file://")).exists()),
"Unexpected blocks in write ahead log"
)
// Create the RDD and verify whether the returned data is correct
val rdd = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
- recordHandles.toArray, storeInBlockManager = false, StorageLevel.MEMORY_ONLY)
+ recordHandles.toArray, storeInBlockManager = false)
assert(rdd.collect() === data.flatten)
+ // Verify that the block fetching is skipped when isBlockValid is set to false.
+ // This is done by using a RDD whose data is only in memory but is set to skip block fetching
+ // Using that RDD will throw exception, as it skips block fetching even if the blocks are in
+ // in BlockManager.
+ if (testIsBlockValid) {
+ require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager")
+ require(numPartitionsInWAL === 0, "No partitions must be in WAL")
+ val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
+ recordHandles.toArray, isBlockIdValid = Array.fill(blockIds.length)(false))
+ intercept[SparkException] {
+ rdd2.collect()
+ }
+ }
+
+ // Verify that the RDD is not invalid after the blocks are removed and can still read data
+ // from write ahead log
+ if (testBlockRemove) {
+ require(numPartitions === numPartitionsInWAL, "All partitions must be in WAL for this test")
+ require(numPartitionsInBM > 0, "Some partitions must be in BlockManager for this test")
+ rdd.removeBlocks()
+ assert(rdd.collect() === data.flatten)
+ }
+
if (testStoreInBM) {
val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
- recordHandles.toArray, storeInBlockManager = true, StorageLevel.MEMORY_ONLY)
+ recordHandles.toArray, storeInBlockManager = true, storageLevel = StorageLevel.MEMORY_ONLY)
assert(rdd2.collect() === data.flatten)
assert(
blockIds.forall(blockManager.get(_).nonEmpty),