aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2015-01-20 23:37:47 -0800
committerPatrick Wendell <patrick@databricks.com>2015-01-20 23:37:47 -0800
commit424d8c6ffff42e4231cc1088b7e69e3c0f5e6b56 (patch)
tree75713ab6e19a7e2eab63a332f8c4bdb5e83a74d2 /streaming/src/test/java
parentec5b0f2cef4b30047c7f88bdc00d10b6aa308124 (diff)
downloadspark-424d8c6ffff42e4231cc1088b7e69e3c0f5e6b56.tar.gz
spark-424d8c6ffff42e4231cc1088b7e69e3c0f5e6b56.tar.bz2
spark-424d8c6ffff42e4231cc1088b7e69e3c0f5e6b56.zip
[SPARK-5297][Streaming] Fix Java file stream type erasure problem
Current Java file stream doesn't support custom key/value type because of loss of type information, details can be seen in [SPARK-5297](https://issues.apache.org/jira/browse/SPARK-5297). Fix this problem by getting correct `ClassTag` from `Class[_]`. Author: jerryshao <saisai.shao@intel.com> Closes #4101 from jerryshao/SPARK-5297 and squashes the following commits: e022ca3 [jerryshao] Add Mima exclusion ecd61b8 [jerryshao] Fix Java fileInputStream type erasure problem
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java70
1 files changed, 65 insertions, 5 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 12cc0de750..d92e7fe899 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -17,13 +17,20 @@
package org.apache.spark.streaming;
+import java.io.*;
+import java.lang.Iterable;
+import java.nio.charset.Charset;
+import java.util.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import scala.Tuple2;
import org.junit.Assert;
+import static org.junit.Assert.*;
import org.junit.Test;
-import java.io.*;
-import java.util.*;
-import java.lang.Iterable;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
@@ -1743,13 +1750,66 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
StorageLevel.MEMORY_ONLY());
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testTextFileStream() throws IOException {
+ File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"));
+ List<List<String>> expected = fileTestPrepare(testDir);
+
+ JavaDStream<String> input = ssc.textFileStream(testDir.toString());
+ JavaTestUtils.attachTestOutputStream(input);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @SuppressWarnings("unchecked")
@Test
- public void testTextFileStream() {
- JavaDStream<String> test = ssc.textFileStream("/tmp/foo");
+ public void testFileStream() throws IOException {
+ File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"));
+ List<List<String>> expected = fileTestPrepare(testDir);
+
+ JavaPairInputDStream<LongWritable, Text> inputStream = ssc.fileStream(
+ testDir.toString(),
+ LongWritable.class,
+ Text.class,
+ TextInputFormat.class,
+ new Function<Path, Boolean>() {
+ @Override
+ public Boolean call(Path v1) throws Exception {
+ return Boolean.TRUE;
+ }
+ },
+ true);
+
+ JavaDStream<String> test = inputStream.map(
+ new Function<Tuple2<LongWritable, Text>, String>() {
+ @Override
+ public String call(Tuple2<LongWritable, Text> v1) throws Exception {
+ return v1._2().toString();
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(test);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
+
+ assertOrderInvariantEquals(expected, result);
}
@Test
public void testRawSocketStream() {
JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345);
}
+
+ private List<List<String>> fileTestPrepare(File testDir) throws IOException {
+ File existingFile = new File(testDir, "0");
+ Files.write("0\n", existingFile, Charset.forName("UTF-8"));
+ assertTrue(existingFile.setLastModified(1000) && existingFile.lastModified() == 1000);
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("0")
+ );
+
+ return expected;
+ }
}