aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/test/scala')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala27
1 files changed, 25 insertions, 2 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 84f5294aa3..b1cbc7163b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -18,7 +18,6 @@
package org.apache.spark.streaming
import java.io.{ObjectOutputStream, ByteArrayOutputStream, ByteArrayInputStream, File}
-import org.apache.spark.TestUtils
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.reflect.ClassTag
@@ -30,11 +29,13 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.mockito.Mockito.mock
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
+import org.apache.spark.TestUtils
import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
-import org.apache.spark.streaming.scheduler.{ConstantEstimator, RateTestInputDStream, RateTestReceiver}
+import org.apache.spark.streaming.scheduler._
import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils}
/**
@@ -611,6 +612,28 @@ class CheckpointSuite extends TestSuiteBase {
assert(ois.readObject().asInstanceOf[Class[_]].getName == "[LtestClz;")
}
+ test("SPARK-11267: the race condition of two checkpoints in a batch") {
+ val jobGenerator = mock(classOf[JobGenerator])
+ val checkpointDir = Utils.createTempDir().toString
+ val checkpointWriter =
+ new CheckpointWriter(jobGenerator, conf, checkpointDir, new Configuration())
+ val bytes1 = Array.fill[Byte](10)(1)
+ new checkpointWriter.CheckpointWriteHandler(
+ Time(2000), bytes1, clearCheckpointDataLater = false).run()
+ val bytes2 = Array.fill[Byte](10)(2)
+ new checkpointWriter.CheckpointWriteHandler(
+ Time(1000), bytes2, clearCheckpointDataLater = true).run()
+ val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir).reverse.map { path =>
+ new File(path.toUri)
+ }
+ assert(checkpointFiles.size === 2)
+ // Although bytes2 was written with an old time, it contains the latest status, so we should
+ // try to read from it at first.
+ assert(Files.toByteArray(checkpointFiles(0)) === bytes2)
+ assert(Files.toByteArray(checkpointFiles(1)) === bytes1)
+ checkpointWriter.stop()
+ }
+
/**
* Tests a streaming operation under checkpointing, by restarting the operation
* from checkpoint file and verifying whether the final output is correct.