author: jiangxingbo <jiangxb1987@gmail.com> (2016-11-08 09:41:01 -0800)
committer: Reynold Xin <rxin@databricks.com> (2016-11-08 09:41:01 -0800)
commit: 9c419698fe110a805570031cac3387a51957d9d1 (patch)
tree: 847284e6313c49aedd0a864d3931cacaf92ea425 /streaming
parent: 73feaa30ebfb62c81c7ce2c60ce2163611dd8852 (diff)
[SPARK-18191][CORE] Port RDD API to use commit protocol
## What changes were proposed in this pull request?

This PR ports the RDD API to use the commit protocol. The changes made here:

1. Add a new internal helper class, `SparkNewHadoopWriter`, that saves an RDD using a Hadoop OutputFormat. It is similar to `SparkHadoopWriter`, but it uses the commit protocol and supports the newer `mapreduce` API instead of the old `mapred` API that `SparkHadoopWriter` supports.
2. Rewrite the `PairRDDFunctions.saveAsNewAPIHadoopDataset` function so that it now uses the commit protocol.

## How was this patch tested?

Existing test cases.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #15769 from jiangxb1987/rdd-commit.
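The user-facing entry point affected by this change is the newer `mapreduce`-API save path on pair RDDs. As a minimal sketch (not part of this patch; the output path and app name are illustrative), a call like the following ends up in `PairRDDFunctions.saveAsNewAPIHadoopDataset`, the method this PR rewrites to go through the commit protocol:

```scala
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

object SaveWithNewApiExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("save-example").setMaster("local[2]"))
    // Convert to the Writable key/value types accepted by TextOutputFormat.
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
      .map { case (k, v) => (new Text(k), new IntWritable(v)) }
    // saveAsNewAPIHadoopFile delegates to saveAsNewAPIHadoopDataset, which
    // after this patch commits output via the commit protocol rather than
    // the old SparkHadoopWriter path. The target directory must not already
    // exist unless output-spec validation is disabled (see the streaming
    // changes below).
    pairs.saveAsNewAPIHadoopFile[TextOutputFormat[Text, IntWritable]](
      "/tmp/save-example-out")
    sc.stop()
  }
}
```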
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 5
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala | 5
2 files changed, 6 insertions(+), 4 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index fa15a0bf65..7e0a2ca609 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -27,7 +27,8 @@ import scala.util.matching.Regex

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD, RDDOperationScope}
+import org.apache.spark.internal.io.SparkHadoopWriterUtils
+import org.apache.spark.rdd.{BlockRDD, RDD, RDDOperationScope}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext.rddToFileName
@@ -337,7 +338,7 @@ abstract class DStream[T: ClassTag] (
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details. We need to have this call here because
// compute() might cause Spark jobs to be launched.
- PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+ SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
compute(time)
}
}
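Both streaming call sites in this diff (here in `DStream`, and in `JobScheduler` below) wrap work in `disableOutputSpecValidation.withValue(true) { ... }`. The flag is a `scala.util.DynamicVariable`, so the override is scoped to the current thread and restored when the block exits. A self-contained sketch of the pattern (the names mirror the Spark code, but this is an illustration, not the actual implementation):

```scala
import scala.util.DynamicVariable

object OutputSpecValidationDemo {
  // In Spark, the real flag now lives in SparkHadoopWriterUtils.
  val disableOutputSpecValidation = new DynamicVariable[Boolean](false)

  def validateOutputSpecs(): Unit =
    if (disableOutputSpecValidation.value) {
      println("validation skipped (e.g. checkpoint recovery)")
    } else {
      println("validating output specs")
    }

  def main(args: Array[String]): Unit = {
    validateOutputSpecs()                         // validating output specs
    disableOutputSpecValidation.withValue(true) {
      // Only code in this block (on this thread) sees the flag as true,
      // which is why compute(time) and job.run() are wrapped directly.
      validateOutputSpecs()                       // validation skipped
    }
    validateOutputSpecs()                         // flag restored automatically
  }
}
```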
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 98e099354a..b7d114bc16 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -26,7 +26,8 @@ import org.apache.commons.lang3.SerializationUtils

import org.apache.spark.ExecutorAllocationClient
import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.{PairRDDFunctions, RDD}
+import org.apache.spark.internal.io.SparkHadoopWriterUtils
+import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.api.python.PythonDStream
import org.apache.spark.streaming.ui.UIUtils
@@ -250,7 +251,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details.
- PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+ SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
job.run()
}
_eventLoop = eventLoop