From 8ce645d4eeda203cf5e100c4bdba2d71edd44e6a Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 5 Jan 2016 11:10:14 -0800 Subject: [SPARK-12615] Remove some deprecated APIs in RDD/SparkContext I looked at each case individually and it looks like they can all be removed. The only one that I had to think twice was toArray (I even thought about un-deprecating it, until I realized it was a problem in Java to have toArray returning java.util.List). Author: Reynold Xin Closes #10569 from rxin/SPARK-12615. --- .../main/scala/org/apache/spark/Aggregator.scala | 8 - .../main/scala/org/apache/spark/SparkContext.scala | 261 +-------------------- .../main/scala/org/apache/spark/TaskContext.scala | 16 -- .../scala/org/apache/spark/TaskContextImpl.scala | 10 - .../org/apache/spark/api/java/JavaDoubleRDD.scala | 1 - .../org/apache/spark/api/java/JavaRDDLike.scala | 10 - .../apache/spark/api/java/JavaSparkContext.scala | 28 +-- .../org/apache/spark/rdd/PairRDDFunctions.scala | 6 - core/src/main/scala/org/apache/spark/rdd/RDD.scala | 101 -------- .../scala/org/apache/spark/rdd/SampledRDD.scala | 71 ------ .../spark/rdd/SequenceFileRDDFunctions.scala | 5 - .../org/apache/spark/scheduler/TaskInfo.scala | 3 - .../scala/org/apache/spark/util/RpcUtils.scala | 11 - .../test/java/org/apache/spark/JavaAPISuite.java | 7 - .../test/scala/org/apache/spark/rdd/RDDSuite.scala | 60 ----- .../apache/spark/scheduler/TaskContextSuite.scala | 8 - .../apache/spark/util/ClosureCleanerSuite.scala | 21 -- 17 files changed, 3 insertions(+), 624 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 7196e57d5d..62629000cf 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -34,10 +34,6 @@ case class Aggregator[K, V, C] ( mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) { - @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0") - def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = - combineValuesByKey(iter, null) - def combineValuesByKey( iter: Iterator[_ <: Product2[K, V]], context: TaskContext): Iterator[(K, C)] = { @@ -47,10 +43,6 @@ case class Aggregator[K, V, C] ( combiners.iterator } - @deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0") - def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]]) : Iterator[(K, C)] = - combineCombinersByKey(iter, null) - def combineCombinersByKey( iter: Iterator[_ <: Product2[K, C]], context: TaskContext): Iterator[(K, C)] = { diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 77e44ee026..87301202de 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -27,7 +27,7 @@ import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean, AtomicIntege import java.util.UUID.randomUUID import scala.collection.JavaConverters._ -import scala.collection.{Map, Set} +import scala.collection.Map import scala.collection.generic.Growable import scala.collection.mutable.HashMap import scala.reflect.{ClassTag, classTag} @@ -122,20 +122,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def this() = this(new SparkConf()) - /** - * :: DeveloperApi :: - * Alternative 
constructor for setting preferred locations where Spark will create executors. - * - * @param config a [[org.apache.spark.SparkConf]] object specifying other Spark parameters - * @param preferredNodeLocationData not used. Left for backward compatibility. - */ - @deprecated("Passing in preferred locations has no effect at all, see SPARK-8949", "1.5.0") - @DeveloperApi - def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = { - this(config) - logWarning("Passing in preferred locations has no effect at all, see SPARK-8949") - } - /** * Alternative constructor that allows setting common Spark properties directly * @@ -155,21 +141,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @param jars Collection of JARs to send to the cluster. These can be paths on the local file * system or HDFS, HTTP, HTTPS, or FTP URLs. * @param environment Environment variables to set on worker nodes. - * @param preferredNodeLocationData not used. Left for backward compatibility. */ - @deprecated("Passing in preferred locations has no effect at all, see SPARK-10921", "1.6.0") def this( master: String, appName: String, sparkHome: String = null, jars: Seq[String] = Nil, - environment: Map[String, String] = Map(), - preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) = + environment: Map[String, String] = Map()) = { this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment)) - if (preferredNodeLocationData.nonEmpty) { - logWarning("Passing in preferred locations has no effect at all, see SPARK-8949") - } } // NOTE: The below constructors could be consolidated using default arguments. Due to @@ -267,8 +247,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // Generate the random name for a temp folder in external block store. // Add a timestamp as the suffix here to make it more safe val externalBlockStoreFolderName = "spark-" + randomUUID.toString() - @deprecated("Use externalBlockStoreFolderName instead.", "1.4.0") - val tachyonFolderName = externalBlockStoreFolderName def isLocal: Boolean = (master == "local" || master.startsWith("local[")) @@ -641,11 +619,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli localProperties.set(props) } - @deprecated("Properties no longer need to be explicitly initialized.", "1.0.0") - def initLocalProperties() { - localProperties.set(new Properties()) - } - /** * Set a local property that affects jobs submitted from this thread, such as the * Spark fair scheduler pool. @@ -1585,15 +1558,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli taskScheduler.schedulingMode } - /** - * Clear the job's list of files added by `addFile` so that they do not get downloaded to - * any new nodes. - */ - @deprecated("adding files no longer creates local copies that need to be deleted", "1.0.0") - def clearFiles() { - addedFiles.clear() - } - /** * Gets the locality information associated with the partition in a particular rdd * @param rdd of interest @@ -1685,15 +1649,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli postEnvironmentUpdate() } - /** - * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to - * any new nodes. - */ - @deprecated("adding jars no longer creates local copies that need to be deleted", "1.0.0") - def clearJars() { - addedJars.clear() - } - // Shut down the SparkContext. 
def stop() { if (AsynchronousListenerBus.withinListenerThread.value) { @@ -1864,63 +1819,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions) } - - /** - * Run a function on a given set of partitions in an RDD and pass the results to the given - * handler function. This is the main entry point for all actions in Spark. - * - * The allowLocal flag is deprecated as of Spark 1.5.0+. - */ - @deprecated("use the version of runJob without the allowLocal parameter", "1.5.0") - def runJob[T, U: ClassTag]( - rdd: RDD[T], - func: (TaskContext, Iterator[T]) => U, - partitions: Seq[Int], - allowLocal: Boolean, - resultHandler: (Int, U) => Unit): Unit = { - if (allowLocal) { - logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+") - } - runJob(rdd, func, partitions, resultHandler) - } - - /** - * Run a function on a given set of partitions in an RDD and return the results as an array. - * - * The allowLocal flag is deprecated as of Spark 1.5.0+. - */ - @deprecated("use the version of runJob without the allowLocal parameter", "1.5.0") - def runJob[T, U: ClassTag]( - rdd: RDD[T], - func: (TaskContext, Iterator[T]) => U, - partitions: Seq[Int], - allowLocal: Boolean - ): Array[U] = { - if (allowLocal) { - logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+") - } - runJob(rdd, func, partitions) - } - - /** - * Run a job on a given set of partitions of an RDD, but take a function of type - * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`. - * - * The allowLocal argument is deprecated as of Spark 1.5.0+. - */ - @deprecated("use the version of runJob without the allowLocal parameter", "1.5.0") - def runJob[T, U: ClassTag]( - rdd: RDD[T], - func: Iterator[T] => U, - partitions: Seq[Int], - allowLocal: Boolean - ): Array[U] = { - if (allowLocal) { - logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+") - } - runJob(rdd, func, partitions) - } - /** * Run a job on all partitions in an RDD and return the results in an array. */ @@ -2094,10 +1992,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli taskScheduler.defaultParallelism } - /** Default min number of partitions for Hadoop RDDs when not given by user */ - @deprecated("use defaultMinPartitions", "1.0.0") - def defaultMinSplits: Int = defaultMinPartitions - /** * Default min number of partitions for Hadoop RDDs when not given by user * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2. @@ -2364,113 +2258,6 @@ object SparkContext extends Logging { */ private[spark] val LEGACY_DRIVER_IDENTIFIER = "" - // The following deprecated objects have already been copied to `object AccumulatorParam` to - // make the compiler find them automatically. They are duplicate codes only for backward - // compatibility, please update `object AccumulatorParam` accordingly if you plan to modify the - // following ones. - - @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " + - "backward compatibility.", "1.3.0") - object DoubleAccumulatorParam extends AccumulatorParam[Double] { - def addInPlace(t1: Double, t2: Double): Double = t1 + t2 - def zero(initialValue: Double): Double = 0.0 - } - - @deprecated("Replaced by implicit objects in AccumulatorParam. 
This is kept here only for " + - "backward compatibility.", "1.3.0") - object IntAccumulatorParam extends AccumulatorParam[Int] { - def addInPlace(t1: Int, t2: Int): Int = t1 + t2 - def zero(initialValue: Int): Int = 0 - } - - @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " + - "backward compatibility.", "1.3.0") - object LongAccumulatorParam extends AccumulatorParam[Long] { - def addInPlace(t1: Long, t2: Long): Long = t1 + t2 - def zero(initialValue: Long): Long = 0L - } - - @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " + - "backward compatibility.", "1.3.0") - object FloatAccumulatorParam extends AccumulatorParam[Float] { - def addInPlace(t1: Float, t2: Float): Float = t1 + t2 - def zero(initialValue: Float): Float = 0f - } - - // The following deprecated functions have already been moved to `object RDD` to - // make the compiler find them automatically. They are still kept here for backward compatibility - // and just call the corresponding functions in `object RDD`. - - @deprecated("Replaced by implicit functions in the RDD companion object. This is " + - "kept here only for backward compatibility.", "1.3.0") - def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]) - (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = - RDD.rddToPairRDDFunctions(rdd) - - @deprecated("Replaced by implicit functions in the RDD companion object. This is " + - "kept here only for backward compatibility.", "1.3.0") - def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]): AsyncRDDActions[T] = - RDD.rddToAsyncRDDActions(rdd) - - @deprecated("Replaced by implicit functions in the RDD companion object. This is " + - "kept here only for backward compatibility.", "1.3.0") - def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag]( - rdd: RDD[(K, V)]): SequenceFileRDDFunctions[K, V] = { - val kf = implicitly[K => Writable] - val vf = implicitly[V => Writable] - // Set the Writable class to null and `SequenceFileRDDFunctions` will use Reflection to get it - implicit val keyWritableFactory = new WritableFactory[K](_ => null, kf) - implicit val valueWritableFactory = new WritableFactory[V](_ => null, vf) - RDD.rddToSequenceFileRDDFunctions(rdd) - } - - @deprecated("Replaced by implicit functions in the RDD companion object. This is " + - "kept here only for backward compatibility.", "1.3.0") - def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag]( - rdd: RDD[(K, V)]): OrderedRDDFunctions[K, V, (K, V)] = - RDD.rddToOrderedRDDFunctions(rdd) - - @deprecated("Replaced by implicit functions in the RDD companion object. This is " + - "kept here only for backward compatibility.", "1.3.0") - def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]): DoubleRDDFunctions = - RDD.doubleRDDToDoubleRDDFunctions(rdd) - - @deprecated("Replaced by implicit functions in the RDD companion object. This is " + - "kept here only for backward compatibility.", "1.3.0") - def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]): DoubleRDDFunctions = - RDD.numericRDDToDoubleRDDFunctions(rdd) - - // The following deprecated functions have already been moved to `object WritableFactory` to - // make the compiler find them automatically. They are still kept here for backward compatibility. - - @deprecated("Replaced by implicit functions in the WritableFactory companion object. 
This is " + - "kept here only for backward compatibility.", "1.3.0") - implicit def intToIntWritable(i: Int): IntWritable = new IntWritable(i) - - @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " + - "kept here only for backward compatibility.", "1.3.0") - implicit def longToLongWritable(l: Long): LongWritable = new LongWritable(l) - - @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " + - "kept here only for backward compatibility.", "1.3.0") - implicit def floatToFloatWritable(f: Float): FloatWritable = new FloatWritable(f) - - @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " + - "kept here only for backward compatibility.", "1.3.0") - implicit def doubleToDoubleWritable(d: Double): DoubleWritable = new DoubleWritable(d) - - @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " + - "kept here only for backward compatibility.", "1.3.0") - implicit def boolToBoolWritable (b: Boolean): BooleanWritable = new BooleanWritable(b) - - @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " + - "kept here only for backward compatibility.", "1.3.0") - implicit def bytesToBytesWritable (aob: Array[Byte]): BytesWritable = new BytesWritable(aob) - - @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " + - "kept here only for backward compatibility.", "1.3.0") - implicit def stringToText(s: String): Text = new Text(s) - private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T]) : ArrayWritable = { def anyToWritable[U <% Writable](u: U): Writable = u @@ -2479,50 +2266,6 @@ object SparkContext extends Logging { arr.map(x => anyToWritable(x)).toArray) } - // The following deprecated functions have already been moved to `object WritableConverter` to - // make the compiler find them automatically. They are still kept here for backward compatibility - // and just call the corresponding functions in `object WritableConverter`. - - @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + - "backward compatibility.", "1.3.0") - def intWritableConverter(): WritableConverter[Int] = - WritableConverter.intWritableConverter() - - @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + - "backward compatibility.", "1.3.0") - def longWritableConverter(): WritableConverter[Long] = - WritableConverter.longWritableConverter() - - @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + - "backward compatibility.", "1.3.0") - def doubleWritableConverter(): WritableConverter[Double] = - WritableConverter.doubleWritableConverter() - - @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + - "backward compatibility.", "1.3.0") - def floatWritableConverter(): WritableConverter[Float] = - WritableConverter.floatWritableConverter() - - @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + - "backward compatibility.", "1.3.0") - def booleanWritableConverter(): WritableConverter[Boolean] = - WritableConverter.booleanWritableConverter() - - @deprecated("Replaced by implicit functions in WritableConverter. 
This is kept here only for " + - "backward compatibility.", "1.3.0") - def bytesWritableConverter(): WritableConverter[Array[Byte]] = - WritableConverter.bytesWritableConverter() - - @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + - "backward compatibility.", "1.3.0") - def stringWritableConverter(): WritableConverter[String] = - WritableConverter.stringWritableConverter() - - @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + - "backward compatibility.", "1.3.0") - def writableWritableConverter[T <: Writable](): WritableConverter[T] = - WritableConverter.writableWritableConverter() - /** * Find the JAR from which a given class was loaded, to make it easy for users to pass * their JARs to SparkContext. diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index af558d6e5b..e25ed0fdd7 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -95,9 +95,6 @@ abstract class TaskContext extends Serializable { */ def isInterrupted(): Boolean - @deprecated("use isRunningLocally", "1.2.0") - def runningLocally(): Boolean - /** * Returns true if the task is running locally in the driver program. * @return @@ -118,16 +115,6 @@ abstract class TaskContext extends Serializable { */ def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext - /** - * Adds a callback function to be executed on task completion. An example use - * is for HadoopRDD to register a callback to close the input stream. - * Will be called in any situation - success, failure, or cancellation. - * - * @param f Callback function. - */ - @deprecated("use addTaskCompletionListener", "1.2.0") - def addOnCompleteCallback(f: () => Unit) - /** * The ID of the stage that this task belong to. */ @@ -144,9 +131,6 @@ abstract class TaskContext extends Serializable { */ def attemptNumber(): Int - @deprecated("use attemptNumber", "1.3.0") - def attemptId(): Long - /** * An ID that is unique to this task attempt (within the same SparkContext, no two task attempts * will share the same attempt ID). This is roughly equivalent to Hadoop's TaskAttemptID. diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index f0ae83a934..6c49363099 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -38,9 +38,6 @@ private[spark] class TaskContextImpl( extends TaskContext with Logging { - // For backwards-compatibility; this method is now deprecated as of 1.3.0. - override def attemptId(): Long = taskAttemptId - // List of callback functions to execute when the task completes. @transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener] @@ -62,13 +59,6 @@ private[spark] class TaskContextImpl( this } - @deprecated("use addTaskCompletionListener", "1.1.0") - override def addOnCompleteCallback(f: () => Unit) { - onCompleteCallbacks += new TaskCompletionListener { - override def onTaskCompletion(context: TaskContext): Unit = f() - } - } - /** Marks the task as completed and triggers the listeners. 
*/ private[spark] def markTaskCompleted(): Unit = { completed = true diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala index c32aefac46..37ae007f88 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala @@ -23,7 +23,6 @@ import scala.language.implicitConversions import scala.reflect.ClassTag import org.apache.spark.Partitioner -import org.apache.spark.SparkContext.doubleRDDToDoubleRDDFunctions import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.rdd.RDD diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 0e4d7dce0f..9cf68672be 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -57,9 +57,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def rdd: RDD[T] - @deprecated("Use partitions() instead.", "1.1.0") - def splits: JList[Partition] = rdd.partitions.toSeq.asJava - /** Set of partitions in this RDD. */ def partitions: JList[Partition] = rdd.partitions.toSeq.asJava @@ -346,13 +343,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def toLocalIterator(): JIterator[T] = asJavaIteratorConverter(rdd.toLocalIterator).asJava - /** - * Return an array that contains all of the elements in this RDD. - * @deprecated As of Spark 1.0.0, toArray() is deprecated, use {@link #collect()} instead - */ - @deprecated("use collect()", "1.0.0") - def toArray(): JList[T] = collect() - /** * Return an array that contains all of the elements in a specific partition of this RDD. */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 4f54cd69e2..9f5b89bb4b 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -102,7 +102,7 @@ class JavaSparkContext(val sc: SparkContext) */ def this(master: String, appName: String, sparkHome: String, jars: Array[String], environment: JMap[String, String]) = - this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment.asScala, Map())) + this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment.asScala)) private[spark] val env = sc.env @@ -126,14 +126,6 @@ class JavaSparkContext(val sc: SparkContext) /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */ def defaultParallelism: java.lang.Integer = sc.defaultParallelism - /** - * Default min number of partitions for Hadoop RDDs when not given by user. 
- * @deprecated As of Spark 1.0.0, defaultMinSplits is deprecated, use - * {@link #defaultMinPartitions()} instead - */ - @deprecated("use defaultMinPartitions", "1.0.0") - def defaultMinSplits: java.lang.Integer = sc.defaultMinSplits - /** Default min number of partitions for Hadoop RDDs when not given by user */ def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions @@ -671,24 +663,6 @@ class JavaSparkContext(val sc: SparkContext) sc.addJar(path) } - /** - * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to - * any new nodes. - */ - @deprecated("adding jars no longer creates local copies that need to be deleted", "1.0.0") - def clearJars() { - sc.clearJars() - } - - /** - * Clear the job's list of files added by `addFile` so that they do not get downloaded to - * any new nodes. - */ - @deprecated("adding files no longer creates local copies that need to be deleted", "1.0.0") - def clearFiles() { - sc.clearFiles() - } - /** * Returns the Hadoop configuration used for the Hadoop code (e.g. file systems) we reuse. * diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index b872301425..76b31165aa 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -359,12 +359,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) self.mapPartitions(reducePartition).reduce(mergeMaps).asScala } - /** Alias for reduceByKeyLocally */ - @deprecated("Use reduceByKeyLocally", "1.0.0") - def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = self.withScope { - reduceByKeyLocally(func) - } - /** * Count the number of elements for each key, collecting the results to a local Map. * diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 9fe9d83a70..394f79dc77 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -746,99 +746,6 @@ abstract class RDD[T: ClassTag]( preservesPartitioning) } - /** - * :: DeveloperApi :: - * Return a new RDD by applying a function to each partition of this RDD. This is a variant of - * mapPartitions that also passes the TaskContext into the closure. - * - * `preservesPartitioning` indicates whether the input function preserves the partitioner, which - * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. - */ - @DeveloperApi - @deprecated("use TaskContext.get", "1.2.0") - def mapPartitionsWithContext[U: ClassTag]( - f: (TaskContext, Iterator[T]) => Iterator[U], - preservesPartitioning: Boolean = false): RDD[U] = withScope { - val cleanF = sc.clean(f) - val func = (context: TaskContext, index: Int, iter: Iterator[T]) => cleanF(context, iter) - new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning) - } - - /** - * Return a new RDD by applying a function to each partition of this RDD, while tracking the index - * of the original partition. - */ - @deprecated("use mapPartitionsWithIndex", "0.7.0") - def mapPartitionsWithSplit[U: ClassTag]( - f: (Int, Iterator[T]) => Iterator[U], - preservesPartitioning: Boolean = false): RDD[U] = withScope { - mapPartitionsWithIndex(f, preservesPartitioning) - } - - /** - * Maps f over this RDD, where f takes an additional parameter of type A. 
This - * additional parameter is produced by constructA, which is called in each - * partition with the index of that partition. - */ - @deprecated("use mapPartitionsWithIndex", "1.0.0") - def mapWith[A, U: ClassTag] - (constructA: Int => A, preservesPartitioning: Boolean = false) - (f: (T, A) => U): RDD[U] = withScope { - val cleanF = sc.clean(f) - val cleanA = sc.clean(constructA) - mapPartitionsWithIndex((index, iter) => { - val a = cleanA(index) - iter.map(t => cleanF(t, a)) - }, preservesPartitioning) - } - - /** - * FlatMaps f over this RDD, where f takes an additional parameter of type A. This - * additional parameter is produced by constructA, which is called in each - * partition with the index of that partition. - */ - @deprecated("use mapPartitionsWithIndex and flatMap", "1.0.0") - def flatMapWith[A, U: ClassTag] - (constructA: Int => A, preservesPartitioning: Boolean = false) - (f: (T, A) => Seq[U]): RDD[U] = withScope { - val cleanF = sc.clean(f) - val cleanA = sc.clean(constructA) - mapPartitionsWithIndex((index, iter) => { - val a = cleanA(index) - iter.flatMap(t => cleanF(t, a)) - }, preservesPartitioning) - } - - /** - * Applies f to each element of this RDD, where f takes an additional parameter of type A. - * This additional parameter is produced by constructA, which is called in each - * partition with the index of that partition. - */ - @deprecated("use mapPartitionsWithIndex and foreach", "1.0.0") - def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit): Unit = withScope { - val cleanF = sc.clean(f) - val cleanA = sc.clean(constructA) - mapPartitionsWithIndex { (index, iter) => - val a = cleanA(index) - iter.map(t => {cleanF(t, a); t}) - } - } - - /** - * Filters this RDD with p, where p takes an additional parameter of type A. This - * additional parameter is produced by constructA, which is called in each - * partition with the index of that partition. - */ - @deprecated("use mapPartitionsWithIndex and filter", "1.0.0") - def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = withScope { - val cleanP = sc.clean(p) - val cleanA = sc.clean(constructA) - mapPartitionsWithIndex((index, iter) => { - val a = cleanA(index) - iter.filter(t => cleanP(t, a)) - }, preservesPartitioning = true) - } - /** * Zips this RDD with another one, returning key-value pairs with the first element in each RDD, * second element in each RDD, etc. Assumes that the two RDDs have the *same number of @@ -944,14 +851,6 @@ abstract class RDD[T: ClassTag]( (0 until partitions.length).iterator.flatMap(i => collectPartition(i)) } - /** - * Return an array that contains all of the elements in this RDD. - */ - @deprecated("use collect", "1.0.0") - def toArray(): Array[T] = withScope { - collect() - } - /** * Return an RDD that contains all matching values by applying `f`. */ diff --git a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala deleted file mode 100644 index 9e8cee5331..0000000000 --- a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.rdd - -import java.util.Random - -import scala.reflect.ClassTag - -import org.apache.commons.math3.distribution.PoissonDistribution - -import org.apache.spark.{Partition, TaskContext} - -@deprecated("Replaced by PartitionwiseSampledRDDPartition", "1.0.0") -private[spark] -class SampledRDDPartition(val prev: Partition, val seed: Int) extends Partition with Serializable { - override val index: Int = prev.index -} - -@deprecated("Replaced by PartitionwiseSampledRDD", "1.0.0") -private[spark] class SampledRDD[T: ClassTag]( - prev: RDD[T], - withReplacement: Boolean, - frac: Double, - seed: Int) - extends RDD[T](prev) { - - override def getPartitions: Array[Partition] = { - val rg = new Random(seed) - firstParent[T].partitions.map(x => new SampledRDDPartition(x, rg.nextInt)) - } - - override def getPreferredLocations(split: Partition): Seq[String] = - firstParent[T].preferredLocations(split.asInstanceOf[SampledRDDPartition].prev) - - override def compute(splitIn: Partition, context: TaskContext): Iterator[T] = { - val split = splitIn.asInstanceOf[SampledRDDPartition] - if (withReplacement) { - // For large datasets, the expected number of occurrences of each element in a sample with - // replacement is Poisson(frac). We use that to get a count for each element. - val poisson = new PoissonDistribution(frac) - poisson.reseedRandomGenerator(split.seed) - - firstParent[T].iterator(split.prev, context).flatMap { element => - val count = poisson.sample() - if (count == 0) { - Iterator.empty // Avoid object allocation when we return 0 items, which is quite often - } else { - Iterator.fill(count)(element) - } - } - } else { // Sampling without replacement - val rand = new Random(split.seed) - firstParent[T].iterator(split.prev, context).filter(x => (rand.nextDouble <= frac)) - } - } -} diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala index 4b5f15dd06..c4bc85a5ea 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala @@ -38,11 +38,6 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag extends Logging with Serializable { - @deprecated("It's used to provide backward compatibility for pre 1.3.0.", "1.3.0") - def this(self: RDD[(K, V)]) { - this(self, null, null) - } - private val keyWritableClass = if (_keyWritableClass == null) { // pre 1.3.0, we need to use Reflection to get the Writable class diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index f113c2b1b8..a42990addb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -95,9 +95,6 @@ class TaskInfo( } } - @deprecated("Use attemptNumber", "1.6.0") - def attempt: Int = attemptNumber - def id: String = s"$index.$attemptNumber" def duration: Long = { diff --git 
a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala index a51f30b9c2..b68936f5c9 100644 --- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala @@ -17,7 +17,6 @@ package org.apache.spark.util -import scala.concurrent.duration.FiniteDuration import scala.language.postfixOps import org.apache.spark.SparkConf @@ -50,18 +49,8 @@ private[spark] object RpcUtils { RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s") } - @deprecated("use askRpcTimeout instead, this method was not intended to be public", "1.5.0") - def askTimeout(conf: SparkConf): FiniteDuration = { - askRpcTimeout(conf).duration - } - /** Returns the default Spark timeout to use for RPC remote endpoint lookup. */ def lookupRpcTimeout(conf: SparkConf): RpcTimeout = { RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s") } - - @deprecated("use lookupRpcTimeout instead, this method was not intended to be public", "1.5.0") - def lookupTimeout(conf: SparkConf): FiniteDuration = { - lookupRpcTimeout(conf).duration - } } diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index d91948e446..502f86f178 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -687,13 +687,6 @@ public class JavaAPISuite implements Serializable { }).isEmpty()); } - @Test - public void toArray() { - JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3)); - List list = rdd.toArray(); - Assert.assertEquals(Arrays.asList(1, 2, 3), list); - } - @Test public void cartesian() { JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0)); diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 007a71f87c..18d1466bb7 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -441,66 +441,6 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { assert(prunedData(0) === 10) } - test("mapWith") { - import java.util.Random - val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2) - @deprecated("suppress compile time deprecation warning", "1.0.0") - val randoms = ones.mapWith( - (index: Int) => new Random(index + 42)) - {(t: Int, prng: Random) => prng.nextDouble * t}.collect() - val prn42_3 = { - val prng42 = new Random(42) - prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble() - } - val prn43_3 = { - val prng43 = new Random(43) - prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble() - } - assert(randoms(2) === prn42_3) - assert(randoms(5) === prn43_3) - } - - test("flatMapWith") { - import java.util.Random - val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2) - @deprecated("suppress compile time deprecation warning", "1.0.0") - val randoms = ones.flatMapWith( - (index: Int) => new Random(index + 42)) - {(t: Int, prng: Random) => - val random = prng.nextDouble() - Seq(random * t, random * t * 10)}. 
- collect() - val prn42_3 = { - val prng42 = new Random(42) - prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble() - } - val prn43_3 = { - val prng43 = new Random(43) - prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble() - } - assert(randoms(5) === prn42_3 * 10) - assert(randoms(11) === prn43_3 * 10) - } - - test("filterWith") { - import java.util.Random - val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2) - @deprecated("suppress compile time deprecation warning", "1.0.0") - val sample = ints.filterWith( - (index: Int) => new Random(index + 42)) - {(t: Int, prng: Random) => prng.nextInt(3) == 0}. - collect() - val checkSample = { - val prng42 = new Random(42) - val prng43 = new Random(43) - Array(1, 2, 3, 4, 5, 6).filter{i => - if (i < 4) 0 == prng42.nextInt(3) else 0 == prng43.nextInt(3) - } - } - assert(sample.size === checkSample.size) - for (i <- 0 until sample.size) assert(sample(i) === checkSample(i)) - } - test("collect large number of empty partitions") { // Regression test for SPARK-4019 assert(sc.makeRDD(0 until 10, 1000).repartition(2001).collect().toSet === (0 until 10).toSet) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala index d83d0aee42..40ebfdde92 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala @@ -99,14 +99,6 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark }.collect() assert(attemptIdsWithFailedTask.toSet === Set(0, 1)) } - - test("TaskContext.attemptId returns taskAttemptId for backwards-compatibility (SPARK-4014)") { - sc = new SparkContext("local", "test") - val attemptIds = sc.parallelize(Seq(1, 2, 3, 4), 4).mapPartitions { iter => - Seq(TaskContext.get().attemptId).iterator - }.collect() - assert(attemptIds.toSet === Set(0, 1, 2, 3)) - } } private object TaskContextSuite { diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 480722a5ac..5e745e0a95 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.util import java.io.NotSerializableException -import java.util.Random import org.apache.spark.LocalSparkContext._ import org.apache.spark.{SparkContext, SparkException, SparkFunSuite, TaskContext} @@ -91,11 +90,6 @@ class ClosureCleanerSuite extends SparkFunSuite { expectCorrectException { TestUserClosuresActuallyCleaned.testKeyBy(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitions(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithIndex(rdd) } - expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithContext(rdd) } - expectCorrectException { TestUserClosuresActuallyCleaned.testFlatMapWith(rdd) } - expectCorrectException { TestUserClosuresActuallyCleaned.testFilterWith(rdd) } - expectCorrectException { TestUserClosuresActuallyCleaned.testForEachWith(rdd) } - expectCorrectException { TestUserClosuresActuallyCleaned.testMapWith(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions2(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions3(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions4(rdd) } 
@@ -269,21 +263,6 @@ private object TestUserClosuresActuallyCleaned { def testMapPartitionsWithIndex(rdd: RDD[Int]): Unit = { rdd.mapPartitionsWithIndex { (_, it) => return; it }.count() } - def testFlatMapWith(rdd: RDD[Int]): Unit = { - rdd.flatMapWith ((index: Int) => new Random(index + 42)){ (_, it) => return; Seq() }.count() - } - def testMapWith(rdd: RDD[Int]): Unit = { - rdd.mapWith ((index: Int) => new Random(index + 42)){ (_, it) => return; 0 }.count() - } - def testFilterWith(rdd: RDD[Int]): Unit = { - rdd.filterWith ((index: Int) => new Random(index + 42)){ (_, it) => return; true }.count() - } - def testForEachWith(rdd: RDD[Int]): Unit = { - rdd.foreachWith ((index: Int) => new Random(index + 42)){ (_, it) => return } - } - def testMapPartitionsWithContext(rdd: RDD[Int]): Unit = { - rdd.mapPartitionsWithContext { (_, it) => return; it }.count() - } def testZipPartitions2(rdd: RDD[Int]): Unit = { rdd.zipPartitions(rdd) { case (it1, it2) => return; it1 }.count() } -- cgit v1.2.3
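
For anyone still compiling against the methods this commit deletes, the @deprecated annotations removed above already name the supported replacements. Below is a minimal migration sketch, not part of the patch itself: the object name, app name, local[2] master, and sample data are illustrative assumptions, and it simply exercises the replacements named in this diff — collect() for toArray(), mapPartitionsWithIndex for the mapWith/flatMapWith/filterWith/foreachWith family, TaskContext.get() for mapPartitionsWithContext, addTaskCompletionListener for addOnCompleteCallback, runJob without the allowLocal flag, and defaultMinPartitions for defaultMinSplits.

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

// Hypothetical standalone example; names and data are illustrative only.
object DeprecatedApiMigrationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("migration-sketch").setMaster("local[2]"))
    try {
      val rdd = sc.parallelize(1 to 10, numSlices = 2)

      // toArray() was removed; collect() returns the same Array[T].
      val all: Array[Int] = rdd.collect()

      // mapWith / flatMapWith / filterWith / foreachWith were removed; build the
      // per-partition state (here a seeded RNG) inside mapPartitionsWithIndex.
      val salted = rdd.mapPartitionsWithIndex { (index, iter) =>
        val rng = new java.util.Random(index + 42)
        iter.map(x => x * rng.nextDouble())
      }

      // mapPartitionsWithContext was removed; read the context via TaskContext.get().
      val tagged = rdd.mapPartitions { iter =>
        val ctx = TaskContext.get()
        iter.map(x => (ctx.partitionId(), x))
      }

      // addOnCompleteCallback was removed; register a task completion listener instead.
      rdd.foreachPartition { _ =>
        TaskContext.get().addTaskCompletionListener { _ => () /* release per-task resources here */ }
      }

      // runJob no longer accepts an allowLocal flag; drop that argument.
      val firstPartitionSum = sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum, Seq(0))

      // defaultMinSplits was removed; defaultMinPartitions is the replacement.
      val minPartitions = sc.defaultMinPartitions

      println((all.length, salted.count(), tagged.count(), firstPartitionSum.toSeq, minPartitions))
    } finally {
      sc.stop()
    }
  }
}

Each substitution above mirrors the replacement that the corresponding deleted @deprecated message points to, so existing call sites can be ported mechanically.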