Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala | 37
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala | 50
2 files changed, 61 insertions(+), 26 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 5f5e190990..02758e0bca 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -70,18 +70,7 @@ private[streaming] class ReceivedBlockTracker(
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
-
- private val logManagerRollingIntervalSecs = conf.getInt(
- "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
- private val logManagerOption = checkpointDirOption.map { checkpointDir =>
- new WriteAheadLogManager(
- ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
- hadoopConf,
- rollingIntervalSecs = logManagerRollingIntervalSecs,
- callerName = "ReceivedBlockHandlerMaster",
- clock = clock
- )
- }
+ private val logManagerOption = createLogManager()
private var lastAllocatedBatchTime: Time = null
@@ -221,6 +210,30 @@ private[streaming] class ReceivedBlockTracker(
private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
}
+
+ /** Create the write ahead log manager only if the feature is enabled */
+ private def createLogManager(): Option[WriteAheadLogManager] = {
+ if (conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
+ if (checkpointDirOption.isEmpty) {
+ throw new SparkException(
+ "Cannot enable receiver write-ahead log without checkpoint directory set. " +
+ "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
+ "See documentation for more details.")
+ }
+ val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
+ val rollingIntervalSecs = conf.getInt(
+ "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
+ val logManager = new WriteAheadLogManager(logDir, hadoopConf,
+ rollingIntervalSecs = rollingIntervalSecs, clock = clock,
+ callerName = "ReceivedBlockHandlerMaster")
+ Some(logManager)
+ } else {
+ None
+ }
+ }
+
+ /** Check if the log manager is enabled. This is only used for testing purposes. */
+ private[streaming] def isLogManagerEnabled: Boolean = logManagerOption.nonEmpty
}
private[streaming] object ReceivedBlockTracker {
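
For context on the check introduced above: enabling the receiver write-ahead log now requires both the config flag read in createLogManager() and a checkpoint directory, otherwise tracker construction fails with the SparkException shown. A minimal sketch of an application that satisfies both requirements (the app name and checkpoint path are illustrative, not part of this change):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: the WAL flag must be paired with a checkpoint directory,
// or ReceivedBlockTracker.createLogManager() throws at startup.
val sparkConf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("WalEnabledApp") // illustrative name
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint("/tmp/streaming-checkpoint") // illustrative path
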
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index fd9c97f551..01a09b67b9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -41,17 +41,16 @@ import org.apache.spark.util.Utils
class ReceivedBlockTrackerSuite
extends FunSuite with BeforeAndAfter with Matchers with Logging {
- val conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
- conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1")
-
val hadoopConf = new Configuration()
val akkaTimeout = 10 seconds
val streamId = 1
var allReceivedBlockTrackers = new ArrayBuffer[ReceivedBlockTracker]()
var checkpointDirectory: File = null
+ var conf: SparkConf = null
before {
+ conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
checkpointDirectory = Files.createTempDir()
}
@@ -64,7 +63,8 @@ class ReceivedBlockTrackerSuite
}
test("block addition, and block to batch allocation") {
- val receivedBlockTracker = createTracker(enableCheckpoint = false)
+ val receivedBlockTracker = createTracker(setCheckpointDir = false)
+ receivedBlockTracker.isLogManagerEnabled should be (false) // should be disabled by default
receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
val blockInfos = generateBlockInfos()
@@ -95,13 +95,11 @@ class ReceivedBlockTrackerSuite
test("block addition, block to batch allocation and cleanup with write ahead log") {
val manualClock = new ManualClock
- conf.getInt(
- "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", -1) should be (1)
-
// Set the time increment level to twice the rotation interval so that every increment creates
// a new log file
- val timeIncrementMillis = 2000L
+
def incrementTime() {
+ val timeIncrementMillis = 2000L
manualClock.addToTime(timeIncrementMillis)
}
@@ -121,7 +119,11 @@ class ReceivedBlockTrackerSuite
}
// Start tracker and add blocks
- val tracker1 = createTracker(enableCheckpoint = true, clock = manualClock)
+ conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
+ conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1")
+ val tracker1 = createTracker(clock = manualClock)
+ tracker1.isLogManagerEnabled should be (true)
+
val blockInfos1 = addBlockInfos(tracker1)
tracker1.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
@@ -132,7 +134,7 @@ class ReceivedBlockTrackerSuite
// Restart tracker and verify recovered list of unallocated blocks
incrementTime()
- val tracker2 = createTracker(enableCheckpoint = true, clock = manualClock)
+ val tracker2 = createTracker(clock = manualClock)
tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
// Allocate blocks to batch and verify whether the unallocated blocks got allocated
@@ -156,7 +158,7 @@ class ReceivedBlockTrackerSuite
// Restart tracker and verify recovered state
incrementTime()
- val tracker3 = createTracker(enableCheckpoint = true, clock = manualClock)
+ val tracker3 = createTracker(clock = manualClock)
tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
tracker3.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
tracker3.getUnallocatedBlocks(streamId) shouldBe empty
@@ -179,18 +181,38 @@ class ReceivedBlockTrackerSuite
// Restart tracker and verify recovered state, specifically whether info about the first
// batch has been removed, but not the second batch
incrementTime()
- val tracker4 = createTracker(enableCheckpoint = true, clock = manualClock)
+ val tracker4 = createTracker(clock = manualClock)
tracker4.getUnallocatedBlocks(streamId) shouldBe empty
tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned
tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
}
+
+ test("enabling write ahead log but not setting checkpoint dir") {
+ conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
+ intercept[SparkException] {
+ createTracker(setCheckpointDir = false)
+ }
+ }
+
+ test("setting checkpoint dir but not enabling write ahead log") {
+ // When WAL config is not set, log manager should not be enabled
+ val tracker1 = createTracker(setCheckpointDir = true)
+ tracker1.isLogManagerEnabled should be (false)
+
+ // When WAL is explicitly disabled, log manager should not be enabled
+ conf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
+ val tracker2 = createTracker(setCheckpointDir = true)
+ tracker2.isLogManagerEnabled should be (false)
+ }
/**
* Create a tracker object with the optionally provided clock. Use a fake clock if you
* want to control time by manually incrementing it to test log cleanup.
*/
- def createTracker(enableCheckpoint: Boolean, clock: Clock = new SystemClock): ReceivedBlockTracker = {
- val cpDirOption = if (enableCheckpoint) Some(checkpointDirectory.toString) else None
+ def createTracker(
+ setCheckpointDir: Boolean = true,
+ clock: Clock = new SystemClock): ReceivedBlockTracker = {
+ val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
val tracker = new ReceivedBlockTracker(conf, hadoopConf, Seq(streamId), clock, cpDirOption)
allReceivedBlockTrackers += tracker
tracker
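
A side note on the rotation arithmetic the write-ahead-log test relies on: with rotationIntervalSecs set to 1, the log manager starts a new file once the clock crosses each 1000 ms boundary, so advancing the ManualClock by 2000 ms per step is guaranteed to cross at least one boundary and force a fresh log file. A small sketch of that invariant, with constants mirroring the test and a hypothetical helper name:

// Sketch: why a 2000 ms step always forces a log rotation when the
// rotation interval is 1 second (values mirror the test configuration).
val rotationIntervalSecs = 1
val timeIncrementMillis = 2000L
// A step of at least one full interval always crosses a rotation boundary.
def alwaysRotates(stepMillis: Long, intervalSecs: Int): Boolean =
  stepMillis >= intervalSecs * 1000L
assert(alwaysRotates(timeIncrementMillis, rotationIntervalSecs))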