diff options
60 files changed, 2162 insertions, 721 deletions
diff --git a/examples/pom.xml b/examples/pom.xml index 7a7032c319..1839667fea 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -72,6 +72,36 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-twitter_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-zeromq_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase</artifactId> <version>0.94.6</version> @@ -87,21 +117,6 @@ </exclusions> </dependency> <dependency> - <groupId>com.sksamuel.kafka</groupId> - <artifactId>kafka_${scala.binary.version}</artifactId> - <version>0.8.0-beta1</version> - <exclusions> - <exclusion> - <groupId>com.sun.jmx</groupId> - <artifactId>jmxri</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jdmk</groupId> - <artifactId>jmxtools</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> </dependency> diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java index 64ac72474b..83900a18df 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java @@ -20,7 +20,8 @@ package org.apache.spark.streaming.examples; import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.*; import org.apache.spark.streaming.api.java.*; -import org.apache.spark.streaming.dstream.SparkFlumeEvent; +import org.apache.spark.streaming.api.java.flume.FlumeFunctions; +import org.apache.spark.streaming.flume.SparkFlumeEvent; /** * Produces a count of events received from Flume. @@ -49,11 +50,11 @@ public class JavaFlumeEventCount { Duration batchInterval = new Duration(2000); - JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval, + JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval, System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class)); - - JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("localhost", port); + FlumeFunctions flumeFunc = new FlumeFunctions(ssc); + JavaDStream<SparkFlumeEvent> flumeStream = flumeFunc.flumeStream("localhost", port); flumeStream.count(); @@ -64,6 +65,6 @@ public class JavaFlumeEventCount { } }).print(); - sc.start(); + ssc.start(); } } diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java index 0a56e7abdf..51de4054cc 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java @@ -29,6 +29,7 @@ import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.api.java.kafka.KafkaFunctions; import scala.Tuple2; /** @@ -64,7 +65,8 @@ public class JavaKafkaWordCount { topicMap.put(topic, numThreads); } - JavaPairDStream<String, String> messages = ssc.kafkaStream(args[1], args[2], topicMap); + KafkaFunctions kafkaFunc = new KafkaFunctions(ssc); + JavaPairDStream<String, String> messages = kafkaFunc.kafkaStream(args[1], args[2], topicMap); JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { @Override diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala index 5ef1928294..149640e0d1 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.examples import org.apache.spark.util.IntParam import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ +import org.apache.spark.streaming.flume._ /** * Produces a count of events received from Flume. diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala index 172091be2e..633712e816 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala @@ -24,6 +24,7 @@ import kafka.producer._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.util.RawTextHelper._ +import org.apache.spark.streaming.kafka._ /** * Consumes messages from one or more topics in Kafka and does wordcount. diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index 2d02ef77c0..f65c3f8b91 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -17,11 +17,6 @@ package org.apache.spark.streaming.examples -import org.apache.spark.streaming.{ Seconds, StreamingContext } -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.dstream.MQTTReceiver -import org.apache.spark.storage.StorageLevel - import org.eclipse.paho.client.mqttv3.MqttClient import org.eclipse.paho.client.mqttv3.MqttClientPersistence import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence @@ -29,6 +24,11 @@ import org.eclipse.paho.client.mqttv3.MqttException import org.eclipse.paho.client.mqttv3.MqttMessage import org.eclipse.paho.client.mqttv3.MqttTopic +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.mqtt._ + /** * A simple Mqtt publisher for demonstration purposes, repeatedly publishes * Space separated String Message "hello mqtt demo for spark streaming" diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala index 9d21d3178f..a60570f884 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -23,6 +23,8 @@ import com.twitter.algebird._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.SparkContext._ +import org.apache.spark.streaming.twitter._ + /** * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute * windowed and global Top-K estimates of user IDs occurring in a Twitter stream. diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala index 5111e6f62a..1382fa4d1d 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -21,7 +21,7 @@ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.storage.StorageLevel import com.twitter.algebird.HyperLogLog._ import com.twitter.algebird.HyperLogLogMonoid -import org.apache.spark.streaming.dstream.TwitterInputDStream +import org.apache.spark.streaming.twitter._ /** * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala index 7a3df687b7..84842b3d65 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.examples import org.apache.spark.streaming.{Seconds, StreamingContext} import StreamingContext._ import org.apache.spark.SparkContext._ +import org.apache.spark.streaming.twitter._ /** * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala index 89d3042123..789c5f2d08 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala @@ -20,11 +20,13 @@ package org.apache.spark.streaming.examples import akka.actor.ActorSystem import akka.actor.actorRef2Scala import akka.zeromq._ -import org.apache.spark.streaming.{ Seconds, StreamingContext } -import org.apache.spark.streaming.StreamingContext._ import akka.zeromq.Subscribe import akka.util.ByteString +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.zeromq._ + /** * A simple publisher for demonstration purposes, repeatedly publishes random Messages * every one second. diff --git a/external/flume/pom.xml b/external/flume/pom.xml new file mode 100644 index 0000000000..443910a03a --- /dev/null +++ b/external/flume/pom.xml @@ -0,0 +1,93 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent</artifactId> + <version>0.9.0-incubating-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-flume_2.10</artifactId> + <packaging>jar</packaging> + <name>Spark Project External Flume</name> + <url>http://spark.incubator.apache.org/</url> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-sdk</artifactId> + <version>1.2.0</version> + <exclusions> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>org.xerial.snappy</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalacheck</groupId> + <artifactId>scalacheck_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.novocode</groupId> + <artifactId>junit-interface</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + <plugins> + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala b/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala new file mode 100644 index 0000000000..3347d19796 --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.java.flume + +import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} +import org.apache.spark.streaming.flume._ +import org.apache.spark.storage.StorageLevel + +/** + * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra + * functions for creating Flume input streams. + */ +class FlumeFunctions(javaStreamingContext: JavaStreamingContext) { + /** + * Creates a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + */ + def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = { + javaStreamingContext.ssc.flumeStream(hostname, port) + } + + /** + * Creates a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + * @param storageLevel Storage level to use for storing the received objects + */ + def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel): + JavaDStream[SparkFlumeEvent] = { + javaStreamingContext.ssc.flumeStream(hostname, port, storageLevel) + } +} diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala new file mode 100644 index 0000000000..35e7a01abc --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ + +/** + * Extra Flume input stream functions available on [[org.apache.spark.streaming.StreamingContext]] + * through implicit conversion. Import org.apache.spark.streaming.flume._ to use these functions. + */ +class FlumeFunctions(ssc: StreamingContext) { + /** + * Create a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + * @param storageLevel Storage level to use for storing the received objects + */ + def flumeStream ( + hostname: String, + port: Int, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[SparkFlumeEvent] = { + val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala index 60d79175f1..ce3ef47cfe 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.streaming.dstream +package org.apache.spark.streaming.flume import java.net.InetSocketAddress import java.io.{ObjectInput, ObjectOutput, Externalizable} @@ -30,9 +30,10 @@ import org.apache.flume.source.avro.Status import org.apache.avro.ipc.specific.SpecificResponder import org.apache.avro.ipc.NettyServer -import org.apache.spark.streaming.StreamingContext import org.apache.spark.util.Utils import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream._ private[streaming] class FlumeInputDStream[T: ClassTag]( diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala new file mode 100644 index 0000000000..c087a39d1c --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +package object flume { + implicit def sscToFlumeFunctions(ssc: StreamingContext) = new FlumeFunctions(ssc) +} + diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java new file mode 100644 index 0000000000..5930fee925 --- /dev/null +++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java @@ -0,0 +1,35 @@ +package org.apache.spark.streaming.flume;/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.flume.FlumeFunctions; +import org.apache.spark.streaming.flume.SparkFlumeEvent; +import org.junit.Test; + +public class JavaFlumeStreamSuite extends LocalJavaStreamingContext { + @Test + public void testFlumeStream() { + FlumeFunctions flumeFunc = new FlumeFunctions(ssc); + + // tests the API, does not actually test data receiving + JavaDStream<SparkFlumeEvent> test1 = flumeFunc.flumeStream("localhost", 12345); + JavaDStream<SparkFlumeEvent> test2 = flumeFunc.flumeStream("localhost", 12345, + StorageLevel.MEMORY_AND_DISK_SER_2()); + } +} diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/flume/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala new file mode 100644 index 0000000000..74840f6499 --- /dev/null +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume + +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} + +import java.net.InetSocketAddress +import java.nio.ByteBuffer +import java.nio.charset.Charset + +import org.apache.avro.ipc.NettyTransceiver +import org.apache.avro.ipc.specific.SpecificRequestor +import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol} + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase} +import org.apache.spark.streaming.util.ManualClock + +class FlumeStreamSuite extends TestSuiteBase { + + val testPort = 9999 + + test("flume input stream") { + // Set up the streaming context and input streams + val ssc = new StreamingContext(conf, batchDuration) + val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) + val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] + with SynchronizedBuffer[Seq[SparkFlumeEvent]] + val outputStream = new TestOutputStream(flumeStream, outputBuffer) + ssc.registerOutputStream(outputStream) + ssc.start() + + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val input = Seq(1, 2, 3, 4, 5) + Thread.sleep(1000) + val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)) + val client = SpecificRequestor.getClient( + classOf[AvroSourceProtocol], transceiver) + + for (i <- 0 until input.size) { + val event = new AvroFlumeEvent + event.setBody(ByteBuffer.wrap(input(i).toString.getBytes())) + event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header")) + client.append(event) + Thread.sleep(500) + clock.addToTime(batchDuration.milliseconds) + } + + val startTime = System.currentTimeMillis() + while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { + logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size) + Thread.sleep(100) + } + Thread.sleep(1000) + val timeTaken = System.currentTimeMillis() - startTime + assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") + logInfo("Stopping context") + ssc.stop() + + val decoder = Charset.forName("UTF-8").newDecoder() + + assert(outputBuffer.size === input.length) + for (i <- 0 until outputBuffer.size) { + assert(outputBuffer(i).size === 1) + val str = decoder.decode(outputBuffer(i).head.event.getBody) + assert(str.toString === input(i).toString) + assert(outputBuffer(i).head.event.getHeaders.get("test") === "header") + } + } +} diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml new file mode 100644 index 0000000000..f782e0e126 --- /dev/null +++ b/external/kafka/pom.xml @@ -0,0 +1,97 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent</artifactId> + <version>0.9.0-incubating-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kafka_2.10</artifactId> + <packaging>jar</packaging> + <name>Spark Project External Kafka</name> + <url>http://spark.incubator.apache.org/</url> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.sksamuel.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + <version>0.8.0-beta1</version> + <exclusions> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + </exclusion> + <exclusion> + <groupId>net.sf.jopt-simple</groupId> + <artifactId>jopt-simple</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalacheck</groupId> + <artifactId>scalacheck_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.novocode</groupId> + <artifactId>junit-interface</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + <plugins> + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala new file mode 100644 index 0000000000..491331bb37 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.java.kafka + +import scala.reflect.ClassTag +import scala.collection.JavaConversions._ + +import java.lang.{Integer => JInt} +import java.util.{Map => JMap} + +import kafka.serializer.Decoder + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream} +import org.apache.spark.streaming.kafka._ + +/** + * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra + * functions for creating Kafka input streams. + */ +class KafkaFunctions(javaStreamingContext: JavaStreamingContext) { + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + */ + def kafkaStream( + zkQuorum: String, + groupId: String, + topics: JMap[String, JInt] + ): JavaPairDStream[String, String] = { + implicit val cmt: ClassTag[String] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) + } + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel RDD storage level. + * + */ + def kafkaStream( + zkQuorum: String, + groupId: String, + topics: JMap[String, JInt], + storageLevel: StorageLevel + ): JavaPairDStream[String, String] = { + implicit val cmt: ClassTag[String] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) + } + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param keyTypeClass Key type of RDD + * @param valueTypeClass value type of RDD + * @param keyDecoderClass Type of kafka key decoder + * @param valueDecoderClass Type of kafka value decoder + * @param kafkaParams Map of kafka configuration paramaters. + * See: http://kafka.apache.org/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel RDD storage level. Defaults to memory-only + */ + def kafkaStream[K, V, U <: Decoder[_], T <: Decoder[_]]( + keyTypeClass: Class[K], + valueTypeClass: Class[V], + keyDecoderClass: Class[U], + valueDecoderClass: Class[T], + kafkaParams: JMap[String, String], + topics: JMap[String, JInt], + storageLevel: StorageLevel + ): JavaPairDStream[K, V] = { + implicit val keyCmt: ClassTag[K] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val valueCmt: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + + implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]] + implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]] + + javaStreamingContext.ssc.kafkaStream[K, V, U, T]( + kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) + } +} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala new file mode 100644 index 0000000000..2135634a69 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import scala.reflect.ClassTag + +import kafka.serializer.{Decoder, StringDecoder} + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ + +/** + * Extra Kafka input stream functions available on [[org.apache.spark.streaming.StreamingContext]] + * through implicit conversion. Import org.apache.spark.streaming.kafka._ to use these functions. + */ +class KafkaFunctions(ssc: StreamingContext) { + /** + * Create an input stream that pulls messages from a Kafka Broker. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel Storage level to use for storing the received objects + * (default: StorageLevel.MEMORY_AND_DISK_SER_2) + */ + def kafkaStream( + zkQuorum: String, + groupId: String, + topics: Map[String, Int], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[(String, String)] = { + val kafkaParams = Map[String, String]( + "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, + "zookeeper.connection.timeout.ms" -> "10000") + kafkaStream[String, String, StringDecoder, StringDecoder]( + kafkaParams, + topics, + storageLevel) + } + + /** + * Create an input stream that pulls messages from a Kafka Broker. + * @param kafkaParams Map of kafka configuration paramaters. + * See: http://kafka.apache.org/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel Storage level to use for storing the received objects + */ + def kafkaStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel + ): DStream[(K, V)] = { + val inputStream = new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala index 526f5564c7..a2cd49c573 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -15,11 +15,10 @@ * limitations under the License. */ -package org.apache.spark.streaming.dstream +package org.apache.spark.streaming.kafka -import org.apache.spark.Logging -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.StreamingContext +import scala.collection.Map +import scala.reflect.ClassTag import java.util.Properties import java.util.concurrent.Executors @@ -30,8 +29,10 @@ import kafka.utils.VerifiableProperties import kafka.utils.ZKStringSerializer import org.I0Itec.zkclient._ -import scala.collection.Map -import scala.reflect.ClassTag +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream._ /** * Input stream that pulls messages from a Kafka Broker. diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala new file mode 100644 index 0000000000..44e7ce6e1b --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +package object kafka { + implicit def sscToKafkaFunctions(ssc: StreamingContext) = new KafkaFunctions(ssc) +} + diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java new file mode 100644 index 0000000000..fdea96e506 --- /dev/null +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka; + +import java.util.HashMap; + +import org.apache.spark.streaming.api.java.kafka.KafkaFunctions; +import org.junit.Test; +import com.google.common.collect.Maps; +import kafka.serializer.StringDecoder; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; + +public class JavaKafkaStreamSuite extends LocalJavaStreamingContext { + @Test + public void testKafkaStream() { + + HashMap<String, Integer> topics = Maps.newHashMap(); + KafkaFunctions kafkaFunc = new KafkaFunctions(ssc); + + // tests the API, does not actually test data receiving + JavaPairDStream<String, String> test1 = kafkaFunc.kafkaStream("localhost:12345", "group", topics); + JavaPairDStream<String, String> test2 = kafkaFunc.kafkaStream("localhost:12345", "group", topics, + StorageLevel.MEMORY_AND_DISK_SER_2()); + + HashMap<String, String> kafkaParams = Maps.newHashMap(); + kafkaParams.put("zookeeper.connect", "localhost:12345"); + kafkaParams.put("group.id","consumer-group"); + JavaPairDStream<String, String> test3 = kafkaFunc.kafkaStream( + String.class, String.class, StringDecoder.class, StringDecoder.class, + kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2()); + } +} diff --git a/external/kafka/src/test/resources/log4j.properties b/external/kafka/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/kafka/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala new file mode 100644 index 0000000000..2ef3e99c55 --- /dev/null +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import kafka.serializer.StringDecoder +import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} +import org.apache.spark.storage.StorageLevel + +class KafkaStreamSuite extends TestSuiteBase { + + test("kafka input stream") { + val ssc = new StreamingContext(master, framework, batchDuration) + val topics = Map("my-topic" -> 1) + + // tests the API, does not actually test data receiving + val test1 = ssc.kafkaStream("localhost:12345", "group", topics) + val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2) + val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group") + val test3 = ssc.kafkaStream[String, String, StringDecoder, StringDecoder]( + kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2) + + // TODO: Actually test receiving data + } +} diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml new file mode 100644 index 0000000000..31b4fa87de --- /dev/null +++ b/external/mqtt/pom.xml @@ -0,0 +1,108 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent</artifactId> + <version>0.9.0-incubating-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-mqtt_2.10</artifactId> + <packaging>jar</packaging> + <name>Spark Project External MQTT</name> + <url>http://spark.incubator.apache.org/</url> + + <repositories> + <repository> + <id>mqtt-repo</id> + <name>MQTT Repository</name> + <url>https://repo.eclipse.org/content/repositories/paho-releases</url> + <releases> + <enabled>true</enabled> + </releases> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.eclipse.paho</groupId> + <artifactId>mqtt-client</artifactId> + <version>0.4.0</version> + </dependency> + <dependency> + <groupId>${akka.group}</groupId> + <artifactId>akka-zeromq_${scala.binary.version}</artifactId> + <version>${akka.version}</version> + <exclusions> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalacheck</groupId> + <artifactId>scalacheck_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.novocode</groupId> + <artifactId>junit-interface</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + <plugins> + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala new file mode 100644 index 0000000000..72124956fc --- /dev/null +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.java.mqtt + +import scala.reflect.ClassTag + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} +import org.apache.spark.streaming.mqtt._ + +/** + * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra + * functions for creating MQTT input streams. + */ +class MQTTFunctions(javaStreamingContext: JavaStreamingContext) { + + /** + * Create an input stream that receives messages pushed by a MQTT publisher. + * @param brokerUrl Url of remote MQTT publisher + * @param topic topic name to subscribe to + */ + def mqttStream( + brokerUrl: String, + topic: String + ): JavaDStream[String] = { + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + javaStreamingContext.ssc.mqttStream(brokerUrl, topic) + } + + /** + * Create an input stream that receives messages pushed by a MQTT publisher. + * @param brokerUrl Url of remote MQTT publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. + */ + def mqttStream( + brokerUrl: String, + topic: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): JavaDStream[String] = { + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + javaStreamingContext.ssc.mqttStream(brokerUrl, topic, storageLevel) + } +} diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala new file mode 100644 index 0000000000..86f4e9c724 --- /dev/null +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ + +/** + * Extra MQTT input stream functions available on [[org.apache.spark.streaming.StreamingContext]] + * through implicit conversions. Import org.apache.spark.streaming.mqtt._ to use these functions. + */ +class MQTTFunctions(ssc: StreamingContext) { + /** + * Create an input stream that receives messages pushed by a MQTT publisher. + * @param brokerUrl Url of remote MQTT publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. + */ + def mqttStream( + brokerUrl: String, + topic: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[String] = { + val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala index ef4a737568..c8987a3ee0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala @@ -15,11 +15,12 @@ * limitations under the License. */ -package org.apache.spark.streaming.dstream +package org.apache.spark.streaming.mqtt -import org.apache.spark.Logging -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContext } +import scala.collection.Map +import scala.collection.mutable.HashMap +import scala.collection.JavaConversions._ +import scala.reflect.ClassTag import java.util.Properties import java.util.concurrent.Executors @@ -34,10 +35,10 @@ import org.eclipse.paho.client.mqttv3.MqttException import org.eclipse.paho.client.mqttv3.MqttMessage import org.eclipse.paho.client.mqttv3.MqttTopic -import scala.collection.Map -import scala.collection.mutable.HashMap -import scala.collection.JavaConversions._ -import scala.reflect.ClassTag +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream._ /** * Input stream that subscribe messages from a Mqtt Broker. @@ -49,15 +50,14 @@ import scala.reflect.ClassTag private[streaming] class MQTTInputDStream[T: ClassTag]( - @transient ssc_ : StreamingContext, - brokerUrl: String, - topic: String, - storageLevel: StorageLevel + @transient ssc_ : StreamingContext, + brokerUrl: String, + topic: String, + storageLevel: StorageLevel ) extends NetworkInputDStream[T](ssc_) with Logging { def getReceiver(): NetworkReceiver[T] = { - new MQTTReceiver(brokerUrl, topic, storageLevel) - .asInstanceOf[NetworkReceiver[T]] + new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]] } } diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala new file mode 100644 index 0000000000..28a944f57e --- /dev/null +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +package object mqtt { + implicit def sscToMQTTFunctions(ssc: StreamingContext) = new MQTTFunctions(ssc) +} + + diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java new file mode 100644 index 0000000000..3ddb4d084f --- /dev/null +++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt; + +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.mqtt.MQTTFunctions; +import org.junit.Test; + +import org.apache.spark.streaming.LocalJavaStreamingContext; + +public class JavaMQTTStreamSuite extends LocalJavaStreamingContext { + @Test + public void testMQTTStream() { + String brokerUrl = "abc"; + String topic = "def"; + MQTTFunctions mqttFunc = new MQTTFunctions(ssc); + + // tests the API, does not actually test data receiving + JavaDStream<String> test1 = mqttFunc.mqttStream(brokerUrl, topic); + JavaDStream<String> test2 = mqttFunc.mqttStream(brokerUrl, topic, + StorageLevel.MEMORY_AND_DISK_SER_2()); + } +} diff --git a/external/mqtt/src/test/resources/log4j.properties b/external/mqtt/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/mqtt/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala new file mode 100644 index 0000000000..ab6542918b --- /dev/null +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt + +import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} +import org.apache.spark.storage.StorageLevel + +class MQTTStreamSuite extends TestSuiteBase { + + test("MQTT input stream") { + val ssc = new StreamingContext(master, framework, batchDuration) + val brokerUrl = "abc" + val topic = "def" + + // tests the API, does not actually test data receiving + val test1 = ssc.mqttStream(brokerUrl, topic) + val test2 = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) + + // TODO: Actually test receiving data + } +} diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml new file mode 100644 index 0000000000..216e6c1d8f --- /dev/null +++ b/external/twitter/pom.xml @@ -0,0 +1,89 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent</artifactId> + <version>0.9.0-incubating-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-twitter_2.10</artifactId> + <packaging>jar</packaging> + <name>Spark Project External Twitter</name> + <url>http://spark.incubator.apache.org/</url> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.twitter4j</groupId> + <artifactId>twitter4j-stream</artifactId> + <version>3.0.3</version> + <exclusions> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalacheck</groupId> + <artifactId>scalacheck_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.novocode</groupId> + <artifactId>junit-interface</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + <plugins> + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala new file mode 100644 index 0000000000..22e297a03a --- /dev/null +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.java.twitter + +import twitter4j.Status +import twitter4j.auth.Authorization + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} +import org.apache.spark.streaming.twitter._ + +/** + * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra + * functions for creating Twitter input streams. + */ +class TwitterFunctions(javaStreamingContext: JavaStreamingContext) { + + /** + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + */ + def twitterStream(): JavaDStream[Status] = { + javaStreamingContext.ssc.twitterStream(None) + } + + /** + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + * @param filters Set of filter strings to get only those tweets that match them + */ + def twitterStream(filters: Array[String]): JavaDStream[Status] = { + javaStreamingContext.ssc.twitterStream(None, filters) + } + + /** + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = { + javaStreamingContext.ssc.twitterStream(None, filters, storageLevel) + } + + /** + * Create a input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J Authorization + */ + def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = { + javaStreamingContext.ssc.twitterStream(Some(twitterAuth)) + } + + /** + * Create a input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J Authorization + * @param filters Set of filter strings to get only those tweets that match them + */ + def twitterStream( + twitterAuth: Authorization, + filters: Array[String] + ): JavaDStream[Status] = { + javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters) + } + + /** + * Create a input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J Authorization object + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + twitterAuth: Authorization, + filters: Array[String], + storageLevel: StorageLevel + ): JavaDStream[Status] = { + javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters, storageLevel) + } +} diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala new file mode 100644 index 0000000000..e91049d9b1 --- /dev/null +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter + +import twitter4j.Status +import twitter4j.auth.Authorization + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ + +/** + * Extra Twitter input stream functions available on [[org.apache.spark.streaming.StreamingContext]] + * through implicit conversions. Import org.apache.spark.streaming.twitter._ to use these functions. + */ +class TwitterFunctions(ssc: StreamingContext) { + /** + * Create a input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth + * authorization; this uses the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + twitterAuth: Option[Authorization], + filters: Seq[String] = Nil, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[Status] = { + val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala index 387e15b0e6..5cc721d7f9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala @@ -15,18 +15,19 @@ * limitations under the License. */ -package org.apache.spark.streaming.dstream +package org.apache.spark.streaming.twitter -import org.apache.spark._ -import org.apache.spark.streaming._ -import storage.StorageLevel +import java.util.prefs.Preferences import twitter4j._ import twitter4j.auth.Authorization -import java.util.prefs.Preferences import twitter4j.conf.ConfigurationBuilder import twitter4j.conf.PropertyConfiguration import twitter4j.auth.OAuthAuthorization import twitter4j.auth.AccessToken +import org.apache.spark._ +import org.apache.spark.streaming._ +import org.apache.spark.streaming.dstream._ +import org.apache.spark.storage.StorageLevel /* A stream of Twitter statuses, potentially filtered by one or more keywords. * diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala new file mode 100644 index 0000000000..23f82c5885 --- /dev/null +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +package object twitter { + implicit def sscToTwitterFunctions(ssc: StreamingContext) = new TwitterFunctions(ssc) +} diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java new file mode 100644 index 0000000000..4564d6cd33 --- /dev/null +++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter; + +import java.util.Arrays; + +import org.apache.spark.streaming.api.java.twitter.TwitterFunctions; +import org.junit.Test; + +import twitter4j.Status; +import twitter4j.auth.Authorization; +import twitter4j.auth.NullAuthorization; + +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; + +public class JavaTwitterStreamSuite extends LocalJavaStreamingContext { + @Test + public void testTwitterStream() { + TwitterFunctions twitterFunc = new TwitterFunctions(ssc); + String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray(); + Authorization auth = NullAuthorization.getInstance(); + + // tests the API, does not actually test data receiving + JavaDStream<Status> test1 = twitterFunc.twitterStream(); + JavaDStream<Status> test2 = twitterFunc.twitterStream(filters); + JavaDStream<Status> test3 = + twitterFunc.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaDStream<Status> test4 = twitterFunc.twitterStream(auth); + JavaDStream<Status> test5 = twitterFunc.twitterStream(auth, filters); + JavaDStream<Status> test6 = + twitterFunc.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2()); + } +} diff --git a/external/twitter/src/test/resources/log4j.properties b/external/twitter/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/twitter/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala new file mode 100644 index 0000000000..d7f6d35e07 --- /dev/null +++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter + +import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} +import org.apache.spark.storage.StorageLevel +import twitter4j.auth.{NullAuthorization, Authorization} + +class TwitterStreamSuite extends TestSuiteBase { + + test("kafka input stream") { + val ssc = new StreamingContext(master, framework, batchDuration) + val filters = Seq("filter1", "filter2") + val authorization: Authorization = NullAuthorization.getInstance() + + // tests the API, does not actually test data receiving + val test1 = ssc.twitterStream(None) + val test2 = ssc.twitterStream(None, filters) + val test3 = ssc.twitterStream(None, filters, StorageLevel.MEMORY_AND_DISK_SER_2) + val test4 = ssc.twitterStream(Some(authorization)) + val test5 = ssc.twitterStream(Some(authorization), filters) + val test6 = ssc.twitterStream(Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2) + + // Note that actually testing the data receiving is hard as authentication keys are + // necessary for accessing Twitter live stream + } +} diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml new file mode 100644 index 0000000000..c240d59574 --- /dev/null +++ b/external/zeromq/pom.xml @@ -0,0 +1,89 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent</artifactId> + <version>0.9.0-incubating-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-zeromq_2.10</artifactId> + <packaging>jar</packaging> + <name>Spark Project External ZeroMQ</name> + <url>http://spark.incubator.apache.org/</url> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${akka.group}</groupId> + <artifactId>akka-zeromq_${scala.binary.version}</artifactId> + <version>${akka.version}</version> + <exclusions> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalacheck</groupId> + <artifactId>scalacheck_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.novocode</groupId> + <artifactId>junit-interface</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + <plugins> + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala new file mode 100644 index 0000000000..a9bbce71f5 --- /dev/null +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.java.zeromq + +import scala.reflect.ClassTag +import scala.collection.JavaConversions._ + +import akka.actor.SupervisorStrategy +import akka.util.ByteString +import akka.zeromq.Subscribe + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.api.java.function.{Function => JFunction} +import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} +import org.apache.spark.streaming.zeromq._ + +/** + * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra + * functions for creating ZeroMQ input streams. + */ +class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) { + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param publisherUrl Url of remote ZeroMQ publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence + * of byte thus it needs the converter(which might be deserializer of bytes) + * to translate from sequence of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. + * @param storageLevel Storage level to use for storing the received objects + */ + def zeroMQStream[T]( + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], + storageLevel: StorageLevel, + supervisorStrategy: SupervisorStrategy + ): JavaDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy) + } + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence + * of byte thus it needs the converter(which might be deserializer of bytes) + * to translate from sequence of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. + * @param storageLevel RDD storage level. + */ + def zeroMQStream[T]( + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], + storageLevel: StorageLevel + ): JavaDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel) + } + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence + * of byte thus it needs the converter(which might be deserializer of bytes) + * to translate from sequence of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. + */ + def zeroMQStream[T]( + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] + ): JavaDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn) + } +} diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala new file mode 100644 index 0000000000..f4c75ab7c9 --- /dev/null +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.zeromq + +import scala.reflect.ClassTag + +import akka.actor.{Props, SupervisorStrategy} +import akka.util.ByteString +import akka.zeromq.Subscribe + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ +import org.apache.spark.streaming.receivers._ + +/** + * Extra ZeroMQ input stream functions available on [[org.apache.spark.streaming.StreamingContext]] + * through implicit conversions. Import org.apache.spark.streaming.zeromq._ to use these functions. + */ +class ZeroMQFunctions(ssc: StreamingContext) { + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic + * and each frame has sequence of byte thus it needs the converter + * (which might be deserializer of bytes) to translate from sequence + * of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. + * @param storageLevel RDD storage level. Defaults to memory-only. + */ + def zeroMQStream[T: ClassTag]( + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: Seq[ByteString] ⇒ Iterator[T], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, + supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy + ): DStream[T] = { + ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), + "ZeroMQReceiver", storageLevel, supervisorStrategy) + } +} + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala index f164d516b0..769761e3b8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.streaming.receivers +package org.apache.spark.streaming.zeromq import scala.reflect.ClassTag @@ -24,6 +24,7 @@ import akka.util.ByteString import akka.zeromq._ import org.apache.spark.Logging +import org.apache.spark.streaming.receivers._ /** * A receiver to subscribe to ZeroMQ stream. diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala new file mode 100644 index 0000000000..dc27178149 --- /dev/null +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +package object zeromq { + implicit def sscToZeroMQFunctions(ssc: StreamingContext) = new ZeroMQFunctions(ssc) +} + + diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java new file mode 100644 index 0000000000..b020ae4cef --- /dev/null +++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.zeromq; + +import org.apache.spark.streaming.api.java.zeromq.ZeroMQFunctions; +import org.junit.Test; + +import akka.actor.SupervisorStrategy; +import akka.util.ByteString; +import akka.zeromq.Subscribe; + +import org.apache.spark.api.java.function.Function; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; + +public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { + + @Test // tests the API, does not actually test data receiving + public void testZeroMQStream() { + ZeroMQFunctions zeromqFunc = new ZeroMQFunctions(ssc); + String publishUrl = "abc"; + Subscribe subscribe = new Subscribe((ByteString)null); + Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() { + @Override + public Iterable<String> call(byte[][] bytes) throws Exception { + return null; + } + }; + + JavaDStream<String> test1 = zeromqFunc.<String>zeroMQStream( + publishUrl, subscribe, bytesToObjects); + JavaDStream<String> test2 = zeromqFunc.<String>zeroMQStream( + publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaDStream<String> test3 = zeromqFunc.<String>zeroMQStream( + publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy()); + } +} diff --git a/external/zeromq/src/test/resources/log4j.properties b/external/zeromq/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/zeromq/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala new file mode 100644 index 0000000000..5adcdb821f --- /dev/null +++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.zeromq + +import akka.actor.SupervisorStrategy +import akka.util.ByteString +import akka.zeromq.Subscribe + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} + +class ZeroMQStreamSuite extends TestSuiteBase { + + test("zeromq input stream") { + val ssc = new StreamingContext(master, framework, batchDuration) + val publishUrl = "abc" + val subscribe = new Subscribe(null.asInstanceOf[ByteString]) + val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]] + + // tests the API, does not actually test data receiving + val test1 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects) + val test2 = ssc.zeroMQStream( + publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) + val test3 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects, + StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy) + + // TODO: Actually test data receiving + } +} @@ -87,12 +87,17 @@ <modules> <module>core</module> <module>bagel</module> - <module>examples</module> <module>mllib</module> <module>tools</module> <module>streaming</module> <module>repl</module> <module>assembly</module> + <module>external/twitter</module> + <module>external/kafka</module> + <module>external/flume</module> + <module>external/zeromq</module> + <module>external/mqtt</module> + <module>examples</module> </modules> <properties> @@ -140,17 +145,6 @@ <enabled>false</enabled> </snapshots> </repository> - <repository> - <id>mqtt-repo</id> - <name>MQTT Repository</name> - <url>https://repo.eclipse.org/content/repositories/paho-releases</url> - <releases> - <enabled>true</enabled> - </releases> - <snapshots> - <enabled>false</enabled> - </snapshots> - </repository> </repositories> <dependencyManagement> @@ -259,17 +253,6 @@ </exclusions> </dependency> <dependency> - <groupId>${akka.group}</groupId> - <artifactId>akka-zeromq_${scala.binary.version}</artifactId> - <version>${akka.version}</version> - <exclusions> - <exclusion> - <groupId>org.jboss.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>it.unimi.dsi</groupId> <artifactId>fastutil</artifactId> <version>6.4.4</version> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index a6c560d5c6..84becf6b41 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -50,9 +50,6 @@ object SparkBuild extends Build { lazy val repl = Project("repl", file("repl"), settings = replSettings) .dependsOn(core, bagel, mllib) - lazy val examples = Project("examples", file("examples"), settings = examplesSettings) - .dependsOn(core, mllib, bagel, streaming) - lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming) lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn(core) @@ -95,10 +92,31 @@ object SparkBuild extends Build { lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](if (isNewHadoop) yarn else yarnAlpha) else Seq[ClasspathDependency]() lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](if (isNewHadoop) yarn else yarnAlpha) else Seq[ProjectReference]() + lazy val externalTwitter = Project("external-twitter", file("external/twitter"), settings = twitterSettings) + .dependsOn(streaming % "compile->compile;test->test") + + lazy val externalKafka = Project("external-kafka", file("external/kafka"), settings = kafkaSettings) + .dependsOn(streaming % "compile->compile;test->test") + + lazy val externalFlume = Project("external-flume", file("external/flume"), settings = flumeSettings) + .dependsOn(streaming % "compile->compile;test->test") + + lazy val externalZeromq = Project("external-zeromq", file("external/zeromq"), settings = zeromqSettings) + .dependsOn(streaming % "compile->compile;test->test") + + lazy val externalMqtt = Project("external-mqtt", file("external/mqtt"), settings = mqttSettings) + .dependsOn(streaming % "compile->compile;test->test") + + lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt) + lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt) + + lazy val examples = Project("examples", file("examples"), settings = examplesSettings) + .dependsOn(core, mllib, bagel, streaming, externalTwitter) dependsOn(allExternal: _*) + // Everything except assembly, tools and examples belong to packageProjects lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib) ++ maybeYarnRef - lazy val allProjects = packageProjects ++ Seq[ProjectReference](examples, tools, assemblyProj) + lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools, assemblyProj) def sharedSettings = Defaults.defaultSettings ++ Seq( organization := "org.apache.spark", @@ -127,7 +145,7 @@ object SparkBuild extends Build { // also check the local Maven repository ~/.m2 resolvers ++= Seq(Resolver.file("Local Maven Repo", file(Path.userHome + "/.m2/repository"))), - // For Sonatype publishing + // For Sonatype publishing resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", "sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"), @@ -169,7 +187,7 @@ object SparkBuild extends Build { </issueManagement> ), -/* + /* publishTo <<= version { (v: String) => val nexus = "https://oss.sonatype.org/" if (v.trim.endsWith("SNAPSHOT")) @@ -178,8 +196,7 @@ object SparkBuild extends Build { Some("sonatype-staging" at nexus + "service/local/staging/deploy/maven2") }, -*/ - + */ libraryDependencies ++= Seq( "io.netty" % "netty-all" % "4.0.13.Final", @@ -268,7 +285,6 @@ object SparkBuild extends Build { libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-reflect" % v ) ) - def examplesSettings = sharedSettings ++ Seq( name := "spark-examples", libraryDependencies ++= Seq( @@ -305,22 +321,8 @@ object SparkBuild extends Build { def streamingSettings = sharedSettings ++ Seq( name := "spark-streaming", - resolvers ++= Seq( - "Eclipse Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/", - "Apache repo" at "https://repository.apache.org/content/repositories/releases" - ), - libraryDependencies ++= Seq( - "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy), - "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1" - exclude("com.sun.jdmk", "jmxtools") - exclude("com.sun.jmx", "jmxri") - exclude("net.sf.jopt-simple", "jopt-simple") - excludeAll(excludeNetty), - "org.eclipse.paho" % "mqtt-client" % "0.4.0", - "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty), - "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty), - "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty) + "commons-io" % "commons-io" % "2.4" ) ) @@ -354,8 +356,8 @@ object SparkBuild extends Build { def yarnEnabledSettings = Seq( libraryDependencies ++= Seq( // Exclude rule required for all ? - "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), - "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), + "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), + "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib) ) @@ -380,4 +382,43 @@ object SparkBuild extends Build { case _ => MergeStrategy.first } ) + + def twitterSettings() = sharedSettings ++ Seq( + name := "spark-streaming-twitter", + libraryDependencies ++= Seq( + "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty) + ) + ) + + def kafkaSettings() = sharedSettings ++ Seq( + name := "spark-streaming-kafka", + libraryDependencies ++= Seq( + "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty), + "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1" + exclude("com.sun.jdmk", "jmxtools") + exclude("com.sun.jmx", "jmxri") + exclude("net.sf.jopt-simple", "jopt-simple") + excludeAll(excludeNetty) + ) + ) + + def flumeSettings() = sharedSettings ++ Seq( + name := "spark-streaming-flume", + libraryDependencies ++= Seq( + "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy) + ) + ) + + def zeromqSettings() = sharedSettings ++ Seq( + name := "spark-streaming-zeromq", + libraryDependencies ++= Seq( + "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty) + ) + ) + + def mqttSettings() = streamingSettings ++ Seq( + name := "spark-streaming-mqtt", + resolvers ++= Seq("Eclipse Repo" at "https://repo.eclipse.org/content/repositories/paho-releases/"), + libraryDependencies ++= Seq("org.eclipse.paho" % "mqtt-client" % "0.4.0") + ) } diff --git a/streaming/pom.xml b/streaming/pom.xml index e3b6fee9b2..459756912d 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -61,59 +61,10 @@ <version>1.9.11</version> </dependency> <dependency> - <groupId>com.sksamuel.kafka</groupId> - <artifactId>kafka_${scala.binary.version}</artifactId> - <version>0.8.0-beta1</version> - <exclusions> - <exclusion> - <groupId>com.sun.jmx</groupId> - <artifactId>jmxri</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jdmk</groupId> - <artifactId>jmxtools</artifactId> - </exclusion> - <exclusion> - <groupId>net.sf.jopt-simple</groupId> - <artifactId>jopt-simple</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.flume</groupId> - <artifactId>flume-ng-sdk</artifactId> - <version>1.2.0</version> - <exclusions> - <exclusion> - <groupId>org.jboss.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - <exclusion> - <groupId>org.xerial.snappy</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.twitter4j</groupId> - <artifactId>twitter4j-stream</artifactId> - <version>3.0.3</version> - <exclusions> - <exclusion> - <groupId>org.jboss.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> </dependency> <dependency> - <groupId>${akka.group}</groupId> - <artifactId>akka-zeromq_${scala.binary.version}</artifactId> - </dependency> - <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.binary.version}</artifactId> <scope>test</scope> @@ -137,11 +88,6 @@ <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency> - <dependency> - <groupId>org.eclipse.paho</groupId> - <artifactId>mqtt-client</artifactId> - <version>0.4.0</version> - </dependency> </dependencies> <build> <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> @@ -151,6 +97,35 @@ <groupId>org.scalatest</groupId> <artifactId>scalatest-maven-plugin</artifactId> </plugin> + + <!-- + This plugin forces the generation of jar containing streaming test classes, + so that the tests classes of external modules can use them. The two execution profiles + are necessary - first one for 'mvn package', second one for 'mvn compile'. Ideally, + 'mvn compile' should not compile test classes and therefore should not need this. + However, an open Maven bug (http://jira.codehaus.org/browse/MNG-3559) + causes the compilation to fail if streaming test-jar is not generated. Hence, the + second execution profile for 'mvn compile'. + --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.2</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + <execution> + <id>test-jar-on-compile</id> + <phase>compile</phase> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> </project> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index b3a7cf08b9..693cb7fc30 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -17,23 +17,6 @@ package org.apache.spark.streaming -import akka.actor.Props -import akka.actor.SupervisorStrategy -import akka.zeromq.Subscribe -import akka.util.ByteString - -import org.apache.spark.streaming.dstream._ -import org.apache.spark.streaming.scheduler.StreamingListener - -import org.apache.spark._ -import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy -import org.apache.spark.streaming.receivers.ZeroMQReceiver -import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.MetadataCleaner -import org.apache.spark.streaming.receivers.ActorReceiver -import org.apache.spark.streaming.scheduler.{JobScheduler, NetworkInputTracker} - import scala.collection.mutable.Queue import scala.collection.Map import scala.reflect.ClassTag @@ -41,14 +24,21 @@ import scala.reflect.ClassTag import java.io.InputStream import java.util.concurrent.atomic.AtomicInteger +import akka.actor.Props +import akka.actor.SupervisorStrategy import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path -import twitter4j.Status -import twitter4j.auth.Authorization +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.MetadataCleaner +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.receivers._ +import org.apache.spark.streaming.scheduler._ /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -238,74 +228,6 @@ class StreamingContext private ( } /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic - * and each frame has sequence of byte thus it needs the converter - * (which might be deserializer of bytes) to translate from sequence - * of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. - * @param storageLevel RDD storage level. Defaults to memory-only. - */ - def zeroMQStream[T: ClassTag]( - publisherUrl:String, - subscribe: Subscribe, - bytesToObjects: Seq[ByteString] ⇒ Iterator[T], - storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, - supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy - ): DStream[T] = { - actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), - "ZeroMQReceiver", storageLevel, supervisorStrategy) - } - - /** - * Create an input stream that pulls messages from a Kafka Broker. - * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel Storage level to use for storing the received objects - * (default: StorageLevel.MEMORY_AND_DISK_SER_2) - */ - def kafkaStream( - zkQuorum: String, - groupId: String, - topics: Map[String, Int], - storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2 - ): DStream[(String, String)] = { - val kafkaParams = Map[String, String]( - "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, - "zookeeper.connection.timeout.ms" -> "10000") - kafkaStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder]( - kafkaParams, - topics, - storageLevel) - } - - /** - * Create an input stream that pulls messages from a Kafka Broker. - * @param kafkaParams Map of kafka configuration paramaters. - * See: http://kafka.apache.org/configuration.html - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel Storage level to use for storing the received objects - */ - def kafkaStream[ - K: ClassTag, - V: ClassTag, - U <: kafka.serializer.Decoder[_]: Manifest, - T <: kafka.serializer.Decoder[_]: Manifest]( - kafkaParams: Map[String, String], - topics: Map[String, Int], - storageLevel: StorageLevel - ): DStream[(K, V)] = { - val inputStream = new KafkaInputDStream[K, V, U, T](this, kafkaParams, topics, storageLevel) - registerInputStream(inputStream) - inputStream - } - - /** * Create a input stream from TCP source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited * lines. @@ -344,22 +266,6 @@ class StreamingContext private ( } /** - * Create a input stream from a Flume source. - * @param hostname Hostname of the slave machine to which the flume data will be sent - * @param port Port of the slave machine to which the flume data will be sent - * @param storageLevel Storage level to use for storing the received objects - */ - def flumeStream ( - hostname: String, - port: Int, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[SparkFlumeEvent] = { - val inputStream = new FlumeInputDStream[SparkFlumeEvent](this, hostname, port, storageLevel) - registerInputStream(inputStream) - inputStream - } - - /** * Create a input stream from network source hostname:port, where data is received * as serialized blocks (serialized using the Spark's serializer) that can be directly * pushed into the block manager without deserializing them. This is the most efficient @@ -434,24 +340,6 @@ class StreamingContext private ( } /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth - * authorization; this uses the system properties twitter4j.oauth.consumerKey, - * .consumerSecret, .accessToken and .accessTokenSecret. - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream( - twitterAuth: Option[Authorization] = None, - filters: Seq[String] = Nil, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[Status] = { - val inputStream = new TwitterInputDStream(this, twitterAuth, filters, storageLevel) - registerInputStream(inputStream) - inputStream - } - - /** * Create an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. * @param queue Queue of RDDs @@ -484,21 +372,6 @@ class StreamingContext private ( inputStream } -/** - * Create an input stream that receives messages pushed by a mqtt publisher. - * @param brokerUrl Url of remote mqtt publisher - * @param topic topic name to subscribe to - * @param storageLevel RDD storage level. Defaults to memory-only. - */ - - def mqttStream( - brokerUrl: String, - topic: String, - storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = { - val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel) - registerInputStream(inputStream) - inputStream - } /** * Create a unified DStream from multiple DStreams of the same type and same slide duration. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 7dec4b3ad7..7068f32517 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -17,19 +17,16 @@ package org.apache.spark.streaming.api.java -import java.io.InputStream -import java.lang.{Integer => JInt} -import java.util.{List => JList, Map => JMap} import scala.collection.JavaConversions._ import scala.reflect.ClassTag +import java.io.InputStream +import java.lang.{Integer => JInt} +import java.util.{List => JList, Map => JMap} + import akka.actor.{Props, SupervisorStrategy} -import akka.util.ByteString -import akka.zeromq.Subscribe import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import twitter4j.Status -import twitter4j.auth.Authorization import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} @@ -37,7 +34,6 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ -import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.scheduler.StreamingListener /** @@ -141,81 +137,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { val sc: JavaSparkContext = new JavaSparkContext(ssc.sc) /** - * Create an input stream that pulls messages form a Kafka Broker. - * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - */ - def kafkaStream( - zkQuorum: String, - groupId: String, - topics: JMap[String, JInt]) - : JavaPairDStream[String, String] = { - implicit val cmt: ClassTag[String] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), - StorageLevel.MEMORY_ONLY_SER_2) - - } - - /** - * Create an input stream that pulls messages form a Kafka Broker. - * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel RDD storage level. Defaults to memory-only - * - */ - def kafkaStream( - zkQuorum: String, - groupId: String, - topics: JMap[String, JInt], - storageLevel: StorageLevel) - : JavaPairDStream[String, String] = { - implicit val cmt: ClassTag[String] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), - storageLevel) - } - - /** - * Create an input stream that pulls messages form a Kafka Broker. - * @param keyTypeClass Key type of RDD - * @param valueTypeClass value type of RDD - * @param keyDecoderClass Type of kafka key decoder - * @param valueDecoderClass Type of kafka value decoder - * @param kafkaParams Map of kafka configuration paramaters. - * See: http://kafka.apache.org/configuration.html - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel RDD storage level. Defaults to memory-only - */ - def kafkaStream[K, V, U <: kafka.serializer.Decoder[_], T <: kafka.serializer.Decoder[_]]( - keyTypeClass: Class[K], - valueTypeClass: Class[V], - keyDecoderClass: Class[U], - valueDecoderClass: Class[T], - kafkaParams: JMap[String, String], - topics: JMap[String, JInt], - storageLevel: StorageLevel) - : JavaPairDStream[K, V] = { - implicit val keyCmt: ClassTag[K] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] - implicit val valueCmt: ClassTag[V] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] - - implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]] - implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]] - - ssc.kafkaStream[K, V, U, T]( - kafkaParams.toMap, - Map(topics.mapValues(_.intValue()).toSeq: _*), - storageLevel) - } - - /** * Create a input stream from network source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited * lines. @@ -329,98 +250,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Creates a input stream from a Flume source. - * @param hostname Hostname of the slave machine to which the flume data will be sent - * @param port Port of the slave machine to which the flume data will be sent - * @param storageLevel Storage level to use for storing the received objects - */ - def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel): - JavaDStream[SparkFlumeEvent] = { - ssc.flumeStream(hostname, port, storageLevel) - } - - - /** - * Create a input stream from a Flume source. - * @param hostname Hostname of the slave machine to which the flume data will be sent - * @param port Port of the slave machine to which the flume data will be sent - */ - def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = { - ssc.flumeStream(hostname, port) - } - - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J Authorization object - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream( - twitterAuth: Authorization, - filters: Array[String], - storageLevel: StorageLevel - ): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth), filters, storageLevel) - } - - /** - * Create a input stream that returns tweets received from Twitter using Twitter4J's default - * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, - * .consumerSecret, .accessToken and .accessTokenSecret to be set. - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream( - filters: Array[String], - storageLevel: StorageLevel - ): JavaDStream[Status] = { - ssc.twitterStream(None, filters, storageLevel) - } - - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J Authorization - * @param filters Set of filter strings to get only those tweets that match them - */ - def twitterStream( - twitterAuth: Authorization, - filters: Array[String] - ): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth), filters) - } - - /** - * Create a input stream that returns tweets received from Twitter using Twitter4J's default - * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, - * .consumerSecret, .accessToken and .accessTokenSecret to be set. - * @param filters Set of filter strings to get only those tweets that match them - */ - def twitterStream( - filters: Array[String] - ): JavaDStream[Status] = { - ssc.twitterStream(None, filters) - } - - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J Authorization - */ - def twitterStream( - twitterAuth: Authorization - ): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth)) - } - - /** - * Create a input stream that returns tweets received from Twitter using Twitter4J's default - * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, - * .consumerSecret, .accessToken and .accessTokenSecret to be set. - */ - def twitterStream(): JavaDStream[Status] = { - ssc.twitterStream() - } - - /** * Create an input stream with any arbitrary user implemented actor receiver. * @param props Props object defining creation of the actor * @param name Name of the actor @@ -483,70 +312,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence - * of byte thus it needs the converter(which might be deserializer of bytes) - * to translate from sequence of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. - * @param storageLevel Storage level to use for storing the received objects - */ - def zeroMQStream[T]( - publisherUrl:String, - subscribe: Subscribe, - bytesToObjects: Seq[ByteString] ⇒ Iterator[T], - storageLevel: StorageLevel, - supervisorStrategy: SupervisorStrategy - ): JavaDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy) - } - - /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence - * of byte thus it needs the converter(which might be deserializer of bytes) - * to translate from sequence of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. - * @param storageLevel RDD storage level. Defaults to memory-only. - */ - def zeroMQStream[T]( - publisherUrl:String, - subscribe: Subscribe, - bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], - storageLevel: StorageLevel - ): JavaDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel) - } - - /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence - * of byte thus it needs the converter(which might be deserializer of bytes) - * to translate from sequence of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. - */ - def zeroMQStream[T]( - publisherUrl:String, - subscribe: Subscribe, - bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] - ): JavaDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - ssc.zeroMQStream[T](publisherUrl, subscribe, fn) - } - - /** * Registers an output stream that will be computed every interval */ def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala index abff55d77c..75f7244643 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala @@ -160,7 +160,9 @@ class NetworkInputTracker( } // Run the dummy Spark job to ensure that all slaves have registered. // This avoids all the receivers to be scheduled on the same node. - ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() + if (!ssc.sparkContext.isLocal) { + ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() + } // Distribute the receivers and start them ssc.sparkContext.runJob(tempRDD, startReceiver) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index d53d433693..0d2145da9a 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -17,24 +17,19 @@ package org.apache.spark.streaming; -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.io.Files; - -import kafka.serializer.StringDecoder; +import scala.Tuple2; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.spark.SparkConf; -import org.apache.spark.streaming.api.java.JavaDStreamLike; import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; +import java.io.*; +import java.util.*; -import scala.Tuple2; -import twitter4j.Status; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.io.Files; +import org.apache.spark.SparkConf; import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -44,42 +39,11 @@ import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.dstream.SparkFlumeEvent; -import org.apache.spark.streaming.JavaTestUtils; -import org.apache.spark.streaming.JavaCheckpointTestUtils; - -import java.io.*; -import java.util.*; - -import akka.actor.Props; -import akka.zeromq.Subscribe; - // The test suite itself is Serializable so that anonymous Function implementations can be // serialized, as an alternative to converting these anonymous classes to static inner classes; // see http://stackoverflow.com/questions/758570/. -public class JavaAPISuite implements Serializable { - private transient JavaStreamingContext ssc; - - @Before - public void setUp() { - SparkConf conf = new SparkConf() - .setMaster("local[2]") - .setAppName("test") - .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); - ssc = new JavaStreamingContext(conf, new Duration(1000)); - ssc.checkpoint("checkpoint"); - } - - @After - public void tearDown() { - ssc.stop(); - ssc = null; - - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port"); - } - +public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable { @Test public void testCount() { List<List<Integer>> inputData = Arrays.asList( @@ -1601,26 +1565,6 @@ public class JavaAPISuite implements Serializable { // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the // InputStream functionality is deferred to the existing Scala tests. @Test - public void testKafkaStream() { - HashMap<String, Integer> topics = Maps.newHashMap(); - JavaPairDStream<String, String> test1 = ssc.kafkaStream("localhost:12345", "group", topics); - JavaPairDStream<String, String> test2 = ssc.kafkaStream("localhost:12345", "group", topics, - StorageLevel.MEMORY_AND_DISK()); - - HashMap<String, String> kafkaParams = Maps.newHashMap(); - kafkaParams.put("zookeeper.connect","localhost:12345"); - kafkaParams.put("group.id","consumer-group"); - JavaPairDStream<String, String> test3 = ssc.kafkaStream( - String.class, - String.class, - StringDecoder.class, - StringDecoder.class, - kafkaParams, - topics, - StorageLevel.MEMORY_AND_DISK()); - } - - @Test public void testSocketTextStream() { JavaDStream<String> test = ssc.socketTextStream("localhost", 12345); } @@ -1658,36 +1602,4 @@ public class JavaAPISuite implements Serializable { public void testRawSocketStream() { JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345); } - - @Test - public void testFlumeStream() { - JavaDStream<SparkFlumeEvent> test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY()); - } - - @Test - public void testFileStream() { - JavaPairDStream<String, String> foo = - ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo"); - } - - @Test - public void testTwitterStream() { - String[] filters = new String[] { "good", "bad", "ugly" }; - JavaDStream<Status> test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY()); - } - - @Test - public void testActorStream() { - JavaDStream<String> test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY()); - } - - @Test - public void testZeroMQStream() { - JavaDStream<String> test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() { - @Override - public Iterable<String> call(byte[][] b) throws Exception { - return null; - } - }); - } } diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java new file mode 100644 index 0000000000..34bee56885 --- /dev/null +++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming; + +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.junit.After; +import org.junit.Before; + +public abstract class LocalJavaStreamingContext { + + protected transient JavaStreamingContext ssc; + + @Before + public void setUp() { + System.clearProperty("spark.driver.port"); + System.clearProperty("spark.hostPort"); + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + ssc.checkpoint("checkpoint"); + } + + @After + public void tearDown() { + ssc.stop(); + ssc = null; + + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.driver.port"); + System.clearProperty("spark.hostPort"); + } +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 5185954521..a8e053278a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -23,7 +23,7 @@ import akka.actor.IOManager import akka.actor.Props import akka.util.ByteString -import org.apache.spark.streaming.dstream.{FileInputDStream, NetworkReceiver, SparkFlumeEvent} +import org.apache.spark.streaming.dstream.{NetworkReceiver} import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket} import java.io.{File, BufferedWriter, OutputStreamWriter} import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue} @@ -31,18 +31,11 @@ import collection.mutable.{SynchronizedBuffer, ArrayBuffer} import util.ManualClock import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receivers.Receiver -import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.Logging import scala.util.Random import org.apache.commons.io.FileUtils import org.scalatest.BeforeAndAfter -import org.apache.flume.source.avro.AvroSourceProtocol -import org.apache.flume.source.avro.AvroFlumeEvent -import org.apache.flume.source.avro.Status -import org.apache.avro.ipc.{specific, NettyTransceiver} -import org.apache.avro.ipc.specific.SpecificRequestor -import java.nio.ByteBuffer import collection.JavaConversions._ -import java.nio.charset.Charset import com.google.common.io.Files import java.util.concurrent.atomic.AtomicInteger @@ -56,7 +49,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { testServer.start() // Set up the streaming context and input streams - val ssc = new StreamingContext(new SparkContext(conf), batchDuration) + val ssc = new StreamingContext(conf, batchDuration) val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] val outputStream = new TestOutputStream(networkStream, outputBuffer) @@ -99,62 +92,13 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } - test("flume input stream") { - // Set up the streaming context and input streams - val ssc = new StreamingContext(new SparkContext(conf), batchDuration) - val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) - val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] - with SynchronizedBuffer[Seq[SparkFlumeEvent]] - val outputStream = new TestOutputStream(flumeStream, outputBuffer) - ssc.registerOutputStream(outputStream) - ssc.start() - - val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - val input = Seq(1, 2, 3, 4, 5) - Thread.sleep(1000) - val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)) - val client = SpecificRequestor.getClient( - classOf[AvroSourceProtocol], transceiver) - - for (i <- 0 until input.size) { - val event = new AvroFlumeEvent - event.setBody(ByteBuffer.wrap(input(i).toString.getBytes())) - event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header")) - client.append(event) - Thread.sleep(500) - clock.addToTime(batchDuration.milliseconds) - } - - val startTime = System.currentTimeMillis() - while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { - logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size) - Thread.sleep(100) - } - Thread.sleep(1000) - val timeTaken = System.currentTimeMillis() - startTime - assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") - logInfo("Stopping context") - ssc.stop() - - val decoder = Charset.forName("UTF-8").newDecoder() - - assert(outputBuffer.size === input.length) - for (i <- 0 until outputBuffer.size) { - assert(outputBuffer(i).size === 1) - val str = decoder.decode(outputBuffer(i).head.event.getBody) - assert(str.toString === input(i).toString) - assert(outputBuffer(i).head.event.getHeaders.get("test") === "header") - } - } - - test("file input stream") { // Disable manual clock as FileInputDStream does not work with manual clock conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") // Set up the streaming context and input streams val testDir = Files.createTempDir() - val ssc = new StreamingContext(new SparkContext(conf), batchDuration) + val ssc = new StreamingContext(conf, batchDuration) val fileStream = ssc.textFileStream(testDir.toString) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] def output = outputBuffer.flatMap(x => x) @@ -206,7 +150,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { testServer.start() // Set up the streaming context and input streams - val ssc = new StreamingContext(new SparkContext(conf), batchDuration) + val ssc = new StreamingContext(conf, batchDuration) val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor", StorageLevel.MEMORY_AND_DISK) //Had to pass the local value of port to prevent from closing over entire scope val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] @@ -249,20 +193,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } - test("kafka input stream") { - val ssc = new StreamingContext(new SparkContext(conf), batchDuration) - val topics = Map("my-topic" -> 1) - val test1 = ssc.kafkaStream("localhost:12345", "group", topics) - val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK) - - // Test specifying decoder - val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group") - val test3 = ssc.kafkaStream[ - String, - String, - kafka.serializer.StringDecoder, - kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK) - } test("multi-thread receiver") { // set up the test receiver @@ -273,7 +203,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { MultiThreadTestReceiver.haveAllThreadsFinished = false // set up the network stream using the test receiver - val ssc = new StreamingContext(new SparkContext(conf), batchDuration) + val ssc = new StreamingContext(conf, batchDuration) val networkStream = ssc.networkStream[Int](testReceiver) val countStream = networkStream.count val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]] diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 33464bc3a1..b20d02f996 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -142,16 +142,12 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { // Default before function for any streaming test suite. Override this // if you want to add your stuff to "before" (i.e., don't call before { } ) def beforeFunction() { - //if (useManualClock) { - // System.setProperty( - // "spark.streaming.clock", - // "org.apache.spark.streaming.util.ManualClock" - // ) - //} else { - // System.clearProperty("spark.streaming.clock") - //} if (useManualClock) { + logInfo("Using manual clock") conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + } else { + logInfo("Using real clock") + conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") } } @@ -175,9 +171,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { operation: DStream[U] => DStream[V], numPartitions: Int = numInputPartitions ): StreamingContext = { - val sc = new SparkContext(conf) // Create StreamingContext - val ssc = new StreamingContext(sc, batchDuration) + val ssc = new StreamingContext(conf, batchDuration) if (checkpointDir != null) { ssc.checkpoint(checkpointDir) } @@ -201,9 +196,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { input2: Seq[Seq[V]], operation: (DStream[U], DStream[V]) => DStream[W] ): StreamingContext = { - val sc = new SparkContext(conf) // Create StreamingContext - val ssc = new StreamingContext(sc, batchDuration) + val ssc = new StreamingContext(conf, batchDuration) if (checkpointDir != null) { ssc.checkpoint(checkpointDir) } @@ -279,7 +273,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val startTime = System.currentTimeMillis() while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput) - Thread.sleep(100) + Thread.sleep(10) } val timeTaken = System.currentTimeMillis() - startTime |