author     Patrick Wendell <pwendell@gmail.com>    2013-06-08 16:58:42 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2013-06-09 11:39:35 -0700
commit     d1bbcebae580220076ceaa65f84dcf984ab51a16
tree       0a612f1ff6287bccf6e24d293ee8c502266e8b32 /core/src/test/scala
parent     74b91d53bc89b014cf10e457a5b5a3b965d28eb8
Adding compression to Hadoop save functions
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--  core/src/test/scala/spark/FileSuite.scala | 48
1 file changed, 48 insertions(+), 0 deletions(-)
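As background for the tests below: this commit adds codec parameters to the Hadoop save functions. A minimal usage sketch of the two call forms the tests exercise (the local master, output paths, and the GzipCodec choice are illustrative assumptions, not taken from the commit):

    import spark.SparkContext
    import spark.SparkContext._
    import org.apache.hadoop.io.compress.GzipCodec

    val sc = new SparkContext("local", "example")
    val data = sc.parallelize(Seq("a", "b", "c"), 1)

    // saveAsTextFile gains an optional codec class argument.
    data.saveAsTextFile("/tmp/out_text_gz", classOf[GzipCodec])

    // saveAsSequenceFile takes the codec wrapped in an Option.
    data.map(x => (x, x)).saveAsSequenceFile("/tmp/out_seq_gz", Some(classOf[GzipCodec]))

Both forms match the calls exercised in FileSuite below.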
diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala
index 91b48c7456..a5d2028591 100644
--- a/core/src/test/scala/spark/FileSuite.scala
+++ b/core/src/test/scala/spark/FileSuite.scala
@@ -7,6 +7,8 @@ import scala.io.Source
import com.google.common.io.Files
import org.scalatest.FunSuite
import org.apache.hadoop.io._
+import org.apache.hadoop.io.compress.{DefaultCodec, CompressionCodec, GzipCodec}
+
import SparkContext._
@@ -26,6 +28,29 @@ class FileSuite extends FunSuite with LocalSparkContext {
assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
}
+ test("text files (compressed)") {
+ sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val normalDir = new File(tempDir, "output_normal").getAbsolutePath
+ val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
+ val codec = new DefaultCodec()
+
+ val data = sc.parallelize("a" * 10000, 1)
+ data.saveAsTextFile(normalDir)
+ data.saveAsTextFile(compressedOutputDir, classOf[DefaultCodec])
+
+ val normalFile = new File(normalDir, "part-00000")
+ val normalContent = sc.textFile(normalDir).collect
+ assert(normalContent === Array.fill(10000)("a"))
+
+ val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
+ val compressedContent = sc.textFile(compressedOutputDir).collect
+ assert(compressedContent === Array.fill(10000)("a"))
+
+ assert(compressedFile.length < normalFile.length)
+ }
+
+
test("SequenceFiles") {
sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
@@ -37,6 +62,29 @@ class FileSuite extends FunSuite with LocalSparkContext {
assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
}
+ test("SequenceFile (compressed)") {
+ sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val normalDir = new File(tempDir, "output_normal").getAbsolutePath
+ val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
+ val codec = new DefaultCodec()
+
+ val data = sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x))
+ data.saveAsSequenceFile(normalDir)
+ data.saveAsSequenceFile(compressedOutputDir, Some(classOf[DefaultCodec]))
+
+ val normalFile = new File(normalDir, "part-00000")
+ val normalContent = sc.sequenceFile[String, String](normalDir).collect
+ assert(normalContent === Array.fill(100)("abc", "abc"))
+
+ val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
+ val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect
+ assert(compressedContent === Array.fill(100)("abc", "abc"))
+
+ assert(compressedFile.length < normalFile.length)
+ }
+
+
test("SequenceFile with writable key") {
sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
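One detail the read-path assertions above rely on: no codec is passed when reading back. Hadoop's TextInputFormat detects the codec from the part file's extension, and a SequenceFile records its codec in the file header, so the existing sc.textFile and sc.sequenceFile calls decompress transparently. A round-trip sketch continuing the illustrative example above:

    // No codec argument on the read side; decompression is automatic.
    val roundTrip = sc.textFile("/tmp/out_text_gz").collect()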