author     jiangxingbo <jiangxb1987@gmail.com>  2016-11-08 09:41:01 -0800
committer  Reynold Xin <rxin@databricks.com>    2016-11-08 09:41:01 -0800
commit     9c419698fe110a805570031cac3387a51957d9d1 (patch)
tree       847284e6313c49aedd0a864d3931cacaf92ea425 /core/src/test
parent     73feaa30ebfb62c81c7ce2c60ce2163611dd8852 (diff)
[SPARK-18191][CORE] Port RDD API to use commit protocol
## What changes were proposed in this pull request?

This PR ports the RDD API to use the commit protocol. The changes made here:

1. Add a new internal helper class, `SparkNewHadoopWriter`, that saves an RDD using a Hadoop `OutputFormat`. It is similar to `SparkHadoopWriter` but uses the commit protocol, and it supports the newer `mapreduce` API rather than the old `mapred` API supported by `SparkHadoopWriter`.
2. Rewrite the `PairRDDFunctions.saveAsNewAPIHadoopDataset` function so that it now uses the commit protocol.

## How was this patch tested?

Existing test cases.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #15769 from jiangxb1987/rdd-commit.
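For context, here is a minimal sketch of the job/task commit flow such a writer follows. The `CommitProtocol` trait, `TaskCommit` class, and `writePartition`/`runJob` helpers below are hypothetical stand-ins for illustration, not Spark's actual `FileCommitProtocol` API:

```scala
// Hypothetical stand-ins for illustration; not Spark's FileCommitProtocol.
case class TaskCommit(info: String)

trait CommitProtocol {
  def setupJob(): Unit                           // driver, before any task runs
  def setupTask(): Unit                          // executor, before writing
  def commitTask(): TaskCommit                   // executor, after a successful write
  def abortTask(): Unit                          // executor, on failure
  def commitJob(commits: Seq[TaskCommit]): Unit  // driver, after all tasks succeed
  def abortJob(): Unit                           // driver, on job failure
}

// Each task brackets its writes with setupTask/commitTask and aborts on error,
// so a failed task leaves no partial output behind.
def writePartition(c: CommitProtocol, records: Iterator[(Int, Int)]): TaskCommit = {
  c.setupTask()
  try {
    records.foreach { case (k, v) => () /* write (k, v) via the OutputFormat */ }
    c.commitTask()
  } catch {
    case t: Throwable => c.abortTask(); throw t
  }
}

// The driver commits the job only once every task's commit has been collected.
def runJob(c: CommitProtocol, partitions: Seq[Iterator[(Int, Int)]]): Unit = {
  c.setupJob()
  try {
    c.commitJob(partitions.map(writePartition(c, _)))
  } catch {
    case t: Throwable => c.abortJob(); throw t
  }
}
```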
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala  20
1 file changed, 2 insertions(+), 18 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index b0d69de6e2..fe547d4d91 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -509,21 +509,6 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
(2, ArrayBuffer(1))))
}
- test("saveNewAPIHadoopFile should call setConf if format is configurable") {
- val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
-
- // No error, non-configurable formats still work
- pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored")
-
- /*
- Check that configurable formats get configured:
- ConfigTestFormat throws an exception if we try to write
- to it when setConf hasn't been called first.
- Assertion is in ConfigTestFormat.getRecordWriter.
- */
- pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
- }
-
test("saveAsHadoopFile should respect configured output committers") {
val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
val conf = new JobConf()
@@ -544,7 +529,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
val e = intercept[SparkException] {
pairs.saveAsNewAPIHadoopFile[NewFakeFormatWithCallback]("ignored")
}
- assert(e.getMessage contains "failed to write")
+ assert(e.getCause.getMessage contains "failed to write")
assert(FakeWriterWithCallback.calledBy === "write,callback,close")
assert(FakeWriterWithCallback.exception != null, "exception should be captured")
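The assertion change above reflects that the write failure now surfaces wrapped in a `SparkException`, so the test inspects the cause. A minimal sketch of that wrapping, assuming a write path shaped roughly like the following (the helper, its parameters, and the wrapper message are assumptions, not the exact Spark internals):

```scala
import org.apache.spark.SparkException

// Assumed sketch: the write path catches the task-side failure, aborts the
// task, and rethrows inside a SparkException, so the original "failed to
// write" message is only reachable via e.getCause.
def writeOrWrap(writeRecord: () => Unit, abortTask: () => Unit): Unit = {
  try {
    writeRecord() // e.g. FakeWriterWithCallback throws "failed to write"
  } catch {
    case cause: Throwable =>
      abortTask() // clean up partial output before failing the task
      throw new SparkException("Task failed while writing rows", cause)
  }
}
```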
@@ -725,8 +710,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
}
/*
- These classes are fakes for testing
- "saveNewAPIHadoopFile should call setConf if format is configurable".
+ These classes are fakes for testing saveAsHadoopFile/saveNewAPIHadoopFile.
Unfortunately, they have to be top level classes, and not defined in
the test method, because otherwise Scala won't generate no-args constructors
and the test will therefore throw InstantiationException when saveAsNewAPIHadoopFile