author     jerryshao <saisai.shao@intel.com>          2015-01-20 23:37:47 -0800
committer  Patrick Wendell <patrick@databricks.com>   2015-01-20 23:37:47 -0800
commit     424d8c6ffff42e4231cc1088b7e69e3c0f5e6b56 (patch)
tree       75713ab6e19a7e2eab63a332f8c4bdb5e83a74d2 /streaming
parent     ec5b0f2cef4b30047c7f88bdc00d10b6aa308124 (diff)
[SPARK-5297][Streaming] Fix Java file stream type erasure problem
The current Java file stream doesn't support custom key/value types because the type information is lost to erasure; details can be seen in [SPARK-5297](https://issues.apache.org/jira/browse/SPARK-5297). Fix this problem by getting the correct `ClassTag` from `Class[_]`.

Author: jerryshao <saisai.shao@intel.com>

Closes #4101 from jerryshao/SPARK-5297 and squashes the following commits:

e022ca3 [jerryshao] Add Mima exclusion
ecd61b8 [jerryshao] Fix Java fileInputStream type erasure problem
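To see why the `Class` arguments are needed: Java erases type parameters at runtime, so the old zero-argument `fileStream` could only substitute `ClassTag[AnyRef]` for K, V, and F, while an explicit `Class[_]` token survives erasure and can be converted into a real `ClassTag`. A minimal sketch of that mechanism, not part of the patch (ErasureDemo and its methods are hypothetical names):

    // Illustrates type erasure vs. an explicit Class token; hypothetical demo code.
    import org.apache.hadoop.io.LongWritable;

    public class ErasureDemo {
        // The type argument K exists only at compile time, so nothing here can
        // name the concrete key type; this is why the old API fell back to AnyRef.
        static <K> String erasedKeyType() {
            return "unknown at runtime";
        }

        // A Class<K> token carries the type into the running program; the Scala
        // side of the patch converts it with ClassTag(kClass).
        static <K> String keyType(Class<K> kClass) {
            return kClass.getName();
        }

        public static void main(String[] args) {
            System.out.println(ErasureDemo.<LongWritable>erasedKeyType()); // unknown at runtime
            System.out.println(keyType(LongWritable.class));               // org.apache.hadoop.io.LongWritable
        }
    }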
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala | 53
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 70
2 files changed, 108 insertions(+), 15 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index d8695b8e05..9a2254bcdc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -17,14 +17,15 @@
package org.apache.spark.streaming.api.java
+import java.lang.{Boolean => JBoolean}
+import java.io.{Closeable, InputStream}
+import java.util.{List => JList, Map => JMap}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
-import java.io.{Closeable, InputStream}
-import java.util.{List => JList, Map => JMap}
-
import akka.actor.{Props, SupervisorStrategy}
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.{SparkConf, SparkContext}
@@ -250,22 +251,54 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new files
+ * @param kClass class of key for reading HDFS file
+ * @param vClass class of value for reading HDFS file
+ * @param fClass class of input format for reading HDFS file
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[K, V, F <: NewInputFormat[K, V]](
- directory: String): JavaPairInputDStream[K, V] = {
- implicit val cmk: ClassTag[K] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
- implicit val cmv: ClassTag[V] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
- implicit val cmf: ClassTag[F] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]]
+ directory: String,
+ kClass: Class[K],
+ vClass: Class[V],
+ fClass: Class[F]): JavaPairInputDStream[K, V] = {
+ implicit val cmk: ClassTag[K] = ClassTag(kClass)
+ implicit val cmv: ClassTag[V] = ClassTag(vClass)
+ implicit val cmf: ClassTag[F] = ClassTag(fClass)
ssc.fileStream[K, V, F](directory)
}
/**
+ * Create an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them using the given key-value types and input format.
+ * Files must be written to the monitored directory by "moving" them from another
+ * location within the same file system. File names starting with . are ignored.
+ * @param directory HDFS directory to monitor for new files
+ * @param kClass class of key for reading HDFS file
+ * @param vClass class of value for reading HDFS file
+ * @param fClass class of input format for reading HDFS file
+ * @param filter Function to filter paths to process
+ * @param newFilesOnly Should process only new files and ignore existing files in the directory
+ * @tparam K Key type for reading HDFS file
+ * @tparam V Value type for reading HDFS file
+ * @tparam F Input format for reading HDFS file
+ */
+ def fileStream[K, V, F <: NewInputFormat[K, V]](
+ directory: String,
+ kClass: Class[K],
+ vClass: Class[V],
+ fClass: Class[F],
+ filter: JFunction[Path, JBoolean],
+ newFilesOnly: Boolean): JavaPairInputDStream[K, V] = {
+ implicit val cmk: ClassTag[K] = ClassTag(kClass)
+ implicit val cmv: ClassTag[V] = ClassTag(vClass)
+ implicit val cmf: ClassTag[F] = ClassTag(fClass)
+ def fn = (x: Path) => filter.call(x).booleanValue()
+ ssc.fileStream[K, V, F](directory, fn, newFilesOnly)
+ }
+
+ /**
* Create an input stream with any arbitrary user implemented actor receiver.
* @param props Props object defining creation of the actor
* @param name Name of the actor
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 12cc0de750..d92e7fe899 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -17,13 +17,20 @@
package org.apache.spark.streaming;
+import java.io.*;
+import java.lang.Iterable;
+import java.nio.charset.Charset;
+import java.util.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import scala.Tuple2;
import org.junit.Assert;
+import static org.junit.Assert.*;
import org.junit.Test;
-import java.io.*;
-import java.util.*;
-import java.lang.Iterable;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
@@ -1743,13 +1750,66 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
StorageLevel.MEMORY_ONLY());
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testTextFileStream() throws IOException {
+ File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"));
+ List<List<String>> expected = fileTestPrepare(testDir);
+
+ JavaDStream<String> input = ssc.textFileStream(testDir.toString());
+ JavaTestUtils.attachTestOutputStream(input);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @SuppressWarnings("unchecked")
@Test
- public void testTextFileStream() {
- JavaDStream<String> test = ssc.textFileStream("/tmp/foo");
+ public void testFileStream() throws IOException {
+ File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"));
+ List<List<String>> expected = fileTestPrepare(testDir);
+
+ JavaPairInputDStream<LongWritable, Text> inputStream = ssc.fileStream(
+ testDir.toString(),
+ LongWritable.class,
+ Text.class,
+ TextInputFormat.class,
+ new Function<Path, Boolean>() {
+ @Override
+ public Boolean call(Path v1) throws Exception {
+ return Boolean.TRUE;
+ }
+ },
+ true);
+
+ JavaDStream<String> test = inputStream.map(
+ new Function<Tuple2<LongWritable, Text>, String>() {
+ @Override
+ public String call(Tuple2<LongWritable, Text> v1) throws Exception {
+ return v1._2().toString();
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(test);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
+
+ assertOrderInvariantEquals(expected, result);
}
@Test
public void testRawSocketStream() {
JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345);
}
+
+ private List<List<String>> fileTestPrepare(File testDir) throws IOException {
+ File existingFile = new File(testDir, "0");
+ Files.write("0\n", existingFile, Charset.forName("UTF-8"));
+ assertTrue(existingFile.setLastModified(1000) && existingFile.lastModified() == 1000);
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("0")
+ );
+
+ return expected;
+ }
}
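For reference, a minimal standalone caller of the new four-argument `fileStream` overload, mirroring the test above; the master URL, app name, and monitored path are illustrative, not taken from the patch:

    // Hypothetical caller of the new Java fileStream overload added by this patch.
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class FileStreamExample {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("FileStreamExample");
            JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

            // The Class arguments supply the runtime type information that plain
            // type parameters lose to erasure; the Scala side rebuilds ClassTags
            // from them via ClassTag(kClass) etc.
            JavaPairInputDStream<LongWritable, Text> lines = ssc.fileStream(
                "/tmp/monitored", LongWritable.class, Text.class, TextInputFormat.class);

            lines.print();
            ssc.start();
            ssc.awaitTermination();
        }
    }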