aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
author: Tathagata Das <tathagata.das1565@gmail.com> 2012-11-13 02:43:03 -0800
committer: Tathagata Das <tathagata.das1565@gmail.com> 2012-11-13 02:43:03 -0800
commit: c3ccd14cf8d7c5a867992758b74922890408541e (patch)
tree: 95124a3197d7b56a73fb2f2bdda1d20f4ea9f8fb /streaming
parent: 8a25d530edfa3abcdbe2effcd6bfbe484ac40acb (diff)
download: spark-c3ccd14cf8d7c5a867992758b74922890408541e.tar.gz
spark-c3ccd14cf8d7c5a867992758b74922890408541e.tar.bz2
spark-c3ccd14cf8d7c5a867992758b74922890408541e.zip
Replaced StateRDD in StateDStream with MapPartitionsRDD.
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/spark/streaming/StateDStream.scala | 15
1 files changed, 3 insertions, 12 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala
index cb261808f5..b7e4c1c30c 100644
--- a/streaming/src/main/scala/spark/streaming/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala
@@ -7,20 +7,11 @@ import spark.rdd.MapPartitionsRDD
import spark.SparkContext._
import spark.storage.StorageLevel
-
-class StateRDD[U: ClassManifest, T: ClassManifest](
- prev: RDD[T],
- f: Iterator[T] => Iterator[U],
- rememberPartitioner: Boolean
- ) extends MapPartitionsRDD[U, T](prev, f) {
- override val partitioner = if (rememberPartitioner) prev.partitioner else None
-}
-
class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
parent: DStream[(K, V)],
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
- rememberPartitioner: Boolean
+ preservePartitioning: Boolean
) extends DStream[(K, S)](parent.ssc) {
super.persist(StorageLevel.MEMORY_ONLY_SER)
@@ -53,7 +44,7 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
updateFuncLocal(i)
}
val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
- val stateRDD = new StateRDD(cogroupedRDD, finalFunc, rememberPartitioner)
+ val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
//logDebug("Generating state RDD for time " + validTime)
return Some(stateRDD)
}
@@ -78,7 +69,7 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
}
val groupedRDD = parentRDD.groupByKey(partitioner)
- val sessionRDD = new StateRDD(groupedRDD, finalFunc, rememberPartitioner)
+ val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
//logDebug("Generating state RDD for time " + validTime + " (first)")
return Some(sessionRDD)
}