From 424d8c6ffff42e4231cc1088b7e69e3c0f5e6b56 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 20 Jan 2015 23:37:47 -0800 Subject: [SPARK-5297][Streaming] Fix Java file stream type erasure problem The current Java file stream doesn't support custom key/value types because type information is lost; details can be seen in [SPARK-5297](https://issues.apache.org/jira/browse/SPARK-5297). Fix this problem by getting the correct `ClassTag` from `Class[_]`. Author: jerryshao Closes #4101 from jerryshao/SPARK-5297 and squashes the following commits: e022ca3 [jerryshao] Add Mima exclusion ecd61b8 [jerryshao] Fix Java fileInputStream type erasure problem --- .../org/apache/spark/streaming/JavaAPISuite.java | 70 ++++++++++++++++++++-- 1 file changed, 65 insertions(+), 5 deletions(-) (limited to 'streaming/src/test/java') diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 12cc0de750..d92e7fe899 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -17,13 +17,20 @@ package org.apache.spark.streaming; +import java.io.*; +import java.lang.Iterable; +import java.nio.charset.Charset; +import java.util.*; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import scala.Tuple2; import org.junit.Assert; +import static org.junit.Assert.*; import org.junit.Test; -import java.io.*; -import java.util.*; -import java.lang.Iterable; import com.google.common.base.Optional; import com.google.common.collect.Lists; @@ -1743,13 +1750,66 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa StorageLevel.MEMORY_ONLY()); } + @SuppressWarnings("unchecked") + @Test + public void testTextFileStream() throws IOException { + File testDir = 
Utils.createTempDir(System.getProperty("java.io.tmpdir")); + List<List<String>> expected = fileTestPrepare(testDir); + + JavaDStream<String> input = ssc.textFileStream(testDir.toString()); + JavaTestUtils.attachTestOutputStream(input); + List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1); + + assertOrderInvariantEquals(expected, result); + } + + @SuppressWarnings("unchecked") @Test - public void testTextFileStream() { - JavaDStream<String> test = ssc.textFileStream("/tmp/foo"); + public void testFileStream() throws IOException { + File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir")); + List<List<String>> expected = fileTestPrepare(testDir); + + JavaPairInputDStream<LongWritable, Text> inputStream = ssc.fileStream( + testDir.toString(), + LongWritable.class, + Text.class, + TextInputFormat.class, + new Function<Path, Boolean>() { + @Override + public Boolean call(Path v1) throws Exception { + return Boolean.TRUE; + } + }, + true); + + JavaDStream<String> test = inputStream.map( + new Function<Tuple2<LongWritable, Text>, String>() { + @Override + public String call(Tuple2<LongWritable, Text> v1) throws Exception { + return v1._2().toString(); + } + }); + + JavaTestUtils.attachTestOutputStream(test); + List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1); + + assertOrderInvariantEquals(expected, result); } @Test public void testRawSocketStream() { JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345); } + + private List<List<String>> fileTestPrepare(File testDir) throws IOException { + File existingFile = new File(testDir, "0"); + Files.write("0\n", existingFile, Charset.forName("UTF-8")); + assertTrue(existingFile.setLastModified(1000) && existingFile.lastModified() == 1000); + + List<List<String>> expected = Arrays.asList( + Arrays.asList("0") + ); + + return expected; + } } -- cgit v1.2.3