diff options
Diffstat (limited to 'streaming')
4 files changed, 55 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 28fc00cf39..b874f561c1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -26,7 +26,7 @@ import scala.reflect.ClassTag import scala.util.matching.Regex import org.apache.spark.{Logging, SparkException} -import org.apache.spark.rdd.{BlockRDD, RDD} +import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext.rddToFileName @@ -292,7 +292,13 @@ abstract class DStream[T: ClassTag] ( // set this DStream's creation site, generate RDDs and then restore the previous call site. val prevCallSite = ssc.sparkContext.getCallSite() ssc.sparkContext.setCallSite(creationSite) - val rddOption = compute(time) + // Disable checks for existing output directories in jobs launched by the streaming + // scheduler, since we may need to write output to an existing directory during checkpoint + // recovery; see SPARK-4835 for more details. We need to have this call here because + // compute() might cause Spark jobs to be launched. 
+ val rddOption = PairRDDFunctions.disableOutputSpecValidation.withValue(true) { + compute(time) + } ssc.sparkContext.setCallSite(prevCallSite) rddOption.foreach { case newRDD => diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index 7cd4554282..71b61856e2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{PairRDDFunctions, RDD} import org.apache.spark.streaming.{Duration, Time} import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index cfa3cd8925..0e0f5bd3b9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -22,6 +22,7 @@ import scala.collection.JavaConversions._ import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors} import akka.actor.{ActorRef, Actor, Props} import org.apache.spark.{SparkException, Logging, SparkEnv} +import org.apache.spark.rdd.PairRDDFunctions import org.apache.spark.streaming._ @@ -168,7 +169,12 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { private class JobHandler(job: Job) extends Runnable { def run() { eventActor ! JobStarted(job) - job.run() + // Disable checks for existing output directories in jobs launched by the streaming scheduler, + // since we may need to write output to an existing directory during checkpoint recovery; + // see SPARK-4835 for more details. 
+ PairRDDFunctions.disableOutputSpecValidation.withValue(true) { + job.run() + } eventActor ! JobCompleted(job) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 72d055eb2e..5d232c6ade 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -255,6 +255,45 @@ class CheckpointSuite extends TestSuiteBase { } } + test("recovery with saveAsHadoopFile inside transform operation") { + // Regression test for SPARK-4835. + // + // In that issue, the problem was that `saveAsHadoopFile(s)` would fail when the last batch + // was restarted from a checkpoint since the output directory would already exist. However, + // the other saveAsHadoopFile* tests couldn't catch this because they only tested whether the + // output matched correctly and not whether the post-restart batch had successfully finished + // without throwing any errors. The following test reproduces the same bug with a test that + // actually fails because the error in saveAsHadoopFile causes transform() to fail, which + // prevents the expected output from being written to the output stream. + // + // This is not actually a valid use of transform, but it's being used here so that we can test + // the fix for SPARK-4835 independently of additional test cleanup. + // + // After SPARK-5079 is addressed, we should be able to remove this test since a strengthened + // version of the other saveAsHadoopFile* tests would prevent regressions for this issue. 
+ val tempDir = Files.createTempDir() + try { + testCheckpointedOperation( + Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()), + (s: DStream[String]) => { + s.transform { (rdd, time) => + val output = rdd.map(x => (x, 1)).reduceByKey(_ + _) + output.saveAsHadoopFile( + new File(tempDir, "result-" + time.milliseconds).getAbsolutePath, + classOf[Text], + classOf[IntWritable], + classOf[TextOutputFormat[Text, IntWritable]]) + output + } + }, + Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()), + 3 + ) + } finally { + Utils.deleteRecursively(tempDir) + } + } + // This tests whether the StateDStream's RDD checkpoints works correctly such // that the system can recover from a master failure. This assumes as reliable, // replayable input source - TestInputDStream. |