aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
author Patrick Wendell <pwendell@gmail.com> 2014-01-11 10:50:14 -0800
committer Patrick Wendell <pwendell@gmail.com> 2014-01-12 17:21:00 -0800
commit f4d77f8cb8a9eab43bea35e8e6c9bc0d2c2b53a8 (patch)
tree 1736b3b4545e8ff5ba5f9ffe66cddabdaefaf449 /streaming/src
parent 288a878999848adb130041d1e40c14bfc879cec6 (diff)
download spark-f4d77f8cb8a9eab43bea35e8e6c9bc0d2c2b53a8.tar.gz
spark-f4d77f8cb8a9eab43bea35e8e6c9bc0d2c2b53a8.tar.bz2
spark-f4d77f8cb8a9eab43bea35e8e6c9bc0d2c2b53a8.zip
Rename DStream.foreach to DStream.foreachRDD
`foreachRDD` makes it clear that the granularity of this operator is per-RDD. As it stands, `foreach` is inconsistent with `map`, `filter`, and the other DStream operators which get pushed down to individual records within each RDD.
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/DStream.scala10
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala8
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala2
4 files changed, 12 insertions, 12 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index b98f4a5101..93d57db494 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -487,15 +487,15 @@ abstract class DStream[T: ClassTag] (
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
- def foreach(foreachFunc: RDD[T] => Unit) {
- this.foreach((r: RDD[T], t: Time) => foreachFunc(r))
+ def foreachRDD(foreachFunc: RDD[T] => Unit) {
+ this.foreachRDD((r: RDD[T], t: Time) => foreachFunc(r))
}
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
- def foreach(foreachFunc: (RDD[T], Time) => Unit) {
+ def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc)))
}
@@ -719,7 +719,7 @@ abstract class DStream[T: ClassTag] (
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsObjectFile(file)
}
- this.foreach(saveFunc)
+ this.foreachRDD(saveFunc)
}
/**
@@ -732,7 +732,7 @@ abstract class DStream[T: ClassTag] (
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsTextFile(file)
}
- this.foreach(saveFunc)
+ this.foreachRDD(saveFunc)
}
def register() {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index 56dbcbda23..69d80c3711 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -582,7 +582,7 @@ extends Serializable {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
}
- self.foreach(saveFunc)
+ self.foreachRDD(saveFunc)
}
/**
@@ -612,7 +612,7 @@ extends Serializable {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
}
- self.foreach(saveFunc)
+ self.foreachRDD(saveFunc)
}
private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 64f38ce1c0..4b5d5ece52 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -244,16 +244,16 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
- def foreach(foreachFunc: JFunction[R, Void]) {
- dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd)))
+ def foreachRDD(foreachFunc: JFunction[R, Void]) {
+ dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd)))
}
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
- def foreach(foreachFunc: JFunction2[R, Time, Void]) {
- dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
+ def foreachRDD(foreachFunc: JFunction2[R, Time, Void]) {
+ dstream.foreachRDD((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
}
/**
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index ee6b433d1f..9a187ce031 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -383,7 +383,7 @@ class BasicOperationsSuite extends TestSuiteBase {
val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
val stream = new TestInputStream[Int](ssc, input, 2)
ssc.registerInputStream(stream)
- stream.foreach(_ => {}) // Dummy output stream
+ stream.foreachRDD(_ => {}) // Dummy output stream
ssc.start()
Thread.sleep(2000)
def getInputFromSlice(fromMillis: Long, toMillis: Long) = {