diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-11-19 13:06:48 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-11-19 13:16:10 -0800 |
commit | ce5ea0fd611ce560f6e1fac83562469bdb97091e (patch) | |
tree | 738ba7cdde7224c8a2e246e8bd448a3accd0e5a1 /streaming | |
parent | 2fb40e1aa758a0c305198befb1884b81ac22ae79 (diff) | |
download | spark-ce5ea0fd611ce560f6e1fac83562469bdb97091e.tar.gz spark-ce5ea0fd611ce560f6e1fac83562469bdb97091e.tar.bz2 spark-ce5ea0fd611ce560f6e1fac83562469bdb97091e.zip |
[SPARK-4482][Streaming] Disable ReceivedBlockTracker's write ahead log by default
The write ahead log of ReceivedBlockTracker gets enabled as soon as checkpoint directory is set. This should not happen, as the WAL should be enabled only if the WAL is enabled in the Spark configuration.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #3358 from tdas/SPARK-4482 and squashes the following commits:
b740136 [Tathagata Das] Fixed bug in ReceivedBlockTracker
(cherry picked from commit 22fc4e751c0a2f0ff39e42aa0a8fb9459d7412ec)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala | 37 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala | 50 |
2 files changed, 61 insertions, 26 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index 5f5e190990..02758e0bca 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -70,18 +70,7 @@ private[streaming] class ReceivedBlockTracker( private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue] private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks] - - private val logManagerRollingIntervalSecs = conf.getInt( - "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60) - private val logManagerOption = checkpointDirOption.map { checkpointDir => - new WriteAheadLogManager( - ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir), - hadoopConf, - rollingIntervalSecs = logManagerRollingIntervalSecs, - callerName = "ReceivedBlockHandlerMaster", - clock = clock - ) - } + private val logManagerOption = createLogManager() private var lastAllocatedBatchTime: Time = null @@ -221,6 +210,30 @@ private[streaming] class ReceivedBlockTracker( private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = { streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue) } + + /** Optionally create the write ahead log manager only if the feature is enabled */ + private def createLogManager(): Option[WriteAheadLogManager] = { + if (conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) { + if (checkpointDirOption.isEmpty) { + throw new SparkException( + "Cannot enable receiver write-ahead log without checkpoint directory set. " + + "Please use streamingContext.checkpoint() to set the checkpoint directory. " + + "See documentation for more details.") + } + val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get) + val rollingIntervalSecs = conf.getInt( + "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60) + val logManager = new WriteAheadLogManager(logDir, hadoopConf, + rollingIntervalSecs = rollingIntervalSecs, clock = clock, + callerName = "ReceivedBlockHandlerMaster") + Some(logManager) + } else { + None + } + } + + /** Check if the log manager is enabled. This is only used for testing purposes. */ + private[streaming] def isLogManagerEnabled: Boolean = logManagerOption.nonEmpty } private[streaming] object ReceivedBlockTracker { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index fd9c97f551..01a09b67b9 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -41,17 +41,16 @@ import org.apache.spark.util.Utils class ReceivedBlockTrackerSuite extends FunSuite with BeforeAndAfter with Matchers with Logging { - val conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite") - conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1") - val hadoopConf = new Configuration() val akkaTimeout = 10 seconds val streamId = 1 var allReceivedBlockTrackers = new ArrayBuffer[ReceivedBlockTracker]() var checkpointDirectory: File = null + var conf: SparkConf = null before { + conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite") checkpointDirectory = Files.createTempDir() } @@ -64,7 +63,8 @@ class ReceivedBlockTrackerSuite } test("block addition, and block to batch allocation") { - val receivedBlockTracker = createTracker(enableCheckpoint = false) + val receivedBlockTracker = createTracker(setCheckpointDir = false) + receivedBlockTracker.isLogManagerEnabled should be (false) // should be disable by default receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty val blockInfos = generateBlockInfos() @@ -95,13 +95,11 @@ class ReceivedBlockTrackerSuite test("block addition, block to batch allocation and cleanup with write ahead log") { val manualClock = new ManualClock - conf.getInt( - "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", -1) should be (1) - // Set the time increment level to twice the rotation interval so that every increment creates // a new log file - val timeIncrementMillis = 2000L + def incrementTime() { + val timeIncrementMillis = 2000L manualClock.addToTime(timeIncrementMillis) } @@ -121,7 +119,11 @@ class ReceivedBlockTrackerSuite } // Start tracker and add blocks - val tracker1 = createTracker(enableCheckpoint = true, clock = manualClock) + conf.set("spark.streaming.receiver.writeAheadLog.enable", "true") + conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1") + val tracker1 = createTracker(clock = manualClock) + tracker1.isLogManagerEnabled should be (true) + val blockInfos1 = addBlockInfos(tracker1) tracker1.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1 @@ -132,7 +134,7 @@ class ReceivedBlockTrackerSuite // Restart tracker and verify recovered list of unallocated blocks incrementTime() - val tracker2 = createTracker(enableCheckpoint = true, clock = manualClock) + val tracker2 = createTracker(clock = manualClock) tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1 // Allocate blocks to batch and verify whether the unallocated blocks got allocated @@ -156,7 +158,7 @@ class ReceivedBlockTrackerSuite // Restart tracker and verify recovered state incrementTime() - val tracker3 = createTracker(enableCheckpoint = true, clock = manualClock) + val tracker3 = createTracker(clock = manualClock) tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1 tracker3.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2 tracker3.getUnallocatedBlocks(streamId) shouldBe empty @@ -179,18 +181,38 @@ class ReceivedBlockTrackerSuite // Restart tracker and verify recovered state, specifically whether info about the first // batch has been removed, but not the second batch incrementTime() - val tracker4 = createTracker(enableCheckpoint = true, clock = manualClock) + val tracker4 = createTracker(clock = manualClock) tracker4.getUnallocatedBlocks(streamId) shouldBe empty tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2 } + + test("enabling write ahead log but not setting checkpoint dir") { + conf.set("spark.streaming.receiver.writeAheadLog.enable", "true") + intercept[SparkException] { + createTracker(setCheckpointDir = false) + } + } + + test("setting checkpoint dir but not enabling write ahead log") { + // When WAL config is not set, log manager should not be enabled + val tracker1 = createTracker(setCheckpointDir = true) + tracker1.isLogManagerEnabled should be (false) + + // When WAL is explicitly disabled, log manager should not be enabled + conf.set("spark.streaming.receiver.writeAheadLog.enable", "false") + val tracker2 = createTracker(setCheckpointDir = true) + tracker2.isLogManagerEnabled should be(false) + } /** * Create tracker object with the optional provided clock. Use fake clock if you * want to control time by manually incrementing it to test log cleanup. */ - def createTracker(enableCheckpoint: Boolean, clock: Clock = new SystemClock): ReceivedBlockTracker = { - val cpDirOption = if (enableCheckpoint) Some(checkpointDirectory.toString) else None + def createTracker( + setCheckpointDir: Boolean = true, + clock: Clock = new SystemClock): ReceivedBlockTracker = { + val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None val tracker = new ReceivedBlockTracker(conf, hadoopConf, Seq(streamId), clock, cpDirOption) allReceivedBlockTrackers += tracker tracker |