diff options
author | Luciano Resende <lresende@apache.org> | 2016-02-21 16:27:56 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-02-21 16:27:56 +0000 |
commit | 1a340da8d7590d831b040c74f5a6eb560e14d585 (patch) | |
tree | a8b02a674783f2ea82fc666ebdab1bd230e169a3 /streaming/src | |
parent | d9efe63ecdc60a9955f1924de0e8a00bcb6a559d (diff) | |
download | spark-1a340da8d7590d831b040c74f5a6eb560e14d585.tar.gz spark-1a340da8d7590d831b040c74f5a6eb560e14d585.tar.bz2 spark-1a340da8d7590d831b040c74f5a6eb560e14d585.zip |
[SPARK-13248][STREAMING] Remove deprecated Streaming APIs.
Remove deprecated Streaming APIs and adjust sample applications.
Author: Luciano Resende <lresende@apache.org>
Closes #11139 from lresende/streaming-deprecated-apis.
Diffstat (limited to 'streaming/src')
6 files changed, 0 insertions, 230 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index a1b25c9f7d..25e61578a1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -271,20 +271,6 @@ class StreamingContext private[streaming] ( /** * Create an input stream with any arbitrary user implemented receiver. - * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html - * @param receiver Custom implementation of Receiver - * - * @deprecated As of 1.0.0 replaced by `receiverStream`. - */ - @deprecated("Use receiverStream", "1.0.0") - def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = { - withNamedScope("network stream") { - receiverStream(receiver) - } - } - - /** - * Create an input stream with any arbitrary user implemented receiver. * Find more details at http://spark.apache.org/docs/latest/streaming-custom-receivers.html * @param receiver Custom implementation of Receiver */ @@ -624,18 +610,6 @@ class StreamingContext private[streaming] ( /** * Wait for the execution to stop. Any exceptions that occurs during the execution * will be thrown in this thread. - * @param timeout time to wait in milliseconds - * - * @deprecated As of 1.3.0, replaced by `awaitTerminationOrTimeout(Long)`. - */ - @deprecated("Use awaitTerminationOrTimeout(Long) instead", "1.3.0") - def awaitTermination(timeout: Long) { - waiter.waitForStopOrError(timeout) - } - - /** - * Wait for the execution to stop. Any exceptions that occurs during the execution - * will be thrown in this thread. * * @param timeout time to wait in milliseconds * @return `true` if it's stopped; or throw the reported error during the execution; or `false` @@ -778,18 +752,6 @@ object StreamingContext extends Logging { } /** - * @deprecated As of 1.3.0, replaced by implicit functions in the DStream companion object. - * This is kept here only for backward compatibility. - */ - @deprecated("Replaced by implicit functions in the DStream companion object. This is " + - "kept here only for backward compatibility.", "1.3.0") - def toPairDStreamFunctions[K, V](stream: DStream[(K, V)]) - (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) - : PairDStreamFunctions[K, V] = { - DStream.toPairDStreamFunctions(stream)(kt, vt, ord) - } - - /** * :: Experimental :: * * Either return the "active" StreamingContext (that is, started but not stopped), or create a diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index 9931a46d33..65aab2fac1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -220,26 +220,6 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @deprecated As this API is not Java compatible. - */ - @deprecated("Use Java-compatible version of reduceByWindow", "1.3.0") - def reduceByWindow( - reduceFunc: (T, T) => T, - windowDuration: Duration, - slideDuration: Duration - ): DStream[T] = { - dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration) - } - - /** - * Return a new DStream in which each RDD has a single element generated by reducing all - * elements in a sliding window over this DStream. - * @param reduceFunc associative and commutative reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval */ def reduceByWindow( reduceFunc: JFunction2[T, T, T], @@ -284,50 +264,6 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. - * - * @deprecated As of release 0.9.0, replaced by foreachRDD - */ - @deprecated("Use foreachRDD", "0.9.0") - def foreach(foreachFunc: JFunction[R, Void]) { - foreachRDD(foreachFunc) - } - - /** - * Apply a function to each RDD in this DStream. This is an output operator, so - * 'this' DStream will be registered as an output stream and therefore materialized. - * - * @deprecated As of release 0.9.0, replaced by foreachRDD - */ - @deprecated("Use foreachRDD", "0.9.0") - def foreach(foreachFunc: JFunction2[R, Time, Void]) { - foreachRDD(foreachFunc) - } - - /** - * Apply a function to each RDD in this DStream. This is an output operator, so - * 'this' DStream will be registered as an output stream and therefore materialized. - * - * @deprecated As of release 1.6.0, replaced by foreachRDD(JVoidFunction) - */ - @deprecated("Use foreachRDD(foreachFunc: JVoidFunction[R])", "1.6.0") - def foreachRDD(foreachFunc: JFunction[R, Void]) { - dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd))) - } - - /** - * Apply a function to each RDD in this DStream. This is an output operator, so - * 'this' DStream will be registered as an output stream and therefore materialized. - * - * @deprecated As of release 1.6.0, replaced by foreachRDD(JVoidFunction2) - */ - @deprecated("Use foreachRDD(foreachFunc: JVoidFunction2[R, Time])", "1.6.0") - def foreachRDD(foreachFunc: JFunction2[R, Time, Void]) { - dstream.foreachRDD((rdd, time) => foreachFunc.call(wrapRDD(rdd), time)) - } - - /** - * Apply a function to each RDD in this DStream. This is an output operator, so - * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreachRDD(foreachFunc: JVoidFunction[R]) { dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd))) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 7a25ce54b6..f8f1336693 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -155,12 +155,6 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { val sparkContext = new JavaSparkContext(ssc.sc) /** - * @deprecated As of 0.9.0, replaced by `sparkContext` - */ - @deprecated("use sparkContext", "0.9.0") - val sc: JavaSparkContext = sparkContext - - /** * Create an input stream from network source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited * lines. @@ -571,17 +565,6 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { /** * Wait for the execution to stop. Any exceptions that occurs during the execution * will be thrown in this thread. - * @param timeout time to wait in milliseconds - * @deprecated As of 1.3.0, replaced by `awaitTerminationOrTimeout(Long)`. - */ - @deprecated("Use awaitTerminationOrTimeout(Long) instead", "1.3.0") - def awaitTermination(timeout: Long): Unit = { - ssc.awaitTermination(timeout) - } - - /** - * Wait for the execution to stop. Any exceptions that occurs during the execution - * will be thrown in this thread. * * @param timeout time to wait in milliseconds * @return `true` if it's stopped; or throw the reported error during the execution; or `false` @@ -630,78 +613,6 @@ object JavaStreamingContext { * will be used to create a JavaStreamingContext. * * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program - * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext - * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory. - */ - @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0") - def getOrCreate( - checkpointPath: String, - factory: JavaStreamingContextFactory - ): JavaStreamingContext = { - val ssc = StreamingContext.getOrCreate(checkpointPath, () => { - factory.create.ssc - }) - new JavaStreamingContext(ssc) - } - - /** - * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. - * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be - * recreated from the checkpoint data. If the data does not exist, then the provided factory - * will be used to create a JavaStreamingContext. - * - * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program - * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext - * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible - * file system - * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory. - */ - @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0") - def getOrCreate( - checkpointPath: String, - hadoopConf: Configuration, - factory: JavaStreamingContextFactory - ): JavaStreamingContext = { - val ssc = StreamingContext.getOrCreate(checkpointPath, () => { - factory.create.ssc - }, hadoopConf) - new JavaStreamingContext(ssc) - } - - /** - * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. - * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be - * recreated from the checkpoint data. If the data does not exist, then the provided factory - * will be used to create a JavaStreamingContext. - * - * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program - * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext - * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible - * file system - * @param createOnError Whether to create a new JavaStreamingContext if there is an - * error in reading checkpoint data. - * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory. - */ - @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0") - def getOrCreate( - checkpointPath: String, - hadoopConf: Configuration, - factory: JavaStreamingContextFactory, - createOnError: Boolean - ): JavaStreamingContext = { - val ssc = StreamingContext.getOrCreate(checkpointPath, () => { - factory.create.ssc - }, hadoopConf, createOnError) - new JavaStreamingContext(ssc) - } - - /** - * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. - * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be - * recreated from the checkpoint data. If the data does not exist, then the provided factory - * will be used to create a JavaStreamingContext. - * - * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program * @param creatingFunc Function to create a new JavaStreamingContext */ def getOrCreate( @@ -767,10 +678,3 @@ object JavaStreamingContext { */ def jarOfClass(cls: Class[_]): Array[String] = SparkContext.jarOfClass(cls).toArray } - -/** - * Factory interface for creating a new JavaStreamingContext - */ -trait JavaStreamingContextFactory { - def create(): JavaStreamingContext -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 70e1d8abde..102a030818 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -621,28 +621,6 @@ abstract class DStream[T: ClassTag] ( /** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. - * - * @deprecated As of 0.9.0, replaced by `foreachRDD`. - */ - @deprecated("use foreachRDD", "0.9.0") - def foreach(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope { - this.foreachRDD(foreachFunc) - } - - /** - * Apply a function to each RDD in this DStream. This is an output operator, so - * 'this' DStream will be registered as an output stream and therefore materialized. - * - * @deprecated As of 0.9.0, replaced by `foreachRDD`. - */ - @deprecated("use foreachRDD", "0.9.0") - def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope { - this.foreachRDD(foreachFunc) - } - - /** - * Apply a function to each RDD in this DStream. This is an output operator, so - * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope { val cleanedF = context.sparkContext.clean(foreachFunc, false) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index 436eb0a566..5b2b959f81 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -41,9 +41,6 @@ case class BatchInfo( outputOperationInfos: Map[Int, OutputOperationInfo] ) { - @deprecated("Use streamIdToInputInfo instead", "1.5.0") - def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords) - /** * Time taken for the first job of this batch to start processing from the time this batch * was submitted to the streaming scheduler. Essentially, it is diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala index e897de3cba..1fc34f569f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala @@ -56,7 +56,6 @@ class DStreamClosureSuite extends SparkFunSuite with BeforeAndAfterAll { testFilter(dstream) testMapPartitions(dstream) testReduce(dstream) - testForeach(dstream) testForeachRDD(dstream) testTransform(dstream) testTransformWith(dstream) @@ -106,12 +105,6 @@ class DStreamClosureSuite extends SparkFunSuite with BeforeAndAfterAll { private def testReduce(ds: DStream[Int]): Unit = expectCorrectException { ds.reduce { case (_, _) => return; 1 } } - private def testForeach(ds: DStream[Int]): Unit = { - val foreachF1 = (rdd: RDD[Int], t: Time) => return - val foreachF2 = (rdd: RDD[Int]) => return - expectCorrectException { ds.foreach(foreachF1) } - expectCorrectException { ds.foreach(foreachF2) } - } private def testForeachRDD(ds: DStream[Int]): Unit = { val foreachRDDF1 = (rdd: RDD[Int], t: Time) => return val foreachRDDF2 = (rdd: RDD[Int]) => return |