author    Reynold Xin <rxin@databricks.com>    2016-01-05 11:10:14 -0800
committer Josh Rosen <joshrosen@databricks.com>    2016-01-05 11:10:14 -0800
commit    8ce645d4eeda203cf5e100c4bdba2d71edd44e6a (patch)
tree      a4bb76e60b52ce5b4c12c6794f24920bd958385d /core
parent    76768337beec6842660db7522ad15c25ee66d346 (diff)
[SPARK-12615] Remove some deprecated APIs in RDD/SparkContext
I looked at each case individually and it looks like they can all be removed. The only one that I had to think twice about was toArray (I even thought about un-deprecating it, until I realized it was a problem in Java to have toArray returning a java.util.List).

Author: Reynold Xin <rxin@databricks.com>

Closes #10569 from rxin/SPARK-12615.
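For code that still calls the removed RDD/SparkContext methods, the deprecation messages in the diff below name the replacements. A minimal migration sketch in Scala, assuming an already-created SparkContext `sc` (the values here are only illustrative):

// Hypothetical migration sketch; `sc` is an existing SparkContext.
val rdd = sc.parallelize(1 to 6, 2)

// RDD.toArray() is gone; collect() returns the same elements as an Array.
val all: Array[Int] = rdd.collect()

// mapWith / flatMapWith / filterWith / foreachWith are gone;
// mapPartitionsWithIndex exposes the same per-partition index.
val shifted = rdd.mapPartitionsWithIndex { (index, iter) =>
  iter.map(_ + index)
}

// defaultMinSplits is gone; use defaultMinPartitions.
val minPartitions: Int = sc.defaultMinPartitions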
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/Aggregator.scala | 8
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkContext.scala | 261
-rw-r--r-- core/src/main/scala/org/apache/spark/TaskContext.scala | 16
-rw-r--r-- core/src/main/scala/org/apache/spark/TaskContextImpl.scala | 10
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala | 1
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 10
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala | 28
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 6
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 101
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala | 71
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala | 5
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala | 3
-rw-r--r-- core/src/main/scala/org/apache/spark/util/RpcUtils.scala | 11
-rw-r--r-- core/src/test/java/org/apache/spark/JavaAPISuite.java | 7
-rw-r--r-- core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 60
-rw-r--r-- core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala | 8
-rw-r--r-- core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala | 21
17 files changed, 3 insertions, 624 deletions
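Similarly, callers of the removed SparkContext.runJob overloads that took an allowLocal flag simply drop that argument (the flag has been a no-op since Spark 1.5). A hedged sketch, reusing the hypothetical `sc` and `rdd` from above:

// Before: sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq(0, 1), allowLocal = false)
val partitionSums: Array[Int] = sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq(0, 1))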
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 7196e57d5d..62629000cf 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -34,10 +34,6 @@ case class Aggregator[K, V, C] (
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {
- @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
- def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
- combineValuesByKey(iter, null)
-
def combineValuesByKey(
iter: Iterator[_ <: Product2[K, V]],
context: TaskContext): Iterator[(K, C)] = {
@@ -47,10 +43,6 @@ case class Aggregator[K, V, C] (
combiners.iterator
}
- @deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0")
- def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]]) : Iterator[(K, C)] =
- combineCombinersByKey(iter, null)
-
def combineCombinersByKey(
iter: Iterator[_ <: Product2[K, C]],
context: TaskContext): Iterator[(K, C)] = {
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 77e44ee026..87301202de 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean, AtomicIntege
import java.util.UUID.randomUUID
import scala.collection.JavaConverters._
-import scala.collection.{Map, Set}
+import scala.collection.Map
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.reflect.{ClassTag, classTag}
@@ -123,20 +123,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def this() = this(new SparkConf())
/**
- * :: DeveloperApi ::
- * Alternative constructor for setting preferred locations where Spark will create executors.
- *
- * @param config a [[org.apache.spark.SparkConf]] object specifying other Spark parameters
- * @param preferredNodeLocationData not used. Left for backward compatibility.
- */
- @deprecated("Passing in preferred locations has no effect at all, see SPARK-8949", "1.5.0")
- @DeveloperApi
- def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
- this(config)
- logWarning("Passing in preferred locations has no effect at all, see SPARK-8949")
- }
-
- /**
* Alternative constructor that allows setting common Spark properties directly
*
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
@@ -155,21 +141,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
* @param environment Environment variables to set on worker nodes.
- * @param preferredNodeLocationData not used. Left for backward compatibility.
*/
- @deprecated("Passing in preferred locations has no effect at all, see SPARK-10921", "1.6.0")
def this(
master: String,
appName: String,
sparkHome: String = null,
jars: Seq[String] = Nil,
- environment: Map[String, String] = Map(),
- preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
+ environment: Map[String, String] = Map()) =
{
this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
- if (preferredNodeLocationData.nonEmpty) {
- logWarning("Passing in preferred locations has no effect at all, see SPARK-8949")
- }
}
// NOTE: The below constructors could be consolidated using default arguments. Due to
@@ -267,8 +247,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Generate the random name for a temp folder in external block store.
// Add a timestamp as the suffix here to make it more safe
val externalBlockStoreFolderName = "spark-" + randomUUID.toString()
- @deprecated("Use externalBlockStoreFolderName instead.", "1.4.0")
- val tachyonFolderName = externalBlockStoreFolderName
def isLocal: Boolean = (master == "local" || master.startsWith("local["))
@@ -641,11 +619,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
localProperties.set(props)
}
- @deprecated("Properties no longer need to be explicitly initialized.", "1.0.0")
- def initLocalProperties() {
- localProperties.set(new Properties())
- }
-
/**
* Set a local property that affects jobs submitted from this thread, such as the
* Spark fair scheduler pool.
@@ -1586,15 +1559,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
/**
- * Clear the job's list of files added by `addFile` so that they do not get downloaded to
- * any new nodes.
- */
- @deprecated("adding files no longer creates local copies that need to be deleted", "1.0.0")
- def clearFiles() {
- addedFiles.clear()
- }
-
- /**
* Gets the locality information associated with the partition in a particular rdd
* @param rdd of interest
* @param partition to be looked up for locality
@@ -1685,15 +1649,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
postEnvironmentUpdate()
}
- /**
- * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
- * any new nodes.
- */
- @deprecated("adding jars no longer creates local copies that need to be deleted", "1.0.0")
- def clearJars() {
- addedJars.clear()
- }
-
// Shut down the SparkContext.
def stop() {
if (AsynchronousListenerBus.withinListenerThread.value) {
@@ -1864,63 +1819,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
-
- /**
- * Run a function on a given set of partitions in an RDD and pass the results to the given
- * handler function. This is the main entry point for all actions in Spark.
- *
- * The allowLocal flag is deprecated as of Spark 1.5.0+.
- */
- @deprecated("use the version of runJob without the allowLocal parameter", "1.5.0")
- def runJob[T, U: ClassTag](
- rdd: RDD[T],
- func: (TaskContext, Iterator[T]) => U,
- partitions: Seq[Int],
- allowLocal: Boolean,
- resultHandler: (Int, U) => Unit): Unit = {
- if (allowLocal) {
- logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+")
- }
- runJob(rdd, func, partitions, resultHandler)
- }
-
- /**
- * Run a function on a given set of partitions in an RDD and return the results as an array.
- *
- * The allowLocal flag is deprecated as of Spark 1.5.0+.
- */
- @deprecated("use the version of runJob without the allowLocal parameter", "1.5.0")
- def runJob[T, U: ClassTag](
- rdd: RDD[T],
- func: (TaskContext, Iterator[T]) => U,
- partitions: Seq[Int],
- allowLocal: Boolean
- ): Array[U] = {
- if (allowLocal) {
- logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+")
- }
- runJob(rdd, func, partitions)
- }
-
- /**
- * Run a job on a given set of partitions of an RDD, but take a function of type
- * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
- *
- * The allowLocal argument is deprecated as of Spark 1.5.0+.
- */
- @deprecated("use the version of runJob without the allowLocal parameter", "1.5.0")
- def runJob[T, U: ClassTag](
- rdd: RDD[T],
- func: Iterator[T] => U,
- partitions: Seq[Int],
- allowLocal: Boolean
- ): Array[U] = {
- if (allowLocal) {
- logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+")
- }
- runJob(rdd, func, partitions)
- }
-
/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
@@ -2094,10 +1992,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
taskScheduler.defaultParallelism
}
- /** Default min number of partitions for Hadoop RDDs when not given by user */
- @deprecated("use defaultMinPartitions", "1.0.0")
- def defaultMinSplits: Int = defaultMinPartitions
-
/**
* Default min number of partitions for Hadoop RDDs when not given by user
* Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
@@ -2364,113 +2258,6 @@ object SparkContext extends Logging {
*/
private[spark] val LEGACY_DRIVER_IDENTIFIER = "<driver>"
- // The following deprecated objects have already been copied to `object AccumulatorParam` to
- // make the compiler find them automatically. They are duplicate codes only for backward
- // compatibility, please update `object AccumulatorParam` accordingly if you plan to modify the
- // following ones.
-
- @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
- "backward compatibility.", "1.3.0")
- object DoubleAccumulatorParam extends AccumulatorParam[Double] {
- def addInPlace(t1: Double, t2: Double): Double = t1 + t2
- def zero(initialValue: Double): Double = 0.0
- }
-
- @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
- "backward compatibility.", "1.3.0")
- object IntAccumulatorParam extends AccumulatorParam[Int] {
- def addInPlace(t1: Int, t2: Int): Int = t1 + t2
- def zero(initialValue: Int): Int = 0
- }
-
- @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
- "backward compatibility.", "1.3.0")
- object LongAccumulatorParam extends AccumulatorParam[Long] {
- def addInPlace(t1: Long, t2: Long): Long = t1 + t2
- def zero(initialValue: Long): Long = 0L
- }
-
- @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
- "backward compatibility.", "1.3.0")
- object FloatAccumulatorParam extends AccumulatorParam[Float] {
- def addInPlace(t1: Float, t2: Float): Float = t1 + t2
- def zero(initialValue: Float): Float = 0f
- }
-
- // The following deprecated functions have already been moved to `object RDD` to
- // make the compiler find them automatically. They are still kept here for backward compatibility
- // and just call the corresponding functions in `object RDD`.
-
- @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
- "kept here only for backward compatibility.", "1.3.0")
- def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
- (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] =
- RDD.rddToPairRDDFunctions(rdd)
-
- @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
- "kept here only for backward compatibility.", "1.3.0")
- def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]): AsyncRDDActions[T] =
- RDD.rddToAsyncRDDActions(rdd)
-
- @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
- "kept here only for backward compatibility.", "1.3.0")
- def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
- rdd: RDD[(K, V)]): SequenceFileRDDFunctions[K, V] = {
- val kf = implicitly[K => Writable]
- val vf = implicitly[V => Writable]
- // Set the Writable class to null and `SequenceFileRDDFunctions` will use Reflection to get it
- implicit val keyWritableFactory = new WritableFactory[K](_ => null, kf)
- implicit val valueWritableFactory = new WritableFactory[V](_ => null, vf)
- RDD.rddToSequenceFileRDDFunctions(rdd)
- }
-
- @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
- "kept here only for backward compatibility.", "1.3.0")
- def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
- rdd: RDD[(K, V)]): OrderedRDDFunctions[K, V, (K, V)] =
- RDD.rddToOrderedRDDFunctions(rdd)
-
- @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
- "kept here only for backward compatibility.", "1.3.0")
- def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]): DoubleRDDFunctions =
- RDD.doubleRDDToDoubleRDDFunctions(rdd)
-
- @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
- "kept here only for backward compatibility.", "1.3.0")
- def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]): DoubleRDDFunctions =
- RDD.numericRDDToDoubleRDDFunctions(rdd)
-
- // The following deprecated functions have already been moved to `object WritableFactory` to
- // make the compiler find them automatically. They are still kept here for backward compatibility.
-
- @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
- "kept here only for backward compatibility.", "1.3.0")
- implicit def intToIntWritable(i: Int): IntWritable = new IntWritable(i)
-
- @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
- "kept here only for backward compatibility.", "1.3.0")
- implicit def longToLongWritable(l: Long): LongWritable = new LongWritable(l)
-
- @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
- "kept here only for backward compatibility.", "1.3.0")
- implicit def floatToFloatWritable(f: Float): FloatWritable = new FloatWritable(f)
-
- @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
- "kept here only for backward compatibility.", "1.3.0")
- implicit def doubleToDoubleWritable(d: Double): DoubleWritable = new DoubleWritable(d)
-
- @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
- "kept here only for backward compatibility.", "1.3.0")
- implicit def boolToBoolWritable (b: Boolean): BooleanWritable = new BooleanWritable(b)
-
- @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
- "kept here only for backward compatibility.", "1.3.0")
- implicit def bytesToBytesWritable (aob: Array[Byte]): BytesWritable = new BytesWritable(aob)
-
- @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
- "kept here only for backward compatibility.", "1.3.0")
- implicit def stringToText(s: String): Text = new Text(s)
-
private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T])
: ArrayWritable = {
def anyToWritable[U <% Writable](u: U): Writable = u
@@ -2479,50 +2266,6 @@ object SparkContext extends Logging {
arr.map(x => anyToWritable(x)).toArray)
}
- // The following deprecated functions have already been moved to `object WritableConverter` to
- // make the compiler find them automatically. They are still kept here for backward compatibility
- // and just call the corresponding functions in `object WritableConverter`.
-
- @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
- "backward compatibility.", "1.3.0")
- def intWritableConverter(): WritableConverter[Int] =
- WritableConverter.intWritableConverter()
-
- @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
- "backward compatibility.", "1.3.0")
- def longWritableConverter(): WritableConverter[Long] =
- WritableConverter.longWritableConverter()
-
- @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
- "backward compatibility.", "1.3.0")
- def doubleWritableConverter(): WritableConverter[Double] =
- WritableConverter.doubleWritableConverter()
-
- @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
- "backward compatibility.", "1.3.0")
- def floatWritableConverter(): WritableConverter[Float] =
- WritableConverter.floatWritableConverter()
-
- @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
- "backward compatibility.", "1.3.0")
- def booleanWritableConverter(): WritableConverter[Boolean] =
- WritableConverter.booleanWritableConverter()
-
- @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
- "backward compatibility.", "1.3.0")
- def bytesWritableConverter(): WritableConverter[Array[Byte]] =
- WritableConverter.bytesWritableConverter()
-
- @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
- "backward compatibility.", "1.3.0")
- def stringWritableConverter(): WritableConverter[String] =
- WritableConverter.stringWritableConverter()
-
- @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
- "backward compatibility.", "1.3.0")
- def writableWritableConverter[T <: Writable](): WritableConverter[T] =
- WritableConverter.writableWritableConverter()
-
/**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to SparkContext.
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index af558d6e5b..e25ed0fdd7 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -95,9 +95,6 @@ abstract class TaskContext extends Serializable {
*/
def isInterrupted(): Boolean
- @deprecated("use isRunningLocally", "1.2.0")
- def runningLocally(): Boolean
-
/**
* Returns true if the task is running locally in the driver program.
* @return
@@ -119,16 +116,6 @@ abstract class TaskContext extends Serializable {
def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext
/**
- * Adds a callback function to be executed on task completion. An example use
- * is for HadoopRDD to register a callback to close the input stream.
- * Will be called in any situation - success, failure, or cancellation.
- *
- * @param f Callback function.
- */
- @deprecated("use addTaskCompletionListener", "1.2.0")
- def addOnCompleteCallback(f: () => Unit)
-
- /**
* The ID of the stage that this task belong to.
*/
def stageId(): Int
@@ -144,9 +131,6 @@ abstract class TaskContext extends Serializable {
*/
def attemptNumber(): Int
- @deprecated("use attemptNumber", "1.3.0")
- def attemptId(): Long
-
/**
* An ID that is unique to this task attempt (within the same SparkContext, no two task attempts
* will share the same attempt ID). This is roughly equivalent to Hadoop's TaskAttemptID.
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index f0ae83a934..6c49363099 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -38,9 +38,6 @@ private[spark] class TaskContextImpl(
extends TaskContext
with Logging {
- // For backwards-compatibility; this method is now deprecated as of 1.3.0.
- override def attemptId(): Long = taskAttemptId
-
// List of callback functions to execute when the task completes.
@transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]
@@ -62,13 +59,6 @@ private[spark] class TaskContextImpl(
this
}
- @deprecated("use addTaskCompletionListener", "1.1.0")
- override def addOnCompleteCallback(f: () => Unit) {
- onCompleteCallbacks += new TaskCompletionListener {
- override def onTaskCompletion(context: TaskContext): Unit = f()
- }
- }
-
/** Marks the task as completed and triggers the listeners. */
private[spark] def markTaskCompleted(): Unit = {
completed = true
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index c32aefac46..37ae007f88 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -23,7 +23,6 @@ import scala.language.implicitConversions
import scala.reflect.ClassTag
import org.apache.spark.Partitioner
-import org.apache.spark.SparkContext.doubleRDDToDoubleRDDFunctions
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.RDD
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 0e4d7dce0f..9cf68672be 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -57,9 +57,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def rdd: RDD[T]
- @deprecated("Use partitions() instead.", "1.1.0")
- def splits: JList[Partition] = rdd.partitions.toSeq.asJava
-
/** Set of partitions in this RDD. */
def partitions: JList[Partition] = rdd.partitions.toSeq.asJava
@@ -347,13 +344,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
asJavaIteratorConverter(rdd.toLocalIterator).asJava
/**
- * Return an array that contains all of the elements in this RDD.
- * @deprecated As of Spark 1.0.0, toArray() is deprecated, use {@link #collect()} instead
- */
- @deprecated("use collect()", "1.0.0")
- def toArray(): JList[T] = collect()
-
- /**
* Return an array that contains all of the elements in a specific partition of this RDD.
*/
def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 4f54cd69e2..9f5b89bb4b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -102,7 +102,7 @@ class JavaSparkContext(val sc: SparkContext)
*/
def this(master: String, appName: String, sparkHome: String, jars: Array[String],
environment: JMap[String, String]) =
- this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment.asScala, Map()))
+ this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment.asScala))
private[spark] val env = sc.env
@@ -126,14 +126,6 @@ class JavaSparkContext(val sc: SparkContext)
/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism: java.lang.Integer = sc.defaultParallelism
- /**
- * Default min number of partitions for Hadoop RDDs when not given by user.
- * @deprecated As of Spark 1.0.0, defaultMinSplits is deprecated, use
- * {@link #defaultMinPartitions()} instead
- */
- @deprecated("use defaultMinPartitions", "1.0.0")
- def defaultMinSplits: java.lang.Integer = sc.defaultMinSplits
-
/** Default min number of partitions for Hadoop RDDs when not given by user */
def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions
@@ -672,24 +664,6 @@ class JavaSparkContext(val sc: SparkContext)
}
/**
- * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
- * any new nodes.
- */
- @deprecated("adding jars no longer creates local copies that need to be deleted", "1.0.0")
- def clearJars() {
- sc.clearJars()
- }
-
- /**
- * Clear the job's list of files added by `addFile` so that they do not get downloaded to
- * any new nodes.
- */
- @deprecated("adding files no longer creates local copies that need to be deleted", "1.0.0")
- def clearFiles() {
- sc.clearFiles()
- }
-
- /**
* Returns the Hadoop configuration used for the Hadoop code (e.g. file systems) we reuse.
*
* '''Note:''' As it will be reused in all Hadoop RDDs, it's better not to modify it unless you
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index b872301425..76b31165aa 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -359,12 +359,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
self.mapPartitions(reducePartition).reduce(mergeMaps).asScala
}
- /** Alias for reduceByKeyLocally */
- @deprecated("Use reduceByKeyLocally", "1.0.0")
- def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = self.withScope {
- reduceByKeyLocally(func)
- }
-
/**
* Count the number of elements for each key, collecting the results to a local Map.
*
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 9fe9d83a70..394f79dc77 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -747,99 +747,6 @@ abstract class RDD[T: ClassTag](
}
/**
- * :: DeveloperApi ::
- * Return a new RDD by applying a function to each partition of this RDD. This is a variant of
- * mapPartitions that also passes the TaskContext into the closure.
- *
- * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
- * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
- */
- @DeveloperApi
- @deprecated("use TaskContext.get", "1.2.0")
- def mapPartitionsWithContext[U: ClassTag](
- f: (TaskContext, Iterator[T]) => Iterator[U],
- preservesPartitioning: Boolean = false): RDD[U] = withScope {
- val cleanF = sc.clean(f)
- val func = (context: TaskContext, index: Int, iter: Iterator[T]) => cleanF(context, iter)
- new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
- }
-
- /**
- * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
- * of the original partition.
- */
- @deprecated("use mapPartitionsWithIndex", "0.7.0")
- def mapPartitionsWithSplit[U: ClassTag](
- f: (Int, Iterator[T]) => Iterator[U],
- preservesPartitioning: Boolean = false): RDD[U] = withScope {
- mapPartitionsWithIndex(f, preservesPartitioning)
- }
-
- /**
- * Maps f over this RDD, where f takes an additional parameter of type A. This
- * additional parameter is produced by constructA, which is called in each
- * partition with the index of that partition.
- */
- @deprecated("use mapPartitionsWithIndex", "1.0.0")
- def mapWith[A, U: ClassTag]
- (constructA: Int => A, preservesPartitioning: Boolean = false)
- (f: (T, A) => U): RDD[U] = withScope {
- val cleanF = sc.clean(f)
- val cleanA = sc.clean(constructA)
- mapPartitionsWithIndex((index, iter) => {
- val a = cleanA(index)
- iter.map(t => cleanF(t, a))
- }, preservesPartitioning)
- }
-
- /**
- * FlatMaps f over this RDD, where f takes an additional parameter of type A. This
- * additional parameter is produced by constructA, which is called in each
- * partition with the index of that partition.
- */
- @deprecated("use mapPartitionsWithIndex and flatMap", "1.0.0")
- def flatMapWith[A, U: ClassTag]
- (constructA: Int => A, preservesPartitioning: Boolean = false)
- (f: (T, A) => Seq[U]): RDD[U] = withScope {
- val cleanF = sc.clean(f)
- val cleanA = sc.clean(constructA)
- mapPartitionsWithIndex((index, iter) => {
- val a = cleanA(index)
- iter.flatMap(t => cleanF(t, a))
- }, preservesPartitioning)
- }
-
- /**
- * Applies f to each element of this RDD, where f takes an additional parameter of type A.
- * This additional parameter is produced by constructA, which is called in each
- * partition with the index of that partition.
- */
- @deprecated("use mapPartitionsWithIndex and foreach", "1.0.0")
- def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit): Unit = withScope {
- val cleanF = sc.clean(f)
- val cleanA = sc.clean(constructA)
- mapPartitionsWithIndex { (index, iter) =>
- val a = cleanA(index)
- iter.map(t => {cleanF(t, a); t})
- }
- }
-
- /**
- * Filters this RDD with p, where p takes an additional parameter of type A. This
- * additional parameter is produced by constructA, which is called in each
- * partition with the index of that partition.
- */
- @deprecated("use mapPartitionsWithIndex and filter", "1.0.0")
- def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = withScope {
- val cleanP = sc.clean(p)
- val cleanA = sc.clean(constructA)
- mapPartitionsWithIndex((index, iter) => {
- val a = cleanA(index)
- iter.filter(t => cleanP(t, a))
- }, preservesPartitioning = true)
- }
-
- /**
* Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
* second element in each RDD, etc. Assumes that the two RDDs have the *same number of
* partitions* and the *same number of elements in each partition* (e.g. one was made through
@@ -945,14 +852,6 @@ abstract class RDD[T: ClassTag](
}
/**
- * Return an array that contains all of the elements in this RDD.
- */
- @deprecated("use collect", "1.0.0")
- def toArray(): Array[T] = withScope {
- collect()
- }
-
- /**
* Return an RDD that contains all matching values by applying `f`.
*/
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope {
diff --git a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
deleted file mode 100644
index 9e8cee5331..0000000000
--- a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import java.util.Random
-
-import scala.reflect.ClassTag
-
-import org.apache.commons.math3.distribution.PoissonDistribution
-
-import org.apache.spark.{Partition, TaskContext}
-
-@deprecated("Replaced by PartitionwiseSampledRDDPartition", "1.0.0")
-private[spark]
-class SampledRDDPartition(val prev: Partition, val seed: Int) extends Partition with Serializable {
- override val index: Int = prev.index
-}
-
-@deprecated("Replaced by PartitionwiseSampledRDD", "1.0.0")
-private[spark] class SampledRDD[T: ClassTag](
- prev: RDD[T],
- withReplacement: Boolean,
- frac: Double,
- seed: Int)
- extends RDD[T](prev) {
-
- override def getPartitions: Array[Partition] = {
- val rg = new Random(seed)
- firstParent[T].partitions.map(x => new SampledRDDPartition(x, rg.nextInt))
- }
-
- override def getPreferredLocations(split: Partition): Seq[String] =
- firstParent[T].preferredLocations(split.asInstanceOf[SampledRDDPartition].prev)
-
- override def compute(splitIn: Partition, context: TaskContext): Iterator[T] = {
- val split = splitIn.asInstanceOf[SampledRDDPartition]
- if (withReplacement) {
- // For large datasets, the expected number of occurrences of each element in a sample with
- // replacement is Poisson(frac). We use that to get a count for each element.
- val poisson = new PoissonDistribution(frac)
- poisson.reseedRandomGenerator(split.seed)
-
- firstParent[T].iterator(split.prev, context).flatMap { element =>
- val count = poisson.sample()
- if (count == 0) {
- Iterator.empty // Avoid object allocation when we return 0 items, which is quite often
- } else {
- Iterator.fill(count)(element)
- }
- }
- } else { // Sampling without replacement
- val rand = new Random(split.seed)
- firstParent[T].iterator(split.prev, context).filter(x => (rand.nextDouble <= frac))
- }
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
index 4b5f15dd06..c4bc85a5ea 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
@@ -38,11 +38,6 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
extends Logging
with Serializable {
- @deprecated("It's used to provide backward compatibility for pre 1.3.0.", "1.3.0")
- def this(self: RDD[(K, V)]) {
- this(self, null, null)
- }
-
private val keyWritableClass =
if (_keyWritableClass == null) {
// pre 1.3.0, we need to use Reflection to get the Writable class
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index f113c2b1b8..a42990addb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -95,9 +95,6 @@ class TaskInfo(
}
}
- @deprecated("Use attemptNumber", "1.6.0")
- def attempt: Int = attemptNumber
-
def id: String = s"$index.$attemptNumber"
def duration: Long = {
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index a51f30b9c2..b68936f5c9 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -17,7 +17,6 @@
package org.apache.spark.util
-import scala.concurrent.duration.FiniteDuration
import scala.language.postfixOps
import org.apache.spark.SparkConf
@@ -50,18 +49,8 @@ private[spark] object RpcUtils {
RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s")
}
- @deprecated("use askRpcTimeout instead, this method was not intended to be public", "1.5.0")
- def askTimeout(conf: SparkConf): FiniteDuration = {
- askRpcTimeout(conf).duration
- }
-
/** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
def lookupRpcTimeout(conf: SparkConf): RpcTimeout = {
RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s")
}
-
- @deprecated("use lookupRpcTimeout instead, this method was not intended to be public", "1.5.0")
- def lookupTimeout(conf: SparkConf): FiniteDuration = {
- lookupRpcTimeout(conf).duration
- }
}
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index d91948e446..502f86f178 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -688,13 +688,6 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void toArray() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3));
- List<Integer> list = rdd.toArray();
- Assert.assertEquals(Arrays.asList(1, 2, 3), list);
- }
-
- @Test
public void cartesian() {
JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
JavaRDD<String> stringRDD = sc.parallelize(Arrays.asList("Hello", "World"));
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 007a71f87c..18d1466bb7 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -441,66 +441,6 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
assert(prunedData(0) === 10)
}
- test("mapWith") {
- import java.util.Random
- val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
- @deprecated("suppress compile time deprecation warning", "1.0.0")
- val randoms = ones.mapWith(
- (index: Int) => new Random(index + 42))
- {(t: Int, prng: Random) => prng.nextDouble * t}.collect()
- val prn42_3 = {
- val prng42 = new Random(42)
- prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble()
- }
- val prn43_3 = {
- val prng43 = new Random(43)
- prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble()
- }
- assert(randoms(2) === prn42_3)
- assert(randoms(5) === prn43_3)
- }
-
- test("flatMapWith") {
- import java.util.Random
- val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
- @deprecated("suppress compile time deprecation warning", "1.0.0")
- val randoms = ones.flatMapWith(
- (index: Int) => new Random(index + 42))
- {(t: Int, prng: Random) =>
- val random = prng.nextDouble()
- Seq(random * t, random * t * 10)}.
- collect()
- val prn42_3 = {
- val prng42 = new Random(42)
- prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble()
- }
- val prn43_3 = {
- val prng43 = new Random(43)
- prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble()
- }
- assert(randoms(5) === prn42_3 * 10)
- assert(randoms(11) === prn43_3 * 10)
- }
-
- test("filterWith") {
- import java.util.Random
- val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
- @deprecated("suppress compile time deprecation warning", "1.0.0")
- val sample = ints.filterWith(
- (index: Int) => new Random(index + 42))
- {(t: Int, prng: Random) => prng.nextInt(3) == 0}.
- collect()
- val checkSample = {
- val prng42 = new Random(42)
- val prng43 = new Random(43)
- Array(1, 2, 3, 4, 5, 6).filter{i =>
- if (i < 4) 0 == prng42.nextInt(3) else 0 == prng43.nextInt(3)
- }
- }
- assert(sample.size === checkSample.size)
- for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
- }
-
test("collect large number of empty partitions") {
// Regression test for SPARK-4019
assert(sc.makeRDD(0 until 10, 1000).repartition(2001).collect().toSet === (0 until 10).toSet)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index d83d0aee42..40ebfdde92 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -99,14 +99,6 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
}.collect()
assert(attemptIdsWithFailedTask.toSet === Set(0, 1))
}
-
- test("TaskContext.attemptId returns taskAttemptId for backwards-compatibility (SPARK-4014)") {
- sc = new SparkContext("local", "test")
- val attemptIds = sc.parallelize(Seq(1, 2, 3, 4), 4).mapPartitions { iter =>
- Seq(TaskContext.get().attemptId).iterator
- }.collect()
- assert(attemptIds.toSet === Set(0, 1, 2, 3))
- }
}
private object TaskContextSuite {
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index 480722a5ac..5e745e0a95 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -18,7 +18,6 @@
package org.apache.spark.util
import java.io.NotSerializableException
-import java.util.Random
import org.apache.spark.LocalSparkContext._
import org.apache.spark.{SparkContext, SparkException, SparkFunSuite, TaskContext}
@@ -91,11 +90,6 @@ class ClosureCleanerSuite extends SparkFunSuite {
expectCorrectException { TestUserClosuresActuallyCleaned.testKeyBy(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitions(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithIndex(rdd) }
- expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithContext(rdd) }
- expectCorrectException { TestUserClosuresActuallyCleaned.testFlatMapWith(rdd) }
- expectCorrectException { TestUserClosuresActuallyCleaned.testFilterWith(rdd) }
- expectCorrectException { TestUserClosuresActuallyCleaned.testForEachWith(rdd) }
- expectCorrectException { TestUserClosuresActuallyCleaned.testMapWith(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions2(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions3(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions4(rdd) }
@@ -269,21 +263,6 @@ private object TestUserClosuresActuallyCleaned {
def testMapPartitionsWithIndex(rdd: RDD[Int]): Unit = {
rdd.mapPartitionsWithIndex { (_, it) => return; it }.count()
}
- def testFlatMapWith(rdd: RDD[Int]): Unit = {
- rdd.flatMapWith ((index: Int) => new Random(index + 42)){ (_, it) => return; Seq() }.count()
- }
- def testMapWith(rdd: RDD[Int]): Unit = {
- rdd.mapWith ((index: Int) => new Random(index + 42)){ (_, it) => return; 0 }.count()
- }
- def testFilterWith(rdd: RDD[Int]): Unit = {
- rdd.filterWith ((index: Int) => new Random(index + 42)){ (_, it) => return; true }.count()
- }
- def testForEachWith(rdd: RDD[Int]): Unit = {
- rdd.foreachWith ((index: Int) => new Random(index + 42)){ (_, it) => return }
- }
- def testMapPartitionsWithContext(rdd: RDD[Int]): Unit = {
- rdd.mapPartitionsWithContext { (_, it) => return; it }.count()
- }
def testZipPartitions2(rdd: RDD[Int]): Unit = {
rdd.zipPartitions(rdd) { case (it1, it2) => return; it1 }.count()
}