diff options
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala | 6 |
1 files changed, 3 insertions, 3 deletions
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index f9e03c607d..f73f9b1823 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -151,10 +151,10 @@ class BasicOperationsSuite extends TestSuiteBase { ) val updateStateOperation = (s: DStream[String]) => { - val updateFunc = (values: Seq[Int], state: Option[RichInt]) => { - Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0))) + val updateFunc = (values: Seq[Int], state: Option[Int]) => { + Some(values.foldLeft(0)(_ + _) + state.getOrElse(0)) } - s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self)) + s.map(x => (x, 1)).updateStateByKey[Int](updateFunc) } testOperation(inputData, updateStateOperation, outputData, true) |