From e6e20ceee0f7edc161be611ea903c0e1609f9069 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sun, 12 Jan 2014 18:54:03 -0800 Subject: Adding deprecated versions of old code --- .../scala/org/apache/spark/streaming/DStream.scala | 31 ++++++++++++++++------ .../spark/streaming/api/java/JavaDStreamLike.scala | 22 +++++++++++++++ 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala index 93d57db494..9432a709d0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -17,19 +17,20 @@ package org.apache.spark.streaming -import StreamingContext._ -import org.apache.spark.streaming.dstream._ -import org.apache.spark.streaming.scheduler.Job -import org.apache.spark.Logging -import org.apache.spark.rdd.RDD -import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.MetadataCleaner +import java.io.{IOException, ObjectInputStream, ObjectOutputStream} +import scala.deprecated import scala.collection.mutable.HashMap import scala.reflect.ClassTag -import java.io.{ObjectInputStream, IOException, ObjectOutputStream} +import StreamingContext._ +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.scheduler.Job +import org.apache.spark.util.MetadataCleaner /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous @@ -483,6 +484,20 @@ abstract class DStream[T: ClassTag] ( def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] = this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions) + /** + * Apply a function to each RDD in this DStream. This is an output operator, so + * 'this' DStream will be registered as an output stream and therefore materialized. + */ + @deprecated("use foreachRDD", "0.9.0") + def foreach(foreachFunc: RDD[T] => Unit) = this.foreachRDD(foreachFunc) + + /** + * Apply a function to each RDD in this DStream. This is an output operator, so + * 'this' DStream will be registered as an output stream and therefore materialized. + */ + @deprecated("use foreachRDD", "0.9.0") + def foreach(foreachFunc: (RDD[T], Time) => Unit) = this.foreachRDD(foreachFunc) + /** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index 4b5d5ece52..cea4795eb5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -240,6 +240,28 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T new util.ArrayList(dstream.slice(fromTime, toTime).map(wrapRDD(_)).toSeq) } + /** + * Apply a function to each RDD in this DStream. This is an output operator, so + * 'this' DStream will be registered as an output stream and therefore materialized. + * + * @deprecated As of release 0.9.0, replaced by foreachRDD + */ + @Deprecated + def foreach(foreachFunc: JFunction[R, Void]) { + foreachRDD(foreachFunc) + } + + /** + * Apply a function to each RDD in this DStream. This is an output operator, so + * 'this' DStream will be registered as an output stream and therefore materialized. + * + * @deprecated As of release 0.9.0, replaced by foreachRDD + */ + @Deprecated + def foreach(foreachFunc: JFunction2[R, Time, Void]) { + foreachRDD(foreachFunc) + } + /** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. -- cgit v1.2.3