author    Tathagata Das <tathagata.das1565@gmail.com>  2013-12-30 11:13:24 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2013-12-30 11:13:24 -0800
commit    f4e40661912af2a23e250a49f72f00675172e2de
tree      97d40d041b08bf8a320f908e7b241cce9432c014 /streaming/src
parent    6e43039614ed1ec55a134fb82fb3e8d4e80996ef
Refactored Kafka, Flume, ZeroMQ, and MQTT as separate external projects, each with its own self-contained Scala API, Java API, Scala unit tests, and Java unit tests. Updated the examples to use the external projects.
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala               129
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala  244
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala      154
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala      153
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala       110
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala        53
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala           1
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala    5
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java                       80
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java          46
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala                75
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala                     9
12 files changed, 81 insertions, 978 deletions
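
For context, stream creation after this refactoring moves to utility objects in the new external modules. A minimal sketch of the migration, assuming entry points such as KafkaUtils and FlumeUtils in external/kafka and external/flume (those files live outside streaming/src and are therefore not shown in this diff):

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils   // assumed: external/kafka
    import org.apache.spark.streaming.flume.FlumeUtils   // assumed: external/flume

    object ExternalStreamsSketch {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "migration-sketch", Seconds(1))

        // Replaces the removed ssc.kafkaStream(zkQuorum, groupId, topics)
        val kafkaStream =
          KafkaUtils.createStream(ssc, "localhost:2181", "my-group", Map("my-topic" -> 1))

        // Replaces the removed ssc.flumeStream(hostname, port)
        val flumeStream = FlumeUtils.createStream(ssc, "localhost", 12345)

        kafkaStream.map(_._2).print()
        ssc.start()
      }
    }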
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 25b9b70b2c..41898b9228 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -17,21 +17,6 @@
package org.apache.spark.streaming
-import akka.actor.Props
-import akka.actor.SupervisorStrategy
-import akka.zeromq.Subscribe
-
-import org.apache.spark.streaming.dstream._
-
-import org.apache.spark._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.receivers.ActorReceiver
-import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
-import org.apache.spark.streaming.receivers.ZeroMQReceiver
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.MetadataCleaner
-import org.apache.spark.streaming.receivers.ActorReceiver
-
import scala.collection.mutable.Queue
import scala.collection.Map
import scala.reflect.ClassTag
@@ -40,15 +25,22 @@ import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger
import java.util.UUID
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receivers._
+import org.apache.spark.streaming.scheduler._
+
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
-//import twitter4j.Status
-//import twitter4j.auth.Authorization
-import org.apache.spark.streaming.scheduler._
-import akka.util.ByteString
+
+import akka.actor.Props
+import akka.actor.SupervisorStrategy
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -224,74 +216,6 @@ class StreamingContext private (
}
/**
- * Create an input stream that receives messages pushed by a ZeroMQ publisher.
- * @param publisherUrl URL of the remote ZeroMQ publisher
- * @param subscribe Topic to subscribe to
- * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic,
- *                       and each frame is itself a sequence of bytes. This converter
- *                       (typically a deserializer) translates each frame's payload
- *                       into an iterator of objects.
- * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_ONLY_SER_2.
- */
- def zeroMQStream[T: ClassTag](
- publisherUrl:String,
- subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
- storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
- supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
- ): DStream[T] = {
- actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
- "ZeroMQReceiver", storageLevel, supervisorStrategy)
- }
-
- /**
- * Create an input stream that pulls messages from a Kafka broker.
- * @param zkQuorum ZooKeeper quorum (hostname:port,hostname:port,...).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel Storage level to use for storing the received objects
- * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
- */
- def kafkaStream(
- zkQuorum: String,
- groupId: String,
- topics: Map[String, Int],
- storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
- ): DStream[(String, String)] = {
- val kafkaParams = Map[String, String](
- "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
- "zookeeper.connection.timeout.ms" -> "10000")
- kafkaStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
- kafkaParams,
- topics,
- storageLevel)
- }
-
- /**
- * Create an input stream that pulls messages from a Kafka Broker.
- * @param kafkaParams Map of Kafka configuration parameters.
- * See: http://kafka.apache.org/configuration.html
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel Storage level to use for storing the received objects
- */
- def kafkaStream[
- K: ClassTag,
- V: ClassTag,
- U <: kafka.serializer.Decoder[_]: Manifest,
- T <: kafka.serializer.Decoder[_]: Manifest](
- kafkaParams: Map[String, String],
- topics: Map[String, Int],
- storageLevel: StorageLevel
- ): DStream[(K, V)] = {
- val inputStream = new KafkaInputDStream[K, V, U, T](this, kafkaParams, topics, storageLevel)
- registerInputStream(inputStream)
- inputStream
- }
-
- /**
* Create an input stream from a TCP source hostname:port. Data is received using
* a TCP socket and the received bytes are interpreted as UTF8-encoded, `\n`-delimited
* lines.
@@ -330,22 +254,6 @@ class StreamingContext private (
}
/**
- * Create an input stream from a Flume source.
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- * @param storageLevel Storage level to use for storing the received objects
- */
- def flumeStream (
- hostname: String,
- port: Int,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[SparkFlumeEvent] = {
- val inputStream = new FlumeInputDStream[SparkFlumeEvent](this, hostname, port, storageLevel)
- registerInputStream(inputStream)
- inputStream
- }
-
- /**
* Create an input stream from a network source hostname:port, where data is received
* as serialized blocks (serialized using Spark's serializer) that can be directly
* pushed into the block manager without deserializing them. This is the most efficient
@@ -467,21 +375,6 @@ class StreamingContext private (
inputStream
}
-/**
- * Create an input stream that receives messages pushed by an MQTT publisher.
- * @param brokerUrl URL of the remote MQTT broker
- * @param topic Topic name to subscribe to
- * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_ONLY_SER_2.
- */
-
- def mqttStream(
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = {
- val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel)
- registerInputStream(inputStream)
- inputStream
- }
/**
* Create a unified DStream from multiple DStreams of the same type and same slide duration.
*/
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index b32cfbb677..ea4a0fe619 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -17,28 +17,21 @@
package org.apache.spark.streaming.api.java
-import java.lang.{Integer => JInt}
-import java.io.InputStream
-import java.util.{Map => JMap, List => JList}
-
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
+import java.io.InputStream
+import java.util.{Map => JMap, List => JList}
+
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-//import twitter4j.Status
import akka.actor.Props
import akka.actor.SupervisorStrategy
-import akka.zeromq.Subscribe
-import akka.util.ByteString
-
-//import twitter4j.auth.Authorization
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD}
import org.apache.spark.streaming._
-import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.scheduler.StreamingListener
/**
@@ -134,81 +127,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
val sc: JavaSparkContext = new JavaSparkContext(ssc.sc)
/**
- * Create an input stream that pulls messages from a Kafka broker.
- * @param zkQuorum ZooKeeper quorum (hostname:port,hostname:port,...).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- */
- def kafkaStream(
- zkQuorum: String,
- groupId: String,
- topics: JMap[String, JInt])
- : JavaPairDStream[String, String] = {
- implicit val cmt: ClassTag[String] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
- StorageLevel.MEMORY_ONLY_SER_2)
-
- }
-
- /**
- * Create an input stream that pulls messages from a Kafka broker.
- * @param zkQuorum ZooKeeper quorum (hostname:port,hostname:port,...).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel Storage level to use for storing the received objects
- *
- */
- def kafkaStream(
- zkQuorum: String,
- groupId: String,
- topics: JMap[String, JInt],
- storageLevel: StorageLevel)
- : JavaPairDStream[String, String] = {
- implicit val cmt: ClassTag[String] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
- storageLevel)
- }
-
- /**
- * Create an input stream that pulls messages from a Kafka broker.
- * @param keyTypeClass Key type of RDD
- * @param valueTypeClass value type of RDD
- * @param keyDecoderClass Type of kafka key decoder
- * @param valueDecoderClass Type of kafka value decoder
- * @param kafkaParams Map of Kafka configuration parameters.
- * See: http://kafka.apache.org/configuration.html
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel Storage level to use for storing the received objects
- */
- def kafkaStream[K, V, U <: kafka.serializer.Decoder[_], T <: kafka.serializer.Decoder[_]](
- keyTypeClass: Class[K],
- valueTypeClass: Class[V],
- keyDecoderClass: Class[U],
- valueDecoderClass: Class[T],
- kafkaParams: JMap[String, String],
- topics: JMap[String, JInt],
- storageLevel: StorageLevel)
- : JavaPairDStream[K, V] = {
- implicit val keyCmt: ClassTag[K] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
- implicit val valueCmt: ClassTag[V] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
-
- implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
- implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
-
- ssc.kafkaStream[K, V, U, T](
- kafkaParams.toMap,
- Map(topics.mapValues(_.intValue()).toSeq: _*),
- storageLevel)
- }
-
- /**
* Create an input stream from a network source hostname:port. Data is received using
* a TCP socket and the received bytes are interpreted as UTF8-encoded, \n-delimited
* lines.
@@ -319,98 +237,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Creates an input stream from a Flume source.
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- * @param storageLevel Storage level to use for storing the received objects
- */
- def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
- JavaDStream[SparkFlumeEvent] = {
- ssc.flumeStream(hostname, port, storageLevel)
- }
-
-
- /**
- * Creates an input stream from a Flume source.
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- */
- def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
- ssc.flumeStream(hostname, port)
- }
- /*
- /**
- * Create an input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J Authorization object
- * @param filters Set of filter strings to get only those tweets that match them
- * @param storageLevel Storage level to use for storing the received objects
- */
- def twitterStream(
- twitterAuth: Authorization,
- filters: Array[String],
- storageLevel: StorageLevel
- ): JavaDStream[Status] = {
- ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
- }
-
- /**
- * Create an input stream that returns tweets received from Twitter using Twitter4J's default
- * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
- * .consumerSecret, .accessToken and .accessTokenSecret to be set.
- * @param filters Set of filter strings to get only those tweets that match them
- * @param storageLevel Storage level to use for storing the received objects
- */
- def twitterStream(
- filters: Array[String],
- storageLevel: StorageLevel
- ): JavaDStream[Status] = {
- ssc.twitterStream(None, filters, storageLevel)
- }
-
- /**
- * Create an input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J Authorization
- * @param filters Set of filter strings to get only those tweets that match them
- */
- def twitterStream(
- twitterAuth: Authorization,
- filters: Array[String]
- ): JavaDStream[Status] = {
- ssc.twitterStream(Some(twitterAuth), filters)
- }
-
- /**
- * Create an input stream that returns tweets received from Twitter using Twitter4J's default
- * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
- * .consumerSecret, .accessToken and .accessTokenSecret to be set.
- * @param filters Set of filter strings to get only those tweets that match them
- */
- def twitterStream(
- filters: Array[String]
- ): JavaDStream[Status] = {
- ssc.twitterStream(None, filters)
- }
-
- /**
- * Create an input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J Authorization
- */
- def twitterStream(
- twitterAuth: Authorization
- ): JavaDStream[Status] = {
- ssc.twitterStream(Some(twitterAuth))
- }
-
- /**
- * Create an input stream that returns tweets received from Twitter using Twitter4J's default
- * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
- * .consumerSecret, .accessToken and .accessTokenSecret to be set.
- */
- def twitterStream(): JavaDStream[Status] = {
- ssc.twitterStream()
- }
- */
- /**
* Create an input stream with any arbitrary user implemented actor receiver.
* @param props Props object defining creation of the actor
* @param name Name of the actor
@@ -473,70 +299,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Create an input stream that receives messages pushed by a ZeroMQ publisher.
- * @param publisherUrl URL of the remote ZeroMQ publisher
- * @param subscribe Topic to subscribe to
- * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and
- *                       each frame is itself a sequence of bytes. This converter (typically
- *                       a deserializer) translates each frame's payload into an iterator
- *                       of objects.
- * @param storageLevel Storage level to use for storing the received objects
- */
- def zeroMQStream[T](
- publisherUrl:String,
- subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
- storageLevel: StorageLevel,
- supervisorStrategy: SupervisorStrategy
- ): JavaDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy)
- }
-
- /**
- * Create an input stream that receives messages pushed by a ZeroMQ publisher.
- * @param publisherUrl URL of the remote ZeroMQ publisher
- * @param subscribe Topic to subscribe to
- * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and
- *                       each frame is itself a sequence of bytes. This converter (typically
- *                       a deserializer) translates each frame's payload into an iterable
- *                       of objects.
- * @param storageLevel Storage level to use for storing the received objects
- */
- def zeroMQStream[T](
- publisherUrl:String,
- subscribe: Subscribe,
- bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
- storageLevel: StorageLevel
- ): JavaDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
- }
-
- /**
- * Create an input stream that receives messages pushed by a ZeroMQ publisher.
- * @param publisherUrl URL of the remote ZeroMQ publisher
- * @param subscribe Topic to subscribe to
- * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and
- *                       each frame is itself a sequence of bytes. This converter (typically
- *                       a deserializer) translates each frame's payload into an iterable
- *                       of objects. The storage level defaults to StorageLevel.MEMORY_ONLY_SER_2.
- */
- def zeroMQStream[T](
- publisherUrl:String,
- subscribe: Subscribe,
- bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
- ): JavaDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
- }
-
- /**
* Registers an output stream that will be computed every interval
*/
def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) {
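
One pattern worth noting in the surviving Java wrappers above: because Java callers cannot supply ClassTags, each wrapper fabricates one from ClassTag[AnyRef]. A sketch of that idiom factored into a helper (the name fakeClassTag is hypothetical):

    import scala.reflect.ClassTag

    // Erased Java type parameters carry no runtime tag, so the wrappers
    // conjure one; this is safe here because DStream elements are only
    // handled as AnyRef on the Java side.
    def fakeClassTag[T]: ClassTag[T] =
      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]

    // Usage inside a wrapper method, as seen throughout this file:
    //   implicit val cm: ClassTag[T] = fakeClassTag[T]
    //   ssc.someStream[T](...)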
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
deleted file mode 100644
index 60d79175f1..0000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import java.net.InetSocketAddress
-import java.io.{ObjectInput, ObjectOutput, Externalizable}
-import java.nio.ByteBuffer
-
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
-
-import org.apache.flume.source.avro.AvroSourceProtocol
-import org.apache.flume.source.avro.AvroFlumeEvent
-import org.apache.flume.source.avro.Status
-import org.apache.avro.ipc.specific.SpecificResponder
-import org.apache.avro.ipc.NettyServer
-
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.util.Utils
-import org.apache.spark.storage.StorageLevel
-
-private[streaming]
-class FlumeInputDStream[T: ClassTag](
- @transient ssc_ : StreamingContext,
- host: String,
- port: Int,
- storageLevel: StorageLevel
-) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
-
- override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
- new FlumeReceiver(host, port, storageLevel)
- }
-}
-
-/**
- * A wrapper class for AvroFlumeEvent's with a custom serialization format.
- *
- * This is necessary because AvroFlumeEvent uses inner data structures
- * which are not serializable.
- */
-class SparkFlumeEvent() extends Externalizable {
- var event : AvroFlumeEvent = new AvroFlumeEvent()
-
- /* De-serialize from bytes. */
- def readExternal(in: ObjectInput) {
- val bodyLength = in.readInt()
- val bodyBuff = new Array[Byte](bodyLength)
- in.read(bodyBuff)
-
- val numHeaders = in.readInt()
- val headers = new java.util.HashMap[CharSequence, CharSequence]
-
- for (i <- 0 until numHeaders) {
- val keyLength = in.readInt()
- val keyBuff = new Array[Byte](keyLength)
- in.read(keyBuff)
- val key : String = Utils.deserialize(keyBuff)
-
- val valLength = in.readInt()
- val valBuff = new Array[Byte](valLength)
- in.read(valBuff)
- val value : String = Utils.deserialize(valBuff)
-
- headers.put(key, value)
- }
-
- event.setBody(ByteBuffer.wrap(bodyBuff))
- event.setHeaders(headers)
- }
-
- /* Serialize to bytes. */
- def writeExternal(out: ObjectOutput) {
- val body = event.getBody.array()
- out.writeInt(body.length)
- out.write(body)
-
- val numHeaders = event.getHeaders.size()
- out.writeInt(numHeaders)
- for ((k, v) <- event.getHeaders) {
- val keyBuff = Utils.serialize(k.toString)
- out.writeInt(keyBuff.length)
- out.write(keyBuff)
- val valBuff = Utils.serialize(v.toString)
- out.writeInt(valBuff.length)
- out.write(valBuff)
- }
- }
-}
-
-private[streaming] object SparkFlumeEvent {
- def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = {
- val event = new SparkFlumeEvent
- event.event = in
- event
- }
-}
-
-/** A simple server that implements Flume's Avro protocol. */
-private[streaming]
-class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
- override def append(event : AvroFlumeEvent) : Status = {
- receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
- Status.OK
- }
-
- override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
- events.foreach (event =>
- receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
- Status.OK
- }
-}
-
-/** A NetworkReceiver which listens for events using the
- * Flume Avro interface.*/
-private[streaming]
-class FlumeReceiver(
- host: String,
- port: Int,
- storageLevel: StorageLevel
- ) extends NetworkReceiver[SparkFlumeEvent] {
-
- lazy val blockGenerator = new BlockGenerator(storageLevel)
-
- protected override def onStart() {
- val responder = new SpecificResponder(
- classOf[AvroSourceProtocol], new FlumeEventServer(this))
- val server = new NettyServer(responder, new InetSocketAddress(host, port))
- blockGenerator.start()
- server.start()
- logInfo("Flume receiver started")
- }
-
- protected override def onStop() {
- blockGenerator.stop()
- logInfo("Flume receiver stopped")
- }
-
- override def getLocationPreference = Some(host)
-}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
deleted file mode 100644
index 526f5564c7..0000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-
-import java.util.Properties
-import java.util.concurrent.Executors
-
-import kafka.consumer._
-import kafka.serializer.Decoder
-import kafka.utils.VerifiableProperties
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient._
-
-import scala.collection.Map
-import scala.reflect.ClassTag
-
-/**
- * Input stream that pulls messages from a Kafka broker.
- *
- * @param kafkaParams Map of Kafka configuration parameters. See: http://kafka.apache.org/configuration.html
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel RDD storage level.
- */
-private[streaming]
-class KafkaInputDStream[
- K: ClassTag,
- V: ClassTag,
- U <: Decoder[_]: Manifest,
- T <: Decoder[_]: Manifest](
- @transient ssc_ : StreamingContext,
- kafkaParams: Map[String, String],
- topics: Map[String, Int],
- storageLevel: StorageLevel
- ) extends NetworkInputDStream[(K, V)](ssc_) with Logging {
-
- def getReceiver(): NetworkReceiver[(K, V)] = {
- new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
- .asInstanceOf[NetworkReceiver[(K, V)]]
- }
-}
-
-private[streaming]
-class KafkaReceiver[
- K: ClassTag,
- V: ClassTag,
- U <: Decoder[_]: Manifest,
- T <: Decoder[_]: Manifest](
- kafkaParams: Map[String, String],
- topics: Map[String, Int],
- storageLevel: StorageLevel
- ) extends NetworkReceiver[Any] {
-
- // Handles pushing data into the BlockManager
- lazy protected val blockGenerator = new BlockGenerator(storageLevel)
- // Connection to Kafka
- var consumerConnector : ConsumerConnector = null
-
- def onStop() {
- blockGenerator.stop()
- }
-
- def onStart() {
-
- blockGenerator.start()
-
- // In case we are using multiple Threads to handle Kafka Messages
- val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
-
- logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
-
- // Kafka connection properties
- val props = new Properties()
- kafkaParams.foreach(param => props.put(param._1, param._2))
-
- // Create the connection to the cluster
- logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect"))
- val consumerConfig = new ConsumerConfig(props)
- consumerConnector = Consumer.create(consumerConfig)
- logInfo("Connected to " + kafkaParams("zookeeper.connect"))
-
-    // When autooffset.reset is defined, it is our responsibility to delete the
-    // consumer group's ZooKeeper node.
- if (kafkaParams.contains("auto.offset.reset")) {
- tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id"))
- }
-
- val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
- .newInstance(consumerConfig.props)
- .asInstanceOf[Decoder[K]]
- val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
- .newInstance(consumerConfig.props)
- .asInstanceOf[Decoder[V]]
-
- // Create Threads for each Topic/Message Stream we are listening
- val topicMessageStreams = consumerConnector.createMessageStreams(
- topics, keyDecoder, valueDecoder)
-
-
- // Start the messages handler for each partition
- topicMessageStreams.values.foreach { streams =>
- streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
- }
- }
-
- // Handles Kafka Messages
- private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
- extends Runnable {
- def run() {
- logInfo("Starting MessageHandler.")
- for (msgAndMetadata <- stream) {
- blockGenerator += (msgAndMetadata.key, msgAndMetadata.message)
- }
- }
- }
-
- // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because
- // Kafka 0.7.2 only honors this param when the group is not in zookeeper.
- //
-  // The Kafka high-level consumer doesn't currently expose setting offsets; this is a trick copied from Kafka's
-  // ConsoleConsumer. See the code related to 'autooffset.reset' when it is set to 'smallest'/'largest':
- // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
- private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
- try {
- val dir = "/consumers/" + groupId
- logInfo("Cleaning up temporary zookeeper data under " + dir + ".")
- val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
- zk.deleteRecursive(dir)
- zk.close()
- } catch {
- case _ : Throwable => // swallow
- }
- }
-}
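
The deleted KafkaReceiver sizes its executor pool to the total partition count so each partition's blocking stream gets a dedicated thread. The fan-out idiom in isolation (topic names and counts are illustrative):

    import java.util.concurrent.Executors

    val topics = Map("logs" -> 2, "metrics" -> 1)          // topic -> numPartitions
    val pool = Executors.newFixedThreadPool(topics.values.sum)

    // One Runnable per partition stream; each blocks on its own stream,
    // as MessageHandler did above, instead of multiplexing on one thread.
    for ((topic, partitions) <- topics; p <- 0 until partitions) {
      pool.submit(new Runnable {
        def run() { println(s"draining $topic partition $p") }
      })
    }
    pool.shutdown()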
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
deleted file mode 100644
index ef4a737568..0000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContext }
-
-import java.util.Properties
-import java.util.concurrent.Executors
-import java.io.IOException
-
-import org.eclipse.paho.client.mqttv3.MqttCallback
-import org.eclipse.paho.client.mqttv3.MqttClient
-import org.eclipse.paho.client.mqttv3.MqttClientPersistence
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
-import org.eclipse.paho.client.mqttv3.MqttException
-import org.eclipse.paho.client.mqttv3.MqttMessage
-import org.eclipse.paho.client.mqttv3.MqttTopic
-
-import scala.collection.Map
-import scala.collection.mutable.HashMap
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
-
-/**
- * Input stream that subscribes to messages from an MQTT broker.
- * Uses Eclipse Paho as the MQTT client: http://www.eclipse.org/paho/
- * @param brokerUrl URL of the remote MQTT broker
- * @param topic Topic name to subscribe to
- * @param storageLevel RDD storage level.
- */
-
-private[streaming]
-class MQTTInputDStream[T: ClassTag](
- @transient ssc_ : StreamingContext,
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel
- ) extends NetworkInputDStream[T](ssc_) with Logging {
-
- def getReceiver(): NetworkReceiver[T] = {
- new MQTTReceiver(brokerUrl, topic, storageLevel)
- .asInstanceOf[NetworkReceiver[T]]
- }
-}
-
-private[streaming]
-class MQTTReceiver(brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel
- ) extends NetworkReceiver[Any] {
- lazy protected val blockGenerator = new BlockGenerator(storageLevel)
-
- def onStop() {
- blockGenerator.stop()
- }
-
- def onStart() {
-
- blockGenerator.start()
-
- // Set up persistence for messages
-    val persistence: MqttClientPersistence = new MemoryPersistence()
-
-    // Initialize the MQTT client, specifying brokerUrl, clientId and MqttClientPersistence
-    val client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", persistence)
-
- // Connect to MqttBroker
- client.connect()
-
- // Subscribe to Mqtt topic
- client.subscribe(topic)
-
-    // The callback fires automatically whenever a new message arrives on the subscribed topic
-    val callback: MqttCallback = new MqttCallback() {
-
- // Handles Mqtt message
- override def messageArrived(arg0: String, arg1: MqttMessage) {
- blockGenerator += new String(arg1.getPayload())
- }
-
- override def deliveryComplete(arg0: IMqttDeliveryToken) {
- }
-
- override def connectionLost(arg0: Throwable) {
- logInfo("Connection lost " + arg0)
- }
- }
-
- // Set up callback for MqttClient
- client.setCallback(callback)
- }
-}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
deleted file mode 100644
index f164d516b0..0000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.receivers
-
-import scala.reflect.ClassTag
-
-import akka.actor.Actor
-import akka.util.ByteString
-import akka.zeromq._
-
-import org.apache.spark.Logging
-
-/**
- * A receiver to subscribe to ZeroMQ stream.
- */
-private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] ⇒ Iterator[T])
- extends Actor with Receiver with Logging {
-
- override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
- Connect(publisherUrl), subscribe)
-
- def receive: Receive = {
-
- case Connecting ⇒ logInfo("connecting ...")
-
- case m: ZMQMessage ⇒
- logDebug("Received message for:" + m.frame(0))
-
-      // We ignore the first frame, as it carries the topic
- val bytes = m.frames.tail
- pushBlock(bytesToObjects(bytes))
-
- case Closed ⇒ logInfo("received closed ")
-
- }
-}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 1cd0b9b0a4..2734393ae9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -33,6 +33,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
val ssc = jobScheduler.ssc
val clockClass = System.getProperty(
"spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
+ logInfo("Using clock class = " + clockClass)
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => generateJobs(new Time(longTime)))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index abff55d77c..4a8e15db21 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -160,7 +160,10 @@ class NetworkInputTracker(
}
// Run the dummy Spark job to ensure that all slaves have registered.
// This prevents all the receivers from being scheduled on the same node.
- ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
+ if (!ssc.sparkContext.isLocal) {
+ ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
+ }
+
// Distribute the receivers and start them
ssc.sparkContext.runJob(tempRDD, startReceiver)
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index daeb99f5b7..f4d26c0be6 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -17,22 +17,16 @@
package org.apache.spark.streaming;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
-
-import kafka.serializer.StringDecoder;
+import scala.Tuple2;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.spark.streaming.api.java.JavaDStreamLike;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
+import java.io.*;
+import java.util.*;
-import scala.Tuple2;
-import twitter4j.Status;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
@@ -43,39 +37,11 @@ import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.dstream.SparkFlumeEvent;
-import org.apache.spark.streaming.JavaTestUtils;
-import org.apache.spark.streaming.JavaCheckpointTestUtils;
-
-import java.io.*;
-import java.util.*;
-
-import akka.actor.Props;
-import akka.zeromq.Subscribe;
-
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
// see http://stackoverflow.com/questions/758570/.
-public class JavaAPISuite implements Serializable {
- private transient JavaStreamingContext ssc;
-
- @Before
- public void setUp() {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
- ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
- ssc.checkpoint("checkpoint");
- }
-
- @After
- public void tearDown() {
- ssc.stop();
- ssc = null;
-
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.driver.port");
- }
-
+public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
@Test
public void testCount() {
List<List<Integer>> inputData = Arrays.asList(
@@ -1597,26 +1563,6 @@ public class JavaAPISuite implements Serializable {
// Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
// InputStream functionality is deferred to the existing Scala tests.
@Test
- public void testKafkaStream() {
- HashMap<String, Integer> topics = Maps.newHashMap();
- JavaPairDStream<String, String> test1 = ssc.kafkaStream("localhost:12345", "group", topics);
- JavaPairDStream<String, String> test2 = ssc.kafkaStream("localhost:12345", "group", topics,
- StorageLevel.MEMORY_AND_DISK());
-
- HashMap<String, String> kafkaParams = Maps.newHashMap();
- kafkaParams.put("zookeeper.connect","localhost:12345");
- kafkaParams.put("group.id","consumer-group");
- JavaPairDStream<String, String> test3 = ssc.kafkaStream(
- String.class,
- String.class,
- StringDecoder.class,
- StringDecoder.class,
- kafkaParams,
- topics,
- StorageLevel.MEMORY_AND_DISK());
- }
-
- @Test
public void testSocketTextStream() {
JavaDStream<String> test = ssc.socketTextStream("localhost", 12345);
}
@@ -1654,16 +1600,10 @@ public class JavaAPISuite implements Serializable {
public void testRawSocketStream() {
JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345);
}
-
- @Test
- public void testFlumeStream() {
- JavaDStream<SparkFlumeEvent> test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY());
- }
-
+ /*
@Test
public void testFileStream() {
- JavaPairDStream<String, String> foo =
- ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo");
+ JavaPairDStream<String, String> foo = ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo");
}
@Test
@@ -1685,5 +1625,5 @@ public class JavaAPISuite implements Serializable {
return null;
}
});
- }
+ } */
}
diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
new file mode 100644
index 0000000000..34bee56885
--- /dev/null
+++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class LocalJavaStreamingContext {
+
+ protected transient JavaStreamingContext ssc;
+
+ @Before
+ public void setUp() {
+ System.clearProperty("spark.driver.port");
+ System.clearProperty("spark.hostPort");
+ System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ ssc.checkpoint("checkpoint");
+ }
+
+ @After
+ public void tearDown() {
+ ssc.stop();
+ ssc = null;
+
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.driver.port");
+ System.clearProperty("spark.hostPort");
+ }
+}
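
Suites that need a streaming context now just extend this new base class; the @Before/@After hooks create and tear down ssc around each test. A hypothetical Scala suite using it:

    import org.junit.Test
    import org.apache.spark.streaming.LocalJavaStreamingContext
    import org.apache.spark.streaming.api.java.JavaDStream

    // Hypothetical smoke test: ssc comes from the base class, already
    // configured with the manual clock and a checkpoint directory.
    class SocketStreamSmokeTest extends LocalJavaStreamingContext {
      @Test
      def socketTextStreamCompiles() {
        val stream: JavaDStream[String] = ssc.socketTextStream("localhost", 12345)
      }
    }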
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 62a9f120b4..0cffed64a7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -23,7 +23,7 @@ import akka.actor.IOManager
import akka.actor.Props
import akka.util.ByteString
-import org.apache.spark.streaming.dstream.{NetworkReceiver, SparkFlumeEvent}
+import org.apache.spark.streaming.dstream.{NetworkReceiver}
import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
@@ -31,18 +31,11 @@ import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import util.ManualClock
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receivers.Receiver
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.Logging
import scala.util.Random
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
-import org.apache.flume.source.avro.AvroSourceProtocol
-import org.apache.flume.source.avro.AvroFlumeEvent
-import org.apache.flume.source.avro.Status
-import org.apache.avro.ipc.{specific, NettyTransceiver}
-import org.apache.avro.ipc.specific.SpecificRequestor
-import java.nio.ByteBuffer
import collection.JavaConversions._
-import java.nio.charset.Charset
import com.google.common.io.Files
import java.util.concurrent.atomic.AtomicInteger
@@ -99,55 +92,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
- test("flume input stream") {
- // Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework, batchDuration)
- val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
- val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
- with SynchronizedBuffer[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputBuffer)
- ssc.registerOutputStream(outputStream)
- ssc.start()
-
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- val input = Seq(1, 2, 3, 4, 5)
- Thread.sleep(1000)
- val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
- val client = SpecificRequestor.getClient(
- classOf[AvroSourceProtocol], transceiver)
-
- for (i <- 0 until input.size) {
- val event = new AvroFlumeEvent
- event.setBody(ByteBuffer.wrap(input(i).toString.getBytes()))
- event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
- client.append(event)
- Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
- }
-
- val startTime = System.currentTimeMillis()
- while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
- logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
- Thread.sleep(100)
- }
- Thread.sleep(1000)
- val timeTaken = System.currentTimeMillis() - startTime
- assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
- logInfo("Stopping context")
- ssc.stop()
-
- val decoder = Charset.forName("UTF-8").newDecoder()
-
- assert(outputBuffer.size === input.length)
- for (i <- 0 until outputBuffer.size) {
- assert(outputBuffer(i).size === 1)
- val str = decoder.decode(outputBuffer(i).head.event.getBody)
- assert(str.toString === input(i).toString)
- assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
- }
- }
-
-
test("file input stream") {
// Disable manual clock as FileInputDStream does not work with manual clock
System.clearProperty("spark.streaming.clock")
@@ -249,21 +193,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
}
- test("kafka input stream") {
- val ssc = new StreamingContext(master, framework, batchDuration)
- val topics = Map("my-topic" -> 1)
- val test1 = ssc.kafkaStream("localhost:12345", "group", topics)
- val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK)
-
- // Test specifying decoder
- val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
- val test3 = ssc.kafkaStream[
- String,
- String,
- kafka.serializer.StringDecoder,
- kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK)
- }
-
test("multi-thread receiver") {
// set up the test receiver
val numThreads = 10
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index e969e91d13..f56c0462f4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -137,11 +137,10 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// if you want to add your stuff to "before" (i.e., don't call before { } )
def beforeFunction() {
if (useManualClock) {
- System.setProperty(
- "spark.streaming.clock",
- "org.apache.spark.streaming.util.ManualClock"
- )
+ logInfo("Using manual clock")
+ System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
} else {
+ logInfo("Using real clock")
System.clearProperty("spark.streaming.clock")
}
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
@@ -273,7 +272,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
val startTime = System.currentTimeMillis()
while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
- Thread.sleep(100)
+ Thread.sleep(10)
}
val timeTaken = System.currentTimeMillis() - startTime