diff options
author | jerryshao <saisai.shao@intel.com> | 2015-01-20 23:37:47 -0800 |
---|---|---|
committer | Patrick Wendell <patrick@databricks.com> | 2015-01-20 23:37:47 -0800 |
commit | 424d8c6ffff42e4231cc1088b7e69e3c0f5e6b56 (patch) | |
tree | 75713ab6e19a7e2eab63a332f8c4bdb5e83a74d2 /streaming/src/test | |
parent | ec5b0f2cef4b30047c7f88bdc00d10b6aa308124 (diff) | |
download | spark-424d8c6ffff42e4231cc1088b7e69e3c0f5e6b56.tar.gz spark-424d8c6ffff42e4231cc1088b7e69e3c0f5e6b56.tar.bz2 spark-424d8c6ffff42e4231cc1088b7e69e3c0f5e6b56.zip |
[SPARK-5297][Streaming] Fix Java file stream type erasure problem
Current Java file stream doesn't support custom key/value type because of loss of type information, details can be seen in [SPARK-5297](https://issues.apache.org/jira/browse/SPARK-5297). Fix this problem by getting correct `ClassTag` from `Class[_]`.
Author: jerryshao <saisai.shao@intel.com>
Closes #4101 from jerryshao/SPARK-5297 and squashes the following commits:
e022ca3 [jerryshao] Add Mima exclusion
ecd61b8 [jerryshao] Fix Java fileInputStream type erasure problem
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 70 |
1 files changed, 65 insertions, 5 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 12cc0de750..d92e7fe899 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -17,13 +17,20 @@ package org.apache.spark.streaming; +import java.io.*; +import java.lang.Iterable; +import java.nio.charset.Charset; +import java.util.*; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import scala.Tuple2; import org.junit.Assert; +import static org.junit.Assert.*; import org.junit.Test; -import java.io.*; -import java.util.*; -import java.lang.Iterable; import com.google.common.base.Optional; import com.google.common.collect.Lists; @@ -1743,13 +1750,66 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa StorageLevel.MEMORY_ONLY()); } + @SuppressWarnings("unchecked") + @Test + public void testTextFileStream() throws IOException { + File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir")); + List<List<String>> expected = fileTestPrepare(testDir); + + JavaDStream<String> input = ssc.textFileStream(testDir.toString()); + JavaTestUtils.attachTestOutputStream(input); + List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1); + + assertOrderInvariantEquals(expected, result); + } + + @SuppressWarnings("unchecked") @Test - public void testTextFileStream() { - JavaDStream<String> test = ssc.textFileStream("/tmp/foo"); + public void testFileStream() throws IOException { + File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir")); + List<List<String>> expected = fileTestPrepare(testDir); + + JavaPairInputDStream<LongWritable, Text> inputStream = ssc.fileStream( + testDir.toString(), + LongWritable.class, + Text.class, + TextInputFormat.class, + new Function<Path, Boolean>() { + @Override + public Boolean call(Path v1) throws Exception { + return Boolean.TRUE; + } + }, + true); + + JavaDStream<String> test = inputStream.map( + new Function<Tuple2<LongWritable, Text>, String>() { + @Override + public String call(Tuple2<LongWritable, Text> v1) throws Exception { + return v1._2().toString(); + } + }); + + JavaTestUtils.attachTestOutputStream(test); + List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1); + + assertOrderInvariantEquals(expected, result); } @Test public void testRawSocketStream() { JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345); } + + private List<List<String>> fileTestPrepare(File testDir) throws IOException { + File existingFile = new File(testDir, "0"); + Files.write("0\n", existingFile, Charset.forName("UTF-8")); + assertTrue(existingFile.setLastModified(1000) && existingFile.lastModified() == 1000); + + List<List<String>> expected = Arrays.asList( + Arrays.asList("0") + ); + + return expected; + } } |