aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorYadong Qi <qiyadong2010@gmail.com>2014-08-28 14:08:48 -0700
committerReynold Xin <rxin@apache.org>2014-08-28 14:08:48 -0700
commit39012452daa0746fe5d218493b85f9b5f96190c1 (patch)
tree66794dbf4b13bbea3427abae79c09afa7a137a3e /streaming
parentbe53c54b5c685e1d04d49bd554e05029a5a106e1 (diff)
downloadspark-39012452daa0746fe5d218493b85f9b5f96190c1.tar.gz
spark-39012452daa0746fe5d218493b85f9b5f96190c1.tar.bz2
spark-39012452daa0746fe5d218493b85f9b5f96190c1.zip
[SPARK-3285] [examples] Using values.sum is easier to understand than using values.foldLeft(0)(_ + _)
def sum[B >: A](implicit num: Numeric[B]): B = foldLeft(num.zero)(num.plus) Using values.sum is easier to understand than using values.foldLeft(0)(_ + _), so we'd better use values.sum instead of values.foldLeft(0)(_ + _) Author: Yadong Qi <qiyadong2010@gmail.com> Closes #2182 from watermen/bug-fix3 and squashes the following commits: 17be9fb [Yadong Qi] Update CheckpointSuite.scala 714bda5 [Yadong Qi] Update BasicOperationsSuite.scala 57e704c [Yadong Qi] Update StatefulNetworkWordCount.scala
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala2
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala4
2 files changed, 3 insertions, 3 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index ff6d86c8f8..059ac6c2db 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -452,7 +452,7 @@ class BasicOperationsSuite extends TestSuiteBase {
test("rdd cleanup - updateStateByKey") {
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
- Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))
+ Some(values.sum + state.getOrElse(0))
}
val stateStream = runCleanupTest(
conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3)))
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 10ad3c9e1a..8511390cb1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -70,7 +70,7 @@ class CheckpointSuite extends TestSuiteBase {
val input = (1 to 10).map(_ => Seq("a")).toSeq
val operation = (st: DStream[String]) => {
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
- Some((values.foldLeft(0)(_ + _) + state.getOrElse(0)))
+ Some((values.sum + state.getOrElse(0)))
}
st.map(x => (x, 1))
.updateStateByKey(updateFunc)
@@ -214,7 +214,7 @@ class CheckpointSuite extends TestSuiteBase {
val output = (1 to 10).map(x => Seq(("a", x))).toSeq
val operation = (st: DStream[String]) => {
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
- Some((values.foldLeft(0)(_ + _) + state.getOrElse(0)))
+ Some((values.sum + state.getOrElse(0)))
}
st.map(x => (x, 1))
.updateStateByKey(updateFunc)