author    Josh Rosen <joshrosen@databricks.com>           2015-01-04 20:26:18 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>     2015-01-04 20:26:18 -0800
commit    939ba1f8f6e32fef9026cc43fce55b36e4b9bfd1 (patch)
tree      42259c4f15027fdda43ea817eea5feee19d48486 /streaming
parent    e767d7ddac5c2330af553f2a74b8575dfc7afb67 (diff)
[SPARK-4835] Disable validateOutputSpecs for Spark Streaming jobs
This patch disables output spec. validation for jobs launched through Spark Streaming, since this interferes with checkpoint recovery.

Hadoop OutputFormats have a `checkOutputSpecs` method which performs certain checks prior to writing output, such as checking whether the output directory already exists. SPARK-1100 added checks for FileOutputFormat, SPARK-1677 (#947) added a SparkConf configuration to disable these checks, and SPARK-2309 (#1088) extended these checks to run for all OutputFormats, not just FileOutputFormat.

In Spark Streaming, we might have to re-process a batch during checkpoint recovery, so `save` actions may be called multiple times. In addition to `DStream`'s own save actions, users might use `transform` or `foreachRDD` and call the `RDD` and `PairRDD` save actions. When output spec. validation is enabled, the second calls to these actions will fail due to existing output.

This patch automatically disables output spec. validation for jobs submitted by the Spark Streaming scheduler. This is done by using Scala's `DynamicVariable` to propagate the bypass setting without having to mutate SparkConf or introduce a global variable.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #3832 from JoshRosen/SPARK-4835 and squashes the following commits:

36eaf35 [Josh Rosen] Add comment explaining use of transform() in test.
6485cf8 [Josh Rosen] Add test case in Streaming; fix bug for transform()
7b3e06a [Josh Rosen] Remove Streaming-specific setting to undo this change; update conf. guide
bf9094d [Josh Rosen] Revise disableOutputSpecValidation() comment to not refer to Spark Streaming.
e581d17 [Josh Rosen] Deduplicate isOutputSpecValidationEnabled logic.
762e473 [Josh Rosen] [SPARK-4835] Disable validateOutputSpecs for Spark Streaming jobs.
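To make the `DynamicVariable` mechanism concrete, here is a minimal, self-contained Scala sketch of how `withValue` scopes a per-thread flag to one block, in the spirit of `PairRDDFunctions.disableOutputSpecValidation` used in the diff below. This is not Spark code; `OutputSpecDemo` and its `save` helper are invented for illustration only.

import scala.util.DynamicVariable

object OutputSpecDemo {
  // Thread-local flag with a default of false that can be overridden
  // for the duration of a single block.
  val disableOutputSpecValidation = new DynamicVariable[Boolean](false)

  // Stands in for code deep inside the save path that consults the flag
  // without it being threaded through every method signature.
  def save(): Unit =
    if (disableOutputSpecValidation.value) println("skipping output spec validation")
    else println("validating output specs")

  def main(args: Array[String]): Unit = {
    save()                                            // prints "validating output specs"
    disableOutputSpecValidation.withValue(true) {
      save()                                          // prints "skipping output spec validation"
    }
    save()                                            // default restored once the block exits
  }
}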
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala            | 10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala     |  8
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala            | 39
4 files changed, 55 insertions(+), 4 deletions(-)
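As background on the SparkConf configuration added by SPARK-1677 and mentioned in the commit message: users can already disable these checks globally for an application. The sketch below is only an illustration of that existing knob; the key name `spark.hadoop.validateOutputSpecs` is taken from the Spark configuration guide, and the surrounding code is hypothetical. This patch instead scopes the bypass to jobs launched by the streaming scheduler and leaves the global setting untouched.

import org.apache.spark.{SparkConf, SparkContext}

// Application-wide switch: disables output spec validation for every
// saveAsHadoopFile-style action in this application.
val conf = new SparkConf()
  .setAppName("output-spec-validation-demo")
  .set("spark.hadoop.validateOutputSpecs", "false")
val sc = new SparkContext(conf)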
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 28fc00cf39..b874f561c1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
import scala.util.matching.Regex
import org.apache.spark.{Logging, SparkException}
-import org.apache.spark.rdd.{BlockRDD, RDD}
+import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext.rddToFileName
@@ -292,7 +292,13 @@ abstract class DStream[T: ClassTag] (
// set this DStream's creation site, generate RDDs and then restore the previous call site.
val prevCallSite = ssc.sparkContext.getCallSite()
ssc.sparkContext.setCallSite(creationSite)
- val rddOption = compute(time)
+ // Disable checks for existing output directories in jobs launched by the streaming
+ // scheduler, since we may need to write output to an existing directory during checkpoint
+ // recovery; see SPARK-4835 for more details. We need to have this call here because
+ // compute() might cause Spark jobs to be launched.
+ val rddOption = PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+ compute(time)
+ }
ssc.sparkContext.setCallSite(prevCallSite)
rddOption.foreach { case newRDD =>
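A note on the hunk above: the bypass has to wrap compute() as well as job.run() (see the JobScheduler change below) because user code passed to transform() can invoke save actions eagerly while the batch's RDD is being generated, not only when the generated job later runs. A minimal, hypothetical illustration, assuming `stream` is a DStream[String]:

// Hypothetical user code: the save is an action that executes inside
// compute() during RDD generation, before the streaming job itself runs.
val saved = stream.transform { (rdd, time) =>
  rdd.saveAsTextFile("/tmp/out-" + time.milliseconds)  // runs eagerly
  rdd
}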
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 7cd4554282..71b61856e2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{PairRDDFunctions, RDD}
import org.apache.spark.streaming.{Duration, Time}
import scala.reflect.ClassTag
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index cfa3cd8925..0e0f5bd3b9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConversions._
import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
import akka.actor.{ActorRef, Actor, Props}
import org.apache.spark.{SparkException, Logging, SparkEnv}
+import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.streaming._
@@ -168,7 +169,12 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
private class JobHandler(job: Job) extends Runnable {
def run() {
eventActor ! JobStarted(job)
- job.run()
+ // Disable checks for existing output directories in jobs launched by the streaming scheduler,
+ // since we may need to write output to an existing directory during checkpoint recovery;
+ // see SPARK-4835 for more details.
+ PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+ job.run()
+ }
eventActor ! JobCompleted(job)
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 72d055eb2e..5d232c6ade 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -255,6 +255,45 @@ class CheckpointSuite extends TestSuiteBase {
}
}
+ test("recovery with saveAsHadoopFile inside transform operation") {
+ // Regression test for SPARK-4835.
+ //
+ // In that issue, the problem was that `saveAsHadoopFile(s)` would fail when the last batch
+ // was restarted from a checkpoint since the output directory would already exist. However,
+ // the other saveAsHadoopFile* tests couldn't catch this because they only tested whether the
+ // output matched correctly and not whether the post-restart batch had successfully finished
+ // without throwing any errors. The following test reproduces the same bug with a test that
+ // actually fails because the error in saveAsHadoopFile causes transform() to fail, which
+ // prevents the expected output from being written to the output stream.
+ //
+ // This is not actually a valid use of transform, but it's being used here so that we can test
+ // the fix for SPARK-4835 independently of additional test cleanup.
+ //
+ // After SPARK-5079 is addressed, should be able to remove this test since a strengthened
+ // version of the other saveAsHadoopFile* tests would prevent regressions for this issue.
+ val tempDir = Files.createTempDir()
+ try {
+ testCheckpointedOperation(
+ Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+ (s: DStream[String]) => {
+ s.transform { (rdd, time) =>
+ val output = rdd.map(x => (x, 1)).reduceByKey(_ + _)
+ output.saveAsHadoopFile(
+ new File(tempDir, "result-" + time.milliseconds).getAbsolutePath,
+ classOf[Text],
+ classOf[IntWritable],
+ classOf[TextOutputFormat[Text, IntWritable]])
+ output
+ }
+ },
+ Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+ 3
+ )
+ } finally {
+ Utils.deleteRecursively(tempDir)
+ }
+ }
+
// This tests whether the StateDStream's RDD checkpoints works correctly such
// that the system can recover from a master failure. This assumes as reliable,
// replayable input source - TestInputDStream.