From 424d8c6ffff42e4231cc1088b7e69e3c0f5e6b56 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Tue, 20 Jan 2015 23:37:47 -0800
Subject: [SPARK-5297][Streaming] Fix Java file stream type erasure problem

The current Java file stream doesn't support custom key/value types because the
type information is lost to erasure; details are in
[SPARK-5297](https://issues.apache.org/jira/browse/SPARK-5297). Fix this by
deriving the correct `ClassTag` from the `Class[_]` objects the caller supplies.

Author: jerryshao

Closes #4101 from jerryshao/SPARK-5297 and squashes the following commits:

e022ca3 [jerryshao] Add Mima exclusion
ecd61b8 [jerryshao] Fix Java fileInputStream type erasure problem
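To see what the fix buys: the old overload forged its `ClassTag`s by casting
`ClassTag[AnyRef]`, so the key, value and input-format classes a Java caller had
in mind never reached the Scala side. With this patch the caller hands over
`Class` objects and real `ClassTag`s are built from them. Below is a minimal
usage sketch of the new four-argument overload, assuming standard Hadoop text
types; the directory path, app name and batch interval are made up for
illustration:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class TypedFileStreamExample {
      public static void main(String[] args) {
        JavaStreamingContext jssc =
            new JavaStreamingContext("local[2]", "TypedFileStreamExample", new Duration(1000));

        // The Class arguments become ClassTag(kClass) / ClassTag(vClass) / ClassTag(fClass)
        // on the Scala side, so K, V and F keep their real runtime types.
        JavaPairInputDStream<LongWritable, Text> records = jssc.fileStream(
            "hdfs:///tmp/stream-input",  // directory to monitor (example path)
            LongWritable.class,          // key class
            Text.class,                  // value class
            TextInputFormat.class);      // new-API input format

        records.print();
        jssc.start();
        jssc.awaitTermination();
      }
    }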
---
 .../streaming/api/java/JavaStreamingContext.scala | 53 ++++++++++++----
 .../org/apache/spark/streaming/JavaAPISuite.java  | 70 ++++++++++++++++++++--
 2 files changed, 108 insertions(+), 15 deletions(-)
(limited to 'streaming')

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index d8695b8e05..9a2254bcdc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -17,14 +17,15 @@
 
 package org.apache.spark.streaming.api.java
 
+import java.lang.{Boolean => JBoolean}
+import java.io.{Closeable, InputStream}
+import java.util.{List => JList, Map => JMap}
+
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
-import java.io.{Closeable, InputStream}
-import java.util.{List => JList, Map => JMap}
-
 import akka.actor.{Props, SupervisorStrategy}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
 import org.apache.spark.{SparkConf, SparkContext}
@@ -250,21 +251,53 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
    * Files must be written to the monitored directory by "moving" them from another
    * location within the same file system. File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file
+   * @param kClass class of key for reading HDFS file
+   * @param vClass class of value for reading HDFS file
+   * @param fClass class of input format for reading HDFS file
    * @tparam K Key type for reading HDFS file
    * @tparam V Value type for reading HDFS file
    * @tparam F Input format for reading HDFS file
    */
   def fileStream[K, V, F <: NewInputFormat[K, V]](
-      directory: String): JavaPairInputDStream[K, V] = {
-    implicit val cmk: ClassTag[K] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
-    implicit val cmv: ClassTag[V] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
-    implicit val cmf: ClassTag[F] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]]
+      directory: String,
+      kClass: Class[K],
+      vClass: Class[V],
+      fClass: Class[F]): JavaPairInputDStream[K, V] = {
+    implicit val cmk: ClassTag[K] = ClassTag(kClass)
+    implicit val cmv: ClassTag[V] = ClassTag(vClass)
+    implicit val cmf: ClassTag[F] = ClassTag(fClass)
     ssc.fileStream[K, V, F](directory)
   }
 
+  /**
+   * Create an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them using the given key-value types and input format.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system. File names starting with . are ignored.
+   * @param directory HDFS directory to monitor for new file
+   * @param kClass class of key for reading HDFS file
+   * @param vClass class of value for reading HDFS file
+   * @param fClass class of input format for reading HDFS file
+   * @param filter Function to filter paths to process
+   * @param newFilesOnly Should process only new files and ignore existing files in the directory
+   * @tparam K Key type for reading HDFS file
+   * @tparam V Value type for reading HDFS file
+   * @tparam F Input format for reading HDFS file
+   */
+  def fileStream[K, V, F <: NewInputFormat[K, V]](
+      directory: String,
+      kClass: Class[K],
+      vClass: Class[V],
+      fClass: Class[F],
+      filter: JFunction[Path, JBoolean],
+      newFilesOnly: Boolean): JavaPairInputDStream[K, V] = {
+    implicit val cmk: ClassTag[K] = ClassTag(kClass)
+    implicit val cmv: ClassTag[V] = ClassTag(vClass)
+    implicit val cmf: ClassTag[F] = ClassTag(fClass)
+    def fn = (x: Path) => filter.call(x).booleanValue()
+    ssc.fileStream[K, V, F](directory, fn, newFilesOnly)
+  }
+
   /**
    * Create an input stream with any arbitrary user implemented actor receiver.
    * @param props Props object defining creation of the actor
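The second overload above additionally exposes the path filter and the
`newFilesOnly` flag, which until now were reachable only from Scala; note how it
unwraps the Java `Function[Path, JBoolean]` into a plain `Path => Boolean` with
`filter.call(x).booleanValue()` before delegating to `StreamingContext.fileStream`.
A sketch of calling it, continuing the hypothetical `jssc` from the earlier
example (the ".tmp" filter is made up for illustration):

    // Requires org.apache.hadoop.fs.Path and org.apache.spark.api.java.function.Function.
    // Skip in-progress files ending in ".tmp", and ignore files already present
    // in the directory when the context starts (newFilesOnly = true).
    JavaPairInputDStream<LongWritable, Text> fresh = jssc.fileStream(
        "hdfs:///tmp/stream-input",
        LongWritable.class,
        Text.class,
        TextInputFormat.class,
        new Function<Path, Boolean>() {
          @Override
          public Boolean call(Path path) {
            return !path.getName().endsWith(".tmp");
          }
        },
        true);  // newFilesOnly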
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 12cc0de750..d92e7fe899 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -17,13 +17,20 @@
 
 package org.apache.spark.streaming;
 
+import java.io.*;
+import java.lang.Iterable;
+import java.nio.charset.Charset;
+import java.util.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import scala.Tuple2;
 
 import org.junit.Assert;
+import static org.junit.Assert.*;
 import org.junit.Test;
-import java.io.*;
-import java.util.*;
-import java.lang.Iterable;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
@@ -1743,13 +1750,66 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
         StorageLevel.MEMORY_ONLY());
   }
 
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testTextFileStream() throws IOException {
+    File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"));
+    List<List<String>> expected = fileTestPrepare(testDir);
+
+    JavaDStream<String> input = ssc.textFileStream(testDir.toString());
+    JavaTestUtils.attachTestOutputStream(input);
+    List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
   @Test
-  public void testTextFileStream() {
-    JavaDStream<String> test = ssc.textFileStream("/tmp/foo");
+  public void testFileStream() throws IOException {
+    File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"));
+    List<List<String>> expected = fileTestPrepare(testDir);
+
+    JavaPairInputDStream<LongWritable, Text> inputStream = ssc.fileStream(
+      testDir.toString(),
+      LongWritable.class,
+      Text.class,
+      TextInputFormat.class,
+      new Function<Path, Boolean>() {
+        @Override
+        public Boolean call(Path v1) throws Exception {
+          return Boolean.TRUE;
+        }
+      },
+      true);
+
+    JavaDStream<String> test = inputStream.map(
+      new Function<Tuple2<LongWritable, Text>, String>() {
+        @Override
+        public String call(Tuple2<LongWritable, Text> v1) throws Exception {
+          return v1._2().toString();
+        }
+      });
+
+    JavaTestUtils.attachTestOutputStream(test);
+    List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
+
+    assertOrderInvariantEquals(expected, result);
   }
 
   @Test
   public void testRawSocketStream() {
     JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345);
   }
+
+  private List<List<String>> fileTestPrepare(File testDir) throws IOException {
+    File existingFile = new File(testDir, "0");
+    Files.write("0\n", existingFile, Charset.forName("UTF-8"));
+    assertTrue(existingFile.setLastModified(1000) && existingFile.lastModified() == 1000);
+
+    List<List<String>> expected = Arrays.asList(
+      Arrays.asList("0")
+    );
+
+    return expected;
+  }
 }
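For the custom key/value types that motivated SPARK-5297, the same new overload
applies; the only wrinkle is that a generic input format such as
`org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat` has a raw `Class`
object, so Java needs one unchecked cast to line the type parameters up. In the
sketch below, `MyKey` and `MyValue` are hypothetical user-defined Writables, not
types from this patch, and the path is illustrative:

    // MyKey/MyValue are hypothetical Writable implementations (not in this patch).
    // The double cast turns the raw SequenceFileInputFormat.class into a
    // parameterized Class<...>; the resulting unchecked warning is suppressed.
    @SuppressWarnings("unchecked")
    Class<SequenceFileInputFormat<MyKey, MyValue>> format =
        (Class<SequenceFileInputFormat<MyKey, MyValue>>)
            (Class<?>) SequenceFileInputFormat.class;

    JavaPairInputDStream<MyKey, MyValue> custom = jssc.fileStream(
        "hdfs:///tmp/seq-input", MyKey.class, MyValue.class, format);

Before this patch the Java API had no way to carry these types across: the old
single-argument `fileStream` forged all three ClassTags as `AnyRef`, so the key,
value and format classes never reached the Scala side.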