aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorLuciano Resende <lresende@apache.org>2016-02-21 16:27:56 +0000
committerSean Owen <sowen@cloudera.com>2016-02-21 16:27:56 +0000
commit1a340da8d7590d831b040c74f5a6eb560e14d585 (patch)
treea8b02a674783f2ea82fc666ebdab1bd230e169a3 /streaming
parentd9efe63ecdc60a9955f1924de0e8a00bcb6a559d (diff)
downloadspark-1a340da8d7590d831b040c74f5a6eb560e14d585.tar.gz
spark-1a340da8d7590d831b040c74f5a6eb560e14d585.tar.bz2
spark-1a340da8d7590d831b040c74f5a6eb560e14d585.zip
[SPARK-13248][STREAMING] Remove deprecated Streaming APIs.
Remove deprecated Streaming APIs and adjust sample applications. Author: Luciano Resende <lresende@apache.org> Closes #11139 from lresende/streaming-deprecated-apis.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala38
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala64
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala96
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala22
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala3
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala7
6 files changed, 0 insertions, 230 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index a1b25c9f7d..25e61578a1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -271,20 +271,6 @@ class StreamingContext private[streaming] (
/**
* Create an input stream with any arbitrary user implemented receiver.
- * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- * @param receiver Custom implementation of Receiver
- *
- * @deprecated As of 1.0.0 replaced by `receiverStream`.
- */
- @deprecated("Use receiverStream", "1.0.0")
- def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
- withNamedScope("network stream") {
- receiverStream(receiver)
- }
- }
-
- /**
- * Create an input stream with any arbitrary user implemented receiver.
* Find more details at http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of Receiver
*/
@@ -624,18 +610,6 @@ class StreamingContext private[streaming] (
/**
* Wait for the execution to stop. Any exceptions that occurs during the execution
* will be thrown in this thread.
- * @param timeout time to wait in milliseconds
- *
- * @deprecated As of 1.3.0, replaced by `awaitTerminationOrTimeout(Long)`.
- */
- @deprecated("Use awaitTerminationOrTimeout(Long) instead", "1.3.0")
- def awaitTermination(timeout: Long) {
- waiter.waitForStopOrError(timeout)
- }
-
- /**
- * Wait for the execution to stop. Any exceptions that occurs during the execution
- * will be thrown in this thread.
*
* @param timeout time to wait in milliseconds
* @return `true` if it's stopped; or throw the reported error during the execution; or `false`
@@ -778,18 +752,6 @@ object StreamingContext extends Logging {
}
/**
- * @deprecated As of 1.3.0, replaced by implicit functions in the DStream companion object.
- * This is kept here only for backward compatibility.
- */
- @deprecated("Replaced by implicit functions in the DStream companion object. This is " +
- "kept here only for backward compatibility.", "1.3.0")
- def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
- (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
- : PairDStreamFunctions[K, V] = {
- DStream.toPairDStreamFunctions(stream)(kt, vt, ord)
- }
-
- /**
* :: Experimental ::
*
* Either return the "active" StreamingContext (that is, started but not stopped), or create a
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 9931a46d33..65aab2fac1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -220,26 +220,6 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @deprecated As this API is not Java compatible.
- */
- @deprecated("Use Java-compatible version of reduceByWindow", "1.3.0")
- def reduceByWindow(
- reduceFunc: (T, T) => T,
- windowDuration: Duration,
- slideDuration: Duration
- ): DStream[T] = {
- dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
- }
-
- /**
- * Return a new DStream in which each RDD has a single element generated by reducing all
- * elements in a sliding window over this DStream.
- * @param reduceFunc associative and commutative reduce function
- * @param windowDuration width of the window; must be a multiple of this DStream's
- * batching interval
- * @param slideDuration sliding interval of the window (i.e., the interval after which
- * the new DStream will generate RDDs); must be a multiple of this
- * DStream's batching interval
*/
def reduceByWindow(
reduceFunc: JFunction2[T, T, T],
@@ -284,50 +264,6 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
- *
- * @deprecated As of release 0.9.0, replaced by foreachRDD
- */
- @deprecated("Use foreachRDD", "0.9.0")
- def foreach(foreachFunc: JFunction[R, Void]) {
- foreachRDD(foreachFunc)
- }
-
- /**
- * Apply a function to each RDD in this DStream. This is an output operator, so
- * 'this' DStream will be registered as an output stream and therefore materialized.
- *
- * @deprecated As of release 0.9.0, replaced by foreachRDD
- */
- @deprecated("Use foreachRDD", "0.9.0")
- def foreach(foreachFunc: JFunction2[R, Time, Void]) {
- foreachRDD(foreachFunc)
- }
-
- /**
- * Apply a function to each RDD in this DStream. This is an output operator, so
- * 'this' DStream will be registered as an output stream and therefore materialized.
- *
- * @deprecated As of release 1.6.0, replaced by foreachRDD(JVoidFunction)
- */
- @deprecated("Use foreachRDD(foreachFunc: JVoidFunction[R])", "1.6.0")
- def foreachRDD(foreachFunc: JFunction[R, Void]) {
- dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd)))
- }
-
- /**
- * Apply a function to each RDD in this DStream. This is an output operator, so
- * 'this' DStream will be registered as an output stream and therefore materialized.
- *
- * @deprecated As of release 1.6.0, replaced by foreachRDD(JVoidFunction2)
- */
- @deprecated("Use foreachRDD(foreachFunc: JVoidFunction2[R, Time])", "1.6.0")
- def foreachRDD(foreachFunc: JFunction2[R, Time, Void]) {
- dstream.foreachRDD((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
- }
-
- /**
- * Apply a function to each RDD in this DStream. This is an output operator, so
- * 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreachRDD(foreachFunc: JVoidFunction[R]) {
dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd)))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 7a25ce54b6..f8f1336693 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -155,12 +155,6 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
val sparkContext = new JavaSparkContext(ssc.sc)
/**
- * @deprecated As of 0.9.0, replaced by `sparkContext`
- */
- @deprecated("use sparkContext", "0.9.0")
- val sc: JavaSparkContext = sparkContext
-
- /**
* Create an input stream from network source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
* lines.
@@ -571,17 +565,6 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
/**
* Wait for the execution to stop. Any exceptions that occurs during the execution
* will be thrown in this thread.
- * @param timeout time to wait in milliseconds
- * @deprecated As of 1.3.0, replaced by `awaitTerminationOrTimeout(Long)`.
- */
- @deprecated("Use awaitTerminationOrTimeout(Long) instead", "1.3.0")
- def awaitTermination(timeout: Long): Unit = {
- ssc.awaitTermination(timeout)
- }
-
- /**
- * Wait for the execution to stop. Any exceptions that occurs during the execution
- * will be thrown in this thread.
*
* @param timeout time to wait in milliseconds
* @return `true` if it's stopped; or throw the reported error during the execution; or `false`
@@ -630,78 +613,6 @@ object JavaStreamingContext {
* will be used to create a JavaStreamingContext.
*
* @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
- * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
- * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory.
- */
- @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0")
- def getOrCreate(
- checkpointPath: String,
- factory: JavaStreamingContextFactory
- ): JavaStreamingContext = {
- val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
- factory.create.ssc
- })
- new JavaStreamingContext(ssc)
- }
-
- /**
- * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
- * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
- * recreated from the checkpoint data. If the data does not exist, then the provided factory
- * will be used to create a JavaStreamingContext.
- *
- * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
- * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
- * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible
- * file system
- * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory.
- */
- @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0")
- def getOrCreate(
- checkpointPath: String,
- hadoopConf: Configuration,
- factory: JavaStreamingContextFactory
- ): JavaStreamingContext = {
- val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
- factory.create.ssc
- }, hadoopConf)
- new JavaStreamingContext(ssc)
- }
-
- /**
- * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
- * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
- * recreated from the checkpoint data. If the data does not exist, then the provided factory
- * will be used to create a JavaStreamingContext.
- *
- * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
- * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
- * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible
- * file system
- * @param createOnError Whether to create a new JavaStreamingContext if there is an
- * error in reading checkpoint data.
- * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory.
- */
- @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0")
- def getOrCreate(
- checkpointPath: String,
- hadoopConf: Configuration,
- factory: JavaStreamingContextFactory,
- createOnError: Boolean
- ): JavaStreamingContext = {
- val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
- factory.create.ssc
- }, hadoopConf, createOnError)
- new JavaStreamingContext(ssc)
- }
-
- /**
- * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
- * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
- * recreated from the checkpoint data. If the data does not exist, then the provided factory
- * will be used to create a JavaStreamingContext.
- *
- * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
* @param creatingFunc Function to create a new JavaStreamingContext
*/
def getOrCreate(
@@ -767,10 +678,3 @@ object JavaStreamingContext {
*/
def jarOfClass(cls: Class[_]): Array[String] = SparkContext.jarOfClass(cls).toArray
}
-
-/**
- * Factory interface for creating a new JavaStreamingContext
- */
-trait JavaStreamingContextFactory {
- def create(): JavaStreamingContext
-}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 70e1d8abde..102a030818 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -621,28 +621,6 @@ abstract class DStream[T: ClassTag] (
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
- *
- * @deprecated As of 0.9.0, replaced by `foreachRDD`.
- */
- @deprecated("use foreachRDD", "0.9.0")
- def foreach(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
- this.foreachRDD(foreachFunc)
- }
-
- /**
- * Apply a function to each RDD in this DStream. This is an output operator, so
- * 'this' DStream will be registered as an output stream and therefore materialized.
- *
- * @deprecated As of 0.9.0, replaced by `foreachRDD`.
- */
- @deprecated("use foreachRDD", "0.9.0")
- def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
- this.foreachRDD(foreachFunc)
- }
-
- /**
- * Apply a function to each RDD in this DStream. This is an output operator, so
- * 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
val cleanedF = context.sparkContext.clean(foreachFunc, false)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 436eb0a566..5b2b959f81 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -41,9 +41,6 @@ case class BatchInfo(
outputOperationInfos: Map[Int, OutputOperationInfo]
) {
- @deprecated("Use streamIdToInputInfo instead", "1.5.0")
- def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
-
/**
* Time taken for the first job of this batch to start processing from the time this batch
* was submitted to the streaming scheduler. Essentially, it is
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
index e897de3cba..1fc34f569f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
@@ -56,7 +56,6 @@ class DStreamClosureSuite extends SparkFunSuite with BeforeAndAfterAll {
testFilter(dstream)
testMapPartitions(dstream)
testReduce(dstream)
- testForeach(dstream)
testForeachRDD(dstream)
testTransform(dstream)
testTransformWith(dstream)
@@ -106,12 +105,6 @@ class DStreamClosureSuite extends SparkFunSuite with BeforeAndAfterAll {
private def testReduce(ds: DStream[Int]): Unit = expectCorrectException {
ds.reduce { case (_, _) => return; 1 }
}
- private def testForeach(ds: DStream[Int]): Unit = {
- val foreachF1 = (rdd: RDD[Int], t: Time) => return
- val foreachF2 = (rdd: RDD[Int]) => return
- expectCorrectException { ds.foreach(foreachF1) }
- expectCorrectException { ds.foreach(foreachF2) }
- }
private def testForeachRDD(ds: DStream[Int]): Unit = {
val foreachRDDF1 = (rdd: RDD[Int], t: Time) => return
val foreachRDDF2 = (rdd: RDD[Int]) => return