diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-06-09 18:09:46 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-06-09 18:09:46 -0700 |
commit | ef14dc2e7736732932d4edceb3be8d81ba9f8bc7 (patch) | |
tree | c9464c24319531adee59850b3b0bf9a0514bbe80 /core/src/test/scala | |
parent | df592192e736edca9e382a7f92e15bead390ef65 (diff) | |
download | spark-ef14dc2e7736732932d4edceb3be8d81ba9f8bc7.tar.gz spark-ef14dc2e7736732932d4edceb3be8d81ba9f8bc7.tar.bz2 spark-ef14dc2e7736732932d4edceb3be8d81ba9f8bc7.zip |
Adding Java-API version of compression codec
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- | core/src/test/scala/spark/JavaAPISuite.java | 46 |
1 file changed, 46 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java index 93bb69b41c..6caa85119a 100644 --- a/core/src/test/scala/spark/JavaAPISuite.java +++ b/core/src/test/scala/spark/JavaAPISuite.java @@ -8,6 +8,7 @@ import java.util.*; import scala.Tuple2; import com.google.common.base.Charsets; +import org.apache.hadoop.io.compress.DefaultCodec; import com.google.common.io.Files; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; @@ -474,6 +475,19 @@ public class JavaAPISuite implements Serializable { } @Test + public void textFilesCompressed() throws IOException { + File tempDir = Files.createTempDir(); + String outputDir = new File(tempDir, "output").getAbsolutePath(); + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4)); + rdd.saveAsTextFile(outputDir, DefaultCodec.class); + + // Try reading it in as a text file RDD + List<String> expected = Arrays.asList("1", "2", "3", "4"); + JavaRDD<String> readRDD = sc.textFile(outputDir); + Assert.assertEquals(expected, readRDD.collect()); + } + + @Test public void sequenceFile() { File tempDir = Files.createTempDir(); String outputDir = new File(tempDir, "output").getAbsolutePath(); @@ -620,6 +634,38 @@ public class JavaAPISuite implements Serializable { } @Test + public void hadoopFileCompressed() { + File tempDir = Files.createTempDir(); + String outputDir = new File(tempDir, "output_compressed").getAbsolutePath(); + List<Tuple2<Integer, String>> pairs = Arrays.asList( + new Tuple2<Integer, String>(1, "a"), + new Tuple2<Integer, String>(2, "aa"), + new Tuple2<Integer, String>(3, "aaa") + ); + JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); + + rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() { + @Override + public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) { + return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2())); + } + }).saveAsHadoopFile(outputDir, 
IntWritable.class, Text.class, SequenceFileOutputFormat.class, + DefaultCodec.class); + + System.out.println(outputDir); + JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir, + SequenceFileInputFormat.class, IntWritable.class, Text.class); + + Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, + String>() { + @Override + public String call(Tuple2<IntWritable, Text> x) { + return x.toString(); + } + }).collect().toString()); + } + + @Test public void zip() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() { |