diff options
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 70 |
1 files changed, 65 insertions, 5 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 12cc0de750..d92e7fe899 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -17,13 +17,20 @@ package org.apache.spark.streaming; +import java.io.*; +import java.lang.Iterable; +import java.nio.charset.Charset; +import java.util.*; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import scala.Tuple2; import org.junit.Assert; +import static org.junit.Assert.*; import org.junit.Test; -import java.io.*; -import java.util.*; -import java.lang.Iterable; import com.google.common.base.Optional; import com.google.common.collect.Lists; @@ -1743,13 +1750,66 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa StorageLevel.MEMORY_ONLY()); } + @SuppressWarnings("unchecked") + @Test + public void testTextFileStream() throws IOException { + File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir")); + List<List<String>> expected = fileTestPrepare(testDir); + + JavaDStream<String> input = ssc.textFileStream(testDir.toString()); + JavaTestUtils.attachTestOutputStream(input); + List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1); + + assertOrderInvariantEquals(expected, result); + } + + @SuppressWarnings("unchecked") @Test - public void testTextFileStream() { - JavaDStream<String> test = ssc.textFileStream("/tmp/foo"); + public void testFileStream() throws IOException { + File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir")); + List<List<String>> expected = fileTestPrepare(testDir); + + JavaPairInputDStream<LongWritable, Text> inputStream = ssc.fileStream( + testDir.toString(), + LongWritable.class, + Text.class, + TextInputFormat.class, + new Function<Path, Boolean>() { + @Override + public Boolean call(Path v1) throws Exception { + return Boolean.TRUE; + } + }, + true); + + JavaDStream<String> test = inputStream.map( + new Function<Tuple2<LongWritable, Text>, String>() { + @Override + public String call(Tuple2<LongWritable, Text> v1) throws Exception { + return v1._2().toString(); + } + }); + + JavaTestUtils.attachTestOutputStream(test); + List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1); + + assertOrderInvariantEquals(expected, result); } @Test public void testRawSocketStream() { JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345); } + + private List<List<String>> fileTestPrepare(File testDir) throws IOException { + File existingFile = new File(testDir, "0"); + Files.write("0\n", existingFile, Charset.forName("UTF-8")); + assertTrue(existingFile.setLastModified(1000) && existingFile.lastModified() == 1000); + + List<List<String>> expected = Arrays.asList( + Arrays.asList("0") + ); + + return expected; + } } |