author      Tathagata Das <tathagata.das1565@gmail.com>    2012-09-14 07:00:30 +0000
committer   Tathagata Das <tathagata.das1565@gmail.com>    2012-09-14 07:00:30 +0000
commit      3cbc72ff1dc660a835c032356ba7b57883c5df5e (patch)
tree        3b50db2285131524434a8794b4e6d5148ffbe80f /streaming
parent      c63a6064584ea19d62e0abcbd3886d7b1e429ea1 (diff)
download    spark-3cbc72ff1dc660a835c032356ba7b57883c5df5e.tar.gz
            spark-3cbc72ff1dc660a835c032356ba7b57883c5df5e.tar.bz2
            spark-3cbc72ff1dc660a835c032356ba7b57883c5df5e.zip
Minor tweaks
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala       42
-rw-r--r--  streaming/src/main/scala/spark/streaming/StateDStream.scala   4
2 files changed, 24 insertions(+), 22 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 3973ca1520..7e8098c346 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -284,9 +284,8 @@ extends Logging with Serializable {
}
-abstract class InputDStream[T: ClassManifest] (
- @transient ssc: StreamingContext)
-extends DStream[T](ssc) {
+abstract class InputDStream[T: ClassManifest] (@transient ssc: StreamingContext)
+ extends DStream[T](ssc) {
override def dependencies = List()
@@ -303,9 +302,9 @@ extends DStream[T](ssc) {
*/
class MappedDStream[T: ClassManifest, U: ClassManifest] (
- parent: DStream[T],
- mapFunc: T => U)
-extends DStream[U](parent.ssc) {
+ @transient parent: DStream[T],
+ mapFunc: T => U
+ ) extends DStream[U](parent.ssc) {
override def dependencies = List(parent)
@@ -322,9 +321,9 @@ extends DStream[U](parent.ssc) {
*/
class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
- parent: DStream[T],
- flatMapFunc: T => Traversable[U])
-extends DStream[U](parent.ssc) {
+ @transient parent: DStream[T],
+ flatMapFunc: T => Traversable[U]
+ ) extends DStream[U](parent.ssc) {
override def dependencies = List(parent)
@@ -340,8 +339,10 @@ extends DStream[U](parent.ssc) {
* TODO
*/
-class FilteredDStream[T: ClassManifest](parent: DStream[T], filterFunc: T => Boolean)
-extends DStream[T](parent.ssc) {
+class FilteredDStream[T: ClassManifest](
+ @transient parent: DStream[T],
+ filterFunc: T => Boolean
+ ) extends DStream[T](parent.ssc) {
override def dependencies = List(parent)
@@ -358,9 +359,9 @@ extends DStream[T](parent.ssc) {
*/
class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
- parent: DStream[T],
- mapPartFunc: Iterator[T] => Iterator[U])
-extends DStream[U](parent.ssc) {
+ @transient parent: DStream[T],
+ mapPartFunc: Iterator[T] => Iterator[U]
+ ) extends DStream[U](parent.ssc) {
override def dependencies = List(parent)
@@ -376,7 +377,8 @@ extends DStream[U](parent.ssc) {
* TODO
*/
-class GlommedDStream[T: ClassManifest](parent: DStream[T]) extends DStream[Array[T]](parent.ssc) {
+class GlommedDStream[T: ClassManifest](@transient parent: DStream[T])
+ extends DStream[Array[T]](parent.ssc) {
override def dependencies = List(parent)
@@ -393,7 +395,7 @@ class GlommedDStream[T: ClassManifest](parent: DStream[T]) extends DStream[Array
*/
class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
- parent: DStream[(K,V)],
+ @transient parent: DStream[(K,V)],
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
@@ -418,7 +420,7 @@ class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
* TODO
*/
-class UnifiedDStream[T: ClassManifest](parents: Array[DStream[T]])
+class UnifiedDStream[T: ClassManifest](@transient parents: Array[DStream[T]])
extends DStream[T](parents(0).ssc) {
if (parents.length == 0) {
@@ -457,7 +459,7 @@ class UnifiedDStream[T: ClassManifest](parents: Array[DStream[T]])
*/
class PerElementForEachDStream[T: ClassManifest] (
- parent: DStream[T],
+ @transient parent: DStream[T],
foreachFunc: T => Unit
) extends DStream[Unit](parent.ssc) {
@@ -488,7 +490,7 @@ class PerElementForEachDStream[T: ClassManifest] (
*/
class PerRDDForEachDStream[T: ClassManifest] (
- parent: DStream[T],
+ @transient parent: DStream[T],
foreachFunc: (RDD[T], Time) => Unit
) extends DStream[Unit](parent.ssc) {
@@ -516,7 +518,7 @@ class PerRDDForEachDStream[T: ClassManifest] (
*/
class TransformedDStream[T: ClassManifest, U: ClassManifest] (
- parent: DStream[T],
+ @transient parent: DStream[T],
transformFunc: (RDD[T], Time) => RDD[U]
) extends DStream[U](parent.ssc) {
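
The DStream.scala hunks above repeatedly mark the upstream parent reference @transient. As a stand-alone sketch of the JVM serialization behavior that annotation relies on (plain Scala, no Spark classes; Upstream, Node, and TransientDemo are made-up names, not part of this codebase): a @transient field is skipped when the object is written out and comes back null after deserialization, while ordinary fields survive the round trip.

import java.io._

class Upstream(val name: String) extends Serializable

// parent is @transient, so it is dropped during serialization;
// label is a normal field and survives the round trip.
class Node(@transient val parent: Upstream, val label: String) extends Serializable

object TransientDemo {
  // Serialize an object to bytes and read it back, as closure shipping would.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val node = new Node(new Upstream("source"), "mapped")
    val copy = roundTrip(node)
    println(copy.parent) // null: the @transient parent was not serialized
    println(copy.label)  // "mapped": ordinary fields still travel
  }
}
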
diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala
index 72b71d5fab..c40f70c91d 100644
--- a/streaming/src/main/scala/spark/streaming/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala
@@ -26,14 +26,14 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
override def getOrCompute(time: Time): Option[RDD[(K, S)]] = {
generatedRDDs.get(time) match {
case Some(oldRDD) => {
- if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval) && oldRDD.dependencies.size > 0) {
+ if (checkpointInterval != null && time > zeroTime && (time - zeroTime).isMultipleOf(checkpointInterval) && oldRDD.dependencies.size > 0) {
val r = oldRDD
val oldRDDBlockIds = oldRDD.splits.map(s => "rdd:" + r.id + ":" + s.index)
val checkpointedRDD = new BlockRDD[(K, S)](ssc.sc, oldRDDBlockIds) {
override val partitioner = oldRDD.partitioner
}
generatedRDDs.update(time, checkpointedRDD)
- logInfo("Updated RDD of time " + time + " with its checkpointed version")
+ logInfo("Checkpointed RDD " + oldRDD.id + " of time " + time + " with its new RDD " + checkpointedRDD.id)
Some(checkpointedRDD)
} else {
Some(oldRDD)
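
In the StateDStream hunk, the checkpoint condition now also requires time > zeroTime, so an RDD at the stream's zero time is never swapped for its checkpointed BlockRDD. A minimal stand-alone sketch of that guard, using plain millisecond Longs in place of Spark's Time/Interval arithmetic (the helper name and the sample values below are illustrative, not taken from the codebase):

object CheckpointGuard {
  // Checkpoint only strictly after the stream start, and only on exact
  // multiples of the checkpoint interval measured from zeroTime.
  def shouldCheckpoint(timeMs: Long, zeroTimeMs: Long, checkpointIntervalMs: Long): Boolean =
    checkpointIntervalMs > 0 &&
      timeMs > zeroTimeMs &&
      (timeMs - zeroTimeMs) % checkpointIntervalMs == 0

  def main(args: Array[String]): Unit = {
    val zeroTime = 5000L   // stream start
    val interval = 10000L  // checkpoint every 10 seconds
    println(shouldCheckpoint(5000L,  zeroTime, interval)) // false: time == zeroTime is now excluded
    println(shouldCheckpoint(15000L, zeroTime, interval)) // true: exactly one interval after start
    println(shouldCheckpoint(18000L, zeroTime, interval)) // false: not on an interval boundary
  }
}
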