From c9c4954d994c5ba824e71c1c5cd8d5de531caf78 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Fri, 26 Apr 2013 16:57:46 -0700 Subject: Add an interface to zip iterators of multiple RDDs The current code supports 2, 3 or 4 arguments but can be extended to more arguments if required. --- core/src/main/scala/spark/RDD.scala | 22 ++++ .../scala/spark/rdd/MapZippedPartitionsRDD.scala | 118 +++++++++++++++++++++ .../scala/spark/MapZippedPartitionsSuite.scala | 34 ++++++ 3 files changed, 174 insertions(+) create mode 100644 core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala create mode 100644 core/src/test/scala/spark/MapZippedPartitionsSuite.scala diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index ccd9d0364a..8e7e1457c1 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -35,6 +35,9 @@ import spark.rdd.ShuffledRDD import spark.rdd.SubtractedRDD import spark.rdd.UnionRDD import spark.rdd.ZippedRDD +import spark.rdd.MapZippedPartitionsRDD2 +import spark.rdd.MapZippedPartitionsRDD3 +import spark.rdd.MapZippedPartitionsRDD4 import spark.storage.StorageLevel import SparkContext._ @@ -436,6 +439,25 @@ abstract class RDD[T: ClassManifest]( */ def zip[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other) + def zipAndMapPartitions[B: ClassManifest, V: ClassManifest]( + f: (Iterator[T], Iterator[B]) => Iterator[V], + rdd2: RDD[B]) = + new MapZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2) + + def zipAndMapPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]( + f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V], + rdd2: RDD[B], + rdd3: RDD[C]) = + new MapZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3) + + def zipAndMapPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]( + f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V], + rdd2: RDD[B], + rdd3: RDD[C], + rdd4: RDD[D]) = + new MapZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4) + + // Actions (launch a job to return a value to the user program) /** diff --git a/core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala new file mode 100644 index 0000000000..6653b3b444 --- /dev/null +++ b/core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala @@ -0,0 +1,118 @@ +package spark.rdd + +import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext} +import java.io.{ObjectOutputStream, IOException} + +private[spark] class MapZippedPartition( + idx: Int, + @transient rdds: Seq[RDD[_]] + ) extends Partition { + + override val index: Int = idx + var partitionValues = rdds.map(rdd => rdd.partitions(idx)) + def partitions = partitionValues + + @throws(classOf[IOException]) + private def writeObject(oos: ObjectOutputStream) { + // Update the reference to parent split at the time of task serialization + partitionValues = rdds.map(rdd => rdd.partitions(idx)) + oos.defaultWriteObject() + } +} + +abstract class MapZippedPartitionsBaseRDD[V: ClassManifest]( + sc: SparkContext, + var rdds: Seq[RDD[_]]) + extends RDD[V](sc, rdds.map(x => new OneToOneDependency(x))) { + + override def getPartitions: Array[Partition] = { + val sizes = rdds.map(x => x.partitions.size) + if (!sizes.forall(x => x == sizes(0))) { + throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions") + } + val array = new Array[Partition](sizes(0)) + for (i <- 0 until sizes(0)) { + 
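// Each zipped partition records the i-th partition of every parent RDD +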
array(i) = new MapZippedPartition(i, rdds) + } + array + } + + override def getPreferredLocations(s: Partition): Seq[String] = { + val splits = s.asInstanceOf[MapZippedPartition].partitions + val preferredLocations = rdds.zip(splits).map(x => x._1.preferredLocations(x._2)) + preferredLocations.reduce((x, y) => x.intersect(y)) + } + + override def clearDependencies() { + super.clearDependencies() + rdds = null + } +} + +class MapZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest]( + sc: SparkContext, + f: (Iterator[A], Iterator[B]) => Iterator[V], + var rdd1: RDD[A], + var rdd2: RDD[B]) + extends MapZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) { + + override def compute(s: Partition, context: TaskContext): Iterator[V] = { + val partitions = s.asInstanceOf[MapZippedPartition].partitions + f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context)) + } + + override def clearDependencies() { + super.clearDependencies() + rdd1 = null + rdd2 = null + } +} + +class MapZippedPartitionsRDD3[A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest]( + sc: SparkContext, + f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V], + var rdd1: RDD[A], + var rdd2: RDD[B], + var rdd3: RDD[C]) + extends MapZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3)) { + + override def compute(s: Partition, context: TaskContext): Iterator[V] = { + val partitions = s.asInstanceOf[MapZippedPartition].partitions + f(rdd1.iterator(partitions(0), context), + rdd2.iterator(partitions(1), context), + rdd3.iterator(partitions(2), context)) + } + + override def clearDependencies() { + super.clearDependencies() + rdd1 = null + rdd2 = null + rdd3 = null + } +} + +class MapZippedPartitionsRDD4[A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest]( + sc: SparkContext, + f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V], + var rdd1: RDD[A], + var rdd2: RDD[B], + var rdd3: RDD[C], + var rdd4: RDD[D]) + extends MapZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4)) { + + override def compute(s: Partition, context: TaskContext): Iterator[V] = { + val partitions = s.asInstanceOf[MapZippedPartition].partitions + f(rdd1.iterator(partitions(0), context), + rdd2.iterator(partitions(1), context), + rdd3.iterator(partitions(2), context), + rdd4.iterator(partitions(3), context)) + } + + override def clearDependencies() { + super.clearDependencies() + rdd1 = null + rdd2 = null + rdd3 = null + rdd4 = null + } +} diff --git a/core/src/test/scala/spark/MapZippedPartitionsSuite.scala b/core/src/test/scala/spark/MapZippedPartitionsSuite.scala new file mode 100644 index 0000000000..f65a646416 --- /dev/null +++ b/core/src/test/scala/spark/MapZippedPartitionsSuite.scala @@ -0,0 +1,34 @@ +package spark + +import scala.collection.immutable.NumericRange + +import org.scalatest.FunSuite +import org.scalatest.prop.Checkers +import org.scalacheck.Arbitrary._ +import org.scalacheck.Gen +import org.scalacheck.Prop._ + +import SparkContext._ + + +object MapZippedPartitionsSuite { + def procZippedData(i: Iterator[Int], s: Iterator[String], d: Iterator[Double]) : Iterator[Int] = { + Iterator(i.toArray.size, s.toArray.size, d.toArray.size) + } +} + +class MapZippedPartitionsSuite extends FunSuite with LocalSparkContext { + test("print sizes") { + sc = new SparkContext("local", "test") + val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2) + val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2) + val data3 = sc.makeRDD(Array(1.0, 
2.0), 2) + + val zippedRDD = data1.zipAndMapPartitions(MapZippedPartitionsSuite.procZippedData, data2, data3) + + val obtainedSizes = zippedRDD.collect() + val expectedSizes = Array(2, 3, 1, 2, 3, 1) + assert(obtainedSizes.size == 6) + assert(obtainedSizes.zip(expectedSizes).forall(x => x._1 == x._2)) + } +} -- cgit v1.2.3 From 0cc6642b7c6fbb4167956b668603f2ea6fb5ac8e Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Sun, 28 Apr 2013 05:11:03 -0700 Subject: Rename to zipPartitions and style changes --- core/src/main/scala/spark/RDD.scala | 24 +++++++++++----------- .../scala/spark/rdd/MapZippedPartitionsRDD.scala | 22 +++++++++++--------- .../scala/spark/MapZippedPartitionsSuite.scala | 2 +- 3 files changed, 25 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 8e7e1457c1..bded55238f 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -439,22 +439,22 @@ abstract class RDD[T: ClassManifest]( */ def zip[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other) - def zipAndMapPartitions[B: ClassManifest, V: ClassManifest]( - f: (Iterator[T], Iterator[B]) => Iterator[V], - rdd2: RDD[B]) = + def zipPartitions[B: ClassManifest, V: ClassManifest]( + f: (Iterator[T], Iterator[B]) => Iterator[V], + rdd2: RDD[B]) = new MapZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2) - def zipAndMapPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]( - f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V], - rdd2: RDD[B], - rdd3: RDD[C]) = + def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]( + f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V], + rdd2: RDD[B], + rdd3: RDD[C]) = new MapZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3) - def zipAndMapPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]( - f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V], - rdd2: RDD[B], - rdd3: RDD[C], - rdd4: RDD[D]) = + def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]( + f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V], + rdd2: RDD[B], + rdd3: RDD[C], + rdd4: RDD[D]) = new MapZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4) diff --git a/core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala index 6653b3b444..3520fd24b0 100644 --- a/core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala +++ b/core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala @@ -4,13 +4,13 @@ import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext} import java.io.{ObjectOutputStream, IOException} private[spark] class MapZippedPartition( - idx: Int, - @transient rdds: Seq[RDD[_]] - ) extends Partition { + idx: Int, + @transient rdds: Seq[RDD[_]]) + extends Partition { override val index: Int = idx var partitionValues = rdds.map(rdd => rdd.partitions(idx)) - def partitions = partitionValues + def partitions = partitionValues @throws(classOf[IOException]) private def writeObject(oos: ObjectOutputStream) { @@ -68,7 +68,8 @@ class MapZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManife } } -class MapZippedPartitionsRDD3[A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest]( +class MapZippedPartitionsRDD3 + [A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest]( sc: SparkContext, f: (Iterator[A], 
Iterator[B], Iterator[C]) => Iterator[V], var rdd1: RDD[A], @@ -78,8 +79,8 @@ class MapZippedPartitionsRDD3[A: ClassManifest, B: ClassManifest, C: ClassManife override def compute(s: Partition, context: TaskContext): Iterator[V] = { val partitions = s.asInstanceOf[MapZippedPartition].partitions - f(rdd1.iterator(partitions(0), context), - rdd2.iterator(partitions(1), context), + f(rdd1.iterator(partitions(0), context), + rdd2.iterator(partitions(1), context), rdd3.iterator(partitions(2), context)) } @@ -91,7 +92,8 @@ class MapZippedPartitionsRDD3[A: ClassManifest, B: ClassManifest, C: ClassManife } } -class MapZippedPartitionsRDD4[A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest]( +class MapZippedPartitionsRDD4 + [A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest]( sc: SparkContext, f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V], var rdd1: RDD[A], @@ -102,8 +104,8 @@ class MapZippedPartitionsRDD4[A: ClassManifest, B: ClassManifest, C: ClassManife override def compute(s: Partition, context: TaskContext): Iterator[V] = { val partitions = s.asInstanceOf[MapZippedPartition].partitions - f(rdd1.iterator(partitions(0), context), - rdd2.iterator(partitions(1), context), + f(rdd1.iterator(partitions(0), context), + rdd2.iterator(partitions(1), context), rdd3.iterator(partitions(2), context), rdd4.iterator(partitions(3), context)) } diff --git a/core/src/test/scala/spark/MapZippedPartitionsSuite.scala b/core/src/test/scala/spark/MapZippedPartitionsSuite.scala index f65a646416..834b517cbc 100644 --- a/core/src/test/scala/spark/MapZippedPartitionsSuite.scala +++ b/core/src/test/scala/spark/MapZippedPartitionsSuite.scala @@ -24,7 +24,7 @@ class MapZippedPartitionsSuite extends FunSuite with LocalSparkContext { val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2) val data3 = sc.makeRDD(Array(1.0, 2.0), 2) - val zippedRDD = data1.zipAndMapPartitions(MapZippedPartitionsSuite.procZippedData, data2, data3) + val zippedRDD = data1.zipPartitions(MapZippedPartitionsSuite.procZippedData, data2, data3) val obtainedSizes = zippedRDD.collect() val expectedSizes = Array(2, 3, 1, 2, 3, 1) -- cgit v1.2.3 From 6e84635ab904ee2798f1d6acd3a8ed5e01563e54 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Sun, 28 Apr 2013 15:58:40 -0700 Subject: Rename classes from MapZipped* to Zipped* --- .../scala/spark/rdd/MapZippedPartitionsRDD.scala | 120 --------------------- .../main/scala/spark/rdd/ZippedPartitionsRDD.scala | 120 +++++++++++++++++++++ .../scala/spark/MapZippedPartitionsSuite.scala | 34 ------ .../test/scala/spark/ZippedPartitionsSuite.scala | 34 ++++++ 4 files changed, 154 insertions(+), 154 deletions(-) delete mode 100644 core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala create mode 100644 core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala delete mode 100644 core/src/test/scala/spark/MapZippedPartitionsSuite.scala create mode 100644 core/src/test/scala/spark/ZippedPartitionsSuite.scala diff --git a/core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala deleted file mode 100644 index 3520fd24b0..0000000000 --- a/core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala +++ /dev/null @@ -1,120 +0,0 @@ -package spark.rdd - -import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext} -import java.io.{ObjectOutputStream, IOException} - -private[spark] class MapZippedPartition( - idx: Int, - @transient 
rdds: Seq[RDD[_]]) - extends Partition { - - override val index: Int = idx - var partitionValues = rdds.map(rdd => rdd.partitions(idx)) - def partitions = partitionValues - - @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { - // Update the reference to parent split at the time of task serialization - partitionValues = rdds.map(rdd => rdd.partitions(idx)) - oos.defaultWriteObject() - } -} - -abstract class MapZippedPartitionsBaseRDD[V: ClassManifest]( - sc: SparkContext, - var rdds: Seq[RDD[_]]) - extends RDD[V](sc, rdds.map(x => new OneToOneDependency(x))) { - - override def getPartitions: Array[Partition] = { - val sizes = rdds.map(x => x.partitions.size) - if (!sizes.forall(x => x == sizes(0))) { - throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions") - } - val array = new Array[Partition](sizes(0)) - for (i <- 0 until sizes(0)) { - array(i) = new MapZippedPartition(i, rdds) - } - array - } - - override def getPreferredLocations(s: Partition): Seq[String] = { - val splits = s.asInstanceOf[MapZippedPartition].partitions - val preferredLocations = rdds.zip(splits).map(x => x._1.preferredLocations(x._2)) - preferredLocations.reduce((x, y) => x.intersect(y)) - } - - override def clearDependencies() { - super.clearDependencies() - rdds = null - } -} - -class MapZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest]( - sc: SparkContext, - f: (Iterator[A], Iterator[B]) => Iterator[V], - var rdd1: RDD[A], - var rdd2: RDD[B]) - extends MapZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) { - - override def compute(s: Partition, context: TaskContext): Iterator[V] = { - val partitions = s.asInstanceOf[MapZippedPartition].partitions - f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context)) - } - - override def clearDependencies() { - super.clearDependencies() - rdd1 = null - rdd2 = null - } -} - -class MapZippedPartitionsRDD3 - [A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest]( - sc: SparkContext, - f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V], - var rdd1: RDD[A], - var rdd2: RDD[B], - var rdd3: RDD[C]) - extends MapZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3)) { - - override def compute(s: Partition, context: TaskContext): Iterator[V] = { - val partitions = s.asInstanceOf[MapZippedPartition].partitions - f(rdd1.iterator(partitions(0), context), - rdd2.iterator(partitions(1), context), - rdd3.iterator(partitions(2), context)) - } - - override def clearDependencies() { - super.clearDependencies() - rdd1 = null - rdd2 = null - rdd3 = null - } -} - -class MapZippedPartitionsRDD4 - [A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest]( - sc: SparkContext, - f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V], - var rdd1: RDD[A], - var rdd2: RDD[B], - var rdd3: RDD[C], - var rdd4: RDD[D]) - extends MapZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4)) { - - override def compute(s: Partition, context: TaskContext): Iterator[V] = { - val partitions = s.asInstanceOf[MapZippedPartition].partitions - f(rdd1.iterator(partitions(0), context), - rdd2.iterator(partitions(1), context), - rdd3.iterator(partitions(2), context), - rdd4.iterator(partitions(3), context)) - } - - override def clearDependencies() { - super.clearDependencies() - rdd1 = null - rdd2 = null - rdd3 = null - rdd4 = null - } -} diff --git a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala 
b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala new file mode 100644 index 0000000000..3520fd24b0 --- /dev/null +++ b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala @@ -0,0 +1,120 @@ +package spark.rdd + +import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext} +import java.io.{ObjectOutputStream, IOException} + +private[spark] class MapZippedPartition( + idx: Int, + @transient rdds: Seq[RDD[_]]) + extends Partition { + + override val index: Int = idx + var partitionValues = rdds.map(rdd => rdd.partitions(idx)) + def partitions = partitionValues + + @throws(classOf[IOException]) + private def writeObject(oos: ObjectOutputStream) { + // Update the reference to parent split at the time of task serialization + partitionValues = rdds.map(rdd => rdd.partitions(idx)) + oos.defaultWriteObject() + } +} + +abstract class MapZippedPartitionsBaseRDD[V: ClassManifest]( + sc: SparkContext, + var rdds: Seq[RDD[_]]) + extends RDD[V](sc, rdds.map(x => new OneToOneDependency(x))) { + + override def getPartitions: Array[Partition] = { + val sizes = rdds.map(x => x.partitions.size) + if (!sizes.forall(x => x == sizes(0))) { + throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions") + } + val array = new Array[Partition](sizes(0)) + for (i <- 0 until sizes(0)) { + array(i) = new MapZippedPartition(i, rdds) + } + array + } + + override def getPreferredLocations(s: Partition): Seq[String] = { + val splits = s.asInstanceOf[MapZippedPartition].partitions + val preferredLocations = rdds.zip(splits).map(x => x._1.preferredLocations(x._2)) + preferredLocations.reduce((x, y) => x.intersect(y)) + } + + override def clearDependencies() { + super.clearDependencies() + rdds = null + } +} + +class MapZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest]( + sc: SparkContext, + f: (Iterator[A], Iterator[B]) => Iterator[V], + var rdd1: RDD[A], + var rdd2: RDD[B]) + extends MapZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) { + + override def compute(s: Partition, context: TaskContext): Iterator[V] = { + val partitions = s.asInstanceOf[MapZippedPartition].partitions + f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context)) + } + + override def clearDependencies() { + super.clearDependencies() + rdd1 = null + rdd2 = null + } +} + +class MapZippedPartitionsRDD3 + [A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest]( + sc: SparkContext, + f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V], + var rdd1: RDD[A], + var rdd2: RDD[B], + var rdd3: RDD[C]) + extends MapZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3)) { + + override def compute(s: Partition, context: TaskContext): Iterator[V] = { + val partitions = s.asInstanceOf[MapZippedPartition].partitions + f(rdd1.iterator(partitions(0), context), + rdd2.iterator(partitions(1), context), + rdd3.iterator(partitions(2), context)) + } + + override def clearDependencies() { + super.clearDependencies() + rdd1 = null + rdd2 = null + rdd3 = null + } +} + +class MapZippedPartitionsRDD4 + [A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest]( + sc: SparkContext, + f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V], + var rdd1: RDD[A], + var rdd2: RDD[B], + var rdd3: RDD[C], + var rdd4: RDD[D]) + extends MapZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4)) { + + override def compute(s: Partition, context: TaskContext): Iterator[V] = { + val partitions = 
s.asInstanceOf[MapZippedPartition].partitions + f(rdd1.iterator(partitions(0), context), + rdd2.iterator(partitions(1), context), + rdd3.iterator(partitions(2), context), + rdd4.iterator(partitions(3), context)) + } + + override def clearDependencies() { + super.clearDependencies() + rdd1 = null + rdd2 = null + rdd3 = null + rdd4 = null + } +} diff --git a/core/src/test/scala/spark/MapZippedPartitionsSuite.scala b/core/src/test/scala/spark/MapZippedPartitionsSuite.scala deleted file mode 100644 index 834b517cbc..0000000000 --- a/core/src/test/scala/spark/MapZippedPartitionsSuite.scala +++ /dev/null @@ -1,34 +0,0 @@ -package spark - -import scala.collection.immutable.NumericRange - -import org.scalatest.FunSuite -import org.scalatest.prop.Checkers -import org.scalacheck.Arbitrary._ -import org.scalacheck.Gen -import org.scalacheck.Prop._ - -import SparkContext._ - - -object MapZippedPartitionsSuite { - def procZippedData(i: Iterator[Int], s: Iterator[String], d: Iterator[Double]) : Iterator[Int] = { - Iterator(i.toArray.size, s.toArray.size, d.toArray.size) - } -} - -class MapZippedPartitionsSuite extends FunSuite with LocalSparkContext { - test("print sizes") { - sc = new SparkContext("local", "test") - val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2) - val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2) - val data3 = sc.makeRDD(Array(1.0, 2.0), 2) - - val zippedRDD = data1.zipPartitions(MapZippedPartitionsSuite.procZippedData, data2, data3) - - val obtainedSizes = zippedRDD.collect() - val expectedSizes = Array(2, 3, 1, 2, 3, 1) - assert(obtainedSizes.size == 6) - assert(obtainedSizes.zip(expectedSizes).forall(x => x._1 == x._2)) - } -} diff --git a/core/src/test/scala/spark/ZippedPartitionsSuite.scala b/core/src/test/scala/spark/ZippedPartitionsSuite.scala new file mode 100644 index 0000000000..834b517cbc --- /dev/null +++ b/core/src/test/scala/spark/ZippedPartitionsSuite.scala @@ -0,0 +1,34 @@ +package spark + +import scala.collection.immutable.NumericRange + +import org.scalatest.FunSuite +import org.scalatest.prop.Checkers +import org.scalacheck.Arbitrary._ +import org.scalacheck.Gen +import org.scalacheck.Prop._ + +import SparkContext._ + + +object MapZippedPartitionsSuite { + def procZippedData(i: Iterator[Int], s: Iterator[String], d: Iterator[Double]) : Iterator[Int] = { + Iterator(i.toArray.size, s.toArray.size, d.toArray.size) + } +} + +class MapZippedPartitionsSuite extends FunSuite with LocalSparkContext { + test("print sizes") { + sc = new SparkContext("local", "test") + val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2) + val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2) + val data3 = sc.makeRDD(Array(1.0, 2.0), 2) + + val zippedRDD = data1.zipPartitions(MapZippedPartitionsSuite.procZippedData, data2, data3) + + val obtainedSizes = zippedRDD.collect() + val expectedSizes = Array(2, 3, 1, 2, 3, 1) + assert(obtainedSizes.size == 6) + assert(obtainedSizes.zip(expectedSizes).forall(x => x._1 == x._2)) + } +} -- cgit v1.2.3 From 15acd49f07c3cde0a381f4abe139b17791a910b4 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Sun, 28 Apr 2013 16:03:22 -0700 Subject: Actually rename classes to ZippedPartitions* (the previous commit only renamed the file) --- core/src/main/scala/spark/RDD.scala | 18 +++++++-------- .../main/scala/spark/rdd/ZippedPartitionsRDD.scala | 26 +++++++++++----------- .../test/scala/spark/ZippedPartitionsSuite.scala | 6 ++--- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala 
b/core/src/main/scala/spark/RDD.scala index bded55238f..4310f745f3 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -35,9 +35,9 @@ import spark.rdd.ShuffledRDD import spark.rdd.SubtractedRDD import spark.rdd.UnionRDD import spark.rdd.ZippedRDD -import spark.rdd.MapZippedPartitionsRDD2 -import spark.rdd.MapZippedPartitionsRDD3 -import spark.rdd.MapZippedPartitionsRDD4 +import spark.rdd.ZippedPartitionsRDD2 +import spark.rdd.ZippedPartitionsRDD3 +import spark.rdd.ZippedPartitionsRDD4 import spark.storage.StorageLevel import SparkContext._ @@ -441,21 +441,21 @@ abstract class RDD[T: ClassManifest]( def zipPartitions[B: ClassManifest, V: ClassManifest]( f: (Iterator[T], Iterator[B]) => Iterator[V], - rdd2: RDD[B]) = - new MapZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2) + rdd2: RDD[B]): RDD[V] = + new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2) def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]( f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V], rdd2: RDD[B], - rdd3: RDD[C]) = - new MapZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3) + rdd3: RDD[C]): RDD[V] = + new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3) def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]( f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V], rdd2: RDD[B], rdd3: RDD[C], - rdd4: RDD[D]) = - new MapZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4) + rdd4: RDD[D]): RDD[V] = + new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4) // Actions (launch a job to return a value to the user program) diff --git a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala index 3520fd24b0..b3113c1969 100644 --- a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala @@ -3,7 +3,7 @@ package spark.rdd import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext} import java.io.{ObjectOutputStream, IOException} -private[spark] class MapZippedPartition( +private[spark] class ZippedPartitions( idx: Int, @transient rdds: Seq[RDD[_]]) extends Partition { @@ -20,7 +20,7 @@ private[spark] class MapZippedPartition( } } -abstract class MapZippedPartitionsBaseRDD[V: ClassManifest]( +abstract class ZippedPartitionsBaseRDD[V: ClassManifest]( sc: SparkContext, var rdds: Seq[RDD[_]]) extends RDD[V](sc, rdds.map(x => new OneToOneDependency(x))) { @@ -32,13 +32,13 @@ abstract class MapZippedPartitionsBaseRDD[V: ClassManifest]( } val array = new Array[Partition](sizes(0)) for (i <- 0 until sizes(0)) { - array(i) = new MapZippedPartition(i, rdds) + array(i) = new ZippedPartitions(i, rdds) } array } override def getPreferredLocations(s: Partition): Seq[String] = { - val splits = s.asInstanceOf[MapZippedPartition].partitions + val splits = s.asInstanceOf[ZippedPartitions].partitions val preferredLocations = rdds.zip(splits).map(x => x._1.preferredLocations(x._2)) preferredLocations.reduce((x, y) => x.intersect(y)) } @@ -49,15 +49,15 @@ abstract class MapZippedPartitionsBaseRDD[V: ClassManifest]( } } -class MapZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest]( +class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest]( sc: SparkContext, f: (Iterator[A], Iterator[B]) => Iterator[V], var rdd1: RDD[A], var rdd2: RDD[B]) - extends MapZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) { + extends 
ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { - val partitions = s.asInstanceOf[MapZippedPartition].partitions + val partitions = s.asInstanceOf[ZippedPartitions].partitions f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context)) } @@ -68,17 +68,17 @@ class MapZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManife } } -class MapZippedPartitionsRDD3 +class ZippedPartitionsRDD3 [A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest]( sc: SparkContext, f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V], var rdd1: RDD[A], var rdd2: RDD[B], var rdd3: RDD[C]) - extends MapZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3)) { + extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3)) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { - val partitions = s.asInstanceOf[MapZippedPartition].partitions + val partitions = s.asInstanceOf[ZippedPartitions].partitions f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context), rdd3.iterator(partitions(2), context)) @@ -92,7 +92,7 @@ class MapZippedPartitionsRDD3 } } -class MapZippedPartitionsRDD4 +class ZippedPartitionsRDD4 [A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest]( sc: SparkContext, f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V], @@ -100,10 +100,10 @@ class MapZippedPartitionsRDD4 var rdd2: RDD[B], var rdd3: RDD[C], var rdd4: RDD[D]) - extends MapZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4)) { + extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4)) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { - val partitions = s.asInstanceOf[MapZippedPartition].partitions + val partitions = s.asInstanceOf[ZippedPartitions].partitions f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context), rdd3.iterator(partitions(2), context), diff --git a/core/src/test/scala/spark/ZippedPartitionsSuite.scala b/core/src/test/scala/spark/ZippedPartitionsSuite.scala index 834b517cbc..5f60aa75d7 100644 --- a/core/src/test/scala/spark/ZippedPartitionsSuite.scala +++ b/core/src/test/scala/spark/ZippedPartitionsSuite.scala @@ -11,20 +11,20 @@ import org.scalacheck.Prop._ import SparkContext._ -object MapZippedPartitionsSuite { +object ZippedPartitionsSuite { def procZippedData(i: Iterator[Int], s: Iterator[String], d: Iterator[Double]) : Iterator[Int] = { Iterator(i.toArray.size, s.toArray.size, d.toArray.size) } } -class MapZippedPartitionsSuite extends FunSuite with LocalSparkContext { +class ZippedPartitionsSuite extends FunSuite with LocalSparkContext { test("print sizes") { sc = new SparkContext("local", "test") val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2) val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2) val data3 = sc.makeRDD(Array(1.0, 2.0), 2) - val zippedRDD = data1.zipPartitions(MapZippedPartitionsSuite.procZippedData, data2, data3) + val zippedRDD = data1.zipPartitions(ZippedPartitionsSuite.procZippedData, data2, data3) val obtainedSizes = zippedRDD.collect() val expectedSizes = Array(2, 3, 1, 2, 3, 1) -- cgit v1.2.3 From 604d3bf56ce2f77ad391b10842ec1c51daf91a97 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Sun, 28 Apr 2013 16:31:07 -0700 Subject: Rename partition class and add scala doc --- core/src/main/scala/spark/RDD.scala | 6 ++++++ core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala | 12 
++++++------ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 4310f745f3..09e52ebf3e 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -439,6 +439,12 @@ abstract class RDD[T: ClassManifest]( */ def zip[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other) + /** + * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by + * applying a function to the zipped partitions. Assumes that all the RDDs have the + * *same number of partitions*, but does *not* require them to have the same number + * of elements in each partition. + */ def zipPartitions[B: ClassManifest, V: ClassManifest]( f: (Iterator[T], Iterator[B]) => Iterator[V], rdd2: RDD[B]): RDD[V] = diff --git a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala index b3113c1969..fc3f29ffcd 100644 --- a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala @@ -3,7 +3,7 @@ package spark.rdd import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext} import java.io.{ObjectOutputStream, IOException} -private[spark] class ZippedPartitions( +private[spark] class ZippedPartitionsPartition( idx: Int, @transient rdds: Seq[RDD[_]]) extends Partition { @@ -32,13 +32,13 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest]( } val array = new Array[Partition](sizes(0)) for (i <- 0 until sizes(0)) { - array(i) = new ZippedPartitions(i, rdds) + array(i) = new ZippedPartitionsPartition(i, rdds) } array } override def getPreferredLocations(s: Partition): Seq[String] = { - val splits = s.asInstanceOf[ZippedPartitions].partitions + val splits = s.asInstanceOf[ZippedPartitionsPartition].partitions val preferredLocations = rdds.zip(splits).map(x => x._1.preferredLocations(x._2)) preferredLocations.reduce((x, y) => x.intersect(y)) } @@ -57,7 +57,7 @@ class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest] extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { - val partitions = s.asInstanceOf[ZippedPartitions].partitions + val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context)) } @@ -78,7 +78,7 @@ class ZippedPartitionsRDD3 extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3)) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { - val partitions = s.asInstanceOf[ZippedPartitions].partitions + val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context), rdd3.iterator(partitions(2), context)) @@ -103,7 +103,7 @@ class ZippedPartitionsRDD4 extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4)) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { - val partitions = s.asInstanceOf[ZippedPartitions].partitions + val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context), rdd3.iterator(partitions(2), context), -- cgit v1.2.3 From bce4089f22f5e17811f63368b164fae66774095f Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sun, 28 Apr 2013 22:23:48 -0700 Subject: Fix BlockManagerSuite to deal with clearing 
spark.hostPort --- core/src/test/scala/spark/storage/BlockManagerSuite.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index b8c0f6fb76..77f444bcad 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -15,6 +15,8 @@ import org.scalatest.time.SpanSugar._ import spark.JavaSerializer import spark.KryoSerializer import spark.SizeEstimator +import spark.Utils +import spark.util.AkkaUtils import spark.util.ByteBufferInputStream class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester { @@ -31,7 +33,12 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT val serializer = new KryoSerializer before { - actorSystem = ActorSystem("test") + val hostname = Utils.localHostName + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", hostname, 0) + this.actorSystem = actorSystem + System.setProperty("spark.driver.port", boundPort.toString) + System.setProperty("spark.hostPort", hostname + ":" + boundPort) + master = new BlockManagerMaster( actorSystem.actorOf(Props(new spark.storage.BlockManagerMasterActor(true)))) @@ -44,6 +51,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } after { + System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") + if (store != null) { store.stop() store = null -- cgit v1.2.3 From 0f45347c7b7243dbf54569f057a3605f96d614af Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sun, 28 Apr 2013 22:29:27 -0700 Subject: More unit test fixes --- core/src/test/scala/spark/MapOutputTrackerSuite.scala | 3 +++ core/src/test/scala/spark/storage/BlockManagerSuite.scala | 5 ++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala index 3abc584b6a..e95818db61 100644 --- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala @@ -81,6 +81,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { test("remote fetch") { val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", "localhost", 0) + System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext + System.setProperty("spark.hostPort", "localhost:" + boundPort) + val masterTracker = new MapOutputTracker() masterTracker.trackerActor = actorSystem.actorOf( Props(new MapOutputTrackerActor(masterTracker)), "MapOutputTracker") diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index 77f444bcad..5a11a4483b 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -33,11 +33,10 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT val serializer = new KryoSerializer before { - val hostname = Utils.localHostName - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", hostname, 0) + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0) this.actorSystem = actorSystem System.setProperty("spark.driver.port", boundPort.toString) - System.setProperty("spark.hostPort", hostname + ":" + boundPort) + 
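// Both properties are cleared again in after() so later suites start clean +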
System.setProperty("spark.hostPort", "localhost:" + boundPort) master = new BlockManagerMaster( actorSystem.actorOf(Props(new spark.storage.BlockManagerMasterActor(true)))) -- cgit v1.2.3 From 224fbac0612d5c35259cc9f4963dcd4a65ecc832 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 29 Apr 2013 10:10:14 -0700 Subject: Spark-742: TaskMetrics should not employ per-record timing. This patch does three things: 1. Makes TimedIterator a trait with two implementations (one a no-op) 2. Makes the default behavior to use the no-op implementation 3. Removes DelegateBlockFetchTracker. This is just cleanup, but it seems like the triat doesn't really reduce complexity in any way. In the future we can add other implementations, e.g. ones which perform sampling. --- .../scala/spark/BlockStoreShuffleFetcher.scala | 23 ++++++++++--------- .../main/scala/spark/executor/TaskMetrics.scala | 2 +- .../spark/storage/DelegateBlockFetchTracker.scala | 12 ---------- core/src/main/scala/spark/util/TimedIterator.scala | 26 ++++++++++++++++++---- .../scala/spark/scheduler/SparkListenerSuite.scala | 2 +- 5 files changed, 37 insertions(+), 28 deletions(-) delete mode 100644 core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala index c27ed36406..83c22b1f14 100644 --- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala @@ -4,8 +4,8 @@ import executor.{ShuffleReadMetrics, TaskMetrics} import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap -import spark.storage.{DelegateBlockFetchTracker, BlockManagerId} -import util.{CompletionIterator, TimedIterator} +import spark.storage.BlockManagerId +import util.{NoOpTimedIterator, SystemTimedIterator, CompletionIterator, TimedIterator} private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging { override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) = { @@ -49,17 +49,20 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin } val blockFetcherItr = blockManager.getMultiple(blocksByAddress) - val itr = new TimedIterator(blockFetcherItr.flatMap(unpackBlock)) with DelegateBlockFetchTracker - itr.setDelegate(blockFetcherItr) + val itr = if (System.getProperty("per.record.shuffle.metrics", "false").toBoolean) { + new SystemTimedIterator(blockFetcherItr.flatMap(unpackBlock)) + } else { + new NoOpTimedIterator(blockFetcherItr.flatMap(unpackBlock)) + } CompletionIterator[(K,V), Iterator[(K,V)]](itr, { val shuffleMetrics = new ShuffleReadMetrics shuffleMetrics.shuffleReadMillis = itr.getNetMillis - shuffleMetrics.remoteFetchTime = itr.remoteFetchTime - shuffleMetrics.fetchWaitTime = itr.fetchWaitTime - shuffleMetrics.remoteBytesRead = itr.remoteBytesRead - shuffleMetrics.totalBlocksFetched = itr.totalBlocks - shuffleMetrics.localBlocksFetched = itr.numLocalBlocks - shuffleMetrics.remoteBlocksFetched = itr.numRemoteBlocks + shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime + shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime + shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead + shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks + shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks + shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks metrics.shuffleReadMetrics = Some(shuffleMetrics) }) } diff 
--git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala index 93bbb6b458..45f6d43971 100644 --- a/core/src/main/scala/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/spark/executor/TaskMetrics.scala @@ -51,7 +51,7 @@ class ShuffleReadMetrics extends Serializable { /** * Total time to read shuffle data */ - var shuffleReadMillis: Long = _ + var shuffleReadMillis: Option[Long] = _ /** * Total time that is spent blocked waiting for shuffle to fetch data diff --git a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala b/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala deleted file mode 100644 index f6c28dce52..0000000000 --- a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala +++ /dev/null @@ -1,12 +0,0 @@ -package spark.storage - -private[spark] trait DelegateBlockFetchTracker extends BlockFetchTracker { - var delegate : BlockFetchTracker = _ - def setDelegate(d: BlockFetchTracker) {delegate = d} - def totalBlocks = delegate.totalBlocks - def numLocalBlocks = delegate.numLocalBlocks - def numRemoteBlocks = delegate.numRemoteBlocks - def remoteFetchTime = delegate.remoteFetchTime - def fetchWaitTime = delegate.fetchWaitTime - def remoteBytesRead = delegate.remoteBytesRead -} diff --git a/core/src/main/scala/spark/util/TimedIterator.scala b/core/src/main/scala/spark/util/TimedIterator.scala index 539b01f4ce..49f1276b4e 100644 --- a/core/src/main/scala/spark/util/TimedIterator.scala +++ b/core/src/main/scala/spark/util/TimedIterator.scala @@ -1,13 +1,21 @@ package spark.util /** - * A utility for tracking the total time an iterator takes to iterate through its elements. + * A utility for tracking the the time an iterator takes to iterate through its elements. + */ +trait TimedIterator { + def getNetMillis: Option[Long] + def getAverageTimePerItem: Option[Double] +} + +/** + * A TimedIterator which uses System.currentTimeMillis() on every call to next(). * * In general, this should only be used if you expect it to take a considerable amount of time * (eg. milliseconds) to get each element -- otherwise, the timing won't be very accurate, * and you are probably just adding more overhead */ -class TimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] { +class SystemTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator { private var netMillis = 0l private var nElems = 0 def hasNext = { @@ -26,7 +34,17 @@ class TimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] { r } - def getNetMillis = netMillis - def getAverageTimePerItem = netMillis / nElems.toDouble + def getNetMillis = Some(netMillis) + def getAverageTimePerItem = Some(netMillis / nElems.toDouble) } + +/** + * A TimedIterator which doesn't perform any timing measurements. 
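+ * This is the default, so shuffle fetches avoid a System.currentTimeMillis() call per record.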
+ */ +class NoOpTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator { + def hasNext = sub.hasNext + def next = sub.next + def getNetMillis = None + def getAverageTimePerItem = None +} diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala index 2f5af10e69..5ccab369db 100644 --- a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala @@ -57,7 +57,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc taskMetrics.shuffleReadMetrics should be ('defined) val sm = taskMetrics.shuffleReadMetrics.get sm.totalBlocksFetched should be > (0) - sm.shuffleReadMillis should be > (0l) + sm.shuffleReadMillis.get should be > (0l) sm.localBlocksFetched should be > (0) sm.remoteBlocksFetched should be (0) sm.remoteBytesRead should be (0l) -- cgit v1.2.3 From 540be6b1544d26c7db79ec84a98fc6696c7c6434 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 29 Apr 2013 11:32:07 -0700 Subject: Modified version of the fix which just removes all per-record tracking. --- .../scala/spark/BlockStoreShuffleFetcher.scala | 9 +--- .../main/scala/spark/executor/TaskMetrics.scala | 5 --- core/src/main/scala/spark/util/TimedIterator.scala | 50 ---------------------- .../scala/spark/scheduler/SparkListenerSuite.scala | 1 - 4 files changed, 2 insertions(+), 63 deletions(-) delete mode 100644 core/src/main/scala/spark/util/TimedIterator.scala diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala index 83c22b1f14..ce61d27448 100644 --- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala @@ -5,7 +5,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import spark.storage.BlockManagerId -import util.{NoOpTimedIterator, SystemTimedIterator, CompletionIterator, TimedIterator} +import util.CompletionIterator private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging { override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) = { @@ -49,14 +49,9 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin } val blockFetcherItr = blockManager.getMultiple(blocksByAddress) - val itr = if (System.getProperty("per.record.shuffle.metrics", "false").toBoolean) { - new SystemTimedIterator(blockFetcherItr.flatMap(unpackBlock)) - } else { - new NoOpTimedIterator(blockFetcherItr.flatMap(unpackBlock)) - } + val itr = blockFetcherItr.flatMap(unpackBlock) CompletionIterator[(K,V), Iterator[(K,V)]](itr, { val shuffleMetrics = new ShuffleReadMetrics - shuffleMetrics.shuffleReadMillis = itr.getNetMillis shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala index 45f6d43971..a7c56c2371 100644 --- a/core/src/main/scala/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/spark/executor/TaskMetrics.scala @@ -48,11 +48,6 @@ class ShuffleReadMetrics extends Serializable { */ var localBlocksFetched: Int = _ - /** - * Total time to read shuffle data - */ - var shuffleReadMillis: Option[Long] = _ - /** * Total time that is spent blocked waiting 
for shuffle to fetch data */ diff --git a/core/src/main/scala/spark/util/TimedIterator.scala b/core/src/main/scala/spark/util/TimedIterator.scala deleted file mode 100644 index 49f1276b4e..0000000000 --- a/core/src/main/scala/spark/util/TimedIterator.scala +++ /dev/null @@ -1,50 +0,0 @@ -package spark.util - -/** - * A utility for tracking the the time an iterator takes to iterate through its elements. - */ -trait TimedIterator { - def getNetMillis: Option[Long] - def getAverageTimePerItem: Option[Double] -} - -/** - * A TimedIterator which uses System.currentTimeMillis() on every call to next(). - * - * In general, this should only be used if you expect it to take a considerable amount of time - * (eg. milliseconds) to get each element -- otherwise, the timing won't be very accurate, - * and you are probably just adding more overhead - */ -class SystemTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator { - private var netMillis = 0l - private var nElems = 0 - def hasNext = { - val start = System.currentTimeMillis() - val r = sub.hasNext - val end = System.currentTimeMillis() - netMillis += (end - start) - r - } - def next = { - val start = System.currentTimeMillis() - val r = sub.next - val end = System.currentTimeMillis() - netMillis += (end - start) - nElems += 1 - r - } - - def getNetMillis = Some(netMillis) - def getAverageTimePerItem = Some(netMillis / nElems.toDouble) - -} - -/** - * A TimedIterator which doesn't perform any timing measurements. - */ -class NoOpTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator { - def hasNext = sub.hasNext - def next = sub.next - def getNetMillis = None - def getAverageTimePerItem = None -} diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala index 5ccab369db..42a87d8b90 100644 --- a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala @@ -57,7 +57,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc taskMetrics.shuffleReadMetrics should be ('defined) val sm = taskMetrics.shuffleReadMetrics.get sm.totalBlocksFetched should be > (0) - sm.shuffleReadMillis.get should be > (0l) sm.localBlocksFetched should be > (0) sm.remoteBlocksFetched should be (0) sm.remoteBytesRead should be (0l) -- cgit v1.2.3 From 016ce1fa9c9ebbe45559b1cbd95a3674510fe880 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 29 Apr 2013 12:02:27 -0700 Subject: Using full package name for util --- core/src/main/scala/spark/BlockStoreShuffleFetcher.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala index ce61d27448..2987dbbe58 100644 --- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala @@ -5,7 +5,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import spark.storage.BlockManagerId -import util.CompletionIterator +import spark.util.CompletionIterator private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging { override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) = { -- cgit v1.2.3 From f1f92c88eb2960a16d33bf7dd291c8ce58f665de Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Mon, 29 Apr 2013 17:08:45 -0700 Subject: Build against Hadoop 1 by 
default --- project/SparkBuild.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 7bd6c4c235..f2410085d8 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -11,8 +11,9 @@ import twirl.sbt.TwirlPlugin._ object SparkBuild extends Build { // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or // "1.0.4" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop. - //val HADOOP_VERSION = "1.0.4" - //val HADOOP_MAJOR_VERSION = "1" + val HADOOP_VERSION = "1.0.4" + val HADOOP_MAJOR_VERSION = "1" + val HADOOP_YARN = false // For Hadoop 2 versions such as "2.0.0-mr1-cdh4.1.1", set the HADOOP_MAJOR_VERSION to "2" //val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1" @@ -20,10 +21,9 @@ object SparkBuild extends Build { //val HADOOP_YARN = false // For Hadoop 2 YARN support - // val HADOOP_VERSION = "0.23.7" - val HADOOP_VERSION = "2.0.2-alpha" - val HADOOP_MAJOR_VERSION = "2" - val HADOOP_YARN = true + //val HADOOP_VERSION = "2.0.2-alpha" + //val HADOOP_MAJOR_VERSION = "2" + //val HADOOP_YARN = true lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, repl, examples, bagel, streaming) -- cgit v1.2.3 From 848156273178bed5763bcbc91baa788bd4a57f6e Mon Sep 17 00:00:00 2001 From: harshars Date: Mon, 25 Mar 2013 20:09:07 -0700 Subject: Merged Ram's commit on removing RDDs. Conflicts: core/src/main/scala/spark/SparkContext.scala --- core/src/main/scala/spark/SparkContext.scala | 62 +++++++++++++++--------- core/src/test/scala/spark/DistributedSuite.scala | 12 +++++ core/src/test/scala/spark/RDDSuite.scala | 7 +++ 3 files changed, 59 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 5f5ec0b0f4..8bee1d65a2 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -1,47 +1,50 @@ package spark import java.io._ -import java.util.concurrent.atomic.AtomicInteger import java.net.URI +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger +import scala.collection.JavaConversions._ import scala.collection.Map import scala.collection.generic.Growable -import scala.collection.mutable.HashMap -import scala.collection.JavaConversions._ +import scala.collection.mutable.{ConcurrentMap, HashMap} + +import akka.actor.Actor._ -import org.apache.hadoop.fs.Path import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.mapred.InputFormat -import org.apache.hadoop.mapred.SequenceFileInputFormat -import org.apache.hadoop.io.Writable -import org.apache.hadoop.io.IntWritable -import org.apache.hadoop.io.LongWritable -import org.apache.hadoop.io.FloatWritable -import org.apache.hadoop.io.DoubleWritable +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.ArrayWritable import org.apache.hadoop.io.BooleanWritable import org.apache.hadoop.io.BytesWritable -import org.apache.hadoop.io.ArrayWritable +import org.apache.hadoop.io.DoubleWritable +import org.apache.hadoop.io.FloatWritable +import org.apache.hadoop.io.IntWritable +import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.NullWritable import org.apache.hadoop.io.Text +import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.FileInputFormat +import org.apache.hadoop.mapred.InputFormat import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapred.SequenceFileInputFormat import 
org.apache.hadoop.mapred.TextInputFormat import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} import org.apache.hadoop.mapreduce.{Job => NewHadoopJob} +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} + import org.apache.mesos.MesosNativeLibrary -import spark.deploy.{SparkHadoopUtil, LocalSparkCluster} -import spark.partial.ApproximateEvaluator -import spark.partial.PartialResult +import spark.deploy.{LocalSparkCluster, SparkHadoopUtil} +import spark.partial.{ApproximateEvaluator, PartialResult} import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD} -import spark.scheduler._ +import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler} +import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler} import spark.scheduler.local.LocalScheduler -import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler} import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} -import spark.storage.BlockManagerUI +import spark.storage.{BlockManagerUI, StorageStatus, StorageUtils, RDDInfo} import spark.util.{MetadataCleaner, TimeStampedHashMap} -import spark.storage.{StorageStatus, StorageUtils, RDDInfo} + /** * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark @@ -97,7 +100,7 @@ class SparkContext( private[spark] val addedJars = HashMap[String, Long]() // Keeps track of all persisted RDDs - private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]() + private[spark] val persistentRdds: ConcurrentMap[Int, RDD[_]] = new ConcurrentHashMap[Int, RDD[_]]() private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup) @@ -520,6 +523,21 @@ class SparkContext( env.blockManager.master.getStorageStatus } + def removeRDD(id: Int): Unit = { + val storageStatusList = getExecutorStorageStatus + val groupedRddBlocks = storageStatusList.flatMap(_.blocks).toMap + logInfo("RDD to remove: " + id) + groupedRddBlocks.foreach(x => { + val k = x._1.substring(0,x._1.lastIndexOf('_')) + val rdd_id = "rdd_" + id + logInfo("RDD to check: " + rdd_id) + if(k.equals(rdd_id)) { + env.blockManager.master.removeBlock(x._1) + } + }) + persistentRdds.remove(id) + } + /** * Clear the job's list of files added by `addFile` so that they do not get downloaded to * any new nodes. @@ -743,7 +761,7 @@ class SparkContext( /** Called by MetadataCleaner to clean up the persistentRdds map periodically */ private[spark] def cleanup(cleanupTime: Long) { - persistentRdds.clearOldValues(cleanupTime) + // do nothing. this needs to be removed. 
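+ // RDD entries are now dropped explicitly via removeRDD, so the periodic cleaner no longer prunes this map.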
} } diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala index c9b4707def..c7f6ab3133 100644 --- a/core/src/test/scala/spark/DistributedSuite.scala +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -252,6 +252,18 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter assert(data2.count === 2) } } + + test("remove RDDs cleanly") { + DistributedSuite.amMaster = true + sc = new SparkContext("local-cluster[3,1,512]", "test") + val data = sc.parallelize(Seq(true, false, false, false), 4) + data.persist(StorageLevel.MEMORY_ONLY_2) + data.count + sc.removeRDD(data.id) + assert(sc.persistentRdds.isEmpty == true) + assert(sc.getRDDStorageInfo.isEmpty == true) + + } } object DistributedSuite { diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 7fbdd44340..88b7ab9f52 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -100,6 +100,13 @@ class RDDSuite extends FunSuite with LocalSparkContext { assert(rdd.collect().toList === List(1, 2, 3, 4)) } + test("remove RDD") { + sc = new SparkContext("local", "test") + val rdd = sc.makeRDD(Array(1,2,3,4), 2).cache() + sc.removeRDD(rdd.id) + assert(sc.persistentRdds.empty == true) + } + test("caching with failures") { sc = new SparkContext("local", "test") val onlySplit = new Partition { override def index: Int = 0 } -- cgit v1.2.3 From 3227ec8edde05cff27c1f9de8861d18b3cda1aae Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 1 May 2013 16:07:44 -0700 Subject: Cleaned up Ram's code. Moved SparkContext.remove to RDD.unpersist. Also updated unit tests to make sure they are properly testing for concurrency. --- core/src/main/scala/spark/RDD.scala | 17 ++++++++++++ core/src/main/scala/spark/SparkContext.scala | 25 ++++-------------- .../main/scala/spark/storage/BlockManagerUI.scala | 4 +-- core/src/test/scala/spark/DistributedSuite.scala | 30 ++++++++++++++++------ core/src/test/scala/spark/RDDSuite.scala | 27 +++++++++++++++---- 5 files changed, 68 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 09e52ebf3e..c77f9915c0 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -140,6 +140,23 @@ abstract class RDD[T: ClassManifest]( /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): RDD[T] = persist() + /** Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. */ + def unpersist(): RDD[T] = { + logInfo("Removing RDD " + id + " from persistence list") + val rddBlockPrefix = "rdd_" + id + "_" + // Get the list of blocks in block manager, and remove ones that are part of this RDD. + // The runtime complexity is linear to the number of blocks persisted in the cluster. + // It could be expensive if the cluster is large and has a lot of blocks persisted. + sc.getExecutorStorageStatus().flatMap(_.blocks).foreach { case(blockId, status) => + if (blockId.startsWith(rddBlockPrefix)) { + sc.env.blockManager.master.removeBlock(blockId) + } + } + sc.persistentRdds.remove(id) + storageLevel = StorageLevel.NONE + this + } + /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. 
*/ def getStorageLevel = storageLevel diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 8bee1d65a2..b686c595b8 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -100,7 +100,7 @@ class SparkContext( private[spark] val addedJars = HashMap[String, Long]() // Keeps track of all persisted RDDs - private[spark] val persistentRdds: ConcurrentMap[Int, RDD[_]] = new ConcurrentHashMap[Int, RDD[_]]() + private[spark] val persistentRdds: ConcurrentMap[Int, RDD[_]] = new ConcurrentHashMap[Int, RDD[_]] private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup) @@ -508,36 +508,21 @@ class SparkContext( * Return information about what RDDs are cached, if they are in mem or on disk, how much space * they take, etc. */ - def getRDDStorageInfo : Array[RDDInfo] = { - StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this) + def getRDDStorageInfo(): Array[RDDInfo] = { + StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus(), this) } - def getStageInfo: Map[Stage,StageInfo] = { + def getStageInfo(): Map[Stage,StageInfo] = { dagScheduler.stageToInfos } /** * Return information about blocks stored in all of the slaves */ - def getExecutorStorageStatus : Array[StorageStatus] = { + def getExecutorStorageStatus(): Array[StorageStatus] = { env.blockManager.master.getStorageStatus } - def removeRDD(id: Int): Unit = { - val storageStatusList = getExecutorStorageStatus - val groupedRddBlocks = storageStatusList.flatMap(_.blocks).toMap - logInfo("RDD to remove: " + id) - groupedRddBlocks.foreach(x => { - val k = x._1.substring(0,x._1.lastIndexOf('_')) - val rdd_id = "rdd_" + id - logInfo("RDD to check: " + rdd_id) - if(k.equals(rdd_id)) { - env.blockManager.master.removeBlock(x._1) - } - }) - persistentRdds.remove(id) - } - /** * Clear the job's list of files added by `addFile` so that they do not get downloaded to * any new nodes. diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala index 07da572044..c9e4519efe 100644 --- a/core/src/main/scala/spark/storage/BlockManagerUI.scala +++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala @@ -45,7 +45,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, path("") { completeWith { // Request the current storage status from the Master - val storageStatusList = sc.getExecutorStorageStatus + val storageStatusList = sc.getExecutorStorageStatus() // Calculate macro-level statistics val maxMem = storageStatusList.map(_.maxMem).reduce(_+_) val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_) @@ -60,7 +60,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, parameter("id") { id => completeWith { val prefix = "rdd_" + id.toString - val storageStatusList = sc.getExecutorStorageStatus + val storageStatusList = sc.getExecutorStorageStatus() val filteredStorageStatusList = StorageUtils. 
filterStorageStatusByPrefix(storageStatusList, prefix) val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala index c7f6ab3133..ab3e197035 100644 --- a/core/src/test/scala/spark/DistributedSuite.scala +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -3,8 +3,10 @@ package spark import network.ConnectionManagerId import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.Timeouts._ import org.scalatest.matchers.ShouldMatchers import org.scalatest.prop.Checkers +import org.scalatest.time.{Span, Millis} import org.scalacheck.Arbitrary._ import org.scalacheck.Gen import org.scalacheck.Prop._ @@ -252,24 +254,36 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter assert(data2.count === 2) } } - - test("remove RDDs cleanly") { + + test("unpersist RDDs") { DistributedSuite.amMaster = true sc = new SparkContext("local-cluster[3,1,512]", "test") val data = sc.parallelize(Seq(true, false, false, false), 4) data.persist(StorageLevel.MEMORY_ONLY_2) data.count - sc.removeRDD(data.id) + assert(sc.persistentRdds.isEmpty == false) + data.unpersist() assert(sc.persistentRdds.isEmpty == true) + + failAfter(Span(3000, Millis)) { + try { + while (! sc.getRDDStorageInfo.isEmpty) { + Thread.sleep(200) + } + } catch { + case e: Exception => + // Do nothing. We might see exceptions because block manager + // is racing this thread to remove entries from the driver. + } + } assert(sc.getRDDStorageInfo.isEmpty == true) - } } object DistributedSuite { // Indicates whether this JVM is marked for failure. var mark = false - + // Set by test to remember if we are in the driver program so we can assert // that we are not. var amMaster = false @@ -286,9 +300,9 @@ object DistributedSuite { // Act like an identity function, but if mark was set to true previously, fail, // crashing the entire JVM. def failOnMarkedIdentity(item: Boolean): Boolean = { - if (mark) { + if (mark) { System.exit(42) - } + } item - } + } } diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 88b7ab9f52..cee6312572 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -2,6 +2,8 @@ package spark import scala.collection.mutable.HashMap import org.scalatest.FunSuite +import org.scalatest.concurrent.Timeouts._ +import org.scalatest.time.{Span, Millis} import spark.SparkContext._ import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD} @@ -100,11 +102,26 @@ class RDDSuite extends FunSuite with LocalSparkContext { assert(rdd.collect().toList === List(1, 2, 3, 4)) } - test("remove RDD") { - sc = new SparkContext("local", "test") - val rdd = sc.makeRDD(Array(1,2,3,4), 2).cache() - sc.removeRDD(rdd.id) - assert(sc.persistentRdds.empty == true) + test("unpersist RDD") { + sc = new SparkContext("local", "test") + val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache() + rdd.count + assert(sc.persistentRdds.isEmpty == false) + rdd.unpersist() + assert(sc.persistentRdds.isEmpty == true) + + failAfter(Span(3000, Millis)) { + try { + while (! sc.getRDDStorageInfo.isEmpty) { + Thread.sleep(200) + } + } catch { + case e: Exception => + // Do nothing. We might see exceptions because block manager + // is racing this thread to remove entries from the driver. 
+ } + } + assert(sc.getRDDStorageInfo.isEmpty == true) } test("caching with failures") { -- cgit v1.2.3 From 34637b97ec7ebdd356653324f15345b00b3a2ac2 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 1 May 2013 16:12:37 -0700 Subject: Added SparkContext.cleanup back. Not sure why it was removed before ... --- core/src/main/scala/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index b686c595b8..401e55d615 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -746,7 +746,7 @@ class SparkContext( /** Called by MetadataCleaner to clean up the persistentRdds map periodically */ private[spark] def cleanup(cleanupTime: Long) { - // do nothing. this needs to be removed. + persistentRdds.clearOldValues(cleanupTime) } } -- cgit v1.2.3 From 204eb32e14e8fce5e4b4cf602375ae9b4ed136c9 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 1 May 2013 16:14:58 -0700 Subject: Changed the type of the persistentRdds hashmap back to TimeStampedHashMap. --- core/src/main/scala/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 401e55d615..d7d450d958 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -100,7 +100,7 @@ class SparkContext( private[spark] val addedJars = HashMap[String, Long]() // Keeps track of all persisted RDDs - private[spark] val persistentRdds: ConcurrentMap[Int, RDD[_]] = new ConcurrentHashMap[Int, RDD[_]] + private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]] private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup) -- cgit v1.2.3 From 207afe4088219a0c7350b3f80eb60e86c97e140f Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Thu, 18 Apr 2013 12:08:11 -0700 Subject: Remove spark-repl's extraneous dependency on spark-streaming --- project/SparkBuild.scala | 2 +- repl/pom.xml | 14 -------------- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index f2410085d8..190d723435 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -29,7 +29,7 @@ object SparkBuild extends Build { lazy val core = Project("core", file("core"), settings = coreSettings) - lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core) dependsOn (streaming) + lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core) lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core) dependsOn (streaming) diff --git a/repl/pom.xml b/repl/pom.xml index 038da5d988..92a2020b48 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -96,13 +96,6 @@ hadoop1 runtime - - org.spark-project - spark-streaming - ${project.version} - hadoop1 - runtime - org.apache.hadoop hadoop-core @@ -147,13 +140,6 @@ hadoop2 runtime - - org.spark-project - spark-streaming - ${project.version} - hadoop2 - runtime - org.apache.hadoop hadoop-core -- cgit v1.2.3 From c047f0e3adae59d7e388a1d42d940c3cd5714f82 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 26 Apr 2013 13:28:21 +0800 Subject: filter out Spark streaming block RDD and sort RDDInfo with id --- .../main/scala/spark/storage/StorageUtils.scala | 33 ++++++++++++++-------- 1 file changed, 21 
insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala index dec47a9d41..8f52168c24 100644 --- a/core/src/main/scala/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/spark/storage/StorageUtils.scala @@ -4,9 +4,9 @@ import spark.{Utils, SparkContext} import BlockManagerMasterActor.BlockStatus private[spark] -case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long, +case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long, blocks: Map[String, BlockStatus]) { - + def memUsed(blockPrefix: String = "") = { blocks.filterKeys(_.startsWith(blockPrefix)).values.map(_.memSize). reduceOption(_+_).getOrElse(0l) @@ -22,35 +22,40 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long, } case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel, - numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long) { + numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long) + extends Ordered[RDDInfo] { override def toString = { import Utils.memoryBytesToString "RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; DiskSize: %s".format(name, id, storageLevel.toString, numCachedPartitions, numPartitions, memoryBytesToString(memSize), memoryBytesToString(diskSize)) } + + override def compare(that: RDDInfo) = { + this.id - that.id + } } /* Helper methods for storage-related objects */ private[spark] object StorageUtils { - /* Given the current storage status of the BlockManager, returns information for each RDD */ - def rddInfoFromStorageStatus(storageStatusList: Array[StorageStatus], + /* Given the current storage status of the BlockManager, returns information for each RDD */ + def rddInfoFromStorageStatus(storageStatusList: Array[StorageStatus], sc: SparkContext) : Array[RDDInfo] = { - rddInfoFromBlockStatusList(storageStatusList.flatMap(_.blocks).toMap, sc) + rddInfoFromBlockStatusList(storageStatusList.flatMap(_.blocks).toMap, sc) } - /* Given a list of BlockStatus objets, returns information for each RDD */ - def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus], + /* Given a list of BlockStatus objets, returns information for each RDD */ + def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus], sc: SparkContext) : Array[RDDInfo] = { // Group by rddId, ignore the partition name - val groupedRddBlocks = infos.groupBy { case(k, v) => + val groupedRddBlocks = infos.filterKeys(_.startsWith("rdd_")).groupBy { case(k, v) => k.substring(0,k.lastIndexOf('_')) }.mapValues(_.values.toArray) // For each RDD, generate an RDDInfo object - groupedRddBlocks.map { case(rddKey, rddBlocks) => + val rddInfos = groupedRddBlocks.map { case(rddKey, rddBlocks) => // Add up memory and disk sizes val memSize = rddBlocks.map(_.memSize).reduce(_ + _) @@ -65,10 +70,14 @@ object StorageUtils { RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, rdd.partitions.size, memSize, diskSize) }.toArray + + scala.util.Sorting.quickSort(rddInfos) + + rddInfos } - /* Removes all BlockStatus object that are not part of a block prefix */ - def filterStorageStatusByPrefix(storageStatusList: Array[StorageStatus], + /* Removes all BlockStatus object that are not part of a block prefix */ + def filterStorageStatusByPrefix(storageStatusList: Array[StorageStatus], prefix: String) : Array[StorageStatus] = { storageStatusList.map { status => -- cgit v1.2.3 From 98df9d28536f5208530488a316df9401e16490bd Mon 
Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 1 May 2013 20:17:09 -0700 Subject: Added removeRdd function in BlockManager. --- core/src/main/scala/spark/RDD.scala | 15 ++++--------- core/src/main/scala/spark/SparkContext.scala | 8 +++---- .../scala/spark/storage/BlockManagerMaster.scala | 16 ++++++++++++++ .../main/scala/spark/storage/BlockManagerUI.scala | 4 ++-- .../scala/spark/storage/BlockManagerSuite.scala | 25 ++++++++++++++++++++++ 5 files changed, 51 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index c77f9915c0..fd14ef17f1 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -107,7 +107,7 @@ abstract class RDD[T: ClassManifest]( // ======================================================================= /** A unique ID for this RDD (within its SparkContext). */ - val id = sc.newRddId() + val id: Int = sc.newRddId() /** A friendly name for this RDD */ var name: String = null @@ -120,7 +120,8 @@ abstract class RDD[T: ClassManifest]( /** * Set this RDD's storage level to persist its values across operations after the first time - * it is computed. Can only be called once on each RDD. + * it is computed. This can only be used to assign a new storage level if the RDD does not + * have a storage level set yet.. */ def persist(newLevel: StorageLevel): RDD[T] = { // TODO: Handle changes of StorageLevel @@ -143,15 +144,7 @@ abstract class RDD[T: ClassManifest]( /** Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. */ def unpersist(): RDD[T] = { logInfo("Removing RDD " + id + " from persistence list") - val rddBlockPrefix = "rdd_" + id + "_" - // Get the list of blocks in block manager, and remove ones that are part of this RDD. - // The runtime complexity is linear to the number of blocks persisted in the cluster. - // It could be expensive if the cluster is large and has a lot of blocks persisted. - sc.getExecutorStorageStatus().flatMap(_.blocks).foreach { case(blockId, status) => - if (blockId.startsWith(rddBlockPrefix)) { - sc.env.blockManager.master.removeBlock(blockId) - } - } + sc.env.blockManager.master.removeRdd(id) sc.persistentRdds.remove(id) storageLevel = StorageLevel.NONE this diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index d7d450d958..2ae4ad8659 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -508,18 +508,18 @@ class SparkContext( * Return information about what RDDs are cached, if they are in mem or on disk, how much space * they take, etc. 
*/ - def getRDDStorageInfo(): Array[RDDInfo] = { - StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus(), this) + def getRDDStorageInfo: Array[RDDInfo] = { + StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this) } - def getStageInfo(): Map[Stage,StageInfo] = { + def getStageInfo: Map[Stage,StageInfo] = { dagScheduler.stageToInfos } /** * Return information about blocks stored in all of the slaves */ - def getExecutorStorageStatus(): Array[StorageStatus] = { + def getExecutorStorageStatus: Array[StorageStatus] = { env.blockManager.master.getStorageStatus } diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 6fae62d373..ac26c16867 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -15,6 +15,7 @@ import akka.util.duration._ import spark.{Logging, SparkException, Utils} + private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging { val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt @@ -87,6 +88,21 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi askDriverWithReply(RemoveBlock(blockId)) } + /** + * Remove all blocks belonging to the given RDD. + */ + def removeRdd(rddId: Int) { + val rddBlockPrefix = "rdd_" + rddId + "_" + // Get the list of blocks in block manager, and remove ones that are part of this RDD. + // The runtime complexity is linear to the number of blocks persisted in the cluster. + // It could be expensive if the cluster is large and has a lot of blocks persisted. + getStorageStatus.flatMap(_.blocks).foreach { case(blockId, status) => + if (blockId.startsWith(rddBlockPrefix)) { + removeBlock(blockId) + } + } + } + /** * Return the memory status for each block manager, in the form of a map from * the block manager's id to two long values. The first value is the maximum diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala index c9e4519efe..07da572044 100644 --- a/core/src/main/scala/spark/storage/BlockManagerUI.scala +++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala @@ -45,7 +45,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, path("") { completeWith { // Request the current storage status from the Master - val storageStatusList = sc.getExecutorStorageStatus() + val storageStatusList = sc.getExecutorStorageStatus // Calculate macro-level statistics val maxMem = storageStatusList.map(_.maxMem).reduce(_+_) val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_) @@ -60,7 +60,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, parameter("id") { id => completeWith { val prefix = "rdd_" + id.toString - val storageStatusList = sc.getExecutorStorageStatus() + val storageStatusList = sc.getExecutorStorageStatus val filteredStorageStatusList = StorageUtils. 
filterStorageStatusByPrefix(storageStatusList, prefix) val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index 5a11a4483b..9fe0de665c 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -207,6 +207,31 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } } + test("removing rdd") { + store = new BlockManager("", actorSystem, master, serializer, 2000) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + // Putting a1, a2 and a3 in memory. + store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY) + store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY) + master.removeRdd(0) + + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + store.getSingle("rdd_0_0") should be (None) + master.getLocations("rdd_0_0") should have size 0 + } + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + store.getSingle("rdd_0_1") should be (None) + master.getLocations("rdd_0_1") should have size 0 + } + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + store.getSingle("nonrddblock") should not be (None) + master.getLocations("nonrddblock") should have size (1) + } + } + test("reregistration on heart beat") { val heartBeat = PrivateMethod[Unit]('heartBeat) store = new BlockManager("", actorSystem, master, serializer, 2000) -- cgit v1.2.3 From 4a318774088f829fe54c3ef0b5f565a845631b4e Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 1 May 2013 20:31:54 -0700 Subject: Added the unpersist api to JavaRDD. --- core/src/main/scala/spark/api/java/JavaRDD.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala index e29f1e5899..eb81ed64cd 100644 --- a/core/src/main/scala/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaRDD.scala @@ -14,12 +14,18 @@ JavaRDDLike[T, JavaRDD[T]] { /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): JavaRDD[T] = wrapRDD(rdd.cache()) - /** + /** * Set this RDD's storage level to persist its values across operations after the first time - * it is computed. Can only be called once on each RDD. + * it is computed. This can only be used to assign a new storage level if the RDD does not + * have a storage level set yet.. */ def persist(newLevel: StorageLevel): JavaRDD[T] = wrapRDD(rdd.persist(newLevel)) + /** + * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. + */ + def unpersist(): JavaRDD[T] = wrapRDD(rdd.unpersist()) + // Transformations (return a new RDD) /** @@ -31,7 +37,7 @@ JavaRDDLike[T, JavaRDD[T]] { * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numPartitions)) - + /** * Return a new RDD containing only the elements that satisfy a predicate. */ @@ -54,7 +60,7 @@ JavaRDDLike[T, JavaRDD[T]] { */ def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] = wrapRDD(rdd.sample(withReplacement, fraction, seed)) - + /** * Return the union of this RDD and another one. 
Any identical elements will appear multiple * times (use `.distinct()` to eliminate them). @@ -63,7 +69,7 @@ JavaRDDLike[T, JavaRDD[T]] { /** * Return an RDD with the elements from `this` that are not in `other`. - * + * * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting * RDD will be <= us. */ -- cgit v1.2.3 From c847dd3da29483fede326cb9821b0d33f735137e Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Tue, 19 Mar 2013 15:08:22 -0700 Subject: Don't accept generated temp directory names that can't be created successfully. --- core/src/main/scala/spark/storage/DiskStore.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index c9553d2e0f..215c25132b 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -168,8 +168,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) localDir = new File(rootDir, "spark-local-" + localDirId) if (!localDir.exists) { - localDir.mkdirs() - foundLocalDir = true + foundLocalDir = localDir.mkdirs() } } catch { case e: Exception => -- cgit v1.2.3
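A note on the DiskStore change above: before the fix, the loop called mkdirs() and then unconditionally set foundLocalDir = true, ignoring whether the directory was actually created; after the fix, success is taken from the return value of mkdirs(), so an uncreatable name is simply retried. The diff only shows the inner lines of that loop, so the following is a self-contained sketch of the pattern under assumptions: the attempt limit, the date format, and the IOException on exhaustion are illustrative, not the exact DiskStore values.

    import java.io.{File, IOException}
    import java.text.SimpleDateFormat
    import java.util.{Date, Random}

    // Sketch of the retry pattern: keep generating candidate directory names and
    // only count an attempt as successful when mkdirs() reports that the directory
    // was actually created. Attempt limit and date format are assumptions.
    def createLocalDir(rootDir: String, maxAttempts: Int = 10): File = {
      val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
      val rand = new Random()
      var attempts = 0
      var localDir: File = null
      var foundLocalDir = false
      while (!foundLocalDir && attempts < maxAttempts) {
        attempts += 1
        val localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
        localDir = new File(rootDir, "spark-local-" + localDirId)
        if (!localDir.exists) {
          // The fix: trust mkdirs(), not the mere generation of a name.
          foundLocalDir = localDir.mkdirs()
        }
      }
      if (!foundLocalDir) {
        throw new IOException("Failed to create a local temp directory after " + maxAttempts + " attempts")
      }
      localDir
    }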
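Taken together, the later commits in this series replace the ad-hoc SparkContext.removeRDD from the first patch with RDD.unpersist() (mirrored on JavaRDD), implemented via BlockManagerMaster.removeRdd. Below is a minimal usage sketch modeled on the updated RDDSuite test; the polling loop matches what the tests do, since block removal races with the calling thread and getRDDStorageInfo drains asynchronously. (The tests additionally assert on sc.persistentRdds, which is private[spark] and therefore only visible to code inside the spark package, so those asserts are omitted here.)

    import spark.SparkContext
    import spark.storage.StorageLevel

    // Minimal sketch modeled on the updated RDDSuite/DistributedSuite tests above.
    val sc = new SparkContext("local", "unpersist-example")
    val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).persist(StorageLevel.MEMORY_ONLY)
    rdd.count()      // materialize the RDD so its partitions are cached as "rdd_<id>_*" blocks

    rdd.unpersist()  // asks the BlockManagerMaster to remove those blocks and resets the storage level

    // Removal is asynchronous, so poll rather than asserting immediately.
    while (!sc.getRDDStorageInfo.isEmpty) {
      Thread.sleep(200)
    }
    sc.stop()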
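Several pieces above (SparkContext.removeRDD in the first commit, StorageUtils.rddInfoFromBlockStatusList, and the final BlockManagerMaster.removeRdd) all key off the naming convention for cached partition blocks, "rdd_<rddId>_<splitIndex>". The first commit compared the stripped key with equals, while the final version matches on the "rdd_<id>_" prefix; the trailing underscore is what keeps RDD 1 from also claiming RDD 10's blocks. The StorageUtils patch additionally filters block ids down to those starting with "rdd_", so Spark Streaming's non-RDD blocks no longer appear in the RDD storage listing. A small standalone sketch of the convention follows; the helper names are made up for illustration and are not Spark APIs.

    // Hypothetical helpers illustrating the "rdd_<rddId>_<splitIndex>" block-naming
    // convention relied on above; these names are illustrative, not Spark APIs.
    def rddBlockId(rddId: Int, splitIndex: Int): String = "rdd_" + rddId + "_" + splitIndex

    // Grouping key used by StorageUtils: strip the split index off the block id.
    def rddKey(blockId: String): String = blockId.substring(0, blockId.lastIndexOf('_'))

    // Membership test used by BlockManagerMaster.removeRdd: prefix match with a
    // trailing underscore, so that "rdd_1_" does not match "rdd_10_3".
    def belongsToRdd(blockId: String, rddId: Int): Boolean =
      blockId.startsWith("rdd_" + rddId + "_")

    // rddBlockId(0, 1)               == "rdd_0_1"
    // rddKey("rdd_0_1")              == "rdd_0"
    // belongsToRdd("rdd_0_1", 0)     == true
    // belongsToRdd("nonrddblock", 0) == false   (left alone, as the BlockManagerSuite test checks)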
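The two follow-up commits that restore SparkContext.cleanup and switch persistentRdds back to TimeStampedHashMap belong together: the MetadataCleaner installed in SparkContext periodically invokes cleanup, and cleanup's clearOldValues(cleanupTime) exists on TimeStampedHashMap but not on the plain ConcurrentMap that the earlier commit had introduced. A compressed view of that interplay, recombined from the hunks above rather than a runnable excerpt:

    // Recombined from the SparkContext hunks above; not a complete class body.
    private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]

    // MetadataCleaner invokes this.cleanup on a timer.
    private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)

    /** Called by MetadataCleaner to clean up the persistentRdds map periodically */
    private[spark] def cleanup(cleanupTime: Long) {
      // clearOldValues is specific to TimeStampedHashMap, which is why the
      // ConcurrentHashMap introduced mid-series had to be reverted here.
      persistentRdds.clearOldValues(cleanupTime)
    }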