diff options
Diffstat (limited to 'streaming/src/test/scala')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 27 |
1 files changed, 25 insertions, 2 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 84f5294aa3..b1cbc7163b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.streaming import java.io.{ObjectOutputStream, ByteArrayOutputStream, ByteArrayInputStream, File} -import org.apache.spark.TestUtils import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import scala.reflect.ClassTag @@ -30,11 +29,13 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.{IntWritable, Text} import org.apache.hadoop.mapred.TextOutputFormat import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat} +import org.mockito.Mockito.mock import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ +import org.apache.spark.TestUtils import org.apache.spark.streaming.dstream.{DStream, FileInputDStream} -import org.apache.spark.streaming.scheduler.{ConstantEstimator, RateTestInputDStream, RateTestReceiver} +import org.apache.spark.streaming.scheduler._ import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils} /** @@ -611,6 +612,28 @@ class CheckpointSuite extends TestSuiteBase { assert(ois.readObject().asInstanceOf[Class[_]].getName == "[LtestClz;") } + test("SPARK-11267: the race condition of two checkpoints in a batch") { + val jobGenerator = mock(classOf[JobGenerator]) + val checkpointDir = Utils.createTempDir().toString + val checkpointWriter = + new CheckpointWriter(jobGenerator, conf, checkpointDir, new Configuration()) + val bytes1 = Array.fill[Byte](10)(1) + new checkpointWriter.CheckpointWriteHandler( + Time(2000), bytes1, clearCheckpointDataLater = false).run() + val bytes2 = Array.fill[Byte](10)(2) + new checkpointWriter.CheckpointWriteHandler( + Time(1000), bytes2, clearCheckpointDataLater = true).run() + val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir).reverse.map { path => + new File(path.toUri) + } + assert(checkpointFiles.size === 2) + // Although bytes2 was written with an old time, it contains the latest status, so we should + // try to read from it at first. + assert(Files.toByteArray(checkpointFiles(0)) === bytes2) + assert(Files.toByteArray(checkpointFiles(1)) === bytes1) + checkpointWriter.stop() + } + /** * Tests a streaming operation under checkpointing, by restarting the operation * from checkpoint file and verifying whether the final output is correct. |