diff options
author | seanm <sean.mcnamara@webtrends.com> | 2013-01-15 20:21:21 -0700 |
---|---|---|
committer | seanm <sean.mcnamara@webtrends.com> | 2013-01-15 20:29:01 -0700 |
commit | a4639400eae9bea68343f54448cf485271863413 (patch) | |
tree | cf8e92ff2c3c02d9b02ef0a4567f439ea7c7aeb8 /streaming | |
parent | c203a292963a018bd9b84f02bb522fd191a110af (diff) | |
parent | eded21925ab549330d0337138fa1f81ae192e3e9 (diff) | |
download | spark-a4639400eae9bea68343f54448cf485271863413.tar.gz spark-a4639400eae9bea68343f54448cf485271863413.tar.bz2 spark-a4639400eae9bea68343f54448cf485271863413.zip |
Merge branch 'streaming' into sm-updateStateByKey
Diffstat (limited to 'streaming')
3 files changed, 8 insertions, 8 deletions
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index 3952457339..3dbef69868 100644 --- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -377,7 +377,7 @@ extends Serializable { * corresponding state key-value pair will be eliminated. * @tparam S State type */ - def updateStateByKey[S <: AnyRef : ClassManifest]( + def updateStateByKey[S: ClassManifest]( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)] = { updateStateByKey(updateFunc, defaultPartitioner()) @@ -392,7 +392,7 @@ extends Serializable { * @param numPartitions Number of partitions of each RDD in the new DStream. * @tparam S State type */ - def updateStateByKey[S <: AnyRef : ClassManifest]( + def updateStateByKey[S: ClassManifest]( updateFunc: (Seq[V], Option[S]) => Option[S], numPartitions: Int ): DStream[(K, S)] = { @@ -408,7 +408,7 @@ extends Serializable { * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. * @tparam S State type */ - def updateStateByKey[S <: AnyRef : ClassManifest]( + def updateStateByKey[S: ClassManifest]( updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner ): DStream[(K, S)] = { @@ -431,7 +431,7 @@ extends Serializable { * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs. * @tparam S State type */ - def updateStateByKey[S <: AnyRef : ClassManifest]( + def updateStateByKey[S: ClassManifest]( updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala index 4e57968eed..db62955036 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala @@ -7,7 +7,7 @@ import spark.storage.StorageLevel import spark.streaming.{Duration, Time, DStream} private[streaming] -class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest]( +class StateDStream[K: ClassManifest, V: ClassManifest, S: ClassManifest]( parent: DStream[(K, V)], updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index f9e03c607d..f73f9b1823 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -151,10 +151,10 @@ class BasicOperationsSuite extends TestSuiteBase { ) val updateStateOperation = (s: DStream[String]) => { - val updateFunc = (values: Seq[Int], state: Option[RichInt]) => { - Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0))) + val updateFunc = (values: Seq[Int], state: Option[Int]) => { + Some(values.foldLeft(0)(_ + _) + state.getOrElse(0)) } - s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self)) + s.map(x => (x, 1)).updateStateByKey[Int](updateFunc) } testOperation(inputData, updateStateOperation, outputData, true) |