aboutsummaryrefslogtreecommitdiff
path: root/examples/src
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-01-11 10:50:14 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-01-12 17:21:00 -0800
commitf4d77f8cb8a9eab43bea35e8e6c9bc0d2c2b53a8 (patch)
tree1736b3b4545e8ff5ba5f9ffe66cddabdaefaf449 /examples/src
parent288a878999848adb130041d1e40c14bfc879cec6 (diff)
downloadspark-f4d77f8cb8a9eab43bea35e8e6c9bc0d2c2b53a8.tar.gz
spark-f4d77f8cb8a9eab43bea35e8e6c9bc0d2c2b53a8.tar.bz2
spark-f4d77f8cb8a9eab43bea35e8e6c9bc0d2c2b53a8.zip
Rename DStream.foreach to DStream.foreachRDD
`foreachRDD` makes it clear that the granularity of this operator is per-RDD. As it stands, `foreach` is inconsistent with with `map`, `filter`, and the other DStream operators which get pushed down to individual records within each RDD.
Diffstat (limited to 'examples/src')
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala4
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala4
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala4
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala2
5 files changed, 8 insertions, 8 deletions
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
index 3d08d86567..99b79c3949 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
@@ -58,7 +58,7 @@ object RawNetworkGrep {
val rawStreams = (1 to numStreams).map(_ =>
ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
val union = ssc.union(rawStreams)
- union.filter(_.contains("the")).count().foreach(r =>
+ union.filter(_.contains("the")).count().foreachRDD(r =>
println("Grep count: " + r.collect().mkString))
ssc.start()
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 80b5a98b14..483c4d3118 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -81,7 +81,7 @@ object TwitterAlgebirdCMS {
val exactTopUsers = users.map(id => (id, 1))
.reduceByKey((a, b) => a + b)
- approxTopUsers.foreach(rdd => {
+ approxTopUsers.foreachRDD(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
val partialTopK = partial.heavyHitters.map(id =>
@@ -96,7 +96,7 @@ object TwitterAlgebirdCMS {
}
})
- exactTopUsers.foreach(rdd => {
+ exactTopUsers.foreachRDD(rdd => {
if (rdd.count() != 0) {
val partialMap = rdd.collect().toMap
val partialTopK = rdd.map(
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index cb2f2c51a0..94c2bf29ac 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -67,7 +67,7 @@ object TwitterAlgebirdHLL {
val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
- approxUsers.foreach(rdd => {
+ approxUsers.foreachRDD(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
globalHll += partial
@@ -76,7 +76,7 @@ object TwitterAlgebirdHLL {
}
})
- exactUsers.foreach(rdd => {
+ exactUsers.foreachRDD(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
userSet ++= partial
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
index 16c10feaba..8a70d4a978 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
@@ -56,13 +56,13 @@ object TwitterPopularTags {
// Print popular hashtags
- topCounts60.foreach(rdd => {
+ topCounts60.foreachRDD(rdd => {
val topList = rdd.take(5)
println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
- topCounts10.foreach(rdd => {
+ topCounts10.foreachRDD(rdd => {
val topList = rdd.take(5)
println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
index da6b67bcce..bb44bc3d06 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -91,7 +91,7 @@ object PageViewStream {
case "popularUsersSeen" =>
// Look for users in our existing dataset and print it out if we have a match
pageViews.map(view => (view.userID, 1))
- .foreach((rdd, time) => rdd.join(userList)
+ .foreachRDD((rdd, time) => rdd.join(userList)
.map(_._2._2)
.take(10)
.foreach(u => println("Saw user %s at time %s".format(u, time))))