aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala2
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala4
2 files changed, 3 insertions, 3 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index ff6d86c8f8..059ac6c2db 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -452,7 +452,7 @@ class BasicOperationsSuite extends TestSuiteBase {
test("rdd cleanup - updateStateByKey") {
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
- Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))
+ Some(values.sum + state.getOrElse(0))
}
val stateStream = runCleanupTest(
conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3)))
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 10ad3c9e1a..8511390cb1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -70,7 +70,7 @@ class CheckpointSuite extends TestSuiteBase {
val input = (1 to 10).map(_ => Seq("a")).toSeq
val operation = (st: DStream[String]) => {
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
- Some((values.foldLeft(0)(_ + _) + state.getOrElse(0)))
+ Some((values.sum + state.getOrElse(0)))
}
st.map(x => (x, 1))
.updateStateByKey(updateFunc)
@@ -214,7 +214,7 @@ class CheckpointSuite extends TestSuiteBase {
val output = (1 to 10).map(x => Seq(("a", x))).toSeq
val operation = (st: DStream[String]) => {
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
- Some((values.foldLeft(0)(_ + _) + state.getOrElse(0)))
+ Some((values.sum + state.getOrElse(0)))
}
st.map(x => (x, 1))
.updateStateByKey(updateFunc)