diff options
Diffstat (limited to 'core/src/test/scala/org/apache/spark/FileSuite.scala')
-rw-r--r-- | core/src/test/scala/org/apache/spark/FileSuite.scala | 39 |
1 files changed, 33 insertions, 6 deletions
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 76173608e9..01af940771 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -24,11 +24,12 @@ import scala.io.Source import com.google.common.io.Files import org.apache.hadoop.io._ import org.apache.hadoop.io.compress.DefaultCodec -import org.apache.hadoop.mapred.FileAlreadyExistsException +import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, TextOutputFormat} +import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat} +import org.apache.hadoop.mapreduce.Job import org.scalatest.FunSuite import org.apache.spark.SparkContext._ -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat class FileSuite extends FunSuite with LocalSparkContext { @@ -236,7 +237,7 @@ class FileSuite extends FunSuite with LocalSparkContext { val tempdir = Files.createTempDir() val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1) intercept[FileAlreadyExistsException] { - randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath) + randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath) } } @@ -244,10 +245,36 @@ class FileSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local", "test") val tempdir = Files.createTempDir() val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1) - randomRDD.saveAsTextFile(tempdir.getPath + "/output") - assert(new File(tempdir.getPath + "/output/part-00000").exists() === true) + randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath + "/output") + assert(new File(tempdir.getPath + "/output/part-r-00000").exists() === true) intercept[FileAlreadyExistsException] { - randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath) + randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath) } } + + test ("save Hadoop Dataset through old Hadoop API") { + sc = new SparkContext("local", "test") + val tempdir = Files.createTempDir() + val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1) + val job = new JobConf() + job.setOutputKeyClass(classOf[String]) + job.setOutputValueClass(classOf[String]) + job.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName) + job.set("mapred.output.dir", tempdir.getPath + "/outputDataset_old") + randomRDD.saveAsHadoopDataset(job) + assert(new File(tempdir.getPath + "/outputDataset_old/part-00000").exists() === true) + } + + test ("save Hadoop Dataset through new Hadoop API") { + sc = new SparkContext("local", "test") + val tempdir = Files.createTempDir() + val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1) + val job = new Job(sc.hadoopConfiguration) + job.setOutputKeyClass(classOf[String]) + job.setOutputValueClass(classOf[String]) + job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]]) + job.getConfiguration.set("mapred.output.dir", tempdir.getPath + "/outputDataset_new") + randomRDD.saveAsNewAPIHadoopDataset(job.getConfiguration) + assert(new File(tempdir.getPath + "/outputDataset_new/part-r-00000").exists() === true) + } } |