path: root/streaming
author     Shixiong Zhu <shixiong@databricks.com>        2015-11-17 14:48:29 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2015-11-17 14:48:29 -0800
commit     928d631625297857fb6998fbeb0696917fbfd60f (patch)
tree       16c2446b3381accc01b58d577db60fed24fe387e /streaming
parent     936bc0bcbf957fa1d7cb5cfe88d628c830df5981 (diff)
download   spark-928d631625297857fb6998fbeb0696917fbfd60f.tar.gz
           spark-928d631625297857fb6998fbeb0696917fbfd60f.tar.bz2
           spark-928d631625297857fb6998fbeb0696917fbfd60f.zip
[SPARK-11740][STREAMING] Fix the race condition of two checkpoints in a batch
We checkpoint both when generating a batch and when completing a batch. When the processing time of a batch is greater than the batch interval, the checkpoint for completing an old batch may run after the checkpoint for generating a new batch. When that happens, the checkpoint written for the old batch actually contains the latest information, so we want to recover from it. This PR uses the latest checkpoint time as the file name, so that recovery always starts from the latest checkpoint file.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #9707 from zsxwing/fix-checkpoint.
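A minimal standalone sketch of the idea (not Spark code; the object name, the in-memory "files" map, and the two-write timeline are illustrative): name every checkpoint file after the newest checkpoint time seen so far, so a late write for an older batch still lands in the file that recovery reads first.

// A minimal sketch, not Spark code: simulate two checkpoint writes arriving out of
// order and name each file after the newest checkpoint time seen so far, mirroring
// the latestCheckpointTime logic in the patch. The in-memory "files" map stands in
// for the checkpoint directory.
object CheckpointRaceSketch {
  @volatile private var latestTime: Long = 0L
  private val files = scala.collection.mutable.Map.empty[String, Array[Byte]]

  def write(checkpointTime: Long, bytes: Array[Byte]): Unit = {
    if (checkpointTime > latestTime) latestTime = checkpointTime
    // The file is named after the latest time, not the (possibly older) batch time.
    files(s"checkpoint-$latestTime") = bytes
  }

  def main(args: Array[String]): Unit = {
    write(2000L, Array[Byte](1)) // checkpoint written when the new batch is generated
    write(1000L, Array[Byte](2)) // late checkpoint written when the old batch completes
    // Recovery reads the newest file name and still sees the latest state (byte "2").
    println(files(s"checkpoint-$latestTime").mkString(",")) // prints: 2
  }
}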
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala       18
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala  27
2 files changed, 41 insertions(+), 4 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 0cd55d9aec..fd0e8d5d69 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -187,16 +187,30 @@ class CheckpointWriter(
private var stopped = false
private var fs_ : FileSystem = _
+ @volatile private var latestCheckpointTime: Time = null
+
class CheckpointWriteHandler(
checkpointTime: Time,
bytes: Array[Byte],
clearCheckpointDataLater: Boolean) extends Runnable {
def run() {
+ if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
+ latestCheckpointTime = checkpointTime
+ }
var attempts = 0
val startTime = System.currentTimeMillis()
val tempFile = new Path(checkpointDir, "temp")
- val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime)
- val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime)
+ // We checkpoint both when generating a batch and when completing a batch. When the processing
+ // time of a batch is greater than the batch interval, checkpointing for completing an old
+ // batch may run after checkpointing for generating a new batch. If this happens, the checkpoint
+ // of the old batch actually has the latest information, so we want to recover from it.
+ // Therefore, we also use the latest checkpoint time as the file name, so that we can recover
+ // from the latest checkpoint file.
+ //
+ // Note: there is only one thread writing the checkpoint files, so we don't need to worry
+ // about thread-safety.
+ val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime)
+ val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, latestCheckpointTime)
while (attempts < MAX_ATTEMPTS && !stopped) {
attempts += 1
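For context, a sketch of the recovery side (not Spark's actual reader; it assumes the "checkpoint-<millis>" / "checkpoint-<millis>.bk" naming produced by Checkpoint.checkpointFile and Checkpoint.checkpointBackupFile, and that recovery tries candidates newest-first, as the test below also does via getCheckpointFiles(...).reverse): listing the directory newest-first is what makes the file carrying the latest checkpoint time, which after this patch also holds the latest data, get tried before any older one.

// A sketch under the assumptions stated above, not Spark's reader implementation.
import java.io.File

def checkpointFilesNewestFirst(checkpointDir: String): Seq[File] = {
  val CheckpointName = """checkpoint-(\d+)(\.bk)?""".r
  Option(new File(checkpointDir).listFiles()).getOrElse(Array.empty[File]).toSeq
    .filter(f => CheckpointName.pattern.matcher(f.getName).matches())
    .sortBy { f =>
      val CheckpointName(millis, backupSuffix) = f.getName
      // Newest checkpoint time first; a plain file sorts before its ".bk" backup.
      (-millis.toLong, if (backupSuffix == null) 0 else 1)
    }
}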
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 84f5294aa3..b1cbc7163b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -18,7 +18,6 @@
package org.apache.spark.streaming
import java.io.{ObjectOutputStream, ByteArrayOutputStream, ByteArrayInputStream, File}
-import org.apache.spark.TestUtils
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.reflect.ClassTag
@@ -30,11 +29,13 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.mockito.Mockito.mock
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
+import org.apache.spark.TestUtils
import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
-import org.apache.spark.streaming.scheduler.{ConstantEstimator, RateTestInputDStream, RateTestReceiver}
+import org.apache.spark.streaming.scheduler._
import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils}
/**
@@ -611,6 +612,28 @@ class CheckpointSuite extends TestSuiteBase {
assert(ois.readObject().asInstanceOf[Class[_]].getName == "[LtestClz;")
}
+ test("SPARK-11267: the race condition of two checkpoints in a batch") {
+ val jobGenerator = mock(classOf[JobGenerator])
+ val checkpointDir = Utils.createTempDir().toString
+ val checkpointWriter =
+ new CheckpointWriter(jobGenerator, conf, checkpointDir, new Configuration())
+ val bytes1 = Array.fill[Byte](10)(1)
+ new checkpointWriter.CheckpointWriteHandler(
+ Time(2000), bytes1, clearCheckpointDataLater = false).run()
+ val bytes2 = Array.fill[Byte](10)(2)
+ new checkpointWriter.CheckpointWriteHandler(
+ Time(1000), bytes2, clearCheckpointDataLater = true).run()
+ val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir).reverse.map { path =>
+ new File(path.toUri)
+ }
+ assert(checkpointFiles.size === 2)
+ // Although bytes2 was written with an old time, it contains the latest status, so we should
+ // try to read from it first.
+ assert(Files.toByteArray(checkpointFiles(0)) === bytes2)
+ assert(Files.toByteArray(checkpointFiles(1)) === bytes1)
+ checkpointWriter.stop()
+ }
+
/**
* Tests a streaming operation under checkpointing, by restarting the operation
* from checkpoint file and verifying whether the final output is correct.