author     Tathagata Das <tathagata.das1565@gmail.com>   2012-09-17 14:26:06 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2012-09-17 14:26:06 -0700
commit     6d1fe0268530fe555fa065b8fcfa72d53c931db0 (patch)
tree       2a222396dba279a11aa66b346ffe8dbdf40abc99 /streaming
parent     86d420478f711e0f4eccc64c238efddf030a9b0f (diff)
parent     3cbc72ff1dc660a835c032356ba7b57883c5df5e (diff)
Merge branch 'dev' of github.com:radlab/spark into dev
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala                    42
-rw-r--r--  streaming/src/main/scala/spark/streaming/QueueInputDStream.scala           8
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala                   2
-rw-r--r--  streaming/src/main/scala/spark/streaming/StateDStream.scala                6
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala  19
5 files changed, 46 insertions(+), 31 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 3973ca1520..7e8098c346 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -284,9 +284,8 @@ extends Logging with Serializable {
}
-abstract class InputDStream[T: ClassManifest] (
- @transient ssc: StreamingContext)
-extends DStream[T](ssc) {
+abstract class InputDStream[T: ClassManifest] (@transient ssc: StreamingContext)
+ extends DStream[T](ssc) {
override def dependencies = List()
@@ -303,9 +302,9 @@ extends DStream[T](ssc) {
*/
class MappedDStream[T: ClassManifest, U: ClassManifest] (
- parent: DStream[T],
- mapFunc: T => U)
-extends DStream[U](parent.ssc) {
+ @transient parent: DStream[T],
+ mapFunc: T => U
+ ) extends DStream[U](parent.ssc) {
override def dependencies = List(parent)
@@ -322,9 +321,9 @@ extends DStream[U](parent.ssc) {
*/
class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
- parent: DStream[T],
- flatMapFunc: T => Traversable[U])
-extends DStream[U](parent.ssc) {
+ @transient parent: DStream[T],
+ flatMapFunc: T => Traversable[U]
+ ) extends DStream[U](parent.ssc) {
override def dependencies = List(parent)
@@ -340,8 +339,10 @@ extends DStream[U](parent.ssc) {
* TODO
*/
-class FilteredDStream[T: ClassManifest](parent: DStream[T], filterFunc: T => Boolean)
-extends DStream[T](parent.ssc) {
+class FilteredDStream[T: ClassManifest](
+ @transient parent: DStream[T],
+ filterFunc: T => Boolean
+ ) extends DStream[T](parent.ssc) {
override def dependencies = List(parent)
@@ -358,9 +359,9 @@ extends DStream[T](parent.ssc) {
*/
class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
- parent: DStream[T],
- mapPartFunc: Iterator[T] => Iterator[U])
-extends DStream[U](parent.ssc) {
+ @transient parent: DStream[T],
+ mapPartFunc: Iterator[T] => Iterator[U]
+ ) extends DStream[U](parent.ssc) {
override def dependencies = List(parent)
@@ -376,7 +377,8 @@ extends DStream[U](parent.ssc) {
* TODO
*/
-class GlommedDStream[T: ClassManifest](parent: DStream[T]) extends DStream[Array[T]](parent.ssc) {
+class GlommedDStream[T: ClassManifest](@transient parent: DStream[T])
+ extends DStream[Array[T]](parent.ssc) {
override def dependencies = List(parent)
@@ -393,7 +395,7 @@ class GlommedDStream[T: ClassManifest](parent: DStream[T]) extends DStream[Array
*/
class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
- parent: DStream[(K,V)],
+ @transient parent: DStream[(K,V)],
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
@@ -418,7 +420,7 @@ class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
* TODO
*/
-class UnifiedDStream[T: ClassManifest](parents: Array[DStream[T]])
+class UnifiedDStream[T: ClassManifest](@transient parents: Array[DStream[T]])
extends DStream[T](parents(0).ssc) {
if (parents.length == 0) {
@@ -457,7 +459,7 @@ class UnifiedDStream[T: ClassManifest](parents: Array[DStream[T]])
*/
class PerElementForEachDStream[T: ClassManifest] (
- parent: DStream[T],
+ @transient parent: DStream[T],
foreachFunc: T => Unit
) extends DStream[Unit](parent.ssc) {
@@ -488,7 +490,7 @@ class PerElementForEachDStream[T: ClassManifest] (
*/
class PerRDDForEachDStream[T: ClassManifest] (
- parent: DStream[T],
+ @transient parent: DStream[T],
foreachFunc: (RDD[T], Time) => Unit
) extends DStream[Unit](parent.ssc) {
@@ -516,7 +518,7 @@ class PerRDDForEachDStream[T: ClassManifest] (
*/
class TransformedDStream[T: ClassManifest, U: ClassManifest] (
- parent: DStream[T],
+ @transient parent: DStream[T],
transformFunc: (RDD[T], Time) => RDD[U]
) extends DStream[U](parent.ssc) {
diff --git a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
index de30297c7d..b794159b09 100644
--- a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
@@ -25,7 +25,11 @@ class QueueInputDStream[T: ClassManifest](
buffer ++= queue
}
if (buffer.size > 0) {
- Some(new UnionRDD(ssc.sc, buffer.toSeq))
+ if (oneAtATime) {
+ Some(buffer.first)
+ } else {
+ Some(new UnionRDD(ssc.sc, buffer.toSeq))
+ }
} else if (defaultRDD != null) {
Some(defaultRDD)
} else {
@@ -33,4 +37,4 @@ class QueueInputDStream[T: ClassManifest](
}
}
-}
\ No newline at end of file
+}
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 00136685d5..d2e907378d 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -26,7 +26,6 @@ extends Logging {
val timer = new RecurringTimer(clock, ssc.batchDuration, generateRDDs(_))
def start() {
-
val zeroTime = Time(timer.start())
outputStreams.foreach(_.initialize(zeroTime))
inputStreams.par.foreach(_.start())
@@ -40,6 +39,7 @@ extends Logging {
}
def generateRDDs (time: Time) {
+ SparkEnv.set(ssc.env)
logInfo("\n-----------------------------------------------------\n")
logInfo("Generating RDDs for time " + time)
outputStreams.foreach(outputStream => {
diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala
index 4cb780c006..c40f70c91d 100644
--- a/streaming/src/main/scala/spark/streaming/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala
@@ -8,7 +8,7 @@ import spark.SparkContext._
import spark.storage.StorageLevel
class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
- parent: DStream[(K, V)],
+ @transient parent: DStream[(K, V)],
updateFunc: (Iterator[(K, Seq[V], S)]) => Iterator[(K, S)],
partitioner: Partitioner,
rememberPartitioner: Boolean
@@ -26,14 +26,14 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
override def getOrCompute(time: Time): Option[RDD[(K, S)]] = {
generatedRDDs.get(time) match {
case Some(oldRDD) => {
- if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval) && oldRDD.dependencies.size > 0) {
+ if (checkpointInterval != null && time > zeroTime && (time - zeroTime).isMultipleOf(checkpointInterval) && oldRDD.dependencies.size > 0) {
val r = oldRDD
val oldRDDBlockIds = oldRDD.splits.map(s => "rdd:" + r.id + ":" + s.index)
val checkpointedRDD = new BlockRDD[(K, S)](ssc.sc, oldRDDBlockIds) {
override val partitioner = oldRDD.partitioner
}
generatedRDDs.update(time, checkpointedRDD)
- logInfo("Updated RDD of time " + time + " with its checkpointed version")
+ logInfo("Checkpointed RDD " + oldRDD.id + " of time " + time + " with its new RDD " + checkpointedRDD.id)
Some(checkpointedRDD)
} else {
Some(oldRDD)
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
index be3188c5ed..3ba07d0448 100644
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -1,11 +1,24 @@
package spark.streaming.examples
import spark.util.IntParam
+import spark.SparkContext
+import spark.SparkContext._
import spark.storage.StorageLevel
import spark.streaming._
import spark.streaming.StreamingContext._
+import WordCount2_ExtraFunctions._
+
object TopKWordCountRaw {
+ def moreWarmup(sc: SparkContext) {
+ (0 until 40).foreach {i =>
+ sc.parallelize(1 to 20000000, 1000)
+ .map(_ % 1331).map(_.toString)
+ .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
+ .collect()
+ }
+ }
+
def main(args: Array[String]) {
if (args.length != 7) {
System.err.println("Usage: TopKWordCountRaw <master> <streams> <host> <port> <batchMs> <chkptMs> <reduces>")
@@ -20,16 +33,12 @@ object TopKWordCountRaw {
ssc.setBatchDuration(Milliseconds(batchMs))
// Make sure some tasks have started on each node
- ssc.sc.parallelize(1 to 1000, 1000).count()
- ssc.sc.parallelize(1 to 1000, 1000).count()
- ssc.sc.parallelize(1 to 1000, 1000).count()
+ moreWarmup(ssc.sc)
val rawStreams = (1 to streams).map(_ =>
ssc.createRawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
val union = new UnifiedDStream(rawStreams)
- import WordCount2_ExtraFunctions._
-
val windowedCounts = union.mapPartitions(splitAndCountPartitions)
.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces)
windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2,