author     Erik Erlandson <eerlands@redhat.com>    2014-09-26 23:15:10 -0700
committer  Reynold Xin <rxin@apache.org>           2014-09-26 23:15:10 -0700
commit     2d972fd84ac54a89e416442508a6d4eaeff452c1 (patch)
tree       8f9203bfc245f62a983943d05f5dd190558e95cd /core
parent     9e8ced7847d84d63f0da08b15623d558a2407583 (diff)
[SPARK-1021] Defer the data-driven computation of partition bounds in sortByKey() until evaluation.

Author: Erik Erlandson <eerlands@redhat.com>

Closes #1689 from erikerlandson/spark-1021-pr and squashes the following commits:

50b6da6 [Erik Erlandson] use standard getIteratorSize in countAsync
4e334a9 [Erik Erlandson] exception mystery fixed by fixing bug in ComplexFutureAction
b88b5d4 [Erik Erlandson] tweak async actions to use ComplexFutureAction[T] so they handle RangePartitioner sampling job properly
b2b20e8 [Erik Erlandson] Fix bug in exception passing with ComplexFutureAction[T]
ca8913e [Erik Erlandson] RangePartition sampling job -> FutureAction
7143f97 [Erik Erlandson] [SPARK-1021] modify range bounds variable to be thread safe
ac67195 [Erik Erlandson] [SPARK-1021] Defer the data-driven computation of partition bounds in sortByKey() until evaluation.
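For context, a minimal sketch of the user-visible effect (object and app names here are illustrative, assuming a build that contains this patch): constructing the sorted RDD no longer submits the RangePartitioner sampling job up front; that data-driven pass runs only when the bounds are first needed during evaluation.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // implicit conversions providing sortByKey() (Spark 1.x style)

object DeferredSortByKeyExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("spark-1021-sketch"))
    val pairs = sc.parallelize(1 to 100000, 8).map(i => (i % 1000, i))

    // With this patch, sortByKey() only wires up the RangePartitioner here;
    // the sampling job that determines partition bounds is deferred.
    val sorted = pairs.sortByKey()

    // The sampling job runs (at most once, thread-safely) when the bounds are
    // first consulted, i.e. as part of evaluating an action such as this one.
    println(sorted.first())

    sc.stop()
  }
}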
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/FutureAction.scala        |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/Partitioner.scala         | 29
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala | 64
3 files changed, 66 insertions(+), 34 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 75ea535f2f..c277c3a47d 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -208,7 +208,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
processPartition: Iterator[T] => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit,
- resultFunc: => R) {
+ resultFunc: => R): R = {
// If the action hasn't been cancelled yet, submit the job. The check and the submitJob
// command need to be in an atomic block.
val job = this.synchronized {
@@ -223,7 +223,10 @@ class ComplexFutureAction[T] extends FutureAction[T] {
// cancel the job and stop the execution. This is not in a synchronized block because
// Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
try {
- Await.ready(job, Duration.Inf)
+ Await.ready(job, Duration.Inf).value.get match {
+ case scala.util.Failure(e) => throw e
+ case scala.util.Success(v) => v
+ }
} catch {
case e: InterruptedException =>
job.cancel()
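The runJob change above hinges on inspecting the completed future's value rather than merely waiting on it. A standalone sketch of that pattern in plain Scala (the helper name is made up):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}

object AwaitResultSketch {
  // Await.ready only blocks until completion; the Try stored in `value` still
  // has to be unpacked, otherwise a failed job would be silently swallowed.
  def awaitResult[T](f: Future[T]): T =
    Await.ready(f, Duration.Inf).value.get match {
      case Success(v) => v
      case Failure(e) => throw e
    }
}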
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 37053bb6f3..d40b152d22 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -29,6 +29,10 @@ import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.{CollectionsUtils, Utils}
import org.apache.spark.util.random.{XORShiftRandom, SamplingUtils}
+import org.apache.spark.SparkContext.rddToAsyncRDDActions
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
/**
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
* Maps each key to a partition ID, from 0 to `numPartitions - 1`.
@@ -113,8 +117,12 @@ class RangePartitioner[K : Ordering : ClassTag, V](
private var ordering = implicitly[Ordering[K]]
// An array of upper bounds for the first (partitions - 1) partitions
- private var rangeBounds: Array[K] = {
- if (partitions <= 1) {
+ @volatile private var valRB: Array[K] = null
+
+ private def rangeBounds: Array[K] = this.synchronized {
+ if (valRB != null) return valRB
+
+ valRB = if (partitions <= 1) {
Array.empty
} else {
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
@@ -152,6 +160,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
RangePartitioner.determineBounds(candidates, partitions)
}
}
+
+ valRB
}
def numPartitions = rangeBounds.length + 1
@@ -222,7 +232,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
}
@throws(classOf[IOException])
- private def readObject(in: ObjectInputStream) {
+ private def readObject(in: ObjectInputStream): Unit = this.synchronized {
+ if (valRB != null) return
val sfactory = SparkEnv.get.serializer
sfactory match {
case js: JavaSerializer => in.defaultReadObject()
@@ -234,7 +245,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
val ser = sfactory.newInstance()
Utils.deserializeViaNestedStream(in, ser) { ds =>
implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
- rangeBounds = ds.readObject[Array[K]]()
+ valRB = ds.readObject[Array[K]]()
}
}
}
@@ -254,12 +265,18 @@ private[spark] object RangePartitioner {
sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
val shift = rdd.id
// val classTagK = classTag[K] // to avoid serializing the entire partitioner object
- val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
+ // use collectAsync here to run this job as a future, which is cancellable
+ val sketchFuture = rdd.mapPartitionsWithIndex { (idx, iter) =>
val seed = byteswap32(idx ^ (shift << 16))
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed)
Iterator((idx, n, sample))
- }.collect()
+ }.collectAsync()
+ // We do need the future's value to continue any further
+ val sketched = Await.ready(sketchFuture, Duration.Inf).value.get match {
+ case scala.util.Success(v) => v.toArray
+ case scala.util.Failure(e) => throw e
+ }
val numItems = sketched.map(_._2.toLong).sum
(numItems, sketched)
}
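A hedged usage sketch of the collectAsync-based sketching above (object and variable names are illustrative, not part of the patch): running the per-partition sampling pass as a FutureAction makes it cancellable while in flight, unlike a blocking collect().

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // implicit rddToAsyncRDDActions for collectAsync()

object CancellableSketchExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch-as-future"))
    val rdd = sc.parallelize(1 to 1000000, 8)

    // Per-partition sampling pass submitted through collectAsync(), mirroring
    // the change above: the returned FutureAction supports cancellation.
    val sketchFuture = rdd.mapPartitionsWithIndex { (idx, iter) =>
      Iterator((idx, iter.size))
    }.collectAsync()

    // sketchFuture.cancel()   // e.g. if the enclosing action is cancelled

    val sketched = Await.result(sketchFuture, Duration.Inf)
    println(sketched.take(3).mkString(", "))
    sc.stop()
  }
}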
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index b62f3fbdc4..7a68b3afa8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.reflect.ClassTag
+import org.apache.spark.util.Utils
import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
import org.apache.spark.annotation.Experimental
@@ -38,29 +39,30 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
* Returns a future for counting the number of elements in the RDD.
*/
def countAsync(): FutureAction[Long] = {
- val totalCount = new AtomicLong
- self.context.submitJob(
- self,
- (iter: Iterator[T]) => {
- var result = 0L
- while (iter.hasNext) {
- result += 1L
- iter.next()
- }
- result
- },
- Range(0, self.partitions.size),
- (index: Int, data: Long) => totalCount.addAndGet(data),
- totalCount.get())
+ val f = new ComplexFutureAction[Long]
+ f.run {
+ val totalCount = new AtomicLong
+ f.runJob(self,
+ (iter: Iterator[T]) => Utils.getIteratorSize(iter),
+ Range(0, self.partitions.size),
+ (index: Int, data: Long) => totalCount.addAndGet(data),
+ totalCount.get())
+ }
}
/**
* Returns a future for retrieving all elements of this RDD.
*/
def collectAsync(): FutureAction[Seq[T]] = {
- val results = new Array[Array[T]](self.partitions.size)
- self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.size),
- (index, data) => results(index) = data, results.flatten.toSeq)
+ val f = new ComplexFutureAction[Seq[T]]
+ f.run {
+ val results = new Array[Array[T]](self.partitions.size)
+ f.runJob(self,
+ (iter: Iterator[T]) => iter.toArray,
+ Range(0, self.partitions.size),
+ (index: Int, data: Array[T]) => results(index) = data,
+ results.flatten.toSeq)
+ }
}
/**
@@ -104,24 +106,34 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
}
results.toSeq
}
-
- f
}
/**
* Applies a function f to all elements of this RDD.
*/
- def foreachAsync(f: T => Unit): FutureAction[Unit] = {
- val cleanF = self.context.clean(f)
- self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size),
- (index, data) => Unit, Unit)
+ def foreachAsync(expr: T => Unit): FutureAction[Unit] = {
+ val f = new ComplexFutureAction[Unit]
+ val exprClean = self.context.clean(expr)
+ f.run {
+ f.runJob(self,
+ (iter: Iterator[T]) => iter.foreach(exprClean),
+ Range(0, self.partitions.size),
+ (index: Int, data: Unit) => Unit,
+ Unit)
+ }
}
/**
* Applies a function f to each partition of this RDD.
*/
- def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
- self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.size),
- (index, data) => Unit, Unit)
+ def foreachPartitionAsync(expr: Iterator[T] => Unit): FutureAction[Unit] = {
+ val f = new ComplexFutureAction[Unit]
+ f.run {
+ f.runJob(self,
+ expr,
+ Range(0, self.partitions.size),
+ (index: Int, data: Unit) => Unit,
+ Unit)
+ }
}
}
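For reference, an illustrative caller-side sketch of the rewritten async actions (names are made up): the switch to ComplexFutureAction is internal, and callers still receive a FutureAction that can be awaited or cancelled as before.

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // implicit rddToAsyncRDDActions

object AsyncActionsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("async-actions"))
    val rdd = sc.parallelize(1 to 10000, 4)

    // Both actions now run through ComplexFutureAction under the hood.
    val countFuture = rdd.countAsync()
    val touchFuture = rdd.foreachPartitionAsync(iter => iter.foreach(_ => ()))

    println("count = " + Await.result(countFuture, Duration.Inf))
    Await.ready(touchFuture, Duration.Inf)
    sc.stop()
  }
}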