-rw-r--r--  core/src/main/scala/spark/Aggregator.scala | 8
-rw-r--r--  core/src/main/scala/spark/CartesianRDD.scala | 44
-rw-r--r--  core/src/main/scala/spark/DAGScheduler.scala | 236
-rw-r--r--  core/src/main/scala/spark/DaemonThreadFactory.scala | 14
-rw-r--r--  core/src/main/scala/spark/Dependency.scala | 30
-rw-r--r--  core/src/main/scala/spark/DfsShuffle.scala | 120
-rw-r--r--  core/src/main/scala/spark/Executor.scala | 2
-rw-r--r--  core/src/main/scala/spark/HadoopFile.scala | 16
-rw-r--r--  core/src/main/scala/spark/LocalFileShuffle.scala | 3
-rw-r--r--  core/src/main/scala/spark/LocalScheduler.scala | 44
-rw-r--r--  core/src/main/scala/spark/MapOutputTracker.scala | 56
-rw-r--r--  core/src/main/scala/spark/MesosScheduler.scala | 7
-rw-r--r--  core/src/main/scala/spark/NumberedSplitRDD.scala | 42
-rw-r--r--  core/src/main/scala/spark/ParallelArray.scala | 9
-rw-r--r--  core/src/main/scala/spark/Partitioner.scala | 22
-rw-r--r--  core/src/main/scala/spark/RDD.scala | 361
-rw-r--r--  core/src/main/scala/spark/RDDCache.scala | 92
-rw-r--r--  core/src/main/scala/spark/ResultTask.scala | 14
-rw-r--r--  core/src/main/scala/spark/SampledRDD.scala | 36
-rw-r--r--  core/src/main/scala/spark/Scheduler.scala | 3
-rw-r--r--  core/src/main/scala/spark/ShuffleMapTask.scala | 38
-rw-r--r--  core/src/main/scala/spark/ShuffledRDD.scala | 60
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala | 42
-rw-r--r--  core/src/main/scala/spark/Split.scala | 9
-rw-r--r--  core/src/main/scala/spark/Stage.scala | 34
-rw-r--r--  core/src/main/scala/spark/UnionRDD.scala | 43
-rw-r--r--  core/src/main/scala/spark/Utils.scala | 7
-rw-r--r--  examples/src/main/scala/spark/examples/CpuHog.scala | 26
-rw-r--r--  examples/src/main/scala/spark/examples/SleepJob.scala | 21
29 files changed, 908 insertions(+), 531 deletions(-)
diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala
new file mode 100644
index 0000000000..87453c9c15
--- /dev/null
+++ b/core/src/main/scala/spark/Aggregator.scala
@@ -0,0 +1,8 @@
+package spark
+
+@serializable
+class Aggregator[K, V, C] (
+ val createCombiner: V => C,
+ val mergeValue: (C, V) => C,
+ val mergeCombiners: (C, C) => C
+) \ No newline at end of file
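
The Aggregator above just bundles the three combine functions that previously went straight into Shuffle.compute. As an illustrative sketch (plain Scala, not part of the patch), here is how those three functions compose for a simple per-key count:

object AggregatorSketch extends App {
  // Hypothetical word-count combiners mirroring createCombiner / mergeValue / mergeCombiners.
  val createCombiner: Int => Int = v => v
  val mergeValue: (Int, Int) => Int = (c, v) => c + v
  val mergeCombiners: (Int, Int) => Int = (c1, c2) => c1 + c2

  // Combine values within one "partition" of (key, value) pairs.
  def combine(part: Seq[(String, Int)]): Map[String, Int] =
    part.foldLeft(Map.empty[String, Int]) { case (acc, (k, v)) =>
      acc + (k -> acc.get(k).map(mergeValue(_, v)).getOrElse(createCombiner(v)))
    }

  // Merge the per-partition maps with mergeCombiners, as the reduce side would.
  val merged = (combine(Seq("a" -> 1, "b" -> 1, "a" -> 1)).toSeq ++ combine(Seq("a" -> 1)).toSeq)
    .groupBy(_._1)
    .map { case (k, kvs) => k -> kvs.map(_._2).reduce(mergeCombiners) }

  println(merged) // a -> 3, b -> 1
}
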
diff --git a/core/src/main/scala/spark/CartesianRDD.scala b/core/src/main/scala/spark/CartesianRDD.scala
new file mode 100644
index 0000000000..42a9b3b23c
--- /dev/null
+++ b/core/src/main/scala/spark/CartesianRDD.scala
@@ -0,0 +1,44 @@
+package spark
+
+@serializable class CartesianSplit(idx: Int, val s1: Split, val s2: Split)
+extends Split {
+ override val index = idx
+}
+
+@serializable
+class CartesianRDD[T: ClassManifest, U:ClassManifest](
+ sc: SparkContext, rdd1: RDD[T], rdd2: RDD[U])
+extends RDD[Pair[T, U]](sc) {
+ val numSplitsInRdd2 = rdd2.splits.size
+
+ @transient val splits_ = {
+ // create the cross product split
+ val array = new Array[Split](rdd1.splits.size * rdd2.splits.size)
+ for (s1 <- rdd1.splits; s2 <- rdd2.splits) {
+ val idx = s1.index * numSplitsInRdd2 + s2.index
+ array(idx) = new CartesianSplit(idx, s1, s2)
+ }
+ array
+ }
+
+ override def splits = splits_.asInstanceOf[Array[Split]]
+
+ override def preferredLocations(split: Split) = {
+ val currSplit = split.asInstanceOf[CartesianSplit]
+ rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)
+ }
+
+ override def compute(split: Split) = {
+ val currSplit = split.asInstanceOf[CartesianSplit]
+ for (x <- rdd1.iterator(currSplit.s1); y <- rdd2.iterator(currSplit.s2)) yield (x, y)
+ }
+
+ override val dependencies = List(
+ new NarrowDependency(rdd1) {
+ def getParents(id: Int): Seq[Int] = List(id / numSplitsInRdd2)
+ },
+ new NarrowDependency(rdd2) {
+ def getParents(id: Int): Seq[Int] = List(id % numSplitsInRdd2)
+ }
+ )
+} \ No newline at end of file
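
CartesianSplit flattens a pair of parent splits into one index, and the two NarrowDependency instances recover the parents with integer division and modulo. A standalone check of that arithmetic (hypothetical split counts, illustrative only):

object CartesianIndexSketch extends App {
  val numSplitsInRdd1 = 3 // hypothetical
  val numSplitsInRdd2 = 4 // hypothetical

  for (i <- 0 until numSplitsInRdd1; j <- 0 until numSplitsInRdd2) {
    val idx = i * numSplitsInRdd2 + j  // how CartesianSplit indexes are assigned
    assert(idx / numSplitsInRdd2 == i) // getParents for the rdd1 dependency
    assert(idx % numSplitsInRdd2 == j) // getParents for the rdd2 dependency
  }
  println("cartesian index mapping is a bijection for these sizes")
}
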
diff --git a/core/src/main/scala/spark/DAGScheduler.scala b/core/src/main/scala/spark/DAGScheduler.scala
new file mode 100644
index 0000000000..ee3fda25a8
--- /dev/null
+++ b/core/src/main/scala/spark/DAGScheduler.scala
@@ -0,0 +1,236 @@
+package spark
+
+import java.util.concurrent.LinkedBlockingQueue
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
+
+/**
+ * A Scheduler subclass that implements stage-oriented scheduling. It computes
+ * a DAG of stages for each job, keeps track of which RDDs and stage outputs
+ * are materialized, and computes a minimal schedule to run the job. Subclasses
+ * only need to implement the code to send a task to the cluster and to report
+ * failures from it (the submitTasks method, and code to add completionEvents).
+ */
+private abstract class DAGScheduler extends Scheduler with Logging {
+ // Must be implemented by subclasses to start running a set of tasks
+ def submitTasks(tasks: Seq[Task[_]]): Unit
+
+ // Must be called by subclasses to report task completions or failures
+ def taskEnded(task: Task[_], successful: Boolean, result: Any, accumUpdates: Map[Long, Any]) {
+ completionEvents.put(CompletionEvent(task, successful, result, accumUpdates))
+ }
+
+ private val completionEvents = new LinkedBlockingQueue[CompletionEvent]
+
+ var nextStageId = 0
+
+ def newStageId() = {
+ var res = nextStageId
+ nextStageId += 1
+ res
+ }
+
+ val idToStage = new HashMap[Int, Stage]
+
+ val shuffleToMapStage = new HashMap[ShuffleDependency[_,_,_], Stage]
+
+ val cacheLocs = new HashMap[RDD[_], Array[List[String]]]
+
+ def getCacheLocs(rdd: RDD[_]): Array[List[String]] = {
+ cacheLocs.getOrElseUpdate(rdd, Array.fill[List[String]](rdd.splits.size)(Nil))
+ }
+
+ def addCacheLoc(rdd: RDD[_], partition: Int, host: String) {
+ val locs = getCacheLocs(rdd)
+ locs(partition) = host :: locs(partition)
+ }
+
+ def removeCacheLoc(rdd: RDD[_], partition: Int, host: String) {
+ val locs = getCacheLocs(rdd)
+ locs(partition) -= host
+ }
+
+ def getShuffleMapStage(shuf: ShuffleDependency[_,_,_]): Stage = {
+ shuffleToMapStage.get(shuf) match {
+ case Some(stage) => stage
+ case None =>
+ val stage = newStage(shuf.rdd, Some(shuf))
+ shuffleToMapStage(shuf) = stage
+ stage
+ }
+ }
+
+ def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_,_]]): Stage = {
+ val id = newStageId()
+ val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd))
+ idToStage(id) = stage
+ stage
+ }
+
+ def getParentStages(rdd: RDD[_]): List[Stage] = {
+ val parents = new HashSet[Stage]
+ val visited = new HashSet[RDD[_]]
+ def visit(r: RDD[_]) {
+ if (!visited(r)) {
+ visited += r
+ for (dep <- r.dependencies) {
+ dep match {
+ case shufDep: ShuffleDependency[_,_,_] =>
+ parents += getShuffleMapStage(shufDep)
+ case _ =>
+ visit(dep.rdd)
+ }
+ }
+ }
+ }
+ visit(rdd)
+ parents.toList
+ }
+
+ def getMissingParentStages(stage: Stage): List[Stage] = {
+ val missing = new HashSet[Stage]
+ val visited = new HashSet[RDD[_]]
+ def visit(rdd: RDD[_]) {
+ if (!visited(rdd)) {
+ visited += rdd
+ val locs = getCacheLocs(rdd)
+ for (p <- 0 until rdd.splits.size) {
+ if (locs(p) == Nil) {
+ for (dep <- rdd.dependencies) {
+ dep match {
+ case shufDep: ShuffleDependency[_,_,_] =>
+ val stage = getShuffleMapStage(shufDep)
+ if (!stage.isAvailable)
+ missing += stage
+ case narrowDep: NarrowDependency[_] =>
+ visit(narrowDep.rdd)
+ }
+ }
+ }
+ }
+ }
+ }
+ visit(stage.rdd)
+ missing.toList
+ }
+
+ override def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U])
+ : Array[U] = {
+ val numOutputParts: Int = rdd.splits.size
+ val finalStage = newStage(rdd, None)
+ val results = new Array[U](numOutputParts)
+ val finished = new Array[Boolean](numOutputParts)
+ var numFinished = 0
+
+ val waiting = new HashSet[Stage]
+ val running = new HashSet[Stage]
+ val pendingTasks = new HashMap[Stage, HashSet[Task[_]]]
+
+ logInfo("Final stage: " + finalStage)
+ logInfo("Parents of final stage: " + finalStage.parents)
+ logInfo("Missing parents: " + getMissingParentStages(finalStage))
+
+ def submitStage(stage: Stage) {
+ if (!waiting(stage) && !running(stage)) {
+ val missing = getMissingParentStages(stage)
+ if (missing == Nil) {
+ logInfo("Submitting " + stage + ", which has no missing parents")
+ submitMissingTasks(stage)
+ running += stage
+ } else {
+ for (parent <- missing)
+ submitStage(parent)
+ waiting += stage
+ }
+ }
+ }
+
+ def submitMissingTasks(stage: Stage) {
+ val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
+ var tasks = ArrayBuffer[Task[_]]()
+ if (stage == finalStage) {
+ for (p <- 0 until numOutputParts if (!finished(p))) {
+ val locs = getPreferredLocs(rdd, p)
+ tasks += new ResultTask(finalStage.id, rdd, func, p, locs)
+ }
+ } else {
+ for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
+ val locs = getPreferredLocs(stage.rdd, p)
+ tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
+ }
+ }
+ myPending ++= tasks
+ submitTasks(tasks)
+ }
+
+ submitStage(finalStage)
+
+ while (numFinished != numOutputParts) {
+ val evt = completionEvents.take()
+ if (evt.successful) {
+ logInfo("Completed " + evt.task)
+ Accumulators.add(currentThread, evt.accumUpdates)
+ evt.task match {
+ case rt: ResultTask[_, _] =>
+ results(rt.partition) = evt.result.asInstanceOf[U]
+ finished(rt.partition) = true
+ numFinished += 1
+ pendingTasks(finalStage) -= rt
+ case smt: ShuffleMapTask =>
+ val stage = idToStage(smt.stageId)
+ stage.addOutputLoc(smt.partition, evt.result.asInstanceOf[String])
+ val pending = pendingTasks(stage)
+ pending -= smt
+ MapOutputTracker.registerMapOutputs(
+ stage.shuffleDep.get.shuffleId,
+ stage.outputLocs.map(_.first).toArray)
+ if (pending.isEmpty) {
+ logInfo(stage + " finished; looking for newly runnable stages")
+ running -= stage
+ val newlyRunnable = new ArrayBuffer[Stage]
+ for (stage <- waiting if getMissingParentStages(stage) == Nil) {
+ newlyRunnable += stage
+ }
+ waiting --= newlyRunnable
+ running ++= newlyRunnable
+ for (stage <- newlyRunnable) {
+ submitMissingTasks(stage)
+ }
+ }
+ }
+ } else {
+ throw new SparkException("Task failed: " + evt.task)
+ // TODO: Kill the running job
+ }
+ }
+
+ return results
+ }
+
+ def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = {
+ // If the partition is cached, return the cache locations
+ val cached = getCacheLocs(rdd)(partition)
+ if (cached != Nil) {
+ return cached
+ }
+ // If the RDD has some placement preferences (as is the case for input RDDs), get those
+ val rddPrefs = rdd.preferredLocations(rdd.splits(partition)).toList
+ if (rddPrefs != Nil) {
+ return rddPrefs
+ }
+ // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
+ // that has any placement preferences. Ideally we would choose based on transfer sizes,
+ // but this will do for now.
+ rdd.dependencies.foreach(_ match {
+ case n: NarrowDependency[_] =>
+ for (inPart <- n.getParents(partition)) {
+ val locs = getPreferredLocs(n.rdd, inPart)
+ if (locs != Nil)
+ return locs;
+ }
+ case _ =>
+ })
+ return Nil
+ }
+}
+
+case class CompletionEvent(task: Task[_], successful: Boolean, result: Any, accumUpdates: Map[Long, Any])
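
The only hand-off between the scheduling loop and the task runners is the completionEvents queue: subclasses report results through taskEnded, and runJob blocks on take() until every output partition is finished. A minimal sketch of that producer/consumer shape in plain Scala (hypothetical task names; assumes Scala 2.12+ for the Runnable conversion):

import java.util.concurrent.LinkedBlockingQueue

object CompletionQueueSketch extends App {
  case class Done(taskName: String, successful: Boolean) // stand-in for CompletionEvent
  val completionEvents = new LinkedBlockingQueue[Done]

  // "Executor" threads report completions, as taskEnded does.
  new Thread(() => completionEvents.put(Done("task-0", successful = true))).start()
  new Thread(() => completionEvents.put(Done("task-1", successful = true))).start()

  // The scheduling loop blocks until each event arrives, like the while loop in runJob.
  var finished = 0
  while (finished < 2) {
    val evt = completionEvents.take()
    println("Completed " + evt.taskName)
    finished += 1
  }
}
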
diff --git a/core/src/main/scala/spark/DaemonThreadFactory.scala b/core/src/main/scala/spark/DaemonThreadFactory.scala
new file mode 100644
index 0000000000..cb30cb2ac8
--- /dev/null
+++ b/core/src/main/scala/spark/DaemonThreadFactory.scala
@@ -0,0 +1,14 @@
+package spark
+
+import java.util.concurrent.ThreadFactory
+
+/**
+ * A ThreadFactory that creates daemon threads
+ */
+private object DaemonThreadFactory extends ThreadFactory {
+ override def newThread(r: Runnable): Thread = {
+ val t = new Thread(r);
+ t.setDaemon(true)
+ return t
+ }
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala
new file mode 100644
index 0000000000..20b0357e44
--- /dev/null
+++ b/core/src/main/scala/spark/Dependency.scala
@@ -0,0 +1,30 @@
+package spark
+
+@serializable
+abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean)
+
+abstract class NarrowDependency[T](rdd: RDD[T])
+extends Dependency(rdd, false) {
+ def getParents(outputPartition: Int): Seq[Int]
+}
+
+class ShuffleDependency[K, V, C](
+ val shuffleId: Int,
+ rdd: RDD[(K, V)],
+ val aggregator: Aggregator[K, V, C],
+ val partitioner: Partitioner[K]
+) extends Dependency(rdd, true)
+
+class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
+ override def getParents(partitionId: Int) = List(partitionId)
+}
+
+class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
+extends NarrowDependency[T](rdd) {
+ override def getParents(partitionId: Int) = {
+ if (partitionId >= outStart && partitionId < outStart + length)
+ List(partitionId - outStart + inStart)
+ else
+ Nil
+ }
+}
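
RangeDependency maps a contiguous block of output partitions back onto a block of parent partitions; UnionRDD (later in this patch) relies on it to offset each parent's splits. An illustrative restatement of getParents without the RDD types:

object RangeDependencySketch extends App {
  def getParents(inStart: Int, outStart: Int, length: Int)(partitionId: Int): Seq[Int] =
    if (partitionId >= outStart && partitionId < outStart + length)
      List(partitionId - outStart + inStart)
    else
      Nil

  val firstParent  = getParents(0, 0, 3) _ // parent 1 contributes output splits 0..2
  val secondParent = getParents(0, 3, 2) _ // parent 2 contributes output splits 3..4
  assert(firstParent(2) == List(2))  // output split 2 maps to parent 1's split 2
  assert(secondParent(3) == List(0)) // output split 3 maps to parent 2's split 0
  assert(firstParent(3) == Nil)      // out of range for the first dependency
  println("range mapping checks pass")
}
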
diff --git a/core/src/main/scala/spark/DfsShuffle.scala b/core/src/main/scala/spark/DfsShuffle.scala
deleted file mode 100644
index 7a42bf2d06..0000000000
--- a/core/src/main/scala/spark/DfsShuffle.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-package spark
-
-import java.io.{EOFException, ObjectInputStream, ObjectOutputStream}
-import java.net.URI
-import java.util.UUID
-
-import scala.collection.mutable.HashMap
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
-
-
-/**
- * A simple implementation of shuffle using a distributed file system.
- *
- * TODO: Add support for compression when spark.compress is set to true.
- */
-@serializable
-class DfsShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
- override def compute(input: RDD[(K, V)],
- numOutputSplits: Int,
- createCombiner: V => C,
- mergeValue: (C, V) => C,
- mergeCombiners: (C, C) => C)
- : RDD[(K, C)] =
- {
- val sc = input.sparkContext
- val dir = DfsShuffle.newTempDirectory()
- logInfo("Intermediate data directory: " + dir)
-
- val numberedSplitRdd = new NumberedSplitRDD(input)
- val numInputSplits = numberedSplitRdd.splits.size
-
- // Run a parallel foreach to write the intermediate data files
- numberedSplitRdd.foreach((pair: (Int, Iterator[(K, V)])) => {
- val myIndex = pair._1
- val myIterator = pair._2
- val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[K, C])
- for ((k, v) <- myIterator) {
- var bucketId = k.hashCode % numOutputSplits
- if (bucketId < 0) { // Fix bucket ID if hash code was negative
- bucketId += numOutputSplits
- }
- val bucket = buckets(bucketId)
- bucket(k) = bucket.get(k) match {
- case Some(c) => mergeValue(c, v)
- case None => createCombiner(v)
- }
- }
- val fs = DfsShuffle.getFileSystem()
- for (i <- 0 until numOutputSplits) {
- val path = new Path(dir, "%d-to-%d".format(myIndex, i))
- val out = new ObjectOutputStream(fs.create(path, true))
- buckets(i).foreach(pair => out.writeObject(pair))
- out.close()
- }
- })
-
- // Return an RDD that does each of the merges for a given partition
- val indexes = sc.parallelize(0 until numOutputSplits, numOutputSplits)
- return indexes.flatMap((myIndex: Int) => {
- val combiners = new HashMap[K, C]
- val fs = DfsShuffle.getFileSystem()
- for (i <- Utils.shuffle(0 until numInputSplits)) {
- val path = new Path(dir, "%d-to-%d".format(i, myIndex))
- val inputStream = new ObjectInputStream(fs.open(path))
- try {
- while (true) {
- val (k, c) = inputStream.readObject().asInstanceOf[(K, C)]
- combiners(k) = combiners.get(k) match {
- case Some(oldC) => mergeCombiners(oldC, c)
- case None => c
- }
- }
- } catch {
- case e: EOFException => {}
- }
- inputStream.close()
- }
- combiners
- })
- }
-}
-
-
-/**
- * Companion object of DfsShuffle; responsible for initializing a Hadoop
- * FileSystem object based on the spark.dfs property and generating names
- * for temporary directories.
- */
-object DfsShuffle {
- private var initialized = false
- private var fileSystem: FileSystem = null
-
- private def initializeIfNeeded() = synchronized {
- if (!initialized) {
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
- val dfs = System.getProperty("spark.dfs", "file:///")
- val conf = new Configuration()
- conf.setInt("io.file.buffer.size", bufferSize)
- conf.setInt("dfs.replication", 1)
- fileSystem = FileSystem.get(new URI(dfs), conf)
- initialized = true
- }
- }
-
- def getFileSystem(): FileSystem = {
- initializeIfNeeded()
- return fileSystem
- }
-
- def newTempDirectory(): String = {
- val fs = getFileSystem()
- val workDir = System.getProperty("spark.dfs.workdir", "/tmp")
- val uuid = UUID.randomUUID()
- val path = workDir + "/shuffle-" + uuid
- fs.mkdirs(new Path(path))
- return path
- }
-}
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index b4d023b428..35469aeb3f 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -25,6 +25,8 @@ class Executor extends mesos.Executor with Logging {
// Initialize cache and broadcast system (uses some properties read above)
Cache.initialize()
Broadcast.initialize(false)
+ MapOutputTracker.initialize(false)
+ RDDCache.initialize(false)
// Create our ClassLoader (using spark properties) and set it on this thread
classLoader = createClassLoader()
diff --git a/core/src/main/scala/spark/HadoopFile.scala b/core/src/main/scala/spark/HadoopFile.scala
index f5e80d5432..0a7996c7bd 100644
--- a/core/src/main/scala/spark/HadoopFile.scala
+++ b/core/src/main/scala/spark/HadoopFile.scala
@@ -14,12 +14,13 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
/** A Spark split class that wraps around a Hadoop InputSplit */
-@serializable class HadoopSplit(@transient s: InputSplit)
+@serializable class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit)
extends Split {
val inputSplit = new SerializableWritable[InputSplit](s)
- // Hadoop gives each split a unique toString value, so use this as our ID
- override def getId() = "HadoopSplit(" + inputSplit.toString + ")"
+ override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt
+
+ override val index = idx
}
@@ -39,7 +40,10 @@ extends RDD[(K, V)](sc) {
FileInputFormat.setInputPaths(conf, path)
val inputFormat = createInputFormat(conf)
val inputSplits = inputFormat.getSplits(conf, sc.numCores)
- inputSplits.map(x => new HadoopSplit(x): Split).toArray
+ val array = new Array[Split] (inputSplits.size)
+ for (i <- 0 until inputSplits.size)
+ array(i) = new HadoopSplit(id, i, inputSplits(i))
+ array
}
def createInputFormat(conf: JobConf): InputFormat[K, V] = {
@@ -49,7 +53,7 @@ extends RDD[(K, V)](sc) {
override def splits = splits_
- override def iterator(theSplit: Split) = new Iterator[(K, V)] {
+ override def compute(theSplit: Split) = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopSplit]
var reader: RecordReader[K, V] = null
@@ -97,6 +101,8 @@ extends RDD[(K, V)](sc) {
val hadoopSplit = split.asInstanceOf[HadoopSplit]
hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
}
+
+ override val dependencies: List[Dependency[_]] = Nil
}
diff --git a/core/src/main/scala/spark/LocalFileShuffle.scala b/core/src/main/scala/spark/LocalFileShuffle.scala
index 367599cfb4..fd70c54c0c 100644
--- a/core/src/main/scala/spark/LocalFileShuffle.scala
+++ b/core/src/main/scala/spark/LocalFileShuffle.scala
@@ -13,6 +13,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
*
* TODO: Add support for compression when spark.compress is set to true.
*/
+/*
@serializable
class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
override def compute(input: RDD[(K, V)],
@@ -90,7 +91,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
})
}
}
-
+*/
object LocalFileShuffle extends Logging {
private var initialized = false
diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala
index 20954a1224..0287082687 100644
--- a/core/src/main/scala/spark/LocalScheduler.scala
+++ b/core/src/main/scala/spark/LocalScheduler.scala
@@ -2,40 +2,35 @@ package spark
import java.util.concurrent._
-import scala.collection.mutable.Map
-
/**
* A simple Scheduler implementation that runs tasks locally in a thread pool.
*/
-private class LocalScheduler(threads: Int) extends Scheduler with Logging {
+private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
var threadPool: ExecutorService =
Executors.newFixedThreadPool(threads, DaemonThreadFactory)
override def start() {}
override def waitForRegister() {}
-
- override def runTasks[T](tasks: Array[Task[T]])(implicit m: ClassManifest[T])
- : Array[T] = {
- val futures = new Array[Future[TaskResult[T]]](tasks.length)
-
- for (i <- 0 until tasks.length) {
- futures(i) = threadPool.submit(new Callable[TaskResult[T]]() {
- def call(): TaskResult[T] = {
+
+ override def submitTasks(tasks: Seq[Task[_]]) {
+ tasks.zipWithIndex.foreach { case (task, i) =>
+ threadPool.submit(new Runnable {
+ def run() {
logInfo("Running task " + i)
try {
// Serialize and deserialize the task so that accumulators are
// changed to thread-local ones; this adds a bit of unnecessary
- // overhead but matches how the Nexus Executor works
+ // overhead but matches how the Mesos Executor works
Accumulators.clear
val bytes = Utils.serialize(tasks(i))
logInfo("Size of task " + i + " is " + bytes.size + " bytes")
- val task = Utils.deserialize[Task[T]](
+ val deserializedTask = Utils.deserialize[Task[_]](
bytes, currentThread.getContextClassLoader)
- val value = task.run
+ val result: Any = deserializedTask.run
val accumUpdates = Accumulators.values
logInfo("Finished task " + i)
- new TaskResult[T](value, accumUpdates)
+ taskEnded(tasks(i), true, result, accumUpdates)
} catch {
case e: Exception => {
// TODO: Do something nicer here
@@ -47,26 +42,9 @@ private class LocalScheduler(threads: Int) extends Scheduler with Logging {
}
})
}
-
- val taskResults = futures.map(_.get)
- for (result <- taskResults)
- Accumulators.add(currentThread, result.accumUpdates)
- return taskResults.map(_.value).toArray(m)
}
override def stop() {}
override def numCores() = threads
-}
-
-
-/**
- * A ThreadFactory that creates daemon threads
- */
-private object DaemonThreadFactory extends ThreadFactory {
- override def newThread(r: Runnable): Thread = {
- val t = new Thread(r);
- t.setDaemon(true)
- return t
- }
-}
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
new file mode 100644
index 0000000000..ac62c6e411
--- /dev/null
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -0,0 +1,56 @@
+package spark
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.actors._
+import scala.actors.Actor._
+import scala.actors.remote._
+
+class MapOutputTracker extends DaemonActor with Logging {
+ def act() {
+ val port = System.getProperty("spark.master.port", "50501").toInt
+ RemoteActor.alive(port)
+ RemoteActor.register('MapOutputTracker, self)
+ logInfo("Started on port " + port)
+ }
+}
+
+object MapOutputTracker {
+ var trackerActor: AbstractActor = null
+
+ def initialize(isMaster: Boolean) {
+ if (isMaster) {
+ val tracker = new MapOutputTracker
+ tracker.start
+ trackerActor = tracker
+ } else {
+ val host = System.getProperty("spark.master.host")
+ val port = System.getProperty("spark.master.port").toInt
+ trackerActor = RemoteActor.select(Node(host, port), 'MapOutputTracker)
+ }
+ }
+
+ private val serverUris = new ConcurrentHashMap[Int, Array[String]]
+
+ def registerMapOutput(shuffleId: Int, numMaps: Int, mapId: Int, serverUri: String) {
+ var array = serverUris.get(shuffleId)
+ if (array == null) {
+ array = Array.fill[String](numMaps)(null)
+ serverUris.put(shuffleId, array)
+ }
+ array(mapId) = serverUri
+ }
+
+ def registerMapOutputs(shuffleId: Int, locs: Array[String]) {
+ serverUris.put(shuffleId, Array[String]() ++ locs)
+ }
+
+ def getServerUris(shuffleId: Int): Array[String] = {
+ // TODO: On remote node, fetch locations from master
+ serverUris.get(shuffleId)
+ }
+
+ def getMapOutputUri(serverUri: String, shuffleId: Int, mapId: Int, reduceId: Int): String = {
+ "%s/shuffle/%s/%s/%s".format(serverUri, shuffleId, mapId, reduceId)
+ }
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index 6a592d13c3..35ad552775 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -105,7 +105,7 @@ extends MScheduler with spark.Scheduler with Logging
* The primary means to submit a job to the scheduler. Given a list of tasks,
* runs them and returns an array of the results.
*/
- override def runTasks[T: ClassManifest](tasks: Array[Task[T]]): Array[T] = {
+ def runTasks[T: ClassManifest](tasks: Array[Task[T]]): Array[T] = {
waitForRegister()
val jobId = newJobId()
val myJob = new SimpleJob(this, tasks, jobId)
@@ -291,4 +291,9 @@ extends MScheduler with spark.Scheduler with Logging
// Serialize the map as an array of (String, String) pairs
return Utils.serialize(props.toArray)
}
+
+ override def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U])
+ : Array[U] = {
+ new Array[U](0)
+ }
}
diff --git a/core/src/main/scala/spark/NumberedSplitRDD.scala b/core/src/main/scala/spark/NumberedSplitRDD.scala
deleted file mode 100644
index 7b12210d84..0000000000
--- a/core/src/main/scala/spark/NumberedSplitRDD.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-package spark
-
-import mesos.SlaveOffer
-
-
-/**
- * An RDD that takes the splits of a parent RDD and gives them unique indexes.
- * This is useful for a variety of shuffle implementations.
- */
-class NumberedSplitRDD[T: ClassManifest](prev: RDD[T])
-extends RDD[(Int, Iterator[T])](prev.sparkContext) {
- @transient val splits_ = {
- prev.splits.zipWithIndex.map {
- case (s, i) => new NumberedSplitRDDSplit(s, i): Split
- }.toArray
- }
-
- override def splits = splits_
-
- override def preferredLocations(split: Split) = {
- val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
- prev.preferredLocations(nsplit.prev)
- }
-
- override def iterator(split: Split) = {
- val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
- Iterator((nsplit.index, prev.iterator(nsplit.prev)))
- }
-
- override def taskStarted(split: Split, slot: SlaveOffer) = {
- val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
- prev.taskStarted(nsplit.prev, slot)
- }
-}
-
-
-/**
- * A split in a NumberedSplitRDD.
- */
-class NumberedSplitRDDSplit(val prev: Split, val index: Int) extends Split {
- override def getId() = "NumberedSplitRDDSplit(%d)".format(index)
-}
diff --git a/core/src/main/scala/spark/ParallelArray.scala b/core/src/main/scala/spark/ParallelArray.scala
index a01904d61c..e77bc3014f 100644
--- a/core/src/main/scala/spark/ParallelArray.scala
+++ b/core/src/main/scala/spark/ParallelArray.scala
@@ -17,8 +17,7 @@ extends Split {
case _ => false
}
- override def getId() =
- "ParallelArraySplit(arrayId %d, slice %d)".format(arrayId, slice)
+ override val index = slice
}
class ParallelArray[T: ClassManifest](
@@ -28,8 +27,6 @@ extends RDD[T](sc) {
// the RDD chain it gets cached. It might be worthwhile to write the data to
// a file in the DFS and read it in the split instead.
- val id = ParallelArray.newId()
-
@transient val splits_ = {
val slices = ParallelArray.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelArraySplit(id, i, slices(i))).toArray
@@ -37,9 +34,11 @@ extends RDD[T](sc) {
override def splits = splits_.asInstanceOf[Array[Split]]
- override def iterator(s: Split) = s.asInstanceOf[ParallelArraySplit[T]].iterator
+ override def compute(s: Split) = s.asInstanceOf[ParallelArraySplit[T]].iterator
override def preferredLocations(s: Split): Seq[String] = Nil
+
+ override val dependencies: List[Dependency[_]] = Nil
}
private object ParallelArray {
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
new file mode 100644
index 0000000000..cc1ca74447
--- /dev/null
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -0,0 +1,22 @@
+package spark
+
+@serializable
+abstract class Partitioner[K] {
+ def numPartitions: Int
+ def getPartition(key: K): Int
+}
+
+class HashPartitioner[K](partitions: Int) extends Partitioner[K] {
+ def numPartitions = partitions
+
+ def getPartition(key: K) = {
+ val mod = key.hashCode % partitions
+ if (mod < 0) mod + partitions else mod // Guard against negative hash codes
+ }
+
+ override def equals(other: Any): Boolean = other match {
+ case h: HashPartitioner[_] =>
+ h.numPartitions == numPartitions
+ case _ => false
+ }
+} \ No newline at end of file
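
The one subtlety in HashPartitioner is that the JVM's % operator can return a negative remainder for keys with negative hash codes, so getPartition adds numPartitions back in. A quick standalone check (illustrative):

object HashPartitionSketch extends App {
  val partitions = 8
  def getPartition(key: Any): Int = {
    val mod = key.hashCode % partitions
    if (mod < 0) mod + partitions else mod // guard against negative hash codes
  }
  // -42 has a negative hashCode, so it exercises the guard; "spark" does not.
  Seq("spark", -42, "a longer key").foreach { k =>
    val p = getPartition(k)
    assert(p >= 0 && p < partitions)
    println(s"$k -> partition $p")
  }
}
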
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 39f2dc4458..df044bd6cf 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -1,5 +1,8 @@
package spark
+import java.io.EOFException
+import java.net.URL
+import java.io.ObjectInputStream
import java.util.concurrent.atomic.AtomicLong
import java.util.HashSet
import java.util.Random
@@ -12,53 +15,106 @@ import SparkContext._
import mesos._
-
@serializable
abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
+ // Methods that must be implemented by subclasses
def splits: Array[Split]
- def iterator(split: Split): Iterator[T]
+ def compute(split: Split): Iterator[T]
def preferredLocations(split: Split): Seq[String]
+ val dependencies: List[Dependency[_]]
+
+ // Optionally overridden by subclasses to specify how they are partitioned
+ val partitioner: Option[Partitioner[_]] = None
+
+ def context = sc
+
+ // Get a unique ID for this RDD
+ val id = sc.newRddId()
+
+ // Variables relating to caching
+ private var shouldCache = false
+
+ // Change this RDD's caching
+ def cache(): RDD[T] = {
+ shouldCache = true
+ this
+ }
+
+ // Read this RDD; will read from cache if applicable, or otherwise compute
+ final def iterator(split: Split): Iterator[T] = {
+ if (shouldCache) {
+ RDDCache.getOrCompute[T](this, split)
+ } else {
+ compute(split)
+ }
+ }
+
+ // Transformations
+
+ def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
+
+ def flatMap[U: ClassManifest](f: T => Traversable[U]): RDD[U] =
+ new FlatMappedRDD(this, sc.clean(f))
+
+ def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
- def taskStarted(split: Split, slot: SlaveOffer) {}
+ def sample(withReplacement: Boolean, frac: Double, seed: Int): RDD[T] =
+ new SampledRDD(this, withReplacement, frac, seed)
- def sparkContext = sc
+ def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))
- def map[U: ClassManifest](f: T => U) = new MappedRDD(this, sc.clean(f))
- def filter(f: T => Boolean) = new FilteredRDD(this, sc.clean(f))
- def cache() = new CachedRDD(this)
+ def ++(other: RDD[T]): RDD[T] = this.union(other)
- def sample(withReplacement: Boolean, frac: Double, seed: Int) =
- new SampledRDD(this, withReplacement, frac, seed)
+ def glom(): RDD[Array[T]] = new SplitRDD(this)
- def flatMap[U: ClassManifest](f: T => Traversable[U]) =
- new FlatMappedRDD(this, sc.clean(f))
+ def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] =
+ new CartesianRDD(sc, this, other)
+
+ def groupBy[K](func: T => K, numSplits: Int): RDD[(K, Seq[T])] =
+ this.map(t => (func(t), t)).groupByKey(numSplits)
+
+ def groupBy[K](func: T => K): RDD[(K, Seq[T])] =
+ groupBy[K](func, sc.numCores)
+ // Parallel operations
+
def foreach(f: T => Unit) {
val cleanF = sc.clean(f)
- val tasks = splits.map(s => new ForeachTask(this, s, cleanF)).toArray
- sc.runTaskObjects(tasks)
+ sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
def collect(): Array[T] = {
- val tasks = splits.map(s => new CollectTask(this, s))
- val results = sc.runTaskObjects(tasks)
+ val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
- def toArray(): Array[T] = collect()
-
def reduce(f: (T, T) => T): T = {
val cleanF = sc.clean(f)
- val tasks = splits.map(s => new ReduceTask(this, s, f))
+ val reducePartition: Iterator[T] => Option[T] = iter => {
+ if (iter.hasNext)
+ Some(iter.reduceLeft(f))
+ else
+ None
+ }
+ val options = sc.runJob(this, reducePartition)
val results = new ArrayBuffer[T]
- for (option <- sc.runTaskObjects(tasks); elem <- option)
+ for (opt <- options; elem <- opt)
results += elem
if (results.size == 0)
throw new UnsupportedOperationException("empty collection")
else
return results.reduceLeft(f)
}
+
+ def count(): Long = {
+ sc.runJob(this, (iter: Iterator[T]) => iter.size.toLong).sum
+ }
+
+ def toArray(): Array[T] = collect()
+ // TODO: Reimplement these to properly build any shuffle dependencies on
+  // the cluster rather than attempting to compute a partition on the master
+ /*
def take(num: Int): Array[T] = {
if (num == 0)
return new Array[T](0)
@@ -75,268 +131,44 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
case Array(t) => t
case _ => throw new UnsupportedOperationException("empty collection")
}
-
- def count(): Long = {
- try {
- map(x => 1L).reduce(_+_)
- } catch {
- case e: UnsupportedOperationException => 0L // No elements in RDD
- }
- }
-
- def union(other: RDD[T]) = new UnionRDD(sc, Array(this, other))
-
- def ++(other: RDD[T]) = this.union(other)
-
- def splitRdd() = new SplitRDD(this)
-
- def cartesian[U: ClassManifest](other: RDD[U]) =
- new CartesianRDD(sc, this, other)
-
- def groupBy[K](func: T => K, numSplits: Int): RDD[(K, Seq[T])] =
- this.map(t => (func(t), t)).groupByKey(numSplits)
-
- def groupBy[K](func: T => K): RDD[(K, Seq[T])] =
- groupBy[K](func, sc.numCores)
-}
-
-@serializable
-abstract class RDDTask[U: ClassManifest, T: ClassManifest](
- val rdd: RDD[T], val split: Split)
-extends Task[U] {
- override def preferredLocations() = rdd.preferredLocations(split)
- override def markStarted(slot: SlaveOffer) { rdd.taskStarted(split, slot) }
-}
-
-class ForeachTask[T: ClassManifest](
- rdd: RDD[T], split: Split, func: T => Unit)
-extends RDDTask[Unit, T](rdd, split) with Logging {
- override def run() {
- logInfo("Processing " + split)
- rdd.iterator(split).foreach(func)
- }
-}
-
-class CollectTask[T](
- rdd: RDD[T], split: Split)(implicit m: ClassManifest[T])
-extends RDDTask[Array[T], T](rdd, split) with Logging {
- override def run(): Array[T] = {
- logInfo("Processing " + split)
- rdd.iterator(split).toArray(m)
- }
-}
-
-class ReduceTask[T: ClassManifest](
- rdd: RDD[T], split: Split, f: (T, T) => T)
-extends RDDTask[Option[T], T](rdd, split) with Logging {
- override def run(): Option[T] = {
- logInfo("Processing " + split)
- val iter = rdd.iterator(split)
- if (iter.hasNext)
- Some(iter.reduceLeft(f))
- else
- None
- }
+ */
}
class MappedRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T], f: T => U)
-extends RDD[U](prev.sparkContext) {
+extends RDD[U](prev.context) {
override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split)
- override def iterator(split: Split) = prev.iterator(split).map(f)
- override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
-}
-
-class FilteredRDD[T: ClassManifest](
- prev: RDD[T], f: T => Boolean)
-extends RDD[T](prev.sparkContext) {
- override def splits = prev.splits
- override def preferredLocations(split: Split) = prev.preferredLocations(split)
- override def iterator(split: Split) = prev.iterator(split).filter(f)
- override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
+ override val dependencies = List(new OneToOneDependency(prev))
+ override def compute(split: Split) = prev.iterator(split).map(f)
}
class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T], f: T => Traversable[U])
-extends RDD[U](prev.sparkContext) {
+extends RDD[U](prev.context) {
override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split)
- override def iterator(split: Split) =
- prev.iterator(split).toStream.flatMap(f).iterator
- override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
+ override val dependencies = List(new OneToOneDependency(prev))
+ override def compute(split: Split) = prev.iterator(split).toStream.flatMap(f).iterator
}
-class SplitRDD[T: ClassManifest](prev: RDD[T])
-extends RDD[Array[T]](prev.sparkContext) {
+class FilteredRDD[T: ClassManifest](
+ prev: RDD[T], f: T => Boolean)
+extends RDD[T](prev.context) {
override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split)
- override def iterator(split: Split) = Iterator.fromArray(Array(prev.iterator(split).toArray))
- override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
-}
-
-
-@serializable class SeededSplit(val prev: Split, val seed: Int) extends Split {
- override def getId() =
- "SeededSplit(" + prev.getId() + ", seed " + seed + ")"
-}
-
-class SampledRDD[T: ClassManifest](
- prev: RDD[T], withReplacement: Boolean, frac: Double, seed: Int)
-extends RDD[T](prev.sparkContext) {
-
- @transient val splits_ = { val rg = new Random(seed); prev.splits.map(x => new SeededSplit(x, rg.nextInt)) }
-
- override def splits = splits_.asInstanceOf[Array[Split]]
-
- override def preferredLocations(split: Split) = prev.preferredLocations(split.asInstanceOf[SeededSplit].prev)
-
- override def iterator(splitIn: Split) = {
- val split = splitIn.asInstanceOf[SeededSplit]
- val rg = new Random(split.seed);
- // Sampling with replacement (TODO: use reservoir sampling to make this more efficient?)
- if (withReplacement) {
- val oldData = prev.iterator(split.prev).toArray
- val sampleSize = (oldData.size * frac).ceil.toInt
- val sampledData = for (i <- 1 to sampleSize) yield oldData(rg.nextInt(oldData.size)) // all of oldData's indices are candidates, even if sampleSize < oldData.size
- sampledData.iterator
- }
- // Sampling without replacement
- else {
- prev.iterator(split.prev).filter(x => (rg.nextDouble <= frac))
- }
- }
-
- override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split.asInstanceOf[SeededSplit].prev, slot)
+ override val dependencies = List(new OneToOneDependency(prev))
+ override def compute(split: Split) = prev.iterator(split).filter(f)
}
-
-class CachedRDD[T](
- prev: RDD[T])(implicit m: ClassManifest[T])
-extends RDD[T](prev.sparkContext) with Logging {
- val id = CachedRDD.newId()
- @transient val cacheLocs = Map[Split, List[String]]()
-
+class SplitRDD[T: ClassManifest](prev: RDD[T])
+extends RDD[Array[T]](prev.context) {
override def splits = prev.splits
-
- override def preferredLocations(split: Split) = {
- if (cacheLocs.contains(split))
- cacheLocs(split)
- else
- prev.preferredLocations(split)
- }
-
- override def iterator(split: Split): Iterator[T] = {
- val key = id + "::" + split.getId()
- logInfo("CachedRDD split key is " + key)
- val cache = CachedRDD.cache
- val loading = CachedRDD.loading
- val cachedVal = cache.get(key)
- if (cachedVal != null) {
- // Split is in cache, so just return its values
- return Iterator.fromArray(cachedVal.asInstanceOf[Array[T]])
- } else {
- // Mark the split as loading (unless someone else marks it first)
- loading.synchronized {
- if (loading.contains(key)) {
- while (loading.contains(key)) {
- try {loading.wait()} catch {case _ =>}
- }
- return Iterator.fromArray(cache.get(key).asInstanceOf[Array[T]])
- } else {
- loading.add(key)
- }
- }
- // If we got here, we have to load the split
- logInfo("Loading and caching " + split)
- val array = prev.iterator(split).toArray(m)
- cache.put(key, array)
- loading.synchronized {
- loading.remove(key)
- loading.notifyAll()
- }
- return Iterator.fromArray(array)
- }
- }
-
- override def taskStarted(split: Split, slot: SlaveOffer) {
- val oldList = cacheLocs.getOrElse(split, Nil)
- val host = slot.getHost
- if (!oldList.contains(host))
- cacheLocs(split) = host :: oldList
- }
-}
-
-private object CachedRDD {
- val nextId = new AtomicLong(0) // Generates IDs for cached RDDs (on master)
- def newId() = nextId.getAndIncrement()
-
- // Stores map results for various splits locally (on workers)
- val cache = Cache.newKeySpace()
-
- // Remembers which splits are currently being loaded (on workers)
- val loading = new HashSet[String]
-}
-
-@serializable
-class UnionSplit[T: ClassManifest](rdd: RDD[T], split: Split)
-extends Split {
- def iterator() = rdd.iterator(split)
- def preferredLocations() = rdd.preferredLocations(split)
- override def getId() = "UnionSplit(" + split.getId() + ")"
-}
-
-@serializable
-class UnionRDD[T: ClassManifest](sc: SparkContext, rdds: Seq[RDD[T]])
-extends RDD[T](sc) {
- @transient val splits_ : Array[Split] = {
- val splits: Seq[Split] =
- for (rdd <- rdds; split <- rdd.splits)
- yield new UnionSplit(rdd, split)
- splits.toArray
- }
-
- override def splits = splits_
-
- override def iterator(s: Split): Iterator[T] =
- s.asInstanceOf[UnionSplit[T]].iterator()
-
- override def preferredLocations(s: Split): Seq[String] =
- s.asInstanceOf[UnionSplit[T]].preferredLocations()
-}
-
-@serializable class CartesianSplit(val s1: Split, val s2: Split) extends Split {
- override def getId() =
- "CartesianSplit(" + s1.getId() + ", " + s2.getId() + ")"
+ override def preferredLocations(split: Split) = prev.preferredLocations(split)
+ override val dependencies = List(new OneToOneDependency(prev))
+ override def compute(split: Split) = Iterator.fromArray(Array(prev.iterator(split).toArray))
}
-@serializable
-class CartesianRDD[T: ClassManifest, U:ClassManifest](
- sc: SparkContext, rdd1: RDD[T], rdd2: RDD[U])
-extends RDD[Pair[T, U]](sc) {
- @transient val splits_ = {
- // create the cross product split
- rdd2.splits.map(y => rdd1.splits.map(x => new CartesianSplit(x, y))).flatten
- }
-
- override def splits = splits_.asInstanceOf[Array[Split]]
-
- override def preferredLocations(split: Split) = {
- val currSplit = split.asInstanceOf[CartesianSplit]
- rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)
- }
-
- override def iterator(split: Split) = {
- val currSplit = split.asInstanceOf[CartesianSplit]
- for (x <- rdd1.iterator(currSplit.s1); y <- rdd2.iterator(currSplit.s2)) yield (x, y)
- }
-
- override def taskStarted(split: Split, slot: SlaveOffer) = {
- val currSplit = split.asInstanceOf[CartesianSplit]
- rdd1.taskStarted(currSplit.s1, slot)
- rdd2.taskStarted(currSplit.s2, slot)
- }
-}
@serializable class PairRDDExtras[K, V](self: RDD[(K, V)]) {
def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = {
@@ -358,10 +190,9 @@ extends RDD[Pair[T, U]](sc) {
numSplits: Int)
: RDD[(K, C)] =
{
- val shufClass = Class.forName(System.getProperty(
- "spark.shuffle.class", "spark.LocalFileShuffle"))
- val shuf = shufClass.newInstance().asInstanceOf[Shuffle[K, V, C]]
- shuf.compute(self, numSplits, createCombiner, mergeValue, mergeCombiners)
+ val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
+ val partitioner = new HashPartitioner[K](numSplits)
+ new ShuffledRDD(self, aggregator, partitioner)
}
def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
@@ -412,7 +243,7 @@ extends RDD[Pair[T, U]](sc) {
join(other, numCores)
}
- def numCores = self.sparkContext.numCores
+ def numCores = self.context.numCores
def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
}
diff --git a/core/src/main/scala/spark/RDDCache.scala b/core/src/main/scala/spark/RDDCache.scala
new file mode 100644
index 0000000000..2f2ec9d237
--- /dev/null
+++ b/core/src/main/scala/spark/RDDCache.scala
@@ -0,0 +1,92 @@
+package spark
+
+import scala.actors._
+import scala.actors.Actor._
+import scala.actors.remote._
+
+sealed trait CacheMessage
+case class CacheEntryAdded(rddId: Int, partition: Int, host: String)
+case class CacheEntryRemoved(rddId: Int, partition: Int, host: String)
+
+class RDDCacheTracker extends DaemonActor with Logging {
+ def act() {
+ val port = System.getProperty("spark.master.port", "50501").toInt
+ RemoteActor.alive(port)
+ RemoteActor.register('RDDCacheTracker, self)
+ logInfo("Started on port " + port)
+
+ loop {
+ react {
+ case CacheEntryAdded(rddId, partition, host) =>
+ logInfo("Cache entry added: %s, %s, %s".format(rddId, partition, host))
+
+ case CacheEntryRemoved(rddId, partition, host) =>
+ logInfo("Cache entry removed: %s, %s, %s".format(rddId, partition, host))
+ }
+ }
+ }
+}
+
+import scala.collection.mutable.HashSet
+private object RDDCache extends Logging {
+ // Stores map results for various splits locally
+ val cache = Cache.newKeySpace()
+
+ // Remembers which splits are currently being loaded
+ val loading = new HashSet[(Int, Int)]
+
+ // Tracker actor on the master, or remote reference to it on workers
+ var trackerActor: AbstractActor = null
+
+ def initialize(isMaster: Boolean) {
+ if (isMaster) {
+ val tracker = new RDDCacheTracker
+ tracker.start
+ trackerActor = tracker
+ } else {
+ val host = System.getProperty("spark.master.host")
+ val port = System.getProperty("spark.master.port").toInt
+ trackerActor = RemoteActor.select(Node(host, port), 'RDDCacheTracker)
+ }
+ }
+
+ // Gets or computes an RDD split
+ def getOrCompute[T](rdd: RDD[T], split: Split)(implicit m: ClassManifest[T])
+ : Iterator[T] = {
+ val key = (rdd.id, split.index)
+ logInfo("CachedRDD split key is " + key)
+ val cache = RDDCache.cache
+ val loading = RDDCache.loading
+ val cachedVal = cache.get(key)
+ if (cachedVal != null) {
+ // Split is in cache, so just return its values
+ return Iterator.fromArray(cachedVal.asInstanceOf[Array[T]])
+ } else {
+ // Mark the split as loading (unless someone else marks it first)
+ loading.synchronized {
+ if (loading.contains(key)) {
+ while (loading.contains(key)) {
+ try {loading.wait()} catch {case _ =>}
+ }
+ return Iterator.fromArray(cache.get(key).asInstanceOf[Array[T]])
+ } else {
+ loading.add(key)
+ }
+ }
+ val host = System.getProperty("spark.hostname", Utils.localHostName)
+ trackerActor ! CacheEntryAdded(rdd.id, split.index, host)
+ // If we got here, we have to load the split
+ // TODO: fetch any remote copy of the split that may be available
+ // TODO: also notify the master that we're loading it
+ // TODO: also register a listener for when it unloads
+ logInfo("Computing and caching " + split)
+ val array = rdd.compute(split).toArray(m)
+ cache.put(key, array)
+ loading.synchronized {
+ loading.remove(key)
+ loading.notifyAll()
+ }
+ return Iterator.fromArray(array)
+ }
+ }
+} \ No newline at end of file
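
getOrCompute guards each (rddId, splitIndex) key with a "loading" set so that only one thread computes a split while others wait() until notifyAll(). A minimal standalone sketch of that pattern (illustrative; the real code also reports cache entries to the tracker actor):

import scala.collection.mutable

object LoadingGuardSketch extends App {
  val cache = new java.util.concurrent.ConcurrentHashMap[String, Array[Int]]
  val loading = new mutable.HashSet[String]

  def getOrCompute(key: String)(compute: => Array[Int]): Array[Int] = {
    val cached = cache.get(key)
    if (cached != null) return cached
    loading.synchronized {
      if (loading.contains(key)) {
        while (loading.contains(key)) loading.wait() // another thread is computing it
        return cache.get(key)
      } else {
        loading.add(key)
      }
    }
    val computed = compute // only one thread per key reaches this point
    cache.put(key, computed)
    loading.synchronized {
      loading.remove(key)
      loading.notifyAll()
    }
    computed
  }

  println(getOrCompute("rdd0-split0")(Array(1, 2, 3)).mkString(","))
}
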
diff --git a/core/src/main/scala/spark/ResultTask.scala b/core/src/main/scala/spark/ResultTask.scala
new file mode 100644
index 0000000000..3b63896175
--- /dev/null
+++ b/core/src/main/scala/spark/ResultTask.scala
@@ -0,0 +1,14 @@
+package spark
+
+class ResultTask[T, U](val stageId: Int, rdd: RDD[T], func: Iterator[T] => U, val partition: Int, locs: Seq[String])
+extends Task[U] {
+ val split = rdd.splits(partition)
+
+ override def run: U = {
+ func(rdd.iterator(split))
+ }
+
+ override def preferredLocations: Seq[String] = locs
+
+ override def toString = "ResultTask(" + stageId + ", " + partition + ")"
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/SampledRDD.scala b/core/src/main/scala/spark/SampledRDD.scala
new file mode 100644
index 0000000000..2eeafedcdd
--- /dev/null
+++ b/core/src/main/scala/spark/SampledRDD.scala
@@ -0,0 +1,36 @@
+package spark
+
+import java.util.Random
+
+@serializable class SampledRDDSplit(val prev: Split, val seed: Int) extends Split {
+ override val index = prev.index
+}
+
+class SampledRDD[T: ClassManifest](
+ prev: RDD[T], withReplacement: Boolean, frac: Double, seed: Int)
+extends RDD[T](prev.context) {
+
+ @transient val splits_ = { val rg = new Random(seed); prev.splits.map(x => new SampledRDDSplit(x, rg.nextInt)) }
+
+ override def splits = splits_.asInstanceOf[Array[Split]]
+
+ override val dependencies = List(new OneToOneDependency(prev))
+
+ override def preferredLocations(split: Split) = prev.preferredLocations(split.asInstanceOf[SampledRDDSplit].prev)
+
+ override def compute(splitIn: Split) = {
+ val split = splitIn.asInstanceOf[SampledRDDSplit]
+ val rg = new Random(split.seed);
+ // Sampling with replacement (TODO: use reservoir sampling to make this more efficient?)
+ if (withReplacement) {
+ val oldData = prev.iterator(split.prev).toArray
+ val sampleSize = (oldData.size * frac).ceil.toInt
+ val sampledData = for (i <- 1 to sampleSize) yield oldData(rg.nextInt(oldData.size)) // all of oldData's indices are candidates, even if sampleSize < oldData.size
+ sampledData.iterator
+ }
+ // Sampling without replacement
+ else {
+ prev.iterator(split.prev).filter(x => (rg.nextDouble <= frac))
+ }
+ }
+}
diff --git a/core/src/main/scala/spark/Scheduler.scala b/core/src/main/scala/spark/Scheduler.scala
index b9f3128c82..fbcbb3e935 100644
--- a/core/src/main/scala/spark/Scheduler.scala
+++ b/core/src/main/scala/spark/Scheduler.scala
@@ -4,7 +4,8 @@ package spark
private trait Scheduler {
def start()
def waitForRegister()
- def runTasks[T](tasks: Array[Task[T]])(implicit m: ClassManifest[T]): Array[T]
+ //def runTasks[T](tasks: Array[Task[T]])(implicit m: ClassManifest[T]): Array[T]
+ def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U]): Array[U]
def stop()
def numCores(): Int
}
diff --git a/core/src/main/scala/spark/ShuffleMapTask.scala b/core/src/main/scala/spark/ShuffleMapTask.scala
new file mode 100644
index 0000000000..287c64a9cc
--- /dev/null
+++ b/core/src/main/scala/spark/ShuffleMapTask.scala
@@ -0,0 +1,38 @@
+package spark
+
+import java.io.FileOutputStream
+import java.io.ObjectOutputStream
+import scala.collection.mutable.HashMap
+
+
+class ShuffleMapTask(val stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_], val partition: Int, locs: Seq[String])
+extends Task[String] {
+ val split = rdd.splits(partition)
+
+ override def run: String = {
+ val numOutputSplits = dep.partitioner.numPartitions
+ val aggregator = dep.aggregator.asInstanceOf[Aggregator[Any, Any, Any]]
+ val partitioner = dep.partitioner.asInstanceOf[Partitioner[Any]]
+ val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[Any, Any])
+ for (elem <- rdd.iterator(split)) {
+ val (k, v) = elem.asInstanceOf[(Any, Any)]
+ var bucketId = partitioner.getPartition(k)
+ val bucket = buckets(bucketId)
+ bucket(k) = bucket.get(k) match {
+ case Some(c) => aggregator.mergeValue(c, v)
+ case None => aggregator.createCombiner(v)
+ }
+ }
+ for (i <- 0 until numOutputSplits) {
+ val file = LocalFileShuffle.getOutputFile(dep.shuffleId, partition, i)
+ val out = new ObjectOutputStream(new FileOutputStream(file))
+ buckets(i).foreach(pair => out.writeObject(pair))
+ out.close()
+ }
+ return LocalFileShuffle.getServerUri
+ }
+
+ override def preferredLocations: Seq[String] = locs
+
+ override def toString = "ShuffleMapTask(%d, %d)".format(stageId, partition)
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala
new file mode 100644
index 0000000000..826957a469
--- /dev/null
+++ b/core/src/main/scala/spark/ShuffledRDD.scala
@@ -0,0 +1,60 @@
+package spark
+
+import java.net.URL
+import java.io.EOFException
+import java.io.ObjectInputStream
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+
+class ShuffledRDDSplit(val idx: Int) extends Split {
+ override val index = idx
+ override def hashCode(): Int = idx
+}
+
+class ShuffledRDD[K, V, C](
+ parent: RDD[(K, V)],
+ aggregator: Aggregator[K, V, C],
+ part : Partitioner[K])
+extends RDD[(K, C)](parent.context) {
+ override val partitioner = Some(part)
+
+ @transient val splits_ =
+ Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
+
+ override def splits = splits_
+
+ override def preferredLocations(split: Split) = Nil
+
+ val dep = new ShuffleDependency(context.newShuffleId, parent, aggregator, part)
+ override val dependencies = List(dep)
+
+ override def compute(split: Split): Iterator[(K, C)] = {
+ val shuffleId = dep.shuffleId
+ val splitId = split.index
+ val splitsByUri = new HashMap[String, ArrayBuffer[Int]]
+ val serverUris = MapOutputTracker.getServerUris(shuffleId)
+ for ((serverUri, index) <- serverUris.zipWithIndex) {
+ splitsByUri.getOrElseUpdate(serverUri, ArrayBuffer()) += index
+ }
+ val combiners = new HashMap[K, C]
+ for ((serverUri, inputIds) <- Utils.shuffle(splitsByUri)) {
+ for (i <- inputIds) {
+ val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, splitId)
+ val inputStream = new ObjectInputStream(new URL(url).openStream())
+ try {
+ while (true) {
+ val (k, c) = inputStream.readObject().asInstanceOf[(K, C)]
+ combiners(k) = combiners.get(k) match {
+ case Some(oldC) => aggregator.mergeCombiners(oldC, c)
+ case None => c
+ }
+ }
+ } catch {
+ case e: EOFException => {}
+ }
+ inputStream.close()
+ }
+ }
+ combiners.iterator
+ }
+} \ No newline at end of file
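
ShuffleMapTask writes each bucket as a stream of (key, combiner) objects, and ShuffledRDD reads every map output for its partition until EOF, merging combiners for repeated keys. A self-contained sketch of that write/read-until-EOF protocol, using in-memory streams instead of files and HTTP (illustrative only):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, EOFException, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.HashMap

object ShuffleStreamSketch extends App {
  val mergeCombiners: (Int, Int) => Int = _ + _

  // Map side: write (key, combiner) pairs one object at a time.
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  Seq("a" -> 2, "b" -> 1, "a" -> 3).foreach(pair => out.writeObject(pair))
  out.close()

  // Reduce side: read until EOF, merging combiners for duplicate keys.
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  val combiners = new HashMap[String, Int]
  try {
    while (true) {
      val (k, c) = in.readObject().asInstanceOf[(String, Int)]
      combiners(k) = combiners.get(k) match {
        case Some(oldC) => mergeCombiners(oldC, c)
        case None       => c
      }
    }
  } catch { case _: EOFException => } // end of stream, as in ShuffledRDD.compute
  in.close()
  println(combiners) // a -> 5, b -> 1
}
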
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index bf70b5fcb1..fda2ee3be7 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -1,6 +1,7 @@
package spark
import java.io._
+import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer
@@ -34,6 +35,8 @@ extends Logging {
scheduler.start()
Cache.initialize()
Broadcast.initialize(true)
+ MapOutputTracker.initialize(true)
+ RDDCache.initialize(true)
// Methods for creating RDDs
@@ -42,6 +45,12 @@ extends Logging {
def parallelize[T: ClassManifest](seq: Seq[T]): RDD[T] =
parallelize(seq, numCores)
+
+ def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int): RDD[T] =
+ parallelize(seq, numSlices)
+
+ def makeRDD[T: ClassManifest](seq: Seq[T]): RDD[T] =
+ parallelize(seq, numCores)
def textFile(path: String): RDD[String] =
new HadoopTextFile(this, path)
@@ -89,8 +98,8 @@ extends Logging {
}
/** Build the union of a list of RDDs. */
- def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] =
- new UnionRDD(this, rdds)
+ //def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] =
+ // new UnionRDD(this, rdds)
// Methods for creating shared variables
@@ -126,19 +135,26 @@ extends Logging {
None
}
- // Submit an array of tasks (passed as functions) to the scheduler
- def runTasks[T: ClassManifest](tasks: Array[() => T]): Array[T] = {
- runTaskObjects(tasks.map(f => new FunctionTask(f)))
- }
-
// Run an array of spark.Task objects
private[spark] def runTaskObjects[T: ClassManifest](tasks: Seq[Task[T]])
: Array[T] = {
+ return null;
+ /*
logInfo("Running " + tasks.length + " tasks in parallel")
val start = System.nanoTime
val result = scheduler.runTasks(tasks.toArray)
logInfo("Tasks finished in " + (System.nanoTime - start) / 1e9 + " s")
return result
+ */
+ }
+
+ private[spark] def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U])
+ : Array[U] = {
+ logInfo("Starting job...")
+ val start = System.nanoTime
+ val result = scheduler.runJob(rdd, func)
+ logInfo("Job finished in " + (System.nanoTime - start) / 1e9 + " s")
+ result
}
// Clean a closure to make it ready to be serialized and sent to tasks
@@ -150,6 +166,18 @@ extends Logging {
// Get the number of cores available to run tasks (as reported by Scheduler)
def numCores = scheduler.numCores
+
+ private var nextShuffleId = new AtomicInteger(0)
+
+ private[spark] def newShuffleId(): Int = {
+ nextShuffleId.getAndIncrement()
+ }
+
+ private var nextRddId = new AtomicInteger(0)
+
+ private[spark] def newRddId(): Int = {
+ nextRddId.getAndIncrement()
+ }
}
diff --git a/core/src/main/scala/spark/Split.scala b/core/src/main/scala/spark/Split.scala
index 116cd16370..62bb5f82c5 100644
--- a/core/src/main/scala/spark/Split.scala
+++ b/core/src/main/scala/spark/Split.scala
@@ -5,9 +5,10 @@ package spark
*/
@serializable trait Split {
/**
- * Get a unique ID for this split which can be used, for example, to
- * set up caches based on it. The ID should stay the same if we serialize
- * and then deserialize the split.
+ * Get the split's index within its parent RDD
*/
- def getId(): String
+ val index: Int
+
+  // A better default implementation of hashCode
+ override def hashCode(): Int = index
}
diff --git a/core/src/main/scala/spark/Stage.scala b/core/src/main/scala/spark/Stage.scala
new file mode 100644
index 0000000000..82b70ce60d
--- /dev/null
+++ b/core/src/main/scala/spark/Stage.scala
@@ -0,0 +1,34 @@
+package spark
+
+class Stage(val id: Int, val rdd: RDD[_], val shuffleDep: Option[ShuffleDependency[_,_,_]], val parents: List[Stage]) {
+ val isShuffleMap = shuffleDep != None
+ val numPartitions = rdd.splits.size
+ val outputLocs = Array.fill[List[String]](numPartitions)(Nil)
+ var numAvailableOutputs = 0
+
+ def isAvailable: Boolean = {
+ if (parents.size == 0 && !isShuffleMap)
+ true
+ else
+ numAvailableOutputs == numPartitions
+ }
+
+ def addOutputLoc(partition: Int, host: String) {
+ val prevList = outputLocs(partition)
+ outputLocs(partition) = host :: prevList
+ if (prevList == Nil)
+ numAvailableOutputs += 1
+ }
+
+ def removeOutputLoc(partition: Int, host: String) {
+ val prevList = outputLocs(partition)
+ val newList = prevList - host
+ outputLocs(partition) = newList
+ if (prevList != Nil && newList == Nil)
+ numAvailableOutputs -= 1
+ }
+
+ override def toString = "Stage " + id
+
+ override def hashCode(): Int = id
+}
diff --git a/core/src/main/scala/spark/UnionRDD.scala b/core/src/main/scala/spark/UnionRDD.scala
new file mode 100644
index 0000000000..78297be4f3
--- /dev/null
+++ b/core/src/main/scala/spark/UnionRDD.scala
@@ -0,0 +1,43 @@
+package spark
+
+import scala.collection.mutable.ArrayBuffer
+
+@serializable
+class UnionSplit[T: ClassManifest](idx: Int, rdd: RDD[T], split: Split)
+extends Split {
+ def iterator() = rdd.iterator(split)
+ def preferredLocations() = rdd.preferredLocations(split)
+ override val index = idx
+}
+
+@serializable
+class UnionRDD[T: ClassManifest](sc: SparkContext, rdds: Seq[RDD[T]])
+extends RDD[T](sc) {
+ @transient val splits_ : Array[Split] = {
+ val array = new Array[Split](rdds.map(_.splits.size).sum)
+ var pos = 0
+ for (rdd <- rdds; split <- rdd.splits) {
+ array(pos) = new UnionSplit(pos, rdd, split)
+ pos += 1
+ }
+ array
+ }
+
+ override def splits = splits_
+
+ override val dependencies = {
+ val deps = new ArrayBuffer[Dependency[_]]
+ var pos = 0
+ for ((rdd, index) <- rdds.zipWithIndex) {
+ deps += new RangeDependency(rdd, 0, pos, rdd.splits.size)
+ pos += rdd.splits.size
+ }
+ deps.toList
+ }
+
+ override def compute(s: Split): Iterator[T] =
+ s.asInstanceOf[UnionSplit[T]].iterator()
+
+ override def preferredLocations(s: Split): Seq[String] =
+ s.asInstanceOf[UnionSplit[T]].preferredLocations()
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index e333dd9c91..00cbbfd616 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -124,4 +124,11 @@ object Utils {
// and join them into a string
return bytes.map(b => (b.toInt + 256) % 256).mkString(".")
}
+
+ /**
+ * Get the local machine's hostname
+ */
+ def localHostName(): String = {
+ return InetAddress.getLocalHost().getHostName
+ }
}
diff --git a/examples/src/main/scala/spark/examples/CpuHog.scala b/examples/src/main/scala/spark/examples/CpuHog.scala
deleted file mode 100644
index 94b3709850..0000000000
--- a/examples/src/main/scala/spark/examples/CpuHog.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-package spark.examples
-
-import spark._
-
-object CpuHog {
- def main(args: Array[String]) {
- if (args.length != 3) {
- System.err.println("Usage: CpuHog <master> <tasks> <threads_per_task>");
- System.exit(1)
- }
- val sc = new SparkContext(args(0), "CPU hog")
- val tasks = args(1).toInt
- val threads = args(2).toInt
- def task {
- for (i <- 0 until threads-1) {
- new Thread() {
- override def run {
- while(true) {}
- }
- }.start()
- }
- while(true) {}
- }
- sc.runTasks(Array.make(tasks, () => task))
- }
-}
diff --git a/examples/src/main/scala/spark/examples/SleepJob.scala b/examples/src/main/scala/spark/examples/SleepJob.scala
deleted file mode 100644
index 02673a5f88..0000000000
--- a/examples/src/main/scala/spark/examples/SleepJob.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-package spark.examples
-
-import spark._
-
-object SleepJob {
- def main(args: Array[String]) {
- if (args.length != 3) {
- System.err.println("Usage: SleepJob <master> <tasks> <task_duration>");
- System.exit(1)
- }
- val sc = new SparkContext(args(0), "Sleep job")
- val tasks = args(1).toInt
- val duration = args(2).toInt
- def task {
- val start = System.currentTimeMillis
- while (System.currentTimeMillis - start < duration * 1000L)
- Thread.sleep(200)
- }
- sc.runTasks(Array.make(tasks, () => task))
- }
-}