author    Tathagata Das <tathagata.das1565@gmail.com>  2014-11-19 13:06:48 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2014-11-19 13:06:48 -0800
commit    22fc4e751c0a2f0ff39e42aa0a8fb9459d7412ec (patch)
tree      01574531180adde9e31e9a110aacc395a5ee7025 /streaming/src
parent    eacc788346ccae232bd530dd880f801475a49734 (diff)
[SPARK-4482][Streaming] Disable ReceivedBlockTracker's write ahead log by default
The write ahead log of ReceivedBlockTracker gets enabled as soon as a checkpoint directory is set. This should not happen: the tracker's WAL should be enabled only when the write ahead log is enabled in the Spark configuration.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #3358 from tdas/SPARK-4482 and squashes the following commits:

b740136 [Tathagata Das] Fixed bug in ReceivedBlockTracker
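Under the new behavior, an application must opt in explicitly via configuration and must also set a checkpoint directory; neither alone enables the tracker's WAL. A minimal sketch of the opt-in, assuming a hypothetical app name and checkpoint path:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("ReceiverWALExample") // placeholder app name
      // Opt in explicitly; a checkpoint directory alone no longer enables the WAL.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(1))
    // Required when the WAL is enabled; otherwise the tracker throws a SparkException.
    ssc.checkpoint("/tmp/streaming-checkpoint") // placeholder path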
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala | 37
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala | 50
2 files changed, 61 insertions(+), 26 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 5f5e190990..02758e0bca 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -70,18 +70,7 @@ private[streaming] class ReceivedBlockTracker(
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
-
- private val logManagerRollingIntervalSecs = conf.getInt(
- "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
- private val logManagerOption = checkpointDirOption.map { checkpointDir =>
- new WriteAheadLogManager(
- ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
- hadoopConf,
- rollingIntervalSecs = logManagerRollingIntervalSecs,
- callerName = "ReceivedBlockHandlerMaster",
- clock = clock
- )
- }
+ private val logManagerOption = createLogManager()
private var lastAllocatedBatchTime: Time = null
@@ -221,6 +210,30 @@ private[streaming] class ReceivedBlockTracker(
private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
}
+
+ /** Optionally create the write ahead log manager only if the feature is enabled */
+ private def createLogManager(): Option[WriteAheadLogManager] = {
+ if (conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
+ if (checkpointDirOption.isEmpty) {
+ throw new SparkException(
+ "Cannot enable receiver write-ahead log without checkpoint directory set. " +
+ "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
+ "See documentation for more details.")
+ }
+ val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
+ val rollingIntervalSecs = conf.getInt(
+ "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
+ val logManager = new WriteAheadLogManager(logDir, hadoopConf,
+ rollingIntervalSecs = rollingIntervalSecs, clock = clock,
+ callerName = "ReceivedBlockHandlerMaster")
+ Some(logManager)
+ } else {
+ None
+ }
+ }
+
+ /** Check if the log manager is enabled. This is only used for testing purposes. */
+ private[streaming] def isLogManagerEnabled: Boolean = logManagerOption.nonEmpty
}
private[streaming] object ReceivedBlockTracker {
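With this guard in place, enabling the receiver WAL without a checkpoint directory fails fast when the tracker is constructed, instead of silently running without a log. A minimal sketch of the failure mode, assuming code inside the org.apache.spark.streaming package (the tracker is private[streaming]), a hypothetical single stream id, and the import paths of this branch:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.{SparkConf, SparkException}
    import org.apache.spark.streaming.scheduler.ReceivedBlockTracker
    import org.apache.spark.streaming.util.SystemClock

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("WALWithoutCheckpointDir") // placeholder app name
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    // checkpointDirOption = None: createLogManager() now throws instead of
    // quietly returning a tracker with no write ahead log.
    try {
      new ReceivedBlockTracker(conf, new Configuration(), Seq(1), new SystemClock, None)
    } catch {
      case e: SparkException => println(e.getMessage)
    }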
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index fd9c97f551..01a09b67b9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -41,17 +41,16 @@ import org.apache.spark.util.Utils
class ReceivedBlockTrackerSuite
extends FunSuite with BeforeAndAfter with Matchers with Logging {
- val conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
- conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1")
-
val hadoopConf = new Configuration()
val akkaTimeout = 10 seconds
val streamId = 1
var allReceivedBlockTrackers = new ArrayBuffer[ReceivedBlockTracker]()
var checkpointDirectory: File = null
+ var conf: SparkConf = null
before {
+ conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
checkpointDirectory = Files.createTempDir()
}
@@ -64,7 +63,8 @@ class ReceivedBlockTrackerSuite
}
test("block addition, and block to batch allocation") {
- val receivedBlockTracker = createTracker(enableCheckpoint = false)
+ val receivedBlockTracker = createTracker(setCheckpointDir = false)
+ receivedBlockTracker.isLogManagerEnabled should be (false) // should be disabled by default
receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
val blockInfos = generateBlockInfos()
@@ -95,13 +95,11 @@ class ReceivedBlockTrackerSuite
test("block addition, block to batch allocation and cleanup with write ahead log") {
val manualClock = new ManualClock
- conf.getInt(
- "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", -1) should be (1)
-
// Set the time increment level to twice the rotation interval so that every increment creates
// a new log file
- val timeIncrementMillis = 2000L
+
def incrementTime() {
+ val timeIncrementMillis = 2000L
manualClock.addToTime(timeIncrementMillis)
}
@@ -121,7 +119,11 @@ class ReceivedBlockTrackerSuite
}
// Start tracker and add blocks
- val tracker1 = createTracker(enableCheckpoint = true, clock = manualClock)
+ conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
+ conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1")
+ val tracker1 = createTracker(clock = manualClock)
+ tracker1.isLogManagerEnabled should be (true)
+
val blockInfos1 = addBlockInfos(tracker1)
tracker1.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
@@ -132,7 +134,7 @@ class ReceivedBlockTrackerSuite
// Restart tracker and verify recovered list of unallocated blocks
incrementTime()
- val tracker2 = createTracker(enableCheckpoint = true, clock = manualClock)
+ val tracker2 = createTracker(clock = manualClock)
tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
// Allocate blocks to batch and verify whether the unallocated blocks got allocated
@@ -156,7 +158,7 @@ class ReceivedBlockTrackerSuite
// Restart tracker and verify recovered state
incrementTime()
- val tracker3 = createTracker(enableCheckpoint = true, clock = manualClock)
+ val tracker3 = createTracker(clock = manualClock)
tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
tracker3.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
tracker3.getUnallocatedBlocks(streamId) shouldBe empty
@@ -179,18 +181,38 @@ class ReceivedBlockTrackerSuite
// Restart tracker and verify recovered state, specifically whether info about the first
// batch has been removed, but not the second batch
incrementTime()
- val tracker4 = createTracker(enableCheckpoint = true, clock = manualClock)
+ val tracker4 = createTracker(clock = manualClock)
tracker4.getUnallocatedBlocks(streamId) shouldBe empty
tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned
tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
}
+
+ test("enabling write ahead log but not setting checkpoint dir") {
+ conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
+ intercept[SparkException] {
+ createTracker(setCheckpointDir = false)
+ }
+ }
+
+ test("setting checkpoint dir but not enabling write ahead log") {
+ // When WAL config is not set, log manager should not be enabled
+ val tracker1 = createTracker(setCheckpointDir = true)
+ tracker1.isLogManagerEnabled should be (false)
+
+ // When WAL is explicitly disabled, log manager should not be enabled
+ conf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
+ val tracker2 = createTracker(setCheckpointDir = true)
+ tracker2.isLogManagerEnabled should be (false)
+ }
/**
* Create tracker object with the optional provided clock. Use fake clock if you
* want to control time by manually incrementing it to test log cleanup.
*/
- def createTracker(enableCheckpoint: Boolean, clock: Clock = new SystemClock): ReceivedBlockTracker = {
- val cpDirOption = if (enableCheckpoint) Some(checkpointDirectory.toString) else None
+ def createTracker(
+ setCheckpointDir: Boolean = true,
+ clock: Clock = new SystemClock): ReceivedBlockTracker = {
+ val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
val tracker = new ReceivedBlockTracker(conf, hadoopConf, Seq(streamId), clock, cpDirOption)
allReceivedBlockTrackers += tracker
tracker