author    Patrick Wendell <pwendell@gmail.com>    2013-01-11 11:18:06 -0800
committer Patrick Wendell <pwendell@gmail.com>    2013-01-14 09:42:36 -0800
commit    5bcb048167fe0b90f749910233342c09fff3fce7 (patch)
tree      4396166ddea50005377b0bcbcbf380d12316c9f6 /streaming/src
parent    280b6d018691810bbb3dd3155f059132b4475995 (diff)
More work on InputStreams
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala       |  3
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala  | 68
-rw-r--r--  streaming/src/test/scala/spark/streaming/JavaAPISuite.java                    | 69
3 files changed, 130 insertions(+), 10 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index 03336d040d..b25a3f109c 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -283,7 +283,8 @@ object JavaPairDStream {
new JavaPairDStream[K, V](dstream.dstream)
}
- def scalaToJavaLong[K: ClassManifest](dstream: JavaPairDStream[K, Long]): JavaPairDStream[K, JLong] = {
+ def scalaToJavaLong[K: ClassManifest](dstream: JavaPairDStream[K, Long])
+ : JavaPairDStream[K, JLong] = {
StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
}
}
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 5a712d18c7..2833793b94 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -2,6 +2,7 @@ package spark.streaming.api.java
import scala.collection.JavaConversions._
import java.util.{List => JList}
+import java.lang.{Long => JLong, Integer => JInt}
import spark.streaming._
import dstream._
@@ -18,9 +19,53 @@ class JavaStreamingContext(val ssc: StreamingContext) {
// TODOs:
// - Test StreamingContext functions
// - Test to/from Hadoop functions
- // - Support registering InputStreams
- // - Add Kafka Stream
+ // - Support creating and registering InputStreams
+ /**
+ * Create an input stream that pulls messages from a Kafka broker.
+ * @param hostname ZooKeeper hostname.
+ * @param port ZooKeeper port.
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ */
+ def kafkaStream[T](
+ hostname: String,
+ port: Int,
+ groupId: String,
+ topics: JMap[String, JInt])
+ : JavaDStream[T] = {
+ implicit val cmt: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.kafkaStream[T](hostname, port, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka broker.
+ * @param hostname ZooKeeper hostname.
+ * @param port ZooKeeper port.
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param initialOffsets Optional initial offsets for each of the partitions to consume.
+ * By default the value is pulled from ZooKeeper.
+ */
+ def kafkaStream[T](
+ hostname: String,
+ port: Int,
+ groupId: String,
+ topics: JMap[String, JInt],
+ initialOffsets: JMap[KafkaPartitionKey, JLong])
+ : JavaDStream[T] = {
+ implicit val cmt: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.kafkaStream[T](
+ hostname,
+ port,
+ groupId,
+ Map(topics.mapValues(_.intValue()).toSeq: _*),
+ Map(initialOffsets.mapValues(_.longValue()).toSeq: _*))
+ }
/**
 * Create an input stream that pulls messages from a Kafka broker.
@@ -31,18 +76,27 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* in its own thread.
* @param initialOffsets Optional initial offsets for each of the partitions to consume.
 * By default the value is pulled from ZooKeeper.
- * @param storageLevel RDD storage level. Defaults to memory-only.
+ * @param storageLevel RDD storage level. Defaults to memory-only
*/
def kafkaStream[T](
hostname: String,
port: Int,
groupId: String,
- topics: JMap[String, Int])
- : DStream[T] = {
+ topics: JMap[String, JInt],
+ initialOffsets: JMap[KafkaPartitionKey, JLong],
+ storageLevel: StorageLevel)
+ : JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- ssc.kafkaStream(hostname, port, groupId, Map(topics.toSeq: _*))
+ ssc.kafkaStream[T](
+ hostname,
+ port,
+ groupId,
+ Map(topics.mapValues(_.intValue()).toSeq: _*),
+ Map(initialOffsets.mapValues(_.longValue()).toSeq: _*),
+ storageLevel)
}
+
/**
 * Create an input stream from a network source hostname:port. Data is received using
 * a TCP socket and the received bytes are interpreted as UTF8-encoded, \n-delimited
@@ -175,8 +229,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
ssc.flumeStream(hostname, port)
}
- // NOT SUPPORTED: registerInputStream
-
/**
* Registers an output stream that will be computed every interval
*/
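Taken together, the three kafkaStream overloads added above form a progression: topics only; topics plus explicit initial offsets; topics, offsets, and a storage level. A minimal Java usage sketch follows (hedged: it assumes an already-constructed JavaStreamingContext named ssc, and the ZooKeeper host, port, group id, and topic name are placeholders):

    import java.util.HashMap;
    import java.util.Map;

    import spark.storage.StorageLevel;
    import spark.streaming.api.java.JavaDStream;
    import spark.streaming.dstream.KafkaPartitionKey;

    // Consume the "events" topic with two threads, one per partition.
    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("events", 2);

    // 1. Simplest overload: initial offsets are read from ZooKeeper and the
    //    storage level is left at its default.
    JavaDStream<String> a = ssc.kafkaStream("zk-host", 2181, "my-group", topics);

    // 2. Explicit initial offsets per partition, instead of ZooKeeper's values.
    Map<KafkaPartitionKey, Long> offsets = new HashMap<KafkaPartitionKey, Long>();
    JavaDStream<String> b = ssc.kafkaStream("zk-host", 2181, "my-group", topics, offsets);

    // 3. Explicit offsets and a non-default RDD storage level.
    JavaDStream<String> c = ssc.kafkaStream("zk-host", 2181, "my-group", topics, offsets,
        StorageLevel.MEMORY_AND_DISK());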
diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
index 26ff5b1ccd..7475b9536b 100644
--- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
@@ -2,6 +2,7 @@ package spark.streaming;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -13,12 +14,15 @@ import spark.api.java.function.FlatMapFunction;
import spark.api.java.function.Function;
import spark.api.java.function.Function2;
import spark.api.java.function.PairFunction;
+import spark.storage.StorageLevel;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaPairDStream;
import spark.streaming.api.java.JavaStreamingContext;
import spark.streaming.JavaTestUtils;
+import spark.streaming.dstream.KafkaPartitionKey;
-import java.io.Serializable;
+import java.io.*;
import java.util.*;
// The test suite itself is Serializable so that anonymous Function implementations can be
@@ -757,4 +761,67 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, result);
}
+
+ // Input stream tests. These mostly just test that we can instantiate a given InputStream with
+ // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
+ // InputStream functionality is deferred to the existing Scala tests.
+ @Test
+ public void testKafkaStream() {
+ HashMap<String, Integer> topics = Maps.newHashMap();
+ HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap();
+ JavaDStream test1 = sc.kafkaStream("localhost", 12345, "group", topics);
+ JavaDStream test2 = sc.kafkaStream("localhost", 12345, "group", topics, offsets);
+ JavaDStream test3 = sc.kafkaStream("localhost", 12345, "group", topics, offsets,
+ StorageLevel.MEMORY_AND_DISK());
+ }
+
+ @Test
+ public void testNetworkTextStream() {
+ JavaDStream test = sc.networkTextStream("localhost", 12345);
+ }
+
+ @Test
+ public void testNetworkString() {
+ class Converter extends Function<InputStream, Iterable<String>> {
+ public Iterable<String> call(InputStream in) {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+ List<String> out = new ArrayList<String>();
+ try {
+ while (true) {
+ String line = reader.readLine();
+ if (line == null) { break; }
+ out.add(line);
+ }
+ } catch (IOException e) { } // ignore: treat a read error as end of input
+ return out;
+ }
+ }
+
+ JavaDStream test = sc.networkStream(
+ "localhost",
+ 12345,
+ new Converter(),
+ StorageLevel.MEMORY_ONLY());
+ }
+
+ @Test
+ public void testTextFileStream() {
+ JavaDStream test = sc.textFileStream("/tmp/foo");
+ }
+
+ @Test
+ public void testRawNetworkStream() {
+ JavaDStream test = sc.rawNetworkStream("localhost", 12345);
+ }
+
+ @Test
+ public void testFlumeStream() {
+ JavaDStream test = sc.flumeStream("localhost", 12345);
+ }
+
+ @Test
+ public void testFileStream() {
+ JavaPairDStream<String, String> foo =
+ sc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
+ }
}
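
A note on testFileStream above: in fileStream's signature the input-format type appears only as a type parameter, never in the argument list, so Java cannot infer it and the call site must pin all three types with an explicit type witness. A hedged sketch of the same pattern outside the test (the directory path and the ssc context variable are placeholders):

    // K = String, V = String, F = SequenceFileInputFormat. The witness is
    // required because none of these types can be inferred from the lone
    // String path argument.
    JavaPairDStream<String, String> files =
        ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/data");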