path: root/core
author     Mridul Muralidharan <mridul@gmail.com>    2013-05-04 00:35:59 +0530
committer  Mridul Muralidharan <mridul@gmail.com>    2013-05-04 00:35:59 +0530
commit     ea2a6f91d383c958b9bab56858f4ef0c8b6ec847 (patch)
tree       e819b6f8d18a7983459da5de9a548890a88877d2 /core
parent     11589c39d9f75e9757ba1717c5202f77d30031b2 (diff)
parent     6fe9d4e61e30622abdbf4877daf5653d7339e4e8 (diff)
pull from master
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/BlockStoreShuffleFetcher.scala           |  20
-rw-r--r--  core/src/main/scala/spark/RDD.scala                                |  42
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                       |  49
-rw-r--r--  core/src/main/scala/spark/api/java/JavaRDD.scala                   |  16
-rw-r--r--  core/src/main/scala/spark/executor/TaskMetrics.scala               |   5
-rw-r--r--  core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala            | 120
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala         |  16
-rw-r--r--  core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala  |  12
-rw-r--r--  core/src/main/scala/spark/storage/DiskStore.scala                  |   3
-rw-r--r--  core/src/main/scala/spark/storage/StorageUtils.scala               |  33
-rw-r--r--  core/src/main/scala/spark/util/TimedIterator.scala                 |  32
-rw-r--r--  core/src/test/scala/spark/DistributedSuite.scala                   |  34
-rw-r--r--  core/src/test/scala/spark/MapOutputTrackerSuite.scala              |   2
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala                           |  24
-rw-r--r--  core/src/test/scala/spark/ZippedPartitionsSuite.scala              |  34
-rw-r--r--  core/src/test/scala/spark/scheduler/SparkListenerSuite.scala       |   1
-rw-r--r--  core/src/test/scala/spark/storage/BlockManagerSuite.scala          |  36
17 files changed, 369 insertions, 110 deletions
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index c27ed36406..2987dbbe58 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -4,8 +4,8 @@ import executor.{ShuffleReadMetrics, TaskMetrics}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import spark.storage.{DelegateBlockFetchTracker, BlockManagerId}
-import util.{CompletionIterator, TimedIterator}
+import spark.storage.BlockManagerId
+import spark.util.CompletionIterator
private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) = {
@@ -49,17 +49,15 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
}
val blockFetcherItr = blockManager.getMultiple(blocksByAddress)
- val itr = new TimedIterator(blockFetcherItr.flatMap(unpackBlock)) with DelegateBlockFetchTracker
- itr.setDelegate(blockFetcherItr)
+ val itr = blockFetcherItr.flatMap(unpackBlock)
CompletionIterator[(K,V), Iterator[(K,V)]](itr, {
val shuffleMetrics = new ShuffleReadMetrics
- shuffleMetrics.shuffleReadMillis = itr.getNetMillis
- shuffleMetrics.remoteFetchTime = itr.remoteFetchTime
- shuffleMetrics.fetchWaitTime = itr.fetchWaitTime
- shuffleMetrics.remoteBytesRead = itr.remoteBytesRead
- shuffleMetrics.totalBlocksFetched = itr.totalBlocks
- shuffleMetrics.localBlocksFetched = itr.numLocalBlocks
- shuffleMetrics.remoteBlocksFetched = itr.numRemoteBlocks
+ shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
+ shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
+ shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
+ shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
+ shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
+ shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
metrics.shuffleReadMetrics = Some(shuffleMetrics)
})
}
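
With TimedIterator gone, the shuffle-read metrics above are filled in by the callback that CompletionIterator runs once the fetch iterator is exhausted. A minimal conceptual sketch of that pattern (not the actual spark.util.CompletionIterator source; the class name below is made up):

    // Conceptual sketch: wrap an iterator and invoke a callback exactly once,
    // as soon as the underlying iterator reports it has no more elements.
    class CompletionIteratorSketch[A](sub: Iterator[A], completion: () => Unit) extends Iterator[A] {
      private var completed = false
      override def hasNext: Boolean = {
        val more = sub.hasNext
        if (!more && !completed) {
          completed = true
          completion()   // e.g. record shuffle metrics after the last block is read
        }
        more
      }
      override def next(): A = sub.next()
    }
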
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index ccd9d0364a..fd14ef17f1 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -35,6 +35,9 @@ import spark.rdd.ShuffledRDD
import spark.rdd.SubtractedRDD
import spark.rdd.UnionRDD
import spark.rdd.ZippedRDD
+import spark.rdd.ZippedPartitionsRDD2
+import spark.rdd.ZippedPartitionsRDD3
+import spark.rdd.ZippedPartitionsRDD4
import spark.storage.StorageLevel
import SparkContext._
@@ -104,7 +107,7 @@ abstract class RDD[T: ClassManifest](
// =======================================================================
/** A unique ID for this RDD (within its SparkContext). */
- val id = sc.newRddId()
+ val id: Int = sc.newRddId()
/** A friendly name for this RDD */
var name: String = null
@@ -117,7 +120,8 @@ abstract class RDD[T: ClassManifest](
/**
* Set this RDD's storage level to persist its values across operations after the first time
- * it is computed. Can only be called once on each RDD.
+ * it is computed. This can only be used to assign a new storage level if the RDD does not
+ * have a storage level set yet..
*/
def persist(newLevel: StorageLevel): RDD[T] = {
// TODO: Handle changes of StorageLevel
@@ -137,6 +141,15 @@ abstract class RDD[T: ClassManifest](
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): RDD[T] = persist()
+ /** Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. */
+ def unpersist(): RDD[T] = {
+ logInfo("Removing RDD " + id + " from persistence list")
+ sc.env.blockManager.master.removeRdd(id)
+ sc.persistentRdds.remove(id)
+ storageLevel = StorageLevel.NONE
+ this
+ }
+
/** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
def getStorageLevel = storageLevel
@@ -436,6 +449,31 @@ abstract class RDD[T: ClassManifest](
*/
def zip[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)
+ /**
+ * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
+ * applying a function to the zipped partitions. Assumes that all the RDDs have the
+ * *same number of partitions*, but does *not* require them to have the same number
+ * of elements in each partition.
+ */
+ def zipPartitions[B: ClassManifest, V: ClassManifest](
+ f: (Iterator[T], Iterator[B]) => Iterator[V],
+ rdd2: RDD[B]): RDD[V] =
+ new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
+
+ def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest](
+ f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V],
+ rdd2: RDD[B],
+ rdd3: RDD[C]): RDD[V] =
+ new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
+
+ def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest](
+ f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
+ rdd2: RDD[B],
+ rdd3: RDD[C],
+ rdd4: RDD[D]): RDD[V] =
+ new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)
+
+
// Actions (launch a job to return a value to the user program)
/**
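
Taken together, the RDD.scala changes add unpersist() and the zipPartitions variants. A short usage sketch, assuming an active SparkContext named sc; the data and the zip function below are invented for illustration:

    val nums = sc.parallelize(1 to 4, 2).cache()
    nums.count()        // materializes the cached blocks
    nums.unpersist()    // removes the RDD's blocks from memory and disk

    // zipPartitions requires the same number of partitions in every RDD,
    // but the partitions themselves may hold different numbers of elements.
    val words = sc.parallelize(Seq("a", "b", "c"), 2)
    val counts = nums.zipPartitions(
      (is: Iterator[Int], ss: Iterator[String]) => Iterator(is.size + ss.size),
      words)
    counts.collect()    // one combined element count per partition
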
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 5f5ec0b0f4..2ae4ad8659 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -1,47 +1,50 @@
package spark
import java.io._
-import java.util.concurrent.atomic.AtomicInteger
import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.generic.Growable
-import scala.collection.mutable.HashMap
-import scala.collection.JavaConversions._
+import scala.collection.mutable.{ConcurrentMap, HashMap}
+
+import akka.actor.Actor._
-import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.SequenceFileInputFormat
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.io.IntWritable
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.FloatWritable
-import org.apache.hadoop.io.DoubleWritable
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.ArrayWritable
import org.apache.hadoop.io.BooleanWritable
import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.ArrayWritable
+import org.apache.hadoop.io.DoubleWritable
+import org.apache.hadoop.io.FloatWritable
+import org.apache.hadoop.io.IntWritable
+import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.FileInputFormat
+import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.SequenceFileInputFormat
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
+
import org.apache.mesos.MesosNativeLibrary
-import spark.deploy.{SparkHadoopUtil, LocalSparkCluster}
-import spark.partial.ApproximateEvaluator
-import spark.partial.PartialResult
+import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
+import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
-import spark.scheduler._
+import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
+import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
import spark.scheduler.local.LocalScheduler
-import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import spark.storage.BlockManagerUI
+import spark.storage.{BlockManagerUI, StorageStatus, StorageUtils, RDDInfo}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
-import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
+
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -97,7 +100,7 @@ class SparkContext(
private[spark] val addedJars = HashMap[String, Long]()
// Keeps track of all persisted RDDs
- private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]()
+ private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
@@ -505,7 +508,7 @@ class SparkContext(
* Return information about what RDDs are cached, if they are in mem or on disk, how much space
* they take, etc.
*/
- def getRDDStorageInfo : Array[RDDInfo] = {
+ def getRDDStorageInfo: Array[RDDInfo] = {
StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
}
@@ -516,7 +519,7 @@ class SparkContext(
/**
* Return information about blocks stored in all of the slaves
*/
- def getExecutorStorageStatus : Array[StorageStatus] = {
+ def getExecutorStorageStatus: Array[StorageStatus] = {
env.blockManager.master.getStorageStatus
}
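
The storage accessors touched above can be used for simple cache monitoring. A hedged sketch, again assuming an active SparkContext sc:

    // Print one summary line per cached RDD (RDDInfo.toString is defined in StorageUtils.scala).
    sc.getRDDStorageInfo.foreach(info => println(info))

    // Print per-executor memory usage from the block manager's storage status.
    sc.getExecutorStorageStatus.foreach { status =>
      println(status.blockManagerId + ": " + status.memUsed() + " of " + status.maxMem + " bytes used")
    }
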
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index e29f1e5899..eb81ed64cd 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -14,12 +14,18 @@ JavaRDDLike[T, JavaRDD[T]] {
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaRDD[T] = wrapRDD(rdd.cache())
- /**
+ /**
* Set this RDD's storage level to persist its values across operations after the first time
- * it is computed. Can only be called once on each RDD.
+ * it is computed. This can only be used to assign a new storage level if the RDD does not
+ * have a storage level set yet..
*/
def persist(newLevel: StorageLevel): JavaRDD[T] = wrapRDD(rdd.persist(newLevel))
+ /**
+ * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+ */
+ def unpersist(): JavaRDD[T] = wrapRDD(rdd.unpersist())
+
// Transformations (return a new RDD)
/**
@@ -31,7 +37,7 @@ JavaRDDLike[T, JavaRDD[T]] {
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(numPartitions: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numPartitions))
-
+
/**
* Return a new RDD containing only the elements that satisfy a predicate.
*/
@@ -54,7 +60,7 @@ JavaRDDLike[T, JavaRDD[T]] {
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] =
wrapRDD(rdd.sample(withReplacement, fraction, seed))
-
+
/**
* Return the union of this RDD and another one. Any identical elements will appear multiple
* times (use `.distinct()` to eliminate them).
@@ -63,7 +69,7 @@ JavaRDDLike[T, JavaRDD[T]] {
/**
* Return an RDD with the elements from `this` that are not in `other`.
- *
+ *
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala
index 93bbb6b458..a7c56c2371 100644
--- a/core/src/main/scala/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/spark/executor/TaskMetrics.scala
@@ -49,11 +49,6 @@ class ShuffleReadMetrics extends Serializable {
var localBlocksFetched: Int = _
/**
- * Total time to read shuffle data
- */
- var shuffleReadMillis: Long = _
-
- /**
* Total time that is spent blocked waiting for shuffle to fetch data
*/
var fetchWaitTime: Long = _
diff --git a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
new file mode 100644
index 0000000000..fc3f29ffcd
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
@@ -0,0 +1,120 @@
+package spark.rdd
+
+import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
+import java.io.{ObjectOutputStream, IOException}
+
+private[spark] class ZippedPartitionsPartition(
+ idx: Int,
+ @transient rdds: Seq[RDD[_]])
+ extends Partition {
+
+ override val index: Int = idx
+ var partitionValues = rdds.map(rdd => rdd.partitions(idx))
+ def partitions = partitionValues
+
+ @throws(classOf[IOException])
+ private def writeObject(oos: ObjectOutputStream) {
+ // Update the reference to parent split at the time of task serialization
+ partitionValues = rdds.map(rdd => rdd.partitions(idx))
+ oos.defaultWriteObject()
+ }
+}
+
+abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
+ sc: SparkContext,
+ var rdds: Seq[RDD[_]])
+ extends RDD[V](sc, rdds.map(x => new OneToOneDependency(x))) {
+
+ override def getPartitions: Array[Partition] = {
+ val sizes = rdds.map(x => x.partitions.size)
+ if (!sizes.forall(x => x == sizes(0))) {
+ throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions")
+ }
+ val array = new Array[Partition](sizes(0))
+ for (i <- 0 until sizes(0)) {
+ array(i) = new ZippedPartitionsPartition(i, rdds)
+ }
+ array
+ }
+
+ override def getPreferredLocations(s: Partition): Seq[String] = {
+ val splits = s.asInstanceOf[ZippedPartitionsPartition].partitions
+ val preferredLocations = rdds.zip(splits).map(x => x._1.preferredLocations(x._2))
+ preferredLocations.reduce((x, y) => x.intersect(y))
+ }
+
+ override def clearDependencies() {
+ super.clearDependencies()
+ rdds = null
+ }
+}
+
+class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest](
+ sc: SparkContext,
+ f: (Iterator[A], Iterator[B]) => Iterator[V],
+ var rdd1: RDD[A],
+ var rdd2: RDD[B])
+ extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) {
+
+ override def compute(s: Partition, context: TaskContext): Iterator[V] = {
+ val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
+ f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context))
+ }
+
+ override def clearDependencies() {
+ super.clearDependencies()
+ rdd1 = null
+ rdd2 = null
+ }
+}
+
+class ZippedPartitionsRDD3
+ [A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest](
+ sc: SparkContext,
+ f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
+ var rdd1: RDD[A],
+ var rdd2: RDD[B],
+ var rdd3: RDD[C])
+ extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3)) {
+
+ override def compute(s: Partition, context: TaskContext): Iterator[V] = {
+ val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
+ f(rdd1.iterator(partitions(0), context),
+ rdd2.iterator(partitions(1), context),
+ rdd3.iterator(partitions(2), context))
+ }
+
+ override def clearDependencies() {
+ super.clearDependencies()
+ rdd1 = null
+ rdd2 = null
+ rdd3 = null
+ }
+}
+
+class ZippedPartitionsRDD4
+ [A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest](
+ sc: SparkContext,
+ f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
+ var rdd1: RDD[A],
+ var rdd2: RDD[B],
+ var rdd3: RDD[C],
+ var rdd4: RDD[D])
+ extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4)) {
+
+ override def compute(s: Partition, context: TaskContext): Iterator[V] = {
+ val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
+ f(rdd1.iterator(partitions(0), context),
+ rdd2.iterator(partitions(1), context),
+ rdd3.iterator(partitions(2), context),
+ rdd4.iterator(partitions(3), context))
+ }
+
+ override def clearDependencies() {
+ super.clearDependencies()
+ rdd1 = null
+ rdd2 = null
+ rdd3 = null
+ rdd4 = null
+ }
+}
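
ZippedPartitionsBaseRDD.getPartitions only checks that every input has the same number of partitions; element counts per partition may differ, but a partition-count mismatch fails when the job forces partition computation. A sketch of that failure path, with invented data and assuming an active SparkContext sc:

    val a = sc.parallelize(1 to 6, 3)
    val b = sc.parallelize(Seq("x", "y"), 2)   // 2 partitions vs. 3 above

    try {
      a.zipPartitions((ia: Iterator[Int], ib: Iterator[String]) => ia.map(_.toString) ++ ib, b).collect()
    } catch {
      case e: IllegalArgumentException =>
        // "Can't zip RDDs with unequal numbers of partitions"
        println(e.getMessage)
    }
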
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 6fae62d373..ac26c16867 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -15,6 +15,7 @@ import akka.util.duration._
import spark.{Logging, SparkException, Utils}
+
private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging {
val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
@@ -88,6 +89,21 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi
}
/**
+ * Remove all blocks belonging to the given RDD.
+ */
+ def removeRdd(rddId: Int) {
+ val rddBlockPrefix = "rdd_" + rddId + "_"
+ // Get the list of blocks in block manager, and remove ones that are part of this RDD.
+ // The runtime complexity is linear to the number of blocks persisted in the cluster.
+ // It could be expensive if the cluster is large and has a lot of blocks persisted.
+ getStorageStatus.flatMap(_.blocks).foreach { case(blockId, status) =>
+ if (blockId.startsWith(rddBlockPrefix)) {
+ removeBlock(blockId)
+ }
+ }
+ }
+
+ /**
* Return the memory status for each block manager, in the form of a map from
* the block manager's id to two long values. The first value is the maximum
* amount of memory allocated for the block manager, while the second is the
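
removeRdd identifies a cached RDD's blocks purely by their "rdd_<id>_" name prefix. A standalone sketch of that matching with made-up block ids; note the trailing underscore, which keeps rdd 1 from also matching rdd 10:

    // Hypothetical helper mirroring the prefix test used by removeRdd above.
    def blocksOfRdd(blockIds: Seq[String], rddId: Int): Seq[String] = {
      val prefix = "rdd_" + rddId + "_"
      blockIds.filter(_.startsWith(prefix))
    }

    blocksOfRdd(Seq("rdd_1_0", "rdd_1_1", "rdd_10_0", "nonrddblock"), 1)
    // => Seq("rdd_1_0", "rdd_1_1")  -- "rdd_10_0" and "nonrddblock" are left alone
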
diff --git a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala b/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala
deleted file mode 100644
index f6c28dce52..0000000000
--- a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala
+++ /dev/null
@@ -1,12 +0,0 @@
-package spark.storage
-
-private[spark] trait DelegateBlockFetchTracker extends BlockFetchTracker {
- var delegate : BlockFetchTracker = _
- def setDelegate(d: BlockFetchTracker) {delegate = d}
- def totalBlocks = delegate.totalBlocks
- def numLocalBlocks = delegate.numLocalBlocks
- def numRemoteBlocks = delegate.numRemoteBlocks
- def remoteFetchTime = delegate.remoteFetchTime
- def fetchWaitTime = delegate.fetchWaitTime
- def remoteBytesRead = delegate.remoteBytesRead
-}
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index c9553d2e0f..215c25132b 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -168,8 +168,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
localDir = new File(rootDir, "spark-local-" + localDirId)
if (!localDir.exists) {
- localDir.mkdirs()
- foundLocalDir = true
+ foundLocalDir = localDir.mkdirs()
}
} catch {
case e: Exception =>
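
The DiskStore fix records whether mkdirs() actually succeeded instead of assuming it did. A hedged sketch of the surrounding retry idea; the directory naming and attempt limit here are assumptions, not the DiskStore code:

    import java.io.File
    import java.util.Random

    // Keep trying random directory names until mkdirs() actually reports success.
    def createLocalDir(rootDir: String, maxAttempts: Int = 10): File = {
      val rand = new Random()
      var attempts = 0
      while (attempts < maxAttempts) {
        attempts += 1
        val dir = new File(rootDir, "spark-local-%04x".format(rand.nextInt(65536)))
        // mkdirs() returns false if the directory could not be created, for example
        // because another process raced us to the same name, so check the result.
        if (!dir.exists() && dir.mkdirs()) {
          return dir
        }
      }
      throw new java.io.IOException(
        "Failed to create a local dir under " + rootDir + " after " + maxAttempts + " attempts")
    }
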
diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala
index dec47a9d41..8f52168c24 100644
--- a/core/src/main/scala/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/spark/storage/StorageUtils.scala
@@ -4,9 +4,9 @@ import spark.{Utils, SparkContext}
import BlockManagerMasterActor.BlockStatus
private[spark]
-case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
+case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
blocks: Map[String, BlockStatus]) {
-
+
def memUsed(blockPrefix: String = "") = {
blocks.filterKeys(_.startsWith(blockPrefix)).values.map(_.memSize).
reduceOption(_+_).getOrElse(0l)
@@ -22,35 +22,40 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
}
case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
- numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long) {
+ numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long)
+ extends Ordered[RDDInfo] {
override def toString = {
import Utils.memoryBytesToString
"RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; DiskSize: %s".format(name, id,
storageLevel.toString, numCachedPartitions, numPartitions, memoryBytesToString(memSize), memoryBytesToString(diskSize))
}
+
+ override def compare(that: RDDInfo) = {
+ this.id - that.id
+ }
}
/* Helper methods for storage-related objects */
private[spark]
object StorageUtils {
- /* Given the current storage status of the BlockManager, returns information for each RDD */
- def rddInfoFromStorageStatus(storageStatusList: Array[StorageStatus],
+ /* Given the current storage status of the BlockManager, returns information for each RDD */
+ def rddInfoFromStorageStatus(storageStatusList: Array[StorageStatus],
sc: SparkContext) : Array[RDDInfo] = {
- rddInfoFromBlockStatusList(storageStatusList.flatMap(_.blocks).toMap, sc)
+ rddInfoFromBlockStatusList(storageStatusList.flatMap(_.blocks).toMap, sc)
}
- /* Given a list of BlockStatus objets, returns information for each RDD */
- def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus],
+ /* Given a list of BlockStatus objets, returns information for each RDD */
+ def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus],
sc: SparkContext) : Array[RDDInfo] = {
// Group by rddId, ignore the partition name
- val groupedRddBlocks = infos.groupBy { case(k, v) =>
+ val groupedRddBlocks = infos.filterKeys(_.startsWith("rdd_")).groupBy { case(k, v) =>
k.substring(0,k.lastIndexOf('_'))
}.mapValues(_.values.toArray)
// For each RDD, generate an RDDInfo object
- groupedRddBlocks.map { case(rddKey, rddBlocks) =>
+ val rddInfos = groupedRddBlocks.map { case(rddKey, rddBlocks) =>
// Add up memory and disk sizes
val memSize = rddBlocks.map(_.memSize).reduce(_ + _)
@@ -65,10 +70,14 @@ object StorageUtils {
RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, rdd.partitions.size, memSize, diskSize)
}.toArray
+
+ scala.util.Sorting.quickSort(rddInfos)
+
+ rddInfos
}
- /* Removes all BlockStatus object that are not part of a block prefix */
- def filterStorageStatusByPrefix(storageStatusList: Array[StorageStatus],
+ /* Removes all BlockStatus object that are not part of a block prefix */
+ def filterStorageStatusByPrefix(storageStatusList: Array[StorageStatus],
prefix: String) : Array[StorageStatus] = {
storageStatusList.map { status =>
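
Because RDDInfo now extends Ordered[RDDInfo] (comparing by id), the Array[RDDInfo] built above can be sorted in place with quickSort before it is returned. A small sketch with invented values:

    import spark.storage.{RDDInfo, StorageLevel}

    val infos = Array(
      RDDInfo(3, "pairs", StorageLevel.MEMORY_ONLY, 2, 2, 1024L, 0L),
      RDDInfo(1, "lines", StorageLevel.DISK_ONLY,   1, 4,    0L, 512L))

    scala.util.Sorting.quickSort(infos)   // uses RDDInfo.compare, i.e. ascending id
    infos.map(_.id)                       // => Array(1, 3)
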
diff --git a/core/src/main/scala/spark/util/TimedIterator.scala b/core/src/main/scala/spark/util/TimedIterator.scala
deleted file mode 100644
index 539b01f4ce..0000000000
--- a/core/src/main/scala/spark/util/TimedIterator.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-package spark.util
-
-/**
- * A utility for tracking the total time an iterator takes to iterate through its elements.
- *
- * In general, this should only be used if you expect it to take a considerable amount of time
- * (eg. milliseconds) to get each element -- otherwise, the timing won't be very accurate,
- * and you are probably just adding more overhead
- */
-class TimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] {
- private var netMillis = 0l
- private var nElems = 0
- def hasNext = {
- val start = System.currentTimeMillis()
- val r = sub.hasNext
- val end = System.currentTimeMillis()
- netMillis += (end - start)
- r
- }
- def next = {
- val start = System.currentTimeMillis()
- val r = sub.next
- val end = System.currentTimeMillis()
- netMillis += (end - start)
- nElems += 1
- r
- }
-
- def getNetMillis = netMillis
- def getAverageTimePerItem = netMillis / nElems.toDouble
-
-}
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index c9b4707def..ab3e197035 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -3,8 +3,10 @@ package spark
import network.ConnectionManagerId
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Timeouts._
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.prop.Checkers
+import org.scalatest.time.{Span, Millis}
import org.scalacheck.Arbitrary._
import org.scalacheck.Gen
import org.scalacheck.Prop._
@@ -252,12 +254,36 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
assert(data2.count === 2)
}
}
+
+ test("unpersist RDDs") {
+ DistributedSuite.amMaster = true
+ sc = new SparkContext("local-cluster[3,1,512]", "test")
+ val data = sc.parallelize(Seq(true, false, false, false), 4)
+ data.persist(StorageLevel.MEMORY_ONLY_2)
+ data.count
+ assert(sc.persistentRdds.isEmpty == false)
+ data.unpersist()
+ assert(sc.persistentRdds.isEmpty == true)
+
+ failAfter(Span(3000, Millis)) {
+ try {
+ while (! sc.getRDDStorageInfo.isEmpty) {
+ Thread.sleep(200)
+ }
+ } catch {
+ case e: Exception =>
+ // Do nothing. We might see exceptions because block manager
+ // is racing this thread to remove entries from the driver.
+ }
+ }
+ assert(sc.getRDDStorageInfo.isEmpty == true)
+ }
}
object DistributedSuite {
// Indicates whether this JVM is marked for failure.
var mark = false
-
+
// Set by test to remember if we are in the driver program so we can assert
// that we are not.
var amMaster = false
@@ -274,9 +300,9 @@ object DistributedSuite {
// Act like an identity function, but if mark was set to true previously, fail,
// crashing the entire JVM.
def failOnMarkedIdentity(item: Boolean): Boolean = {
- if (mark) {
+ if (mark) {
System.exit(42)
- }
+ }
item
- }
+ }
}
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index 875975ca43..b5cedc0b68 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -82,7 +82,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("remote fetch") {
val hostname = "localhost"
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0)
+ System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+
val masterTracker = new MapOutputTracker()
masterTracker.trackerActor = actorSystem.actorOf(
Props(new MapOutputTrackerActor(masterTracker)), "MapOutputTracker")
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 7fbdd44340..cee6312572 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -2,6 +2,8 @@ package spark
import scala.collection.mutable.HashMap
import org.scalatest.FunSuite
+import org.scalatest.concurrent.Timeouts._
+import org.scalatest.time.{Span, Millis}
import spark.SparkContext._
import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD}
@@ -100,6 +102,28 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(rdd.collect().toList === List(1, 2, 3, 4))
}
+ test("unpersist RDD") {
+ sc = new SparkContext("local", "test")
+ val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
+ rdd.count
+ assert(sc.persistentRdds.isEmpty == false)
+ rdd.unpersist()
+ assert(sc.persistentRdds.isEmpty == true)
+
+ failAfter(Span(3000, Millis)) {
+ try {
+ while (! sc.getRDDStorageInfo.isEmpty) {
+ Thread.sleep(200)
+ }
+ } catch {
+ case e: Exception =>
+ // Do nothing. We might see exceptions because block manager
+ // is racing this thread to remove entries from the driver.
+ }
+ }
+ assert(sc.getRDDStorageInfo.isEmpty == true)
+ }
+
test("caching with failures") {
sc = new SparkContext("local", "test")
val onlySplit = new Partition { override def index: Int = 0 }
diff --git a/core/src/test/scala/spark/ZippedPartitionsSuite.scala b/core/src/test/scala/spark/ZippedPartitionsSuite.scala
new file mode 100644
index 0000000000..5f60aa75d7
--- /dev/null
+++ b/core/src/test/scala/spark/ZippedPartitionsSuite.scala
@@ -0,0 +1,34 @@
+package spark
+
+import scala.collection.immutable.NumericRange
+
+import org.scalatest.FunSuite
+import org.scalatest.prop.Checkers
+import org.scalacheck.Arbitrary._
+import org.scalacheck.Gen
+import org.scalacheck.Prop._
+
+import SparkContext._
+
+
+object ZippedPartitionsSuite {
+ def procZippedData(i: Iterator[Int], s: Iterator[String], d: Iterator[Double]) : Iterator[Int] = {
+ Iterator(i.toArray.size, s.toArray.size, d.toArray.size)
+ }
+}
+
+class ZippedPartitionsSuite extends FunSuite with LocalSparkContext {
+ test("print sizes") {
+ sc = new SparkContext("local", "test")
+ val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2)
+ val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
+ val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
+
+ val zippedRDD = data1.zipPartitions(ZippedPartitionsSuite.procZippedData, data2, data3)
+
+ val obtainedSizes = zippedRDD.collect()
+ val expectedSizes = Array(2, 3, 1, 2, 3, 1)
+ assert(obtainedSizes.size == 6)
+ assert(obtainedSizes.zip(expectedSizes).forall(x => x._1 == x._2))
+ }
+}
diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
index 2f5af10e69..42a87d8b90 100644
--- a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
@@ -57,7 +57,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
taskMetrics.shuffleReadMetrics should be ('defined)
val sm = taskMetrics.shuffleReadMetrics.get
sm.totalBlocksFetched should be > (0)
- sm.shuffleReadMillis should be > (0l)
sm.localBlocksFetched should be > (0)
sm.remoteBlocksFetched should be (0)
sm.remoteBytesRead should be (0l)
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 3fc2825255..71d1f0bcc8 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -15,6 +15,8 @@ import org.scalatest.time.SpanSugar._
import spark.JavaSerializer
import spark.KryoSerializer
import spark.SizeEstimator
+import spark.Utils
+import spark.util.AkkaUtils
import spark.util.ByteBufferInputStream
class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
@@ -31,7 +33,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
val serializer = new KryoSerializer
before {
- actorSystem = ActorSystem("test")
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0)
+ this.actorSystem = actorSystem
+ System.setProperty("spark.driver.port", boundPort.toString)
+ System.setProperty("spark.hostPort", "localhost:" + boundPort)
+
master = new BlockManagerMaster(
actorSystem.actorOf(Props(new spark.storage.BlockManagerMasterActor(true))))
@@ -46,6 +52,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
after {
+ System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.hostPort")
+
if (store != null) {
store.stop()
store = null
@@ -200,6 +209,31 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
}
+ test("removing rdd") {
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+ val a3 = new Array[Byte](400)
+ // Putting a1, a2 and a3 in memory.
+ store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
+ master.removeRdd(0)
+
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ store.getSingle("rdd_0_0") should be (None)
+ master.getLocations("rdd_0_0") should have size 0
+ }
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ store.getSingle("rdd_0_1") should be (None)
+ master.getLocations("rdd_0_1") should have size 0
+ }
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ store.getSingle("nonrddblock") should not be (None)
+ master.getLocations("nonrddblock") should have size (1)
+ }
+ }
+
test("reregistration on heart beat") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)