From 1638fcb0dce296da22ffc90127d5148a8fab745e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 14 Jan 2013 17:18:39 -0800 Subject: Fixed updateStateByKey to work with primitive types. --- .../src/main/scala/spark/streaming/PairDStreamFunctions.scala | 8 ++++---- .../src/main/scala/spark/streaming/dstream/StateDStream.scala | 2 +- .../src/test/scala/spark/streaming/BasicOperationsSuite.scala | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index 3952457339..3dbef69868 100644 --- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -377,7 +377,7 @@ extends Serializable { * corresponding state key-value pair will be eliminated. * @tparam S State type */ - def updateStateByKey[S <: AnyRef : ClassManifest]( + def updateStateByKey[S: ClassManifest]( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)] = { updateStateByKey(updateFunc, defaultPartitioner()) @@ -392,7 +392,7 @@ extends Serializable { * @param numPartitions Number of partitions of each RDD in the new DStream. * @tparam S State type */ - def updateStateByKey[S <: AnyRef : ClassManifest]( + def updateStateByKey[S: ClassManifest]( updateFunc: (Seq[V], Option[S]) => Option[S], numPartitions: Int ): DStream[(K, S)] = { @@ -408,7 +408,7 @@ extends Serializable { * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. * @tparam S State type */ - def updateStateByKey[S <: AnyRef : ClassManifest]( + def updateStateByKey[S: ClassManifest]( updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner ): DStream[(K, S)] = { @@ -431,7 +431,7 @@ extends Serializable { * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs. * @tparam S State type */ - def updateStateByKey[S <: AnyRef : ClassManifest]( + def updateStateByKey[S: ClassManifest]( updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala index a1ec2f5454..b4506c74aa 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala @@ -7,7 +7,7 @@ import spark.storage.StorageLevel import spark.streaming.{Duration, Time, DStream} private[streaming] -class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest]( +class StateDStream[K: ClassManifest, V: ClassManifest, S: ClassManifest]( parent: DStream[(K, V)], updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index f9e03c607d..f73f9b1823 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -151,10 +151,10 @@ class BasicOperationsSuite extends TestSuiteBase { ) val updateStateOperation = (s: DStream[String]) => { - val updateFunc = (values: Seq[Int], state: Option[RichInt]) => { - Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0))) + val updateFunc = (values: Seq[Int], state: Option[Int]) => { + Some(values.foldLeft(0)(_ + _) + state.getOrElse(0)) } - s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self)) + s.map(x => (x, 1)).updateStateByKey[Int](updateFunc) } testOperation(inputData, updateStateOperation, outputData, true) -- cgit v1.2.3