author     Tathagata Das <tathagata.das1565@gmail.com>  2015-03-19 02:15:50 -0400
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-03-19 02:15:50 -0400
commit     645cf3fcc21987417b2946bdeeeb60af3edf667e (patch)
tree       4dae62fb42cdf97cf097a0efcf3212cd43f88c72 /streaming
parent     540b2a4eabe0bad2455f5140c4ad6a315e37cc3f (diff)
[SPARK-6222][Streaming] Don't delete checkpoint data when doing pre-batch-start checkpoint
This is another alternative approach to https://github.com/apache/spark/pull/4964/. I think this is a simpler fix that can be backported easily to other branches (1.2 and 1.3).

All it does is introduce a flag so that the pre-batch-start checkpoint does not trigger clearing of checkpoint data. There is no unit test yet; I will add one when this approach has been commented upon. Not sure if this is easily testable.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5008 from tdas/SPARK-6222 and squashes the following commits:

7315bc2 [Tathagata Das] Removed empty line.
c438de4 [Tathagata Das] Revert unnecessary change.
5e98374 [Tathagata Das] Added unit test
50cb60b [Tathagata Das] Fixed style issue
295ca5c [Tathagata Das] Fixing SPARK-6222
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala                  |  12
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala      |  20
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala | 133
3 files changed, 153 insertions, 12 deletions
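Before the individual hunks, a condensed paraphrase of what the patch does may help: each batch is checkpointed twice, and only the second (post-batch-completion) checkpoint is now allowed to trigger cleanup. The sketch below is assembled from the diffs that follow, with surrounding code elided; it is illustrative rather than a compilable excerpt.

    // Sketch assembled from the diffs below (illustrative, not compilable on its own).
    // A batch triggers two checkpoints; only the post-completion one may clear data.

    private def generateJobs(time: Time) {
      // ... generate and submit the batch's jobs ...
      eventActor ! DoCheckpoint(time, clearCheckpointDataLater = false)  // pre-batch-start
    }

    private def clearMetadata(time: Time) {
      // ... runs only after the batch has been fully processed ...
      eventActor ! DoCheckpoint(time, clearCheckpointDataLater = true)   // post-completion
    }

    def onCheckpointCompletion(time: Time, clearCheckpointDataLater: Boolean) {
      // Before this patch, ClearCheckpointData was sent unconditionally here, so
      // even a pre-batch-start checkpoint deleted old received-block metadata.
      if (clearCheckpointDataLater) {
        eventActor ! ClearCheckpointData(time)
      }
    }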
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index cb4c94fb9d..db64e11e16 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -119,7 +119,10 @@ class CheckpointWriter(
private var stopped = false
private var fs_ : FileSystem = _
- class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
+ class CheckpointWriteHandler(
+ checkpointTime: Time,
+ bytes: Array[Byte],
+ clearCheckpointDataLater: Boolean) extends Runnable {
def run() {
var attempts = 0
val startTime = System.currentTimeMillis()
@@ -166,7 +169,7 @@ class CheckpointWriter(
val finishTime = System.currentTimeMillis()
logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile +
"', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms")
- jobGenerator.onCheckpointCompletion(checkpointTime)
+ jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater)
return
} catch {
case ioe: IOException =>
@@ -180,7 +183,7 @@ class CheckpointWriter(
}
}
- def write(checkpoint: Checkpoint) {
+ def write(checkpoint: Checkpoint, clearCheckpointDataLater: Boolean) {
val bos = new ByteArrayOutputStream()
val zos = compressionCodec.compressedOutputStream(bos)
val oos = new ObjectOutputStream(zos)
@@ -188,7 +191,8 @@ class CheckpointWriter(
oos.close()
bos.close()
try {
- executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
+ executor.execute(new CheckpointWriteHandler(
+ checkpoint.checkpointTime, bos.toByteArray, clearCheckpointDataLater))
logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
} catch {
case rej: RejectedExecutionException =>
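Note that the flag added above does not change the write path itself; it is only carried through the asynchronous handler and echoed back to the JobGenerator when the write finishes. A minimal sketch of that thread-through, with the retry loop, logging, and error handling of the full file omitted:

    class CheckpointWriteHandler(
        checkpointTime: Time,
        bytes: Array[Byte],
        clearCheckpointDataLater: Boolean) extends Runnable {
      def run() {
        // ... write `bytes` to the checkpoint file (retries omitted) ...
        // Echo the flag back so the JobGenerator knows whether this write was the
        // post-batch-completion checkpoint, i.e. the only one allowed to clear old data.
        jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater)
      }
    }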
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index ac92774a38..59488dfb0f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -30,7 +30,8 @@ import org.apache.spark.util.{Clock, ManualClock}
private[scheduler] sealed trait JobGeneratorEvent
private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
private[scheduler] case class ClearMetadata(time: Time) extends JobGeneratorEvent
-private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent
+private[scheduler] case class DoCheckpoint(
+ time: Time, clearCheckpointDataLater: Boolean) extends JobGeneratorEvent
private[scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent
/**
@@ -163,8 +164,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
/**
* Callback called when the checkpoint of a batch has been written.
*/
- def onCheckpointCompletion(time: Time) {
- eventActor ! ClearCheckpointData(time)
+ def onCheckpointCompletion(time: Time, clearCheckpointDataLater: Boolean) {
+ if (clearCheckpointDataLater) {
+ eventActor ! ClearCheckpointData(time)
+ }
}
/** Processes all events */
@@ -173,7 +176,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
event match {
case GenerateJobs(time) => generateJobs(time)
case ClearMetadata(time) => clearMetadata(time)
- case DoCheckpoint(time) => doCheckpoint(time)
+ case DoCheckpoint(time, clearCheckpointDataLater) =>
+ doCheckpoint(time, clearCheckpointDataLater)
case ClearCheckpointData(time) => clearCheckpointData(time)
}
}
@@ -245,7 +249,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
- eventActor ! DoCheckpoint(time)
+ eventActor ! DoCheckpoint(time, clearCheckpointDataLater = false)
}
/** Clear DStream metadata for the given `time`. */
@@ -255,7 +259,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// If checkpointing is enabled, then checkpoint,
// else mark batch to be fully processed
if (shouldCheckpoint) {
- eventActor ! DoCheckpoint(time)
+ eventActor ! DoCheckpoint(time, clearCheckpointDataLater = true)
} else {
// If checkpointing is not enabled, then delete metadata information about
// received blocks (block data not saved in any case). Otherwise, wait for
@@ -278,11 +282,11 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
}
/** Perform checkpoint for the given `time`. */
- private def doCheckpoint(time: Time) {
+ private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
logInfo("Checkpointing graph for time " + time)
ssc.graph.updateCheckpointData(time)
- checkpointWriter.write(new Checkpoint(ssc, time))
+ checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
new file mode 100644
index 0000000000..4150b60635
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import java.util.concurrent.CountDownLatch
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming._
+import org.apache.spark.util.{ManualClock, Utils}
+
+class JobGeneratorSuite extends TestSuiteBase {
+
+ // SPARK-6222 is a tricky regression bug which causes received block metadata
+ // to be deleted before the corresponding batch has completed. This occurs when
+ // the following conditions are met.
+ // 1. streaming checkpointing is enabled by setting streamingContext.checkpoint(dir)
+ // 2. input data is received through a receiver as blocks
+ // 3. a batch processing a set of blocks takes a long time, such that a few subsequent
+ // batches have been generated and submitted for processing.
+ //
+ // The JobGenerator (as of Mar 16, 2015) checkpoints twice per batch, once after generation
+ // of a batch, and another time after the completion of a batch. The cleanup of
+ // checkpoint data (including block metadata, etc.) from DStream must be done only after the
+ // 2nd checkpoint has completed, that is, after the batch has been completely processed.
+ // However, the issue is that checkpoint data, and the received block metadata along with it,
+ // gets cleaned up even on the 1st checkpoint, causing premature deletion of received block
+ // data. For example, if the 3rd batch is still being processed, the 7th batch may get generated,
+ // and the corresponding "1st checkpoint" will delete received block metadata of batches older
+ // than the 6th batch. That is, the 3rd batch's block metadata gets deleted even before the 3rd
+ // batch has been completely processed.
+ //
+ // This test tries to create that scenario as follows.
+ // 1. enable checkpointing
+ // 2. generate batches with received blocks
+ // 3. make the 3rd batch never complete
+ // 4. allow subsequent batches to be generated (to allow premature deletion of 3rd batch metadata)
+ // 5. verify whether 3rd batch's block metadata still exists
+ //
+ test("SPARK-6222: Do not clear received block data too soon") {
+ import JobGeneratorSuite._
+ val checkpointDir = Utils.createTempDir()
+ val testConf = conf
+ testConf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ testConf.set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")
+
+ withStreamingContext(new StreamingContext(testConf, batchDuration)) { ssc =>
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val numBatches = 10
+ val longBatchNumber = 3 // 3rd batch will take a long time
+ val longBatchTime = longBatchNumber * batchDuration.milliseconds
+
+ val testTimeout = timeout(10 seconds)
+ val inputStream = ssc.receiverStream(new TestReceiver)
+
+ inputStream.foreachRDD((rdd: RDD[Int], time: Time) => {
+ if (time.milliseconds == longBatchTime) {
+ while (waitLatch.getCount() > 0) {
+ waitLatch.await()
+ println("Await over")
+ }
+ }
+ })
+
+ val batchCounter = new BatchCounter(ssc)
+ ssc.checkpoint(checkpointDir.getAbsolutePath)
+ ssc.start()
+
+ // Make sure that only 1 batch of information is to be remembered
+ assert(inputStream.rememberDuration === batchDuration)
+ val receiverTracker = ssc.scheduler.receiverTracker
+
+ // Get the blocks belonging to a batch
+ def getBlocksOfBatch(batchTime: Long) = {
+ receiverTracker.getBlocksOfBatchAndStream(Time(batchTime), inputStream.id)
+ }
+
+ // Wait for new blocks to be received
+ def waitForNewReceivedBlocks() {
+ eventually(testTimeout) {
+ assert(receiverTracker.hasUnallocatedBlocks)
+ }
+ }
+
+ // Wait for received blocks to be allocated to a batch
+ def waitForBlocksToBeAllocatedToBatch(batchTime: Long) {
+ eventually(testTimeout) {
+ assert(getBlocksOfBatch(batchTime).nonEmpty)
+ }
+ }
+
+ // Generate a large number of batches with blocks in them
+ for (batchNum <- 1 to numBatches) {
+ waitForNewReceivedBlocks()
+ clock.advance(batchDuration.milliseconds)
+ waitForBlocksToBeAllocatedToBatch(clock.getTimeMillis())
+ }
+
+ // Wait for 3rd batch to start
+ eventually(testTimeout) {
+ ssc.scheduler.getPendingTimes().contains(Time(numBatches * batchDuration.milliseconds))
+ }
+
+ // Verify that the 3rd batch's block data is still present while the 3rd batch is incomplete
+ assert(getBlocksOfBatch(longBatchTime).nonEmpty, "blocks of incomplete batch already deleted")
+ assert(batchCounter.getNumCompletedBatches < longBatchNumber)
+ waitLatch.countDown()
+ }
+ }
+}
+
+object JobGeneratorSuite {
+ val waitLatch = new CountDownLatch(1)
+}
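The test keeps the 3rd batch incomplete via the shared CountDownLatch above: the foreachRDD function for that batch blocks on waitLatch.await() until the test has verified that the batch's block metadata is still present, and countDown() then releases it. Below is a standalone sketch of that gating pattern with hypothetical names (LatchGateDemo is not part of the patch); it demonstrates only the latch mechanics, not the streaming wiring.

    import java.util.concurrent.CountDownLatch

    // Hypothetical demo of the latch-gating pattern used by JobGeneratorSuite.
    object LatchGateDemo {
      val waitLatch = new CountDownLatch(1)

      def main(args: Array[String]): Unit = {
        // Stand-in for the stalled batch: blocks until the "test" releases it.
        val stalledBatch = new Thread(new Runnable {
          def run(): Unit = {
            waitLatch.await()
            println("stalled batch finished")
          }
        })
        stalledBatch.start()

        // While the batch is blocked, its input metadata must still exist;
        // the real test performs its assertions here before releasing the latch.
        println("asserting while the batch is stalled")
        waitLatch.countDown()
        stalledBatch.join()
      }
    }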