diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-01-11 11:18:06 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-01-14 09:42:36 -0800 |
commit | 5bcb048167fe0b90f749910233342c09fff3fce7 (patch) | |
tree | 4396166ddea50005377b0bcbcbf380d12316c9f6 /streaming | |
parent | 280b6d018691810bbb3dd3155f059132b4475995 (diff) | |
download | spark-5bcb048167fe0b90f749910233342c09fff3fce7.tar.gz spark-5bcb048167fe0b90f749910233342c09fff3fce7.tar.bz2 spark-5bcb048167fe0b90f749910233342c09fff3fce7.zip |
More work on InputStreams
Diffstat (limited to 'streaming')
3 files changed, 130 insertions, 10 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala index 03336d040d..b25a3f109c 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -283,7 +283,8 @@ object JavaPairDStream { new JavaPairDStream[K, V](dstream.dstream) } - def scalaToJavaLong[K: ClassManifest](dstream: JavaPairDStream[K, Long]): JavaPairDStream[K, JLong] = { + def scalaToJavaLong[K: ClassManifest](dstream: JavaPairDStream[K, Long]) + : JavaPairDStream[K, JLong] = { StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_)) } } diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index 5a712d18c7..2833793b94 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -2,6 +2,7 @@ package spark.streaming.api.java import scala.collection.JavaConversions._ import java.util.{List => JList} +import java.lang.{Long => JLong, Integer => JInt} import spark.streaming._ import dstream._ @@ -18,9 +19,53 @@ class JavaStreamingContext(val ssc: StreamingContext) { // TODOs: // - Test StreamingContext functions // - Test to/from Hadoop functions - // - Support registering InputStreams - // - Add Kafka Stream + // - Support creating and registering InputStreams + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param hostname Zookeper hostname. + * @param port Zookeper port. + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + */ + def kafkaStream[T]( + hostname: String, + port: Int, + groupId: String, + topics: JMap[String, JInt]) + : JavaDStream[T] = { + implicit val cmt: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + ssc.kafkaStream[T](hostname, port, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) + } + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param hostname Zookeper hostname. + * @param port Zookeper port. + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param initialOffsets Optional initial offsets for each of the partitions to consume. + * By default the value is pulled from zookeper. + */ + def kafkaStream[T]( + hostname: String, + port: Int, + groupId: String, + topics: JMap[String, JInt], + initialOffsets: JMap[KafkaPartitionKey, JLong]) + : JavaDStream[T] = { + implicit val cmt: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + ssc.kafkaStream[T]( + hostname, + port, + groupId, + Map(topics.mapValues(_.intValue()).toSeq: _*), + Map(initialOffsets.mapValues(_.longValue()).toSeq: _*)) + } /** * Create an input stream that pulls messages form a Kafka Broker. @@ -31,18 +76,27 @@ class JavaStreamingContext(val ssc: StreamingContext) { * in its own thread. * @param initialOffsets Optional initial offsets for each of the partitions to consume. * By default the value is pulled from zookeper. - * @param storageLevel RDD storage level. Defaults to memory-only. + * @param storageLevel RDD storage level. Defaults to memory-only */ def kafkaStream[T]( hostname: String, port: Int, groupId: String, - topics: JMap[String, Int]) - : DStream[T] = { + topics: JMap[String, JInt], + initialOffsets: JMap[KafkaPartitionKey, JLong], + storageLevel: StorageLevel) + : JavaDStream[T] = { implicit val cmt: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] - ssc.kafkaStream(hostname, port, groupId, Map(topics.toSeq: _*)) + ssc.kafkaStream[T]( + hostname, + port, + groupId, + Map(topics.mapValues(_.intValue()).toSeq: _*), + Map(initialOffsets.mapValues(_.longValue()).toSeq: _*), + storageLevel) } + /** * Create a input stream from network source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited @@ -175,8 +229,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { ssc.flumeStream(hostname, port) } - // NOT SUPPORTED: registerInputStream - /** * Registers an output stream that will be computed every interval */ diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java index 26ff5b1ccd..7475b9536b 100644 --- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java @@ -2,6 +2,7 @@ package spark.streaming; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -13,12 +14,15 @@ import spark.api.java.function.FlatMapFunction; import spark.api.java.function.Function; import spark.api.java.function.Function2; import spark.api.java.function.PairFunction; +import spark.storage.StorageLevel; import spark.streaming.api.java.JavaDStream; import spark.streaming.api.java.JavaPairDStream; import spark.streaming.api.java.JavaStreamingContext; import spark.streaming.JavaTestUtils; +import spark.streaming.dstream.KafkaPartitionKey; +import sun.org.mozilla.javascript.annotations.JSFunction; -import java.io.Serializable; +import java.io.*; import java.util.*; // The test suite itself is Serializable so that anonymous Function implementations can be @@ -757,4 +761,67 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(expected, result); } + + // Input stream tests. These mostly just test that we can instantiate a given InputStream with + // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the + // InputStream functionality is deferred to the existing Scala tests. + @Test + public void testKafkaStream() { + HashMap<String, Integer> topics = Maps.newHashMap(); + HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap(); + JavaDStream test1 = sc.kafkaStream("localhost", 12345, "group", topics); + JavaDStream test2 = sc.kafkaStream("localhost", 12345, "group", topics, offsets); + JavaDStream test3 = sc.kafkaStream("localhost", 12345, "group", topics, offsets, + StorageLevel.MEMORY_AND_DISK()); + } + + @Test + public void testNetworkTextStream() { + JavaDStream test = sc.networkTextStream("localhost", 12345); + } + + @Test + public void testNetworkString() { + class Converter extends Function<InputStream, Iterable<String>> { + public Iterable<String> call(InputStream in) { + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + List<String> out = new ArrayList<String>(); + try { + while (true) { + String line = reader.readLine(); + if (line == null) { break; } + out.add(line); + } + } catch (IOException e) { } + return out; + } + } + + JavaDStream test = sc.networkStream( + "localhost", + 12345, + new Converter(), + StorageLevel.MEMORY_ONLY()); + } + + @Test + public void testTextFileStream() { + JavaDStream test = sc.textFileStream("/tmp/foo"); + } + + @Test + public void testRawNetworkStream() { + JavaDStream test = sc.rawNetworkStream("localhost", 12345); + } + + @Test + public void testFlumeStream() { + JavaDStream test = sc.flumeStream("localhost", 12345); + } + + @Test + public void testFileStream() { + JavaPairDStream<String, String> foo = + sc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo"); + } } |