aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-17 18:43:00 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-17 18:43:00 -0800
commitee0314c3b3c60ab398e4ff1c802af7297fe27587 (patch)
tree6834b7c56431151a4c775a21dfbd9b36e5ef1dfd /streaming
parent70ba994d6d6f9e62269168e6a8a61ffce736a4d2 (diff)
parenteded21925ab549330d0337138fa1f81ae192e3e9 (diff)
downloadspark-ee0314c3b3c60ab398e4ff1c802af7297fe27587.tar.gz
spark-ee0314c3b3c60ab398e4ff1c802af7297fe27587.tar.bz2
spark-ee0314c3b3c60ab398e4ff1c802af7297fe27587.zip
Merge branch 'streaming' into streaming-java-api
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala6
1 files changed, 3 insertions, 3 deletions
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index f9e03c607d..f73f9b1823 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -151,10 +151,10 @@ class BasicOperationsSuite extends TestSuiteBase {
)
val updateStateOperation = (s: DStream[String]) => {
- val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
- Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
+ val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+ Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))
}
- s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self))
+ s.map(x => (x, 1)).updateStateByKey[Int](updateFunc)
}
testOperation(inputData, updateStateOperation, outputData, true)