aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorvinodkc <vinod.kc.in@gmail.com>2015-09-15 17:01:10 -0700
committerAndrew Or <andrew@databricks.com>2015-09-15 17:01:39 -0700
commit99ecfa5945aedaa71765ecf5cce59964ae52eebe (patch)
tree73a57f7286658cd14b8e8634c551919055235f91
parenta63cdc769f511e98b38c3318bcc732c9a6c76c22 (diff)
downloadspark-99ecfa5945aedaa71765ecf5cce59964ae52eebe.tar.gz
spark-99ecfa5945aedaa71765ecf5cce59964ae52eebe.tar.bz2
spark-99ecfa5945aedaa71765ecf5cce59964ae52eebe.zip
[SPARK-10575] [SPARK CORE] Wrapped RDD.takeSample with Scope
Remove return statements in RDD.takeSample and wrap it withScope Author: vinodkc <vinod.kc.in@gmail.com> Author: vinodkc <vinodkc@users.noreply.github.com> Author: Vinod K C <vinod.kc@huawei.com> Closes #8730 from vinodkc/fix_takesample_return.
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala68
1 files changed, 31 insertions, 37 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 7dd2bc5d7c..a56e542242 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -469,50 +469,44 @@ abstract class RDD[T: ClassTag](
* @param seed seed for the random number generator
* @return sample of specified size in an array
*/
- // TODO: rewrite this without return statements so we can wrap it in a scope
def takeSample(
withReplacement: Boolean,
num: Int,
- seed: Long = Utils.random.nextLong): Array[T] = {
+ seed: Long = Utils.random.nextLong): Array[T] = withScope {
val numStDev = 10.0
- if (num < 0) {
- throw new IllegalArgumentException("Negative number of elements requested")
- } else if (num == 0) {
- return new Array[T](0)
- }
-
- val initialCount = this.count()
- if (initialCount == 0) {
- return new Array[T](0)
- }
-
- val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt
- if (num > maxSampleSize) {
- throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " +
- s"$numStDev * math.sqrt(Int.MaxValue)")
- }
-
- val rand = new Random(seed)
- if (!withReplacement && num >= initialCount) {
- return Utils.randomizeInPlace(this.collect(), rand)
- }
-
- val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
- withReplacement)
-
- var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
+ require(num >= 0, "Negative number of elements requested")
+ require(num <= (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt),
+ "Cannot support a sample size > Int.MaxValue - " +
+ s"$numStDev * math.sqrt(Int.MaxValue)")
- // If the first sample didn't turn out large enough, keep trying to take samples;
- // this shouldn't happen often because we use a big multiplier for the initial size
- var numIters = 0
- while (samples.length < num) {
- logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
- samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
- numIters += 1
+ if (num == 0) {
+ new Array[T](0)
+ } else {
+ val initialCount = this.count()
+ if (initialCount == 0) {
+ new Array[T](0)
+ } else {
+ val rand = new Random(seed)
+ if (!withReplacement && num >= initialCount) {
+ Utils.randomizeInPlace(this.collect(), rand)
+ } else {
+ val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
+ withReplacement)
+ var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
+
+ // If the first sample didn't turn out large enough, keep trying to take samples;
+ // this shouldn't happen often because we use a big multiplier for the initial size
+ var numIters = 0
+ while (samples.length < num) {
+ logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
+ samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
+ numIters += 1
+ }
+ Utils.randomizeInPlace(samples, rand).take(num)
+ }
+ }
}
-
- Utils.randomizeInPlace(samples, rand).take(num)
}
/**