diff options
author | Sean Owen <sowen@cloudera.com> | 2015-11-05 09:08:53 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-11-05 09:08:53 +0000 |
commit | 6f81eae24f83df51a99d4bb2629dd7daadc01519 (patch) | |
tree | 79b7d20c8381b97afb48cfd92ce940297a7f6ea5 /core | |
parent | 81498dd5c86ca51d2fb351c8ef52cbb28e6844f4 (diff) | |
download | spark-6f81eae24f83df51a99d4bb2629dd7daadc01519.tar.gz spark-6f81eae24f83df51a99d4bb2629dd7daadc01519.tar.bz2 spark-6f81eae24f83df51a99d4bb2629dd7daadc01519.zip |
[SPARK-11440][CORE][STREAMING][BUILD] Declare rest of @Experimental items non-experimental if they've existed since 1.2.0
Remove `Experimental` annotations in core, streaming for items that existed in 1.2.0 or before. The changes are:
* SparkContext
* binary{Files,Records} : 1.2.0
* submitJob : 1.0.0
* JavaSparkContext
* binary{Files,Records} : 1.2.0
* DoubleRDDFunctions, JavaDoubleRDD
* {mean,sum}Approx : 1.0.0
* PairRDDFunctions, JavaPairRDD
* sampleByKeyExact : 1.2.0
* countByKeyApprox : 1.0.0
* PairRDDFunctions
* countApproxDistinctByKey : 1.1.0
* RDD
* countApprox, countByValueApprox, countApproxDistinct : 1.0.0
* JavaRDDLike
* countApprox : 1.0.0
* PythonHadoopUtil.Converter : 1.1.0
* PortableDataStream : 1.2.0 (related to binaryFiles)
* BoundedDouble : 1.0.0
* PartialResult : 1.0.0
* StreamingContext, JavaStreamingContext
* binaryRecordsStream : 1.2.0
* HiveContext
* analyze : 1.2.0
Author: Sean Owen <sowen@cloudera.com>
Closes #9396 from srowen/SPARK-11440.
Diffstat (limited to 'core')
12 files changed, 2 insertions, 67 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index a6857b4c7d..7421821e26 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -45,7 +45,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor import org.apache.mesos.MesosNativeLibrary -import org.apache.spark.annotation.{DeveloperApi, Experimental} +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import org.apache.spark.executor.{ExecutorEndpoint, TriggerThreadDump} @@ -870,8 +870,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** - * :: Experimental :: - * * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file * (useful for binary data) * @@ -902,7 +900,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * list of inputs. * @param minPartitions A suggestion value of the minimal splitting number for input data. */ - @Experimental def binaryFiles( path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope { @@ -922,8 +919,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** - * :: Experimental :: - * * Load data from a flat binary file, assuming the length of each record is constant. * * '''Note:''' We ensure that the byte array for each record in the resulting RDD @@ -936,7 +931,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * * @return An RDD of data with values, represented as byte arrays */ - @Experimental def binaryRecords( path: String, recordLength: Int, @@ -1963,10 +1957,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** - * :: Experimental :: * Submit a job for execution and return a FutureJob holding the result. */ - @Experimental def submitJob[T, U, R]( rdd: RDD[T], processPartition: Iterator[T] => U, diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala index a650df605b..c32aefac46 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala @@ -24,7 +24,6 @@ import scala.reflect.ClassTag import org.apache.spark.Partitioner import org.apache.spark.SparkContext.doubleRDDToDoubleRDDFunctions -import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.rdd.RDD @@ -209,25 +208,19 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) srdd.meanApprox(timeout, confidence) /** - * :: Experimental :: * Approximate operation to return the mean within a timeout. */ - @Experimental def meanApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.meanApprox(timeout) /** - * :: Experimental :: * Approximate operation to return the sum within a timeout. */ - @Experimental def sumApprox(timeout: Long, confidence: JDouble): PartialResult[BoundedDouble] = srdd.sumApprox(timeout, confidence) /** - * :: Experimental :: * Approximate operation to return the sum within a timeout. */ - @Experimental def sumApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.sumApprox(timeout) /** diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 8344f6368a..0b0c6e5bb8 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -32,7 +32,6 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} import org.apache.spark.{HashPartitioner, Partitioner} import org.apache.spark.Partitioner._ -import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction} @@ -159,7 +158,6 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) sampleByKey(withReplacement, fractions, Utils.random.nextLong) /** - * ::Experimental:: * Return a subset of this RDD sampled by key (via stratified sampling) containing exactly * math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key). * @@ -169,14 +167,12 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * additional pass over the RDD to guarantee sample size; when sampling with replacement, we need * two additional passes. */ - @Experimental def sampleByKeyExact(withReplacement: Boolean, fractions: JMap[K, Double], seed: Long): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.sampleByKeyExact(withReplacement, fractions.asScala, seed)) /** - * ::Experimental:: * Return a subset of this RDD sampled by key (via stratified sampling) containing exactly * math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key). * @@ -188,7 +184,6 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * * Use Utils.random.nextLong as the default seed for the random number generator. */ - @Experimental def sampleByKeyExact(withReplacement: Boolean, fractions: JMap[K, Double]): JavaPairRDD[K, V] = sampleByKeyExact(withReplacement, fractions, Utils.random.nextLong) @@ -300,20 +295,16 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) def countByKey(): java.util.Map[K, Long] = mapAsSerializableJavaMap(rdd.countByKey()) /** - * :: Experimental :: * Approximate version of countByKey that can return a partial result if it does * not finish within a timeout. */ - @Experimental def countByKeyApprox(timeout: Long): PartialResult[java.util.Map[K, BoundedDouble]] = rdd.countByKeyApprox(timeout).map(mapAsSerializableJavaMap) /** - * :: Experimental :: * Approximate version of countByKey that can return a partial result if it does * not finish within a timeout. */ - @Experimental def countByKeyApprox(timeout: Long, confidence: Double = 0.95) : PartialResult[java.util.Map[K, BoundedDouble]] = rdd.countByKeyApprox(timeout, confidence).map(mapAsSerializableJavaMap) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index fc817cdd6a..871be0b1f3 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -28,7 +28,6 @@ import com.google.common.base.Optional import org.apache.hadoop.io.compress.CompressionCodec import org.apache.spark._ -import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaPairRDD._ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap @@ -436,20 +435,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def count(): Long = rdd.count() /** - * :: Experimental :: * Approximate version of count() that returns a potentially incomplete result * within a timeout, even if not all tasks have finished. */ - @Experimental def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] = rdd.countApprox(timeout, confidence) /** - * :: Experimental :: * Approximate version of count() that returns a potentially incomplete result * within a timeout, even if not all tasks have finished. */ - @Experimental def countApprox(timeout: Long): PartialResult[BoundedDouble] = rdd.countApprox(timeout) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 609496ccdf..4f54cd69e2 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark._ import org.apache.spark.AccumulatorParam._ -import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD} @@ -266,8 +265,6 @@ class JavaSparkContext(val sc: SparkContext) new JavaPairRDD(sc.binaryFiles(path, minPartitions)) /** - * :: Experimental :: - * * Read a directory of binary files from HDFS, a local file system (available on all nodes), * or any Hadoop-supported file system URI as a byte array. Each file is read as a single * record and returned in a key-value pair, where the key is the path of each file, @@ -294,19 +291,15 @@ class JavaSparkContext(val sc: SparkContext) * * @note Small files are preferred; very large files but may cause bad performance. */ - @Experimental def binaryFiles(path: String): JavaPairRDD[String, PortableDataStream] = new JavaPairRDD(sc.binaryFiles(path, defaultMinPartitions)) /** - * :: Experimental :: - * * Load data from a flat binary file, assuming the length of each record is constant. * * @param path Directory to the input data files * @return An RDD of data with values, represented as byte arrays */ - @Experimental def binaryRecords(path: String, recordLength: Int): JavaRDD[Array[Byte]] = { new JavaRDD(sc.binaryRecords(path, recordLength)) } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala index a7dfa1d257..d2beef2a0d 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala @@ -24,17 +24,14 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io._ import org.apache.spark.{Logging, SparkException} -import org.apache.spark.annotation.Experimental import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.util.{SerializableConfiguration, Utils} /** - * :: Experimental :: * A trait for use with reading custom classes in PySpark. Implement this trait and add custom * transformation code by overriding the convert method. */ -@Experimental trait Converter[T, + U] extends Serializable { def convert(obj: T): U } diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala index e2ffc3b64e..33e4ee0215 100644 --- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala +++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala @@ -27,7 +27,6 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext} import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit} -import org.apache.spark.annotation.Experimental import org.apache.spark.deploy.SparkHadoopUtil /** @@ -129,7 +128,6 @@ private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDat * @note TaskAttemptContext is not serializable resulting in the confBytes construct * @note CombineFileSplit is not serializable resulting in the splitBytes construct */ -@Experimental class PortableDataStream( isplit: CombineFileSplit, context: TaskAttemptContext, diff --git a/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala b/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala index aed0353344..48b9434153 100644 --- a/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala +++ b/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala @@ -17,13 +17,9 @@ package org.apache.spark.partial -import org.apache.spark.annotation.Experimental - /** - * :: Experimental :: * A Double value with error bars and associated confidence. */ -@Experimental class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) { override def toString(): String = "[%.3f, %.3f]".format(low, high) } diff --git a/core/src/main/scala/org/apache/spark/partial/PartialResult.scala b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala index 53c4b32c95..25cb7490aa 100644 --- a/core/src/main/scala/org/apache/spark/partial/PartialResult.scala +++ b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala @@ -17,9 +17,6 @@ package org.apache.spark.partial -import org.apache.spark.annotation.Experimental - -@Experimental class PartialResult[R](initialVal: R, isFinal: Boolean) { private var finalValue: Option[R] = if (isFinal) Some(initialVal) else None private var failure: Option[Exception] = None diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala index 926bce6f15..7fbaadcea3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala @@ -74,10 +74,8 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { } /** - * :: Experimental :: * Approximate operation to return the mean within a timeout. */ - @Experimental def meanApprox( timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = self.withScope { @@ -87,10 +85,8 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { } /** - * :: Experimental :: * Approximate operation to return the sum within a timeout. */ - @Experimental def sumApprox( timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = self.withScope { diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index a981b63942..c6181902ac 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -274,7 +274,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } /** - * ::Experimental:: * Return a subset of this RDD sampled by key (via stratified sampling) containing exactly * math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key). * @@ -289,7 +288,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * @param seed seed for the random number generator * @return RDD containing the sampled subset */ - @Experimental def sampleByKeyExact( withReplacement: Boolean, fractions: Map[K, Double], @@ -384,19 +382,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } /** - * :: Experimental :: * Approximate version of countByKey that can return a partial result if it does * not finish within a timeout. */ - @Experimental def countByKeyApprox(timeout: Long, confidence: Double = 0.95) : PartialResult[Map[K, BoundedDouble]] = self.withScope { self.map(_._1).countByValueApprox(timeout, confidence) } /** - * :: Experimental :: - * * Return approximate number of distinct values for each key in this RDD. * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: @@ -413,7 +407,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * If `sp` equals 0, the sparse representation is skipped. * @param partitioner Partitioner to use for the resulting RDD. */ - @Experimental def countApproxDistinctByKey( p: Int, sp: Int, diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index a97bb17443..800ef53cbe 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.TextOutputFormat import org.apache.spark._ import org.apache.spark.Partitioner._ -import org.apache.spark.annotation.{DeveloperApi, Experimental} +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.java.JavaRDD import org.apache.spark.partial.BoundedDouble import org.apache.spark.partial.CountEvaluator @@ -1119,11 +1119,9 @@ abstract class RDD[T: ClassTag]( def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum /** - * :: Experimental :: * Approximate version of count() that returns a potentially incomplete result * within a timeout, even if not all tasks have finished. */ - @Experimental def countApprox( timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope { @@ -1152,10 +1150,8 @@ abstract class RDD[T: ClassTag]( } /** - * :: Experimental :: * Approximate version of countByValue(). */ - @Experimental def countByValueApprox(timeout: Long, confidence: Double = 0.95) (implicit ord: Ordering[T] = null) : PartialResult[Map[T, BoundedDouble]] = withScope { @@ -1174,7 +1170,6 @@ abstract class RDD[T: ClassTag]( } /** - * :: Experimental :: * Return approximate number of distinct elements in the RDD. * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: @@ -1190,7 +1185,6 @@ abstract class RDD[T: ClassTag]( * @param sp The precision value for the sparse set, between 0 and 32. * If `sp` equals 0, the sparse representation is skipped. */ - @Experimental def countApproxDistinct(p: Int, sp: Int): Long = withScope { require(p >= 4, s"p ($p) must be >= 4") require(sp <= 32, s"sp ($sp) must be <= 32") |