Diffstat (limited to 'core/src/test/scala/org/apache/spark/FileSuite.scala')
-rw-r--r--  core/src/test/scala/org/apache/spark/FileSuite.scala  39
1 file changed, 33 insertions(+), 6 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 76173608e9..01af940771 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -24,11 +24,12 @@ import scala.io.Source
import com.google.common.io.Files
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress.DefaultCodec
-import org.apache.hadoop.mapred.FileAlreadyExistsException
+import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, TextOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.apache.hadoop.mapreduce.Job
import org.scalatest.FunSuite
import org.apache.spark.SparkContext._
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
class FileSuite extends FunSuite with LocalSparkContext {
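Both Hadoop APIs define a class with the simple name TextOutputFormat, so the import change above aliases the new-API class as NewTextOutputFormat to keep both usable in one file. A minimal sketch of the same rename-import pattern (the Hadoop class names are real; the enclosing object is illustrative):

import org.apache.hadoop.mapred.{TextOutputFormat => OldTextOutputFormat}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

object OutputFormats {
  // The aliases make it unambiguous which Hadoop API a call site targets.
  val oldApi = classOf[OldTextOutputFormat[String, String]]   // org.apache.hadoop.mapred
  val newApi = classOf[NewTextOutputFormat[String, String]]   // org.apache.hadoop.mapreduce
}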
@@ -236,7 +237,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
intercept[FileAlreadyExistsException] {
- randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
+ randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
}
}
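The test above uses the type-parameter form of saveAsNewAPIHadoopFile; the same write can also be spelled with the explicit overload that names the key, value and output-format classes. A hedged sketch, reusing the sc and tempdir of the surrounding test and an illustrative subdirectory:

// Sketch only, not part of the diff: the explicit overload of saveAsNewAPIHadoopFile.
// The RDD contents and the output path are assumptions for illustration.
val pairs = sc.parallelize(Array(("key1", "a"), ("key2", "b")), 1)
pairs.saveAsNewAPIHadoopFile(
  tempdir.getPath + "/explicit-overload-output",
  classOf[String],
  classOf[String],
  classOf[NewTextOutputFormat[String, String]])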
@@ -244,10 +245,36 @@ class FileSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local", "test")
val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
- randomRDD.saveAsTextFile(tempdir.getPath + "/output")
- assert(new File(tempdir.getPath + "/output/part-00000").exists() === true)
+ randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath + "/output")
+ assert(new File(tempdir.getPath + "/output/part-r-00000").exists() === true)
intercept[FileAlreadyExistsException] {
- randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
+ randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
}
}
+
+ test ("save Hadoop Dataset through old Hadoop API") {
+ sc = new SparkContext("local", "test")
+ val tempdir = Files.createTempDir()
+ val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+ val job = new JobConf()
+ job.setOutputKeyClass(classOf[String])
+ job.setOutputValueClass(classOf[String])
+ job.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
+ job.set("mapred.output.dir", tempdir.getPath + "/outputDataset_old")
+ randomRDD.saveAsHadoopDataset(job)
+ assert(new File(tempdir.getPath + "/outputDataset_old/part-00000").exists() === true)
+ }
+
+ test ("save Hadoop Dataset through new Hadoop API") {
+ sc = new SparkContext("local", "test")
+ val tempdir = Files.createTempDir()
+ val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+ val job = new Job(sc.hadoopConfiguration)
+ job.setOutputKeyClass(classOf[String])
+ job.setOutputValueClass(classOf[String])
+ job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
+ job.getConfiguration.set("mapred.output.dir", tempdir.getPath + "/outputDataset_new")
+ randomRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
+ assert(new File(tempdir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
+ }
}
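The two new tests cover the same save path through both configuration styles: the old API goes through saveAsHadoopDataset with a mapred JobConf, while the new API goes through saveAsNewAPIHadoopDataset with the Configuration held by a mapreduce Job. They also pin down the committer naming difference the assertions rely on: the old API writes part-00000, the new API part-r-00000. A condensed, hedged sketch of the contrast outside the suite (the SparkContext setup and output directories are illustrative):

// Illustrative sketch, not part of the diff: one dataset written through both APIs.
val sc = new SparkContext("local", "sketch")
val pairs = sc.parallelize(Seq(("k1", "v1"), ("k2", "v2")), 1)

// Old API: all settings live in a mapred JobConf; output appears as part-00000.
val oldConf = new JobConf()
oldConf.setOutputKeyClass(classOf[String])
oldConf.setOutputValueClass(classOf[String])
oldConf.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
oldConf.set("mapred.output.dir", "/tmp/sketch-old")
pairs.saveAsHadoopDataset(oldConf)

// New API: settings live in a mapreduce Job and are passed as its Configuration;
// output appears as part-r-00000.
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
job.getConfiguration.set("mapred.output.dir", "/tmp/sketch-new")
pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)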