author    Patrick Wendell <pwendell@gmail.com>  2014-01-12 19:49:36 -0800
committer Patrick Wendell <pwendell@gmail.com>  2014-01-12 19:49:36 -0800
commit    28a6b0cdbc75d58e36b1da3dcf257c61e44b0f7a (patch)
tree      7b06e10162f1704f5de69ad6ba676eaef8a73078 /streaming/src
parent    074f50232fd8d8cf05eb88db0ac6f03f61452810 (diff)
parent    e6e20ceee0f7edc161be611ea903c0e1609f9069 (diff)
Merge pull request #398 from pwendell/streaming-api
Rename DStream.foreach to DStream.foreachRDD. `foreachRDD` makes it clear that the granularity of this operator is per-RDD. As it stands, `foreach` is inconsistent with `map`, `filter`, and the other DStream operators, which get pushed down to individual records within each RDD.
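To illustrate the per-RDD vs. per-record distinction, a minimal sketch against the renamed Scala API (the socket source, host, and port are assumptions for illustration, not part of this patch):

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Hypothetical setup for illustration; any DStream[String] would do.
    val ssc = new StreamingContext("local[2]", "ForeachRDDExample", Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999)

    // map is per-record: the function is applied to every element of every RDD.
    val lengths = lines.map(_.length)

    // foreachRDD is per-RDD: the function runs once per batch interval and
    // receives the whole RDD, which is what the new name makes explicit.
    lengths.foreachRDD { rdd =>
      println("Batch contained " + rdd.count() + " records")
    }

    ssc.start()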
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStream.scala                  41
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala      4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala  26
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala       2
4 files changed, 55 insertions(+), 18 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index b98f4a5101..9432a709d0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -17,19 +17,20 @@
package org.apache.spark.streaming
-import StreamingContext._
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.scheduler.Job
-import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.MetadataCleaner
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+import scala.deprecated
import scala.collection.mutable.HashMap
import scala.reflect.ClassTag
-import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
+import StreamingContext._
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.Job
+import org.apache.spark.util.MetadataCleaner
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -487,15 +488,29 @@ abstract class DStream[T: ClassTag] (
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
- def foreach(foreachFunc: RDD[T] => Unit) {
- this.foreach((r: RDD[T], t: Time) => foreachFunc(r))
+ @deprecated("use foreachRDD", "0.9.0")
+ def foreach(foreachFunc: RDD[T] => Unit) = this.foreachRDD(foreachFunc)
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * 'this' DStream will be registered as an output stream and therefore materialized.
+ */
+ @deprecated("use foreachRDD", "0.9.0")
+ def foreach(foreachFunc: (RDD[T], Time) => Unit) = this.foreachRDD(foreachFunc)
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * 'this' DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreachRDD(foreachFunc: RDD[T] => Unit) {
+ this.foreachRDD((r: RDD[T], t: Time) => foreachFunc(r))
}
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
- def foreach(foreachFunc: (RDD[T], Time) => Unit) {
+ def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc)))
}
@@ -719,7 +734,7 @@ abstract class DStream[T: ClassTag] (
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsObjectFile(file)
}
- this.foreach(saveFunc)
+ this.foreachRDD(saveFunc)
}
/**
@@ -732,7 +747,7 @@ abstract class DStream[T: ClassTag] (
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsTextFile(file)
}
- this.foreach(saveFunc)
+ this.foreachRDD(saveFunc)
}
def register() {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index 56dbcbda23..69d80c3711 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -582,7 +582,7 @@ extends Serializable {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
}
- self.foreach(saveFunc)
+ self.foreachRDD(saveFunc)
}
/**
@@ -612,7 +612,7 @@ extends Serializable {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
}
- self.foreach(saveFunc)
+ self.foreachRDD(saveFunc)
}
private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 64f38ce1c0..cea4795eb5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -243,17 +243,39 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
+ *
+ * @deprecated As of release 0.9.0, replaced by foreachRDD
*/
+ @Deprecated
def foreach(foreachFunc: JFunction[R, Void]) {
- dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd)))
+ foreachRDD(foreachFunc)
}
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
+ *
+ * @deprecated As of release 0.9.0, replaced by foreachRDD
*/
+ @Deprecated
def foreach(foreachFunc: JFunction2[R, Time, Void]) {
- dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
+ foreachRDD(foreachFunc)
+ }
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * 'this' DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreachRDD(foreachFunc: JFunction[R, Void]) {
+ dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd)))
+ }
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * 'this' DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreachRDD(foreachFunc: JFunction2[R, Time, Void]) {
+ dstream.foreachRDD((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
}
/**
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index ee6b433d1f..9a187ce031 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -383,7 +383,7 @@ class BasicOperationsSuite extends TestSuiteBase {
val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
val stream = new TestInputStream[Int](ssc, input, 2)
ssc.registerInputStream(stream)
- stream.foreach(_ => {}) // Dummy output stream
+ stream.foreachRDD(_ => {}) // Dummy output stream
ssc.start()
Thread.sleep(2000)
def getInputFromSlice(fromMillis: Long, toMillis: Long) = {