Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala             10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala   2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala       8
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala              39
4 files changed, 55 insertions(+), 4 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 28fc00cf39..b874f561c1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
import scala.util.matching.Regex
import org.apache.spark.{Logging, SparkException}
-import org.apache.spark.rdd.{BlockRDD, RDD}
+import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext.rddToFileName
@@ -292,7 +292,13 @@ abstract class DStream[T: ClassTag] (
// set this DStream's creation site, generate RDDs and then restore the previous call site.
val prevCallSite = ssc.sparkContext.getCallSite()
ssc.sparkContext.setCallSite(creationSite)
- val rddOption = compute(time)
+ // Disable checks for existing output directories in jobs launched by the streaming
+ // scheduler, since we may need to write output to an existing directory during checkpoint
+ // recovery; see SPARK-4835 for more details. We need to have this call here because
+ // compute() might cause Spark jobs to be launched.
+ val rddOption = PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+ compute(time)
+ }
ssc.sparkContext.setCallSite(prevCallSite)
rddOption.foreach { case newRDD =>
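The flag toggled here is a scala.util.DynamicVariable[Boolean] held in the PairRDDFunctions companion object. withValue installs the new value for the dynamic extent of the block on the current thread and restores the previous value on exit (even if compute(time) throws), so validation is suppressed only for work launched from this call path. A minimal, self-contained sketch of the pattern (the disableValidation and save names are illustrative, not Spark's):

  import scala.util.DynamicVariable

  object OutputSpecDemo {
    // Same shape as PairRDDFunctions.disableOutputSpecValidation:
    // false by default, true only inside a withValue { ... } block.
    val disableValidation = new DynamicVariable[Boolean](false)

    // Stand-in for a Hadoop save: validates unless the flag is set.
    def save(path: String): Unit = {
      if (!disableValidation.value) {
        println(s"validating output spec for $path")
      }
      println(s"writing $path")
    }

    def main(args: Array[String]): Unit = {
      save("result-1000")                 // validation runs
      disableValidation.withValue(true) {
        save("result-1000")               // validation skipped, as during recovery
      }
      assert(!disableValidation.value)    // previous value restored on exit
    }
  }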
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 7cd4554282..71b61856e2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{PairRDDFunctions, RDD}
import org.apache.spark.streaming.{Duration, Time}
import scala.reflect.ClassTag
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index cfa3cd8925..0e0f5bd3b9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConversions._
import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
import akka.actor.{ActorRef, Actor, Props}
import org.apache.spark.{SparkException, Logging, SparkEnv}
+import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.streaming._
@@ -168,7 +169,12 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
private class JobHandler(job: Job) extends Runnable {
def run() {
eventActor ! JobStarted(job)
- job.run()
+ // Disable checks for existing output directories in jobs launched by the streaming scheduler,
+ // since we may need to write output to an existing directory during checkpoint recovery;
+ // see SPARK-4835 for more details.
+ PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+ job.run()
+ }
eventActor ! JobCompleted(job)
}
}
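Both call sites guard against the same failure mode: on restart from a checkpoint, the recovered batch rewrites output that the pre-failure run already created, and Hadoop's output-spec validation rejects an existing directory. A self-contained sketch of that check (assuming Hadoop's mapred classes on the classpath; the path is illustrative):

  import java.io.File
  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.mapred.{FileAlreadyExistsException, FileOutputFormat, JobConf, TextOutputFormat}

  object ExistingDirDemo {
    def main(args: Array[String]): Unit = {
      val dir = new File(System.getProperty("java.io.tmpdir"), "result-1000")
      dir.mkdirs() // simulate output left behind by the pre-failure run

      val conf = new JobConf()
      FileOutputFormat.setOutputPath(conf, new Path(dir.getAbsolutePath))
      try {
        // The same validation saveAsHadoopFile performs before writing; the
        // FileSystem argument is ignored by FileOutputFormat.checkOutputSpecs.
        new TextOutputFormat[Any, Any]().checkOutputSpecs(null, conf)
      } catch {
        case e: FileAlreadyExistsException =>
          println(s"validation rejected the existing directory: ${e.getMessage}")
      }
    }
  }

Scoping the flag with a dynamic variable, rather than setting spark.hadoop.validateOutputSpecs to false, keeps validation intact for jobs users launch outside the streaming scheduler.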
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 72d055eb2e..5d232c6ade 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -255,6 +255,45 @@ class CheckpointSuite extends TestSuiteBase {
}
}
+ test("recovery with saveAsHadoopFile inside transform operation") {
+ // Regression test for SPARK-4835.
+ //
+ // In that issue, the problem was that `saveAsHadoopFile(s)` would fail when the last batch
+ // was restarted from a checkpoint since the output directory would already exist. However,
+ // the other saveAsHadoopFile* tests couldn't catch this because they only tested whether the
+ // output matched correctly and not whether the post-restart batch had successfully finished
+ // without throwing any errors. The following test reproduces the same bug with a test that
+ // actually fails because the error in saveAsHadoopFile causes transform() to fail, which
+ // prevents the expected output from being written to the output stream.
+ //
+ // This is not actually a valid use of transform, but it's being used here so that we can test
+ // the fix for SPARK-4835 independently of additional test cleanup.
+ //
+ // After SPARK-5079 is addressed, we should be able to remove this test, since a strengthened
+ // version of the other saveAsHadoopFile* tests would prevent regressions for this issue.
+ val tempDir = Files.createTempDir()
+ try {
+ testCheckpointedOperation(
+ Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+ (s: DStream[String]) => {
+ s.transform { (rdd, time) =>
+ val output = rdd.map(x => (x, 1)).reduceByKey(_ + _)
+ output.saveAsHadoopFile(
+ new File(tempDir, "result-" + time.milliseconds).getAbsolutePath,
+ classOf[Text],
+ classOf[IntWritable],
+ classOf[TextOutputFormat[Text, IntWritable]])
+ output
+ }
+ },
+ Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+ 3
+ )
+ } finally {
+ Utils.deleteRecursively(tempDir)
+ }
+ }
+
// This tests whether the StateDStream's RDD checkpoints work correctly such
// that the system can recover from a master failure. This assumes a reliable,
// replayable input source - TestInputDStream.
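As a sanity check on the expected output in the new test, the same per-batch transform can be run in plain Spark, outside streaming (a local sketch; the master URL and app name are arbitrary):

  import org.apache.spark.{SparkConf, SparkContext}

  object BatchSanityCheck {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sanity"))
      val batch = sc.parallelize(Seq("a", "a", "b"))
      // Mirrors the body of the transform in the test: a per-batch word count.
      val counts = batch.map(x => (x, 1)).reduceByKey(_ + _).collect().toSet
      assert(counts == Set(("a", 2), ("b", 1))) // first expected Seq in the test
      sc.stop()
    }
  }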