author    Patrick Wendell <pwendell@gmail.com>  2013-06-09 18:09:46 -0700
committer Patrick Wendell <pwendell@gmail.com>  2013-06-09 18:09:46 -0700
commit  ef14dc2e7736732932d4edceb3be8d81ba9f8bc7 (patch)
tree    c9464c24319531adee59850b3b0bf9a0514bbe80 /core/src/test/scala
parent  df592192e736edca9e382a7f92e15bead390ef65 (diff)
Adding Java-API version of compression codec
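
The tests below exercise codec-accepting overloads of saveAsTextFile and saveAsHadoopFile from the Java API. As a rough standalone usage sketch (hedged: the spark.api.java package follows the pre-Apache package layout of this era, and the master URL, app name, and output path are placeholders, not part of this commit):

import java.util.Arrays;

import org.apache.hadoop.io.compress.DefaultCodec;

import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;

public class CompressedSaveExample {
  public static void main(String[] args) {
    // Master URL and app name are placeholders for illustration.
    JavaSparkContext sc = new JavaSparkContext("local", "compressed-save-example");
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));

    // The overload under test: pass a Hadoop codec class so the
    // output part files are written compressed.
    rdd.saveAsTextFile("/tmp/compressed-output", DefaultCodec.class);

    // textFile() reads the compressed part files transparently.
    System.out.println(sc.textFile("/tmp/compressed-output").collect());
    sc.stop();
  }
}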
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--  core/src/test/scala/spark/JavaAPISuite.java | 46
1 file changed, 46 insertions(+), 0 deletions(-)
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 93bb69b41c..6caa85119a 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -8,6 +8,7 @@ import java.util.*;
import scala.Tuple2;
import com.google.common.base.Charsets;
+import org.apache.hadoop.io.compress.DefaultCodec;
import com.google.common.io.Files;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
@@ -474,6 +475,19 @@ public class JavaAPISuite implements Serializable {
}
@Test
+  public void textFilesCompressed() throws IOException {
+    File tempDir = Files.createTempDir();
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    rdd.saveAsTextFile(outputDir, DefaultCodec.class);
+
+    // Try reading it in as a text file RDD
+    List<String> expected = Arrays.asList("1", "2", "3", "4");
+    JavaRDD<String> readRDD = sc.textFile(outputDir);
+    Assert.assertEquals(expected, readRDD.collect());
+  }
+
+  @Test
public void sequenceFile() {
File tempDir = Files.createTempDir();
String outputDir = new File(tempDir, "output").getAbsolutePath();
@@ -620,6 +634,38 @@ public class JavaAPISuite implements Serializable {
}
@Test
+  public void hadoopFileCompressed() {
+    File tempDir = Files.createTempDir();
+    String outputDir = new File(tempDir, "output_compressed").getAbsolutePath();
+    List<Tuple2<Integer, String>> pairs = Arrays.asList(
+      new Tuple2<Integer, String>(1, "a"),
+      new Tuple2<Integer, String>(2, "aa"),
+      new Tuple2<Integer, String>(3, "aaa")
+    );
+    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
+
+    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+      @Override
+      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
+        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
+      }
+    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class,
+      DefaultCodec.class);
+
+    // Read the compressed output back in as a Hadoop sequence file RDD.
+    JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
+      SequenceFileInputFormat.class, IntWritable.class, Text.class);
+
+    Assert.assertEquals(pairs.toString(), output.map(
+      new Function<Tuple2<IntWritable, Text>, String>() {
+        @Override
+        public String call(Tuple2<IntWritable, Text> x) {
+          return x.toString();
+        }
+      }).collect().toString());
+  }
+
+  @Test
public void zip() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {