diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-30 11:13:24 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-30 11:13:24 -0800 |
commit | f4e40661912af2a23e250a49f72f00675172e2de (patch) | |
tree | 97d40d041b08bf8a320f908e7b241cce9432c014 /external | |
parent | 6e43039614ed1ec55a134fb82fb3e8d4e80996ef (diff) | |
download | spark-f4e40661912af2a23e250a49f72f00675172e2de.tar.gz spark-f4e40661912af2a23e250a49f72f00675172e2de.tar.bz2 spark-f4e40661912af2a23e250a49f72f00675172e2de.zip |
Refactored kafka, flume, zeromq, mqtt as separate external projects, with their own self-contained scala API, java API, scala unit tests and java unit tests. Updated examples to use the external projects.
Diffstat (limited to 'external')
35 files changed, 1894 insertions, 30 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala new file mode 100644 index 0000000000..35e7a01abc --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ + +/** + * Extra Flume input stream functions available on [[org.apache.spark.streaming.StreamingContext]] + * through implicit conversion. Import org.apache.spark.streaming.flume._ to use these functions. + */ +class FlumeFunctions(ssc: StreamingContext) { + /** + * Create a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + * @param storageLevel Storage level to use for storing the received objects + */ + def flumeStream ( + hostname: String, + port: Int, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[SparkFlumeEvent] = { + val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } +} diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala new file mode 100644 index 0000000000..ce3ef47cfe --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume + +import java.net.InetSocketAddress +import java.io.{ObjectInput, ObjectOutput, Externalizable} +import java.nio.ByteBuffer + +import scala.collection.JavaConversions._ +import scala.reflect.ClassTag + +import org.apache.flume.source.avro.AvroSourceProtocol +import org.apache.flume.source.avro.AvroFlumeEvent +import org.apache.flume.source.avro.Status +import org.apache.avro.ipc.specific.SpecificResponder +import org.apache.avro.ipc.NettyServer + +import org.apache.spark.util.Utils +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream._ + +private[streaming] +class FlumeInputDStream[T: ClassTag]( + @transient ssc_ : StreamingContext, + host: String, + port: Int, + storageLevel: StorageLevel +) extends NetworkInputDStream[SparkFlumeEvent](ssc_) { + + override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = { + new FlumeReceiver(host, port, storageLevel) + } +} + +/** + * A wrapper class for AvroFlumeEvent's with a custom serialization format. + * + * This is necessary because AvroFlumeEvent uses inner data structures + * which are not serializable. + */ +class SparkFlumeEvent() extends Externalizable { + var event : AvroFlumeEvent = new AvroFlumeEvent() + + /* De-serialize from bytes. */ + def readExternal(in: ObjectInput) { + val bodyLength = in.readInt() + val bodyBuff = new Array[Byte](bodyLength) + in.read(bodyBuff) + + val numHeaders = in.readInt() + val headers = new java.util.HashMap[CharSequence, CharSequence] + + for (i <- 0 until numHeaders) { + val keyLength = in.readInt() + val keyBuff = new Array[Byte](keyLength) + in.read(keyBuff) + val key : String = Utils.deserialize(keyBuff) + + val valLength = in.readInt() + val valBuff = new Array[Byte](valLength) + in.read(valBuff) + val value : String = Utils.deserialize(valBuff) + + headers.put(key, value) + } + + event.setBody(ByteBuffer.wrap(bodyBuff)) + event.setHeaders(headers) + } + + /* Serialize to bytes. */ + def writeExternal(out: ObjectOutput) { + val body = event.getBody.array() + out.writeInt(body.length) + out.write(body) + + val numHeaders = event.getHeaders.size() + out.writeInt(numHeaders) + for ((k, v) <- event.getHeaders) { + val keyBuff = Utils.serialize(k.toString) + out.writeInt(keyBuff.length) + out.write(keyBuff) + val valBuff = Utils.serialize(v.toString) + out.writeInt(valBuff.length) + out.write(valBuff) + } + } +} + +private[streaming] object SparkFlumeEvent { + def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = { + val event = new SparkFlumeEvent + event.event = in + event + } +} + +/** A simple server that implements Flume's Avro protocol. */ +private[streaming] +class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { + override def append(event : AvroFlumeEvent) : Status = { + receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event) + Status.OK + } + + override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = { + events.foreach (event => + receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)) + Status.OK + } +} + +/** A NetworkReceiver which listens for events using the + * Flume Avro interface.*/ +private[streaming] +class FlumeReceiver( + host: String, + port: Int, + storageLevel: StorageLevel + ) extends NetworkReceiver[SparkFlumeEvent] { + + lazy val blockGenerator = new BlockGenerator(storageLevel) + + protected override def onStart() { + val responder = new SpecificResponder( + classOf[AvroSourceProtocol], new FlumeEventServer(this)) + val server = new NettyServer(responder, new InetSocketAddress(host, port)) + blockGenerator.start() + server.start() + logInfo("Flume receiver started") + } + + protected override def onStop() { + blockGenerator.stop() + logInfo("Flume receiver stopped") + } + + override def getLocationPreference = Some(host) +} diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala new file mode 100644 index 0000000000..4e66ae3535 --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume + +import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} +import org.apache.spark.storage.StorageLevel + +/** + * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra + * functions for creating Flume input streams. + */ +class JavaStreamingContextWithFlume(javaStreamingContext: JavaStreamingContext) + extends JavaStreamingContext(javaStreamingContext.ssc) { + /** + * Creates a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + */ + def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = { + ssc.flumeStream(hostname, port) + } + + /** + * Creates a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + * @param storageLevel Storage level to use for storing the received objects + */ + def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel): + JavaDStream[SparkFlumeEvent] = { + ssc.flumeStream(hostname, port, storageLevel) + } +} diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala new file mode 100644 index 0000000000..c087a39d1c --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +package object flume { + implicit def sscToFlumeFunctions(ssc: StreamingContext) = new FlumeFunctions(ssc) +} + diff --git a/external/flume/src/test/java/JavaFlumeStreamSuite.java b/external/flume/src/test/java/JavaFlumeStreamSuite.java new file mode 100644 index 0000000000..deffc78c4c --- /dev/null +++ b/external/flume/src/test/java/JavaFlumeStreamSuite.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume; +import org.apache.spark.streaming.flume.SparkFlumeEvent; +import org.junit.Test; + +public class JavaFlumeStreamSuite extends LocalJavaStreamingContext { + @Test + public void testFlumeStream() { + JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc); + + // tests the API, does not actually test data receiving + JavaDStream<SparkFlumeEvent> test1 = sscWithFlume.flumeStream("localhost", 12345); + JavaDStream<SparkFlumeEvent> test2 = sscWithFlume.flumeStream("localhost", 12345, + StorageLevel.MEMORY_AND_DISK_SER_2()); + + // To verify that JavaStreamingContextWithKafka is also StreamingContext + JavaDStream<String> socketStream = sscWithFlume.socketTextStream("localhost", 9999); + } +} diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/flume/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala new file mode 100644 index 0000000000..ba33320d02 --- /dev/null +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume + +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} + +import java.net.InetSocketAddress +import java.nio.ByteBuffer +import java.nio.charset.Charset + +import org.apache.avro.ipc.NettyTransceiver +import org.apache.avro.ipc.specific.SpecificRequestor +import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol} + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase} +import org.apache.spark.streaming.util.ManualClock + +class FlumeStreamSuite extends TestSuiteBase { + + val testPort = 9999 + + test("flume input stream") { + // Set up the streaming context and input streams + val ssc = new StreamingContext(master, framework, batchDuration) + val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) + val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] + with SynchronizedBuffer[Seq[SparkFlumeEvent]] + val outputStream = new TestOutputStream(flumeStream, outputBuffer) + ssc.registerOutputStream(outputStream) + ssc.start() + + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val input = Seq(1, 2, 3, 4, 5) + Thread.sleep(1000) + val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)) + val client = SpecificRequestor.getClient( + classOf[AvroSourceProtocol], transceiver) + + for (i <- 0 until input.size) { + val event = new AvroFlumeEvent + event.setBody(ByteBuffer.wrap(input(i).toString.getBytes())) + event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header")) + client.append(event) + Thread.sleep(500) + clock.addToTime(batchDuration.milliseconds) + } + + val startTime = System.currentTimeMillis() + while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { + logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size) + Thread.sleep(100) + } + Thread.sleep(1000) + val timeTaken = System.currentTimeMillis() - startTime + assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") + logInfo("Stopping context") + ssc.stop() + + val decoder = Charset.forName("UTF-8").newDecoder() + + assert(outputBuffer.size === input.length) + for (i <- 0 until outputBuffer.size) { + assert(outputBuffer(i).size === 1) + val str = decoder.decode(outputBuffer(i).head.event.getBody) + assert(str.toString === input(i).toString) + assert(outputBuffer(i).head.event.getHeaders.get("test") === "header") + } + } +} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala new file mode 100644 index 0000000000..ab0e8a6c8d --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import scala.reflect.ClassTag +import scala.collection.JavaConversions._ + +import java.lang.{Integer => JInt} +import java.util.{Map => JMap} + +import kafka.serializer.Decoder + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream} + +/** + * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra + * functions for creating Kafka input streams. + */ +class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext) + extends JavaStreamingContext(javaStreamingContext.ssc) { + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + */ + def kafkaStream( + zkQuorum: String, + groupId: String, + topics: JMap[String, JInt] + ): JavaPairDStream[String, String] = { + implicit val cmt: ClassTag[String] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) + } + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel RDD storage level. + * + */ + def kafkaStream( + zkQuorum: String, + groupId: String, + topics: JMap[String, JInt], + storageLevel: StorageLevel + ): JavaPairDStream[String, String] = { + implicit val cmt: ClassTag[String] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) + } + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param keyTypeClass Key type of RDD + * @param valueTypeClass value type of RDD + * @param keyDecoderClass Type of kafka key decoder + * @param valueDecoderClass Type of kafka value decoder + * @param kafkaParams Map of kafka configuration paramaters. + * See: http://kafka.apache.org/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel RDD storage level. Defaults to memory-only + */ + def kafkaStream[K, V, U <: Decoder[_], T <: Decoder[_]]( + keyTypeClass: Class[K], + valueTypeClass: Class[V], + keyDecoderClass: Class[U], + valueDecoderClass: Class[T], + kafkaParams: JMap[String, String], + topics: JMap[String, JInt], + storageLevel: StorageLevel + ): JavaPairDStream[K, V] = { + implicit val keyCmt: ClassTag[K] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val valueCmt: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + + implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]] + implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]] + + ssc.kafkaStream[K, V, U, T]( + kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) + } +} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala new file mode 100644 index 0000000000..2135634a69 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import scala.reflect.ClassTag + +import kafka.serializer.{Decoder, StringDecoder} + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ + +/** + * Extra Kafka input stream functions available on [[org.apache.spark.streaming.StreamingContext]] + * through implicit conversion. Import org.apache.spark.streaming.kafka._ to use these functions. + */ +class KafkaFunctions(ssc: StreamingContext) { + /** + * Create an input stream that pulls messages from a Kafka Broker. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel Storage level to use for storing the received objects + * (default: StorageLevel.MEMORY_AND_DISK_SER_2) + */ + def kafkaStream( + zkQuorum: String, + groupId: String, + topics: Map[String, Int], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[(String, String)] = { + val kafkaParams = Map[String, String]( + "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, + "zookeeper.connection.timeout.ms" -> "10000") + kafkaStream[String, String, StringDecoder, StringDecoder]( + kafkaParams, + topics, + storageLevel) + } + + /** + * Create an input stream that pulls messages from a Kafka Broker. + * @param kafkaParams Map of kafka configuration paramaters. + * See: http://kafka.apache.org/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel Storage level to use for storing the received objects + */ + def kafkaStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel + ): DStream[(K, V)] = { + val inputStream = new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } +} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala new file mode 100644 index 0000000000..fd69328aba --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import scala.collection.Map +import scala.reflect.ClassTag + +import java.util.Properties +import java.util.concurrent.Executors + +import kafka.consumer._ +import kafka.serializer.Decoder +import kafka.utils.VerifiableProperties +import kafka.utils.ZKStringSerializer +import org.I0Itec.zkclient._ + +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream._ + + +/** + * Input stream that pulls messages from a Kafka Broker. + * + * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel RDD storage level. + */ +private[streaming] +class KafkaInputDStream[ + K: ClassTag, + V: ClassTag, + U <: Decoder[_]: Manifest, + T <: Decoder[_]: Manifest]( + @transient ssc_ : StreamingContext, + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel + ) extends NetworkInputDStream[(K, V)](ssc_) with Logging { + + def getReceiver(): NetworkReceiver[(K, V)] = { + new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) + .asInstanceOf[NetworkReceiver[(K, V)]] + } +} + +private[streaming] +class KafkaReceiver[ + K: ClassTag, + V: ClassTag, + U <: Decoder[_]: Manifest, + T <: Decoder[_]: Manifest]( + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel + ) extends NetworkReceiver[Any] { + + // Handles pushing data into the BlockManager + lazy protected val blockGenerator = new BlockGenerator(storageLevel) + // Connection to Kafka + var consumerConnector : ConsumerConnector = null + + def onStop() { + blockGenerator.stop() + } + + def onStart() { + + blockGenerator.start() + + // In case we are using multiple Threads to handle Kafka Messages + val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) + + logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id")) + + // Kafka connection properties + val props = new Properties() + kafkaParams.foreach(param => props.put(param._1, param._2)) + + // Create the connection to the cluster + logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect")) + val consumerConfig = new ConsumerConfig(props) + consumerConnector = Consumer.create(consumerConfig) + logInfo("Connected to " + kafkaParams("zookeeper.connect")) + + // When autooffset.reset is defined, it is our responsibility to try and whack the + // consumer group zk node. + if (kafkaParams.contains("auto.offset.reset")) { + tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id")) + } + + val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) + .newInstance(consumerConfig.props) + .asInstanceOf[Decoder[K]] + val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) + .newInstance(consumerConfig.props) + .asInstanceOf[Decoder[V]] + + // Create Threads for each Topic/Message Stream we are listening + val topicMessageStreams = consumerConnector.createMessageStreams( + topics, keyDecoder, valueDecoder) + + + // Start the messages handler for each partition + topicMessageStreams.values.foreach { streams => + streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } + } + } + + // Handles Kafka Messages + private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V]) + extends Runnable { + def run() { + logInfo("Starting MessageHandler.") + for (msgAndMetadata <- stream) { + blockGenerator += (msgAndMetadata.key, msgAndMetadata.message) + } + } + } + + // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because + // Kafka 0.7.2 only honors this param when the group is not in zookeeper. + // + // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas' + // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest'/'largest': + // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala + private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) { + try { + val dir = "/consumers/" + groupId + logInfo("Cleaning up temporary zookeeper data under " + dir + ".") + val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) + zk.deleteRecursive(dir) + zk.close() + } catch { + case _ : Throwable => // swallow + } + } +} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala new file mode 100644 index 0000000000..44e7ce6e1b --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +package object kafka { + implicit def sscToKafkaFunctions(ssc: StreamingContext) = new KafkaFunctions(ssc) +} + diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java new file mode 100644 index 0000000000..66236df662 --- /dev/null +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka; + +import java.util.HashMap; +import org.junit.Test; +import com.google.common.collect.Maps; +import kafka.serializer.StringDecoder; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; + +public class JavaKafkaStreamSuite extends LocalJavaStreamingContext { + @Test + public void testKafkaStream() { + + HashMap<String, Integer> topics = Maps.newHashMap(); + JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc); + + // tests the API, does not actually test data receiving + JavaPairDStream<String, String> test1 = sscWithKafka.kafkaStream("localhost:12345", "group", topics); + JavaPairDStream<String, String> test2 = sscWithKafka.kafkaStream("localhost:12345", "group", topics, + StorageLevel.MEMORY_AND_DISK_SER_2()); + + HashMap<String, String> kafkaParams = Maps.newHashMap(); + kafkaParams.put("zookeeper.connect","localhost:12345"); + kafkaParams.put("group.id","consumer-group"); + JavaPairDStream<String, String> test3 = sscWithKafka.kafkaStream( + String.class, String.class, StringDecoder.class, StringDecoder.class, + kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2()); + + // To verify that JavaStreamingContextWithKafka is also StreamingContext + JavaDStream<String> socketStream = sscWithKafka.socketTextStream("localhost", 9999); + } +} diff --git a/external/kafka/src/test/resources/log4j.properties b/external/kafka/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/kafka/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala new file mode 100644 index 0000000000..2ef3e99c55 --- /dev/null +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import kafka.serializer.StringDecoder +import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} +import org.apache.spark.storage.StorageLevel + +class KafkaStreamSuite extends TestSuiteBase { + + test("kafka input stream") { + val ssc = new StreamingContext(master, framework, batchDuration) + val topics = Map("my-topic" -> 1) + + // tests the API, does not actually test data receiving + val test1 = ssc.kafkaStream("localhost:12345", "group", topics) + val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2) + val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group") + val test3 = ssc.kafkaStream[String, String, StringDecoder, StringDecoder]( + kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2) + + // TODO: Actually test receiving data + } +} diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala new file mode 100644 index 0000000000..d814da0f0d --- /dev/null +++ b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt + +import scala.reflect.ClassTag + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} + +/** + * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra + * functions for creating MQTT input streams. + */ +class JavaStreamingContextWithMQTT(javaStreamingContext: JavaStreamingContext) + extends JavaStreamingContext(javaStreamingContext.ssc) { + + /** + * Create an input stream that receives messages pushed by a MQTT publisher. + * @param brokerUrl Url of remote MQTT publisher + * @param topic topic name to subscribe to + */ + def mqttStream( + brokerUrl: String, + topic: String + ): JavaDStream[String] = { + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + ssc.mqttStream(brokerUrl, topic) + } + + /** + * Create an input stream that receives messages pushed by a MQTT publisher. + * @param brokerUrl Url of remote MQTT publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. + */ + def mqttStream( + brokerUrl: String, + topic: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): JavaDStream[String] = { + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + ssc.mqttStream(brokerUrl, topic, storageLevel) + } +} diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala new file mode 100644 index 0000000000..86f4e9c724 --- /dev/null +++ b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ + +/** + * Extra MQTT input stream functions available on [[org.apache.spark.streaming.StreamingContext]] + * through implicit conversions. Import org.apache.spark.streaming.mqtt._ to use these functions. + */ +class MQTTFunctions(ssc: StreamingContext) { + /** + * Create an input stream that receives messages pushed by a MQTT publisher. + * @param brokerUrl Url of remote MQTT publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. + */ + def mqttStream( + brokerUrl: String, + topic: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[String] = { + val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } +} diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala new file mode 100644 index 0000000000..c8987a3ee0 --- /dev/null +++ b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt + +import scala.collection.Map +import scala.collection.mutable.HashMap +import scala.collection.JavaConversions._ +import scala.reflect.ClassTag + +import java.util.Properties +import java.util.concurrent.Executors +import java.io.IOException + +import org.eclipse.paho.client.mqttv3.MqttCallback +import org.eclipse.paho.client.mqttv3.MqttClient +import org.eclipse.paho.client.mqttv3.MqttClientPersistence +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken +import org.eclipse.paho.client.mqttv3.MqttException +import org.eclipse.paho.client.mqttv3.MqttMessage +import org.eclipse.paho.client.mqttv3.MqttTopic + +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream._ + +/** + * Input stream that subscribe messages from a Mqtt Broker. + * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/ + * @param brokerUrl Url of remote mqtt publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. + */ + +private[streaming] +class MQTTInputDStream[T: ClassTag]( + @transient ssc_ : StreamingContext, + brokerUrl: String, + topic: String, + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_) with Logging { + + def getReceiver(): NetworkReceiver[T] = { + new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]] + } +} + +private[streaming] +class MQTTReceiver(brokerUrl: String, + topic: String, + storageLevel: StorageLevel + ) extends NetworkReceiver[Any] { + lazy protected val blockGenerator = new BlockGenerator(storageLevel) + + def onStop() { + blockGenerator.stop() + } + + def onStart() { + + blockGenerator.start() + + // Set up persistence for messages + var peristance: MqttClientPersistence = new MemoryPersistence() + + // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance + var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance) + + // Connect to MqttBroker + client.connect() + + // Subscribe to Mqtt topic + client.subscribe(topic) + + // Callback automatically triggers as and when new message arrives on specified topic + var callback: MqttCallback = new MqttCallback() { + + // Handles Mqtt message + override def messageArrived(arg0: String, arg1: MqttMessage) { + blockGenerator += new String(arg1.getPayload()) + } + + override def deliveryComplete(arg0: IMqttDeliveryToken) { + } + + override def connectionLost(arg0: Throwable) { + logInfo("Connection lost " + arg0) + } + } + + // Set up callback for MqttClient + client.setCallback(callback) + } +} diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala new file mode 100644 index 0000000000..28a944f57e --- /dev/null +++ b/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +package object mqtt { + implicit def sscToMQTTFunctions(ssc: StreamingContext) = new MQTTFunctions(ssc) +} + + diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java new file mode 100644 index 0000000000..c1f41640dc --- /dev/null +++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt; + +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.junit.Test; + +import org.apache.spark.streaming.LocalJavaStreamingContext; + +public class JavaMQTTStreamSuite extends LocalJavaStreamingContext { + @Test + public void testMQTTStream() { + String brokerUrl = "abc"; + String topic = "def"; + JavaStreamingContextWithMQTT sscWithMQTT = new JavaStreamingContextWithMQTT(ssc); + + // tests the API, does not actually test data receiving + JavaDStream<String> test1 = sscWithMQTT.mqttStream(brokerUrl, topic); + JavaDStream<String> test2 = sscWithMQTT.mqttStream(brokerUrl, topic, + StorageLevel.MEMORY_AND_DISK_SER_2()); + + // To verify that JavaStreamingContextWithKafka is also StreamingContext + JavaDStream<String> socketStream = sscWithMQTT.socketTextStream("localhost", 9999); + } +} diff --git a/external/mqtt/src/test/resources/log4j.properties b/external/mqtt/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/mqtt/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala new file mode 100644 index 0000000000..ab6542918b --- /dev/null +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt + +import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} +import org.apache.spark.storage.StorageLevel + +class MQTTStreamSuite extends TestSuiteBase { + + test("MQTT input stream") { + val ssc = new StreamingContext(master, framework, batchDuration) + val brokerUrl = "abc" + val topic = "def" + + // tests the API, does not actually test data receiving + val test1 = ssc.mqttStream(brokerUrl, topic) + val test2 = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) + + // TODO: Actually test receiving data + } +} diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala new file mode 100644 index 0000000000..0250364331 --- /dev/null +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter + +import twitter4j.Status +import twitter4j.auth.Authorization + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} + +/** + * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra + * functions for creating Twitter input streams. + */ +class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext) + extends JavaStreamingContext(javaStreamingContext.ssc) { + + /** + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + */ + def twitterStream(): JavaDStream[Status] = { + ssc.twitterStream(None) + } + + /** + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + * @param filters Set of filter strings to get only those tweets that match them + */ + def twitterStream(filters: Array[String]): JavaDStream[Status] = { + ssc.twitterStream(None, filters) + } + + /** + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = { + ssc.twitterStream(None, filters, storageLevel) + } + + /** + * Create a input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J Authorization + */ + def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = { + ssc.twitterStream(Some(twitterAuth)) + } + + /** + * Create a input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J Authorization + * @param filters Set of filter strings to get only those tweets that match them + */ + def twitterStream( + twitterAuth: Authorization, + filters: Array[String] + ): JavaDStream[Status] = { + ssc.twitterStream(Some(twitterAuth), filters) + } + + /** + * Create a input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J Authorization object + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + twitterAuth: Authorization, + filters: Array[String], + storageLevel: StorageLevel + ): JavaDStream[Status] = { + ssc.twitterStream(Some(twitterAuth), filters, storageLevel) + } +} diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala deleted file mode 100644 index fe66e28ce6..0000000000 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala +++ /dev/null @@ -1,27 +0,0 @@ -package org.apache.spark.streaming.twitter - -import twitter4j.Status -import twitter4j.auth.Authorization -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming._ - - -class StreamingContextWithTwitter(ssc: StreamingContext) { - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth - * authorization; this uses the system properties twitter4j.oauth.consumerKey, - * .consumerSecret, .accessToken and .accessTokenSecret. - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream( - twitterAuth: Option[Authorization] = None, - filters: Seq[String] = Nil, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[Status] = { - val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) - ssc.registerInputStream(inputStream) - inputStream - } -} diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala new file mode 100644 index 0000000000..e91049d9b1 --- /dev/null +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter + +import twitter4j.Status +import twitter4j.auth.Authorization + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ + +/** + * Extra Twitter input stream functions available on [[org.apache.spark.streaming.StreamingContext]] + * through implicit conversions. Import org.apache.spark.streaming.twitter._ to use these functions. + */ +class TwitterFunctions(ssc: StreamingContext) { + /** + * Create a input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth + * authorization; this uses the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + twitterAuth: Option[Authorization], + filters: Seq[String] = Nil, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[Status] = { + val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } +} diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala index 89c202a730..23f82c5885 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala @@ -1,7 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.streaming package object twitter { - implicit def enrichMyStreamingContext(ssc: StreamingContext): StreamingContextWithTwitter = { - new StreamingContextWithTwitter(ssc) - } + implicit def sscToTwitterFunctions(ssc: StreamingContext) = new TwitterFunctions(ssc) } diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java new file mode 100644 index 0000000000..34e4fbdd85 --- /dev/null +++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter; + +import java.util.Arrays; +import org.junit.Test; + +import twitter4j.Status; +import twitter4j.auth.Authorization; +import twitter4j.auth.NullAuthorization; + +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; + +public class JavaTwitterStreamSuite extends LocalJavaStreamingContext { + @Test + public void testTwitterStream() { + JavaStreamingContextWithTwitter sscWithTwitter = new JavaStreamingContextWithTwitter(ssc); + String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray(); + Authorization auth = NullAuthorization.getInstance(); + + // tests the API, does not actually test data receiving + JavaDStream<Status> test1 = sscWithTwitter.twitterStream(); + JavaDStream<Status> test2 = sscWithTwitter.twitterStream(filters); + JavaDStream<Status> test3 = + sscWithTwitter.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaDStream<Status> test4 = sscWithTwitter.twitterStream(auth); + JavaDStream<Status> test5 = sscWithTwitter.twitterStream(auth, filters); + JavaDStream<Status> test6 = + sscWithTwitter.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2()); + + // To verify that JavaStreamingContextWithKafka is also StreamingContext + JavaDStream<String> socketStream = sscWithTwitter.socketTextStream("localhost", 9999); + } +} diff --git a/external/twitter/src/test/resources/log4j.properties b/external/twitter/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/twitter/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala new file mode 100644 index 0000000000..d7f6d35e07 --- /dev/null +++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter + +import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} +import org.apache.spark.storage.StorageLevel +import twitter4j.auth.{NullAuthorization, Authorization} + +class TwitterStreamSuite extends TestSuiteBase { + + test("kafka input stream") { + val ssc = new StreamingContext(master, framework, batchDuration) + val filters = Seq("filter1", "filter2") + val authorization: Authorization = NullAuthorization.getInstance() + + // tests the API, does not actually test data receiving + val test1 = ssc.twitterStream(None) + val test2 = ssc.twitterStream(None, filters) + val test3 = ssc.twitterStream(None, filters, StorageLevel.MEMORY_AND_DISK_SER_2) + val test4 = ssc.twitterStream(Some(authorization)) + val test5 = ssc.twitterStream(Some(authorization), filters) + val test6 = ssc.twitterStream(Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2) + + // Note that actually testing the data receiving is hard as authentication keys are + // necessary for accessing Twitter live stream + } +} diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala new file mode 100644 index 0000000000..dc5d1f05be --- /dev/null +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.zeromq + +import scala.reflect.ClassTag +import scala.collection.JavaConversions._ + +import akka.actor.SupervisorStrategy +import akka.util.ByteString +import akka.zeromq.Subscribe + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.api.java.function.{Function => JFunction} +import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} + +/** + * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra + * functions for creating ZeroMQ input streams. + */ +class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext) + extends JavaStreamingContext(javaStreamingContext.ssc) { + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param publisherUrl Url of remote ZeroMQ publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence + * of byte thus it needs the converter(which might be deserializer of bytes) + * to translate from sequence of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. + * @param storageLevel Storage level to use for storing the received objects + */ + def zeroMQStream[T]( + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], + storageLevel: StorageLevel, + supervisorStrategy: SupervisorStrategy + ): JavaDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy) + } + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence + * of byte thus it needs the converter(which might be deserializer of bytes) + * to translate from sequence of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. + * @param storageLevel RDD storage level. + */ + def zeroMQStream[T]( + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], + storageLevel: StorageLevel + ): JavaDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel) + } + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence + * of byte thus it needs the converter(which might be deserializer of bytes) + * to translate from sequence of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. + */ + def zeroMQStream[T]( + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] + ): JavaDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + ssc.zeroMQStream[T](publisherUrl, subscribe, fn) + } +} diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala new file mode 100644 index 0000000000..f4c75ab7c9 --- /dev/null +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.zeromq + +import scala.reflect.ClassTag + +import akka.actor.{Props, SupervisorStrategy} +import akka.util.ByteString +import akka.zeromq.Subscribe + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ +import org.apache.spark.streaming.receivers._ + +/** + * Extra ZeroMQ input stream functions available on [[org.apache.spark.streaming.StreamingContext]] + * through implicit conversions. Import org.apache.spark.streaming.zeromq._ to use these functions. + */ +class ZeroMQFunctions(ssc: StreamingContext) { + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic + * and each frame has sequence of byte thus it needs the converter + * (which might be deserializer of bytes) to translate from sequence + * of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. + * @param storageLevel RDD storage level. Defaults to memory-only. + */ + def zeroMQStream[T: ClassTag]( + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: Seq[ByteString] ⇒ Iterator[T], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, + supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy + ): DStream[T] = { + ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), + "ZeroMQReceiver", storageLevel, supervisorStrategy) + } +} + diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala new file mode 100644 index 0000000000..769761e3b8 --- /dev/null +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.zeromq + +import scala.reflect.ClassTag + +import akka.actor.Actor +import akka.util.ByteString +import akka.zeromq._ + +import org.apache.spark.Logging +import org.apache.spark.streaming.receivers._ + +/** + * A receiver to subscribe to ZeroMQ stream. + */ +private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: Seq[ByteString] ⇒ Iterator[T]) + extends Actor with Receiver with Logging { + + override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self), + Connect(publisherUrl), subscribe) + + def receive: Receive = { + + case Connecting ⇒ logInfo("connecting ...") + + case m: ZMQMessage ⇒ + logDebug("Received message for:" + m.frame(0)) + + //We ignore first frame for processing as it is the topic + val bytes = m.frames.tail + pushBlock(bytesToObjects(bytes)) + + case Closed ⇒ logInfo("received closed ") + + } +} diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala new file mode 100644 index 0000000000..dc27178149 --- /dev/null +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +package object zeromq { + implicit def sscToZeroMQFunctions(ssc: StreamingContext) = new ZeroMQFunctions(ssc) +} + + diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java new file mode 100644 index 0000000000..96af7d737d --- /dev/null +++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.zeromq; + +import org.junit.Test; + +import akka.actor.SupervisorStrategy; +import akka.util.ByteString; +import akka.zeromq.Subscribe; + +import org.apache.spark.api.java.function.Function; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; + +public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { + + @Test // tests the API, does not actually test data receiving + public void testZeroMQStream() { + JavaStreamingContextWithZeroMQ sscWithZeroMQ = new JavaStreamingContextWithZeroMQ(ssc); + String publishUrl = "abc"; + Subscribe subscribe = new Subscribe((ByteString)null); + Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() { + @Override + public Iterable<String> call(byte[][] bytes) throws Exception { + return null; + } + }; + + JavaDStream<String> test1 = sscWithZeroMQ.<String>zeroMQStream( + publishUrl, subscribe, bytesToObjects); + JavaDStream<String> test2 = sscWithZeroMQ.<String>zeroMQStream( + publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaDStream<String> test3 = sscWithZeroMQ.<String>zeroMQStream( + publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy()); + + // To verify that JavaStreamingContextWithKafka is also StreamingContext + JavaDStream<String> socketStream = sscWithZeroMQ.socketTextStream("localhost", 9999); + } +} diff --git a/external/zeromq/src/test/resources/log4j.properties b/external/zeromq/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/zeromq/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala new file mode 100644 index 0000000000..5adcdb821f --- /dev/null +++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.zeromq + +import akka.actor.SupervisorStrategy +import akka.util.ByteString +import akka.zeromq.Subscribe + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} + +class ZeroMQStreamSuite extends TestSuiteBase { + + test("zeromq input stream") { + val ssc = new StreamingContext(master, framework, batchDuration) + val publishUrl = "abc" + val subscribe = new Subscribe(null.asInstanceOf[ByteString]) + val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]] + + // tests the API, does not actually test data receiving + val test1 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects) + val test2 = ssc.zeroMQStream( + publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) + val test3 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects, + StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy) + + // TODO: Actually test data receiving + } +} |