core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 29
core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 10
core/src/main/scala/org/apache/spark/rdd/RDD.scala              | 16
3 files changed, 53 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index c17ca12379..e4ccd9f11b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -445,6 +445,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Approximate version of count() that returns a potentially incomplete result
* within a timeout, even if not all tasks have finished.
+ *
+ * The confidence is the probability that the error bounds of the result will
+ * contain the true value. That is, if countApprox were called repeatedly
+ * with confidence 0.9, we would expect 90% of the results to contain the
+ * true count. The confidence must be in the range [0,1] or an exception will
+ * be thrown.
+ *
+ * @param timeout maximum time to wait for the job, in milliseconds
+ * @param confidence the desired statistical confidence in the result
+ * @return a potentially incomplete result, with error bounds
*/
def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
rdd.countApprox(timeout, confidence)
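As a usage sketch of the two-argument overload documented above (a minimal example against the Scala RDD API that backs this wrapper; the RDD `rdd` and the 500 ms timeout are hypothetical):

    import org.apache.spark.partial.{BoundedDouble, PartialResult}

    // Wait at most 500 ms; ask that the returned bounds contain the
    // true count with probability 0.90.
    val res: PartialResult[BoundedDouble] = rdd.countApprox(500L, 0.90)
    val bd: BoundedDouble = res.initialValue  // best estimate at the timeout
    println(s"count ~ ${bd.mean} in [${bd.low}, ${bd.high}] at confidence ${bd.confidence}")

The single-argument overload in the next hunk is the same call with the default confidence of 0.95.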
@@ -452,6 +462,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Approximate version of count() that returns a potentially incomplete result
* within a timeout, even if not all tasks have finished.
+ *
+ * @param timeout maximum time to wait for the job, in milliseconds
*/
def countApprox(timeout: Long): PartialResult[BoundedDouble] =
rdd.countApprox(timeout)
@@ -464,7 +476,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
mapAsSerializableJavaMap(rdd.countByValue()).asInstanceOf[JMap[T, jl.Long]]
/**
- * (Experimental) Approximate version of countByValue().
+ * Approximate version of countByValue().
+ *
+ * The confidence is the probability that the error bounds of the result will
+ * contain the true value. That is, if countByValueApprox were called repeatedly
+ * with confidence 0.9, we would expect 90% of the results to contain the
+ * true count. The confidence must be in the range [0,1] or an exception will
+ * be thrown.
+ *
+ * @param timeout maximum time to wait for the job, in milliseconds
+ * @param confidence the desired statistical confidence in the result
+ * @return a potentially incomplete result, with error bounds
*/
def countByValueApprox(
timeout: Long,
@@ -473,7 +495,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
rdd.countByValueApprox(timeout, confidence).map(mapAsSerializableJavaMap)
/**
- * (Experimental) Approximate version of countByValue().
+ * Approximate version of countByValue().
+ *
+ * @param timeout maximum time to wait for the job, in milliseconds
+ * @return a potentially incomplete result, with error bounds
*/
def countByValueApprox(timeout: Long): PartialResult[JMap[T, BoundedDouble]] =
rdd.countByValueApprox(timeout).map(mapAsSerializableJavaMap)
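A sketch of countByValueApprox from the Scala side (the Java wrapper above only converts the resulting map; `sc` and the sample data are assumptions):

    // Approximate per-value counts, capped at 1 second of waiting.
    val words = sc.parallelize(Seq("a", "b", "a", "c", "a"), 4)
    val partial = words.countByValueApprox(1000L, 0.95)
    partial.initialValue.foreach { case (word, bound) =>
      println(s"$word: ~${bound.mean} in [${bound.low}, ${bound.high}]")
    }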
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 7936d8e1d4..3b12448d63 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -375,6 +375,16 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
/**
* Approximate version of countByKey that can return a partial result if it does
* not finish within a timeout.
+ *
+ * The confidence is the probability that the error bounds of the result will
+ * contain the true value. That is, if countByKeyApprox were called repeatedly
+ * with confidence 0.9, we would expect 90% of the results to contain the
+ * true count. The confidence must be in the range [0,1] or an exception will
+ * be thrown.
+ *
+ * @param timeout maximum time to wait for the job, in milliseconds
+ * @param confidence the desired statistical confidence in the result
+ * @return a potentially incomplete result, with error bounds
*/
def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
: PartialResult[Map[K, BoundedDouble]] = self.withScope {
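A corresponding sketch for countByKeyApprox on a pair RDD (`sc` and the data are hypothetical):

    // Per-key approximate counts at 90% confidence, waiting at most 1 second.
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 4)
    val approx = pairs.countByKeyApprox(1000L, confidence = 0.90)
    approx.initialValue.foreach { case (k, bound) => println(s"$k -> ~${bound.mean}") }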
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index d85d0fff46..e6db9b3eec 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1107,10 +1107,21 @@ abstract class RDD[T: ClassTag](
/**
* Approximate version of count() that returns a potentially incomplete result
* within a timeout, even if not all tasks have finished.
+ *
+ * The confidence is the probability that the error bounds of the result will
+ * contain the true value. That is, if countApprox were called repeatedly
+ * with confidence 0.9, we would expect 90% of the results to contain the
+ * true count. The confidence must be in the range [0,1] or an exception will
+ * be thrown.
+ *
+ * @param timeout maximum time to wait for the job, in milliseconds
+ * @param confidence the desired statistical confidence in the result
+ * @return a potentially incomplete result, with error bounds
*/
def countApprox(
timeout: Long,
confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {
+ require(0.0 <= confidence && confidence <= 1.0, s"confidence ($confidence) must be in [0,1]")
val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
var result = 0L
while (iter.hasNext) {
@@ -1137,10 +1148,15 @@ abstract class RDD[T: ClassTag](
/**
* Approximate version of countByValue().
+ *
+ * @param timeout maximum time to wait for the job, in milliseconds
+ * @param confidence the desired statistical confidence in the result
+ * @return a potentially incomplete result, with error bounds
*/
def countByValueApprox(timeout: Long, confidence: Double = 0.95)
(implicit ord: Ordering[T] = null)
: PartialResult[Map[T, BoundedDouble]] = withScope {
+ require(0.0 <= confidence && confidence <= 1.0, s"confidence ($confidence) must be in [0,1]")
if (elementClassTag.runtimeClass.isArray) {
throw new SparkException("countByValueApprox() does not support arrays")
}
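The require calls added in both RDD methods make an out-of-range confidence fail fast. A minimal sketch of the resulting behavior (`rdd` is assumed):

    // Throws before any job is launched, with the message
    // "requirement failed: confidence (1.5) must be in [0,1]".
    try {
      rdd.countApprox(500L, 1.5)
    } catch {
      case e: IllegalArgumentException => println(e.getMessage)
    }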