author     Aaditya Ramesh <aramesh@conviva.com>       2016-11-15 13:01:01 -0800
committer  Shixiong Zhu <shixiong@databricks.com>     2016-11-15 13:01:01 -0800
commit     6f9e598ccf92f6272bbfb56ac56d3101387131b9 (patch)
tree       c8af7879458ea0cb0c022e6b11b437ff874dbfcf /streaming/src/main
parent     745ab8bc50da89c42b297de9dcb833e5f2074481 (diff)
[SPARK-13027][STREAMING] Added batch time as a parameter to updateStateByKey
Added RDD batch time as an input parameter to the update function in updateStateByKey.

Author: Aaditya Ramesh <aramesh@conviva.com>

Closes #11122 from aramesh117/SPARK-13027.
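For context, here is a minimal sketch of how the new overload could be invoked from user code. The socket source, word-count logic, checkpoint path, and batch interval are illustrative assumptions, not part of this commit; only the (Time, K, Seq[V], Option[S]) => Option[S] overload and its partitioner/rememberPartitioner parameters come from the diff below.

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

// Hypothetical driver program; all names and settings are assumptions for illustration.
object BatchTimeWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("BatchTimeWordCount")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("/tmp/checkpoint") // updateStateByKey requires checkpointing

    val pairs = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))

    // The overload added by this commit: the update function also receives the
    // batch Time of the RDD being processed, e.g. for time-based state expiry.
    val updateFunc = (batchTime: Time, word: String, newValues: Seq[Int], state: Option[Int]) =>
      Some(state.getOrElse(0) + newValues.sum)

    val stateCounts = pairs.updateStateByKey(
      updateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism),
      rememberPartitioner = true)

    stateCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}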
Diffstat (limited to 'streaming/src/main')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala  40
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala          28
2 files changed, 48 insertions(+), 20 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 2f2a6d13dd..ac739411fd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -453,9 +453,12 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
def updateStateByKey[S: ClassTag](
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
- rememberPartitioner: Boolean
- ): DStream[(K, S)] = ssc.withScope {
- new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)
+ rememberPartitioner: Boolean): DStream[(K, S)] = ssc.withScope {
+ val cleanedFunc = ssc.sc.clean(updateFunc)
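+ // Adapt the time-agnostic update function to the (Time, Iterator) signature that
+ // StateDStream now requires; this overload simply ignores the batch time.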
+ val newUpdateFunc = (_: Time, it: Iterator[(K, Seq[V], Option[S])]) => {
+ cleanedFunc(it)
+ }
+ new StateDStream(self, newUpdateFunc, partitioner, rememberPartitioner, None)
}
/**
@@ -499,10 +502,33 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
rememberPartitioner: Boolean,
- initialRDD: RDD[(K, S)]
- ): DStream[(K, S)] = ssc.withScope {
- new StateDStream(self, ssc.sc.clean(updateFunc), partitioner,
- rememberPartitioner, Some(initialRDD))
+ initialRDD: RDD[(K, S)]): DStream[(K, S)] = ssc.withScope {
+ val cleanedFunc = ssc.sc.clean(updateFunc)
+ val newUpdateFunc = (_: Time, it: Iterator[(K, Seq[V], Option[S])]) => {
+ cleanedFunc(it)
+ }
+ new StateDStream(self, newUpdateFunc, partitioner, rememberPartitioner, Some(initialRDD))
+ }
+
+ /**
+ * Return a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of the key.
+ * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
+ * @param updateFunc State update function. It is passed the current batch time, the key,
+ * the new values for the key, and the previous state. If this function returns
+ * None, then the corresponding state key-value pair will be eliminated.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+ * DStream.
+ * @param rememberPartitioner Whether to remember the partitioner in the RDDs generated
+ * by this DStream.
+ * @param initialRDD Optional initial state RDD; defaults to None.
+ * @tparam S State type
+ */
+ def updateStateByKey[S: ClassTag](
+ updateFunc: (Time, K, Seq[V], Option[S]) => Option[S],
+ partitioner: Partitioner,
+ rememberPartitioner: Boolean,
+ initialRDD: Option[RDD[(K, S)]] = None): DStream[(K, S)] = ssc.withScope {
+ val cleanedFunc = ssc.sc.clean(updateFunc)
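+ // Lift the per-key update function to the per-partition form that StateDStream
+ // expects; keys for which the function returns None are dropped from the state.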
+ val newUpdateFunc = (time: Time, iterator: Iterator[(K, Seq[V], Option[S])]) => {
+ iterator.flatMap(t => cleanedFunc(time, t._1, t._2, t._3).map(s => (t._1, s)))
+ }
+ new StateDStream(self, newUpdateFunc, partitioner, rememberPartitioner, initialRDD)
}
/**
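The pattern above is worth calling out: rather than duplicating StateDStream, the existing iterator-based overloads are adapted to the new (Time, Iterator) shape by discarding the time argument. A minimal, Spark-free sketch of that adapter, using a stand-in Time class (all names here are illustrative, not from the commit):

// Stand-in for org.apache.spark.streaming.Time; for illustration only.
case class Time(milliseconds: Long)

object AdapterSketch {
  type UpdateFn[K, V, S] = Iterator[(K, Seq[V], Option[S])] => Iterator[(K, S)]

  // Wrap a time-agnostic update function into the time-aware signature,
  // mirroring what the overloads above do with newUpdateFunc.
  def ignoringTime[K, V, S](f: UpdateFn[K, V, S]): (Time, Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)] =
    (_: Time, it: Iterator[(K, Seq[V], Option[S])]) => f(it)

  def main(args: Array[String]): Unit = {
    val sumCounts: UpdateFn[String, Int, Int] =
      _.map { case (k, values, state) => (k, state.getOrElse(0) + values.sum) }

    val adapted = ignoringTime(sumCounts)
    // The Time argument is accepted but ignored by the wrapped function.
    println(adapted(Time(1000L), Iterator(("a", Seq(1, 2), Some(3)))).toList)
    // prints: List((a,6))
  }
}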
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index 8efb09a8ce..5bf1dabf08 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -27,7 +27,7 @@ import org.apache.spark.streaming.{Duration, Time}
private[streaming]
class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
parent: DStream[(K, V)],
- updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
+ updateFunc: (Time, Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
preservePartitioning: Boolean,
initialRDD: Option[RDD[(K, S)]]
@@ -41,8 +41,10 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
override val mustCheckpoint = true
- private [this] def computeUsingPreviousRDD (
- parentRDD: RDD[(K, V)], prevStateRDD: RDD[(K, S)]) = {
+ private[this] def computeUsingPreviousRDD(
+ batchTime: Time,
+ parentRDD: RDD[(K, V)],
+ prevStateRDD: RDD[(K, S)]) = {
// Define the function for the mapPartition operation on cogrouped RDD;
// first map the cogrouped tuple to tuples of required type,
// and then apply the update function
@@ -53,7 +55,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
val headOption = if (itr.hasNext) Some(itr.next()) else None
(t._1, t._2._1.toSeq, headOption)
}
- updateFuncLocal(i)
+ updateFuncLocal(batchTime, i)
}
val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
@@ -68,15 +70,14 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
case Some(prevStateRDD) => // If previous state RDD exists
// Try to get the parent RDD
parent.getOrCompute(validTime) match {
- case Some(parentRDD) => // If parent RDD exists, then compute as usual
- computeUsingPreviousRDD(parentRDD, prevStateRDD)
- case None => // If parent RDD does not exist
-
+ case Some(parentRDD) => // If parent RDD exists, then compute as usual
+ computeUsingPreviousRDD(validTime, parentRDD, prevStateRDD)
+ case None => // If parent RDD does not exist
// Re-apply the update function to the old state RDD
val updateFuncLocal = updateFunc
val finalFunc = (iterator: Iterator[(K, S)]) => {
val i = iterator.map(t => (t._1, Seq[V](), Option(t._2)))
- updateFuncLocal(i)
+ updateFuncLocal(validTime, i)
}
val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
Some(stateRDD)
@@ -93,15 +94,16 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// and then apply the update function
val updateFuncLocal = updateFunc
val finalFunc = (iterator: Iterator[(K, Iterable[V])]) => {
- updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2.toSeq, None)))
+ updateFuncLocal(validTime,
+ iterator.map(tuple => (tuple._1, tuple._2.toSeq, None)))
}
val groupedRDD = parentRDD.groupByKey(partitioner)
val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
// logDebug("Generating state RDD for time " + validTime + " (first)")
- Some(sessionRDD)
- case Some(initialStateRDD) =>
- computeUsingPreviousRDD(parentRDD, initialStateRDD)
+ Some(sessionRDD)
+ case Some(initialStateRDD) =>
+ computeUsingPreviousRDD(validTime, parentRDD, initialStateRDD)
}
case None => // If parent RDD does not exist, then nothing to do!
// logDebug("Not generating state RDD (no previous state, no parent)")