From d0fd3b9ad238294346eb3465c489eabd41fb2380 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 6 Jan 2014 01:47:53 -0800
Subject: Changed JavaStreamingContextWith*** to ***Function in
 streaming.api.java.*** package. Also fixed packages of Flume and MQTT tests.

---
 .../streaming/examples/JavaFlumeEventCount.java    |   6 +-
 .../streaming/examples/JavaKafkaWordCount.java     |   6 +-
 .../streaming/api/java/flume/FlumeFunctions.scala  |  48 +++++++++
 .../flume/JavaStreamingContextWithFlume.scala      |  48 ---------
 .../flume/src/test/java/JavaFlumeStreamSuite.java  |  38 -------
 .../streaming/flume/JavaFlumeStreamSuite.java      |  35 +++++++
 .../streaming/api/java/kafka/KafkaFunctions.scala  | 107 ++++++++++++++++++++
 .../kafka/JavaStreamingContextWithKafka.scala      | 107 --------------------
 .../streaming/kafka/JavaKafkaStreamSuite.java      |  15 ++-
 .../mqtt/JavaStreamingContextWithMQTT.scala        |  59 -----------
 .../spark/spark/streaming/mqtt/MQTTFunctions.scala |  43 --------
 .../spark/streaming/mqtt/MQTTInputDStream.scala    | 110 ---------------------
 .../spark/spark/streaming/mqtt/package.scala       |  24 -----
 .../streaming/api/java/mqtt/MQTTFunctions.scala    |  59 +++++++++++
 .../spark/streaming/mqtt/MQTTFunctions.scala       |  43 ++++++++
 .../spark/streaming/mqtt/MQTTInputDStream.scala    | 110 +++++++++++++++++++++
 .../org/apache/spark/streaming/mqtt/package.scala  |  24 +++++
 .../spark/streaming/mqtt/JavaMQTTStreamSuite.java  |  10 +-
 .../api/java/twitter/TwitterFunctions.scala        |  99 +++++++++++++++++++
 .../twitter/JavaStreamingContextWithTwitter.scala  |  99 -------------------
 .../streaming/twitter/JavaTwitterStreamSuite.java  |  19 ++--
 .../api/java/zeromq/ZeroMQFunctions.scala          | 102 +++++++++++++++++++
 .../zeromq/JavaStreamingContextWithZeroMQ.scala    | 102 -------------------
 .../streaming/zeromq/JavaZeroMQStreamSuite.java    |  12 +--
 24 files changed, 658 insertions(+), 667 deletions(-)
 create mode 100644 external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala
 delete mode 100644 external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala
 delete mode 100644 external/flume/src/test/java/JavaFlumeStreamSuite.java
 create mode 100644 external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
 create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala
 delete mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala
 delete mode 100644 external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala
 delete mode 100644 external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala
 delete mode 100644 external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala
 delete mode 100644 external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala
 create mode 100644 external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala
 create mode 100644 external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
 create mode 100644 external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
 create mode 100644 external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
 create mode 100644 external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala
 delete mode 100644 external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala
 create mode 100644 external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala
 delete mode 100644 external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala

diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index e53c4f9e83..64832a9721 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.examples;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.streaming.*;
 import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume;
+import org.apache.spark.streaming.api.java.flume.FlumeFunctions;
 import org.apache.spark.streaming.flume.SparkFlumeEvent;
 
 /**
@@ -52,8 +52,8 @@ public class JavaFlumeEventCount {
     JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
             System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
 
-    JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc);
-    JavaDStream<SparkFlumeEvent> flumeStream = sscWithFlume.flumeStream("localhost", port);
+    FlumeFunctions flumeFunc = new FlumeFunctions(ssc);
+    JavaDStream<SparkFlumeEvent> flumeStream = flumeFunc.flumeStream("localhost", port);
 
     flumeStream.count();
 
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index de0420ca83..207ce8cd4f 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -29,7 +29,7 @@ import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.JavaStreamingContextWithKafka;
+import org.apache.spark.streaming.api.java.kafka.KafkaFunctions;
 import scala.Tuple2;
 
 /**
@@ -64,8 +64,8 @@ public class JavaKafkaWordCount {
       topicMap.put(topic, numThreads);
     }
 
-    JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc);
-    JavaPairDStream<String, String> messages = sscWithKafka.kafkaStream(args[1], args[2], topicMap);
+    KafkaFunctions kafkaFunc = new KafkaFunctions(ssc);
+    JavaPairDStream<String, String> messages = kafkaFunc.kafkaStream(args[1], args[2], topicMap);
 
     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
       @Override
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala b/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala
new file mode 100644
index 0000000000..3347d19796
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.flume
+
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.flume._
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Wrapper around [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides extra
+ * functions for creating Flume input streams.
+ */
+class FlumeFunctions(javaStreamingContext: JavaStreamingContext) {
+  /**
+   * Creates an input stream from a Flume source.
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machine to which the flume data will be sent
+   */
+  def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
+    javaStreamingContext.ssc.flumeStream(hostname, port)
+  }
+
+  /**
+   * Creates an input stream from a Flume source.
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machine to which the flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
+    JavaDStream[SparkFlumeEvent] = {
+    javaStreamingContext.ssc.flumeStream(hostname, port, storageLevel)
+  }
+}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala
deleted file mode 100644
index 4e66ae3535..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-import org.apache.spark.storage.StorageLevel
-
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating Flume input streams.
- */
-class JavaStreamingContextWithFlume(javaStreamingContext: JavaStreamingContext)
-  extends JavaStreamingContext(javaStreamingContext.ssc) {
-  /**
-   * Creates a input stream from a Flume source.
-   * @param hostname Hostname of the slave machine to which the flume data will be sent
-   * @param port Port of the slave machine to which the flume data will be sent
-   */
-  def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
-    ssc.flumeStream(hostname, port)
-  }
-
-  /**
-   * Creates a input stream from a Flume source.
-   * @param hostname Hostname of the slave machine to which the flume data will be sent
-   * @param port Port of the slave machine to which the flume data will be sent
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
-    JavaDStream[SparkFlumeEvent] = {
-    ssc.flumeStream(hostname, port, storageLevel)
-  }
-}
diff --git a/external/flume/src/test/java/JavaFlumeStreamSuite.java b/external/flume/src/test/java/JavaFlumeStreamSuite.java
deleted file mode 100644
index deffc78c4c..0000000000
--- a/external/flume/src/test/java/JavaFlumeStreamSuite.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume;
-import org.apache.spark.streaming.flume.SparkFlumeEvent;
-import org.junit.Test;
-
-public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
-  @Test
-  public void testFlumeStream() {
-    JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc);
-
-    // tests the API, does not actually test data receiving
-    JavaDStream<SparkFlumeEvent> test1 = sscWithFlume.flumeStream("localhost", 12345);
-    JavaDStream<SparkFlumeEvent> test2 = sscWithFlume.flumeStream("localhost", 12345,
-      StorageLevel.MEMORY_AND_DISK_SER_2());
-
-    // To verify that JavaStreamingContextWithKafka is also StreamingContext
-    JavaDStream<String> socketStream = sscWithFlume.socketTextStream("localhost", 9999);
-  }
-}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
new file mode 100644
index 0000000000..5930fee925
--- /dev/null
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.flume.FlumeFunctions;
+import org.apache.spark.streaming.flume.SparkFlumeEvent;
+import org.junit.Test;
+
+public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
+  @Test
+  public void testFlumeStream() {
+    FlumeFunctions flumeFunc = new FlumeFunctions(ssc);
+
+    // tests the API, does not actually test data receiving
+    JavaDStream<SparkFlumeEvent> test1 = flumeFunc.flumeStream("localhost", 12345);
+    JavaDStream<SparkFlumeEvent> test2 = flumeFunc.flumeStream("localhost", 12345,
+      StorageLevel.MEMORY_AND_DISK_SER_2());
+  }
+}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala
new file mode 100644
index 0000000000..491331bb37
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.kafka
+
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
+import java.lang.{Integer => JInt}
+import java.util.{Map => JMap}
+
+import kafka.serializer.Decoder
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
+import org.apache.spark.streaming.kafka._
+
+/**
+ * Wrapper around [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides extra
+ * functions for creating Kafka input streams.
+ */
+class KafkaFunctions(javaStreamingContext: JavaStreamingContext) {
+
+  /**
+   * Create an input stream that pulls messages from a Kafka Broker.
+   * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
+   * @param groupId The group id for this consumer.
+   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+   * in its own thread.
+   */
+  def kafkaStream(
+      zkQuorum: String,
+      groupId: String,
+      topics: JMap[String, JInt]
+    ): JavaPairDStream[String, String] = {
+    implicit val cmt: ClassTag[String] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kafka Broker.
+   * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
+   * @param groupId The group id for this consumer.
+   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+   * in its own thread.
+   * @param storageLevel RDD storage level.
+   *
+   */
+  def kafkaStream(
+      zkQuorum: String,
+      groupId: String,
+      topics: JMap[String, JInt],
+      storageLevel: StorageLevel
+    ): JavaPairDStream[String, String] = {
+    implicit val cmt: ClassTag[String] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kafka Broker.
+   * @param keyTypeClass Key type of RDD
+   * @param valueTypeClass value type of RDD
+   * @param keyDecoderClass Type of kafka key decoder
+   * @param valueDecoderClass Type of kafka value decoder
+   * @param kafkaParams Map of kafka configuration parameters.
+   *                    See: http://kafka.apache.org/configuration.html
+   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+   * in its own thread.
+   * @param storageLevel RDD storage level. Defaults to memory-only
+   */
+  def kafkaStream[K, V, U <: Decoder[_], T <: Decoder[_]](
+      keyTypeClass: Class[K],
+      valueTypeClass: Class[V],
+      keyDecoderClass: Class[U],
+      valueDecoderClass: Class[T],
+      kafkaParams: JMap[String, String],
+      topics: JMap[String, JInt],
+      storageLevel: StorageLevel
+    ): JavaPairDStream[K, V] = {
+    implicit val keyCmt: ClassTag[K] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+    implicit val valueCmt: ClassTag[V] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+
+    implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
+    implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
+
+    javaStreamingContext.ssc.kafkaStream[K, V, U, T](
+      kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+  }
+}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala
deleted file mode 100644
index ab0e8a6c8d..0000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import scala.reflect.ClassTag -import scala.collection.JavaConversions._ - -import java.lang.{Integer => JInt} -import java.util.{Map => JMap} - -import kafka.serializer.Decoder - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream} - -/** - * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra - * functions for creating Kafka input streams. - */ -class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext) - extends JavaStreamingContext(javaStreamingContext.ssc) { - - /** - * Create an input stream that pulls messages form a Kafka Broker. - * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - */ - def kafkaStream( - zkQuorum: String, - groupId: String, - topics: JMap[String, JInt] - ): JavaPairDStream[String, String] = { - implicit val cmt: ClassTag[String] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) - } - - /** - * Create an input stream that pulls messages form a Kafka Broker. - * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel RDD storage level. - * - */ - def kafkaStream( - zkQuorum: String, - groupId: String, - topics: JMap[String, JInt], - storageLevel: StorageLevel - ): JavaPairDStream[String, String] = { - implicit val cmt: ClassTag[String] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) - } - - /** - * Create an input stream that pulls messages form a Kafka Broker. - * @param keyTypeClass Key type of RDD - * @param valueTypeClass value type of RDD - * @param keyDecoderClass Type of kafka key decoder - * @param valueDecoderClass Type of kafka value decoder - * @param kafkaParams Map of kafka configuration paramaters. - * See: http://kafka.apache.org/configuration.html - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel RDD storage level. 
Defaults to memory-only
-   */
-  def kafkaStream[K, V, U <: Decoder[_], T <: Decoder[_]](
-      keyTypeClass: Class[K],
-      valueTypeClass: Class[V],
-      keyDecoderClass: Class[U],
-      valueDecoderClass: Class[T],
-      kafkaParams: JMap[String, String],
-      topics: JMap[String, JInt],
-      storageLevel: StorageLevel
-    ): JavaPairDStream[K, V] = {
-    implicit val keyCmt: ClassTag[K] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
-    implicit val valueCmt: ClassTag[V] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
-
-    implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
-    implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
-
-    ssc.kafkaStream[K, V, U, T](
-      kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
-  }
-}
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 66236df662..fdea96e506 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -18,6 +18,8 @@
 package org.apache.spark.streaming.kafka;
 
 import java.util.HashMap;
+
+import org.apache.spark.streaming.api.java.kafka.KafkaFunctions;
 import org.junit.Test;
 import com.google.common.collect.Maps;
 import kafka.serializer.StringDecoder;
@@ -31,21 +33,18 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
   public void testKafkaStream() {
     HashMap<String, Integer> topics = Maps.newHashMap();
 
-    JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc);
+    KafkaFunctions kafkaFunc = new KafkaFunctions(ssc);
 
     // tests the API, does not actually test data receiving
-    JavaPairDStream<String, String> test1 = sscWithKafka.kafkaStream("localhost:12345", "group", topics);
-    JavaPairDStream<String, String> test2 = sscWithKafka.kafkaStream("localhost:12345", "group", topics,
+    JavaPairDStream<String, String> test1 = kafkaFunc.kafkaStream("localhost:12345", "group", topics);
+    JavaPairDStream<String, String> test2 = kafkaFunc.kafkaStream("localhost:12345", "group", topics,
       StorageLevel.MEMORY_AND_DISK_SER_2());
 
     HashMap<String, String> kafkaParams = Maps.newHashMap();
-    kafkaParams.put("zookeeper.connect","localhost:12345");
+    kafkaParams.put("zookeeper.connect", "localhost:12345");
     kafkaParams.put("group.id","consumer-group");
-    JavaPairDStream<String, String> test3 = sscWithKafka.kafkaStream(
+    JavaPairDStream<String, String> test3 = kafkaFunc.kafkaStream(
       String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics,
       StorageLevel.MEMORY_AND_DISK_SER_2());
-
-    // To verify that JavaStreamingContextWithKafka is also StreamingContext
-    JavaDStream<String> socketStream = sscWithKafka.socketTextStream("localhost", 9999);
   }
 }
diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala
deleted file mode 100644
index d814da0f0d..0000000000
--- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.mqtt - -import scala.reflect.ClassTag - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} - -/** - * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra - * functions for creating MQTT input streams. - */ -class JavaStreamingContextWithMQTT(javaStreamingContext: JavaStreamingContext) - extends JavaStreamingContext(javaStreamingContext.ssc) { - - /** - * Create an input stream that receives messages pushed by a MQTT publisher. - * @param brokerUrl Url of remote MQTT publisher - * @param topic topic name to subscribe to - */ - def mqttStream( - brokerUrl: String, - topic: String - ): JavaDStream[String] = { - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.mqttStream(brokerUrl, topic) - } - - /** - * Create an input stream that receives messages pushed by a MQTT publisher. - * @param brokerUrl Url of remote MQTT publisher - * @param topic topic name to subscribe to - * @param storageLevel RDD storage level. - */ - def mqttStream( - brokerUrl: String, - topic: String, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): JavaDStream[String] = { - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.mqttStream(brokerUrl, topic, storageLevel) - } -} diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala deleted file mode 100644 index 86f4e9c724..0000000000 --- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.mqtt - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming._ - -/** - * Extra MQTT input stream functions available on [[org.apache.spark.streaming.StreamingContext]] - * through implicit conversions. Import org.apache.spark.streaming.mqtt._ to use these functions. 
- */ -class MQTTFunctions(ssc: StreamingContext) { - /** - * Create an input stream that receives messages pushed by a MQTT publisher. - * @param brokerUrl Url of remote MQTT publisher - * @param topic topic name to subscribe to - * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. - */ - def mqttStream( - brokerUrl: String, - topic: String, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[String] = { - val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel) - ssc.registerInputStream(inputStream) - inputStream - } -} diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala deleted file mode 100644 index c8987a3ee0..0000000000 --- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.mqtt - -import scala.collection.Map -import scala.collection.mutable.HashMap -import scala.collection.JavaConversions._ -import scala.reflect.ClassTag - -import java.util.Properties -import java.util.concurrent.Executors -import java.io.IOException - -import org.eclipse.paho.client.mqttv3.MqttCallback -import org.eclipse.paho.client.mqttv3.MqttClient -import org.eclipse.paho.client.mqttv3.MqttClientPersistence -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken -import org.eclipse.paho.client.mqttv3.MqttException -import org.eclipse.paho.client.mqttv3.MqttMessage -import org.eclipse.paho.client.mqttv3.MqttTopic - -import org.apache.spark.Logging -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.dstream._ - -/** - * Input stream that subscribe messages from a Mqtt Broker. - * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/ - * @param brokerUrl Url of remote mqtt publisher - * @param topic topic name to subscribe to - * @param storageLevel RDD storage level. 
- */ - -private[streaming] -class MQTTInputDStream[T: ClassTag]( - @transient ssc_ : StreamingContext, - brokerUrl: String, - topic: String, - storageLevel: StorageLevel - ) extends NetworkInputDStream[T](ssc_) with Logging { - - def getReceiver(): NetworkReceiver[T] = { - new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]] - } -} - -private[streaming] -class MQTTReceiver(brokerUrl: String, - topic: String, - storageLevel: StorageLevel - ) extends NetworkReceiver[Any] { - lazy protected val blockGenerator = new BlockGenerator(storageLevel) - - def onStop() { - blockGenerator.stop() - } - - def onStart() { - - blockGenerator.start() - - // Set up persistence for messages - var peristance: MqttClientPersistence = new MemoryPersistence() - - // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance - var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance) - - // Connect to MqttBroker - client.connect() - - // Subscribe to Mqtt topic - client.subscribe(topic) - - // Callback automatically triggers as and when new message arrives on specified topic - var callback: MqttCallback = new MqttCallback() { - - // Handles Mqtt message - override def messageArrived(arg0: String, arg1: MqttMessage) { - blockGenerator += new String(arg1.getPayload()) - } - - override def deliveryComplete(arg0: IMqttDeliveryToken) { - } - - override def connectionLost(arg0: Throwable) { - logInfo("Connection lost " + arg0) - } - } - - // Set up callback for MqttClient - client.setCallback(callback) - } -} diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala deleted file mode 100644 index 28a944f57e..0000000000 --- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming - -package object mqtt { - implicit def sscToMQTTFunctions(ssc: StreamingContext) = new MQTTFunctions(ssc) -} - - diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala new file mode 100644 index 0000000000..72124956fc --- /dev/null +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.mqtt
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.mqtt._
+
+/**
+ * Wrapper around [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides extra
+ * functions for creating MQTT input streams.
+ */
+class MQTTFunctions(javaStreamingContext: JavaStreamingContext) {
+
+  /**
+   * Create an input stream that receives messages pushed by an MQTT publisher.
+   * @param brokerUrl Url of remote MQTT publisher
+   * @param topic topic name to subscribe to
+   */
+  def mqttStream(
+      brokerUrl: String,
+      topic: String
+    ): JavaDStream[String] = {
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    javaStreamingContext.ssc.mqttStream(brokerUrl, topic)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by an MQTT publisher.
+   * @param brokerUrl Url of remote MQTT publisher
+   * @param topic topic name to subscribe to
+   * @param storageLevel RDD storage level.
+   */
+  def mqttStream(
+      brokerUrl: String,
+      topic: String,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    ): JavaDStream[String] = {
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    javaStreamingContext.ssc.mqttStream(brokerUrl, topic, storageLevel)
+  }
+}
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
new file mode 100644
index 0000000000..86f4e9c724
--- /dev/null
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+
+/**
+ * Extra MQTT input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
+ * through implicit conversions. Import org.apache.spark.streaming.mqtt._ to use these functions.
+ */ +class MQTTFunctions(ssc: StreamingContext) { + /** + * Create an input stream that receives messages pushed by a MQTT publisher. + * @param brokerUrl Url of remote MQTT publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. + */ + def mqttStream( + brokerUrl: String, + topic: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[String] = { + val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } +} diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala new file mode 100644 index 0000000000..c8987a3ee0 --- /dev/null +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt + +import scala.collection.Map +import scala.collection.mutable.HashMap +import scala.collection.JavaConversions._ +import scala.reflect.ClassTag + +import java.util.Properties +import java.util.concurrent.Executors +import java.io.IOException + +import org.eclipse.paho.client.mqttv3.MqttCallback +import org.eclipse.paho.client.mqttv3.MqttClient +import org.eclipse.paho.client.mqttv3.MqttClientPersistence +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken +import org.eclipse.paho.client.mqttv3.MqttException +import org.eclipse.paho.client.mqttv3.MqttMessage +import org.eclipse.paho.client.mqttv3.MqttTopic + +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream._ + +/** + * Input stream that subscribe messages from a Mqtt Broker. + * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/ + * @param brokerUrl Url of remote mqtt publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. 
+ */ + +private[streaming] +class MQTTInputDStream[T: ClassTag]( + @transient ssc_ : StreamingContext, + brokerUrl: String, + topic: String, + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_) with Logging { + + def getReceiver(): NetworkReceiver[T] = { + new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]] + } +} + +private[streaming] +class MQTTReceiver(brokerUrl: String, + topic: String, + storageLevel: StorageLevel + ) extends NetworkReceiver[Any] { + lazy protected val blockGenerator = new BlockGenerator(storageLevel) + + def onStop() { + blockGenerator.stop() + } + + def onStart() { + + blockGenerator.start() + + // Set up persistence for messages + var peristance: MqttClientPersistence = new MemoryPersistence() + + // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance + var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance) + + // Connect to MqttBroker + client.connect() + + // Subscribe to Mqtt topic + client.subscribe(topic) + + // Callback automatically triggers as and when new message arrives on specified topic + var callback: MqttCallback = new MqttCallback() { + + // Handles Mqtt message + override def messageArrived(arg0: String, arg1: MqttMessage) { + blockGenerator += new String(arg1.getPayload()) + } + + override def deliveryComplete(arg0: IMqttDeliveryToken) { + } + + override def connectionLost(arg0: Throwable) { + logInfo("Connection lost " + arg0) + } + } + + // Set up callback for MqttClient + client.setCallback(callback) + } +} diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala new file mode 100644 index 0000000000..28a944f57e --- /dev/null +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.streaming
+
+package object mqtt {
+  implicit def sscToMQTTFunctions(ssc: StreamingContext) = new MQTTFunctions(ssc)
+}
+
+
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index c1f41640dc..3ddb4d084f 100644
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.mqtt;
 
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.mqtt.MQTTFunctions;
 import org.junit.Test;
 
 import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -28,14 +29,11 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
   public void testMQTTStream() {
     String brokerUrl = "abc";
     String topic = "def";
-    JavaStreamingContextWithMQTT sscWithMQTT = new JavaStreamingContextWithMQTT(ssc);
+    MQTTFunctions mqttFunc = new MQTTFunctions(ssc);
 
     // tests the API, does not actually test data receiving
-    JavaDStream<String> test1 = sscWithMQTT.mqttStream(brokerUrl, topic);
-    JavaDStream<String> test2 = sscWithMQTT.mqttStream(brokerUrl, topic,
+    JavaDStream<String> test1 = mqttFunc.mqttStream(brokerUrl, topic);
+    JavaDStream<String> test2 = mqttFunc.mqttStream(brokerUrl, topic,
       StorageLevel.MEMORY_AND_DISK_SER_2());
-
-    // To verify that JavaStreamingContextWithKafka is also StreamingContext
-    JavaDStream<String> socketStream = sscWithMQTT.socketTextStream("localhost", 9999);
   }
 }
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala
new file mode 100644
index 0000000000..22e297a03a
--- /dev/null
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.twitter
+
+import twitter4j.Status
+import twitter4j.auth.Authorization
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.twitter._
+
+/**
+ * Wrapper around [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides extra
+ * functions for creating Twitter input streams.
+ */
+class TwitterFunctions(javaStreamingContext: JavaStreamingContext) {
+
+  /**
+   * Create an input stream that returns tweets received from Twitter using Twitter4J's default
+   * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+   * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+   * twitter4j.oauth.accessTokenSecret.
+   */
+  def twitterStream(): JavaDStream[Status] = {
+    javaStreamingContext.ssc.twitterStream(None)
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter using Twitter4J's default
+   * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+   * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+   * twitter4j.oauth.accessTokenSecret.
+   * @param filters Set of filter strings to get only those tweets that match them
+   */
+  def twitterStream(filters: Array[String]): JavaDStream[Status] = {
+    javaStreamingContext.ssc.twitterStream(None, filters)
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter using Twitter4J's default
+   * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+   * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+   * twitter4j.oauth.accessTokenSecret.
+   * @param filters Set of filter strings to get only those tweets that match them
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = {
+    javaStreamingContext.ssc.twitterStream(None, filters, storageLevel)
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter.
+   * @param twitterAuth Twitter4J Authorization
+   */
+  def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = {
+    javaStreamingContext.ssc.twitterStream(Some(twitterAuth))
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter.
+   * @param twitterAuth Twitter4J Authorization
+   * @param filters Set of filter strings to get only those tweets that match them
+   */
+  def twitterStream(
+      twitterAuth: Authorization,
+      filters: Array[String]
+    ): JavaDStream[Status] = {
+    javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters)
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter.
+   * @param twitterAuth Twitter4J Authorization object
+   * @param filters Set of filter strings to get only those tweets that match them
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def twitterStream(
+      twitterAuth: Authorization,
+      filters: Array[String],
+      storageLevel: StorageLevel
+    ): JavaDStream[Status] = {
+    javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
+  }
+}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala
deleted file mode 100644
index 0250364331..0000000000
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.twitter - -import twitter4j.Status -import twitter4j.auth.Authorization - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} - -/** - * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra - * functions for creating Twitter input streams. - */ -class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext) - extends JavaStreamingContext(javaStreamingContext.ssc) { - - /** - * Create a input stream that returns tweets received from Twitter using Twitter4J's default - * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, - * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and - * twitter4j.oauth.accessTokenSecret. - */ - def twitterStream(): JavaDStream[Status] = { - ssc.twitterStream(None) - } - - /** - * Create a input stream that returns tweets received from Twitter using Twitter4J's default - * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, - * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and - * twitter4j.oauth.accessTokenSecret. - * @param filters Set of filter strings to get only those tweets that match them - */ - def twitterStream(filters: Array[String]): JavaDStream[Status] = { - ssc.twitterStream(None, filters) - } - - /** - * Create a input stream that returns tweets received from Twitter using Twitter4J's default - * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, - * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and - * twitter4j.oauth.accessTokenSecret. - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = { - ssc.twitterStream(None, filters, storageLevel) - } - - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J Authorization - */ - def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth)) - } - - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J Authorization - * @param filters Set of filter strings to get only those tweets that match them - */ - def twitterStream( - twitterAuth: Authorization, - filters: Array[String] - ): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth), filters) - } - - /** - * Create a input stream that returns tweets received from Twitter. 
- * @param twitterAuth Twitter4J Authorization object - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream( - twitterAuth: Authorization, - filters: Array[String], - storageLevel: StorageLevel - ): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth), filters, storageLevel) - } -} diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java index 34e4fbdd85..4564d6cd33 100644 --- a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java +++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java @@ -18,6 +18,8 @@ package org.apache.spark.streaming.twitter; import java.util.Arrays; + +import org.apache.spark.streaming.api.java.twitter.TwitterFunctions; import org.junit.Test; import twitter4j.Status; @@ -31,21 +33,18 @@ import org.apache.spark.streaming.api.java.JavaDStream; public class JavaTwitterStreamSuite extends LocalJavaStreamingContext { @Test public void testTwitterStream() { - JavaStreamingContextWithTwitter sscWithTwitter = new JavaStreamingContextWithTwitter(ssc); + TwitterFunctions twitterFunc = new TwitterFunctions(ssc); String[] filters = (String[])Arrays.asList("filter1", "filter2").toArray(); Authorization auth = NullAuthorization.getInstance(); // tests the API, does not actually test data receiving - JavaDStream test1 = sscWithTwitter.twitterStream(); - JavaDStream test2 = sscWithTwitter.twitterStream(filters); + JavaDStream test1 = twitterFunc.twitterStream(); + JavaDStream test2 = twitterFunc.twitterStream(filters); JavaDStream test3 = - sscWithTwitter.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2()); - JavaDStream test4 = sscWithTwitter.twitterStream(auth); - JavaDStream test5 = sscWithTwitter.twitterStream(auth, filters); + twitterFunc.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaDStream test4 = twitterFunc.twitterStream(auth); + JavaDStream test5 = twitterFunc.twitterStream(auth, filters); JavaDStream test6 = - sscWithTwitter.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2()); - - // To verify that JavaStreamingContextWithKafka is also StreamingContext - JavaDStream socketStream = sscWithTwitter.socketTextStream("localhost", 9999); + twitterFunc.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2()); } } diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala new file mode 100644 index 0000000000..a9bbce71f5 --- /dev/null +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.zeromq
+
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
+import akka.actor.SupervisorStrategy
+import akka.util.ByteString
+import akka.zeromq.Subscribe
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.zeromq._
+
+/**
+ * Wrapper for [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides extra
+ * functions for creating ZeroMQ input streams.
+ */
+class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) {
+
+  /**
+   * Create an input stream that receives messages pushed by a zeromq publisher.
+   * @param publisherUrl URL of the remote ZeroMQ publisher
+   * @param subscribe Topic to subscribe to
+   * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each
+   *                       frame is a sequence of bytes. This converter (typically a deserializer)
+   *                       translates each published set of frames, passed in as an array of byte
+   *                       arrays (one per frame), into an iterable of objects.
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def zeroMQStream[T](
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+      storageLevel: StorageLevel,
+      supervisorStrategy: SupervisorStrategy
+    ): JavaDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a zeromq publisher.
+   * @param publisherUrl URL of the remote ZeroMQ publisher
+   * @param subscribe Topic to subscribe to
+   * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each
+   *                       frame is a sequence of bytes. This converter (typically a deserializer)
+   *                       translates each published set of frames, passed in as an array of byte
+   *                       arrays (one per frame), into an iterable of objects.
+   * @param storageLevel RDD storage level.
+   */
+  def zeroMQStream[T](
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+      storageLevel: StorageLevel
+    ): JavaDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a zeromq publisher.
+   * @param publisherUrl URL of the remote ZeroMQ publisher
+   * @param subscribe Topic to subscribe to
+   * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each
+   *                       frame is a sequence of bytes. This converter (typically a deserializer)
+   *                       translates each published set of frames, passed in as an array of byte
+   *                       arrays (one per frame), into an iterable of objects.
+   */
+  def zeroMQStream[T](
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
+    ): JavaDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
+  }
+}
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala
deleted file mode 100644
index dc5d1f05be..0000000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq
-
-import scala.reflect.ClassTag
-import scala.collection.JavaConversions._
-
-import akka.actor.SupervisorStrategy
-import akka.util.ByteString
-import akka.zeromq.Subscribe
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.api.java.function.{Function => JFunction}
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating ZeroMQ input streams.
- */
-class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext)
-  extends JavaStreamingContext(javaStreamingContext.ssc) {
-
-  /**
-   * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param publisherUrl Url of remote ZeroMQ publisher
-   * @param subscribe topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
-   *                       of byte thus it needs the converter(which might be deserializer of bytes)
-   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
-   *                       and sub sequence refer to its payload.
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def zeroMQStream[T](
-      publisherUrl: String,
-      subscribe: Subscribe,
-      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
-      storageLevel: StorageLevel,
-      supervisorStrategy: SupervisorStrategy
-    ): JavaDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
-    ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
-  }
-
-  /**
-   * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param publisherUrl Url of remote zeromq publisher
-   * @param subscribe topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
-   *                       of byte thus it needs the converter(which might be deserializer of bytes)
-   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
-   *                       and sub sequence refer to its payload.
-   * @param storageLevel RDD storage level.
-   */
-  def zeroMQStream[T](
-      publisherUrl: String,
-      subscribe: Subscribe,
-      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
-      storageLevel: StorageLevel
-    ): JavaDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
-    ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
-  }
-
-  /**
-   * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param publisherUrl Url of remote zeromq publisher
-   * @param subscribe topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
-   *                       of byte thus it needs the converter(which might be deserializer of bytes)
-   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
-   *                       and sub sequence refer to its payload.
-   */
-  def zeroMQStream[T](
-      publisherUrl: String,
-      subscribe: Subscribe,
-      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
-    ): JavaDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
-    ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
-  }
-}
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index 96af7d737d..b020ae4cef 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.streaming.zeromq;
 
+import org.apache.spark.streaming.api.java.zeromq.ZeroMQFunctions;
 import org.junit.Test;
 
 import akka.actor.SupervisorStrategy;
@@ -32,7 +33,7 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
 
   @Test // tests the API, does not actually test data receiving
   public void testZeroMQStream() {
-    JavaStreamingContextWithZeroMQ sscWithZeroMQ = new JavaStreamingContextWithZeroMQ(ssc);
+    ZeroMQFunctions zeromqFunc = new ZeroMQFunctions(ssc);
     String publishUrl = "abc";
     Subscribe subscribe = new Subscribe((ByteString)null);
     Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
@@ -42,14 +43,11 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
       }
     };
-    JavaDStream<String> test1 = sscWithZeroMQ.zeroMQStream(
+    JavaDStream<String> test1 = zeromqFunc.zeroMQStream(
       publishUrl, subscribe, bytesToObjects);
-    JavaDStream<String> test2 = sscWithZeroMQ.zeroMQStream(
+    JavaDStream<String> test2 = zeromqFunc.zeroMQStream(
      publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
-    JavaDStream<String> test3 = sscWithZeroMQ.zeroMQStream(
+    JavaDStream<String> test3 = zeromqFunc.zeroMQStream(
      publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
      SupervisorStrategy.defaultStrategy());
-
-    // To verify that JavaStreamingContextWithKafka is also StreamingContext
-    JavaDStream<String> socketStream = sscWithZeroMQ.socketTextStream("localhost", 9999);
   }
 }
-- 
cgit v1.2.3
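
For context, this is how the renamed Twitter wrapper is meant to be used from Java after this patch: build a JavaStreamingContext as usual, then wrap it. A minimal sketch only; the master URL, app name, batch interval, and filter value below are illustrative placeholders, not taken from the patch.

    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.api.java.twitter.TwitterFunctions;

    import twitter4j.Status;

    public class TwitterFunctionsSketch {
      public static void main(String[] args) {
        // Hypothetical local context; master, app name, and batch interval are placeholders.
        JavaStreamingContext ssc = new JavaStreamingContext(
          "local[2]", "TwitterFunctionsSketch", new Duration(2000));

        // The wrapper is constructed around an existing context...
        TwitterFunctions twitterFunc = new TwitterFunctions(ssc);

        // ...and contributes only the Twitter-specific stream constructors. This overload
        // relies on Twitter4J's default OAuth system properties (twitter4j.oauth.*).
        JavaDStream<Status> tweets = twitterFunc.twitterStream(new String[]{"spark"});
        tweets.count().print();

        // Core operations still live on the original context, not on the wrapper.
        ssc.start();
      }
    }

Because TwitterFunctions wraps the context instead of subclassing it, core methods such as socketTextStream() and start() stay on the original ssc, which is why the updated suites above no longer exercise socketTextStream through the wrapper.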
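
The ZeroMQ wrapper follows the same pattern. The suite above only checks the API shape with a null-returning converter, so here is a sketch of a working converter under stated assumptions: the publisher URL and topic are hypothetical, and payloads are assumed to be UTF-8 text.

    import java.util.Arrays;

    import akka.util.ByteString;
    import akka.zeromq.Subscribe;

    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.api.java.zeromq.ZeroMQFunctions;

    public class ZeroMQFunctionsSketch {
      public static JavaDStream<String> linesFrom(JavaStreamingContext ssc) {
        ZeroMQFunctions zeromqFunc = new ZeroMQFunctions(ssc);

        // Each ZeroMQ message arrives as an array of frames (one byte array per frame);
        // this illustrative converter decodes every frame as a UTF-8 string.
        Function<byte[][], Iterable<String>> bytesToObjects =
          new Function<byte[][], Iterable<String>>() {
            @Override
            public Iterable<String> call(byte[][] frames) throws Exception {
              String[] decoded = new String[frames.length];
              for (int i = 0; i < frames.length; i++) {
                decoded[i] = new String(frames[i], "UTF-8");
              }
              return Arrays.asList(decoded);
            }
          };

        // Hypothetical publisher URL and topic; the storage level mirrors the suite above.
        return zeromqFunc.zeroMQStream(
          "tcp://localhost:5553",
          new Subscribe(ByteString.fromString("someTopic")),
          bytesToObjects,
          StorageLevel.MEMORY_AND_DISK_SER_2());
      }
    }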