From 645cf3fcc21987417b2946bdeeeb60af3edf667e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 19 Mar 2015 02:15:50 -0400 Subject: [SPARK-6222][Streaming] Dont delete checkpoint data when doing pre-batch-start checkpoint This is another alternative approach to https://github.com/apache/spark/pull/4964/ I think this is a simpler fix that can be backported easily to other branches (1.2 and 1.3). All it does it introduce a flag so that the pre-batch-start checkpoint does not call clear checkpoint. There is not unit test yet. I will add it when this approach is commented upon. Not sure if this is testable easily. Author: Tathagata Das Closes #5008 from tdas/SPARK-6222 and squashes the following commits: 7315bc2 [Tathagata Das] Removed empty line. c438de4 [Tathagata Das] Revert unnecessary change. 5e98374 [Tathagata Das] Added unit test 50cb60b [Tathagata Das] Fixed style issue 295ca5c [Tathagata Das] Fixing SPARK-6222 --- .../org/apache/spark/streaming/Checkpoint.scala | 12 +- .../spark/streaming/scheduler/JobGenerator.scala | 20 ++-- .../streaming/scheduler/JobGeneratorSuite.scala | 133 +++++++++++++++++++++ 3 files changed, 153 insertions(+), 12 deletions(-) create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index cb4c94fb9d..db64e11e16 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -119,7 +119,10 @@ class CheckpointWriter( private var stopped = false private var fs_ : FileSystem = _ - class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable { + class CheckpointWriteHandler( + checkpointTime: Time, + bytes: Array[Byte], + clearCheckpointDataLater: Boolean) extends Runnable { def run() { var attempts = 0 val startTime = System.currentTimeMillis() @@ -166,7 +169,7 @@ class CheckpointWriter( val finishTime = System.currentTimeMillis() logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms") - jobGenerator.onCheckpointCompletion(checkpointTime) + jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater) return } catch { case ioe: IOException => @@ -180,7 +183,7 @@ class CheckpointWriter( } } - def write(checkpoint: Checkpoint) { + def write(checkpoint: Checkpoint, clearCheckpointDataLater: Boolean) { val bos = new ByteArrayOutputStream() val zos = compressionCodec.compressedOutputStream(bos) val oos = new ObjectOutputStream(zos) @@ -188,7 +191,8 @@ class CheckpointWriter( oos.close() bos.close() try { - executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray)) + executor.execute(new CheckpointWriteHandler( + checkpoint.checkpointTime, bos.toByteArray, clearCheckpointDataLater)) logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue") } catch { case rej: RejectedExecutionException => diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index ac92774a38..59488dfb0f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -30,7 +30,8 @@ import org.apache.spark.util.{Clock, ManualClock} private[scheduler] sealed trait JobGeneratorEvent private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent private[scheduler] case class ClearMetadata(time: Time) extends JobGeneratorEvent -private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent +private[scheduler] case class DoCheckpoint( + time: Time, clearCheckpointDataLater: Boolean) extends JobGeneratorEvent private[scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent /** @@ -163,8 +164,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { /** * Callback called when the checkpoint of a batch has been written. */ - def onCheckpointCompletion(time: Time) { - eventActor ! ClearCheckpointData(time) + def onCheckpointCompletion(time: Time, clearCheckpointDataLater: Boolean) { + if (clearCheckpointDataLater) { + eventActor ! ClearCheckpointData(time) + } } /** Processes all events */ @@ -173,7 +176,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { event match { case GenerateJobs(time) => generateJobs(time) case ClearMetadata(time) => clearMetadata(time) - case DoCheckpoint(time) => doCheckpoint(time) + case DoCheckpoint(time, clearCheckpointDataLater) => + doCheckpoint(time, clearCheckpointDataLater) case ClearCheckpointData(time) => clearCheckpointData(time) } } @@ -245,7 +249,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) } - eventActor ! DoCheckpoint(time) + eventActor ! DoCheckpoint(time, clearCheckpointDataLater = false) } /** Clear DStream metadata for the given `time`. */ @@ -255,7 +259,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { // If checkpointing is enabled, then checkpoint, // else mark batch to be fully processed if (shouldCheckpoint) { - eventActor ! DoCheckpoint(time) + eventActor ! DoCheckpoint(time, clearCheckpointDataLater = true) } else { // If checkpointing is not enabled, then delete metadata information about // received blocks (block data not saved in any case). Otherwise, wait for @@ -278,11 +282,11 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { } /** Perform checkpoint for the give `time`. */ - private def doCheckpoint(time: Time) { + private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) { if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) { logInfo("Checkpointing graph for time " + time) ssc.graph.updateCheckpointData(time) - checkpointWriter.write(new Checkpoint(ssc, time)) + checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala new file mode 100644 index 0000000000..4150b60635 --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.scheduler + +import java.util.concurrent.CountDownLatch + +import scala.concurrent.duration._ +import scala.language.postfixOps + +import org.scalatest.concurrent.Eventually._ + +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming._ +import org.apache.spark.util.{ManualClock, Utils} + +class JobGeneratorSuite extends TestSuiteBase { + + // SPARK-6222 is a tricky regression bug which causes received block metadata + // to be deleted before the corresponding batch has completed. This occurs when + // the following conditions are met. + // 1. streaming checkpointing is enabled by setting streamingContext.checkpoint(dir) + // 2. input data is received through a receiver as blocks + // 3. a batch processing a set of blocks takes a long time, such that a few subsequent + // batches have been generated and submitted for processing. + // + // The JobGenerator (as of Mar 16, 2015) checkpoints twice per batch, once after generation + // of a batch, and another time after the completion of a batch. The cleanup of + // checkpoint data (including block metadata, etc.) from DStream must be done only after the + // 2nd checkpoint has completed, that is, after the batch has been completely processed. + // However, the issue is that the checkpoint data and along with it received block data is + // cleaned even in the case of the 1st checkpoint, causing pre-mature deletion of received block + // data. For example, if the 3rd batch is still being process, the 7th batch may get generated, + // and the corresponding "1st checkpoint" will delete received block metadata of batch older + // than 6th batch. That, is 3rd batch's block metadata gets deleted even before 3rd batch has + // been completely processed. + // + // This test tries to create that scenario by the following. + // 1. enable checkpointing + // 2. generate batches with received blocks + // 3. make the 3rd batch never complete + // 4. allow subsequent batches to be generated (to allow premature deletion of 3rd batch metadata) + // 5. verify whether 3rd batch's block metadata still exists + // + test("SPARK-6222: Do not clear received block data too soon") { + import JobGeneratorSuite._ + val checkpointDir = Utils.createTempDir() + val testConf = conf + testConf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + testConf.set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1") + + withStreamingContext(new StreamingContext(testConf, batchDuration)) { ssc => + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val numBatches = 10 + val longBatchNumber = 3 // 3rd batch will take a long time + val longBatchTime = longBatchNumber * batchDuration.milliseconds + + val testTimeout = timeout(10 seconds) + val inputStream = ssc.receiverStream(new TestReceiver) + + inputStream.foreachRDD((rdd: RDD[Int], time: Time) => { + if (time.milliseconds == longBatchTime) { + while (waitLatch.getCount() > 0) { + waitLatch.await() + println("Await over") + } + } + }) + + val batchCounter = new BatchCounter(ssc) + ssc.checkpoint(checkpointDir.getAbsolutePath) + ssc.start() + + // Make sure the only 1 batch of information is to be remembered + assert(inputStream.rememberDuration === batchDuration) + val receiverTracker = ssc.scheduler.receiverTracker + + // Get the blocks belonging to a batch + def getBlocksOfBatch(batchTime: Long) = { + receiverTracker.getBlocksOfBatchAndStream(Time(batchTime), inputStream.id) + } + + // Wait for new blocks to be received + def waitForNewReceivedBlocks() { + eventually(testTimeout) { + assert(receiverTracker.hasUnallocatedBlocks) + } + } + + // Wait for received blocks to be allocated to a batch + def waitForBlocksToBeAllocatedToBatch(batchTime: Long) { + eventually(testTimeout) { + assert(getBlocksOfBatch(batchTime).nonEmpty) + } + } + + // Generate a large number of batches with blocks in them + for (batchNum <- 1 to numBatches) { + waitForNewReceivedBlocks() + clock.advance(batchDuration.milliseconds) + waitForBlocksToBeAllocatedToBatch(clock.getTimeMillis()) + } + + // Wait for 3rd batch to start + eventually(testTimeout) { + ssc.scheduler.getPendingTimes().contains(Time(numBatches * batchDuration.milliseconds)) + } + + // Verify that the 3rd batch's block data is still present while the 3rd batch is incomplete + assert(getBlocksOfBatch(longBatchTime).nonEmpty, "blocks of incomplete batch already deleted") + assert(batchCounter.getNumCompletedBatches < longBatchNumber) + waitLatch.countDown() + } + } +} + +object JobGeneratorSuite { + val waitLatch = new CountDownLatch(1) +} -- cgit v1.2.3