author     Prashant Sharma <prashant.s@imaginea.com>  2013-11-21 11:55:48 +0530
committer  Prashant Sharma <prashant.s@imaginea.com>  2013-11-21 11:55:48 +0530
commit     199e9cf02dfaa372c1f067bca54556e1f6ce787d (patch)
tree       7ea8ac8abb51f3e3cb918b7ba147cb4d909c5f99 /streaming/src
parent     6860b79f6e4cc0d38b08848f19127c259d9b5069 (diff)
parent     f6b2e590b1ef35611f68c3ff7eb5c632d31a0dcc (diff)
Merge branch 'scala210-master' of github.com:colorant/incubator-spark into scala-2.10
Conflicts:
core/src/main/scala/org/apache/spark/deploy/client/Client.scala
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
Diffstat (limited to 'streaming/src')
22 files changed, 1454 insertions, 257 deletions
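For orientation before the per-file diff below: the user-visible additions in this merge include `DStream.repartition`, the binary `DStream.transformWith`, and `leftOuterJoin` / `rightOuterJoin` (plus `numPartitions` and `Partitioner` overloads of `cogroup` and `join`) on pair DStreams. A minimal sketch of how these operators might be combined; the socket sources, ports, and application name are hypothetical and not part of the patch:

```scala
import org.apache.spark.SparkContext._                 // pair-RDD operations (join) used inside transformWith
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._   // pair-DStream operations on DStreams

// Hypothetical context and inputs, only to show the shapes of the new operators.
val ssc = new StreamingContext("local[2]", "TransformWithExample", Seconds(1))
val clicks = ssc.socketTextStream("localhost", 9999).map(line => (line, 1))
val views  = ssc.socketTextStream("localhost", 9998).map(line => (line, 1L))

// New: change the parallelism of every RDD generated by a DStream.
val rebalanced = clicks.repartition(8)

// New: arbitrary per-batch transform over the RDDs of two DStreams.
val joinedPerBatch = clicks.transformWith(views,
  (c: RDD[(String, Int)], v: RDD[(String, Long)]) => c.join(v))

// New: outer joins, with the usual default / numPartitions / Partitioner overloads.
val withOptionalViews  = clicks.leftOuterJoin(views)      // DStream[(String, (Int, Option[Long]))]
val withOptionalClicks = clicks.rightOuterJoin(views, 4)  // DStream[(String, (Option[Int], Long))]

joinedPerBatch.print()
ssc.start()
```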
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 2d8f072624..bb9febad38 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.Logging import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.MetadataCleaner private[streaming] @@ -40,6 +41,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val checkpointDir = ssc.checkpointDir val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.jobManager.getPendingTimes() + val delaySeconds = MetadataCleaner.getDelaySeconds def validate() { assert(master != null, "Checkpoint.master is null") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala index cd404fd408..329d2b5835 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -38,7 +38,7 @@ import org.apache.hadoop.conf.Configuration /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous - * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.RDD]] + * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]] * for more details on RDDs). DStreams can either be created from live data (such as, data from * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each @@ -439,6 +439,13 @@ abstract class DStream[T: ClassTag] ( */ def glom(): DStream[Array[T]] = new GlommedDStream(this) + + /** + * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the + * returned DStream has exactly numPartitions partitions. + */ + def repartition(numPartitions: Int): DStream[T] = this.transform(_.repartition(numPartitions)) + /** * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition @@ -480,7 +487,7 @@ abstract class DStream[T: ClassTag] ( /** * Apply a function to each RDD in this DStream. This is an output operator, so - * this DStream will be registered as an output stream and therefore materialized. + * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: RDD[T] => Unit) { this.foreach((r: RDD[T], t: Time) => foreachFunc(r)) @@ -488,7 +495,7 @@ abstract class DStream[T: ClassTag] ( /** * Apply a function to each RDD in this DStream. This is an output operator, so - * this DStream will be registered as an output stream and therefore materialized. + * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: (RDD[T], Time) => Unit) { ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc))) @@ -496,18 +503,52 @@ abstract class DStream[T: ClassTag] ( /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this DStream. 
+ * on each RDD of 'this' DStream. */ def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = { - transform((r: RDD[T], t: Time) => transformFunc(r)) + transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r))) } /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this DStream. + * on each RDD of 'this' DStream. */ def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { - new TransformedDStream(this, context.sparkContext.clean(transformFunc)) + //new TransformedDStream(this, context.sparkContext.clean(transformFunc)) + val cleanedF = context.sparkContext.clean(transformFunc) + val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { + assert(rdds.length == 1) + cleanedF(rdds.head.asInstanceOf[RDD[T]], time) + } + new TransformedDStream[U](Seq(this), realTransformFunc) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of 'this' DStream and 'other' DStream. + */ + def transformWith[U: ClassTag, V: ClassTag]( + other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V] + ): DStream[V] = { + val cleanedF = ssc.sparkContext.clean(transformFunc) + transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of 'this' DStream and 'other' DStream. + */ + def transformWith[U: ClassTag, V: ClassTag]( + other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V] + ): DStream[V] = { + val cleanedF = ssc.sparkContext.clean(transformFunc) + val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { + assert(rdds.length == 2) + val rdd1 = rdds(0).asInstanceOf[RDD[T]] + val rdd2 = rdds(1).asInstanceOf[RDD[U]] + cleanedF(rdd1, rdd2, time) + } + new TransformedDStream[V](Seq(this, other), realTransformFunc) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala index b761646dff..6e9a781978 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala @@ -25,14 +25,16 @@ import org.apache.spark.SparkContext._ import scala.collection.mutable.HashMap import scala.collection.mutable.Queue +import scala.concurrent.duration._ import akka.actor._ import akka.pattern.ask -import scala.concurrent.duration._ +import akka.dispatch._ +import org.apache.spark.storage.BlockId private[streaming] sealed trait NetworkInputTrackerMessage private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage -private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage +private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) extends NetworkInputTrackerMessage private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage /** @@ -47,7 +49,7 @@ class NetworkInputTracker( val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*) val receiverExecutor = new ReceiverExecutor() val receiverInfo = new HashMap[Int, ActorRef] - val receivedBlockIds = new HashMap[Int, Queue[String]] + val receivedBlockIds = new HashMap[Int, Queue[BlockId]] val timeout = 5000.milliseconds 
var currentTime: Time = null @@ -66,9 +68,9 @@ class NetworkInputTracker( } /** Return all the blocks received from a receiver. */ - def getBlockIds(receiverId: Int, time: Time): Array[String] = synchronized { + def getBlockIds(receiverId: Int, time: Time): Array[BlockId] = synchronized { val queue = receivedBlockIds.synchronized { - receivedBlockIds.getOrElse(receiverId, new Queue[String]()) + receivedBlockIds.getOrElse(receiverId, new Queue[BlockId]()) } val result = queue.synchronized { queue.dequeueAll(x => true) @@ -91,7 +93,7 @@ class NetworkInputTracker( case AddBlocks(streamId, blockIds, metadata) => { val tmp = receivedBlockIds.synchronized { if (!receivedBlockIds.contains(streamId)) { - receivedBlockIds += ((streamId, new Queue[String])) + receivedBlockIds += ((streamId, new Queue[BlockId])) } receivedBlockIds(streamId) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala index f021e29619..80af96c060 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala @@ -18,9 +18,7 @@ package org.apache.spark.streaming import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.dstream.{ReducedWindowedDStream, StateDStream} -import org.apache.spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream} -import org.apache.spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream} +import org.apache.spark.streaming.dstream._ import org.apache.spark.{Partitioner, HashPartitioner} import org.apache.spark.SparkContext._ @@ -360,7 +358,7 @@ extends Serializable { } /** - * Create a new "state" DStream where the state for each key is updated by applying + * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of the key. * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then @@ -399,11 +397,18 @@ extends Serializable { new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner) } - + /** + * Return a new DStream by applying a map function to the value of each key-value pairs in + * 'this' DStream without changing the key. + */ def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = { new MapValuedDStream[K, V, U](self, mapValuesFunc) } + /** + * Return a new DStream by applying a flatmap function to the value of each key-value pairs in + * 'this' DStream without changing the key. + */ def flatMapValues[U: ClassTag]( flatMapValuesFunc: V => TraversableOnce[U] ): DStream[(K, U)] = { @@ -411,9 +416,8 @@ extends Serializable { } /** - * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` - * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that - * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with Spark's default number * of partitions. 
*/ def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { @@ -421,56 +425,132 @@ extends Serializable { } /** - * Cogroup `this` DStream with `other` DStream using a partitioner. For each key k in corresponding RDDs of `this` - * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that - * key in both RDDs. Partitioner is used to partition each generated RDD. + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + */ + def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = { + cogroup(other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs. */ def cogroup[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner ): DStream[(K, (Seq[V], Seq[W]))] = { - - val cgd = new CoGroupedDStream[K]( - Seq(self.asInstanceOf[DStream[(K, _)]], other.asInstanceOf[DStream[(K, _)]]), - partitioner - ) - val pdfs = new PairDStreamFunctions[K, Seq[Seq[_]]](cgd)( - classTag[K], - ClassTags.seqSeqClassTag + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner) ) - pdfs.mapValues { - case Seq(vs, ws) => - (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]) - } } /** - * Join `this` DStream with `other` DStream. HashPartitioner is used - * to partition each generated RDD into default number of partitions. + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. */ def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = { join[W](other, defaultPartitioner()) } /** - * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will - * be generated by joining RDDs from `this` and other DStream. Uses the given - * Partitioner to partition each generated RDD. + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + */ + def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = { + join[W](other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. */ def join[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner ): DStream[(K, (V, W))] = { - this.cogroup(other, partitioner) - .flatMapValues{ - case (vs, ws) => - for (v <- vs.iterator; w <- ws.iterator) yield (v, w) - } + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner) + ) + } + + /** + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. 
+ */ + def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = { + leftOuterJoin[W](other, defaultPartitioner()) + } + + /** + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + */ + def leftOuterJoin[W: ClassTag]( + other: DStream[(K, W)], + numPartitions: Int + ): DStream[(K, (V, Option[W]))] = { + leftOuterJoin[W](other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * the partitioning of each RDD. + */ + def leftOuterJoin[W: ClassTag]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (V, Option[W]))] = { + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner) + ) + } + + /** + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. + */ + def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = { + rightOuterJoin[W](other, defaultPartitioner()) + } + + /** + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + */ + def rightOuterJoin[W: ClassTag]( + other: DStream[(K, W)], + numPartitions: Int + ): DStream[(K, (Option[V], W))] = { + rightOuterJoin[W](other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * the partitioning of each RDD. + */ + def rightOuterJoin[W: ClassTag]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (Option[V], W))] = { + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner) + ) } /** - * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated - * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval + * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" */ def saveAsHadoopFiles[F <: OutputFormat[K, V]]( prefix: String, @@ -480,8 +560,8 @@ extends Serializable { } /** - * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated - * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" + * Save each RDD in `this` DStream as a Hadoop file. 
The file name at each batch interval + * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" */ def saveAsHadoopFiles( prefix: String, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index c722aa15ab..d2c4fdee65 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -102,6 +102,10 @@ class StreamingContext private ( "both SparkContext and checkpoint as null") } + if(cp_ != null && cp_.delaySeconds >= 0 && MetadataCleaner.getDelaySeconds < 0) { + MetadataCleaner.setDelaySeconds(cp_.delaySeconds) + } + if (MetadataCleaner.getDelaySeconds < 0) { throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; " + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)") @@ -254,10 +258,14 @@ class StreamingContext private ( groupId: String, topics: Map[String, Int], storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2 - ): DStream[String] = { + ): DStream[(String, String)] = { val kafkaParams = Map[String, String]( - "zk.connect" -> zkQuorum, "groupid" -> groupId, "zk.connectiontimeout.ms" -> "10000") - kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, storageLevel) + "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, + "zookeeper.connection.timeout.ms" -> "10000") + kafkaStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder]( + kafkaParams, + topics, + storageLevel) } /** @@ -268,12 +276,16 @@ class StreamingContext private ( * in its own thread. * @param storageLevel Storage level to use for storing the received objects */ - def kafkaStream[T: ClassTag, D <: kafka.serializer.Decoder[_]: Manifest]( + def kafkaStream[ + K: ClassTag, + V: ClassTag, + U <: kafka.serializer.Decoder[_]: Manifest, + T <: kafka.serializer.Decoder[_]: Manifest]( kafkaParams: Map[String, String], topics: Map[String, Int], storageLevel: StorageLevel - ): DStream[T] = { - val inputStream = new KafkaInputDStream[T, D](this, kafkaParams, topics, storageLevel) + ): DStream[(K, V)] = { + val inputStream = new KafkaInputDStream[K, V, U, T](this, kafkaParams, topics, storageLevel) registerInputStream(inputStream) inputStream } @@ -452,14 +464,40 @@ class StreamingContext private ( inputStream } +/** + * Create an input stream that receives messages pushed by a mqtt publisher. + * @param brokerUrl Url of remote mqtt publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. Defaults to memory-only. + */ + + def mqttStream( + brokerUrl: String, + topic: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = { + val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel) + registerInputStream(inputStream) + inputStream + } /** - * Create a unified DStream from multiple DStreams of the same type and same interval + * Create a unified DStream from multiple DStreams of the same type and same slide duration. */ def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = { new UnionDStream[T](streams.toArray) } /** + * Create a new DStream in which each RDD is generated by applying a function on RDDs of + * the DStreams. 
+ */ + def transform[T: ClassTag]( + dstreams: Seq[DStream[_]], + transformFunc: (Seq[RDD[_]], Time) => RDD[T] + ): DStream[T] = { + new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc)) + } + + /** * Register an input stream that will be started (InputDStream.start() called) to get the * input data. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala index 0d54d78ed3..d29033df32 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala @@ -27,7 +27,7 @@ import scala.reflect.ClassTag /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous - * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.RDD]] + * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]] * for more details on RDDs). DStreams can either be created from live data (such as, data from * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each @@ -96,6 +96,12 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T] */ def union(that: JavaDStream[T]): JavaDStream[T] = dstream.union(that.dstream) + + /** + * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the + * returned DStream has exactly numPartitions partitions. + */ + def repartition(numPartitions: Int): JavaDStream[T] = dstream.repartition(numPartitions) } object JavaDStream { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index 4508e48590..64f38ce1c0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -25,7 +25,8 @@ import scala.reflect.ClassTag import org.apache.spark.streaming._ import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD} -import org.apache.spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} +import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import org.apache.spark.api.java.function.{Function3 => JFunction3, _} import java.util import org.apache.spark.rdd.RDD import JavaDStream._ @@ -121,10 +122,12 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * this DStream. Applying glom() to an RDD coalesces all elements within each partition into * an array. */ - def glom(): JavaDStream[JList[T]] = + def glom(): JavaDStream[JList[T]] = { new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq))) + } + - /** Return the StreamingContext associated with this DStream */ + /** Return the [[org.apache.spark.streaming.StreamingContext]] associated with this DStream */ def context(): StreamingContext = dstream.context() /** Return a new DStream by applying a function to all elements of this DStream. */ @@ -239,7 +242,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Apply a function to each RDD in this DStream. 
This is an output operator, so - * this DStream will be registered as an output stream and therefore materialized. + * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: JFunction[R, Void]) { dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd))) @@ -247,7 +250,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Apply a function to each RDD in this DStream. This is an output operator, so - * this DStream will be registered as an output stream and therefore materialized. + * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: JFunction2[R, Time, Void]) { dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time)) @@ -255,7 +258,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this DStream. + * on each RDD of 'this' DStream. */ def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = { implicit val cm: ClassTag[U] = @@ -267,7 +270,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this DStream. + * on each RDD of 'this' DStream. */ def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = { implicit val cm: ClassTag[U] = @@ -279,7 +282,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this DStream. + * on each RDD of 'this' DStream. */ def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]): JavaPairDStream[K2, V2] = { @@ -294,7 +297,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this DStream. + * on each RDD of 'this' DStream. */ def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]): JavaPairDStream[K2, V2] = { @@ -308,6 +311,82 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T } /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of 'this' DStream and 'other' DStream. + */ + def transformWith[U, W]( + other: JavaDStream[U], + transformFunc: JFunction3[R, JavaRDD[U], Time, JavaRDD[W]] + ): JavaDStream[W] = { + implicit val cmu: ClassTag[U] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] + implicit val cmv: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[W] = + transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd + dstream.transformWith[U, W](other.dstream, scalaTransform(_, _, _)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of 'this' DStream and 'other' DStream. 
+ */ + def transformWith[U, K2, V2]( + other: JavaDStream[U], + transformFunc: JFunction3[R, JavaRDD[U], Time, JavaPairRDD[K2, V2]] + ): JavaPairDStream[K2, V2] = { + implicit val cmu: ClassTag[U] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] + implicit val cmk2: ClassTag[K2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]] + implicit val cmv2: ClassTag[V2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]] + def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[(K2, V2)] = + transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd + dstream.transformWith[U, (K2, V2)](other.dstream, scalaTransform(_, _, _)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of 'this' DStream and 'other' DStream. + */ + def transformWith[K2, V2, W]( + other: JavaPairDStream[K2, V2], + transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaRDD[W]] + ): JavaDStream[W] = { + implicit val cmk2: ClassTag[K2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]] + implicit val cmv2: ClassTag[V2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]] + implicit val cmw: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] = + transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd + dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of 'this' DStream and 'other' DStream. + */ + def transformWith[K2, V2, K3, V3]( + other: JavaPairDStream[K2, V2], + transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaPairRDD[K3, V3]] + ): JavaPairDStream[K3, V3] = { + implicit val cmk2: ClassTag[K2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]] + implicit val cmv2: ClassTag[V2] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]] + implicit val cmk3: ClassTag[K3] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K3]] + implicit val cmv3: ClassTag[V3] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V3]] + def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[(K3, V3)] = + transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd + dstream.transformWith[(K2, V2), (K3, V3)](other.dstream, scalaTransform(_, _, _)) + } + + /** * Enable periodic checkpointing of RDDs of this DStream * @param interval Time interval after which generated RDD will be checkpointed */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index c80545b530..dfd6e27c3e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -25,7 +25,7 @@ import scala.reflect.ClassTag import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3} import org.apache.spark.Partitioner import org.apache.hadoop.mapred.{JobConf, OutputFormat} import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} @@ -37,8 +37,8 @@ import org.apache.spark.rdd.RDD 
import org.apache.spark.rdd.PairRDDFunctions class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( - implicit val kTag: ClassTag[K], - implicit val vTag: ClassTag[V]) + implicit val kManifest: ClassTag[K], + implicit val vManifest: ClassTag[V]) extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] { override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd) @@ -60,6 +60,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** Persist the RDDs of this DStream with the given storage level */ def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel) + /** + * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the + * returned DStream has exactly numPartitions partitions. + */ + def repartition(numPartitions: Int): JavaPairDStream[K, V] = dstream.repartition(numPartitions) + /** Method that generates a RDD for the given Duration */ def compute(validTime: Time): JavaPairRDD[K, V] = { dstream.compute(validTime) match { @@ -149,7 +155,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Combine elements of each key in DStream's RDDs using custom function. This is similar to the - * combineByKey for RDDs. Please refer to combineByKey in [[PairRDDFunctions]] for more + * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.PairRDDFunctions]] for more * information. */ def combineByKey[C](createCombiner: JFunction[V, C], @@ -414,7 +420,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Create a new "state" DStream where the state for each key is updated by applying + * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of each key. * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. * @param updateFunc State update function. If `this` function returns None, then @@ -429,7 +435,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Create a new "state" DStream where the state for each key is updated by applying + * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of each key. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. * @param updateFunc State update function. If `this` function returns None, then @@ -437,15 +443,17 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param numPartitions Number of partitions of each RDD in the new DStream. * @tparam S State type */ - def updateStateByKey[S: ClassTag]( + def updateStateByKey[S]( updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], numPartitions: Int) : JavaPairDStream[K, S] = { + implicit val cm: ClassTag[S] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]] dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions) } /** - * Create a new "state" DStream where the state for each key is updated by applying + * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of the key. * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. * @param updateFunc State update function. 
If `this` function returns None, then @@ -453,19 +461,30 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. * @tparam S State type */ - def updateStateByKey[S: ClassTag]( + def updateStateByKey[S]( updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], partitioner: Partitioner ): JavaPairDStream[K, S] = { + implicit val cm: ClassTag[S] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]] dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner) } + + /** + * Return a new DStream by applying a map function to the value of each key-value pairs in + * 'this' DStream without changing the key. + */ def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = { implicit val cm: ClassTag[U] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] dstream.mapValues(f) } + /** + * Return a new DStream by applying a flatmap function to the value of each key-value pairs in + * 'this' DStream without changing the key. + */ def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = { import scala.collection.JavaConverters._ def fn = (x: V) => f.apply(x).asScala @@ -475,9 +494,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` - * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that - * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with Spark's default number * of partitions. */ def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = { @@ -487,21 +505,36 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` - * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that - * key in both RDDs. Partitioner is used to partition each generated RDD. + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. */ - def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner) - : JavaPairDStream[K, (JList[V], JList[W])] = { + def cogroup[W]( + other: JavaPairDStream[K, W], + numPartitions: Int + ): JavaPairDStream[K, (JList[V], JList[W])] = { + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + dstream.cogroup(other.dstream, numPartitions) + .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + } + + /** + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + */ + def cogroup[W]( + other: JavaPairDStream[K, W], + partitioner: Partitioner + ): JavaPairDStream[K, (JList[V], JList[W])] = { implicit val cm: ClassTag[W] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] dstream.cogroup(other.dstream, partitioner) - .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) } /** - * Join `this` DStream with `other` DStream. 
HashPartitioner is used - * to partition each generated RDD into default number of partitions. + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. */ def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = { implicit val cm: ClassTag[W] = @@ -510,18 +543,112 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will - * be generated by joining RDDs from `this` and other DStream. Uses the given - * Partitioner to partition each generated RDD. + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + */ + def join[W](other: JavaPairDStream[K, W], numPartitions: Int): JavaPairDStream[K, (V, W)] = { + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + dstream.join(other.dstream, numPartitions) + } + + /** + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. */ - def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner) - : JavaPairDStream[K, (V, W)] = { + def join[W]( + other: JavaPairDStream[K, W], + partitioner: Partitioner + ): JavaPairDStream[K, (V, W)] = { implicit val cm: ClassTag[W] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] dstream.join(other.dstream, partitioner) } /** + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. + */ + def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = { + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + val joinResult = dstream.leftOuterJoin(other.dstream) + joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} + } + + /** + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + */ + def leftOuterJoin[W]( + other: JavaPairDStream[K, W], + numPartitions: Int + ): JavaPairDStream[K, (V, Optional[W])] = { + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions) + joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} + } + + /** + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + */ + def leftOuterJoin[W]( + other: JavaPairDStream[K, W], + partitioner: Partitioner + ): JavaPairDStream[K, (V, Optional[W])] = { + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + val joinResult = dstream.leftOuterJoin(other.dstream, partitioner) + joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} + } + + /** + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. 
Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. + */ + def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = { + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + val joinResult = dstream.rightOuterJoin(other.dstream) + joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} + } + + /** + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + */ + def rightOuterJoin[W]( + other: JavaPairDStream[K, W], + numPartitions: Int + ): JavaPairDStream[K, (Optional[V], W)] = { + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions) + joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} + } + + /** + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * the partitioning of each RDD. + */ + def rightOuterJoin[W]( + other: JavaPairDStream[K, W], + partitioner: Partitioner + ): JavaPairDStream[K, (Optional[V], W)] = { + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + val joinResult = dstream.rightOuterJoin(other.dstream, partitioner) + joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} + } + + /** * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ @@ -591,14 +718,19 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf) } + /** Convert to a JavaDStream */ + def toJavaDStream(): JavaDStream[(K, V)] = { + new JavaDStream[(K, V)](dstream) + } + override val classTag: ClassTag[(K, V)] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]] } object JavaPairDStream { - implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) - :JavaPairDStream[K, V] = + implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) = { new JavaPairDStream[K, V](dstream) + } def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = { implicit val cmk: ClassTag[K] = diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 8242af6d5f..ca0c905932 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -19,7 +19,7 @@ package org.apache.spark.streaming.api.java import java.lang.{Long => JLong, Integer => JInt} import java.io.InputStream -import java.util.{Map => JMap} +import java.util.{Map => JMap, List => JList} import scala.collection.JavaConversions._ import scala.reflect.ClassTag @@ -36,7 +36,7 @@ import twitter4j.auth.Authorization import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} -import org.apache.spark.api.java.{JavaSparkContext, JavaRDD} +import 
org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaSparkContext, JavaRDD} import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receivers.{ActorReceiver, ReceiverSupervisorStrategy} @@ -144,7 +144,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { zkQuorum: String, groupId: String, topics: JMap[String, JInt]) - : JavaDStream[String] = { + : JavaPairDStream[String, String] = { implicit val cmt: ClassTag[String] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), @@ -166,7 +166,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { groupId: String, topics: JMap[String, JInt], storageLevel: StorageLevel) - : JavaDStream[String] = { + : JavaPairDStream[String, String] = { implicit val cmt: ClassTag[String] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), @@ -175,25 +175,34 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Create an input stream that pulls messages form a Kafka Broker. - * @param typeClass Type of RDD - * @param decoderClass Type of kafka decoder + * @param keyTypeClass Key type of RDD + * @param valueTypeClass value type of RDD + * @param keyDecoderClass Type of kafka key decoder + * @param valueDecoderClass Type of kafka value decoder * @param kafkaParams Map of kafka configuration paramaters. * See: http://kafka.apache.org/configuration.html * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread. * @param storageLevel RDD storage level. Defaults to memory-only */ - def kafkaStream[T, D <: kafka.serializer.Decoder[_]]( - typeClass: Class[T], - decoderClass: Class[D], + def kafkaStream[K, V, U <: kafka.serializer.Decoder[_], T <: kafka.serializer.Decoder[_]]( + keyTypeClass: Class[K], + valueTypeClass: Class[V], + keyDecoderClass: Class[U], + valueDecoderClass: Class[T], kafkaParams: JMap[String, String], topics: JMap[String, JInt], storageLevel: StorageLevel) - : JavaDStream[T] = { - implicit val cmt: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - implicit val cmd: Manifest[D] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[D]] - ssc.kafkaStream[T, D]( + : JavaPairDStream[K, V] = { + implicit val keyCmt: ClassTag[K] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val valueCmt: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + + implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]] + implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]] + + ssc.kafkaStream[K, V, U, T]( kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) @@ -589,6 +598,77 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** + * Create a unified DStream from multiple DStreams of the same type and same slide duration. + */ + def union[T](first: JavaDStream[T], rest: JList[JavaDStream[T]]): JavaDStream[T] = { + val dstreams: Seq[DStream[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream) + implicit val cm: ClassTag[T] = first.classTag + ssc.union(dstreams)(cm) + } + + /** + * Create a unified DStream from multiple DStreams of the same type and same slide duration. 
+ */ + def union[K, V]( + first: JavaPairDStream[K, V], + rest: JList[JavaPairDStream[K, V]] + ): JavaPairDStream[K, V] = { + val dstreams: Seq[DStream[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream) + implicit val cm: ClassTag[(K, V)] = first.classTag + implicit val kcm: ClassTag[K] = first.kManifest + implicit val vcm: ClassTag[V] = first.vManifest + new JavaPairDStream[K, V](ssc.union(dstreams)(cm))(kcm, vcm) + } + + /** + * Create a new DStream in which each RDD is generated by applying a function on RDDs of + * the DStreams. The order of the JavaRDDs in the transform function parameter will be the + * same as the order of corresponding DStreams in the list. Note that for adding a + * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using + * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream(). + * In the transform function, convert the JavaRDD corresponding to that JavaDStream to + * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD(). + */ + def transform[T]( + dstreams: JList[JavaDStream[_]], + transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaRDD[T]] + ): JavaDStream[T] = { + implicit val cmt: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + val scalaDStreams = dstreams.map(_.dstream).toSeq + val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { + val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList + transformFunc.call(jrdds, time).rdd + } + ssc.transform(scalaDStreams, scalaTransformFunc) + } + + /** + * Create a new DStream in which each RDD is generated by applying a function on RDDs of + * the DStreams. The order of the JavaRDDs in the transform function parameter will be the + * same as the order of corresponding DStreams in the list. Note that for adding a + * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using + * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream(). + * In the transform function, convert the JavaRDD corresponding to that JavaDStream to + * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD(). + */ + def transform[K, V]( + dstreams: JList[JavaDStream[_]], + transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaPairRDD[K, V]] + ): JavaPairDStream[K, V] = { + implicit val cmk: ClassTag[K] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val cmv: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + val scalaDStreams = dstreams.map(_.dstream).toSeq + val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { + val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList + transformFunc.call(jrdds, time).rdd + } + ssc.transform(scalaDStreams, scalaTransformFunc) + } + + /** * Sets the context to periodically checkpoint the DStream operations for master * fault-tolerance. The graph will be checkpointed every batch interval. 
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala deleted file mode 100644 index 16c1567355..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.dstream - -import org.apache.spark.Partitioner -import org.apache.spark.rdd.RDD -import org.apache.spark.rdd.CoGroupedRDD -import org.apache.spark.streaming.{Time, DStream, Duration} -import scala.reflect.ClassTag - -private[streaming] -class CoGroupedDStream[K : ClassTag]( - parents: Seq[DStream[(K, _)]], - partitioner: Partitioner - ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) { - - if (parents.length == 0) { - throw new IllegalArgumentException("Empty array of parents") - } - - if (parents.map(_.ssc).distinct.size > 1) { - throw new IllegalArgumentException("Array of parents have different StreamingContexts") - } - - if (parents.map(_.slideDuration).distinct.size > 1) { - throw new IllegalArgumentException("Array of parents have different slide times") - } - - override def dependencies = parents.toList - - override def slideDuration: Duration = parents.head.slideDuration - - override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = { - val part = partitioner - val rdds = parents.flatMap(_.getOrCompute(validTime)) - if (rdds.size > 0) { - val q = new CoGroupedRDD[K](rdds, part) - Some(q) - } else { - None - } - } - -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala index 96134868cc..526f5564c7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala @@ -19,22 +19,18 @@ package org.apache.spark.streaming.dstream import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Time, DStreamCheckpointData, StreamingContext} +import org.apache.spark.streaming.StreamingContext import java.util.Properties import java.util.concurrent.Executors import kafka.consumer._ -import kafka.message.{Message, MessageSet, MessageAndMetadata} import kafka.serializer.Decoder -import kafka.utils.{Utils, ZKGroupTopicDirs} -import kafka.utils.ZkUtils._ +import kafka.utils.VerifiableProperties import kafka.utils.ZKStringSerializer import org.I0Itec.zkclient._ import scala.collection.Map -import 
scala.collection.mutable.HashMap -import scala.collection.JavaConversions._ import scala.reflect.ClassTag /** @@ -46,25 +42,32 @@ import scala.reflect.ClassTag * @param storageLevel RDD storage level. */ private[streaming] -class KafkaInputDStream[T: ClassTag, D <: Decoder[_]: Manifest]( +class KafkaInputDStream[ + K: ClassTag, + V: ClassTag, + U <: Decoder[_]: Manifest, + T <: Decoder[_]: Manifest]( @transient ssc_ : StreamingContext, kafkaParams: Map[String, String], topics: Map[String, Int], storageLevel: StorageLevel - ) extends NetworkInputDStream[T](ssc_ ) with Logging { + ) extends NetworkInputDStream[(K, V)](ssc_) with Logging { - - def getReceiver(): NetworkReceiver[T] = { - new KafkaReceiver[T, D](kafkaParams, topics, storageLevel) - .asInstanceOf[NetworkReceiver[T]] + def getReceiver(): NetworkReceiver[(K, V)] = { + new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) + .asInstanceOf[NetworkReceiver[(K, V)]] } } private[streaming] -class KafkaReceiver[T: ClassTag, D <: Decoder[_]: Manifest]( - kafkaParams: Map[String, String], - topics: Map[String, Int], - storageLevel: StorageLevel +class KafkaReceiver[ + K: ClassTag, + V: ClassTag, + U <: Decoder[_]: Manifest, + T <: Decoder[_]: Manifest]( + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel ) extends NetworkReceiver[Any] { // Handles pushing data into the BlockManager @@ -83,27 +86,35 @@ class KafkaReceiver[T: ClassTag, D <: Decoder[_]: Manifest]( // In case we are using multiple Threads to handle Kafka Messages val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) - logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("groupid")) + logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id")) // Kafka connection properties val props = new Properties() kafkaParams.foreach(param => props.put(param._1, param._2)) // Create the connection to the cluster - logInfo("Connecting to Zookeper: " + kafkaParams("zk.connect")) + logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect")) val consumerConfig = new ConsumerConfig(props) consumerConnector = Consumer.create(consumerConfig) - logInfo("Connected to " + kafkaParams("zk.connect")) + logInfo("Connected to " + kafkaParams("zookeeper.connect")) // When autooffset.reset is defined, it is our responsibility to try and whack the // consumer group zk node. 
- if (kafkaParams.contains("autooffset.reset")) { - tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid")) + if (kafkaParams.contains("auto.offset.reset")) { + tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id")) } + val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) + .newInstance(consumerConfig.props) + .asInstanceOf[Decoder[K]] + val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) + .newInstance(consumerConfig.props) + .asInstanceOf[Decoder[V]] + // Create Threads for each Topic/Message Stream we are listening - val decoder = manifest[D].runtimeClass.newInstance.asInstanceOf[Decoder[T]] - val topicMessageStreams = consumerConnector.createMessageStreams(topics, decoder) + val topicMessageStreams = consumerConnector.createMessageStreams( + topics, keyDecoder, valueDecoder) + // Start the messages handler for each partition topicMessageStreams.values.foreach { streams => @@ -112,11 +123,12 @@ class KafkaReceiver[T: ClassTag, D <: Decoder[_]: Manifest]( } // Handles Kafka Messages - private class MessageHandler[T: ClassTag](stream: KafkaStream[T]) extends Runnable { + private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V]) + extends Runnable { def run() { logInfo("Starting MessageHandler.") for (msgAndMetadata <- stream) { - blockGenerator += msgAndMetadata.message + blockGenerator += (msgAndMetadata.key, msgAndMetadata.message) } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala new file mode 100644 index 0000000000..ef4a737568 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContext } + +import java.util.Properties +import java.util.concurrent.Executors +import java.io.IOException + +import org.eclipse.paho.client.mqttv3.MqttCallback +import org.eclipse.paho.client.mqttv3.MqttClient +import org.eclipse.paho.client.mqttv3.MqttClientPersistence +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken +import org.eclipse.paho.client.mqttv3.MqttException +import org.eclipse.paho.client.mqttv3.MqttMessage +import org.eclipse.paho.client.mqttv3.MqttTopic + +import scala.collection.Map +import scala.collection.mutable.HashMap +import scala.collection.JavaConversions._ +import scala.reflect.ClassTag + +/** + * Input stream that subscribe messages from a Mqtt Broker. + * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/ + * @param brokerUrl Url of remote mqtt publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. + */ + +private[streaming] +class MQTTInputDStream[T: ClassTag]( + @transient ssc_ : StreamingContext, + brokerUrl: String, + topic: String, + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_) with Logging { + + def getReceiver(): NetworkReceiver[T] = { + new MQTTReceiver(brokerUrl, topic, storageLevel) + .asInstanceOf[NetworkReceiver[T]] + } +} + +private[streaming] +class MQTTReceiver(brokerUrl: String, + topic: String, + storageLevel: StorageLevel + ) extends NetworkReceiver[Any] { + lazy protected val blockGenerator = new BlockGenerator(storageLevel) + + def onStop() { + blockGenerator.stop() + } + + def onStart() { + + blockGenerator.start() + + // Set up persistence for messages + var peristance: MqttClientPersistence = new MemoryPersistence() + + // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance + var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance) + + // Connect to MqttBroker + client.connect() + + // Subscribe to Mqtt topic + client.subscribe(topic) + + // Callback automatically triggers as and when new message arrives on specified topic + var callback: MqttCallback = new MqttCallback() { + + // Handles Mqtt message + override def messageArrived(arg0: String, arg1: MqttMessage) { + blockGenerator += new String(arg1.getPayload()) + } + + override def deliveryComplete(arg0: IMqttDeliveryToken) { + } + + override def connectionLost(arg0: Throwable) { + logInfo("Connection lost " + arg0) + } + } + + // Set up callback for MqttClient + client.setCallback(callback) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index b2f9f8b224..d5ae8aef92 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -32,7 +32,7 @@ import org.apache.spark.streaming.util.{RecurringTimer, SystemClock} import org.apache.spark.streaming._ import org.apache.spark.{Logging, SparkEnv} import org.apache.spark.rdd.{RDD, BlockRDD} -import org.apache.spark.storage.StorageLevel +import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} /** * Abstract class for defining any InputDStream that has to start a 
receiver on worker @@ -70,7 +70,7 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime) Some(new BlockRDD[T](ssc.sc, blockIds)) } else { - Some(new BlockRDD[T](ssc.sc, Array[String]())) + Some(new BlockRDD[T](ssc.sc, Array[BlockId]())) } } } @@ -78,7 +78,7 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte private[streaming] sealed trait NetworkReceiverMessage private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage -private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage +private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any) extends NetworkReceiverMessage private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage /** @@ -159,7 +159,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging /** * Pushes a block (as an ArrayBuffer filled with data) into the block manager. */ - def pushBlock(blockId: String, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) { + def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) { env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level) actor ! ReportBlock(blockId, metadata) } @@ -167,7 +167,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging /** * Pushes a block (as bytes) into the block manager. */ - def pushBlock(blockId: String, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { + def pushBlock(blockId: BlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { env.blockManager.putBytes(blockId, bytes, level) actor ! 
ReportBlock(blockId, metadata) } @@ -210,7 +210,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging class BlockGenerator(storageLevel: StorageLevel) extends Serializable with Logging { - case class Block(id: String, buffer: ArrayBuffer[T], metadata: Any = null) + case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null) val clock = new SystemClock() val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong @@ -233,16 +233,16 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging logInfo("Data handler stopped") } - def += (obj: T) { + def += (obj: T): Unit = synchronized { currentBuffer += obj } - private def updateCurrentBuffer(time: Long) { + private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[T] if (newBlockBuffer.size > 0) { - val blockId = "input-" + NetworkReceiver.this.streamId + "-" + (time - blockInterval) + val blockId = StreamBlockId(NetworkReceiver.this.streamId, time - blockInterval) val newBlock = new Block(blockId, newBlockBuffer) blocksForPushing.add(newBlock) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala index a4746f06ad..dea0f26f90 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.dstream import org.apache.spark.Logging -import org.apache.spark.storage.StorageLevel +import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.StreamingContext import scala.reflect.ClassTag @@ -73,7 +73,7 @@ class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel) var nextBlockNumber = 0 while (true) { val buffer = queue.take() - val blockId = "input-" + streamId + "-" + nextBlockNumber + val blockId = StreamBlockId(streamId, nextBlockNumber) nextBlockNumber += 1 pushBlock(blockId, buffer, null, storageLevel) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index 73e1ddf7a4..aeea060df7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -22,16 +22,22 @@ import org.apache.spark.streaming.{Duration, DStream, Time} import scala.reflect.ClassTag private[streaming] -class TransformedDStream[T: ClassTag, U: ClassTag] ( - parent: DStream[T], - transformFunc: (RDD[T], Time) => RDD[U] - ) extends DStream[U](parent.ssc) { +class TransformedDStream[U: ClassTag] ( + parents: Seq[DStream[_]], + transformFunc: (Seq[RDD[_]], Time) => RDD[U] + ) extends DStream[U](parents.head.ssc) { - override def dependencies = List(parent) + require(parents.length > 0, "List of DStreams to transform is empty") + require(parents.map(_.ssc).distinct.size == 1, "Some of the DStreams have different contexts") + require(parents.map(_.slideDuration).distinct.size == 1, + "Some of the DStreams have different slide durations") - override def slideDuration: Duration = parent.slideDuration + override def dependencies = parents.toList + + override def slideDuration: Duration = 
parents.head.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { - parent.getOrCompute(validTime).map(transformFunc(_, validTime)) + val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq + Some(transformFunc(parentRDDs, validTime)) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala index ee087a1cf0..fdf5371a89 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala @@ -25,7 +25,7 @@ import akka.actor.SupervisorStrategy._ import scala.concurrent.duration._ import scala.reflect.ClassTag -import org.apache.spark.storage.StorageLevel +import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.dstream.NetworkReceiver import java.util.concurrent.atomic.AtomicInteger @@ -160,7 +160,7 @@ private[streaming] class ActorReceiver[T: ClassTag]( protected def pushBlock(iter: Iterator[T]) { val buffer = new ArrayBuffer[T] buffer ++= iter - pushBlock("block-" + streamId + "-" + System.nanoTime(), buffer, null, storageLevel) + pushBlock(StreamBlockId(streamId, System.nanoTime()), buffer, null, storageLevel) } protected def onStart() = { diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 076fb53fa1..daeb99f5b7 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -25,6 +25,7 @@ import com.google.common.io.Files; import kafka.serializer.StringDecoder; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.spark.streaming.api.java.JavaDStreamLike; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -186,6 +187,39 @@ public class JavaAPISuite implements Serializable { } @Test + public void testRepartitionMorePartitions() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), + Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 2); + JavaDStream repartitioned = stream.repartition(4); + JavaTestUtils.attachTestOutputStream(repartitioned); + List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); + Assert.assertEquals(2, result.size()); + for (List<List<Integer>> rdd : result) { + Assert.assertEquals(4, rdd.size()); + Assert.assertEquals( + 10, rdd.get(0).size() + rdd.get(1).size() + rdd.get(2).size() + rdd.get(3).size()); + } + } + + @Test + public void testRepartitionFewerPartitions() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), + Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 4); + JavaDStream repartitioned = stream.repartition(2); + JavaTestUtils.attachTestOutputStream(repartitioned); + List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); + Assert.assertEquals(2, result.size()); + for (List<List<Integer>> rdd : result) { + Assert.assertEquals(2, rdd.size()); + Assert.assertEquals(10, rdd.get(0).size() + rdd.get(1).size()); + } + } + + @Test public void testGlom() { List<List<String>> inputData = Arrays.asList( 
Arrays.asList("giants", "dodgers"), @@ -225,7 +259,7 @@ public class JavaAPISuite implements Serializable { } }); JavaTestUtils.attachTestOutputStream(mapped); - List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -294,8 +328,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(7,8,9)); JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc()); - JavaRDD<Integer> rdd1 = ssc.sc().parallelize(Arrays.asList(1,2,3)); - JavaRDD<Integer> rdd2 = ssc.sc().parallelize(Arrays.asList(4,5,6)); + JavaRDD<Integer> rdd1 = ssc.sc().parallelize(Arrays.asList(1, 2, 3)); + JavaRDD<Integer> rdd2 = ssc.sc().parallelize(Arrays.asList(4, 5, 6)); JavaRDD<Integer> rdd3 = ssc.sc().parallelize(Arrays.asList(7,8,9)); LinkedList<JavaRDD<Integer>> rdds = Lists.newLinkedList(); @@ -322,17 +356,19 @@ public class JavaAPISuite implements Serializable { Arrays.asList(9,10,11)); JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> transformed = - stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { - @Override - public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { - return in.map(new Function<Integer, Integer>() { - @Override - public Integer call(Integer i) throws Exception { - return i + 2; - } - }); - }}); + JavaDStream<Integer> transformed = stream.transform( + new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { + @Override + public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { + return in.map(new Function<Integer, Integer>() { + @Override + public Integer call(Integer i) throws Exception { + return i + 2; + } + }); + } + }); + JavaTestUtils.attachTestOutputStream(transformed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -340,6 +376,316 @@ public class JavaAPISuite implements Serializable { } @Test + public void testVariousTransform() { + // tests whether all variations of transform can be called from Java + + List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1)); + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + + List<List<Tuple2<String, Integer>>> pairInputData = + Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1))); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1)); + + JavaDStream<Integer> transformed1 = stream.transform( + new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { + @Override + public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { + return null; + } + } + ); + + JavaDStream<Integer> transformed2 = stream.transform( + new Function2<JavaRDD<Integer>, Time, JavaRDD<Integer>>() { + @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<String, Integer> transformed3 = stream.transform( + new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() { + @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception { + return null; + } + } + ); + + JavaPairDStream<String, Integer> transformed4 = stream.transform( + new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() { + @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception { + return null; + } + } + ); + + JavaDStream<Integer> pairTransformed1 
= pairStream.transform( + new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() { + @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) throws Exception { + return null; + } + } + ); + + JavaDStream<Integer> pairTransformed2 = pairStream.transform( + new Function2<JavaPairRDD<String, Integer>, Time, JavaRDD<Integer>>() { + @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<String, String> pairTransformed3 = pairStream.transform( + new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() { + @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception { + return null; + } + } + ); + + JavaPairDStream<String, String> pairTransformed4 = pairStream.transform( + new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() { + @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception { + return null; + } + } + ); + + } + + @Test + public void testTransformWith() { + List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( + Arrays.asList( + new Tuple2<String, String>("california", "dodgers"), + new Tuple2<String, String>("new york", "yankees")), + Arrays.asList( + new Tuple2<String, String>("california", "sharks"), + new Tuple2<String, String>("new york", "rangers"))); + + List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( + Arrays.asList( + new Tuple2<String, String>("california", "giants"), + new Tuple2<String, String>("new york", "mets")), + Arrays.asList( + new Tuple2<String, String>("california", "ducks"), + new Tuple2<String, String>("new york", "islanders"))); + + + List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, Tuple2<String, String>>("california", + new Tuple2<String, String>("dodgers", "giants")), + new Tuple2<String, Tuple2<String, String>>("new york", + new Tuple2<String, String>("yankees", "mets"))), + Arrays.asList( + new Tuple2<String, Tuple2<String, String>>("california", + new Tuple2<String, String>("sharks", "ducks")), + new Tuple2<String, Tuple2<String, String>>("new york", + new Tuple2<String, String>("rangers", "islanders")))); + + JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream1, 1); + JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream2, 1); + JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith( + pairStream2, + new Function3< + JavaPairRDD<String, String>, + JavaPairRDD<String, String>, + Time, + JavaPairRDD<String, Tuple2<String, String>> + >() { + @Override + public JavaPairRDD<String, Tuple2<String, String>> call( + JavaPairRDD<String, String> rdd1, + JavaPairRDD<String, String> rdd2, + Time time + ) throws Exception { + return rdd1.join(rdd2); + } + } + ); + + JavaTestUtils.attachTestOutputStream(joined); + List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + + @Test + public void testVariousTransformWith() { + // tests whether all variations of transformWith can be called from Java + + 
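As a Scala counterpart to the Java `transformWith` tests, a minimal sketch of joining two pair DStreams batch-by-batch, mirroring the `transformWith` case added to BasicOperationsSuite further down. `words1` and `words2` are assumed to be `DStream[String]`s on the same StreamingContext.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._            // pair-RDD operations such as join
import org.apache.spark.streaming.StreamingContext._

val pairs1 = words1.map(w => (w, 1))
val pairs2 = words2.map(w => (w, "x"))

// transformWith exposes both parents' RDDs for the same batch, so any binary
// RDD-level operation (here a join) can be applied per batch interval.
val joined = pairs1.transformWith(
  pairs2,
  (rdd1: RDD[(String, Int)], rdd2: RDD[(String, String)]) => rdd1.join(rdd2)
)
```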
List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1)); + List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x")); + JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1); + JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1); + + List<List<Tuple2<String, Integer>>> pairInputData1 = + Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1))); + List<List<Tuple2<Double, Character>>> pairInputData2 = + Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x'))); + JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1)); + JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1)); + + JavaDStream<Double> transformed1 = stream1.transformWith( + stream2, + new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() { + @Override + public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaDStream<Double> transformed2 = stream1.transformWith( + pairStream1, + new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() { + @Override + public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<Double, Double> transformed3 = stream1.transformWith( + stream2, + new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() { + @Override + public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<Double, Double> transformed4 = stream1.transformWith( + pairStream1, + new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() { + @Override + public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaDStream<Double> pairTransformed1 = pairStream1.transformWith( + stream2, + new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() { + @Override + public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaDStream<Double> pairTransformed2_ = pairStream1.transformWith( + pairStream1, + new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() { + @Override + public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWith( + stream2, + new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() { + @Override + public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWith( + pairStream2, + new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() { + @Override + public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<Double, 
Character> rdd2, Time time) throws Exception { + return null; + } + } + ); + } + + @Test + public void testStreamingContextTransform(){ + List<List<Integer>> stream1input = Arrays.asList( + Arrays.asList(1), + Arrays.asList(2) + ); + + List<List<Integer>> stream2input = Arrays.asList( + Arrays.asList(3), + Arrays.asList(4) + ); + + List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList( + Arrays.asList(new Tuple2<Integer, String>(1, "x")), + Arrays.asList(new Tuple2<Integer, String>(2, "y")) + ); + + List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(1, new Tuple2<Integer, String>(1, "x"))), + Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(2, new Tuple2<Integer, String>(2, "y"))) + ); + + JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1); + JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1); + JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1)); + + List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2); + + // This is just to test whether this transform to JavaStream compiles + JavaDStream<Long> transformed1 = ssc.transform( + listOfDStreams1, + new Function2<List<JavaRDD<?>>, Time, JavaRDD<Long>>() { + public JavaRDD<Long> call(List<JavaRDD<?>> listOfRDDs, Time time) { + assert(listOfRDDs.size() == 2); + return null; + } + } + ); + + List<JavaDStream<?>> listOfDStreams2 = + Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream()); + + JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transform( + listOfDStreams2, + new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() { + public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs, Time time) { + assert(listOfRDDs.size() == 3); + JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0); + JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1); + JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2); + JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3); + PairFunction<Integer, Integer, Integer> mapToTuple = new PairFunction<Integer, Integer, Integer>() { + @Override + public Tuple2<Integer, Integer> call(Integer i) throws Exception { + return new Tuple2<Integer, Integer>(i, i); + } + }; + return rdd1.union(rdd2).map(mapToTuple).join(prdd3); + } + } + ); + JavaTestUtils.attachTestOutputStream(transformed2); + List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + Assert.assertEquals(expected, result); + } + + @Test public void testFlatMap() { List<List<String>> inputData = Arrays.asList( Arrays.asList("go", "giants"), @@ -1101,7 +1447,7 @@ public class JavaAPISuite implements Serializable { JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2); JavaTestUtils.attachTestOutputStream(grouped); - List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -1144,7 +1490,38 @@ public class JavaAPISuite implements Serializable { JavaPairDStream<String, Tuple2<String, String>> joined = 
pairStream1.join(pairStream2); JavaTestUtils.attachTestOutputStream(joined); - List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testLeftOuterJoin() { + List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "dodgers"), + new Tuple2<String, String>("new york", "yankees")), + Arrays.asList(new Tuple2<String, String>("california", "sharks") )); + + List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "giants") ), + Arrays.asList(new Tuple2<String, String>("new york", "islanders") ) + + ); + + List<List<Long>> expected = Arrays.asList(Arrays.asList(2L), Arrays.asList(1L)); + + JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream1, 1); + JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream2, 1); + JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream<String, Tuple2<String, Optional<String>>> joined = pairStream1.leftOuterJoin(pairStream2); + JavaDStream<Long> counted = joined.count(); + JavaTestUtils.attachTestOutputStream(counted); + List<List<Long>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -1222,14 +1599,20 @@ public class JavaAPISuite implements Serializable { @Test public void testKafkaStream() { HashMap<String, Integer> topics = Maps.newHashMap(); - JavaDStream<String> test1 = ssc.kafkaStream("localhost:12345", "group", topics); - JavaDStream<String> test2 = ssc.kafkaStream("localhost:12345", "group", topics, + JavaPairDStream<String, String> test1 = ssc.kafkaStream("localhost:12345", "group", topics); + JavaPairDStream<String, String> test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK()); HashMap<String, String> kafkaParams = Maps.newHashMap(); - kafkaParams.put("zk.connect","localhost:12345"); - kafkaParams.put("groupid","consumer-group"); - JavaDStream<String> test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, + kafkaParams.put("zookeeper.connect","localhost:12345"); + kafkaParams.put("group.id","consumer-group"); + JavaPairDStream<String, String> test3 = ssc.kafkaStream( + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + kafkaParams, + topics, StorageLevel.MEMORY_AND_DISK()); } diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index d5cdad4998..42ab9590d6 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -35,9 +35,9 @@ trait JavaTestBase extends TestSuiteBase { * The stream will be derived from the supplied lists of Java objects. 
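The same outer-join support is exercised from Scala later in this patch; a minimal sketch of the new `leftOuterJoin` on pair DStreams, with `teams1` and `teams2` assumed to be `DStream[(String, String)]`s sharing one StreamingContext:

```scala
import org.apache.spark.streaming.DStream
import org.apache.spark.streaming.StreamingContext._

// For each batch, keys present only in teams1 appear with None on the right-hand side.
val joined: DStream[(String, (String, Option[String]))] = teams1.leftOuterJoin(teams2)
```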
*/ def attachTestInputStream[T]( - ssc: JavaStreamingContext, - data: JList[JList[T]], - numPartitions: Int) = { + ssc: JavaStreamingContext, + data: JList[JList[T]], + numPartitions: Int) = { val seqData = data.map(Seq(_:_*)) implicit val cm: ClassTag[T] = @@ -52,12 +52,11 @@ trait JavaTestBase extends TestSuiteBase { * [[org.apache.spark.streaming.TestOutputStream]]. **/ def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]( - dstream: JavaDStreamLike[T, This, R]) = + dstream: JavaDStreamLike[T, This, R]) = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - val ostream = new TestOutputStream(dstream.dstream, - new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]) + val ostream = new TestOutputStreamWithPartitions(dstream.dstream) dstream.dstream.ssc.registerOutputStream(ostream) } @@ -65,9 +64,11 @@ trait JavaTestBase extends TestSuiteBase { * Process all registered streams for a numBatches batches, failing if * numExpectedOutput RDD's are not generated. Generated RDD's are collected * and returned, represented as a list for each batch interval. + * + * Returns a list of items for each RDD. */ def runStreams[V]( - ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { + ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { implicit val cm: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) @@ -75,6 +76,27 @@ trait JavaTestBase extends TestSuiteBase { res.map(entry => out.append(new ArrayList[V](entry))) out } + + /** + * Process all registered streams for a numBatches batches, failing if + * numExpectedOutput RDD's are not generated. Generated RDD's are collected + * and returned, represented as a list for each batch interval. + * + * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each + * representing one partition. 
+ */ + def runStreamsWithPartitions[V](ssc: JavaStreamingContext, numBatches: Int, + numExpectedOutput: Int): JList[JList[JList[V]]] = { + implicit val cm: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput) + val out = new ArrayList[JList[JList[V]]]() + res.map{entry => + val lists = entry.map(new ArrayList[V](_)) + out.append(new ArrayList[JList[V]](lists)) + } + out + } } object JavaTestUtils extends JavaTestBase { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 11586f72b6..259ef1608c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -18,7 +18,10 @@ package org.apache.spark.streaming import org.apache.spark.streaming.StreamingContext._ -import scala.runtime.RichInt + +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ + import util.ManualClock class BasicOperationsSuite extends TestSuiteBase { @@ -82,6 +85,44 @@ class BasicOperationsSuite extends TestSuiteBase { testOperation(input, operation, output, true) } + test("repartition (more partitions)") { + val input = Seq(1 to 100, 101 to 200, 201 to 300) + val operation = (r: DStream[Int]) => r.repartition(5) + val ssc = setupStreams(input, operation, 2) + val output = runStreamsWithPartitions(ssc, 3, 3) + assert(output.size === 3) + val first = output(0) + val second = output(1) + val third = output(2) + + assert(first.size === 5) + assert(second.size === 5) + assert(third.size === 5) + + assert(first.flatten.toSet === (1 to 100).toSet) + assert(second.flatten.toSet === (101 to 200).toSet) + assert(third.flatten.toSet === (201 to 300).toSet) + } + + test("repartition (fewer partitions)") { + val input = Seq(1 to 100, 101 to 200, 201 to 300) + val operation = (r: DStream[Int]) => r.repartition(2) + val ssc = setupStreams(input, operation, 5) + val output = runStreamsWithPartitions(ssc, 3, 3) + assert(output.size === 3) + val first = output(0) + val second = output(1) + val third = output(2) + + assert(first.size === 2) + assert(second.size === 2) + assert(third.size === 2) + + assert(first.flatten.toSet === (1 to 100).toSet) + assert(second.flatten.toSet === (101 to 200).toSet) + assert(third.flatten.toSet === (201 to 300).toSet) + } + test("groupByKey") { testOperation( Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ), @@ -143,6 +184,72 @@ class BasicOperationsSuite extends TestSuiteBase { ) } + test("union") { + val input = Seq(1 to 4, 101 to 104, 201 to 204) + val output = Seq(1 to 8, 101 to 108, 201 to 208) + testOperation( + input, + (s: DStream[Int]) => s.union(s.map(_ + 4)) , + output + ) + } + + test("StreamingContext.union") { + val input = Seq(1 to 4, 101 to 104, 201 to 204) + val output = Seq(1 to 12, 101 to 112, 201 to 212) + // union over 3 DStreams + testOperation( + input, + (s: DStream[Int]) => s.context.union(Seq(s, s.map(_ + 4), s.map(_ + 8))), + output + ) + } + + test("transform") { + val input = Seq(1 to 4, 5 to 8, 9 to 12) + testOperation( + input, + (r: DStream[Int]) => r.transform(rdd => rdd.map(_.toString)), // RDD.map in transform + input.map(_.map(_.toString)) + ) + } + + test("transformWith") { + val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() ) + val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") ) + val 
outputData = Seq( + Seq( ("a", (1, "x")), ("b", (1, "x")) ), + Seq( ("", (1, "x")) ), + Seq( ), + Seq( ) + ) + val operation = (s1: DStream[String], s2: DStream[String]) => { + val t1 = s1.map(x => (x, 1)) + val t2 = s2.map(x => (x, "x")) + t1.transformWith( // RDD.join in transform + t2, + (rdd1: RDD[(String, Int)], rdd2: RDD[(String, String)]) => rdd1.join(rdd2) + ) + } + testOperation(inputData1, inputData2, operation, outputData, true) + } + + test("StreamingContext.transform") { + val input = Seq(1 to 4, 101 to 104, 201 to 204) + val output = Seq(1 to 12, 101 to 112, 201 to 212) + + // transform over 3 DStreams by doing union of the 3 RDDs + val operation = (s: DStream[Int]) => { + s.context.transform( + Seq(s, s.map(_ + 4), s.map(_ + 8)), // 3 DStreams + (rdds: Seq[RDD[_]], time: Time) => + rdds.head.context.union(rdds.map(_.asInstanceOf[RDD[Int]])) // union of RDDs + ) + } + + testOperation(input, operation, output) + } + test("cogroup") { val inputData1 = Seq( Seq("a", "a", "b"), Seq("a", ""), Seq(""), Seq() ) val inputData2 = Seq( Seq("a", "a", "b"), Seq("b", ""), Seq(), Seq() ) @@ -168,7 +275,37 @@ class BasicOperationsSuite extends TestSuiteBase { Seq( ) ) val operation = (s1: DStream[String], s2: DStream[String]) => { - s1.map(x => (x,1)).join(s2.map(x => (x,"x"))) + s1.map(x => (x, 1)).join(s2.map(x => (x, "x"))) + } + testOperation(inputData1, inputData2, operation, outputData, true) + } + + test("leftOuterJoin") { + val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() ) + val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") ) + val outputData = Seq( + Seq( ("a", (1, Some("x"))), ("b", (1, Some("x"))) ), + Seq( ("", (1, Some("x"))), ("a", (1, None)) ), + Seq( ("", (1, None)) ), + Seq( ) + ) + val operation = (s1: DStream[String], s2: DStream[String]) => { + s1.map(x => (x, 1)).leftOuterJoin(s2.map(x => (x, "x"))) + } + testOperation(inputData1, inputData2, operation, outputData, true) + } + + test("rightOuterJoin") { + val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() ) + val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") ) + val outputData = Seq( + Seq( ("a", (Some(1), "x")), ("b", (Some(1), "x")) ), + Seq( ("", (Some(1), "x")), ("b", (None, "x")) ), + Seq( ), + Seq( ("", (None, "x")) ) + ) + val operation = (s1: DStream[String], s2: DStream[String]) => { + s1.map(x => (x, 1)).rightOuterJoin(s2.map(x => (x, "x"))) } testOperation(inputData1, inputData2, operation, outputData, true) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 07de51bebb..e81287b44e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -372,7 +372,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { logInfo("Manual clock after advancing = " + clock.time) Thread.sleep(batchDuration.milliseconds) - val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] - outputStream.output + val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]] + outputStream.output.map(_.flatten) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 42e3e51e3f..a559db468a 100644 --- 
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -23,15 +23,15 @@ import akka.actor.IOManager import akka.actor.Props import akka.util.ByteString -import dstream.SparkFlumeEvent +import org.apache.spark.streaming.dstream.{NetworkReceiver, SparkFlumeEvent} import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket} import java.io.{File, BufferedWriter, OutputStreamWriter} -import java.util.concurrent.{TimeUnit, ArrayBlockingQueue} +import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue} import collection.mutable.{SynchronizedBuffer, ArrayBuffer} import util.ManualClock import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receivers.Receiver -import org.apache.spark.Logging +import org.apache.spark.{SparkContext, Logging} import scala.util.Random import org.apache.commons.io.FileUtils import org.scalatest.BeforeAndAfter @@ -44,6 +44,7 @@ import java.nio.ByteBuffer import collection.JavaConversions._ import java.nio.charset.Charset import com.google.common.io.Files +import java.util.concurrent.atomic.AtomicInteger class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { @@ -61,7 +62,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { System.clearProperty("spark.hostPort") } - test("socket input stream") { // Start the server val testServer = new TestServer() @@ -268,13 +268,56 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK) // Test specifying decoder - val kafkaParams = Map("zk.connect"->"localhost:12345","groupid"->"consumer-group") - val test3 = ssc.kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK) + val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group") + val test3 = ssc.kafkaStream[ + String, + String, + kafka.serializer.StringDecoder, + kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK) + } + + test("multi-thread receiver") { + // set up the test receiver + val numThreads = 10 + val numRecordsPerThread = 1000 + val numTotalRecords = numThreads * numRecordsPerThread + val testReceiver = new MultiThreadTestReceiver(numThreads, numRecordsPerThread) + MultiThreadTestReceiver.haveAllThreadsFinished = false + + // set up the network stream using the test receiver + val ssc = new StreamingContext(master, framework, batchDuration) + val networkStream = ssc.networkStream[Int](testReceiver) + val countStream = networkStream.count + val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]] + val outputStream = new TestOutputStream(countStream, outputBuffer) + def output = outputBuffer.flatMap(x => x) + ssc.registerOutputStream(outputStream) + ssc.start() + + // Let the data from the receiver be received + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val startTime = System.currentTimeMillis() + while((!MultiThreadTestReceiver.haveAllThreadsFinished || output.sum < numTotalRecords) && + System.currentTimeMillis() - startTime < 5000) { + Thread.sleep(100) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(1000) + logInfo("Stopping context") + ssc.stop() + + // Verify whether data received was as expected + logInfo("--------------------------------") + logInfo("output.size = " + outputBuffer.size) + logInfo("output") + 
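A minimal sketch of a custom receiver built on the (now synchronized) BlockGenerator, modeled on the MultiThreadTestReceiver defined just below. The class and value names are illustrative, not part of the patch.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.NetworkReceiver

class CounterReceiver(numRecords: Int) extends NetworkReceiver[Int] {
  // BlockGenerator is an inner class of NetworkReceiver, so subclasses can instantiate it.
  lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY)

  protected def onStart() {
    blockGenerator.start()
    // += is synchronized after this patch, so concurrent producer threads are safe.
    (1 to numRecords).foreach(i => blockGenerator += i)
  }

  protected def onStop() {
    blockGenerator.stop()
  }
}

// Attach it to a context with: val stream = ssc.networkStream[Int](new CounterReceiver(1000))
```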
outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("--------------------------------") + assert(output.sum === numTotalRecords) } } -/** This is server to test the network input stream */ +/** This is a server to test the network input stream */ class TestServer() extends Logging { val queue = new ArrayBlockingQueue[String](100) @@ -336,6 +379,7 @@ object TestServer { } } +/** This is an actor for testing actor input stream */ class TestActor(port: Int) extends Actor with Receiver { def bytesToString(byteString: ByteString) = byteString.utf8String @@ -347,3 +391,36 @@ class TestActor(port: Int) extends Actor with Receiver { pushBlock(bytesToString(bytes)) } } + +/** This is a receiver to test multiple threads inserting data using block generator */ +class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int) + extends NetworkReceiver[Int] { + lazy val executorPool = Executors.newFixedThreadPool(numThreads) + lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY) + lazy val finishCount = new AtomicInteger(0) + + protected def onStart() { + blockGenerator.start() + (1 to numThreads).map(threadId => { + val runnable = new Runnable { + def run() { + (1 to numRecordsPerThread).foreach(i => + blockGenerator += (threadId * numRecordsPerThread + i) ) + if (finishCount.incrementAndGet == numThreads) { + MultiThreadTestReceiver.haveAllThreadsFinished = true + } + logInfo("Finished thread " + threadId) + } + } + executorPool.submit(runnable) + }) + } + + protected def onStop() { + executorPool.shutdown() + } +} + +object MultiThreadTestReceiver { + var haveAllThreadsFinished = false +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index c91f9ba46d..126915abc9 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -61,8 +61,11 @@ class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], /** * This is a output stream just for the testsuites. All the output is collected into a * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint. + * + * The buffer contains a sequence of RDD's, each containing a sequence of items */ -class TestOutputStream[T: ClassTag](parent: DStream[T], val output: ArrayBuffer[Seq[T]]) +class TestOutputStream[T: ClassTag](parent: DStream[T], + val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]()) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { val collected = rdd.collect() output += collected @@ -77,6 +80,30 @@ class TestOutputStream[T: ClassTag](parent: DStream[T], val output: ArrayBuffer[ } /** + * This is a output stream just for the testsuites. All the output is collected into a + * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint. + * + * The buffer contains a sequence of RDD's, each containing a sequence of partitions, each + * containing a sequence of items. 
+ */ +class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T], + val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]()) + extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { + val collected = rdd.glom().collect().map(_.toSeq) + output += collected + }) { + + // This is to clear the output buffer every it is read from a checkpoint + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + ois.defaultReadObject() + output.clear() + } + + def toTestOutputStream = new TestOutputStream[T](this.parent, this.output.map(_.flatten)) +} + +/** * This is the base trait for Spark Streaming testsuites. This provides basic functionality * to run user-defined set of input on user-defined stream operations, and verify the output. */ @@ -109,7 +136,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { */ def setupStreams[U: ClassTag, V: ClassTag]( input: Seq[Seq[U]], - operation: DStream[U] => DStream[V] + operation: DStream[U] => DStream[V], + numPartitions: Int = numInputPartitions ): StreamingContext = { // Create StreamingContext @@ -119,9 +147,10 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { } // Setup the stream computation - val inputStream = new TestInputStream(ssc, input, numInputPartitions) + val inputStream = new TestInputStream(ssc, input, numPartitions) val operatedStream = operation(inputStream) - val outputStream = new TestOutputStream(operatedStream, new ArrayBuffer[Seq[V]] with SynchronizedBuffer[Seq[V]]) + val outputStream = new TestOutputStreamWithPartitions(operatedStream, + new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]]) ssc.registerInputStream(inputStream) ssc.registerOutputStream(outputStream) ssc @@ -147,7 +176,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val inputStream1 = new TestInputStream(ssc, input1, numInputPartitions) val inputStream2 = new TestInputStream(ssc, input2, numInputPartitions) val operatedStream = operation(inputStream1, inputStream2) - val outputStream = new TestOutputStream(operatedStream, new ArrayBuffer[Seq[W]] with SynchronizedBuffer[Seq[W]]) + val outputStream = new TestOutputStreamWithPartitions(operatedStream, + new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]]) ssc.registerInputStream(inputStream1) ssc.registerInputStream(inputStream2) ssc.registerOutputStream(outputStream) @@ -158,18 +188,37 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and * returns the collected output. It will wait until `numExpectedOutput` number of * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached. + * + * Returns a sequence of items for each RDD. */ def runStreams[V: ClassTag]( ssc: StreamingContext, numBatches: Int, numExpectedOutput: Int ): Seq[Seq[V]] = { + // Flatten each RDD into a single Seq + runStreamsWithPartitions(ssc, numBatches, numExpectedOutput).map(_.flatten.toSeq) + } + + /** + * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and + * returns the collected output. It will wait until `numExpectedOutput` number of + * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached. + * + * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each + * representing one partition. 
+ */ + def runStreamsWithPartitions[V: ClassTag]( + ssc: StreamingContext, + numBatches: Int, + numExpectedOutput: Int + ): Seq[Seq[Seq[V]]] = { assert(numBatches > 0, "Number of batches to run stream computation is zero") assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero") logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput) // Get the output buffer - val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] + val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]] val output = outputStream.output try { |
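To tie the new test helpers together, a minimal sketch of how `setupStreams` with an explicit partition count and `runStreamsWithPartitions` are used, following the `repartition` tests added to BasicOperationsSuite above. It is assumed to live in a suite extending TestSuiteBase; the concrete numbers are illustrative.

```scala
test("repartition keeps every record") {
  val input = Seq(1 to 100)
  val operation = (s: DStream[Int]) => s.repartition(5)

  // setupStreams now accepts the number of input partitions; runStreamsWithPartitions
  // returns, per batch, one Seq per partition of the output RDD.
  val ssc = setupStreams(input, operation, numPartitions = 2)
  val output: Seq[Seq[Seq[Int]]] =
    runStreamsWithPartitions(ssc, numBatches = 1, numExpectedOutput = 1)

  assert(output.size === 1)
  assert(output.head.size === 5)                           // five partitions after repartition
  assert(output.head.flatten.toSet === (1 to 100).toSet)   // nothing lost or duplicated
}
```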