diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-30 11:13:24 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-30 11:13:24 -0800 |
commit | f4e40661912af2a23e250a49f72f00675172e2de (patch) | |
tree | 97d40d041b08bf8a320f908e7b241cce9432c014 /streaming | |
parent | 6e43039614ed1ec55a134fb82fb3e8d4e80996ef (diff) | |
download | spark-f4e40661912af2a23e250a49f72f00675172e2de.tar.gz spark-f4e40661912af2a23e250a49f72f00675172e2de.tar.bz2 spark-f4e40661912af2a23e250a49f72f00675172e2de.zip |
Refactored kafka, flume, zeromq, mqtt as separate external projects, with their own self-contained scala API, java API, scala unit tests and java unit tests. Updated examples to use the external projects.
Diffstat (limited to 'streaming')
12 files changed, 81 insertions, 978 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 25b9b70b2c..41898b9228 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -17,21 +17,6 @@ package org.apache.spark.streaming -import akka.actor.Props -import akka.actor.SupervisorStrategy -import akka.zeromq.Subscribe - -import org.apache.spark.streaming.dstream._ - -import org.apache.spark._ -import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.receivers.ActorReceiver -import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy -import org.apache.spark.streaming.receivers.ZeroMQReceiver -import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.MetadataCleaner -import org.apache.spark.streaming.receivers.ActorReceiver - import scala.collection.mutable.Queue import scala.collection.Map import scala.reflect.ClassTag @@ -40,15 +25,22 @@ import java.io.InputStream import java.util.concurrent.atomic.AtomicInteger import java.util.UUID +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.MetadataCleaner +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.receivers._ +import org.apache.spark.streaming.scheduler._ + import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path -//import twitter4j.Status -//import twitter4j.auth.Authorization -import org.apache.spark.streaming.scheduler._ -import akka.util.ByteString + +import akka.actor.Props +import akka.actor.SupervisorStrategy /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -224,74 +216,6 @@ class StreamingContext private ( } /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic - * and each frame has sequence of byte thus it needs the converter - * (which might be deserializer of bytes) to translate from sequence - * of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. - * @param storageLevel RDD storage level. Defaults to memory-only. - */ - def zeroMQStream[T: ClassTag]( - publisherUrl:String, - subscribe: Subscribe, - bytesToObjects: Seq[ByteString] ⇒ Iterator[T], - storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, - supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy - ): DStream[T] = { - actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), - "ZeroMQReceiver", storageLevel, supervisorStrategy) - } - - /** - * Create an input stream that pulls messages from a Kafka Broker. - * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel Storage level to use for storing the received objects - * (default: StorageLevel.MEMORY_AND_DISK_SER_2) - */ - def kafkaStream( - zkQuorum: String, - groupId: String, - topics: Map[String, Int], - storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2 - ): DStream[(String, String)] = { - val kafkaParams = Map[String, String]( - "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, - "zookeeper.connection.timeout.ms" -> "10000") - kafkaStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder]( - kafkaParams, - topics, - storageLevel) - } - - /** - * Create an input stream that pulls messages from a Kafka Broker. - * @param kafkaParams Map of kafka configuration paramaters. - * See: http://kafka.apache.org/configuration.html - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel Storage level to use for storing the received objects - */ - def kafkaStream[ - K: ClassTag, - V: ClassTag, - U <: kafka.serializer.Decoder[_]: Manifest, - T <: kafka.serializer.Decoder[_]: Manifest]( - kafkaParams: Map[String, String], - topics: Map[String, Int], - storageLevel: StorageLevel - ): DStream[(K, V)] = { - val inputStream = new KafkaInputDStream[K, V, U, T](this, kafkaParams, topics, storageLevel) - registerInputStream(inputStream) - inputStream - } - - /** * Create a input stream from TCP source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited * lines. @@ -330,22 +254,6 @@ class StreamingContext private ( } /** - * Create a input stream from a Flume source. - * @param hostname Hostname of the slave machine to which the flume data will be sent - * @param port Port of the slave machine to which the flume data will be sent - * @param storageLevel Storage level to use for storing the received objects - */ - def flumeStream ( - hostname: String, - port: Int, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[SparkFlumeEvent] = { - val inputStream = new FlumeInputDStream[SparkFlumeEvent](this, hostname, port, storageLevel) - registerInputStream(inputStream) - inputStream - } - - /** * Create a input stream from network source hostname:port, where data is received * as serialized blocks (serialized using the Spark's serializer) that can be directly * pushed into the block manager without deserializing them. This is the most efficient @@ -467,21 +375,6 @@ class StreamingContext private ( inputStream } -/** - * Create an input stream that receives messages pushed by a mqtt publisher. - * @param brokerUrl Url of remote mqtt publisher - * @param topic topic name to subscribe to - * @param storageLevel RDD storage level. Defaults to memory-only. - */ - - def mqttStream( - brokerUrl: String, - topic: String, - storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = { - val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel) - registerInputStream(inputStream) - inputStream - } /** * Create a unified DStream from multiple DStreams of the same type and same slide duration. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index b32cfbb677..ea4a0fe619 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -17,28 +17,21 @@ package org.apache.spark.streaming.api.java -import java.lang.{Integer => JInt} -import java.io.InputStream -import java.util.{Map => JMap, List => JList} - import scala.collection.JavaConversions._ import scala.reflect.ClassTag +import java.io.InputStream +import java.util.{Map => JMap, List => JList} + import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -//import twitter4j.Status import akka.actor.Props import akka.actor.SupervisorStrategy -import akka.zeromq.Subscribe -import akka.util.ByteString - -//import twitter4j.auth.Authorization import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD} import org.apache.spark.streaming._ -import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.scheduler.StreamingListener /** @@ -134,81 +127,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { val sc: JavaSparkContext = new JavaSparkContext(ssc.sc) /** - * Create an input stream that pulls messages form a Kafka Broker. - * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - */ - def kafkaStream( - zkQuorum: String, - groupId: String, - topics: JMap[String, JInt]) - : JavaPairDStream[String, String] = { - implicit val cmt: ClassTag[String] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), - StorageLevel.MEMORY_ONLY_SER_2) - - } - - /** - * Create an input stream that pulls messages form a Kafka Broker. - * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel RDD storage level. Defaults to memory-only - * - */ - def kafkaStream( - zkQuorum: String, - groupId: String, - topics: JMap[String, JInt], - storageLevel: StorageLevel) - : JavaPairDStream[String, String] = { - implicit val cmt: ClassTag[String] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), - storageLevel) - } - - /** - * Create an input stream that pulls messages form a Kafka Broker. - * @param keyTypeClass Key type of RDD - * @param valueTypeClass value type of RDD - * @param keyDecoderClass Type of kafka key decoder - * @param valueDecoderClass Type of kafka value decoder - * @param kafkaParams Map of kafka configuration paramaters. - * See: http://kafka.apache.org/configuration.html - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel RDD storage level. Defaults to memory-only - */ - def kafkaStream[K, V, U <: kafka.serializer.Decoder[_], T <: kafka.serializer.Decoder[_]]( - keyTypeClass: Class[K], - valueTypeClass: Class[V], - keyDecoderClass: Class[U], - valueDecoderClass: Class[T], - kafkaParams: JMap[String, String], - topics: JMap[String, JInt], - storageLevel: StorageLevel) - : JavaPairDStream[K, V] = { - implicit val keyCmt: ClassTag[K] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] - implicit val valueCmt: ClassTag[V] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] - - implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]] - implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]] - - ssc.kafkaStream[K, V, U, T]( - kafkaParams.toMap, - Map(topics.mapValues(_.intValue()).toSeq: _*), - storageLevel) - } - - /** * Create a input stream from network source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited * lines. @@ -319,98 +237,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Creates a input stream from a Flume source. - * @param hostname Hostname of the slave machine to which the flume data will be sent - * @param port Port of the slave machine to which the flume data will be sent - * @param storageLevel Storage level to use for storing the received objects - */ - def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel): - JavaDStream[SparkFlumeEvent] = { - ssc.flumeStream(hostname, port, storageLevel) - } - - - /** - * Creates a input stream from a Flume source. - * @param hostname Hostname of the slave machine to which the flume data will be sent - * @param port Port of the slave machine to which the flume data will be sent - */ - def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = { - ssc.flumeStream(hostname, port) - } - /* - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J Authorization object - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream( - twitterAuth: Authorization, - filters: Array[String], - storageLevel: StorageLevel - ): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth), filters, storageLevel) - } - - /** - * Create a input stream that returns tweets received from Twitter using Twitter4J's default - * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, - * .consumerSecret, .accessToken and .accessTokenSecret to be set. - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream( - filters: Array[String], - storageLevel: StorageLevel - ): JavaDStream[Status] = { - ssc.twitterStream(None, filters, storageLevel) - } - - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J Authorization - * @param filters Set of filter strings to get only those tweets that match them - */ - def twitterStream( - twitterAuth: Authorization, - filters: Array[String] - ): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth), filters) - } - - /** - * Create a input stream that returns tweets received from Twitter using Twitter4J's default - * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, - * .consumerSecret, .accessToken and .accessTokenSecret to be set. - * @param filters Set of filter strings to get only those tweets that match them - */ - def twitterStream( - filters: Array[String] - ): JavaDStream[Status] = { - ssc.twitterStream(None, filters) - } - - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J Authorization - */ - def twitterStream( - twitterAuth: Authorization - ): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth)) - } - - /** - * Create a input stream that returns tweets received from Twitter using Twitter4J's default - * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, - * .consumerSecret, .accessToken and .accessTokenSecret to be set. - */ - def twitterStream(): JavaDStream[Status] = { - ssc.twitterStream() - } - */ - /** * Create an input stream with any arbitrary user implemented actor receiver. * @param props Props object defining creation of the actor * @param name Name of the actor @@ -473,70 +299,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence - * of byte thus it needs the converter(which might be deserializer of bytes) - * to translate from sequence of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. - * @param storageLevel Storage level to use for storing the received objects - */ - def zeroMQStream[T]( - publisherUrl:String, - subscribe: Subscribe, - bytesToObjects: Seq[ByteString] ⇒ Iterator[T], - storageLevel: StorageLevel, - supervisorStrategy: SupervisorStrategy - ): JavaDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy) - } - - /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence - * of byte thus it needs the converter(which might be deserializer of bytes) - * to translate from sequence of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. - * @param storageLevel RDD storage level. Defaults to memory-only. - */ - def zeroMQStream[T]( - publisherUrl:String, - subscribe: Subscribe, - bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], - storageLevel: StorageLevel - ): JavaDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel) - } - - /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence - * of byte thus it needs the converter(which might be deserializer of bytes) - * to translate from sequence of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. - */ - def zeroMQStream[T]( - publisherUrl:String, - subscribe: Subscribe, - bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] - ): JavaDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - ssc.zeroMQStream[T](publisherUrl, subscribe, fn) - } - - /** * Registers an output stream that will be computed every interval */ def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala deleted file mode 100644 index 60d79175f1..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.dstream - -import java.net.InetSocketAddress -import java.io.{ObjectInput, ObjectOutput, Externalizable} -import java.nio.ByteBuffer - -import scala.collection.JavaConversions._ -import scala.reflect.ClassTag - -import org.apache.flume.source.avro.AvroSourceProtocol -import org.apache.flume.source.avro.AvroFlumeEvent -import org.apache.flume.source.avro.Status -import org.apache.avro.ipc.specific.SpecificResponder -import org.apache.avro.ipc.NettyServer - -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.util.Utils -import org.apache.spark.storage.StorageLevel - -private[streaming] -class FlumeInputDStream[T: ClassTag]( - @transient ssc_ : StreamingContext, - host: String, - port: Int, - storageLevel: StorageLevel -) extends NetworkInputDStream[SparkFlumeEvent](ssc_) { - - override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = { - new FlumeReceiver(host, port, storageLevel) - } -} - -/** - * A wrapper class for AvroFlumeEvent's with a custom serialization format. - * - * This is necessary because AvroFlumeEvent uses inner data structures - * which are not serializable. - */ -class SparkFlumeEvent() extends Externalizable { - var event : AvroFlumeEvent = new AvroFlumeEvent() - - /* De-serialize from bytes. */ - def readExternal(in: ObjectInput) { - val bodyLength = in.readInt() - val bodyBuff = new Array[Byte](bodyLength) - in.read(bodyBuff) - - val numHeaders = in.readInt() - val headers = new java.util.HashMap[CharSequence, CharSequence] - - for (i <- 0 until numHeaders) { - val keyLength = in.readInt() - val keyBuff = new Array[Byte](keyLength) - in.read(keyBuff) - val key : String = Utils.deserialize(keyBuff) - - val valLength = in.readInt() - val valBuff = new Array[Byte](valLength) - in.read(valBuff) - val value : String = Utils.deserialize(valBuff) - - headers.put(key, value) - } - - event.setBody(ByteBuffer.wrap(bodyBuff)) - event.setHeaders(headers) - } - - /* Serialize to bytes. */ - def writeExternal(out: ObjectOutput) { - val body = event.getBody.array() - out.writeInt(body.length) - out.write(body) - - val numHeaders = event.getHeaders.size() - out.writeInt(numHeaders) - for ((k, v) <- event.getHeaders) { - val keyBuff = Utils.serialize(k.toString) - out.writeInt(keyBuff.length) - out.write(keyBuff) - val valBuff = Utils.serialize(v.toString) - out.writeInt(valBuff.length) - out.write(valBuff) - } - } -} - -private[streaming] object SparkFlumeEvent { - def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = { - val event = new SparkFlumeEvent - event.event = in - event - } -} - -/** A simple server that implements Flume's Avro protocol. */ -private[streaming] -class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { - override def append(event : AvroFlumeEvent) : Status = { - receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event) - Status.OK - } - - override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = { - events.foreach (event => - receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)) - Status.OK - } -} - -/** A NetworkReceiver which listens for events using the - * Flume Avro interface.*/ -private[streaming] -class FlumeReceiver( - host: String, - port: Int, - storageLevel: StorageLevel - ) extends NetworkReceiver[SparkFlumeEvent] { - - lazy val blockGenerator = new BlockGenerator(storageLevel) - - protected override def onStart() { - val responder = new SpecificResponder( - classOf[AvroSourceProtocol], new FlumeEventServer(this)) - val server = new NettyServer(responder, new InetSocketAddress(host, port)) - blockGenerator.start() - server.start() - logInfo("Flume receiver started") - } - - protected override def onStop() { - blockGenerator.stop() - logInfo("Flume receiver stopped") - } - - override def getLocationPreference = Some(host) -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala deleted file mode 100644 index 526f5564c7..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.dstream - -import org.apache.spark.Logging -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.StreamingContext - -import java.util.Properties -import java.util.concurrent.Executors - -import kafka.consumer._ -import kafka.serializer.Decoder -import kafka.utils.VerifiableProperties -import kafka.utils.ZKStringSerializer -import org.I0Itec.zkclient._ - -import scala.collection.Map -import scala.reflect.ClassTag - -/** - * Input stream that pulls messages from a Kafka Broker. - * - * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel RDD storage level. - */ -private[streaming] -class KafkaInputDStream[ - K: ClassTag, - V: ClassTag, - U <: Decoder[_]: Manifest, - T <: Decoder[_]: Manifest]( - @transient ssc_ : StreamingContext, - kafkaParams: Map[String, String], - topics: Map[String, Int], - storageLevel: StorageLevel - ) extends NetworkInputDStream[(K, V)](ssc_) with Logging { - - def getReceiver(): NetworkReceiver[(K, V)] = { - new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) - .asInstanceOf[NetworkReceiver[(K, V)]] - } -} - -private[streaming] -class KafkaReceiver[ - K: ClassTag, - V: ClassTag, - U <: Decoder[_]: Manifest, - T <: Decoder[_]: Manifest]( - kafkaParams: Map[String, String], - topics: Map[String, Int], - storageLevel: StorageLevel - ) extends NetworkReceiver[Any] { - - // Handles pushing data into the BlockManager - lazy protected val blockGenerator = new BlockGenerator(storageLevel) - // Connection to Kafka - var consumerConnector : ConsumerConnector = null - - def onStop() { - blockGenerator.stop() - } - - def onStart() { - - blockGenerator.start() - - // In case we are using multiple Threads to handle Kafka Messages - val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) - - logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id")) - - // Kafka connection properties - val props = new Properties() - kafkaParams.foreach(param => props.put(param._1, param._2)) - - // Create the connection to the cluster - logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect")) - val consumerConfig = new ConsumerConfig(props) - consumerConnector = Consumer.create(consumerConfig) - logInfo("Connected to " + kafkaParams("zookeeper.connect")) - - // When autooffset.reset is defined, it is our responsibility to try and whack the - // consumer group zk node. - if (kafkaParams.contains("auto.offset.reset")) { - tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id")) - } - - val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) - .newInstance(consumerConfig.props) - .asInstanceOf[Decoder[K]] - val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) - .newInstance(consumerConfig.props) - .asInstanceOf[Decoder[V]] - - // Create Threads for each Topic/Message Stream we are listening - val topicMessageStreams = consumerConnector.createMessageStreams( - topics, keyDecoder, valueDecoder) - - - // Start the messages handler for each partition - topicMessageStreams.values.foreach { streams => - streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } - } - } - - // Handles Kafka Messages - private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V]) - extends Runnable { - def run() { - logInfo("Starting MessageHandler.") - for (msgAndMetadata <- stream) { - blockGenerator += (msgAndMetadata.key, msgAndMetadata.message) - } - } - } - - // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because - // Kafka 0.7.2 only honors this param when the group is not in zookeeper. - // - // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas' - // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest'/'largest': - // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala - private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) { - try { - val dir = "/consumers/" + groupId - logInfo("Cleaning up temporary zookeeper data under " + dir + ".") - val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) - zk.deleteRecursive(dir) - zk.close() - } catch { - case _ : Throwable => // swallow - } - } -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala deleted file mode 100644 index ef4a737568..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.dstream - -import org.apache.spark.Logging -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContext } - -import java.util.Properties -import java.util.concurrent.Executors -import java.io.IOException - -import org.eclipse.paho.client.mqttv3.MqttCallback -import org.eclipse.paho.client.mqttv3.MqttClient -import org.eclipse.paho.client.mqttv3.MqttClientPersistence -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken -import org.eclipse.paho.client.mqttv3.MqttException -import org.eclipse.paho.client.mqttv3.MqttMessage -import org.eclipse.paho.client.mqttv3.MqttTopic - -import scala.collection.Map -import scala.collection.mutable.HashMap -import scala.collection.JavaConversions._ -import scala.reflect.ClassTag - -/** - * Input stream that subscribe messages from a Mqtt Broker. - * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/ - * @param brokerUrl Url of remote mqtt publisher - * @param topic topic name to subscribe to - * @param storageLevel RDD storage level. - */ - -private[streaming] -class MQTTInputDStream[T: ClassTag]( - @transient ssc_ : StreamingContext, - brokerUrl: String, - topic: String, - storageLevel: StorageLevel - ) extends NetworkInputDStream[T](ssc_) with Logging { - - def getReceiver(): NetworkReceiver[T] = { - new MQTTReceiver(brokerUrl, topic, storageLevel) - .asInstanceOf[NetworkReceiver[T]] - } -} - -private[streaming] -class MQTTReceiver(brokerUrl: String, - topic: String, - storageLevel: StorageLevel - ) extends NetworkReceiver[Any] { - lazy protected val blockGenerator = new BlockGenerator(storageLevel) - - def onStop() { - blockGenerator.stop() - } - - def onStart() { - - blockGenerator.start() - - // Set up persistence for messages - var peristance: MqttClientPersistence = new MemoryPersistence() - - // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance - var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance) - - // Connect to MqttBroker - client.connect() - - // Subscribe to Mqtt topic - client.subscribe(topic) - - // Callback automatically triggers as and when new message arrives on specified topic - var callback: MqttCallback = new MqttCallback() { - - // Handles Mqtt message - override def messageArrived(arg0: String, arg1: MqttMessage) { - blockGenerator += new String(arg1.getPayload()) - } - - override def deliveryComplete(arg0: IMqttDeliveryToken) { - } - - override def connectionLost(arg0: Throwable) { - logInfo("Connection lost " + arg0) - } - } - - // Set up callback for MqttClient - client.setCallback(callback) - } -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala deleted file mode 100644 index f164d516b0..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.receivers - -import scala.reflect.ClassTag - -import akka.actor.Actor -import akka.util.ByteString -import akka.zeromq._ - -import org.apache.spark.Logging - -/** - * A receiver to subscribe to ZeroMQ stream. - */ -private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String, - subscribe: Subscribe, - bytesToObjects: Seq[ByteString] ⇒ Iterator[T]) - extends Actor with Receiver with Logging { - - override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self), - Connect(publisherUrl), subscribe) - - def receive: Receive = { - - case Connecting ⇒ logInfo("connecting ...") - - case m: ZMQMessage ⇒ - logDebug("Received message for:" + m.frame(0)) - - //We ignore first frame for processing as it is the topic - val bytes = m.frames.tail - pushBlock(bytesToObjects(bytes)) - - case Closed ⇒ logInfo("received closed ") - - } -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 1cd0b9b0a4..2734393ae9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -33,6 +33,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { val ssc = jobScheduler.ssc val clockClass = System.getProperty( "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") + logInfo("Using clock class = " + clockClass) val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock] val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => generateJobs(new Time(longTime))) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala index abff55d77c..4a8e15db21 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala @@ -160,7 +160,10 @@ class NetworkInputTracker( } // Run the dummy Spark job to ensure that all slaves have registered. // This avoids all the receivers to be scheduled on the same node. - ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() + if (!ssc.sparkContext.isLocal) { + ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() + } + // Distribute the receivers and start them ssc.sparkContext.runJob(tempRDD, startReceiver) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index daeb99f5b7..f4d26c0be6 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -17,22 +17,16 @@ package org.apache.spark.streaming; -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.io.Files; - -import kafka.serializer.StringDecoder; +import scala.Tuple2; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.spark.streaming.api.java.JavaDStreamLike; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; +import java.io.*; +import java.util.*; -import scala.Tuple2; -import twitter4j.Status; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.io.Files; import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaPairRDD; @@ -43,39 +37,11 @@ import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.dstream.SparkFlumeEvent; -import org.apache.spark.streaming.JavaTestUtils; -import org.apache.spark.streaming.JavaCheckpointTestUtils; - -import java.io.*; -import java.util.*; - -import akka.actor.Props; -import akka.zeromq.Subscribe; - // The test suite itself is Serializable so that anonymous Function implementations can be // serialized, as an alternative to converting these anonymous classes to static inner classes; // see http://stackoverflow.com/questions/758570/. -public class JavaAPISuite implements Serializable { - private transient JavaStreamingContext ssc; - - @Before - public void setUp() { - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); - ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); - ssc.checkpoint("checkpoint"); - } - - @After - public void tearDown() { - ssc.stop(); - ssc = null; - - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port"); - } - +public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable { @Test public void testCount() { List<List<Integer>> inputData = Arrays.asList( @@ -1597,26 +1563,6 @@ public class JavaAPISuite implements Serializable { // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the // InputStream functionality is deferred to the existing Scala tests. @Test - public void testKafkaStream() { - HashMap<String, Integer> topics = Maps.newHashMap(); - JavaPairDStream<String, String> test1 = ssc.kafkaStream("localhost:12345", "group", topics); - JavaPairDStream<String, String> test2 = ssc.kafkaStream("localhost:12345", "group", topics, - StorageLevel.MEMORY_AND_DISK()); - - HashMap<String, String> kafkaParams = Maps.newHashMap(); - kafkaParams.put("zookeeper.connect","localhost:12345"); - kafkaParams.put("group.id","consumer-group"); - JavaPairDStream<String, String> test3 = ssc.kafkaStream( - String.class, - String.class, - StringDecoder.class, - StringDecoder.class, - kafkaParams, - topics, - StorageLevel.MEMORY_AND_DISK()); - } - - @Test public void testSocketTextStream() { JavaDStream<String> test = ssc.socketTextStream("localhost", 12345); } @@ -1654,16 +1600,10 @@ public class JavaAPISuite implements Serializable { public void testRawSocketStream() { JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345); } - - @Test - public void testFlumeStream() { - JavaDStream<SparkFlumeEvent> test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY()); - } - + /* @Test public void testFileStream() { - JavaPairDStream<String, String> foo = - ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo"); + JavaPairDStream<String, String> foo = ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo"); } @Test @@ -1685,5 +1625,5 @@ public class JavaAPISuite implements Serializable { return null; } }); - } + } */ } diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java new file mode 100644 index 0000000000..34bee56885 --- /dev/null +++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming; + +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.junit.After; +import org.junit.Before; + +public abstract class LocalJavaStreamingContext { + + protected transient JavaStreamingContext ssc; + + @Before + public void setUp() { + System.clearProperty("spark.driver.port"); + System.clearProperty("spark.hostPort"); + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + ssc.checkpoint("checkpoint"); + } + + @After + public void tearDown() { + ssc.stop(); + ssc = null; + + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.driver.port"); + System.clearProperty("spark.hostPort"); + } +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 62a9f120b4..0cffed64a7 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -23,7 +23,7 @@ import akka.actor.IOManager import akka.actor.Props import akka.util.ByteString -import org.apache.spark.streaming.dstream.{NetworkReceiver, SparkFlumeEvent} +import org.apache.spark.streaming.dstream.{NetworkReceiver} import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket} import java.io.{File, BufferedWriter, OutputStreamWriter} import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue} @@ -31,18 +31,11 @@ import collection.mutable.{SynchronizedBuffer, ArrayBuffer} import util.ManualClock import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receivers.Receiver -import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.Logging import scala.util.Random import org.apache.commons.io.FileUtils import org.scalatest.BeforeAndAfter -import org.apache.flume.source.avro.AvroSourceProtocol -import org.apache.flume.source.avro.AvroFlumeEvent -import org.apache.flume.source.avro.Status -import org.apache.avro.ipc.{specific, NettyTransceiver} -import org.apache.avro.ipc.specific.SpecificRequestor -import java.nio.ByteBuffer import collection.JavaConversions._ -import java.nio.charset.Charset import com.google.common.io.Files import java.util.concurrent.atomic.AtomicInteger @@ -99,55 +92,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } - test("flume input stream") { - // Set up the streaming context and input streams - val ssc = new StreamingContext(master, framework, batchDuration) - val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) - val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] - with SynchronizedBuffer[Seq[SparkFlumeEvent]] - val outputStream = new TestOutputStream(flumeStream, outputBuffer) - ssc.registerOutputStream(outputStream) - ssc.start() - - val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - val input = Seq(1, 2, 3, 4, 5) - Thread.sleep(1000) - val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)) - val client = SpecificRequestor.getClient( - classOf[AvroSourceProtocol], transceiver) - - for (i <- 0 until input.size) { - val event = new AvroFlumeEvent - event.setBody(ByteBuffer.wrap(input(i).toString.getBytes())) - event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header")) - client.append(event) - Thread.sleep(500) - clock.addToTime(batchDuration.milliseconds) - } - - val startTime = System.currentTimeMillis() - while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { - logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size) - Thread.sleep(100) - } - Thread.sleep(1000) - val timeTaken = System.currentTimeMillis() - startTime - assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") - logInfo("Stopping context") - ssc.stop() - - val decoder = Charset.forName("UTF-8").newDecoder() - - assert(outputBuffer.size === input.length) - for (i <- 0 until outputBuffer.size) { - assert(outputBuffer(i).size === 1) - val str = decoder.decode(outputBuffer(i).head.event.getBody) - assert(str.toString === input(i).toString) - assert(outputBuffer(i).head.event.getHeaders.get("test") === "header") - } - } - - test("file input stream") { // Disable manual clock as FileInputDStream does not work with manual clock System.clearProperty("spark.streaming.clock") @@ -249,21 +193,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } - test("kafka input stream") { - val ssc = new StreamingContext(master, framework, batchDuration) - val topics = Map("my-topic" -> 1) - val test1 = ssc.kafkaStream("localhost:12345", "group", topics) - val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK) - - // Test specifying decoder - val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group") - val test3 = ssc.kafkaStream[ - String, - String, - kafka.serializer.StringDecoder, - kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK) - } - test("multi-thread receiver") { // set up the test receiver val numThreads = 10 diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index e969e91d13..f56c0462f4 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -137,11 +137,10 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { // if you want to add your stuff to "before" (i.e., don't call before { } ) def beforeFunction() { if (useManualClock) { - System.setProperty( - "spark.streaming.clock", - "org.apache.spark.streaming.util.ManualClock" - ) + logInfo("Using manual clock") + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") } else { + logInfo("Using real clock") System.clearProperty("spark.streaming.clock") } // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown @@ -273,7 +272,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val startTime = System.currentTimeMillis() while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput) - Thread.sleep(100) + Thread.sleep(10) } val timeTaken = System.currentTimeMillis() - startTime |