author    Luciano Resende <lresende@apache.org>  2016-02-21 16:27:56 +0000
committer Sean Owen <sowen@cloudera.com>         2016-02-21 16:27:56 +0000
commit    1a340da8d7590d831b040c74f5a6eb560e14d585 (patch)
tree      a8b02a674783f2ea82fc666ebdab1bd230e169a3
parent    d9efe63ecdc60a9955f1924de0e8a00bcb6a559d (diff)
[SPARK-13248][STREAMING] Remove deprecated Streaming APIs.
Remove deprecated Streaming APIs and adjust sample applications.

Author: Luciano Resende <lresende@apache.org>

Closes #11139 from lresende/streaming-deprecated-apis.
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java |  18
-rw-r--r--  project/MimaExcludes.scala                                                                      |  13
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala                     |  38
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala             |  64
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala        |  96
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala                      |  22
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala                  |   3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala                  |   7
8 files changed, 22 insertions(+), 239 deletions(-)
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index bc8cbcdef7..f9929fc86d 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -26,17 +26,14 @@ import java.util.List;
import java.util.regex.Pattern;
import scala.Tuple2;
+
import com.google.common.io.Files;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.api.java.function.VoidFunction2;
+import org.apache.spark.api.java.function.*;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
@@ -44,7 +41,6 @@ import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
/**
* Use this singleton to get or register a Broadcast variable.
@@ -204,13 +200,17 @@ public final class JavaRecoverableNetworkWordCount {
final int port = Integer.parseInt(args[1]);
final String checkpointDirectory = args[2];
final String outputPath = args[3];
- JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
+
+ // Function to create JavaStreamingContext without any output operations
+ // (used to detect the new context)
+ Function0<JavaStreamingContext> createContextFunc = new Function0<JavaStreamingContext>() {
@Override
- public JavaStreamingContext create() {
+ public JavaStreamingContext call() {
return createContext(ip, port, checkpointDirectory, outputPath);
}
};
- JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
+
+ JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc);
ssc.start();
ssc.awaitTermination();
}
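
The Scala API follows the same pattern after this change: StreamingContext.getOrCreate takes a plain () => StreamingContext function rather than a factory trait. A minimal sketch of checkpoint-based recovery under that signature (the createContext helper and checkpoint path below are hypothetical):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object CheckpointRecoverySketch {
      // Hypothetical helper: build a fresh context and enable checkpointing.
      def createContext(checkpointDir: String): StreamingContext = {
        val ssc = new StreamingContext(
          new SparkConf().setAppName("CheckpointRecoverySketch"), Seconds(1))
        ssc.checkpoint(checkpointDir)
        ssc
      }

      def main(args: Array[String]): Unit = {
        val checkpointDir = "/tmp/checkpoint" // hypothetical path
        // Recreate from checkpoint data if present; otherwise call the function.
        val ssc = StreamingContext.getOrCreate(
          checkpointDir, () => createContext(checkpointDir))
        ssc.start()
        ssc.awaitTermination()
      }
    }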
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 65375a3ea7..8f31a81ed0 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -220,6 +220,19 @@ object MimaExcludes {
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.streaming.zeromq.ZeroMQReceiver"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.streaming.receiver.ActorReceiver$Supervisor")
) ++ Seq(
+ // SPARK-13248 Remove deprecated Streaming APIs.
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions$default$4"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.awaitTermination"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.networkStream"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.streaming.api.java.JavaStreamingContextFactory"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.awaitTermination"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.sc"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaDStreamLike.reduceByWindow"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaDStreamLike.foreachRDD"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaDStreamLike.foreach"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate")
+ ) ++ Seq(
// SPARK-12847 Remove StreamingListenerBus and post all Streaming events to the same thread as Spark events
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.AsynchronousListenerBus$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.AsynchronousListenerBus")
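
Each new entry above tells MiMa (the Migration Manager, which checks binary compatibility against the previous release) that a public API disappeared on purpose; without a matching filter, the sbt check fails the build. The general shape of such a filter, sketched with a hypothetical fully qualified name:

    // project/MimaExcludes.scala: acknowledge a deliberate API removal so the
    // mima check passes. The method name below is hypothetical.
    ProblemFilters.exclude[MissingMethodProblem](
      "org.apache.spark.example.SomeClass.removedMethod")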
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index a1b25c9f7d..25e61578a1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -271,20 +271,6 @@ class StreamingContext private[streaming] (
/**
* Create an input stream with any arbitrary user implemented receiver.
- * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- * @param receiver Custom implementation of Receiver
- *
- * @deprecated As of 1.0.0 replaced by `receiverStream`.
- */
- @deprecated("Use receiverStream", "1.0.0")
- def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
- withNamedScope("network stream") {
- receiverStream(receiver)
- }
- }
-
- /**
- * Create an input stream with any arbitrary user implemented receiver.
* Find more details at http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of Receiver
*/
@@ -624,18 +610,6 @@ class StreamingContext private[streaming] (
/**
* Wait for the execution to stop. Any exceptions that occurs during the execution
* will be thrown in this thread.
- * @param timeout time to wait in milliseconds
- *
- * @deprecated As of 1.3.0, replaced by `awaitTerminationOrTimeout(Long)`.
- */
- @deprecated("Use awaitTerminationOrTimeout(Long) instead", "1.3.0")
- def awaitTermination(timeout: Long) {
- waiter.waitForStopOrError(timeout)
- }
-
- /**
- * Wait for the execution to stop. Any exceptions that occurs during the execution
- * will be thrown in this thread.
*
* @param timeout time to wait in milliseconds
* @return `true` if it's stopped; or throw the reported error during the execution; or `false`
@@ -778,18 +752,6 @@ object StreamingContext extends Logging {
}
/**
- * @deprecated As of 1.3.0, replaced by implicit functions in the DStream companion object.
- * This is kept here only for backward compatibility.
- */
- @deprecated("Replaced by implicit functions in the DStream companion object. This is " +
- "kept here only for backward compatibility.", "1.3.0")
- def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
- (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
- : PairDStreamFunctions[K, V] = {
- DStream.toPairDStreamFunctions(stream)(kt, vt, ord)
- }
-
- /**
* :: Experimental ::
*
* Either return the "active" StreamingContext (that is, started but not stopped), or create a
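
Taken together, the removals above have direct replacements: receiverStream for networkStream, awaitTerminationOrTimeout for the timed awaitTermination, and implicits on the DStream companion object for toPairDStreamFunctions. A hedged migration sketch (the no-op receiver is hypothetical, included only so the code compiles):

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.receiver.Receiver

    // Hypothetical receiver producing nothing; a real one would call store().
    class NoOpReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
      def onStart(): Unit = {}
      def onStop(): Unit = {}
    }

    object StreamingContextMigrationSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setAppName("migration"), Seconds(1))

        // Was: ssc.networkStream(new NoOpReceiver)
        val lines = ssc.receiverStream(new NoOpReceiver)

        // Pair operations now arrive via implicits on the DStream companion
        // object, replacing StreamingContext.toPairDStreamFunctions.
        lines.flatMap(_.split(" ")).map(word => (word, 1L)).reduceByKey(_ + _).print()

        ssc.start()
        // Was: ssc.awaitTermination(10000L), which returned Unit. The
        // replacement reports whether the context actually stopped in time.
        val stopped = ssc.awaitTerminationOrTimeout(10000L)
        if (!stopped) ssc.stop(stopSparkContext = true)
      }
    }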
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 9931a46d33..65aab2fac1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -220,26 +220,6 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @deprecated As this API is not Java compatible.
- */
- @deprecated("Use Java-compatible version of reduceByWindow", "1.3.0")
- def reduceByWindow(
- reduceFunc: (T, T) => T,
- windowDuration: Duration,
- slideDuration: Duration
- ): DStream[T] = {
- dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
- }
-
- /**
- * Return a new DStream in which each RDD has a single element generated by reducing all
- * elements in a sliding window over this DStream.
- * @param reduceFunc associative and commutative reduce function
- * @param windowDuration width of the window; must be a multiple of this DStream's
- * batching interval
- * @param slideDuration sliding interval of the window (i.e., the interval after which
- * the new DStream will generate RDDs); must be a multiple of this
- * DStream's batching interval
*/
def reduceByWindow(
reduceFunc: JFunction2[T, T, T],
@@ -284,50 +264,6 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
- *
- * @deprecated As of release 0.9.0, replaced by foreachRDD
- */
- @deprecated("Use foreachRDD", "0.9.0")
- def foreach(foreachFunc: JFunction[R, Void]) {
- foreachRDD(foreachFunc)
- }
-
- /**
- * Apply a function to each RDD in this DStream. This is an output operator, so
- * 'this' DStream will be registered as an output stream and therefore materialized.
- *
- * @deprecated As of release 0.9.0, replaced by foreachRDD
- */
- @deprecated("Use foreachRDD", "0.9.0")
- def foreach(foreachFunc: JFunction2[R, Time, Void]) {
- foreachRDD(foreachFunc)
- }
-
- /**
- * Apply a function to each RDD in this DStream. This is an output operator, so
- * 'this' DStream will be registered as an output stream and therefore materialized.
- *
- * @deprecated As of release 1.6.0, replaced by foreachRDD(JVoidFunction)
- */
- @deprecated("Use foreachRDD(foreachFunc: JVoidFunction[R])", "1.6.0")
- def foreachRDD(foreachFunc: JFunction[R, Void]) {
- dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd)))
- }
-
- /**
- * Apply a function to each RDD in this DStream. This is an output operator, so
- * 'this' DStream will be registered as an output stream and therefore materialized.
- *
- * @deprecated As of release 1.6.0, replaced by foreachRDD(JVoidFunction2)
- */
- @deprecated("Use foreachRDD(foreachFunc: JVoidFunction2[R, Time])", "1.6.0")
- def foreachRDD(foreachFunc: JFunction2[R, Time, Void]) {
- dstream.foreachRDD((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
- }
-
- /**
- * Apply a function to each RDD in this DStream. This is an output operator, so
- * 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreachRDD(foreachFunc: JVoidFunction[R]) {
dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd)))
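
On the Java API, the surviving output operations are the VoidFunction-based foreachRDD overloads shown above; the Function[R, Void] and foreach variants are gone, as is the Scala-closure reduceByWindow that Java code could not call anyway. A hedged sketch against the Java wrapper types, written in Scala (the helper name is hypothetical):

    import org.apache.spark.api.java.JavaRDD
    import org.apache.spark.api.java.function.{VoidFunction, VoidFunction2}
    import org.apache.spark.streaming.Time
    import org.apache.spark.streaming.api.java.JavaDStream

    object JavaDStreamOutputSketch {
      // Attach output actions using the overloads that remain after this patch.
      def attachOutputs(stream: JavaDStream[String]): Unit = {
        stream.foreachRDD(new VoidFunction[JavaRDD[String]] {
          override def call(rdd: JavaRDD[String]): Unit =
            println(s"records in batch: ${rdd.count()}")
        })
        stream.foreachRDD(new VoidFunction2[JavaRDD[String], Time] {
          override def call(rdd: JavaRDD[String], time: Time): Unit =
            println(s"batch at $time: ${rdd.count()} records")
        })
      }
    }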
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 7a25ce54b6..f8f1336693 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -155,12 +155,6 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
val sparkContext = new JavaSparkContext(ssc.sc)
/**
- * @deprecated As of 0.9.0, replaced by `sparkContext`
- */
- @deprecated("use sparkContext", "0.9.0")
- val sc: JavaSparkContext = sparkContext
-
- /**
* Create an input stream from network source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
* lines.
@@ -571,17 +565,6 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
/**
* Wait for the execution to stop. Any exceptions that occurs during the execution
* will be thrown in this thread.
- * @param timeout time to wait in milliseconds
- * @deprecated As of 1.3.0, replaced by `awaitTerminationOrTimeout(Long)`.
- */
- @deprecated("Use awaitTerminationOrTimeout(Long) instead", "1.3.0")
- def awaitTermination(timeout: Long): Unit = {
- ssc.awaitTermination(timeout)
- }
-
- /**
- * Wait for the execution to stop. Any exceptions that occurs during the execution
- * will be thrown in this thread.
*
* @param timeout time to wait in milliseconds
* @return `true` if it's stopped; or throw the reported error during the execution; or `false`
@@ -630,78 +613,6 @@ object JavaStreamingContext {
* will be used to create a JavaStreamingContext.
*
* @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
- * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
- * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory.
- */
- @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0")
- def getOrCreate(
- checkpointPath: String,
- factory: JavaStreamingContextFactory
- ): JavaStreamingContext = {
- val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
- factory.create.ssc
- })
- new JavaStreamingContext(ssc)
- }
-
- /**
- * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
- * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
- * recreated from the checkpoint data. If the data does not exist, then the provided factory
- * will be used to create a JavaStreamingContext.
- *
- * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
- * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
- * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible
- * file system
- * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory.
- */
- @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0")
- def getOrCreate(
- checkpointPath: String,
- hadoopConf: Configuration,
- factory: JavaStreamingContextFactory
- ): JavaStreamingContext = {
- val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
- factory.create.ssc
- }, hadoopConf)
- new JavaStreamingContext(ssc)
- }
-
- /**
- * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
- * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
- * recreated from the checkpoint data. If the data does not exist, then the provided factory
- * will be used to create a JavaStreamingContext.
- *
- * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
- * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
- * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible
- * file system
- * @param createOnError Whether to create a new JavaStreamingContext if there is an
- * error in reading checkpoint data.
- * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory.
- */
- @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0")
- def getOrCreate(
- checkpointPath: String,
- hadoopConf: Configuration,
- factory: JavaStreamingContextFactory,
- createOnError: Boolean
- ): JavaStreamingContext = {
- val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
- factory.create.ssc
- }, hadoopConf, createOnError)
- new JavaStreamingContext(ssc)
- }
-
- /**
- * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
- * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
- * recreated from the checkpoint data. If the data does not exist, then the provided factory
- * will be used to create a JavaStreamingContext.
- *
- * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
* @param creatingFunc Function to create a new JavaStreamingContext
*/
def getOrCreate(
@@ -767,10 +678,3 @@ object JavaStreamingContext {
*/
def jarOfClass(cls: Class[_]): Array[String] = SparkContext.jarOfClass(cls).toArray
}
-
-/**
- * Factory interface for creating a new JavaStreamingContext
- */
-trait JavaStreamingContextFactory {
- def create(): JavaStreamingContext
-}
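
The removed factory overloads also covered passing a Hadoop Configuration and tolerating corrupt checkpoints; the function-based replacements keep both parameters. A hedged sketch against the Scala StreamingContext (the checkpoint path and context builder are hypothetical):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object GetOrCreateVariantsSketch {
      def newContext(): StreamingContext =
        new StreamingContext(new SparkConf().setAppName("recovery"), Seconds(1))

      def main(args: Array[String]): Unit = {
        val checkpointPath = "hdfs:///tmp/checkpoint" // hypothetical
        // Explicit Hadoop configuration for reading checkpoint data, and
        // createOnError = true to fall back to a fresh context on bad data.
        val ssc = StreamingContext.getOrCreate(
          checkpointPath, () => newContext(), new Configuration(), createOnError = true)
        ssc.start()
        ssc.awaitTermination()
      }
    }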
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 70e1d8abde..102a030818 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -621,28 +621,6 @@ abstract class DStream[T: ClassTag] (
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
- *
- * @deprecated As of 0.9.0, replaced by `foreachRDD`.
- */
- @deprecated("use foreachRDD", "0.9.0")
- def foreach(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
- this.foreachRDD(foreachFunc)
- }
-
- /**
- * Apply a function to each RDD in this DStream. This is an output operator, so
- * 'this' DStream will be registered as an output stream and therefore materialized.
- *
- * @deprecated As of 0.9.0, replaced by `foreachRDD`.
- */
- @deprecated("use foreachRDD", "0.9.0")
- def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
- this.foreachRDD(foreachFunc)
- }
-
- /**
- * Apply a function to each RDD in this DStream. This is an output operator, so
- * 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
val cleanedF = context.sparkContext.clean(foreachFunc, false)
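
For Scala users the removed foreach overloads were thin aliases, so the migration is a rename with the same closure shapes. A hedged sketch (assumes an existing DStream[String]):

    import org.apache.spark.streaming.dstream.DStream

    object ForeachMigrationSketch {
      // Was: dstream.foreach(rdd => ...) and dstream.foreach((rdd, time) => ...)
      def attachOutputs(dstream: DStream[String]): Unit = {
        dstream.foreachRDD(rdd => println(s"records in batch: ${rdd.count()}"))
        dstream.foreachRDD((rdd, time) =>
          println(s"batch at $time: ${rdd.count()} records"))
      }
    }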
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 436eb0a566..5b2b959f81 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -41,9 +41,6 @@ case class BatchInfo(
outputOperationInfos: Map[Int, OutputOperationInfo]
) {
- @deprecated("Use streamIdToInputInfo instead", "1.5.0")
- def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
-
/**
* Time taken for the first job of this batch to start processing from the time this batch
* was submitted to the streaming scheduler. Essentially, it is
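
Callers of the removed streamIdToNumRecords can derive the same per-stream counts from streamIdToInputInfo, exactly as the deleted one-liner did. A hedged sketch (assumes a BatchInfo obtained from a StreamingListener callback):

    import org.apache.spark.streaming.scheduler.BatchInfo

    object BatchRecordCountsSketch {
      // Equivalent to the removed streamIdToNumRecords accessor.
      def recordsPerStream(info: BatchInfo): Map[Int, Long] =
        info.streamIdToInputInfo.mapValues(_.numRecords).toMap
    }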
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
index e897de3cba..1fc34f569f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
@@ -56,7 +56,6 @@ class DStreamClosureSuite extends SparkFunSuite with BeforeAndAfterAll {
testFilter(dstream)
testMapPartitions(dstream)
testReduce(dstream)
- testForeach(dstream)
testForeachRDD(dstream)
testTransform(dstream)
testTransformWith(dstream)
@@ -106,12 +105,6 @@ class DStreamClosureSuite extends SparkFunSuite with BeforeAndAfterAll {
private def testReduce(ds: DStream[Int]): Unit = expectCorrectException {
ds.reduce { case (_, _) => return; 1 }
}
- private def testForeach(ds: DStream[Int]): Unit = {
- val foreachF1 = (rdd: RDD[Int], t: Time) => return
- val foreachF2 = (rdd: RDD[Int]) => return
- expectCorrectException { ds.foreach(foreachF1) }
- expectCorrectException { ds.foreach(foreachF2) }
- }
private def testForeachRDD(ds: DStream[Int]): Unit = {
val foreachRDDF1 = (rdd: RDD[Int], t: Time) => return
val foreachRDDF2 = (rdd: RDD[Int]) => return