author    jiangxingbo <jiangxb1987@gmail.com>    2016-11-08 09:41:01 -0800
committer Reynold Xin <rxin@databricks.com>    2016-11-08 09:41:01 -0800
commit    9c419698fe110a805570031cac3387a51957d9d1 (patch)
tree      847284e6313c49aedd0a864d3931cacaf92ea425 /sql/core
parent    73feaa30ebfb62c81c7ce2c60ce2163611dd8852 (diff)
[SPARK-18191][CORE] Port RDD API to use commit protocol
## What changes were proposed in this pull request?

This PR ports the RDD API to use the commit protocol. The changes made here:

1. Add a new internal helper class, `SparkNewHadoopWriter`, that saves an RDD using a Hadoop OutputFormat. It is similar to `SparkHadoopWriter` but uses the commit protocol, and it supports the newer `mapreduce` API instead of the old `mapred` API supported by `SparkHadoopWriter`.
2. Rewrite the `PairRDDFunctions.saveAsNewAPIHadoopDataset` function so that it now uses the commit protocol (see the sketch after this description).

## How was this patch tested?

Existing test cases.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #15769 from jiangxb1987/rdd-commit.
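For orientation, the sketch below shows the setup/commit/abort flow that a commit protocol imposes on a save job. Everything here is a simplified stand-in invented for illustration (`CommitProtocol`, `runSaveJob`, the plain `String` commit messages); Spark's real interface is `org.apache.spark.internal.io.FileCommitProtocol`, whose methods take Hadoop `JobContext`/`TaskAttemptContext` arguments instead.

```scala
// Simplified stand-in for Spark's internal FileCommitProtocol; the real
// trait lives in org.apache.spark.internal.io and takes Hadoop
// JobContext/TaskAttemptContext arguments rather than bare Ints/Strings.
trait CommitProtocol {
  def setupJob(): Unit                        // driver: create a staging area
  def setupTask(partition: Int): Unit         // executor: per-task staging
  def newTaskTempFile(partition: Int): String // executor: path the task writes to
  def commitTask(partition: Int): String      // executor: produce a commit message
  def commitJob(commits: Seq[String]): Unit   // driver: atomically publish output
  def abortJob(): Unit                        // driver: clean up after a failure
}

object CommitProtocolSketch {
  // Driver-side skeleton of a commit-protocol-based save, mirroring the
  // overall shape of the rewritten save path (names are hypothetical, and
  // the per-partition record writing is elided).
  def runSaveJob(committer: CommitProtocol, partitions: Seq[Int]): Unit = {
    committer.setupJob()
    try {
      val commits = partitions.map { pid =>
        committer.setupTask(pid)
        val tmp = committer.newTaskTempFile(pid)
        // ... write this partition's records to `tmp` via the OutputFormat ...
        committer.commitTask(pid)
      }
      committer.commitJob(commits)
    } catch {
      case e: Throwable =>
        committer.abortJob()
        throw e
    }
  }
}
```

The key property this structure provides is that task output becomes visible only at `commitJob`, so a failed or speculative task attempt never leaves partial files in the final output location.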
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index e404dcd545..fa7fe143da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark._
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -153,7 +153,7 @@ object FileFormatWriter extends Logging {
       committer: FileCommitProtocol,
       iterator: Iterator[InternalRow]): (TaskCommitMessage, Set[String]) = {
-    val jobId = SparkHadoopWriter.createJobID(new Date, sparkStageId)
+    val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId)
     val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
     val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
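The ID plumbing in the hunk above follows Hadoop's `mapreduce` ID hierarchy: a `TaskAttemptID` nests inside a `TaskID`, which nests inside a `JobID`. The sketch below is a hedged re-creation of that wiring; `createJobID` here only approximates `SparkHadoopWriterUtils.createJobID`, and the timestamp format plus the concrete stage/partition/attempt numbers are assumptions for illustration.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}

object JobIdSketch {
  // Approximation of SparkHadoopWriterUtils.createJobID: a JobID combines a
  // "job tracker" identifier (here a timestamp, which is an assumption about
  // the format) with an integer id; the hunk above passes the Spark stage id.
  def createJobID(time: Date, id: Int): JobID = {
    val jobTrackerID = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
    new JobID(jobTrackerID, id)
  }

  def main(args: Array[String]): Unit = {
    // Illustrative values for stage id, partition id, and attempt number.
    val jobId = createJobID(new Date, 0)              // sparkStageId
    val taskId = new TaskID(jobId, TaskType.MAP, 3)   // sparkPartitionId
    val taskAttemptId = new TaskAttemptID(taskId, 0)  // sparkAttemptNumber
    println(s"$jobId / $taskId / $taskAttemptId")
  }
}
```

Deriving the `JobID` from the Spark stage id keeps the Hadoop-side identifiers stable across task retries within a stage, which is what lets the committer pair up attempts belonging to the same logical task.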