about | summary | refs | log | tree | commit | diff
path: root/core/src/main/scala/spark/RDD.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/RDD.scala')
-rw-r--r-- core/src/main/scala/spark/RDD.scala | 84
1 files changed, 63 insertions, 21 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 709271d4eb..ca7cdd622a 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark
import java.util.Random
@@ -105,6 +122,9 @@ abstract class RDD[T: ClassManifest](
// Methods and fields available on all RDDs
// =======================================================================
+ /** The SparkContext that created this RDD. */
+ def sparkContext: SparkContext = sc
+
/** A unique ID for this RDD (within its SparkContext). */
val id: Int = sc.newRddId()
@@ -117,6 +137,14 @@ abstract class RDD[T: ClassManifest](
this
}
+ /** User-defined generator of this RDD; defaults to the first user class from the call site info. */
+ var generator = Utils.getCallSiteInfo.firstUserClass
+
+ /** Reset the generator to the given value. */
+ def setGenerator(_generator: String) = {
+ generator = _generator
+ }
+
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. This can only be used to assign a new storage level if the RDD does not
@@ -273,31 +301,35 @@ abstract class RDD[T: ClassManifest](
def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
var fraction = 0.0
var total = 0
- val multiplier = 3.0
- val initialCount = count()
+ var multiplier = 3.0
+ var initialCount = this.count()
var maxSelected = 0
+ if (num < 0) {
+ throw new IllegalArgumentException("Negative number of elements requested")
+ }
+
if (initialCount > Integer.MAX_VALUE - 1) {
maxSelected = Integer.MAX_VALUE - 1
} else {
maxSelected = initialCount.toInt
}
- if (num > initialCount) {
+ if (num > initialCount && !withReplacement) {
total = maxSelected
- fraction = math.min(multiplier * (maxSelected + 1) / initialCount, 1.0)
- } else if (num < 0) {
- throw(new IllegalArgumentException("Negative number of elements requested"))
+ fraction = multiplier * (maxSelected + 1) / initialCount
} else {
- fraction = math.min(multiplier * (num + 1) / initialCount, 1.0)
+ fraction = multiplier * (num + 1) / initialCount
total = num
}
val rand = new Random(seed)
- var samples = this.sample(withReplacement, fraction, rand.nextInt).collect()
+ var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
+ // If the first sample didn't turn out large enough, keep trying to take samples;
+ // this shouldn't happen often because we use a big multiplier for the initial size
while (samples.length < total) {
- samples = this.sample(withReplacement, fraction, rand.nextInt).collect()
+ samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
}
Utils.randomizeInPlace(samples, rand).take(total)
@@ -355,7 +387,7 @@ abstract class RDD[T: ClassManifest](
/**
* Return an RDD created by piping elements to a forked external process.
*/
- def pipe(command: String, env: Map[String, String]): RDD[String] =
+ def pipe(command: String, env: Map[String, String]): RDD[String] =
new PipedRDD(this, command, env)
@@ -366,24 +398,24 @@ abstract class RDD[T: ClassManifest](
* @param command command to run in forked process.
* @param env environment variables to set.
* @param printPipeContext Before piping elements, this function is called as an oppotunity
- * to pipe context data. Print line function (like out.println) will be
+ * to pipe context data. Print line function (like out.println) will be
* passed as printPipeContext's parameter.
- * @param printRDDElement Use this function to customize how to pipe elements. This function
- * will be called with each RDD element as the 1st parameter, and the
+ * @param printRDDElement Use this function to customize how to pipe elements. This function
+ * will be called with each RDD element as the 1st parameter, and the
* print line function (like out.println()) as the 2nd parameter.
* An example of pipe the RDD data of groupBy() in a streaming way,
* instead of constructing a huge String to concat all the elements:
- * def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
+ * def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
* for (e <- record._2){f(e)}
* @return the result RDD
*/
def pipe(
- command: Seq[String],
- env: Map[String, String] = Map(),
+ command: Seq[String],
+ env: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
- printRDDElement: (T, String => Unit) => Unit = null): RDD[String] =
- new PipedRDD(this, command, env,
- if (printPipeContext ne null) sc.clean(printPipeContext) else null,
+ printRDDElement: (T, String => Unit) => Unit = null): RDD[String] =
+ new PipedRDD(this, command, env,
+ if (printPipeContext ne null) sc.clean(printPipeContext) else null,
if (printRDDElement ne null) sc.clean(printRDDElement) else null)
/**
@@ -763,10 +795,20 @@ abstract class RDD[T: ClassManifest](
}.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
- }.toArray
+ }.toArray.sorted(ord.reverse)
}
/**
+ * Returns the first `num` elements from this RDD as defined by
+ * the specified implicit Ordering[T], and maintains that
+ * ordering in the returned array.
+ * @param num the number of top elements to return
+ * @param ord the implicit ordering for T
+ * @return an array of top elements
+ */
+ def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
+
+ /**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String) {
@@ -840,7 +882,7 @@ abstract class RDD[T: ClassManifest](
private var storageLevel: StorageLevel = StorageLevel.NONE
/** Record user function generating this RDD. */
- private[spark] val origin = Utils.getSparkCallSite
+ private[spark] val origin = Utils.formatSparkCallSite
private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]