diff options
author | root <root@ip-10-8-17-72.ec2.internal> | 2012-09-05 05:53:18 +0000 |
---|---|---|
committer | root <root@ip-10-8-17-72.ec2.internal> | 2012-09-05 05:53:18 +0000 |
commit | fc186dc18a1e9bd50aad32314331105009962295 (patch) | |
tree | e85faebc6608ce4b21b6c47c0ed4fa2d9bd7fc4e /streaming | |
parent | 4ea032a142ab7fb44f92b145cc8d850164419ab5 (diff) | |
parent | 25fd684b89ac5bdc6675b0a5d5e3caa9fe608d92 (diff) | |
download | spark-fc186dc18a1e9bd50aad32314331105009962295.tar.gz spark-fc186dc18a1e9bd50aad32314331105009962295.tar.bz2 spark-fc186dc18a1e9bd50aad32314331105009962295.zip |
Merge branch 'dev' of github.com:radlab/spark into dev
Diffstat (limited to 'streaming')
6 files changed, 201 insertions, 56 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 9b0115eef6..20f1c4db20 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -41,17 +41,17 @@ extends Logging with Serializable { */ // Variable to store the RDDs generated earlier in time - @transient private val generatedRDDs = new HashMap[Time, RDD[T]] () + @transient protected val generatedRDDs = new HashMap[Time, RDD[T]] () // Variable to be set to the first time seen by the DStream (effective time zero) - private[streaming] var zeroTime: Time = null + protected[streaming] var zeroTime: Time = null // Variable to specify storage level - private var storageLevel: StorageLevel = StorageLevel.NONE + protected var storageLevel: StorageLevel = StorageLevel.NONE // Checkpoint level and checkpoint interval - private var checkpointLevel: StorageLevel = StorageLevel.NONE // NONE means don't checkpoint - private var checkpointInterval: Time = null + protected var checkpointLevel: StorageLevel = StorageLevel.NONE // NONE means don't checkpoint + protected var checkpointInterval: Time = null // Change this RDD's storage level def persist( @@ -84,7 +84,7 @@ extends Logging with Serializable { * the validity of future times is calculated. This method also recursively initializes * its parent DStreams. */ - def initialize(time: Time) { + protected[streaming] def initialize(time: Time) { if (zeroTime == null) { zeroTime = time } @@ -93,7 +93,7 @@ extends Logging with Serializable { } /** This method checks whether the 'time' is valid wrt slideTime for generating RDD */ - private def isTimeValid (time: Time): Boolean = { + protected def isTimeValid (time: Time): Boolean = { if (!isInitialized) { throw new Exception (this.toString + " has not been initialized") } else if (time < zeroTime || ! (time - zeroTime).isMultipleOf(slideTime)) { @@ -208,7 +208,7 @@ extends Logging with Serializable { new TransformedDStream(this, ssc.sc.clean(transformFunc)) } - private[streaming] def toQueue = { + def toQueue = { val queue = new ArrayBlockingQueue[RDD[T]](10000) this.foreachRDD(rdd => { queue.add(rdd) diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index 13db34ac80..3fd0a16bf0 100644 --- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -19,32 +19,32 @@ extends Serializable { /* DStream operations for key-value pairs */ /* ---------------------------------- */ - def groupByKey(): ShuffledDStream[K, V, ArrayBuffer[V]] = { + def groupByKey(): DStream[(K, Seq[V])] = { groupByKey(defaultPartitioner()) } - def groupByKey(numPartitions: Int): ShuffledDStream[K, V, ArrayBuffer[V]] = { + def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = { groupByKey(defaultPartitioner(numPartitions)) } - def groupByKey(partitioner: Partitioner): ShuffledDStream[K, V, ArrayBuffer[V]] = { + def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = { def createCombiner(v: V) = ArrayBuffer[V](v) def mergeValue(c: ArrayBuffer[V], v: V) = (c += v) def mergeCombiner(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = (c1 ++ c2) - combineByKey[ArrayBuffer[V]](createCombiner _, mergeValue _, mergeCombiner _, partitioner) + combineByKey(createCombiner _, mergeValue _, mergeCombiner _, partitioner).asInstanceOf[DStream[(K, Seq[V])]] } - def reduceByKey(reduceFunc: (V, V) => V): ShuffledDStream[K, V, V] = { + def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = { reduceByKey(reduceFunc, defaultPartitioner()) } - def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): ShuffledDStream[K, V, V] = { + def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = { reduceByKey(reduceFunc, defaultPartitioner(numPartitions)) } - def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): ShuffledDStream[K, V, V] = { + def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) - combineByKey[V]((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner) + combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner) } private def combineByKey[C: ClassManifest]( @@ -55,11 +55,15 @@ extends Serializable { new ShuffledDStream[K, V, C](stream, createCombiner, mergeValue, mergeCombiner, partitioner) } - def groupByKeyAndWindow(windowTime: Time, slideTime: Time): ShuffledDStream[K, V, ArrayBuffer[V]] = { + def groupByKeyAndWindow(windowTime: Time, slideTime: Time): DStream[(K, Seq[V])] = { groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner()) } - def groupByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int): ShuffledDStream[K, V, ArrayBuffer[V]] = { + def groupByKeyAndWindow( + windowTime: Time, + slideTime: Time, + numPartitions: Int + ): DStream[(K, Seq[V])] = { groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner(numPartitions)) } @@ -67,15 +71,24 @@ extends Serializable { windowTime: Time, slideTime: Time, partitioner: Partitioner - ): ShuffledDStream[K, V, ArrayBuffer[V]] = { + ): DStream[(K, Seq[V])] = { stream.window(windowTime, slideTime).groupByKey(partitioner) } - def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowTime: Time, slideTime: Time): ShuffledDStream[K, V, V] = { + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + windowTime: Time, + slideTime: Time + ): DStream[(K, V)] = { reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner()) } - def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowTime: Time, slideTime: Time, numPartitions: Int): ShuffledDStream[K, V, V] = { + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + windowTime: Time, + slideTime: Time, + numPartitions: Int + ): DStream[(K, V)] = { reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner(numPartitions)) } @@ -84,7 +97,7 @@ extends Serializable { windowTime: Time, slideTime: Time, partitioner: Partitioner - ): ShuffledDStream[K, V, V] = { + ): DStream[(K, V)] = { stream.window(windowTime, slideTime).reduceByKey(ssc.sc.clean(reduceFunc), partitioner) } @@ -93,12 +106,13 @@ extends Serializable { // so that new elements introduced in the window can be "added" using // reduceFunc to the previous window's result and old elements can be // "subtracted using invReduceFunc. + def reduceByKeyAndWindow( reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, windowTime: Time, slideTime: Time - ): ReducedWindowedDStream[K, V] = { + ): DStream[(K, V)] = { reduceByKeyAndWindow( reduceFunc, invReduceFunc, windowTime, slideTime, defaultPartitioner()) @@ -110,7 +124,7 @@ extends Serializable { windowTime: Time, slideTime: Time, numPartitions: Int - ): ReducedWindowedDStream[K, V] = { + ): DStream[(K, V)] = { reduceByKeyAndWindow( reduceFunc, invReduceFunc, windowTime, slideTime, defaultPartitioner(numPartitions)) @@ -122,7 +136,7 @@ extends Serializable { windowTime: Time, slideTime: Time, partitioner: Partitioner - ): ReducedWindowedDStream[K, V] = { + ): DStream[(K, V)] = { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc) @@ -137,21 +151,21 @@ extends Serializable { // def updateStateByKey[S <: AnyRef : ClassManifest]( updateFunc: (Seq[V], S) => S - ): StateDStream[K, V, S] = { + ): DStream[(K, S)] = { updateStateByKey(updateFunc, defaultPartitioner()) } def updateStateByKey[S <: AnyRef : ClassManifest]( updateFunc: (Seq[V], S) => S, numPartitions: Int - ): StateDStream[K, V, S] = { + ): DStream[(K, S)] = { updateStateByKey(updateFunc, defaultPartitioner(numPartitions)) } def updateStateByKey[S <: AnyRef : ClassManifest]( updateFunc: (Seq[V], S) => S, partitioner: Partitioner - ): StateDStream[K, V, S] = { + ): DStream[(K, S)] = { val func = (iterator: Iterator[(K, Seq[V], S)]) => { iterator.map(tuple => (tuple._1, updateFunc(tuple._2, tuple._3))) } @@ -162,7 +176,7 @@ extends Serializable { updateFunc: (Iterator[(K, Seq[V], S)]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean - ): StateDStream[K, V, S] = { + ): DStream[(K, S)] = { new StateDStream(stream, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner) } } diff --git a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala index bab48ff954..de30297c7d 100644 --- a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala @@ -7,7 +7,7 @@ import scala.collection.mutable.Queue import scala.collection.mutable.ArrayBuffer class QueueInputDStream[T: ClassManifest]( - ssc: StreamingContext, + @transient ssc: StreamingContext, val queue: Queue[RDD[T]], oneAtATime: Boolean, defaultRDD: RDD[T] diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala index f313d8c162..4cb780c006 100644 --- a/streaming/src/main/scala/spark/streaming/StateDStream.scala +++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala @@ -1,10 +1,11 @@ package spark.streaming import spark.RDD +import spark.BlockRDD import spark.Partitioner import spark.MapPartitionsRDD import spark.SparkContext._ - +import spark.storage.StorageLevel class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest]( parent: DStream[(K, V)], @@ -22,6 +23,47 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife override def slideTime = parent.slideTime + override def getOrCompute(time: Time): Option[RDD[(K, S)]] = { + generatedRDDs.get(time) match { + case Some(oldRDD) => { + if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval) && oldRDD.dependencies.size > 0) { + val r = oldRDD + val oldRDDBlockIds = oldRDD.splits.map(s => "rdd:" + r.id + ":" + s.index) + val checkpointedRDD = new BlockRDD[(K, S)](ssc.sc, oldRDDBlockIds) { + override val partitioner = oldRDD.partitioner + } + generatedRDDs.update(time, checkpointedRDD) + logInfo("Updated RDD of time " + time + " with its checkpointed version") + Some(checkpointedRDD) + } else { + Some(oldRDD) + } + } + case None => { + if (isTimeValid(time)) { + compute(time) match { + case Some(newRDD) => { + if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval)) { + newRDD.persist(checkpointLevel) + logInfo("Persisting " + newRDD + " to " + checkpointLevel + " at time " + time) + } else if (storageLevel != StorageLevel.NONE) { + newRDD.persist(storageLevel) + logInfo("Persisting " + newRDD + " to " + storageLevel + " at time " + time) + } + generatedRDDs.put(time, newRDD) + Some(newRDD) + } + case None => { + None + } + } + } else { + None + } + } + } + } + override def compute(validTime: Time): Option[RDD[(K, S)]] = { // Try to get the previous state RDD @@ -29,26 +71,27 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife case Some(prevStateRDD) => { // If previous state RDD exists - // Define the function for the mapPartition operation on cogrouped RDD; - // first map the cogrouped tuple to tuples of required type, - // and then apply the update function - val func = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => { - val i = iterator.map(t => { - (t._1, t._2._1, t._2._2.headOption.getOrElse(null.asInstanceOf[S])) - }) - updateFunc(i) - } - // Try to get the parent RDD parent.getOrCompute(validTime) match { case Some(parentRDD) => { // If parent RDD exists, then compute as usual + + // Define the function for the mapPartition operation on cogrouped RDD; + // first map the cogrouped tuple to tuples of required type, + // and then apply the update function + val updateFuncLocal = updateFunc + val mapPartitionFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => { + val i = iterator.map(t => { + (t._1, t._2._1, t._2._2.headOption.getOrElse(null.asInstanceOf[S])) + }) + updateFuncLocal(i) + } val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) - val stateRDD = new SpecialMapPartitionsRDD(cogroupedRDD, func) - logDebug("Generating state RDD for time " + validTime) + val stateRDD = new SpecialMapPartitionsRDD(cogroupedRDD, mapPartitionFunc) + //logDebug("Generating state RDD for time " + validTime) return Some(stateRDD) } case None => { // If parent RDD does not exist, then return old state RDD - logDebug("Generating state RDD for time " + validTime + " (no change)") + //logDebug("Generating state RDD for time " + validTime + " (no change)") return Some(prevStateRDD) } } @@ -56,23 +99,25 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife case None => { // If previous session RDD does not exist (first input data) - // Define the function for the mapPartition operation on grouped RDD; - // first map the grouped tuple to tuples of required type, - // and then apply the update function - val func = (iterator: Iterator[(K, Seq[V])]) => { - updateFunc(iterator.map(tuple => (tuple._1, tuple._2, null.asInstanceOf[S]))) - } - // Try to get the parent RDD parent.getOrCompute(validTime) match { case Some(parentRDD) => { // If parent RDD exists, then compute as usual + + // Define the function for the mapPartition operation on grouped RDD; + // first map the grouped tuple to tuples of required type, + // and then apply the update function + val updateFuncLocal = updateFunc + val mapPartitionFunc = (iterator: Iterator[(K, Seq[V])]) => { + updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2, null.asInstanceOf[S]))) + } + val groupedRDD = parentRDD.groupByKey(partitioner) - val sessionRDD = new SpecialMapPartitionsRDD(groupedRDD, func) - logDebug("Generating state RDD for time " + validTime + " (first)") + val sessionRDD = new SpecialMapPartitionsRDD(groupedRDD, mapPartitionFunc) + //logDebug("Generating state RDD for time " + validTime + " (first)") return Some(sessionRDD) } case None => { // If parent RDD does not exist, then nothing to do! - logDebug("Not generating state RDD (no previous state, no parent)") + //logDebug("Not generating state RDD (no previous state, no parent)") return None } } diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala new file mode 100644 index 0000000000..be3188c5ed --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala @@ -0,0 +1,86 @@ +package spark.streaming.examples + +import spark.util.IntParam +import spark.storage.StorageLevel +import spark.streaming._ +import spark.streaming.StreamingContext._ + +object TopKWordCountRaw { + def main(args: Array[String]) { + if (args.length != 7) { + System.err.println("Usage: TopKWordCountRaw <master> <streams> <host> <port> <batchMs> <chkptMs> <reduces>") + System.exit(1) + } + + val Array(master, IntParam(streams), host, IntParam(port), IntParam(batchMs), + IntParam(chkptMs), IntParam(reduces)) = args + + // Create the context and set the batch size + val ssc = new StreamingContext(master, "TopKWordCountRaw") + ssc.setBatchDuration(Milliseconds(batchMs)) + + // Make sure some tasks have started on each node + ssc.sc.parallelize(1 to 1000, 1000).count() + ssc.sc.parallelize(1 to 1000, 1000).count() + ssc.sc.parallelize(1 to 1000, 1000).count() + + val rawStreams = (1 to streams).map(_ => + ssc.createRawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray + val union = new UnifiedDStream(rawStreams) + + import WordCount2_ExtraFunctions._ + + val windowedCounts = union.mapPartitions(splitAndCountPartitions) + .reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces) + windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2, + Milliseconds(chkptMs)) + //windowedCounts.print() // TODO: something else? + + def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = { + val taken = new Array[(String, Long)](k) + + var i = 0 + var len = 0 + var done = false + var value: (String, Long) = null + var swap: (String, Long) = null + var count = 0 + + while(data.hasNext) { + value = data.next + count += 1 + println("count = " + count) + if (len == 0) { + taken(0) = value + len = 1 + } else if (len < k || value._2 > taken(len - 1)._2) { + if (len < k) { + len += 1 + } + taken(len - 1) = value + i = len - 1 + while(i > 0 && taken(i - 1)._2 < taken(i)._2) { + swap = taken(i) + taken(i) = taken(i-1) + taken(i - 1) = swap + i -= 1 + } + } + } + println("Took " + len + " out of " + count + " items") + return taken.toIterator + } + + val k = 50 + val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k)) + partialTopKWindowedCounts.foreachRDD(rdd => { + val collectedCounts = rdd.collect + println("Collected " + collectedCounts.size + " items") + topK(collectedCounts.toIterator, k).foreach(println) + }) + +// windowedCounts.foreachRDD(r => println("Element count: " + r.count())) + + ssc.start() + } +} diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala index 030f351080..fc00952afe 100644 --- a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala +++ b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala @@ -107,12 +107,12 @@ class DStreamSuite extends FunSuite with BeforeAndAfter with Logging { Seq(("a", 3), ("b", 3), ("c", 3)) ) - val updateStateOp =(s: DStream[String]) => { + val updateStateOp = (s: DStream[String]) => { val updateFunc = (values: Seq[Int], state: RichInt) => { var newState = 0 if (values != null) newState += values.reduce(_ + _) if (state != null) newState += state.self - //println("values = " + values + ", state = " + state + ", " + " new state = " + newState) + println("values = " + values + ", state = " + state + ", " + " new state = " + newState) new RichInt(newState) } s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self)) |