diff options
author | jiangxingbo <jiangxb1987@gmail.com> | 2016-11-08 09:41:01 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-11-08 09:41:01 -0800 |
commit | 9c419698fe110a805570031cac3387a51957d9d1 (patch) | |
tree | 847284e6313c49aedd0a864d3931cacaf92ea425 /core/src/test | |
parent | 73feaa30ebfb62c81c7ce2c60ce2163611dd8852 (diff) | |
download | spark-9c419698fe110a805570031cac3387a51957d9d1.tar.gz spark-9c419698fe110a805570031cac3387a51957d9d1.tar.bz2 spark-9c419698fe110a805570031cac3387a51957d9d1.zip |
[SPARK-18191][CORE] Port RDD API to use commit protocol
## What changes were proposed in this pull request?
This PR port RDD API to use commit protocol, the changes made here:
1. Add new internal helper class that saves an RDD using a Hadoop OutputFormat named `SparkNewHadoopWriter`, it's similar with `SparkHadoopWriter` but uses commit protocol. This class supports the newer `mapreduce` API, instead of the old `mapred` API which is supported by `SparkHadoopWriter`;
2. Rewrite `PairRDDFunctions.saveAsNewAPIHadoopDataset` function, so it uses commit protocol now.
## How was this patch tested?
Exsiting test cases.
Author: jiangxingbo <jiangxb1987@gmail.com>
Closes #15769 from jiangxb1987/rdd-commit.
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala | 20 |
1 files changed, 2 insertions, 18 deletions
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index b0d69de6e2..fe547d4d91 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -509,21 +509,6 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { (2, ArrayBuffer(1)))) } - test("saveNewAPIHadoopFile should call setConf if format is configurable") { - val pairs = sc.parallelize(Array((new Integer(1), new Integer(1)))) - - // No error, non-configurable formats still work - pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored") - - /* - Check that configurable formats get configured: - ConfigTestFormat throws an exception if we try to write - to it when setConf hasn't been called first. - Assertion is in ConfigTestFormat.getRecordWriter. - */ - pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored") - } - test("saveAsHadoopFile should respect configured output committers") { val pairs = sc.parallelize(Array((new Integer(1), new Integer(1)))) val conf = new JobConf() @@ -544,7 +529,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val e = intercept[SparkException] { pairs.saveAsNewAPIHadoopFile[NewFakeFormatWithCallback]("ignored") } - assert(e.getMessage contains "failed to write") + assert(e.getCause.getMessage contains "failed to write") assert(FakeWriterWithCallback.calledBy === "write,callback,close") assert(FakeWriterWithCallback.exception != null, "exception should be captured") @@ -725,8 +710,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { } /* - These classes are fakes for testing - "saveNewAPIHadoopFile should call setConf if format is configurable". + These classes are fakes for testing saveAsHadoopFile/saveNewAPIHadoopFile. Unfortunately, they have to be top level classes, and not defined in the test method, because otherwise Scala won't generate no-args constructors and the test will therefore throw InstantiationException when saveAsNewAPIHadoopFile |