From 9c419698fe110a805570031cac3387a51957d9d1 Mon Sep 17 00:00:00 2001
From: jiangxingbo
Date: Tue, 8 Nov 2016 09:41:01 -0800
Subject: [SPARK-18191][CORE] Port RDD API to use commit protocol

## What changes were proposed in this pull request?

This PR ports the RDD API to use the commit protocol. The changes made here:

1. Add a new internal helper class named `SparkNewHadoopWriter` that saves an RDD using a Hadoop OutputFormat. It is similar to `SparkHadoopWriter` but uses the commit protocol, and it supports the newer `mapreduce` API instead of the old `mapred` API supported by `SparkHadoopWriter`;
2. Rewrite the `PairRDDFunctions.saveAsNewAPIHadoopDataset` function so that it now uses the commit protocol. A usage sketch follows below.

## How was this patch tested?

Existing test cases.

Author: jiangxingbo

Closes #15769 from jiangxb1987/rdd-commit.
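As a usage sketch (not part of this patch), the rewritten path is reached by configuring a job through the new `mapreduce` API and calling `saveAsNewAPIHadoopDataset`; the `SaveWithNewApiExample` object, output path, and sample data below are illustrative only:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.spark.{SparkConf, SparkContext}

object SaveWithNewApiExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("save-example").setMaster("local[*]"))

    // Pair RDD whose key/value types are converted to Hadoop Writables.
    val counts = sc.parallelize(Seq(("a", 1), ("b", 2)))
      .map { case (k, v) => (new Text(k), new IntWritable(v)) }

    // Configure the output through the new `mapreduce` API.
    val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    job.setOutputFormatClass(classOf[TextOutputFormat[Text, IntWritable]])
    FileOutputFormat.setOutputPath(job, new Path("/tmp/word-counts"))

    // This is the call rewritten by the patch to go through the commit protocol.
    counts.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}
```

`job.getConfiguration` carries the output format, key/value classes, and output path into the write path, which is where the commit protocol now coordinates task and job commits.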
---
 .../src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 5 +++--
 .../scala/org/apache/spark/streaming/scheduler/JobScheduler.scala   | 5 +++--
 2 files changed, 6 insertions(+), 4 deletions(-)

(limited to 'streaming')

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index fa15a0bf65..7e0a2ca609 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -27,7 +27,8 @@ import scala.util.matching.Regex
 
 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD, RDDOperationScope}
+import org.apache.spark.internal.io.SparkHadoopWriterUtils
+import org.apache.spark.rdd.{BlockRDD, RDD, RDDOperationScope}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext.rddToFileName
@@ -337,7 +338,7 @@ abstract class DStream[T: ClassTag] (
       // scheduler, since we may need to write output to an existing directory during checkpoint
       // recovery; see SPARK-4835 for more details. We need to have this call here because
       // compute() might cause Spark jobs to be launched.
-      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+      SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
         compute(time)
       }
     }

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 98e099354a..b7d114bc16 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -26,7 +26,8 @@ import org.apache.commons.lang3.SerializationUtils
 
 import org.apache.spark.ExecutorAllocationClient
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.{PairRDDFunctions, RDD}
+import org.apache.spark.internal.io.SparkHadoopWriterUtils
+import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.api.python.PythonDStream
 import org.apache.spark.streaming.ui.UIUtils
@@ -250,7 +251,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
           // Disable checks for existing output directories in jobs launched by the streaming
           // scheduler, since we may need to write output to an existing directory during checkpoint
           // recovery; see SPARK-4835 for more details.
-          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+          SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
             job.run()
           }
           _eventLoop = eventLoop
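For reference, a self-contained sketch of the `scala.util.DynamicVariable` pattern behind `disableOutputSpecValidation`, which both hunks above rely on; `WriterUtilsSketch`, `ValidationFlagDemo`, and the println-based validator are stand-ins, not Spark's actual implementation:

```scala
import scala.util.DynamicVariable

// Stand-in for the flag on SparkHadoopWriterUtils (name and validator are
// illustrative; only the DynamicVariable pattern mirrors Spark's code).
object WriterUtilsSketch {
  val disableOutputSpecValidation: DynamicVariable[Boolean] =
    new DynamicVariable[Boolean](false)
}

object ValidationFlagDemo {
  // Toy validator: a real one would check that the output dir does not exist.
  def validateOutputSpecs(): Unit = {
    if (WriterUtilsSketch.disableOutputSpecValidation.value) {
      println("validation skipped (e.g. checkpoint recovery)")
    } else {
      println("validating output specs")
    }
  }

  def main(args: Array[String]): Unit = {
    validateOutputSpecs() // flag defaults to false: validation runs
    WriterUtilsSketch.disableOutputSpecValidation.withValue(true) {
      validateOutputSpecs() // flag is true only within this dynamic scope
    }
    validateOutputSpecs() // flag restored automatically on scope exit
  }
}
```

`withValue` restores the previous value when the block exits, normally or by exception, which is why wrapping `compute(time)` and `job.run()` is enough to scope the relaxed check to streaming-launched jobs.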