From 3cbc72ff1dc660a835c032356ba7b57883c5df5e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 14 Sep 2012 07:00:30 +0000 Subject: Minor tweaks --- .../src/main/scala/spark/streaming/DStream.scala | 42 +++++++++++----------- .../main/scala/spark/streaming/StateDStream.scala | 4 +-- 2 files changed, 24 insertions(+), 22 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 3973ca1520..7e8098c346 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -284,9 +284,8 @@ extends Logging with Serializable { } -abstract class InputDStream[T: ClassManifest] ( - @transient ssc: StreamingContext) -extends DStream[T](ssc) { +abstract class InputDStream[T: ClassManifest] (@transient ssc: StreamingContext) + extends DStream[T](ssc) { override def dependencies = List() @@ -303,9 +302,9 @@ extends DStream[T](ssc) { */ class MappedDStream[T: ClassManifest, U: ClassManifest] ( - parent: DStream[T], - mapFunc: T => U) -extends DStream[U](parent.ssc) { + @transient parent: DStream[T], + mapFunc: T => U + ) extends DStream[U](parent.ssc) { override def dependencies = List(parent) @@ -322,9 +321,9 @@ extends DStream[U](parent.ssc) { */ class FlatMappedDStream[T: ClassManifest, U: ClassManifest]( - parent: DStream[T], - flatMapFunc: T => Traversable[U]) -extends DStream[U](parent.ssc) { + @transient parent: DStream[T], + flatMapFunc: T => Traversable[U] + ) extends DStream[U](parent.ssc) { override def dependencies = List(parent) @@ -340,8 +339,10 @@ extends DStream[U](parent.ssc) { * TODO */ -class FilteredDStream[T: ClassManifest](parent: DStream[T], filterFunc: T => Boolean) -extends DStream[T](parent.ssc) { +class FilteredDStream[T: ClassManifest]( + @transient parent: DStream[T], + filterFunc: T => Boolean + ) extends DStream[T](parent.ssc) { override def dependencies = List(parent) @@ -358,9 +359,9 @@ extends DStream[T](parent.ssc) { */ class MapPartitionedDStream[T: ClassManifest, U: ClassManifest]( - parent: DStream[T], - mapPartFunc: Iterator[T] => Iterator[U]) -extends DStream[U](parent.ssc) { + @transient parent: DStream[T], + mapPartFunc: Iterator[T] => Iterator[U] + ) extends DStream[U](parent.ssc) { override def dependencies = List(parent) @@ -376,7 +377,8 @@ extends DStream[U](parent.ssc) { * TODO */ -class GlommedDStream[T: ClassManifest](parent: DStream[T]) extends DStream[Array[T]](parent.ssc) { +class GlommedDStream[T: ClassManifest](@transient parent: DStream[T]) + extends DStream[Array[T]](parent.ssc) { override def dependencies = List(parent) @@ -393,7 +395,7 @@ class GlommedDStream[T: ClassManifest](parent: DStream[T]) extends DStream[Array */ class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( - parent: DStream[(K,V)], + @transient parent: DStream[(K,V)], createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, @@ -418,7 +420,7 @@ class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( * TODO */ -class UnifiedDStream[T: ClassManifest](parents: Array[DStream[T]]) +class UnifiedDStream[T: ClassManifest](@transient parents: Array[DStream[T]]) extends DStream[T](parents(0).ssc) { if (parents.length == 0) { @@ -457,7 +459,7 @@ class UnifiedDStream[T: ClassManifest](parents: Array[DStream[T]]) */ class PerElementForEachDStream[T: ClassManifest] ( - parent: DStream[T], + @transient parent: DStream[T], foreachFunc: T => Unit ) extends DStream[Unit](parent.ssc) { @@ -488,7 +490,7 @@ class PerElementForEachDStream[T: ClassManifest] ( */ class PerRDDForEachDStream[T: ClassManifest] ( - parent: DStream[T], + @transient parent: DStream[T], foreachFunc: (RDD[T], Time) => Unit ) extends DStream[Unit](parent.ssc) { @@ -516,7 +518,7 @@ class PerRDDForEachDStream[T: ClassManifest] ( */ class TransformedDStream[T: ClassManifest, U: ClassManifest] ( - parent: DStream[T], + @transient parent: DStream[T], transformFunc: (RDD[T], Time) => RDD[U] ) extends DStream[U](parent.ssc) { diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala index 72b71d5fab..c40f70c91d 100644 --- a/streaming/src/main/scala/spark/streaming/StateDStream.scala +++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala @@ -26,14 +26,14 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife override def getOrCompute(time: Time): Option[RDD[(K, S)]] = { generatedRDDs.get(time) match { case Some(oldRDD) => { - if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval) && oldRDD.dependencies.size > 0) { + if (checkpointInterval != null && time > zeroTime && (time - zeroTime).isMultipleOf(checkpointInterval) && oldRDD.dependencies.size > 0) { val r = oldRDD val oldRDDBlockIds = oldRDD.splits.map(s => "rdd:" + r.id + ":" + s.index) val checkpointedRDD = new BlockRDD[(K, S)](ssc.sc, oldRDDBlockIds) { override val partitioner = oldRDD.partitioner } generatedRDDs.update(time, checkpointedRDD) - logInfo("Updated RDD of time " + time + " with its checkpointed version") + logInfo("Checkpointed RDD " + oldRDD.id + " of time " + time + " with its new RDD " + checkpointedRDD.id) Some(checkpointedRDD) } else { Some(oldRDD) -- cgit v1.2.3