From 6e43039614ed1ec55a134fb82fb3e8d4e80996ef Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Thu, 26 Dec 2013 18:02:49 -0800
Subject: Refactored streaming project to separate out the twitter functionality.

---
 project/SparkBuild.scala | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

(limited to 'project')

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 7bcbd90bd3..f9ff781f38 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -50,7 +50,7 @@ object SparkBuild extends Build {
     .dependsOn(core, bagel, mllib)
 
   lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
-    .dependsOn(core, mllib, bagel, streaming)
+    .dependsOn(core, mllib, bagel, streaming, externalTwitter)
 
   lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming)
 
@@ -60,6 +60,8 @@ object SparkBuild extends Build {
   lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn(core)
 
+  lazy val externalTwitter = Project("streaming-twitter", file("external/twitter"), settings = twitterSettings) dependsOn(streaming)
+
   lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings)
     .dependsOn(core, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*)
 
@@ -313,7 +315,7 @@ object SparkBuild extends Build {
         excludeAll(excludeNetty),
       "org.eclipse.paho" % "mqtt-client" % "0.4.0",
       "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
-      "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
+      // "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
       "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty)
     )
   )
@@ -354,4 +356,11 @@ object SparkBuild extends Build {
       case _ => MergeStrategy.first
     }
   )
+
+  def twitterSettings() = streamingSettings ++ Seq(
+    name := "spark-twitter",
+    libraryDependencies ++= Seq(
+      "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty)
+    )
+  )
 }
--
cgit v1.2.3


From f4e40661912af2a23e250a49f72f00675172e2de Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 30 Dec 2013 11:13:24 -0800
Subject: Refactored kafka, flume, zeromq, mqtt as separate external projects, with
 their own self-contained scala API, java API, scala unit tests and java unit
 tests.

Updated examples to use the external projects.
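The user-facing change in this series: stream constructors such as flumeStream and kafkaStream move off StreamingContext (and JavaStreamingContext) into per-connector modules, and are re-exposed through implicit conversions in Scala and JavaStreamingContextWith* wrappers in Java. A minimal sketch of the resulting Scala usage, based on the FlumeFunctions class and flume package object added below (the master URL, app name, and port here are placeholders, not part of the patch):

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume._  // brings sscToFlumeFunctions into scope

    val ssc = new StreamingContext("local[2]", "FlumeExample", Seconds(2))
    // flumeStream is no longer defined on StreamingContext itself; the import
    // above adds it via the implicit conversion to FlumeFunctions.
    val events = ssc.flumeStream("localhost", 12345)
    events.count().print()
    ssc.start()

Existing applications therefore keep the same call sites and change only an import, which is how the examples below are updated.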
--- .../streaming/examples/JavaFlumeEventCount.java | 11 +- .../streaming/examples/JavaKafkaWordCount.java | 4 +- .../spark/streaming/examples/FlumeEventCount.scala | 1 + .../spark/streaming/examples/KafkaWordCount.scala | 1 + .../spark/streaming/examples/MQTTWordCount.scala | 10 +- .../spark/streaming/examples/ZeroMQWordCount.scala | 6 +- .../spark/streaming/flume/FlumeFunctions.scala | 43 ++++ .../spark/streaming/flume/FlumeInputDStream.scala | 155 +++++++++++++ .../flume/JavaStreamingContextWithFlume.scala | 48 ++++ .../org/apache/spark/streaming/flume/package.scala | 23 ++ .../flume/src/test/java/JavaFlumeStreamSuite.java | 38 ++++ external/flume/src/test/resources/log4j.properties | 29 +++ .../spark/streaming/flume/FlumeStreamSuite.scala | 86 ++++++++ .../kafka/JavaStreamingContextWithKafka.scala | 107 +++++++++ .../spark/streaming/kafka/KafkaFunctions.scala | 73 ++++++ .../spark/streaming/kafka/KafkaInputDStream.scala | 155 +++++++++++++ .../org/apache/spark/streaming/kafka/package.scala | 23 ++ .../streaming/kafka/JavaKafkaStreamSuite.java | 51 +++++ external/kafka/src/test/resources/log4j.properties | 29 +++ .../spark/streaming/kafka/KafkaStreamSuite.scala | 39 ++++ .../mqtt/JavaStreamingContextWithMQTT.scala | 59 +++++ .../spark/spark/streaming/mqtt/MQTTFunctions.scala | 43 ++++ .../spark/streaming/mqtt/MQTTInputDStream.scala | 110 ++++++++++ .../spark/spark/streaming/mqtt/package.scala | 24 ++ .../spark/streaming/mqtt/JavaMQTTStreamSuite.java | 41 ++++ external/mqtt/src/test/resources/log4j.properties | 29 +++ .../spark/streaming/mqtt/MQTTStreamSuite.scala | 36 +++ .../twitter/JavaStreamingContextWithTwitter.scala | 99 +++++++++ .../twitter/StreamingContextWithTwitter.scala | 27 --- .../spark/streaming/twitter/TwitterFunctions.scala | 49 +++++ .../apache/spark/streaming/twitter/package.scala | 21 +- .../streaming/twitter/JavaTwitterStreamSuite.java | 51 +++++ .../twitter/src/test/resources/log4j.properties | 29 +++ .../streaming/twitter/TwitterStreamSuite.scala | 42 ++++ .../zeromq/JavaStreamingContextWithZeroMQ.scala | 102 +++++++++ .../spark/streaming/zeromq/ZeroMQFunctions.scala | 57 +++++ .../spark/streaming/zeromq/ZeroMQReceiver.scala | 54 +++++ .../apache/spark/streaming/zeromq/package.scala | 24 ++ .../streaming/zeromq/JavaZeroMQStreamSuite.java | 55 +++++ .../zeromq/src/test/resources/log4j.properties | 29 +++ .../spark/streaming/zeromq/ZeroMQStreamSuite.scala | 44 ++++ project/SparkBuild.scala | 89 +++++--- .../apache/spark/streaming/StreamingContext.scala | 129 +---------- .../streaming/api/java/JavaStreamingContext.scala | 244 +-------------------- .../streaming/dstream/FlumeInputDStream.scala | 154 ------------- .../streaming/dstream/KafkaInputDStream.scala | 153 ------------- .../spark/streaming/dstream/MQTTInputDStream.scala | 110 ---------- .../spark/streaming/receivers/ZeroMQReceiver.scala | 53 ----- .../spark/streaming/scheduler/JobGenerator.scala | 1 + .../streaming/scheduler/NetworkInputTracker.scala | 5 +- .../org/apache/spark/streaming/JavaAPISuite.java | 80 +------ .../spark/streaming/LocalJavaStreamingContext.java | 46 ++++ .../apache/spark/streaming/InputStreamsSuite.scala | 75 +------ .../org/apache/spark/streaming/TestSuiteBase.scala | 9 +- 54 files changed, 2059 insertions(+), 1046 deletions(-) create mode 100644 external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala create mode 100644 external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala create mode 100644 
external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala create mode 100644 external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala create mode 100644 external/flume/src/test/java/JavaFlumeStreamSuite.java create mode 100644 external/flume/src/test/resources/log4j.properties create mode 100644 external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala create mode 100644 external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java create mode 100644 external/kafka/src/test/resources/log4j.properties create mode 100644 external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala create mode 100644 external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala create mode 100644 external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala create mode 100644 external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala create mode 100644 external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala create mode 100644 external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java create mode 100644 external/mqtt/src/test/resources/log4j.properties create mode 100644 external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala create mode 100644 external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala delete mode 100644 external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala create mode 100644 external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala create mode 100644 external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java create mode 100644 external/twitter/src/test/resources/log4j.properties create mode 100644 external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala create mode 100644 external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala create mode 100644 external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala create mode 100644 external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala create mode 100644 external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala create mode 100644 external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java create mode 100644 external/zeromq/src/test/resources/log4j.properties create mode 100644 external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala delete mode 100644 
streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala create mode 100644 streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java (limited to 'project') diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java index 261813bf2f..e53c4f9e83 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java @@ -20,7 +20,8 @@ package org.apache.spark.streaming.examples; import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.*; import org.apache.spark.streaming.api.java.*; -import org.apache.spark.streaming.dstream.SparkFlumeEvent; +import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume; +import org.apache.spark.streaming.flume.SparkFlumeEvent; /** * Produces a count of events received from Flume. @@ -49,10 +50,10 @@ public class JavaFlumeEventCount { Duration batchInterval = new Duration(2000); - JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval, + JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval, System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); - - JavaDStream flumeStream = sc.flumeStream("localhost", port); + JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc); + JavaDStream flumeStream = sscWithFlume.flumeStream("localhost", port); flumeStream.count(); @@ -63,6 +64,6 @@ public class JavaFlumeEventCount { } }).print(); - sc.start(); + ssc.start(); } } diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java index 22994fb2ec..de0420ca83 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java @@ -29,6 +29,7 @@ import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.kafka.JavaStreamingContextWithKafka; import scala.Tuple2; /** @@ -63,7 +64,8 @@ public class JavaKafkaWordCount { topicMap.put(topic, numThreads); } - JavaPairDStream messages = ssc.kafkaStream(args[1], args[2], topicMap); + JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc); + JavaPairDStream messages = sscWithKafka.kafkaStream(args[1], args[2], topicMap); JavaDStream lines = messages.map(new Function, String>() { @Override diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala index 9f6e163454..f1641e299f 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.examples import org.apache.spark.util.IntParam import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ +import org.apache.spark.streaming.flume._ /** * Produces a count of events received from 
Flume. diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala index 570ba4c81a..833c83a5ef 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala @@ -24,6 +24,7 @@ import kafka.producer._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.util.RawTextHelper._ +import org.apache.spark.streaming.kafka._ /** * Consumes messages from one or more topics in Kafka and does wordcount. diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index ff332a0282..edb46ac1da 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -17,11 +17,6 @@ package org.apache.spark.streaming.examples -import org.apache.spark.streaming.{ Seconds, StreamingContext } -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.dstream.MQTTReceiver -import org.apache.spark.storage.StorageLevel - import org.eclipse.paho.client.mqttv3.MqttClient import org.eclipse.paho.client.mqttv3.MqttClientPersistence import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence @@ -29,6 +24,11 @@ import org.eclipse.paho.client.mqttv3.MqttException import org.eclipse.paho.client.mqttv3.MqttMessage import org.eclipse.paho.client.mqttv3.MqttTopic +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.mqtt._ + /** * A simple Mqtt publisher for demonstration purposes, repeatedly publishes * Space separated String Message "hello mqtt demo for spark streaming" diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala index e83ce78aa5..037b96e9eb 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala @@ -20,11 +20,13 @@ package org.apache.spark.streaming.examples import akka.actor.ActorSystem import akka.actor.actorRef2Scala import akka.zeromq._ -import org.apache.spark.streaming.{ Seconds, StreamingContext } -import org.apache.spark.streaming.StreamingContext._ import akka.zeromq.Subscribe import akka.util.ByteString +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.zeromq._ + /** * A simple publisher for demonstration purposes, repeatedly publishes random Messages * every one second. diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala new file mode 100644 index 0000000000..35e7a01abc --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ + +/** + * Extra Flume input stream functions available on [[org.apache.spark.streaming.StreamingContext]] + * through implicit conversion. Import org.apache.spark.streaming.flume._ to use these functions. + */ +class FlumeFunctions(ssc: StreamingContext) { + /** + * Create a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + * @param storageLevel Storage level to use for storing the received objects + */ + def flumeStream ( + hostname: String, + port: Int, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[SparkFlumeEvent] = { + val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } +} diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala new file mode 100644 index 0000000000..ce3ef47cfe --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.flume + +import java.net.InetSocketAddress +import java.io.{ObjectInput, ObjectOutput, Externalizable} +import java.nio.ByteBuffer + +import scala.collection.JavaConversions._ +import scala.reflect.ClassTag + +import org.apache.flume.source.avro.AvroSourceProtocol +import org.apache.flume.source.avro.AvroFlumeEvent +import org.apache.flume.source.avro.Status +import org.apache.avro.ipc.specific.SpecificResponder +import org.apache.avro.ipc.NettyServer + +import org.apache.spark.util.Utils +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream._ + +private[streaming] +class FlumeInputDStream[T: ClassTag]( + @transient ssc_ : StreamingContext, + host: String, + port: Int, + storageLevel: StorageLevel +) extends NetworkInputDStream[SparkFlumeEvent](ssc_) { + + override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = { + new FlumeReceiver(host, port, storageLevel) + } +} + +/** + * A wrapper class for AvroFlumeEvent's with a custom serialization format. + * + * This is necessary because AvroFlumeEvent uses inner data structures + * which are not serializable. + */ +class SparkFlumeEvent() extends Externalizable { + var event : AvroFlumeEvent = new AvroFlumeEvent() + + /* De-serialize from bytes. */ + def readExternal(in: ObjectInput) { + val bodyLength = in.readInt() + val bodyBuff = new Array[Byte](bodyLength) + in.read(bodyBuff) + + val numHeaders = in.readInt() + val headers = new java.util.HashMap[CharSequence, CharSequence] + + for (i <- 0 until numHeaders) { + val keyLength = in.readInt() + val keyBuff = new Array[Byte](keyLength) + in.read(keyBuff) + val key : String = Utils.deserialize(keyBuff) + + val valLength = in.readInt() + val valBuff = new Array[Byte](valLength) + in.read(valBuff) + val value : String = Utils.deserialize(valBuff) + + headers.put(key, value) + } + + event.setBody(ByteBuffer.wrap(bodyBuff)) + event.setHeaders(headers) + } + + /* Serialize to bytes. */ + def writeExternal(out: ObjectOutput) { + val body = event.getBody.array() + out.writeInt(body.length) + out.write(body) + + val numHeaders = event.getHeaders.size() + out.writeInt(numHeaders) + for ((k, v) <- event.getHeaders) { + val keyBuff = Utils.serialize(k.toString) + out.writeInt(keyBuff.length) + out.write(keyBuff) + val valBuff = Utils.serialize(v.toString) + out.writeInt(valBuff.length) + out.write(valBuff) + } + } +} + +private[streaming] object SparkFlumeEvent { + def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = { + val event = new SparkFlumeEvent + event.event = in + event + } +} + +/** A simple server that implements Flume's Avro protocol. 
*/ +private[streaming] +class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { + override def append(event : AvroFlumeEvent) : Status = { + receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event) + Status.OK + } + + override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = { + events.foreach (event => + receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)) + Status.OK + } +} + +/** A NetworkReceiver which listens for events using the + * Flume Avro interface.*/ +private[streaming] +class FlumeReceiver( + host: String, + port: Int, + storageLevel: StorageLevel + ) extends NetworkReceiver[SparkFlumeEvent] { + + lazy val blockGenerator = new BlockGenerator(storageLevel) + + protected override def onStart() { + val responder = new SpecificResponder( + classOf[AvroSourceProtocol], new FlumeEventServer(this)) + val server = new NettyServer(responder, new InetSocketAddress(host, port)) + blockGenerator.start() + server.start() + logInfo("Flume receiver started") + } + + protected override def onStop() { + blockGenerator.stop() + logInfo("Flume receiver stopped") + } + + override def getLocationPreference = Some(host) +} diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala new file mode 100644 index 0000000000..4e66ae3535 --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume + +import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} +import org.apache.spark.storage.StorageLevel + +/** + * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra + * functions for creating Flume input streams. + */ +class JavaStreamingContextWithFlume(javaStreamingContext: JavaStreamingContext) + extends JavaStreamingContext(javaStreamingContext.ssc) { + /** + * Creates a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + */ + def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = { + ssc.flumeStream(hostname, port) + } + + /** + * Creates a input stream from a Flume source. 
+ * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + * @param storageLevel Storage level to use for storing the received objects + */ + def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel): + JavaDStream[SparkFlumeEvent] = { + ssc.flumeStream(hostname, port, storageLevel) + } +} diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala new file mode 100644 index 0000000000..c087a39d1c --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +package object flume { + implicit def sscToFlumeFunctions(ssc: StreamingContext) = new FlumeFunctions(ssc) +} + diff --git a/external/flume/src/test/java/JavaFlumeStreamSuite.java b/external/flume/src/test/java/JavaFlumeStreamSuite.java new file mode 100644 index 0000000000..deffc78c4c --- /dev/null +++ b/external/flume/src/test/java/JavaFlumeStreamSuite.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume; +import org.apache.spark.streaming.flume.SparkFlumeEvent; +import org.junit.Test; + +public class JavaFlumeStreamSuite extends LocalJavaStreamingContext { + @Test + public void testFlumeStream() { + JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc); + + // tests the API, does not actually test data receiving + JavaDStream test1 = sscWithFlume.flumeStream("localhost", 12345); + JavaDStream test2 = sscWithFlume.flumeStream("localhost", 12345, + StorageLevel.MEMORY_AND_DISK_SER_2()); + + // To verify that JavaStreamingContextWithKafka is also StreamingContext + JavaDStream socketStream = sscWithFlume.socketTextStream("localhost", 9999); + } +} diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/flume/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala new file mode 100644 index 0000000000..ba33320d02 --- /dev/null +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume + +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} + +import java.net.InetSocketAddress +import java.nio.ByteBuffer +import java.nio.charset.Charset + +import org.apache.avro.ipc.NettyTransceiver +import org.apache.avro.ipc.specific.SpecificRequestor +import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol} + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase} +import org.apache.spark.streaming.util.ManualClock + +class FlumeStreamSuite extends TestSuiteBase { + + val testPort = 9999 + + test("flume input stream") { + // Set up the streaming context and input streams + val ssc = new StreamingContext(master, framework, batchDuration) + val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) + val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] + with SynchronizedBuffer[Seq[SparkFlumeEvent]] + val outputStream = new TestOutputStream(flumeStream, outputBuffer) + ssc.registerOutputStream(outputStream) + ssc.start() + + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val input = Seq(1, 2, 3, 4, 5) + Thread.sleep(1000) + val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)) + val client = SpecificRequestor.getClient( + classOf[AvroSourceProtocol], transceiver) + + for (i <- 0 until input.size) { + val event = new AvroFlumeEvent + event.setBody(ByteBuffer.wrap(input(i).toString.getBytes())) + event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header")) + client.append(event) + Thread.sleep(500) + clock.addToTime(batchDuration.milliseconds) + } + + val startTime = System.currentTimeMillis() + while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { + logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size) + Thread.sleep(100) + } + Thread.sleep(1000) + val timeTaken = System.currentTimeMillis() - startTime + assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") + logInfo("Stopping context") + ssc.stop() + + val decoder = Charset.forName("UTF-8").newDecoder() + + assert(outputBuffer.size === input.length) + for (i <- 0 until outputBuffer.size) { + assert(outputBuffer(i).size === 1) + val str = decoder.decode(outputBuffer(i).head.event.getBody) + assert(str.toString === input(i).toString) + assert(outputBuffer(i).head.event.getHeaders.get("test") === "header") + } + } +} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala new file mode 100644 index 0000000000..ab0e8a6c8d --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import scala.reflect.ClassTag +import scala.collection.JavaConversions._ + +import java.lang.{Integer => JInt} +import java.util.{Map => JMap} + +import kafka.serializer.Decoder + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream} + +/** + * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra + * functions for creating Kafka input streams. + */ +class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext) + extends JavaStreamingContext(javaStreamingContext.ssc) { + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + */ + def kafkaStream( + zkQuorum: String, + groupId: String, + topics: JMap[String, JInt] + ): JavaPairDStream[String, String] = { + implicit val cmt: ClassTag[String] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) + } + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel RDD storage level. + * + */ + def kafkaStream( + zkQuorum: String, + groupId: String, + topics: JMap[String, JInt], + storageLevel: StorageLevel + ): JavaPairDStream[String, String] = { + implicit val cmt: ClassTag[String] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) + } + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param keyTypeClass Key type of RDD + * @param valueTypeClass value type of RDD + * @param keyDecoderClass Type of kafka key decoder + * @param valueDecoderClass Type of kafka value decoder + * @param kafkaParams Map of kafka configuration paramaters. + * See: http://kafka.apache.org/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel RDD storage level. 
Defaults to memory-only + */ + def kafkaStream[K, V, U <: Decoder[_], T <: Decoder[_]]( + keyTypeClass: Class[K], + valueTypeClass: Class[V], + keyDecoderClass: Class[U], + valueDecoderClass: Class[T], + kafkaParams: JMap[String, String], + topics: JMap[String, JInt], + storageLevel: StorageLevel + ): JavaPairDStream[K, V] = { + implicit val keyCmt: ClassTag[K] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val valueCmt: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + + implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]] + implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]] + + ssc.kafkaStream[K, V, U, T]( + kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) + } +} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala new file mode 100644 index 0000000000..2135634a69 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import scala.reflect.ClassTag + +import kafka.serializer.{Decoder, StringDecoder} + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ + +/** + * Extra Kafka input stream functions available on [[org.apache.spark.streaming.StreamingContext]] + * through implicit conversion. Import org.apache.spark.streaming.kafka._ to use these functions. + */ +class KafkaFunctions(ssc: StreamingContext) { + /** + * Create an input stream that pulls messages from a Kafka Broker. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel Storage level to use for storing the received objects + * (default: StorageLevel.MEMORY_AND_DISK_SER_2) + */ + def kafkaStream( + zkQuorum: String, + groupId: String, + topics: Map[String, Int], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[(String, String)] = { + val kafkaParams = Map[String, String]( + "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, + "zookeeper.connection.timeout.ms" -> "10000") + kafkaStream[String, String, StringDecoder, StringDecoder]( + kafkaParams, + topics, + storageLevel) + } + + /** + * Create an input stream that pulls messages from a Kafka Broker. + * @param kafkaParams Map of kafka configuration paramaters. 
+ * See: http://kafka.apache.org/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel Storage level to use for storing the received objects + */ + def kafkaStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel + ): DStream[(K, V)] = { + val inputStream = new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } +} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala new file mode 100644 index 0000000000..fd69328aba --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import scala.collection.Map +import scala.reflect.ClassTag + +import java.util.Properties +import java.util.concurrent.Executors + +import kafka.consumer._ +import kafka.serializer.Decoder +import kafka.utils.VerifiableProperties +import kafka.utils.ZKStringSerializer +import org.I0Itec.zkclient._ + +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream._ + + +/** + * Input stream that pulls messages from a Kafka Broker. + * + * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel RDD storage level. 
+ */ +private[streaming] +class KafkaInputDStream[ + K: ClassTag, + V: ClassTag, + U <: Decoder[_]: Manifest, + T <: Decoder[_]: Manifest]( + @transient ssc_ : StreamingContext, + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel + ) extends NetworkInputDStream[(K, V)](ssc_) with Logging { + + def getReceiver(): NetworkReceiver[(K, V)] = { + new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) + .asInstanceOf[NetworkReceiver[(K, V)]] + } +} + +private[streaming] +class KafkaReceiver[ + K: ClassTag, + V: ClassTag, + U <: Decoder[_]: Manifest, + T <: Decoder[_]: Manifest]( + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel + ) extends NetworkReceiver[Any] { + + // Handles pushing data into the BlockManager + lazy protected val blockGenerator = new BlockGenerator(storageLevel) + // Connection to Kafka + var consumerConnector : ConsumerConnector = null + + def onStop() { + blockGenerator.stop() + } + + def onStart() { + + blockGenerator.start() + + // In case we are using multiple Threads to handle Kafka Messages + val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) + + logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id")) + + // Kafka connection properties + val props = new Properties() + kafkaParams.foreach(param => props.put(param._1, param._2)) + + // Create the connection to the cluster + logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect")) + val consumerConfig = new ConsumerConfig(props) + consumerConnector = Consumer.create(consumerConfig) + logInfo("Connected to " + kafkaParams("zookeeper.connect")) + + // When autooffset.reset is defined, it is our responsibility to try and whack the + // consumer group zk node. + if (kafkaParams.contains("auto.offset.reset")) { + tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id")) + } + + val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) + .newInstance(consumerConfig.props) + .asInstanceOf[Decoder[K]] + val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) + .newInstance(consumerConfig.props) + .asInstanceOf[Decoder[V]] + + // Create Threads for each Topic/Message Stream we are listening + val topicMessageStreams = consumerConnector.createMessageStreams( + topics, keyDecoder, valueDecoder) + + + // Start the messages handler for each partition + topicMessageStreams.values.foreach { streams => + streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } + } + } + + // Handles Kafka Messages + private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V]) + extends Runnable { + def run() { + logInfo("Starting MessageHandler.") + for (msgAndMetadata <- stream) { + blockGenerator += (msgAndMetadata.key, msgAndMetadata.message) + } + } + } + + // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because + // Kafka 0.7.2 only honors this param when the group is not in zookeeper. + // + // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas' + // ConsoleConsumer. 
See code related to 'autooffset.reset' when it is set to 'smallest'/'largest': + // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala + private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) { + try { + val dir = "/consumers/" + groupId + logInfo("Cleaning up temporary zookeeper data under " + dir + ".") + val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) + zk.deleteRecursive(dir) + zk.close() + } catch { + case _ : Throwable => // swallow + } + } +} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala new file mode 100644 index 0000000000..44e7ce6e1b --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +package object kafka { + implicit def sscToKafkaFunctions(ssc: StreamingContext) = new KafkaFunctions(ssc) +} + diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java new file mode 100644 index 0000000000..66236df662 --- /dev/null +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka; + +import java.util.HashMap; +import org.junit.Test; +import com.google.common.collect.Maps; +import kafka.serializer.StringDecoder; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; + +public class JavaKafkaStreamSuite extends LocalJavaStreamingContext { + @Test + public void testKafkaStream() { + + HashMap topics = Maps.newHashMap(); + JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc); + + // tests the API, does not actually test data receiving + JavaPairDStream test1 = sscWithKafka.kafkaStream("localhost:12345", "group", topics); + JavaPairDStream test2 = sscWithKafka.kafkaStream("localhost:12345", "group", topics, + StorageLevel.MEMORY_AND_DISK_SER_2()); + + HashMap kafkaParams = Maps.newHashMap(); + kafkaParams.put("zookeeper.connect","localhost:12345"); + kafkaParams.put("group.id","consumer-group"); + JavaPairDStream test3 = sscWithKafka.kafkaStream( + String.class, String.class, StringDecoder.class, StringDecoder.class, + kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2()); + + // To verify that JavaStreamingContextWithKafka is also StreamingContext + JavaDStream socketStream = sscWithKafka.socketTextStream("localhost", 9999); + } +} diff --git a/external/kafka/src/test/resources/log4j.properties b/external/kafka/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/kafka/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala new file mode 100644 index 0000000000..2ef3e99c55 --- /dev/null +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import kafka.serializer.StringDecoder +import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} +import org.apache.spark.storage.StorageLevel + +class KafkaStreamSuite extends TestSuiteBase { + + test("kafka input stream") { + val ssc = new StreamingContext(master, framework, batchDuration) + val topics = Map("my-topic" -> 1) + + // tests the API, does not actually test data receiving + val test1 = ssc.kafkaStream("localhost:12345", "group", topics) + val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2) + val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group") + val test3 = ssc.kafkaStream[String, String, StringDecoder, StringDecoder]( + kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2) + + // TODO: Actually test receiving data + } +} diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala new file mode 100644 index 0000000000..d814da0f0d --- /dev/null +++ b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt + +import scala.reflect.ClassTag + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} + +/** + * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra + * functions for creating MQTT input streams. + */ +class JavaStreamingContextWithMQTT(javaStreamingContext: JavaStreamingContext) + extends JavaStreamingContext(javaStreamingContext.ssc) { + + /** + * Create an input stream that receives messages pushed by a MQTT publisher. 
+ * @param brokerUrl Url of remote MQTT publisher + * @param topic topic name to subscribe to + */ + def mqttStream( + brokerUrl: String, + topic: String + ): JavaDStream[String] = { + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + ssc.mqttStream(brokerUrl, topic) + } + + /** + * Create an input stream that receives messages pushed by a MQTT publisher. + * @param brokerUrl Url of remote MQTT publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. + */ + def mqttStream( + brokerUrl: String, + topic: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): JavaDStream[String] = { + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + ssc.mqttStream(brokerUrl, topic, storageLevel) + } +} diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala new file mode 100644 index 0000000000..86f4e9c724 --- /dev/null +++ b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ + +/** + * Extra MQTT input stream functions available on [[org.apache.spark.streaming.StreamingContext]] + * through implicit conversions. Import org.apache.spark.streaming.mqtt._ to use these functions. + */ +class MQTTFunctions(ssc: StreamingContext) { + /** + * Create an input stream that receives messages pushed by a MQTT publisher. + * @param brokerUrl Url of remote MQTT publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. + */ + def mqttStream( + brokerUrl: String, + topic: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[String] = { + val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } +} diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala new file mode 100644 index 0000000000..c8987a3ee0 --- /dev/null +++ b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt + +import scala.collection.Map +import scala.collection.mutable.HashMap +import scala.collection.JavaConversions._ +import scala.reflect.ClassTag + +import java.util.Properties +import java.util.concurrent.Executors +import java.io.IOException + +import org.eclipse.paho.client.mqttv3.MqttCallback +import org.eclipse.paho.client.mqttv3.MqttClient +import org.eclipse.paho.client.mqttv3.MqttClientPersistence +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken +import org.eclipse.paho.client.mqttv3.MqttException +import org.eclipse.paho.client.mqttv3.MqttMessage +import org.eclipse.paho.client.mqttv3.MqttTopic + +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream._ + +/** + * Input stream that subscribes to messages from an MQTT broker. + * Uses the Eclipse Paho MqttClient (http://www.eclipse.org/paho/). + * @param brokerUrl URL of the remote MQTT publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. + */ + +private[streaming] +class MQTTInputDStream[T: ClassTag]( + @transient ssc_ : StreamingContext, + brokerUrl: String, + topic: String, + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_) with Logging { + + def getReceiver(): NetworkReceiver[T] = { + new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]] + } +} + +private[streaming] +class MQTTReceiver(brokerUrl: String, + topic: String, + storageLevel: StorageLevel + ) extends NetworkReceiver[Any] { + lazy protected val blockGenerator = new BlockGenerator(storageLevel) + + def onStop() { + blockGenerator.stop() + } + + def onStart() { + + blockGenerator.start() + + // Set up persistence for messages + val persistence: MqttClientPersistence = new MemoryPersistence() + + // Initialize the MQTT client with brokerUrl, client ID and MqttClientPersistence + val client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", persistence) + + // Connect to the MQTT broker + client.connect() + + // Subscribe to the MQTT topic + client.subscribe(topic) + + // The callback is invoked whenever a new message arrives on the subscribed topic + val callback: MqttCallback = new MqttCallback() { + + // Handles an incoming MQTT message + override def messageArrived(arg0: String, arg1: MqttMessage) { + blockGenerator += new String(arg1.getPayload()) + } + + override def deliveryComplete(arg0: IMqttDeliveryToken) { + } + + override def connectionLost(arg0: Throwable) { + logInfo("Connection lost " + arg0) + } + } + + // Set up callback for MqttClient + client.setCallback(callback) + } +} diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala new file mode 100644 index
0000000000..28a944f57e --- /dev/null +++ b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +package object mqtt { + implicit def sscToMQTTFunctions(ssc: StreamingContext) = new MQTTFunctions(ssc) +} + + diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java new file mode 100644 index 0000000000..c1f41640dc --- /dev/null +++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt; + +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.junit.Test; + +import org.apache.spark.streaming.LocalJavaStreamingContext; + +public class JavaMQTTStreamSuite extends LocalJavaStreamingContext { + @Test + public void testMQTTStream() { + String brokerUrl = "abc"; + String topic = "def"; + JavaStreamingContextWithMQTT sscWithMQTT = new JavaStreamingContextWithMQTT(ssc); + + // tests the API, does not actually test data receiving + JavaDStream<String> test1 = sscWithMQTT.mqttStream(brokerUrl, topic); + JavaDStream<String> test2 = sscWithMQTT.mqttStream(brokerUrl, topic, + StorageLevel.MEMORY_AND_DISK_SER_2()); + + // To verify that JavaStreamingContextWithMQTT is also a StreamingContext + JavaDStream<String> socketStream = sscWithMQTT.socketTextStream("localhost", 9999); + } +} diff --git a/external/mqtt/src/test/resources/log4j.properties b/external/mqtt/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/mqtt/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements.
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala new file mode 100644 index 0000000000..ab6542918b --- /dev/null +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt + +import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} +import org.apache.spark.storage.StorageLevel + +class MQTTStreamSuite extends TestSuiteBase { + + test("MQTT input stream") { + val ssc = new StreamingContext(master, framework, batchDuration) + val brokerUrl = "abc" + val topic = "def" + + // tests the API, does not actually test data receiving + val test1 = ssc.mqttStream(brokerUrl, topic) + val test2 = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) + + // TODO: Actually test receiving data + } +} diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala new file mode 100644 index 0000000000..0250364331 --- /dev/null +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
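
Together with MQTTFunctions above, the MQTT source is consumed the same way; a minimal sketch, with the broker URL and topic as placeholders rather than values from this patch:

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.mqtt._  // sscToMQTTFunctions implicit

    val ssc = new StreamingContext("local[2]", "MQTTSketch", Seconds(1))
    // Uses the default storage level, MEMORY_AND_DISK_SER_2
    val messages = ssc.mqttStream("tcp://localhost:1883", "sensors/temperature")
    messages.count().print()
    ssc.start()
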
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter + +import twitter4j.Status +import twitter4j.auth.Authorization + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} + +/** + * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra + * functions for creating Twitter input streams. + */ +class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext) + extends JavaStreamingContext(javaStreamingContext.ssc) { + + /** + * Create an input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + */ + def twitterStream(): JavaDStream[Status] = { + ssc.twitterStream(None) + } + + /** + * Create an input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + * @param filters Set of filter strings to get only those tweets that match them + */ + def twitterStream(filters: Array[String]): JavaDStream[Status] = { + ssc.twitterStream(None, filters) + } + + /** + * Create an input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = { + ssc.twitterStream(None, filters, storageLevel) + } + + /** + * Create an input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J Authorization + */ + def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = { + ssc.twitterStream(Some(twitterAuth)) + } + + /** + * Create an input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J Authorization + * @param filters Set of filter strings to get only those tweets that match them + */ + def twitterStream( + twitterAuth: Authorization, + filters: Array[String] + ): JavaDStream[Status] = { + ssc.twitterStream(Some(twitterAuth), filters) + } + + /** + * Create an input stream that returns tweets received from Twitter.
+ * @param twitterAuth Twitter4J Authorization object + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + twitterAuth: Authorization, + filters: Array[String], + storageLevel: StorageLevel + ): JavaDStream[Status] = { + ssc.twitterStream(Some(twitterAuth), filters, storageLevel) + } +} diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala deleted file mode 100644 index fe66e28ce6..0000000000 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala +++ /dev/null @@ -1,27 +0,0 @@ -package org.apache.spark.streaming.twitter - -import twitter4j.Status -import twitter4j.auth.Authorization -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming._ - - -class StreamingContextWithTwitter(ssc: StreamingContext) { - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth - * authorization; this uses the system properties twitter4j.oauth.consumerKey, - * .consumerSecret, .accessToken and .accessTokenSecret. - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream( - twitterAuth: Option[Authorization] = None, - filters: Seq[String] = Nil, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[Status] = { - val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) - ssc.registerInputStream(inputStream) - inputStream - } -} diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala new file mode 100644 index 0000000000..e91049d9b1 --- /dev/null +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter + +import twitter4j.Status +import twitter4j.auth.Authorization + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ + +/** + * Extra Twitter input stream functions available on [[org.apache.spark.streaming.StreamingContext]] + * through implicit conversions. Import org.apache.spark.streaming.twitter._ to use these functions. 
+ */ +class TwitterFunctions(ssc: StreamingContext) { + /** + * Create an input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth + * authorization; this uses the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + twitterAuth: Option[Authorization], + filters: Seq[String] = Nil, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[Status] = { + val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } +} diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala index 89c202a730..23f82c5885 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala @@ -1,7 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.streaming package object twitter { - implicit def enrichMyStreamingContext(ssc: StreamingContext): StreamingContextWithTwitter = { - new StreamingContextWithTwitter(ssc) - } + implicit def sscToTwitterFunctions(ssc: StreamingContext) = new TwitterFunctions(ssc) } diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java new file mode 100644 index 0000000000..34e4fbdd85 --- /dev/null +++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
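
A usage sketch for the Scala API defined above, assuming the four twitter4j.oauth.* system properties are already set; the filter term is a placeholder:

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.twitter._  // sscToTwitterFunctions implicit

    val ssc = new StreamingContext("local[2]", "TwitterSketch", Seconds(10))
    // None => fall back to Twitter4J's default OAuth read from system properties
    val tweets = ssc.twitterStream(None, Seq("spark"))
    tweets.map(_.getText).print()
    ssc.start()
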
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter; + +import org.junit.Test; + +import twitter4j.Status; +import twitter4j.auth.Authorization; +import twitter4j.auth.NullAuthorization; + +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; + +public class JavaTwitterStreamSuite extends LocalJavaStreamingContext { + @Test + public void testTwitterStream() { + JavaStreamingContextWithTwitter sscWithTwitter = new JavaStreamingContextWithTwitter(ssc); + String[] filters = { "filter1", "filter2" }; + Authorization auth = NullAuthorization.getInstance(); + + // tests the API, does not actually test data receiving + JavaDStream<Status> test1 = sscWithTwitter.twitterStream(); + JavaDStream<Status> test2 = sscWithTwitter.twitterStream(filters); + JavaDStream<Status> test3 = + sscWithTwitter.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaDStream<Status> test4 = sscWithTwitter.twitterStream(auth); + JavaDStream<Status> test5 = sscWithTwitter.twitterStream(auth, filters); + JavaDStream<Status> test6 = + sscWithTwitter.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2()); + + // To verify that JavaStreamingContextWithTwitter is also a StreamingContext + JavaDStream<String> socketStream = sscWithTwitter.socketTextStream("localhost", 9999); + } +} diff --git a/external/twitter/src/test/resources/log4j.properties b/external/twitter/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/twitter/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala new file mode 100644 index 0000000000..d7f6d35e07 --- /dev/null +++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter + +import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} +import org.apache.spark.storage.StorageLevel +import twitter4j.auth.{NullAuthorization, Authorization} + +class TwitterStreamSuite extends TestSuiteBase { + + test("twitter input stream") { + val ssc = new StreamingContext(master, framework, batchDuration) + val filters = Seq("filter1", "filter2") + val authorization: Authorization = NullAuthorization.getInstance() + + // tests the API, does not actually test data receiving + val test1 = ssc.twitterStream(None) + val test2 = ssc.twitterStream(None, filters) + val test3 = ssc.twitterStream(None, filters, StorageLevel.MEMORY_AND_DISK_SER_2) + val test4 = ssc.twitterStream(Some(authorization)) + val test5 = ssc.twitterStream(Some(authorization), filters) + val test6 = ssc.twitterStream(Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2) + + // Note that actually testing the data receiving is hard as authentication keys are + // necessary for accessing the Twitter live stream + } +} diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala new file mode 100644 index 0000000000..dc5d1f05be --- /dev/null +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.zeromq + +import scala.reflect.ClassTag +import scala.collection.JavaConversions._ + +import akka.actor.SupervisorStrategy +import akka.util.ByteString +import akka.zeromq.Subscribe + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.api.java.function.{Function => JFunction} +import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} + +/** + * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra + * functions for creating ZeroMQ input streams. + */ +class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext) + extends JavaStreamingContext(javaStreamingContext.ssc) { + + /** + * Create an input stream that receives messages pushed by a ZeroMQ publisher. + * @param publisherUrl URL of the remote ZeroMQ publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each + * frame is a sequence of bytes; this converter (typically a deserializer) + * translates the per-frame byte sequences into an iterator of objects. + * @param storageLevel Storage level to use for storing the received objects + */ + def zeroMQStream[T]( + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], + storageLevel: StorageLevel, + supervisorStrategy: SupervisorStrategy + ): JavaDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy) + } + + /** + * Create an input stream that receives messages pushed by a ZeroMQ publisher. + * @param publisherUrl URL of the remote ZeroMQ publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each + * frame is a sequence of bytes; this converter (typically a deserializer) + * translates the per-frame byte sequences into an iterator of objects. + * @param storageLevel RDD storage level. + */ + def zeroMQStream[T]( + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], + storageLevel: StorageLevel + ): JavaDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel) + } + + /** + * Create an input stream that receives messages pushed by a ZeroMQ publisher.
+ * @param publisherUrl URL of the remote ZeroMQ publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each + * frame is a sequence of bytes; this converter (typically a deserializer) + * translates the per-frame byte sequences into an iterator of objects. + */ + def zeroMQStream[T]( + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] + ): JavaDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + ssc.zeroMQStream[T](publisherUrl, subscribe, fn) + } +} diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala new file mode 100644 index 0000000000..f4c75ab7c9 --- /dev/null +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.zeromq + +import scala.reflect.ClassTag + +import akka.actor.{Props, SupervisorStrategy} +import akka.util.ByteString +import akka.zeromq.Subscribe + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ +import org.apache.spark.streaming.receivers._ + +/** + * Extra ZeroMQ input stream functions available on [[org.apache.spark.streaming.StreamingContext]] + * through implicit conversions. Import org.apache.spark.streaming.zeromq._ to use these functions. + */ +class ZeroMQFunctions(ssc: StreamingContext) { + /** + * Create an input stream that receives messages pushed by a ZeroMQ publisher. + * @param publisherUrl URL of the remote ZeroMQ publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, + * and each frame is a sequence of bytes; this converter + * (typically a deserializer) translates the per-frame byte + * sequences into an iterator of objects. + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ */ + def zeroMQStream[T: ClassTag]( + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: Seq[ByteString] ⇒ Iterator[T], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, + supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy + ): DStream[T] = { + ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), + "ZeroMQReceiver", storageLevel, supervisorStrategy) + } +} + diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala new file mode 100644 index 0000000000..769761e3b8 --- /dev/null +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.zeromq + +import scala.reflect.ClassTag + +import akka.actor.Actor +import akka.util.ByteString +import akka.zeromq._ + +import org.apache.spark.Logging +import org.apache.spark.streaming.receivers._ + +/** + * A receiver to subscribe to ZeroMQ stream. + */ +private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: Seq[ByteString] ⇒ Iterator[T]) + extends Actor with Receiver with Logging { + + override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self), + Connect(publisherUrl), subscribe) + + def receive: Receive = { + + case Connecting ⇒ logInfo("connecting ...") + + case m: ZMQMessage ⇒ + logDebug("Received message for:" + m.frame(0)) + + //We ignore first frame for processing as it is the topic + val bytes = m.frames.tail + pushBlock(bytesToObjects(bytes)) + + case Closed ⇒ logInfo("received closed ") + + } +} diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala new file mode 100644 index 0000000000..dc27178149 --- /dev/null +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
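
A sketch of how the pieces above fit together from an application; the endpoint and topic are placeholders. Since ZeroMQReceiver drops frame 0 (the topic), the converter only ever sees payload frames.

    import akka.util.ByteString
    import akka.zeromq.Subscribe
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.zeromq._  // sscToZeroMQFunctions implicit

    // Decode each payload frame as a UTF-8 string
    def bytesToStrings(frames: Seq[ByteString]): Iterator[String] =
      frames.map(_.utf8String).iterator

    val ssc = new StreamingContext("local[2]", "ZeroMQSketch", Seconds(1))
    val lines = ssc.zeroMQStream("tcp://127.0.0.1:5556", Subscribe("events"), bytesToStrings _)
    lines.print()
    ssc.start()
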
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +package object zeromq { + implicit def sscToZeroMQFunctions(ssc: StreamingContext) = new ZeroMQFunctions(ssc) +} + + diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java new file mode 100644 index 0000000000..96af7d737d --- /dev/null +++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.zeromq; + +import org.junit.Test; + +import akka.actor.SupervisorStrategy; +import akka.util.ByteString; +import akka.zeromq.Subscribe; + +import org.apache.spark.api.java.function.Function; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; + +public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { + + @Test // tests the API, does not actually test data receiving + public void testZeroMQStream() { + JavaStreamingContextWithZeroMQ sscWithZeroMQ = new JavaStreamingContextWithZeroMQ(ssc); + String publishUrl = "abc"; + Subscribe subscribe = new Subscribe((ByteString)null); + Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() { + @Override + public Iterable<String> call(byte[][] bytes) throws Exception { + return null; + } + }; + + JavaDStream<String> test1 = sscWithZeroMQ.zeroMQStream( + publishUrl, subscribe, bytesToObjects); + JavaDStream<String> test2 = sscWithZeroMQ.zeroMQStream( + publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaDStream<String> test3 = sscWithZeroMQ.zeroMQStream( + publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy()); + + // To verify that JavaStreamingContextWithZeroMQ is also a StreamingContext + JavaDStream<String> socketStream = sscWithZeroMQ.socketTextStream("localhost", 9999); + } +} diff --git a/external/zeromq/src/test/resources/log4j.properties b/external/zeromq/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/zeromq/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +#
contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala new file mode 100644 index 0000000000..5adcdb821f --- /dev/null +++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.zeromq + +import akka.actor.SupervisorStrategy +import akka.util.ByteString +import akka.zeromq.Subscribe + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} + +class ZeroMQStreamSuite extends TestSuiteBase { + + test("zeromq input stream") { + val ssc = new StreamingContext(master, framework, batchDuration) + val publishUrl = "abc" + val subscribe = new Subscribe(null.asInstanceOf[ByteString]) + val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]] + + // tests the API, does not actually test data receiving + val test1 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects) + val test2 = ssc.zeroMQStream( + publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) + val test3 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects, + StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy) + + // TODO: Actually test data receiving + } +} diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index f9ff781f38..4e92d2a162 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -49,9 +49,6 @@ object SparkBuild extends Build { lazy val repl = Project("repl", file("repl"), settings = replSettings) .dependsOn(core, bagel, mllib) - lazy val examples = Project("examples", file("examples"), settings = examplesSettings) - .dependsOn(core, mllib, bagel, streaming, externalTwitter) - lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming) lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn(core) @@ -60,8 +57,6 @@ object SparkBuild extends Build { lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn(core) - lazy val externalTwitter = Project("streaming-twitter", file("external/twitter"), settings = twitterSettings) dependsOn(streaming) - lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings) .dependsOn(core, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*) @@ -94,10 +89,31 @@ object SparkBuild extends Build { lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]() lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]() + lazy val externalTwitter = Project("external-twitter", file("external/twitter"), settings = twitterSettings) + .dependsOn(streaming % "compile->compile;test->test") + + lazy val externalKafka = Project("external-kafka", file("external/kafka"), settings = kafkaSettings) + .dependsOn(streaming % "compile->compile;test->test") + + lazy val externalFlume = Project("external-flume", file("external/flume"), settings = flumeSettings) + .dependsOn(streaming % "compile->compile;test->test") + + lazy val externalZeromq = Project("external-zeromq", file("external/zeromq"), settings = zeromqSettings) + .dependsOn(streaming % "compile->compile;test->test") + + lazy val externalMqtt = Project("external-mqtt", file("external/mqtt"), settings = mqttSettings) + .dependsOn(streaming % "compile->compile;test->test") + + lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt) + lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt) + + lazy val examples = Project("examples", file("examples"), settings = 
examplesSettings) + .dependsOn(core, mllib, bagel, streaming, externalTwitter) dependsOn(allExternal: _*) + // Everything except assembly, tools and examples belong to packageProjects lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib) ++ maybeYarnRef - lazy val allProjects = packageProjects ++ Seq[ProjectReference](examples, tools, assemblyProj) + lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools, assemblyProj) def sharedSettings = Defaults.defaultSettings ++ Seq( organization := "org.apache.spark", @@ -167,7 +183,7 @@ object SparkBuild extends Build { ), -/* + /* publishTo <<= version { (v: String) => val nexus = "https://oss.sonatype.org/" if (v.trim.endsWith("SNAPSHOT")) @@ -176,8 +192,7 @@ object SparkBuild extends Build { Some("sonatype-staging" at nexus + "service/local/staging/deploy/maven2") }, -*/ - + */ libraryDependencies ++= Seq( "io.netty" % "netty-all" % "4.0.0.CR1", @@ -264,7 +279,6 @@ object SparkBuild extends Build { libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-reflect" % v ) ) - def examplesSettings = sharedSettings ++ Seq( name := "spark-examples", libraryDependencies ++= Seq( @@ -302,21 +316,10 @@ object SparkBuild extends Build { def streamingSettings = sharedSettings ++ Seq( name := "spark-streaming", resolvers ++= Seq( - "Eclipse Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/", "Apache repo" at "https://repository.apache.org/content/repositories/releases" ), - libraryDependencies ++= Seq( - "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy), - "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1" - exclude("com.sun.jdmk", "jmxtools") - exclude("com.sun.jmx", "jmxri") - exclude("net.sf.jopt-simple", "jopt-simple") - excludeAll(excludeNetty), - "org.eclipse.paho" % "mqtt-client" % "0.4.0", - "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty), - // "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty), - "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty) + "commons-io" % "commons-io" % "2.4" ) ) @@ -331,8 +334,8 @@ object SparkBuild extends Build { def yarnEnabledSettings = Seq( libraryDependencies ++= Seq( // Exclude rule required for all ? 
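
Since the connector dependencies no longer ship inside spark-streaming, an application that uses one of these sources now has to add the matching artifact itself. A hypothetical sbt line (the artifact name follows the per-connector settings defined below; the version string is a placeholder):

    libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "<spark-version>"
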
- "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), - "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), + "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), + "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib) ) @@ -358,9 +361,45 @@ object SparkBuild extends Build { ) def twitterSettings() = streamingSettings ++ Seq( - name := "spark-twitter", + name := "spark-streaming-twitter", libraryDependencies ++= Seq( "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty) ) ) + + def kafkaSettings() = streamingSettings ++ Seq( + name := "spark-streaming-kafka", + libraryDependencies ++= Seq( + "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty), + "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1" + exclude("com.sun.jdmk", "jmxtools") + exclude("com.sun.jmx", "jmxri") + exclude("net.sf.jopt-simple", "jopt-simple") + excludeAll(excludeNetty) + ) + ) + + def flumeSettings() = streamingSettings ++ Seq( + name := "spark-streaming-flume", + libraryDependencies ++= Seq( + "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy) + ) + ) + + def zeromqSettings() = streamingSettings ++ Seq( + name := "spark-streaming-zeromq", + libraryDependencies ++= Seq( + "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty) + ) + ) + + def mqttSettings() = streamingSettings ++ Seq( + name := "spark-streaming-mqtt", + resolvers ++= Seq( + "Apache repo" at "https://repository.apache.org/content/repositories/releases" + ), + libraryDependencies ++= Seq( + "org.eclipse.paho" % "mqtt-client" % "0.4.0" + ) + ) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 25b9b70b2c..41898b9228 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -17,21 +17,6 @@ package org.apache.spark.streaming -import akka.actor.Props -import akka.actor.SupervisorStrategy -import akka.zeromq.Subscribe - -import org.apache.spark.streaming.dstream._ - -import org.apache.spark._ -import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.receivers.ActorReceiver -import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy -import org.apache.spark.streaming.receivers.ZeroMQReceiver -import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.MetadataCleaner -import org.apache.spark.streaming.receivers.ActorReceiver - import scala.collection.mutable.Queue import scala.collection.Map import scala.reflect.ClassTag @@ -40,15 +25,22 @@ import java.io.InputStream import java.util.concurrent.atomic.AtomicInteger import java.util.UUID +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.MetadataCleaner +import org.apache.spark.streaming.dstream._ +import 
org.apache.spark.streaming.receivers._ +import org.apache.spark.streaming.scheduler._ + import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path -//import twitter4j.Status -//import twitter4j.auth.Authorization -import org.apache.spark.streaming.scheduler._ -import akka.util.ByteString + +import akka.actor.Props +import akka.actor.SupervisorStrategy /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -223,74 +215,6 @@ class StreamingContext private ( networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy)) } - /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic - * and each frame has sequence of byte thus it needs the converter - * (which might be deserializer of bytes) to translate from sequence - * of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. - * @param storageLevel RDD storage level. Defaults to memory-only. - */ - def zeroMQStream[T: ClassTag]( - publisherUrl:String, - subscribe: Subscribe, - bytesToObjects: Seq[ByteString] ⇒ Iterator[T], - storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, - supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy - ): DStream[T] = { - actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), - "ZeroMQReceiver", storageLevel, supervisorStrategy) - } - - /** - * Create an input stream that pulls messages from a Kafka Broker. - * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel Storage level to use for storing the received objects - * (default: StorageLevel.MEMORY_AND_DISK_SER_2) - */ - def kafkaStream( - zkQuorum: String, - groupId: String, - topics: Map[String, Int], - storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2 - ): DStream[(String, String)] = { - val kafkaParams = Map[String, String]( - "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, - "zookeeper.connection.timeout.ms" -> "10000") - kafkaStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder]( - kafkaParams, - topics, - storageLevel) - } - - /** - * Create an input stream that pulls messages from a Kafka Broker. - * @param kafkaParams Map of kafka configuration paramaters. - * See: http://kafka.apache.org/configuration.html - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. 
- * @param storageLevel Storage level to use for storing the received objects - */ - def kafkaStream[ - K: ClassTag, - V: ClassTag, - U <: kafka.serializer.Decoder[_]: Manifest, - T <: kafka.serializer.Decoder[_]: Manifest]( - kafkaParams: Map[String, String], - topics: Map[String, Int], - storageLevel: StorageLevel - ): DStream[(K, V)] = { - val inputStream = new KafkaInputDStream[K, V, U, T](this, kafkaParams, topics, storageLevel) - registerInputStream(inputStream) - inputStream - } - /** * Create a input stream from TCP source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited @@ -329,22 +253,6 @@ class StreamingContext private ( inputStream } - /** - * Create a input stream from a Flume source. - * @param hostname Hostname of the slave machine to which the flume data will be sent - * @param port Port of the slave machine to which the flume data will be sent - * @param storageLevel Storage level to use for storing the received objects - */ - def flumeStream ( - hostname: String, - port: Int, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[SparkFlumeEvent] = { - val inputStream = new FlumeInputDStream[SparkFlumeEvent](this, hostname, port, storageLevel) - registerInputStream(inputStream) - inputStream - } - /** * Create a input stream from network source hostname:port, where data is received * as serialized blocks (serialized using the Spark's serializer) that can be directly @@ -467,21 +375,6 @@ class StreamingContext private ( inputStream } -/** - * Create an input stream that receives messages pushed by a mqtt publisher. - * @param brokerUrl Url of remote mqtt publisher - * @param topic topic name to subscribe to - * @param storageLevel RDD storage level. Defaults to memory-only. - */ - - def mqttStream( - brokerUrl: String, - topic: String, - storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = { - val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel) - registerInputStream(inputStream) - inputStream - } /** * Create a unified DStream from multiple DStreams of the same type and same slide duration. 
*/ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index b32cfbb677..ea4a0fe619 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -17,28 +17,21 @@ package org.apache.spark.streaming.api.java -import java.lang.{Integer => JInt} -import java.io.InputStream -import java.util.{Map => JMap, List => JList} - import scala.collection.JavaConversions._ import scala.reflect.ClassTag +import java.io.InputStream +import java.util.{Map => JMap, List => JList} + import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -//import twitter4j.Status import akka.actor.Props import akka.actor.SupervisorStrategy -import akka.zeromq.Subscribe -import akka.util.ByteString - -//import twitter4j.auth.Authorization import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD} import org.apache.spark.streaming._ -import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.scheduler.StreamingListener /** @@ -133,81 +126,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** The underlying SparkContext */ val sc: JavaSparkContext = new JavaSparkContext(ssc.sc) - /** - * Create an input stream that pulls messages form a Kafka Broker. - * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - */ - def kafkaStream( - zkQuorum: String, - groupId: String, - topics: JMap[String, JInt]) - : JavaPairDStream[String, String] = { - implicit val cmt: ClassTag[String] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), - StorageLevel.MEMORY_ONLY_SER_2) - - } - - /** - * Create an input stream that pulls messages form a Kafka Broker. - * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel RDD storage level. Defaults to memory-only - * - */ - def kafkaStream( - zkQuorum: String, - groupId: String, - topics: JMap[String, JInt], - storageLevel: StorageLevel) - : JavaPairDStream[String, String] = { - implicit val cmt: ClassTag[String] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), - storageLevel) - } - - /** - * Create an input stream that pulls messages form a Kafka Broker. - * @param keyTypeClass Key type of RDD - * @param valueTypeClass value type of RDD - * @param keyDecoderClass Type of kafka key decoder - * @param valueDecoderClass Type of kafka value decoder - * @param kafkaParams Map of kafka configuration paramaters. - * See: http://kafka.apache.org/configuration.html - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel RDD storage level. 
Defaults to memory-only - */ - def kafkaStream[K, V, U <: kafka.serializer.Decoder[_], T <: kafka.serializer.Decoder[_]]( - keyTypeClass: Class[K], - valueTypeClass: Class[V], - keyDecoderClass: Class[U], - valueDecoderClass: Class[T], - kafkaParams: JMap[String, String], - topics: JMap[String, JInt], - storageLevel: StorageLevel) - : JavaPairDStream[K, V] = { - implicit val keyCmt: ClassTag[K] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] - implicit val valueCmt: ClassTag[V] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] - - implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]] - implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]] - - ssc.kafkaStream[K, V, U, T]( - kafkaParams.toMap, - Map(topics.mapValues(_.intValue()).toSeq: _*), - storageLevel) - } - /** * Create a input stream from network source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited @@ -318,98 +236,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { ssc.fileStream[K, V, F](directory) } - /** - * Creates a input stream from a Flume source. - * @param hostname Hostname of the slave machine to which the flume data will be sent - * @param port Port of the slave machine to which the flume data will be sent - * @param storageLevel Storage level to use for storing the received objects - */ - def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel): - JavaDStream[SparkFlumeEvent] = { - ssc.flumeStream(hostname, port, storageLevel) - } - - - /** - * Creates a input stream from a Flume source. - * @param hostname Hostname of the slave machine to which the flume data will be sent - * @param port Port of the slave machine to which the flume data will be sent - */ - def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = { - ssc.flumeStream(hostname, port) - } - /* - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J Authorization object - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream( - twitterAuth: Authorization, - filters: Array[String], - storageLevel: StorageLevel - ): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth), filters, storageLevel) - } - - /** - * Create a input stream that returns tweets received from Twitter using Twitter4J's default - * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, - * .consumerSecret, .accessToken and .accessTokenSecret to be set. - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream( - filters: Array[String], - storageLevel: StorageLevel - ): JavaDStream[Status] = { - ssc.twitterStream(None, filters, storageLevel) - } - - /** - * Create a input stream that returns tweets received from Twitter. 
- * @param twitterAuth Twitter4J Authorization - * @param filters Set of filter strings to get only those tweets that match them - */ - def twitterStream( - twitterAuth: Authorization, - filters: Array[String] - ): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth), filters) - } - - /** - * Create a input stream that returns tweets received from Twitter using Twitter4J's default - * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, - * .consumerSecret, .accessToken and .accessTokenSecret to be set. - * @param filters Set of filter strings to get only those tweets that match them - */ - def twitterStream( - filters: Array[String] - ): JavaDStream[Status] = { - ssc.twitterStream(None, filters) - } - - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J Authorization - */ - def twitterStream( - twitterAuth: Authorization - ): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth)) - } - - /** - * Create a input stream that returns tweets received from Twitter using Twitter4J's default - * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, - * .consumerSecret, .accessToken and .accessTokenSecret to be set. - */ - def twitterStream(): JavaDStream[Status] = { - ssc.twitterStream() - } - */ /** * Create an input stream with any arbitrary user implemented actor receiver. * @param props Props object defining creation of the actor @@ -472,70 +298,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { ssc.actorStream[T](props, name) } - /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence - * of byte thus it needs the converter(which might be deserializer of bytes) - * to translate from sequence of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. - * @param storageLevel Storage level to use for storing the received objects - */ - def zeroMQStream[T]( - publisherUrl:String, - subscribe: Subscribe, - bytesToObjects: Seq[ByteString] ⇒ Iterator[T], - storageLevel: StorageLevel, - supervisorStrategy: SupervisorStrategy - ): JavaDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy) - } - - /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence - * of byte thus it needs the converter(which might be deserializer of bytes) - * to translate from sequence of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. - * @param storageLevel RDD storage level. Defaults to memory-only. 
- */ - def zeroMQStream[T]( - publisherUrl:String, - subscribe: Subscribe, - bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], - storageLevel: StorageLevel - ): JavaDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel) - } - - /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence - * of byte thus it needs the converter(which might be deserializer of bytes) - * to translate from sequence of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. - */ - def zeroMQStream[T]( - publisherUrl:String, - subscribe: Subscribe, - bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] - ): JavaDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - ssc.zeroMQStream[T](publisherUrl, subscribe, fn) - } - /** * Registers an output stream that will be computed every interval */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala deleted file mode 100644 index 60d79175f1..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming.dstream - -import java.net.InetSocketAddress -import java.io.{ObjectInput, ObjectOutput, Externalizable} -import java.nio.ByteBuffer - -import scala.collection.JavaConversions._ -import scala.reflect.ClassTag - -import org.apache.flume.source.avro.AvroSourceProtocol -import org.apache.flume.source.avro.AvroFlumeEvent -import org.apache.flume.source.avro.Status -import org.apache.avro.ipc.specific.SpecificResponder -import org.apache.avro.ipc.NettyServer - -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.util.Utils -import org.apache.spark.storage.StorageLevel - -private[streaming] -class FlumeInputDStream[T: ClassTag]( - @transient ssc_ : StreamingContext, - host: String, - port: Int, - storageLevel: StorageLevel -) extends NetworkInputDStream[SparkFlumeEvent](ssc_) { - - override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = { - new FlumeReceiver(host, port, storageLevel) - } -} - -/** - * A wrapper class for AvroFlumeEvent's with a custom serialization format. - * - * This is necessary because AvroFlumeEvent uses inner data structures - * which are not serializable. - */ -class SparkFlumeEvent() extends Externalizable { - var event : AvroFlumeEvent = new AvroFlumeEvent() - - /* De-serialize from bytes. */ - def readExternal(in: ObjectInput) { - val bodyLength = in.readInt() - val bodyBuff = new Array[Byte](bodyLength) - in.read(bodyBuff) - - val numHeaders = in.readInt() - val headers = new java.util.HashMap[CharSequence, CharSequence] - - for (i <- 0 until numHeaders) { - val keyLength = in.readInt() - val keyBuff = new Array[Byte](keyLength) - in.read(keyBuff) - val key : String = Utils.deserialize(keyBuff) - - val valLength = in.readInt() - val valBuff = new Array[Byte](valLength) - in.read(valBuff) - val value : String = Utils.deserialize(valBuff) - - headers.put(key, value) - } - - event.setBody(ByteBuffer.wrap(bodyBuff)) - event.setHeaders(headers) - } - - /* Serialize to bytes. */ - def writeExternal(out: ObjectOutput) { - val body = event.getBody.array() - out.writeInt(body.length) - out.write(body) - - val numHeaders = event.getHeaders.size() - out.writeInt(numHeaders) - for ((k, v) <- event.getHeaders) { - val keyBuff = Utils.serialize(k.toString) - out.writeInt(keyBuff.length) - out.write(keyBuff) - val valBuff = Utils.serialize(v.toString) - out.writeInt(valBuff.length) - out.write(valBuff) - } - } -} - -private[streaming] object SparkFlumeEvent { - def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = { - val event = new SparkFlumeEvent - event.event = in - event - } -} - -/** A simple server that implements Flume's Avro protocol. 
*/ -private[streaming] -class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { - override def append(event : AvroFlumeEvent) : Status = { - receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event) - Status.OK - } - - override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = { - events.foreach (event => - receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)) - Status.OK - } -} - -/** A NetworkReceiver which listens for events using the - * Flume Avro interface.*/ -private[streaming] -class FlumeReceiver( - host: String, - port: Int, - storageLevel: StorageLevel - ) extends NetworkReceiver[SparkFlumeEvent] { - - lazy val blockGenerator = new BlockGenerator(storageLevel) - - protected override def onStart() { - val responder = new SpecificResponder( - classOf[AvroSourceProtocol], new FlumeEventServer(this)) - val server = new NettyServer(responder, new InetSocketAddress(host, port)) - blockGenerator.start() - server.start() - logInfo("Flume receiver started") - } - - protected override def onStop() { - blockGenerator.stop() - logInfo("Flume receiver stopped") - } - - override def getLocationPreference = Some(host) -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala deleted file mode 100644 index 526f5564c7..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.dstream - -import org.apache.spark.Logging -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.StreamingContext - -import java.util.Properties -import java.util.concurrent.Executors - -import kafka.consumer._ -import kafka.serializer.Decoder -import kafka.utils.VerifiableProperties -import kafka.utils.ZKStringSerializer -import org.I0Itec.zkclient._ - -import scala.collection.Map -import scala.reflect.ClassTag - -/** - * Input stream that pulls messages from a Kafka Broker. - * - * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel RDD storage level. 
- */ -private[streaming] -class KafkaInputDStream[ - K: ClassTag, - V: ClassTag, - U <: Decoder[_]: Manifest, - T <: Decoder[_]: Manifest]( - @transient ssc_ : StreamingContext, - kafkaParams: Map[String, String], - topics: Map[String, Int], - storageLevel: StorageLevel - ) extends NetworkInputDStream[(K, V)](ssc_) with Logging { - - def getReceiver(): NetworkReceiver[(K, V)] = { - new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) - .asInstanceOf[NetworkReceiver[(K, V)]] - } -} - -private[streaming] -class KafkaReceiver[ - K: ClassTag, - V: ClassTag, - U <: Decoder[_]: Manifest, - T <: Decoder[_]: Manifest]( - kafkaParams: Map[String, String], - topics: Map[String, Int], - storageLevel: StorageLevel - ) extends NetworkReceiver[Any] { - - // Handles pushing data into the BlockManager - lazy protected val blockGenerator = new BlockGenerator(storageLevel) - // Connection to Kafka - var consumerConnector : ConsumerConnector = null - - def onStop() { - blockGenerator.stop() - } - - def onStart() { - - blockGenerator.start() - - // In case we are using multiple Threads to handle Kafka Messages - val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) - - logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id")) - - // Kafka connection properties - val props = new Properties() - kafkaParams.foreach(param => props.put(param._1, param._2)) - - // Create the connection to the cluster - logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect")) - val consumerConfig = new ConsumerConfig(props) - consumerConnector = Consumer.create(consumerConfig) - logInfo("Connected to " + kafkaParams("zookeeper.connect")) - - // When autooffset.reset is defined, it is our responsibility to try and whack the - // consumer group zk node. - if (kafkaParams.contains("auto.offset.reset")) { - tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id")) - } - - val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) - .newInstance(consumerConfig.props) - .asInstanceOf[Decoder[K]] - val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) - .newInstance(consumerConfig.props) - .asInstanceOf[Decoder[V]] - - // Create Threads for each Topic/Message Stream we are listening - val topicMessageStreams = consumerConnector.createMessageStreams( - topics, keyDecoder, valueDecoder) - - - // Start the messages handler for each partition - topicMessageStreams.values.foreach { streams => - streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } - } - } - - // Handles Kafka Messages - private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V]) - extends Runnable { - def run() { - logInfo("Starting MessageHandler.") - for (msgAndMetadata <- stream) { - blockGenerator += (msgAndMetadata.key, msgAndMetadata.message) - } - } - } - - // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because - // Kafka 0.7.2 only honors this param when the group is not in zookeeper. - // - // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas' - // ConsoleConsumer. 
See code related to 'autooffset.reset' when it is set to 'smallest'/'largest': - // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala - private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) { - try { - val dir = "/consumers/" + groupId - logInfo("Cleaning up temporary zookeeper data under " + dir + ".") - val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) - zk.deleteRecursive(dir) - zk.close() - } catch { - case _ : Throwable => // swallow - } - } -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala deleted file mode 100644 index ef4a737568..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.dstream - -import org.apache.spark.Logging -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContext } - -import java.util.Properties -import java.util.concurrent.Executors -import java.io.IOException - -import org.eclipse.paho.client.mqttv3.MqttCallback -import org.eclipse.paho.client.mqttv3.MqttClient -import org.eclipse.paho.client.mqttv3.MqttClientPersistence -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken -import org.eclipse.paho.client.mqttv3.MqttException -import org.eclipse.paho.client.mqttv3.MqttMessage -import org.eclipse.paho.client.mqttv3.MqttTopic - -import scala.collection.Map -import scala.collection.mutable.HashMap -import scala.collection.JavaConversions._ -import scala.reflect.ClassTag - -/** - * Input stream that subscribe messages from a Mqtt Broker. - * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/ - * @param brokerUrl Url of remote mqtt publisher - * @param topic topic name to subscribe to - * @param storageLevel RDD storage level. 
- */ - -private[streaming] -class MQTTInputDStream[T: ClassTag]( - @transient ssc_ : StreamingContext, - brokerUrl: String, - topic: String, - storageLevel: StorageLevel - ) extends NetworkInputDStream[T](ssc_) with Logging { - - def getReceiver(): NetworkReceiver[T] = { - new MQTTReceiver(brokerUrl, topic, storageLevel) - .asInstanceOf[NetworkReceiver[T]] - } -} - -private[streaming] -class MQTTReceiver(brokerUrl: String, - topic: String, - storageLevel: StorageLevel - ) extends NetworkReceiver[Any] { - lazy protected val blockGenerator = new BlockGenerator(storageLevel) - - def onStop() { - blockGenerator.stop() - } - - def onStart() { - - blockGenerator.start() - - // Set up persistence for messages - var peristance: MqttClientPersistence = new MemoryPersistence() - - // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance - var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance) - - // Connect to MqttBroker - client.connect() - - // Subscribe to Mqtt topic - client.subscribe(topic) - - // Callback automatically triggers as and when new message arrives on specified topic - var callback: MqttCallback = new MqttCallback() { - - // Handles Mqtt message - override def messageArrived(arg0: String, arg1: MqttMessage) { - blockGenerator += new String(arg1.getPayload()) - } - - override def deliveryComplete(arg0: IMqttDeliveryToken) { - } - - override def connectionLost(arg0: Throwable) { - logInfo("Connection lost " + arg0) - } - } - - // Set up callback for MqttClient - client.setCallback(callback) - } -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala deleted file mode 100644 index f164d516b0..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.receivers - -import scala.reflect.ClassTag - -import akka.actor.Actor -import akka.util.ByteString -import akka.zeromq._ - -import org.apache.spark.Logging - -/** - * A receiver to subscribe to ZeroMQ stream. 
- */ -private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String, - subscribe: Subscribe, - bytesToObjects: Seq[ByteString] ⇒ Iterator[T]) - extends Actor with Receiver with Logging { - - override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self), - Connect(publisherUrl), subscribe) - - def receive: Receive = { - - case Connecting ⇒ logInfo("connecting ...") - - case m: ZMQMessage ⇒ - logDebug("Received message for:" + m.frame(0)) - - //We ignore first frame for processing as it is the topic - val bytes = m.frames.tail - pushBlock(bytesToObjects(bytes)) - - case Closed ⇒ logInfo("received closed ") - - } -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 1cd0b9b0a4..2734393ae9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -33,6 +33,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { val ssc = jobScheduler.ssc val clockClass = System.getProperty( "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") + logInfo("Using clock class = " + clockClass) val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock] val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => generateJobs(new Time(longTime))) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala index abff55d77c..4a8e15db21 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala @@ -160,7 +160,10 @@ class NetworkInputTracker( } // Run the dummy Spark job to ensure that all slaves have registered. // This avoids all the receivers to be scheduled on the same node. 
- ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() + if (!ssc.sparkContext.isLocal) { + ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() + } + // Distribute the receivers and start them ssc.sparkContext.runJob(tempRDD, startReceiver) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index daeb99f5b7..f4d26c0be6 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -17,22 +17,16 @@ package org.apache.spark.streaming; -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.io.Files; - -import kafka.serializer.StringDecoder; +import scala.Tuple2; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.spark.streaming.api.java.JavaDStreamLike; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; +import java.io.*; +import java.util.*; -import scala.Tuple2; -import twitter4j.Status; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.io.Files; import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaPairRDD; @@ -43,39 +37,11 @@ import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.dstream.SparkFlumeEvent; -import org.apache.spark.streaming.JavaTestUtils; -import org.apache.spark.streaming.JavaCheckpointTestUtils; - -import java.io.*; -import java.util.*; - -import akka.actor.Props; -import akka.zeromq.Subscribe; - // The test suite itself is Serializable so that anonymous Function implementations can be // serialized, as an alternative to converting these anonymous classes to static inner classes; // see http://stackoverflow.com/questions/758570/. -public class JavaAPISuite implements Serializable { - private transient JavaStreamingContext ssc; - - @Before - public void setUp() { - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); - ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); - ssc.checkpoint("checkpoint"); - } - - @After - public void tearDown() { - ssc.stop(); - ssc = null; - - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port"); - } - +public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable { @Test public void testCount() { List> inputData = Arrays.asList( @@ -1596,26 +1562,6 @@ public class JavaAPISuite implements Serializable { // Input stream tests. These mostly just test that we can instantiate a given InputStream with // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the // InputStream functionality is deferred to the existing Scala tests. 
- @Test - public void testKafkaStream() { - HashMap topics = Maps.newHashMap(); - JavaPairDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics); - JavaPairDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, - StorageLevel.MEMORY_AND_DISK()); - - HashMap kafkaParams = Maps.newHashMap(); - kafkaParams.put("zookeeper.connect","localhost:12345"); - kafkaParams.put("group.id","consumer-group"); - JavaPairDStream test3 = ssc.kafkaStream( - String.class, - String.class, - StringDecoder.class, - StringDecoder.class, - kafkaParams, - topics, - StorageLevel.MEMORY_AND_DISK()); - } - @Test public void testSocketTextStream() { JavaDStream test = ssc.socketTextStream("localhost", 12345); @@ -1654,16 +1600,10 @@ public class JavaAPISuite implements Serializable { public void testRawSocketStream() { JavaDStream test = ssc.rawSocketStream("localhost", 12345); } - - @Test - public void testFlumeStream() { - JavaDStream test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY()); - } - + /* @Test public void testFileStream() { - JavaPairDStream foo = - ssc.>fileStream("/tmp/foo"); + JavaPairDStream foo = ssc.>fileStream("/tmp/foo"); } @Test @@ -1685,5 +1625,5 @@ public class JavaAPISuite implements Serializable { return null; } }); - } + } */ } diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java new file mode 100644 index 0000000000..34bee56885 --- /dev/null +++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming; + +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.junit.After; +import org.junit.Before; + +public abstract class LocalJavaStreamingContext { + + protected transient JavaStreamingContext ssc; + + @Before + public void setUp() { + System.clearProperty("spark.driver.port"); + System.clearProperty("spark.hostPort"); + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + ssc.checkpoint("checkpoint"); + } + + @After + public void tearDown() { + ssc.stop(); + ssc = null; + + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.driver.port"); + System.clearProperty("spark.hostPort"); + } +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 62a9f120b4..0cffed64a7 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -23,7 +23,7 @@ import akka.actor.IOManager import akka.actor.Props import akka.util.ByteString -import org.apache.spark.streaming.dstream.{NetworkReceiver, SparkFlumeEvent} +import org.apache.spark.streaming.dstream.{NetworkReceiver} import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket} import java.io.{File, BufferedWriter, OutputStreamWriter} import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue} @@ -31,18 +31,11 @@ import collection.mutable.{SynchronizedBuffer, ArrayBuffer} import util.ManualClock import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receivers.Receiver -import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.Logging import scala.util.Random import org.apache.commons.io.FileUtils import org.scalatest.BeforeAndAfter -import org.apache.flume.source.avro.AvroSourceProtocol -import org.apache.flume.source.avro.AvroFlumeEvent -import org.apache.flume.source.avro.Status -import org.apache.avro.ipc.{specific, NettyTransceiver} -import org.apache.avro.ipc.specific.SpecificRequestor -import java.nio.ByteBuffer import collection.JavaConversions._ -import java.nio.charset.Charset import com.google.common.io.Files import java.util.concurrent.atomic.AtomicInteger @@ -99,55 +92,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } - test("flume input stream") { - // Set up the streaming context and input streams - val ssc = new StreamingContext(master, framework, batchDuration) - val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) - val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] - with SynchronizedBuffer[Seq[SparkFlumeEvent]] - val outputStream = new TestOutputStream(flumeStream, outputBuffer) - ssc.registerOutputStream(outputStream) - ssc.start() - - val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - val input = Seq(1, 2, 3, 4, 5) - Thread.sleep(1000) - val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)) - val client = SpecificRequestor.getClient( - classOf[AvroSourceProtocol], transceiver) - - for (i <- 0 until input.size) { - val event = new AvroFlumeEvent - event.setBody(ByteBuffer.wrap(input(i).toString.getBytes())) - event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header")) - 
client.append(event) - Thread.sleep(500) - clock.addToTime(batchDuration.milliseconds) - } - - val startTime = System.currentTimeMillis() - while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { - logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size) - Thread.sleep(100) - } - Thread.sleep(1000) - val timeTaken = System.currentTimeMillis() - startTime - assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") - logInfo("Stopping context") - ssc.stop() - - val decoder = Charset.forName("UTF-8").newDecoder() - - assert(outputBuffer.size === input.length) - for (i <- 0 until outputBuffer.size) { - assert(outputBuffer(i).size === 1) - val str = decoder.decode(outputBuffer(i).head.event.getBody) - assert(str.toString === input(i).toString) - assert(outputBuffer(i).head.event.getHeaders.get("test") === "header") - } - } - - test("file input stream") { // Disable manual clock as FileInputDStream does not work with manual clock System.clearProperty("spark.streaming.clock") @@ -249,21 +193,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } - test("kafka input stream") { - val ssc = new StreamingContext(master, framework, batchDuration) - val topics = Map("my-topic" -> 1) - val test1 = ssc.kafkaStream("localhost:12345", "group", topics) - val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK) - - // Test specifying decoder - val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group") - val test3 = ssc.kafkaStream[ - String, - String, - kafka.serializer.StringDecoder, - kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK) - } - test("multi-thread receiver") { // set up the test receiver val numThreads = 10 diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index e969e91d13..f56c0462f4 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -137,11 +137,10 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { // if you want to add your stuff to "before" (i.e., don't call before { } ) def beforeFunction() { if (useManualClock) { - System.setProperty( - "spark.streaming.clock", - "org.apache.spark.streaming.util.ManualClock" - ) + logInfo("Using manual clock") + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") } else { + logInfo("Using real clock") System.clearProperty("spark.streaming.clock") } // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown @@ -273,7 +272,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val startTime = System.currentTimeMillis() while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput) - Thread.sleep(100) + Thread.sleep(10) } val timeTaken = System.currentTimeMillis() - startTime -- cgit v1.2.3 From 97630849ff2cdaa2ff8a115c3e8e6ca8dba7477d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 31 Dec 2013 00:28:57 -0800 Subject: Added pom.xml for external projects and removed unnecessary dependencies and repositoris from other poms and sbt. 
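With the sources split out, applications no longer call kafkaStream/flumeStream/etc. on
StreamingContext itself; they import the matching external package instead. A minimal
sketch of the intended call site, assuming the external kafka package re-exposes the
method through an implicit conversion and keeps the signature of the removed
StreamingContext.kafkaStream (neither the package object nor that class is reproduced
in this excerpt, so treat both as assumptions):

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    // Assumed: the external package object's implicit conversion adds kafkaStream back onto ssc.
    import org.apache.spark.streaming.kafka._

    val ssc = new StreamingContext("local[2]", "KafkaExample", Seconds(2))
    // Zookeeper quorum, consumer group id, and (topic -> numPartitions), as in the removed API.
    val lines = ssc.kafkaStream("localhost:2181", "my-group", Map("my-topic" -> 1))
    lines.map(_._2).print()
    ssc.start()

The flume, twitter, zeromq and mqtt packages follow the same import-based pattern.
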
--- examples/pom.xml | 45 +++++++++++++------- external/flume/pom.xml | 93 ++++++++++++++++++++++++++++++++++++++++ external/kafka/pom.xml | 97 ++++++++++++++++++++++++++++++++++++++++++ external/mqtt/pom.xml | 108 +++++++++++++++++++++++++++++++++++++++++++++++ external/twitter/pom.xml | 89 ++++++++++++++++++++++++++++++++++++++ external/zeromq/pom.xml | 89 ++++++++++++++++++++++++++++++++++++++ pom.xml | 29 +++---------- project/SparkBuild.scala | 21 +++------ streaming/pom.xml | 83 +++++++++++++----------------------- 9 files changed, 548 insertions(+), 106 deletions(-) create mode 100644 external/flume/pom.xml create mode 100644 external/kafka/pom.xml create mode 100644 external/mqtt/pom.xml create mode 100644 external/twitter/pom.xml create mode 100644 external/zeromq/pom.xml (limited to 'project') diff --git a/examples/pom.xml b/examples/pom.xml index 7a7032c319..1839667fea 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -71,6 +71,36 @@ ${project.version} provided + + org.apache.spark + spark-streaming-twitter_${scala.binary.version} + ${project.version} + provided + + + org.apache.spark + spark-streaming-kafka_${scala.binary.version} + ${project.version} + provided + + + org.apache.spark + spark-streaming-flume_${scala.binary.version} + ${project.version} + provided + + + org.apache.spark + spark-streaming-zeromq_${scala.binary.version} + ${project.version} + provided + + + org.apache.spark + spark-streaming-mqtt_${scala.binary.version} + ${project.version} + provided + org.apache.hbase hbase @@ -86,21 +116,6 @@ - - com.sksamuel.kafka - kafka_${scala.binary.version} - 0.8.0-beta1 - - - com.sun.jmx - jmxri - - - com.sun.jdmk - jmxtools - - - org.eclipse.jetty jetty-server diff --git a/external/flume/pom.xml b/external/flume/pom.xml new file mode 100644 index 0000000000..443910a03a --- /dev/null +++ b/external/flume/pom.xml @@ -0,0 +1,93 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent + 0.9.0-incubating-SNAPSHOT + ../../pom.xml + + + org.apache.spark + spark-streaming-flume_2.10 + jar + Spark Project External Flume + http://spark.incubator.apache.org/ + + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + test-jar + test + + + org.apache.flume + flume-ng-sdk + 1.2.0 + + + org.jboss.netty + netty + + + org.xerial.snappy + * + + + + + org.scalatest + scalatest_${scala.binary.version} + test + + + org.scalacheck + scalacheck_${scala.binary.version} + test + + + com.novocode + junit-interface + test + + + org.slf4j + slf4j-log4j12 + test + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.scalatest + scalatest-maven-plugin + + + + diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml new file mode 100644 index 0000000000..f782e0e126 --- /dev/null +++ b/external/kafka/pom.xml @@ -0,0 +1,97 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent + 0.9.0-incubating-SNAPSHOT + ../../pom.xml + + + org.apache.spark + spark-streaming-kafka_2.10 + jar + Spark Project External Kafka + http://spark.incubator.apache.org/ + + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + test-jar + test + + + com.sksamuel.kafka + kafka_${scala.binary.version} + 0.8.0-beta1 + + + com.sun.jmx + jmxri + + + com.sun.jdmk + jmxtools + + + net.sf.jopt-simple + jopt-simple + + + + + org.scalatest + 
scalatest_${scala.binary.version} + test + + + org.scalacheck + scalacheck_${scala.binary.version} + test + + + com.novocode + junit-interface + test + + + org.slf4j + slf4j-log4j12 + test + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.scalatest + scalatest-maven-plugin + + + + diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml new file mode 100644 index 0000000000..31b4fa87de --- /dev/null +++ b/external/mqtt/pom.xml @@ -0,0 +1,108 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent + 0.9.0-incubating-SNAPSHOT + ../../pom.xml + + + org.apache.spark + spark-streaming-mqtt_2.10 + jar + Spark Project External MQTT + http://spark.incubator.apache.org/ + + + + mqtt-repo + MQTT Repository + https://repo.eclipse.org/content/repositories/paho-releases + + true + + + false + + + + + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + test-jar + test + + + org.eclipse.paho + mqtt-client + 0.4.0 + + + ${akka.group} + akka-zeromq_${scala.binary.version} + ${akka.version} + + + org.jboss.netty + netty + + + + + org.scalatest + scalatest_${scala.binary.version} + test + + + org.scalacheck + scalacheck_${scala.binary.version} + test + + + com.novocode + junit-interface + test + + + org.slf4j + slf4j-log4j12 + test + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.scalatest + scalatest-maven-plugin + + + + diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml new file mode 100644 index 0000000000..216e6c1d8f --- /dev/null +++ b/external/twitter/pom.xml @@ -0,0 +1,89 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent + 0.9.0-incubating-SNAPSHOT + ../../pom.xml + + + org.apache.spark + spark-streaming-twitter_2.10 + jar + Spark Project External Twitter + http://spark.incubator.apache.org/ + + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + test-jar + test + + + org.twitter4j + twitter4j-stream + 3.0.3 + + + org.jboss.netty + netty + + + + + org.scalatest + scalatest_${scala.binary.version} + test + + + org.scalacheck + scalacheck_${scala.binary.version} + test + + + com.novocode + junit-interface + test + + + org.slf4j + slf4j-log4j12 + test + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.scalatest + scalatest-maven-plugin + + + + diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml new file mode 100644 index 0000000000..c240d59574 --- /dev/null +++ b/external/zeromq/pom.xml @@ -0,0 +1,89 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent + 0.9.0-incubating-SNAPSHOT + ../../pom.xml + + + org.apache.spark + spark-streaming-zeromq_2.10 + jar + Spark Project External ZeroMQ + http://spark.incubator.apache.org/ + + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + test-jar + test + + + ${akka.group} + akka-zeromq_${scala.binary.version} + ${akka.version} + + + org.jboss.netty + netty + + + + + org.scalatest + scalatest_${scala.binary.version} + test + + + org.scalacheck + scalacheck_${scala.binary.version} + test + + + com.novocode + junit-interface + test + + + org.slf4j + slf4j-log4j12 + test + + + + 
target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.scalatest + scalatest-maven-plugin + + + + diff --git a/pom.xml b/pom.xml index 57e843596f..0257a08184 100644 --- a/pom.xml +++ b/pom.xml @@ -87,12 +87,17 @@ core bagel - examples mllib tools streaming repl assembly + external/twitter + external/kafka + external/flume + external/zeromq + external/mqtt + examples @@ -140,17 +145,6 @@ false - - mqtt-repo - MQTT Repository - https://repo.eclipse.org/content/repositories/paho-releases - - true - - - false - - @@ -253,17 +247,6 @@ - - ${akka.group} - akka-zeromq_${scala.binary.version} - ${akka.version} - - - org.jboss.netty - netty - - - it.unimi.dsi fastutil diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 4e92d2a162..b7aa73208a 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -141,7 +141,7 @@ object SparkBuild extends Build { // also check the local Maven repository ~/.m2 resolvers ++= Seq(Resolver.file("Local Maven Repo", file(Path.userHome + "/.m2/repository"))), - // For Sonatype publishing + // For Sonatype publishing resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", "sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"), @@ -315,9 +315,6 @@ object SparkBuild extends Build { def streamingSettings = sharedSettings ++ Seq( name := "spark-streaming", - resolvers ++= Seq( - "Apache repo" at "https://repository.apache.org/content/repositories/releases" - ), libraryDependencies ++= Seq( "commons-io" % "commons-io" % "2.4" ) @@ -360,14 +357,14 @@ object SparkBuild extends Build { } ) - def twitterSettings() = streamingSettings ++ Seq( + def twitterSettings() = sharedSettings ++ Seq( name := "spark-streaming-twitter", libraryDependencies ++= Seq( "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty) ) ) - def kafkaSettings() = streamingSettings ++ Seq( + def kafkaSettings() = sharedSettings ++ Seq( name := "spark-streaming-kafka", libraryDependencies ++= Seq( "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty), @@ -379,14 +376,14 @@ object SparkBuild extends Build { ) ) - def flumeSettings() = streamingSettings ++ Seq( + def flumeSettings() = sharedSettings ++ Seq( name := "spark-streaming-flume", libraryDependencies ++= Seq( "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy) ) ) - def zeromqSettings() = streamingSettings ++ Seq( + def zeromqSettings() = sharedSettings ++ Seq( name := "spark-streaming-zeromq", libraryDependencies ++= Seq( "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty) @@ -395,11 +392,7 @@ object SparkBuild extends Build { def mqttSettings() = streamingSettings ++ Seq( name := "spark-streaming-mqtt", - resolvers ++= Seq( - "Apache repo" at "https://repository.apache.org/content/repositories/releases" - ), - libraryDependencies ++= Seq( - "org.eclipse.paho" % "mqtt-client" % "0.4.0" - ) + resolvers ++= Seq("Eclipse Repo" at "https://repo.eclipse.org/content/repositories/paho-releases/"), + libraryDependencies ++= Seq("org.eclipse.paho" % "mqtt-client" % "0.4.0") ) } diff --git a/streaming/pom.xml b/streaming/pom.xml index e3b6fee9b2..459756912d 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -60,59 +60,10 @@ jackson-mapper-asl 1.9.11 - - com.sksamuel.kafka - kafka_${scala.binary.version} - 0.8.0-beta1 - - - com.sun.jmx - jmxri - - - com.sun.jdmk - jmxtools - - - 
net.sf.jopt-simple - jopt-simple - - - - - org.apache.flume - flume-ng-sdk - 1.2.0 - - - org.jboss.netty - netty - - - org.xerial.snappy - * - - - - - org.twitter4j - twitter4j-stream - 3.0.3 - - - org.jboss.netty - netty - - - org.scala-lang scala-library - - ${akka.group} - akka-zeromq_${scala.binary.version} - org.scalatest scalatest_${scala.binary.version} @@ -137,11 +88,6 @@ commons-io commons-io - - org.eclipse.paho - mqtt-client - 0.4.0 - target/scala-${scala.binary.version}/classes @@ -151,6 +97,35 @@ org.scalatest scalatest-maven-plugin + + + + org.apache.maven.plugins + maven-jar-plugin + 2.2 + + + + test-jar + + + + test-jar-on-compile + compile + + test-jar + + + + -- cgit v1.2.3
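
The SparkBuild hunk above rebases twitterSettings, kafkaSettings, flumeSettings and
zeromqSettings onto sharedSettings, but the Project definitions that consume these
settings fall outside this excerpt, so the wiring below is only a sketch with assumed
project ids; the test-jar dependencies declared in the new poms suggest the sbt side
likewise needs a test->test dependency on streaming:

    // Sketch only: project ids and the dependsOn scope are assumptions, not part of this patch.
    lazy val externalKafka = Project("external-kafka", file("external/kafka"), settings = kafkaSettings)
      .dependsOn(streaming % "compile->compile;test->test")
    lazy val externalFlume = Project("external-flume", file("external/flume"), settings = flumeSettings)
      .dependsOn(streaming % "compile->compile;test->test")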
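
Once published, a downstream build picks up each source as its own artifact. A
hypothetical sbt fragment using the coordinates and version declared in the poms above;
the extra resolver mirrors the mqtt pom's repositories entry, since mqtt-client is
hosted in Eclipse's Paho repository rather than Maven Central:

    // Hypothetical downstream build; artifact ids and version are taken from the poms in this patch.
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming"       % "0.9.0-incubating-SNAPSHOT",
      "org.apache.spark" %% "spark-streaming-kafka" % "0.9.0-incubating-SNAPSHOT",
      "org.apache.spark" %% "spark-streaming-mqtt"  % "0.9.0-incubating-SNAPSHOT"
    )
    // Needed only for spark-streaming-mqtt's paho dependency:
    resolvers += "Eclipse Paho Releases" at "https://repo.eclipse.org/content/repositories/paho-releases"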