author    root <root@ip-10-8-17-72.ec2.internal>  2012-09-05 05:53:18 +0000
committer root <root@ip-10-8-17-72.ec2.internal>  2012-09-05 05:53:18 +0000
commit    fc186dc18a1e9bd50aad32314331105009962295 (patch)
tree      e85faebc6608ce4b21b6c47c0ed4fa2d9bd7fc4e
parent    4ea032a142ab7fb44f92b145cc8d850164419ab5 (diff)
parent    25fd684b89ac5bdc6675b0a5d5e3caa9fe608d92 (diff)
Merge branch 'dev' of github.com:radlab/spark into dev
-rw-r--r--  core/src/main/scala/spark/RDD.scala                                      |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala                   | 16
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala      | 56
-rw-r--r--  streaming/src/main/scala/spark/streaming/QueueInputDStream.scala         |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/StateDStream.scala              | 93
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala | 86
-rw-r--r--  streaming/src/test/scala/spark/streaming/DStreamSuite.scala              |  4
7 files changed, 202 insertions(+), 57 deletions(-)
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 3fe8e8a4bf..d28f3593fe 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -94,7 +94,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
def getStorageLevel = storageLevel
- def checkpoint(level: StorageLevel = StorageLevel.DISK_AND_MEMORY_DESER): RDD[T] = {
+ def checkpoint(level: StorageLevel = StorageLevel.DISK_AND_MEMORY_DESER_2): RDD[T] = {
if (!level.useDisk && level.replication < 2) {
throw new Exception("Cannot checkpoint without using disk or replication (level requested was " + level + ")")
}
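
Note: this hunk only changes the default argument: checkpoint() now defaults to the 2x-replicated variant of the disk-and-memory deserialized level instead of the unreplicated one. A minimal caller-side sketch, assuming the 2012-era API visible in this diff (sc is an existing SparkContext; the input data is illustrative):

    // Sketch, not part of the commit: the new default replicates checkpoint data.
    val data = sc.parallelize(1 to 1000)
    val checkpointed = data.checkpoint()  // uses StorageLevel.DISK_AND_MEMORY_DESER_2
    // A level with neither disk nor replication would still be rejected by the guard above:
    // data.checkpoint(StorageLevel.MEMORY_ONLY_DESER)  // throws Exception
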
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 9b0115eef6..20f1c4db20 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -41,17 +41,17 @@ extends Logging with Serializable {
*/
// Variable to store the RDDs generated earlier in time
- @transient private val generatedRDDs = new HashMap[Time, RDD[T]] ()
+ @transient protected val generatedRDDs = new HashMap[Time, RDD[T]] ()
// Variable to be set to the first time seen by the DStream (effective time zero)
- private[streaming] var zeroTime: Time = null
+ protected[streaming] var zeroTime: Time = null
// Variable to specify storage level
- private var storageLevel: StorageLevel = StorageLevel.NONE
+ protected var storageLevel: StorageLevel = StorageLevel.NONE
// Checkpoint level and checkpoint interval
- private var checkpointLevel: StorageLevel = StorageLevel.NONE // NONE means don't checkpoint
- private var checkpointInterval: Time = null
+ protected var checkpointLevel: StorageLevel = StorageLevel.NONE // NONE means don't checkpoint
+ protected var checkpointInterval: Time = null
// Change this RDD's storage level
def persist(
@@ -84,7 +84,7 @@ extends Logging with Serializable {
* the validity of future times is calculated. This method also recursively initializes
* its parent DStreams.
*/
- def initialize(time: Time) {
+ protected[streaming] def initialize(time: Time) {
if (zeroTime == null) {
zeroTime = time
}
@@ -93,7 +93,7 @@ extends Logging with Serializable {
}
/** This method checks whether the 'time' is valid wrt slideTime for generating RDD */
- private def isTimeValid (time: Time): Boolean = {
+ protected def isTimeValid (time: Time): Boolean = {
if (!isInitialized) {
throw new Exception (this.toString + " has not been initialized")
} else if (time < zeroTime || ! (time - zeroTime).isMultipleOf(slideTime)) {
@@ -208,7 +208,7 @@ extends Logging with Serializable {
new TransformedDStream(this, ssc.sc.clean(transformFunc))
}
- private[streaming] def toQueue = {
+ def toQueue = {
val queue = new ArrayBlockingQueue[RDD[T]](10000)
this.foreachRDD(rdd => {
queue.add(rdd)
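
Note: the visibility changes in this file (fields and isTimeValid go from private to protected, initialize becomes protected[streaming]) let subclasses manage their own RDD caching and checkpointing; StateDStream later in this diff overrides getOrCompute for exactly that reason. The last hunk also makes toQueue public. A minimal sketch of what that exposes, assuming only the API shown here (the helper name is illustrative):

    import spark.RDD
    import spark.streaming.DStream

    // Sketch, not part of the commit: with toQueue public, user code can block on
    // the queue that foreachRDD fills with each generated RDD.
    def firstBatch[T: ClassManifest](stream: DStream[T]): RDD[T] = {
      val queue = stream.toQueue   // ArrayBlockingQueue[RDD[T]] with capacity 10000
      queue.take()                 // blocks until the first batch's RDD arrives
    }
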
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 13db34ac80..3fd0a16bf0 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -19,32 +19,32 @@ extends Serializable {
/* DStream operations for key-value pairs */
/* ---------------------------------- */
- def groupByKey(): ShuffledDStream[K, V, ArrayBuffer[V]] = {
+ def groupByKey(): DStream[(K, Seq[V])] = {
groupByKey(defaultPartitioner())
}
- def groupByKey(numPartitions: Int): ShuffledDStream[K, V, ArrayBuffer[V]] = {
+ def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = {
groupByKey(defaultPartitioner(numPartitions))
}
- def groupByKey(partitioner: Partitioner): ShuffledDStream[K, V, ArrayBuffer[V]] = {
+ def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
def createCombiner(v: V) = ArrayBuffer[V](v)
def mergeValue(c: ArrayBuffer[V], v: V) = (c += v)
def mergeCombiner(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = (c1 ++ c2)
- combineByKey[ArrayBuffer[V]](createCombiner _, mergeValue _, mergeCombiner _, partitioner)
+ combineByKey(createCombiner _, mergeValue _, mergeCombiner _, partitioner).asInstanceOf[DStream[(K, Seq[V])]]
}
- def reduceByKey(reduceFunc: (V, V) => V): ShuffledDStream[K, V, V] = {
+ def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = {
reduceByKey(reduceFunc, defaultPartitioner())
}
- def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): ShuffledDStream[K, V, V] = {
+ def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = {
reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
}
- def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): ShuffledDStream[K, V, V] = {
+ def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
- combineByKey[V]((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
+ combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
}
private def combineByKey[C: ClassManifest](
@@ -55,11 +55,15 @@ extends Serializable {
new ShuffledDStream[K, V, C](stream, createCombiner, mergeValue, mergeCombiner, partitioner)
}
- def groupByKeyAndWindow(windowTime: Time, slideTime: Time): ShuffledDStream[K, V, ArrayBuffer[V]] = {
+ def groupByKeyAndWindow(windowTime: Time, slideTime: Time): DStream[(K, Seq[V])] = {
groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner())
}
- def groupByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int): ShuffledDStream[K, V, ArrayBuffer[V]] = {
+ def groupByKeyAndWindow(
+ windowTime: Time,
+ slideTime: Time,
+ numPartitions: Int
+ ): DStream[(K, Seq[V])] = {
groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner(numPartitions))
}
@@ -67,15 +71,24 @@ extends Serializable {
windowTime: Time,
slideTime: Time,
partitioner: Partitioner
- ): ShuffledDStream[K, V, ArrayBuffer[V]] = {
+ ): DStream[(K, Seq[V])] = {
stream.window(windowTime, slideTime).groupByKey(partitioner)
}
- def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowTime: Time, slideTime: Time): ShuffledDStream[K, V, V] = {
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ windowTime: Time,
+ slideTime: Time
+ ): DStream[(K, V)] = {
reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner())
}
- def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowTime: Time, slideTime: Time, numPartitions: Int): ShuffledDStream[K, V, V] = {
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ windowTime: Time,
+ slideTime: Time,
+ numPartitions: Int
+ ): DStream[(K, V)] = {
reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner(numPartitions))
}
@@ -84,7 +97,7 @@ extends Serializable {
windowTime: Time,
slideTime: Time,
partitioner: Partitioner
- ): ShuffledDStream[K, V, V] = {
+ ): DStream[(K, V)] = {
stream.window(windowTime, slideTime).reduceByKey(ssc.sc.clean(reduceFunc), partitioner)
}
@@ -93,12 +106,13 @@ extends Serializable {
// so that new elements introduced in the window can be "added" using
// reduceFunc to the previous window's result and old elements can be
// "subtracted using invReduceFunc.
+
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
windowTime: Time,
slideTime: Time
- ): ReducedWindowedDStream[K, V] = {
+ ): DStream[(K, V)] = {
reduceByKeyAndWindow(
reduceFunc, invReduceFunc, windowTime, slideTime, defaultPartitioner())
@@ -110,7 +124,7 @@ extends Serializable {
windowTime: Time,
slideTime: Time,
numPartitions: Int
- ): ReducedWindowedDStream[K, V] = {
+ ): DStream[(K, V)] = {
reduceByKeyAndWindow(
reduceFunc, invReduceFunc, windowTime, slideTime, defaultPartitioner(numPartitions))
@@ -122,7 +136,7 @@ extends Serializable {
windowTime: Time,
slideTime: Time,
partitioner: Partitioner
- ): ReducedWindowedDStream[K, V] = {
+ ): DStream[(K, V)] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
@@ -137,21 +151,21 @@ extends Serializable {
//
def updateStateByKey[S <: AnyRef : ClassManifest](
updateFunc: (Seq[V], S) => S
- ): StateDStream[K, V, S] = {
+ ): DStream[(K, S)] = {
updateStateByKey(updateFunc, defaultPartitioner())
}
def updateStateByKey[S <: AnyRef : ClassManifest](
updateFunc: (Seq[V], S) => S,
numPartitions: Int
- ): StateDStream[K, V, S] = {
+ ): DStream[(K, S)] = {
updateStateByKey(updateFunc, defaultPartitioner(numPartitions))
}
def updateStateByKey[S <: AnyRef : ClassManifest](
updateFunc: (Seq[V], S) => S,
partitioner: Partitioner
- ): StateDStream[K, V, S] = {
+ ): DStream[(K, S)] = {
val func = (iterator: Iterator[(K, Seq[V], S)]) => {
iterator.map(tuple => (tuple._1, updateFunc(tuple._2, tuple._3)))
}
@@ -162,7 +176,7 @@ extends Serializable {
updateFunc: (Iterator[(K, Seq[V], S)]) => Iterator[(K, S)],
partitioner: Partitioner,
rememberPartitioner: Boolean
- ): StateDStream[K, V, S] = {
+ ): DStream[(K, S)] = {
new StateDStream(stream, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
}
}
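
Note: the changes in this file are confined to return types: every operation now advertises the abstract DStream[(K, V)] (or DStream[(K, Seq[V])], DStream[(K, S)]) instead of implementation classes such as ShuffledDStream, ReducedWindowedDStream, and StateDStream, keeping those internals out of the public signatures. A caller-side sketch, assuming the API in this diff (stream contents and durations are illustrative):

    import spark.streaming._
    import spark.streaming.StreamingContext._

    // Sketch, not part of the commit: the inferred result types are now plain DStreams.
    def windowedWordCounts(lines: DStream[String]): DStream[(String, Int)] = {
      val add = (a: Int, b: Int) => a + b
      val subtract = (a: Int, b: Int) => a - b
      val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))
      // Previously typed as ReducedWindowedDStream[String, Int]; now just DStream[(String, Int)].
      pairs.reduceByKeyAndWindow(add, subtract, Seconds(30), Seconds(10))
    }
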
diff --git a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
index bab48ff954..de30297c7d 100644
--- a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
@@ -7,7 +7,7 @@ import scala.collection.mutable.Queue
import scala.collection.mutable.ArrayBuffer
class QueueInputDStream[T: ClassManifest](
- ssc: StreamingContext,
+ @transient ssc: StreamingContext,
val queue: Queue[RDD[T]],
oneAtATime: Boolean,
defaultRDD: RDD[T]
diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala
index f313d8c162..4cb780c006 100644
--- a/streaming/src/main/scala/spark/streaming/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala
@@ -1,10 +1,11 @@
package spark.streaming
import spark.RDD
+import spark.BlockRDD
import spark.Partitioner
import spark.MapPartitionsRDD
import spark.SparkContext._
-
+import spark.storage.StorageLevel
class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
parent: DStream[(K, V)],
@@ -22,6 +23,47 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
override def slideTime = parent.slideTime
+ override def getOrCompute(time: Time): Option[RDD[(K, S)]] = {
+ generatedRDDs.get(time) match {
+ case Some(oldRDD) => {
+ if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval) && oldRDD.dependencies.size > 0) {
+ val r = oldRDD
+ val oldRDDBlockIds = oldRDD.splits.map(s => "rdd:" + r.id + ":" + s.index)
+ val checkpointedRDD = new BlockRDD[(K, S)](ssc.sc, oldRDDBlockIds) {
+ override val partitioner = oldRDD.partitioner
+ }
+ generatedRDDs.update(time, checkpointedRDD)
+ logInfo("Updated RDD of time " + time + " with its checkpointed version")
+ Some(checkpointedRDD)
+ } else {
+ Some(oldRDD)
+ }
+ }
+ case None => {
+ if (isTimeValid(time)) {
+ compute(time) match {
+ case Some(newRDD) => {
+ if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval)) {
+ newRDD.persist(checkpointLevel)
+ logInfo("Persisting " + newRDD + " to " + checkpointLevel + " at time " + time)
+ } else if (storageLevel != StorageLevel.NONE) {
+ newRDD.persist(storageLevel)
+ logInfo("Persisting " + newRDD + " to " + storageLevel + " at time " + time)
+ }
+ generatedRDDs.put(time, newRDD)
+ Some(newRDD)
+ }
+ case None => {
+ None
+ }
+ }
+ } else {
+ None
+ }
+ }
+ }
+ }
+
override def compute(validTime: Time): Option[RDD[(K, S)]] = {
// Try to get the previous state RDD
@@ -29,26 +71,27 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
case Some(prevStateRDD) => { // If previous state RDD exists
- // Define the function for the mapPartition operation on cogrouped RDD;
- // first map the cogrouped tuple to tuples of required type,
- // and then apply the update function
- val func = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => {
- val i = iterator.map(t => {
- (t._1, t._2._1, t._2._2.headOption.getOrElse(null.asInstanceOf[S]))
- })
- updateFunc(i)
- }
-
// Try to get the parent RDD
parent.getOrCompute(validTime) match {
case Some(parentRDD) => { // If parent RDD exists, then compute as usual
+
+ // Define the function for the mapPartition operation on cogrouped RDD;
+ // first map the cogrouped tuple to tuples of required type,
+ // and then apply the update function
+ val updateFuncLocal = updateFunc
+ val mapPartitionFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => {
+ val i = iterator.map(t => {
+ (t._1, t._2._1, t._2._2.headOption.getOrElse(null.asInstanceOf[S]))
+ })
+ updateFuncLocal(i)
+ }
val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
- val stateRDD = new SpecialMapPartitionsRDD(cogroupedRDD, func)
- logDebug("Generating state RDD for time " + validTime)
+ val stateRDD = new SpecialMapPartitionsRDD(cogroupedRDD, mapPartitionFunc)
+ //logDebug("Generating state RDD for time " + validTime)
return Some(stateRDD)
}
case None => { // If parent RDD does not exist, then return old state RDD
- logDebug("Generating state RDD for time " + validTime + " (no change)")
+ //logDebug("Generating state RDD for time " + validTime + " (no change)")
return Some(prevStateRDD)
}
}
@@ -56,23 +99,25 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
case None => { // If previous session RDD does not exist (first input data)
- // Define the function for the mapPartition operation on grouped RDD;
- // first map the grouped tuple to tuples of required type,
- // and then apply the update function
- val func = (iterator: Iterator[(K, Seq[V])]) => {
- updateFunc(iterator.map(tuple => (tuple._1, tuple._2, null.asInstanceOf[S])))
- }
-
// Try to get the parent RDD
parent.getOrCompute(validTime) match {
case Some(parentRDD) => { // If parent RDD exists, then compute as usual
+
+ // Define the function for the mapPartition operation on grouped RDD;
+ // first map the grouped tuple to tuples of required type,
+ // and then apply the update function
+ val updateFuncLocal = updateFunc
+ val mapPartitionFunc = (iterator: Iterator[(K, Seq[V])]) => {
+ updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2, null.asInstanceOf[S])))
+ }
+
val groupedRDD = parentRDD.groupByKey(partitioner)
- val sessionRDD = new SpecialMapPartitionsRDD(groupedRDD, func)
- logDebug("Generating state RDD for time " + validTime + " (first)")
+ val sessionRDD = new SpecialMapPartitionsRDD(groupedRDD, mapPartitionFunc)
+ //logDebug("Generating state RDD for time " + validTime + " (first)")
return Some(sessionRDD)
}
case None => { // If parent RDD does not exist, then nothing to do!
- logDebug("Not generating state RDD (no previous state, no parent)")
+ //logDebug("Not generating state RDD (no previous state, no parent)")
return None
}
}
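
Note: two changes matter in this file. First, getOrCompute is overridden so that at every checkpoint interval the previously generated state RDD is swapped for a BlockRDD built from its already-persisted blocks, cutting the otherwise ever-growing lineage of the state chain. Second, compute() now copies updateFunc into a local val (updateFuncLocal) before the mapPartitions closure captures it, so the closure ships only the function rather than the whole DStream. A minimal sketch of that capture idiom, not taken from the commit (class and method names are illustrative):

    // Sketch, not part of the commit: referencing the field inside the closure
    // would capture `this` (the entire enclosing object); copying it into a
    // local val first keeps the serialized task closure small.
    class Holder(val update: Int => Int) {
      def taskClosure(): Iterator[Int] => Iterator[Int] = {
        val updateLocal = update                    // captured instead of `this`
        (it: Iterator[Int]) => it.map(updateLocal)
      }
    }
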
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
new file mode 100644
index 0000000000..be3188c5ed
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -0,0 +1,86 @@
+package spark.streaming.examples
+
+import spark.util.IntParam
+import spark.storage.StorageLevel
+import spark.streaming._
+import spark.streaming.StreamingContext._
+
+object TopKWordCountRaw {
+ def main(args: Array[String]) {
+ if (args.length != 7) {
+ System.err.println("Usage: TopKWordCountRaw <master> <streams> <host> <port> <batchMs> <chkptMs> <reduces>")
+ System.exit(1)
+ }
+
+ val Array(master, IntParam(streams), host, IntParam(port), IntParam(batchMs),
+ IntParam(chkptMs), IntParam(reduces)) = args
+
+ // Create the context and set the batch size
+ val ssc = new StreamingContext(master, "TopKWordCountRaw")
+ ssc.setBatchDuration(Milliseconds(batchMs))
+
+ // Make sure some tasks have started on each node
+ ssc.sc.parallelize(1 to 1000, 1000).count()
+ ssc.sc.parallelize(1 to 1000, 1000).count()
+ ssc.sc.parallelize(1 to 1000, 1000).count()
+
+ val rawStreams = (1 to streams).map(_ =>
+ ssc.createRawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
+ val union = new UnifiedDStream(rawStreams)
+
+ import WordCount2_ExtraFunctions._
+
+ val windowedCounts = union.mapPartitions(splitAndCountPartitions)
+ .reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces)
+ windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2,
+ Milliseconds(chkptMs))
+ //windowedCounts.print() // TODO: something else?
+
+ def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = {
+ val taken = new Array[(String, Long)](k)
+
+ var i = 0
+ var len = 0
+ var done = false
+ var value: (String, Long) = null
+ var swap: (String, Long) = null
+ var count = 0
+
+ while(data.hasNext) {
+ value = data.next
+ count += 1
+ println("count = " + count)
+ if (len == 0) {
+ taken(0) = value
+ len = 1
+ } else if (len < k || value._2 > taken(len - 1)._2) {
+ if (len < k) {
+ len += 1
+ }
+ taken(len - 1) = value
+ i = len - 1
+ while(i > 0 && taken(i - 1)._2 < taken(i)._2) {
+ swap = taken(i)
+ taken(i) = taken(i-1)
+ taken(i - 1) = swap
+ i -= 1
+ }
+ }
+ }
+ println("Took " + len + " out of " + count + " items")
+ return taken.toIterator
+ }
+
+ val k = 50
+ val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
+ partialTopKWindowedCounts.foreachRDD(rdd => {
+ val collectedCounts = rdd.collect
+ println("Collected " + collectedCounts.size + " items")
+ topK(collectedCounts.toIterator, k).foreach(println)
+ })
+
+// windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
+
+ ssc.start()
+ }
+}
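
Note: the new example computes a two-phase top-k: mapPartitions applies topK inside each partition, then foreachRDD collects the per-partition winners and applies topK once more on the driver. The hand-rolled insertion into a fixed-size array avoids sorting whole partitions. The same shape, written with plain collections for clarity, would look roughly like this (a sketch, not part of the commit):

    // Sketch, not part of the commit: a simpler (but allocation-heavier) top-k.
    def topKSimple(counts: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] =
      counts.toSeq.sortBy(- _._2).take(k).iterator

    // Usage mirrors the example above:
    //   val partial = windowedCounts.mapPartitions(topKSimple(_, 50))
    //   partial.foreachRDD(rdd => topKSimple(rdd.collect.iterator, 50).foreach(println))
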
diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
index 030f351080..fc00952afe 100644
--- a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
@@ -107,12 +107,12 @@ class DStreamSuite extends FunSuite with BeforeAndAfter with Logging {
Seq(("a", 3), ("b", 3), ("c", 3))
)
- val updateStateOp =(s: DStream[String]) => {
+ val updateStateOp = (s: DStream[String]) => {
val updateFunc = (values: Seq[Int], state: RichInt) => {
var newState = 0
if (values != null) newState += values.reduce(_ + _)
if (state != null) newState += state.self
- //println("values = " + values + ", state = " + state + ", " + " new state = " + newState)
+ println("values = " + values + ", state = " + state + ", " + " new state = " + newState)
new RichInt(newState)
}
s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self))