29 files changed, 908 insertions, 531 deletions
diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala new file mode 100644 index 0000000000..87453c9c15 --- /dev/null +++ b/core/src/main/scala/spark/Aggregator.scala @@ -0,0 +1,8 @@ +package spark + +@serializable +class Aggregator[K, V, C] ( + val createCombiner: V => C, + val mergeValue: (C, V) => C, + val mergeCombiners: (C, C) => C +)
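The Aggregator simply bundles the three per-key combining functions that combineByKey hands down to the shuffle machinery. As a minimal sketch (illustrative only, not part of this commit), an aggregator that sums integer values per key would look like:

    // Combine Int values into a running Int sum, per key.
    val sumAggregator = new Aggregator[String, Int, Int](
      (v: Int) => v,                 // createCombiner: seed a combiner from the first value
      (c: Int, v: Int) => c + v,     // mergeValue: fold another value into the combiner
      (c1: Int, c2: Int) => c1 + c2  // mergeCombiners: merge partial results from different map tasks
    )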
\ No newline at end of file diff --git a/core/src/main/scala/spark/CartesianRDD.scala b/core/src/main/scala/spark/CartesianRDD.scala new file mode 100644 index 0000000000..42a9b3b23c --- /dev/null +++ b/core/src/main/scala/spark/CartesianRDD.scala @@ -0,0 +1,44 @@ +package spark + +@serializable class CartesianSplit(idx: Int, val s1: Split, val s2: Split) +extends Split { + override val index = idx +} + +@serializable +class CartesianRDD[T: ClassManifest, U:ClassManifest]( + sc: SparkContext, rdd1: RDD[T], rdd2: RDD[U]) +extends RDD[Pair[T, U]](sc) { + val numSplitsInRdd2 = rdd2.splits.size + + @transient val splits_ = { + // create the cross product split + val array = new Array[Split](rdd1.splits.size * rdd2.splits.size) + for (s1 <- rdd1.splits; s2 <- rdd2.splits) { + val idx = s1.index * numSplitsInRdd2 + s2.index + array(idx) = new CartesianSplit(idx, s1, s2) + } + array + } + + override def splits = splits_.asInstanceOf[Array[Split]] + + override def preferredLocations(split: Split) = { + val currSplit = split.asInstanceOf[CartesianSplit] + rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2) + } + + override def compute(split: Split) = { + val currSplit = split.asInstanceOf[CartesianSplit] + for (x <- rdd1.iterator(currSplit.s1); y <- rdd2.iterator(currSplit.s2)) yield (x, y) + } + + override val dependencies = List( + new NarrowDependency(rdd1) { + def getParents(id: Int): Seq[Int] = List(id / numSplitsInRdd2) + }, + new NarrowDependency(rdd2) { + def getParents(id: Int): Seq[Int] = List(id % numSplitsInRdd2) + } + ) +}
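The split array above is laid out so that the pair (s1, s2) lands at index s1.index * numSplitsInRdd2 + s2.index, and the two NarrowDependency objects invert that with integer division and modulo. A small worked example of the arithmetic, with made-up split counts:

    // Suppose rdd1 has 3 splits and rdd2 has 4, so numSplitsInRdd2 = 4.
    def parentSplits(idx: Int, numSplitsInRdd2: Int): (Int, Int) =
      (idx / numSplitsInRdd2, idx % numSplitsInRdd2)

    parentSplits(10, 4)  // == (2, 2): split 10 was written at idx = 2 * 4 + 2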
\ No newline at end of file diff --git a/core/src/main/scala/spark/DAGScheduler.scala b/core/src/main/scala/spark/DAGScheduler.scala new file mode 100644 index 0000000000..ee3fda25a8 --- /dev/null +++ b/core/src/main/scala/spark/DAGScheduler.scala @@ -0,0 +1,236 @@ +package spark + +import java.util.concurrent.LinkedBlockingQueue +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} + +/** + * A Scheduler subclass that implements stage-oriented scheduling. It computes + * a DAG of stages for each job, keeps track of which RDDs and stage outputs + * are materialized, and computes a minimal schedule to run the job. Subclasses + * only need to implement the code to send a task to the cluster and to report + * failures from it (the submitTasks method, and code to add completionEvents). + */ +private abstract class DAGScheduler extends Scheduler with Logging { + // Must be implemented by subclasses to start running a set of tasks + def submitTasks(tasks: Seq[Task[_]]): Unit + + // Must be called by subclasses to report task completions or failures + def taskEnded(task: Task[_], successful: Boolean, result: Any, accumUpdates: Map[Long, Any]) { + completionEvents.put(CompletionEvent(task, successful, result, accumUpdates)) + } + + private val completionEvents = new LinkedBlockingQueue[CompletionEvent] + + var nextStageId = 0 + + def newStageId() = { + var res = nextStageId + nextStageId += 1 + res + } + + val idToStage = new HashMap[Int, Stage] + + val shuffleToMapStage = new HashMap[ShuffleDependency[_,_,_], Stage] + + val cacheLocs = new HashMap[RDD[_], Array[List[String]]] + + def getCacheLocs(rdd: RDD[_]): Array[List[String]] = { + cacheLocs.getOrElseUpdate(rdd, Array.fill[List[String]](rdd.splits.size)(Nil)) + } + + def addCacheLoc(rdd: RDD[_], partition: Int, host: String) { + val locs = getCacheLocs(rdd) + locs(partition) = host :: locs(partition) + } + + def removeCacheLoc(rdd: RDD[_], partition: Int, host: String) { + val locs = getCacheLocs(rdd) + locs(partition) -= host + } + + def getShuffleMapStage(shuf: ShuffleDependency[_,_,_]): Stage = { + shuffleToMapStage.get(shuf) match { + case Some(stage) => stage + case None => + val stage = newStage(shuf.rdd, Some(shuf)) + shuffleToMapStage(shuf) = stage + stage + } + } + + def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_,_]]): Stage = { + val id = newStageId() + val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd)) + idToStage(id) = stage + stage + } + + def getParentStages(rdd: RDD[_]): List[Stage] = { + val parents = new HashSet[Stage] + val visited = new HashSet[RDD[_]] + def visit(r: RDD[_]) { + if (!visited(r)) { + visited += r + for (dep <- r.dependencies) { + dep match { + case shufDep: ShuffleDependency[_,_,_] => + parents += getShuffleMapStage(shufDep) + case _ => + visit(dep.rdd) + } + } + } + } + visit(rdd) + parents.toList + } + + def getMissingParentStages(stage: Stage): List[Stage] = { + val missing = new HashSet[Stage] + val visited = new HashSet[RDD[_]] + def visit(rdd: RDD[_]) { + if (!visited(rdd)) { + visited += rdd + val locs = getCacheLocs(rdd) + for (p <- 0 until rdd.splits.size) { + if (locs(p) == Nil) { + for (dep <- rdd.dependencies) { + dep match { + case shufDep: ShuffleDependency[_,_,_] => + val stage = getShuffleMapStage(shufDep) + if (!stage.isAvailable) + missing += stage + case narrowDep: NarrowDependency[_] => + visit(narrowDep.rdd) + } + } + } + } + } + } + visit(stage.rdd) + missing.toList + } + + override def runJob[T, U](rdd: RDD[T], func: Iterator[T] => 
U)(implicit m: ClassManifest[U]) + : Array[U] = { + val numOutputParts: Int = rdd.splits.size + val finalStage = newStage(rdd, None) + val results = new Array[U](numOutputParts) + val finished = new Array[Boolean](numOutputParts) + var numFinished = 0 + + val waiting = new HashSet[Stage] + val running = new HashSet[Stage] + val pendingTasks = new HashMap[Stage, HashSet[Task[_]]] + + logInfo("Final stage: " + finalStage) + logInfo("Parents of final stage: " + finalStage.parents) + logInfo("Missing parents: " + getMissingParentStages(finalStage)) + + def submitStage(stage: Stage) { + if (!waiting(stage) && !running(stage)) { + val missing = getMissingParentStages(stage) + if (missing == Nil) { + logInfo("Submitting " + stage + ", which has no missing parents") + submitMissingTasks(stage) + running += stage + } else { + for (parent <- missing) + submitStage(parent) + waiting += stage + } + } + } + + def submitMissingTasks(stage: Stage) { + val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet) + var tasks = ArrayBuffer[Task[_]]() + if (stage == finalStage) { + for (p <- 0 until numOutputParts if (!finished(p))) { + val locs = getPreferredLocs(rdd, p) + tasks += new ResultTask(finalStage.id, rdd, func, p, locs) + } + } else { + for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) { + val locs = getPreferredLocs(stage.rdd, p) + tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs) + } + } + myPending ++= tasks + submitTasks(tasks) + } + + submitStage(finalStage) + + while (numFinished != numOutputParts) { + val evt = completionEvents.take() + if (evt.successful) { + logInfo("Completed " + evt.task) + Accumulators.add(currentThread, evt.accumUpdates) + evt.task match { + case rt: ResultTask[_, _] => + results(rt.partition) = evt.result.asInstanceOf[U] + finished(rt.partition) = true + numFinished += 1 + pendingTasks(finalStage) -= rt + case smt: ShuffleMapTask => + val stage = idToStage(smt.stageId) + stage.addOutputLoc(smt.partition, evt.result.asInstanceOf[String]) + val pending = pendingTasks(stage) + pending -= smt + MapOutputTracker.registerMapOutputs( + stage.shuffleDep.get.shuffleId, + stage.outputLocs.map(_.first).toArray) + if (pending.isEmpty) { + logInfo(stage + " finished; looking for newly runnable stages") + running -= stage + val newlyRunnable = new ArrayBuffer[Stage] + for (stage <- waiting if getMissingParentStages(stage) == Nil) { + newlyRunnable += stage + } + waiting --= newlyRunnable + running ++= newlyRunnable + for (stage <- newlyRunnable) { + submitMissingTasks(stage) + } + } + } + } else { + throw new SparkException("Task failed: " + evt.task) + // TODO: Kill the running job + } + } + + return results + } + + def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = { + // If the partition is cached, return the cache locations + val cached = getCacheLocs(rdd)(partition) + if (cached != Nil) { + return cached + } + // If the RDD has some placement preferences (as is the case for input RDDs), get those + val rddPrefs = rdd.preferredLocations(rdd.splits(partition)).toList + if (rddPrefs != Nil) { + return rddPrefs + } + // If the RDD has narrow dependencies, pick the first partition of the first narrow dep + // that has any placement preferences. Ideally we would choose based on transfer sizes, + // but this will do for now. 
+ rdd.dependencies.foreach(_ match { + case n: NarrowDependency[_] => + for (inPart <- n.getParents(partition)) { + val locs = getPreferredLocs(n.rdd, inPart) + if (locs != Nil) + return locs; + } + case _ => + }) + return Nil + } +} + +case class CompletionEvent(task: Task[_], successful: Boolean, result: Any, accumUpdates: Map[Long, Any]) diff --git a/core/src/main/scala/spark/DaemonThreadFactory.scala b/core/src/main/scala/spark/DaemonThreadFactory.scala new file mode 100644 index 0000000000..cb30cb2ac8 --- /dev/null +++ b/core/src/main/scala/spark/DaemonThreadFactory.scala @@ -0,0 +1,14 @@ +package spark + +import java.util.concurrent.ThreadFactory + +/** + * A ThreadFactory that creates daemon threads + */ +private object DaemonThreadFactory extends ThreadFactory { + override def newThread(r: Runnable): Thread = { + val t = new Thread(r); + t.setDaemon(true) + return t + } +}
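As the class comment says, a concrete scheduler only has to run the tasks handed to submitTasks and report completions back through taskEnded; all stage bookkeeping lives in DAGScheduler.runJob. A rough sketch of the smallest possible subclass (a hypothetical InlineScheduler that runs tasks in the calling thread and skips the serialization step that LocalScheduler below performs):

    private class InlineScheduler extends DAGScheduler {
      override def start() {}
      override def waitForRegister() {}
      override def stop() {}
      override def numCores() = 1

      // Run every task inline and feed its result back into the DAG event loop.
      override def submitTasks(tasks: Seq[Task[_]]) {
        for (task <- tasks) {
          val result = task.run
          taskEnded(task, true, result, Accumulators.values)
        }
      }
    }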
\ No newline at end of file diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala new file mode 100644 index 0000000000..20b0357e44 --- /dev/null +++ b/core/src/main/scala/spark/Dependency.scala @@ -0,0 +1,30 @@ +package spark + +@serializable +abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean) + +abstract class NarrowDependency[T](rdd: RDD[T]) +extends Dependency(rdd, false) { + def getParents(outputPartition: Int): Seq[Int] +} + +class ShuffleDependency[K, V, C]( + val shuffleId: Int, + rdd: RDD[(K, V)], + val aggregator: Aggregator[K, V, C], + val partitioner: Partitioner[K] +) extends Dependency(rdd, true) + +class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) { + override def getParents(partitionId: Int) = List(partitionId) +} + +class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int) +extends NarrowDependency[T](rdd) { + override def getParents(partitionId: Int) = { + if (partitionId >= outStart && partitionId < outStart + length) + List(partitionId - outStart + inStart) + else + Nil + } +} diff --git a/core/src/main/scala/spark/DfsShuffle.scala b/core/src/main/scala/spark/DfsShuffle.scala deleted file mode 100644 index 7a42bf2d06..0000000000 --- a/core/src/main/scala/spark/DfsShuffle.scala +++ /dev/null @@ -1,120 +0,0 @@ -package spark - -import java.io.{EOFException, ObjectInputStream, ObjectOutputStream} -import java.net.URI -import java.util.UUID - -import scala.collection.mutable.HashMap - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem} - - -/** - * A simple implementation of shuffle using a distributed file system. - * - * TODO: Add support for compression when spark.compress is set to true. 
- */ -@serializable -class DfsShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { - override def compute(input: RDD[(K, V)], - numOutputSplits: Int, - createCombiner: V => C, - mergeValue: (C, V) => C, - mergeCombiners: (C, C) => C) - : RDD[(K, C)] = - { - val sc = input.sparkContext - val dir = DfsShuffle.newTempDirectory() - logInfo("Intermediate data directory: " + dir) - - val numberedSplitRdd = new NumberedSplitRDD(input) - val numInputSplits = numberedSplitRdd.splits.size - - // Run a parallel foreach to write the intermediate data files - numberedSplitRdd.foreach((pair: (Int, Iterator[(K, V)])) => { - val myIndex = pair._1 - val myIterator = pair._2 - val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[K, C]) - for ((k, v) <- myIterator) { - var bucketId = k.hashCode % numOutputSplits - if (bucketId < 0) { // Fix bucket ID if hash code was negative - bucketId += numOutputSplits - } - val bucket = buckets(bucketId) - bucket(k) = bucket.get(k) match { - case Some(c) => mergeValue(c, v) - case None => createCombiner(v) - } - } - val fs = DfsShuffle.getFileSystem() - for (i <- 0 until numOutputSplits) { - val path = new Path(dir, "%d-to-%d".format(myIndex, i)) - val out = new ObjectOutputStream(fs.create(path, true)) - buckets(i).foreach(pair => out.writeObject(pair)) - out.close() - } - }) - - // Return an RDD that does each of the merges for a given partition - val indexes = sc.parallelize(0 until numOutputSplits, numOutputSplits) - return indexes.flatMap((myIndex: Int) => { - val combiners = new HashMap[K, C] - val fs = DfsShuffle.getFileSystem() - for (i <- Utils.shuffle(0 until numInputSplits)) { - val path = new Path(dir, "%d-to-%d".format(i, myIndex)) - val inputStream = new ObjectInputStream(fs.open(path)) - try { - while (true) { - val (k, c) = inputStream.readObject().asInstanceOf[(K, C)] - combiners(k) = combiners.get(k) match { - case Some(oldC) => mergeCombiners(oldC, c) - case None => c - } - } - } catch { - case e: EOFException => {} - } - inputStream.close() - } - combiners - }) - } -} - - -/** - * Companion object of DfsShuffle; responsible for initializing a Hadoop - * FileSystem object based on the spark.dfs property and generating names - * for temporary directories. 
- */ -object DfsShuffle { - private var initialized = false - private var fileSystem: FileSystem = null - - private def initializeIfNeeded() = synchronized { - if (!initialized) { - val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt - val dfs = System.getProperty("spark.dfs", "file:///") - val conf = new Configuration() - conf.setInt("io.file.buffer.size", bufferSize) - conf.setInt("dfs.replication", 1) - fileSystem = FileSystem.get(new URI(dfs), conf) - initialized = true - } - } - - def getFileSystem(): FileSystem = { - initializeIfNeeded() - return fileSystem - } - - def newTempDirectory(): String = { - val fs = getFileSystem() - val workDir = System.getProperty("spark.dfs.workdir", "/tmp") - val uuid = UUID.randomUUID() - val path = workDir + "/shuffle-" + uuid - fs.mkdirs(new Path(path)) - return path - } -} diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala index b4d023b428..35469aeb3f 100644 --- a/core/src/main/scala/spark/Executor.scala +++ b/core/src/main/scala/spark/Executor.scala @@ -25,6 +25,8 @@ class Executor extends mesos.Executor with Logging { // Initialize cache and broadcast system (uses some properties read above) Cache.initialize() Broadcast.initialize(false) + MapOutputTracker.initialize(false) + RDDCache.initialize(false) // Create our ClassLoader (using spark properties) and set it on this thread classLoader = createClassLoader() diff --git a/core/src/main/scala/spark/HadoopFile.scala b/core/src/main/scala/spark/HadoopFile.scala index f5e80d5432..0a7996c7bd 100644 --- a/core/src/main/scala/spark/HadoopFile.scala +++ b/core/src/main/scala/spark/HadoopFile.scala @@ -14,12 +14,13 @@ import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.util.ReflectionUtils /** A Spark split class that wraps around a Hadoop InputSplit */ -@serializable class HadoopSplit(@transient s: InputSplit) +@serializable class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit) extends Split { val inputSplit = new SerializableWritable[InputSplit](s) - // Hadoop gives each split a unique toString value, so use this as our ID - override def getId() = "HadoopSplit(" + inputSplit.toString + ")" + override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt + + override val index = idx } @@ -39,7 +40,10 @@ extends RDD[(K, V)](sc) { FileInputFormat.setInputPaths(conf, path) val inputFormat = createInputFormat(conf) val inputSplits = inputFormat.getSplits(conf, sc.numCores) - inputSplits.map(x => new HadoopSplit(x): Split).toArray + val array = new Array[Split] (inputSplits.size) + for (i <- 0 until inputSplits.size) + array(i) = new HadoopSplit(id, i, inputSplits(i)) + array } def createInputFormat(conf: JobConf): InputFormat[K, V] = { @@ -49,7 +53,7 @@ extends RDD[(K, V)](sc) { override def splits = splits_ - override def iterator(theSplit: Split) = new Iterator[(K, V)] { + override def compute(theSplit: Split) = new Iterator[(K, V)] { val split = theSplit.asInstanceOf[HadoopSplit] var reader: RecordReader[K, V] = null @@ -97,6 +101,8 @@ extends RDD[(K, V)](sc) { val hadoopSplit = split.asInstanceOf[HadoopSplit] hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost") } + + override val dependencies: List[Dependency[_]] = Nil } diff --git a/core/src/main/scala/spark/LocalFileShuffle.scala b/core/src/main/scala/spark/LocalFileShuffle.scala index 367599cfb4..fd70c54c0c 100644 --- a/core/src/main/scala/spark/LocalFileShuffle.scala +++ b/core/src/main/scala/spark/LocalFileShuffle.scala @@ -13,6 +13,7 @@ 
import scala.collection.mutable.{ArrayBuffer, HashMap} * * TODO: Add support for compression when spark.compress is set to true. */ +/* @serializable class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { override def compute(input: RDD[(K, V)], @@ -90,7 +91,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { }) } } - +*/ object LocalFileShuffle extends Logging { private var initialized = false diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala index 20954a1224..0287082687 100644 --- a/core/src/main/scala/spark/LocalScheduler.scala +++ b/core/src/main/scala/spark/LocalScheduler.scala @@ -2,40 +2,35 @@ package spark import java.util.concurrent._ -import scala.collection.mutable.Map - /** * A simple Scheduler implementation that runs tasks locally in a thread pool. */ -private class LocalScheduler(threads: Int) extends Scheduler with Logging { +private class LocalScheduler(threads: Int) extends DAGScheduler with Logging { var threadPool: ExecutorService = Executors.newFixedThreadPool(threads, DaemonThreadFactory) override def start() {} override def waitForRegister() {} - - override def runTasks[T](tasks: Array[Task[T]])(implicit m: ClassManifest[T]) - : Array[T] = { - val futures = new Array[Future[TaskResult[T]]](tasks.length) - - for (i <- 0 until tasks.length) { - futures(i) = threadPool.submit(new Callable[TaskResult[T]]() { - def call(): TaskResult[T] = { + + override def submitTasks(tasks: Seq[Task[_]]) { + tasks.zipWithIndex.foreach { case (task, i) => + threadPool.submit(new Runnable { + def run() { logInfo("Running task " + i) try { // Serialize and deserialize the task so that accumulators are // changed to thread-local ones; this adds a bit of unnecessary - // overhead but matches how the Nexus Executor works + // overhead but matches how the Mesos Executor works Accumulators.clear val bytes = Utils.serialize(tasks(i)) logInfo("Size of task " + i + " is " + bytes.size + " bytes") - val task = Utils.deserialize[Task[T]]( + val deserializedTask = Utils.deserialize[Task[_]]( bytes, currentThread.getContextClassLoader) - val value = task.run + val result: Any = deserializedTask.run val accumUpdates = Accumulators.values logInfo("Finished task " + i) - new TaskResult[T](value, accumUpdates) + taskEnded(tasks(i), true, result, accumUpdates) } catch { case e: Exception => { // TODO: Do something nicer here @@ -47,26 +42,9 @@ private class LocalScheduler(threads: Int) extends Scheduler with Logging { } }) } - - val taskResults = futures.map(_.get) - for (result <- taskResults) - Accumulators.add(currentThread, result.accumUpdates) - return taskResults.map(_.value).toArray(m) } override def stop() {} override def numCores() = threads -} - - -/** - * A ThreadFactory that creates daemon threads - */ -private object DaemonThreadFactory extends ThreadFactory { - override def newThread(r: Runnable): Thread = { - val t = new Thread(r); - t.setDaemon(true) - return t - } -} +}
\ No newline at end of file diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala new file mode 100644 index 0000000000..ac62c6e411 --- /dev/null +++ b/core/src/main/scala/spark/MapOutputTracker.scala @@ -0,0 +1,56 @@ +package spark + +import java.util.concurrent.ConcurrentHashMap + +import scala.actors._ +import scala.actors.Actor._ +import scala.actors.remote._ + +class MapOutputTracker extends DaemonActor with Logging { + def act() { + val port = System.getProperty("spark.master.port", "50501").toInt + RemoteActor.alive(port) + RemoteActor.register('MapOutputTracker, self) + logInfo("Started on port " + port) + } +} + +object MapOutputTracker { + var trackerActor: AbstractActor = null + + def initialize(isMaster: Boolean) { + if (isMaster) { + val tracker = new MapOutputTracker + tracker.start + trackerActor = tracker + } else { + val host = System.getProperty("spark.master.host") + val port = System.getProperty("spark.master.port").toInt + trackerActor = RemoteActor.select(Node(host, port), 'MapOutputTracker) + } + } + + private val serverUris = new ConcurrentHashMap[Int, Array[String]] + + def registerMapOutput(shuffleId: Int, numMaps: Int, mapId: Int, serverUri: String) { + var array = serverUris.get(shuffleId) + if (array == null) { + array = Array.fill[String](numMaps)(null) + serverUris.put(shuffleId, array) + } + array(mapId) = serverUri + } + + def registerMapOutputs(shuffleId: Int, locs: Array[String]) { + serverUris.put(shuffleId, Array[String]() ++ locs) + } + + def getServerUris(shuffleId: Int): Array[String] = { + // TODO: On remote node, fetch locations from master + serverUris.get(shuffleId) + } + + def getMapOutputUri(serverUri: String, shuffleId: Int, mapId: Int, reduceId: Int): String = { + "%s/shuffle/%s/%s/%s".format(serverUri, shuffleId, mapId, reduceId) + } +}
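At this point the tracker is only a local registry (the actor does not yet answer remote lookups, per the TODO in getServerUris). A sketch of the bookkeeping it provides, with made-up host URIs:

    // Shuffle 0 has two map tasks; each registers the URI of the server holding its output.
    MapOutputTracker.registerMapOutput(0, 2, 0, "http://host-a:33000")
    MapOutputTracker.registerMapOutput(0, 2, 1, "http://host-b:33000")

    MapOutputTracker.getServerUris(0)
    // Array("http://host-a:33000", "http://host-b:33000")

    MapOutputTracker.getMapOutputUri("http://host-a:33000", 0, 0, 3)
    // "http://host-a:33000/shuffle/0/0/3", i.e. map output 0 of shuffle 0 for reducer 3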
\ No newline at end of file diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala index 6a592d13c3..35ad552775 100644 --- a/core/src/main/scala/spark/MesosScheduler.scala +++ b/core/src/main/scala/spark/MesosScheduler.scala @@ -105,7 +105,7 @@ extends MScheduler with spark.Scheduler with Logging * The primary means to submit a job to the scheduler. Given a list of tasks, * runs them and returns an array of the results. */ - override def runTasks[T: ClassManifest](tasks: Array[Task[T]]): Array[T] = { + def runTasks[T: ClassManifest](tasks: Array[Task[T]]): Array[T] = { waitForRegister() val jobId = newJobId() val myJob = new SimpleJob(this, tasks, jobId) @@ -291,4 +291,9 @@ extends MScheduler with spark.Scheduler with Logging // Serialize the map as an array of (String, String) pairs return Utils.serialize(props.toArray) } + + override def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U]) + : Array[U] = { + new Array[U](0) + } } diff --git a/core/src/main/scala/spark/NumberedSplitRDD.scala b/core/src/main/scala/spark/NumberedSplitRDD.scala deleted file mode 100644 index 7b12210d84..0000000000 --- a/core/src/main/scala/spark/NumberedSplitRDD.scala +++ /dev/null @@ -1,42 +0,0 @@ -package spark - -import mesos.SlaveOffer - - -/** - * An RDD that takes the splits of a parent RDD and gives them unique indexes. - * This is useful for a variety of shuffle implementations. - */ -class NumberedSplitRDD[T: ClassManifest](prev: RDD[T]) -extends RDD[(Int, Iterator[T])](prev.sparkContext) { - @transient val splits_ = { - prev.splits.zipWithIndex.map { - case (s, i) => new NumberedSplitRDDSplit(s, i): Split - }.toArray - } - - override def splits = splits_ - - override def preferredLocations(split: Split) = { - val nsplit = split.asInstanceOf[NumberedSplitRDDSplit] - prev.preferredLocations(nsplit.prev) - } - - override def iterator(split: Split) = { - val nsplit = split.asInstanceOf[NumberedSplitRDDSplit] - Iterator((nsplit.index, prev.iterator(nsplit.prev))) - } - - override def taskStarted(split: Split, slot: SlaveOffer) = { - val nsplit = split.asInstanceOf[NumberedSplitRDDSplit] - prev.taskStarted(nsplit.prev, slot) - } -} - - -/** - * A split in a NumberedSplitRDD. - */ -class NumberedSplitRDDSplit(val prev: Split, val index: Int) extends Split { - override def getId() = "NumberedSplitRDDSplit(%d)".format(index) -} diff --git a/core/src/main/scala/spark/ParallelArray.scala b/core/src/main/scala/spark/ParallelArray.scala index a01904d61c..e77bc3014f 100644 --- a/core/src/main/scala/spark/ParallelArray.scala +++ b/core/src/main/scala/spark/ParallelArray.scala @@ -17,8 +17,7 @@ extends Split { case _ => false } - override def getId() = - "ParallelArraySplit(arrayId %d, slice %d)".format(arrayId, slice) + override val index = slice } class ParallelArray[T: ClassManifest]( @@ -28,8 +27,6 @@ extends RDD[T](sc) { // the RDD chain it gets cached. It might be worthwhile to write the data to // a file in the DFS and read it in the split instead. 
- val id = ParallelArray.newId() - @transient val splits_ = { val slices = ParallelArray.slice(data, numSlices).toArray slices.indices.map(i => new ParallelArraySplit(id, i, slices(i))).toArray @@ -37,9 +34,11 @@ extends RDD[T](sc) { override def splits = splits_.asInstanceOf[Array[Split]] - override def iterator(s: Split) = s.asInstanceOf[ParallelArraySplit[T]].iterator + override def compute(s: Split) = s.asInstanceOf[ParallelArraySplit[T]].iterator override def preferredLocations(s: Split): Seq[String] = Nil + + override val dependencies: List[Dependency[_]] = Nil } private object ParallelArray { diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala new file mode 100644 index 0000000000..cc1ca74447 --- /dev/null +++ b/core/src/main/scala/spark/Partitioner.scala @@ -0,0 +1,22 @@ +package spark + +@serializable +abstract class Partitioner[K] { + def numPartitions: Int + def getPartition(key: K): Int +} + +class HashPartitioner[K](partitions: Int) extends Partitioner[K] { + def numPartitions = partitions + + def getPartition(key: K) = { + val mod = key.hashCode % partitions + if (mod < 0) mod + partitions else mod // Guard against negative hash codes + } + + override def equals(other: Any): Boolean = other match { + case h: HashPartitioner[_] => + h.numPartitions == numPartitions + case _ => false + } +}
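The modulo guard in HashPartitioner matters because Java's % keeps the sign of the dividend, so keys with negative hash codes would otherwise map to negative partitions. A quick illustration:

    val p = new HashPartitioner[Int](4)
    // An Int is its own hashCode, so for key -7: -7 % 4 == -3, and the guard adds 4.
    p.getPartition(-7)   // == 1
    p.getPartition(10)   // == 2
    p.numPartitions      // == 4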
\ No newline at end of file diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 39f2dc4458..df044bd6cf 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -1,5 +1,8 @@ package spark +import java.io.EOFException +import java.net.URL +import java.io.ObjectInputStream import java.util.concurrent.atomic.AtomicLong import java.util.HashSet import java.util.Random @@ -12,53 +15,106 @@ import SparkContext._ import mesos._ - @serializable abstract class RDD[T: ClassManifest](@transient sc: SparkContext) { + // Methods that must be implemented by subclasses def splits: Array[Split] - def iterator(split: Split): Iterator[T] + def compute(split: Split): Iterator[T] def preferredLocations(split: Split): Seq[String] + val dependencies: List[Dependency[_]] + + // Optionally overridden by subclasses to specify how they are partitioned + val partitioner: Option[Partitioner[_]] = None + + def context = sc + + // Get a unique ID for this RDD + val id = sc.newRddId() + + // Variables relating to caching + private var shouldCache = false + + // Change this RDD's caching + def cache(): RDD[T] = { + shouldCache = true + this + } + + // Read this RDD; will read from cache if applicable, or otherwise compute + final def iterator(split: Split): Iterator[T] = { + if (shouldCache) { + RDDCache.getOrCompute[T](this, split) + } else { + compute(split) + } + } + + // Transformations + + def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f)) + + def flatMap[U: ClassManifest](f: T => Traversable[U]): RDD[U] = + new FlatMappedRDD(this, sc.clean(f)) + + def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f)) - def taskStarted(split: Split, slot: SlaveOffer) {} + def sample(withReplacement: Boolean, frac: Double, seed: Int): RDD[T] = + new SampledRDD(this, withReplacement, frac, seed) - def sparkContext = sc + def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other)) - def map[U: ClassManifest](f: T => U) = new MappedRDD(this, sc.clean(f)) - def filter(f: T => Boolean) = new FilteredRDD(this, sc.clean(f)) - def cache() = new CachedRDD(this) + def ++(other: RDD[T]): RDD[T] = this.union(other) - def sample(withReplacement: Boolean, frac: Double, seed: Int) = - new SampledRDD(this, withReplacement, frac, seed) + def glom(): RDD[Array[T]] = new SplitRDD(this) - def flatMap[U: ClassManifest](f: T => Traversable[U]) = - new FlatMappedRDD(this, sc.clean(f)) + def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = + new CartesianRDD(sc, this, other) + + def groupBy[K](func: T => K, numSplits: Int): RDD[(K, Seq[T])] = + this.map(t => (func(t), t)).groupByKey(numSplits) + + def groupBy[K](func: T => K): RDD[(K, Seq[T])] = + groupBy[K](func, sc.numCores) + // Parallel operations + def foreach(f: T => Unit) { val cleanF = sc.clean(f) - val tasks = splits.map(s => new ForeachTask(this, s, cleanF)).toArray - sc.runTaskObjects(tasks) + sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) } def collect(): Array[T] = { - val tasks = splits.map(s => new CollectTask(this, s)) - val results = sc.runTaskObjects(tasks) + val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) } - def toArray(): Array[T] = collect() - def reduce(f: (T, T) => T): T = { val cleanF = sc.clean(f) - val tasks = splits.map(s => new ReduceTask(this, s, f)) + val reducePartition: Iterator[T] => Option[T] = iter => { + if (iter.hasNext) + Some(iter.reduceLeft(f)) + else + None + } + 
val options = sc.runJob(this, reducePartition) val results = new ArrayBuffer[T] - for (option <- sc.runTaskObjects(tasks); elem <- option) + for (opt <- options; elem <- opt) results += elem if (results.size == 0) throw new UnsupportedOperationException("empty collection") else return results.reduceLeft(f) } + + def count(): Long = { + sc.runJob(this, (iter: Iterator[T]) => iter.size.toLong).sum + } + + def toArray(): Array[T] = collect() + // TODO: Reimplement these to properly build any shuffle dependencies on + // the cluster rather than attempting to compute a partiton on the master + /* def take(num: Int): Array[T] = { if (num == 0) return new Array[T](0) @@ -75,268 +131,44 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) { case Array(t) => t case _ => throw new UnsupportedOperationException("empty collection") } - - def count(): Long = { - try { - map(x => 1L).reduce(_+_) - } catch { - case e: UnsupportedOperationException => 0L // No elements in RDD - } - } - - def union(other: RDD[T]) = new UnionRDD(sc, Array(this, other)) - - def ++(other: RDD[T]) = this.union(other) - - def splitRdd() = new SplitRDD(this) - - def cartesian[U: ClassManifest](other: RDD[U]) = - new CartesianRDD(sc, this, other) - - def groupBy[K](func: T => K, numSplits: Int): RDD[(K, Seq[T])] = - this.map(t => (func(t), t)).groupByKey(numSplits) - - def groupBy[K](func: T => K): RDD[(K, Seq[T])] = - groupBy[K](func, sc.numCores) -} - -@serializable -abstract class RDDTask[U: ClassManifest, T: ClassManifest]( - val rdd: RDD[T], val split: Split) -extends Task[U] { - override def preferredLocations() = rdd.preferredLocations(split) - override def markStarted(slot: SlaveOffer) { rdd.taskStarted(split, slot) } -} - -class ForeachTask[T: ClassManifest]( - rdd: RDD[T], split: Split, func: T => Unit) -extends RDDTask[Unit, T](rdd, split) with Logging { - override def run() { - logInfo("Processing " + split) - rdd.iterator(split).foreach(func) - } -} - -class CollectTask[T]( - rdd: RDD[T], split: Split)(implicit m: ClassManifest[T]) -extends RDDTask[Array[T], T](rdd, split) with Logging { - override def run(): Array[T] = { - logInfo("Processing " + split) - rdd.iterator(split).toArray(m) - } -} - -class ReduceTask[T: ClassManifest]( - rdd: RDD[T], split: Split, f: (T, T) => T) -extends RDDTask[Option[T], T](rdd, split) with Logging { - override def run(): Option[T] = { - logInfo("Processing " + split) - val iter = rdd.iterator(split) - if (iter.hasNext) - Some(iter.reduceLeft(f)) - else - None - } + */ } class MappedRDD[U: ClassManifest, T: ClassManifest]( prev: RDD[T], f: T => U) -extends RDD[U](prev.sparkContext) { +extends RDD[U](prev.context) { override def splits = prev.splits override def preferredLocations(split: Split) = prev.preferredLocations(split) - override def iterator(split: Split) = prev.iterator(split).map(f) - override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot) -} - -class FilteredRDD[T: ClassManifest]( - prev: RDD[T], f: T => Boolean) -extends RDD[T](prev.sparkContext) { - override def splits = prev.splits - override def preferredLocations(split: Split) = prev.preferredLocations(split) - override def iterator(split: Split) = prev.iterator(split).filter(f) - override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot) + override val dependencies = List(new OneToOneDependency(prev)) + override def compute(split: Split) = prev.iterator(split).map(f) } class FlatMappedRDD[U: ClassManifest, T: ClassManifest]( prev: RDD[T], 
f: T => Traversable[U]) -extends RDD[U](prev.sparkContext) { +extends RDD[U](prev.context) { override def splits = prev.splits override def preferredLocations(split: Split) = prev.preferredLocations(split) - override def iterator(split: Split) = - prev.iterator(split).toStream.flatMap(f).iterator - override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot) + override val dependencies = List(new OneToOneDependency(prev)) + override def compute(split: Split) = prev.iterator(split).toStream.flatMap(f).iterator } -class SplitRDD[T: ClassManifest](prev: RDD[T]) -extends RDD[Array[T]](prev.sparkContext) { +class FilteredRDD[T: ClassManifest]( + prev: RDD[T], f: T => Boolean) +extends RDD[T](prev.context) { override def splits = prev.splits override def preferredLocations(split: Split) = prev.preferredLocations(split) - override def iterator(split: Split) = Iterator.fromArray(Array(prev.iterator(split).toArray)) - override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot) -} - - -@serializable class SeededSplit(val prev: Split, val seed: Int) extends Split { - override def getId() = - "SeededSplit(" + prev.getId() + ", seed " + seed + ")" -} - -class SampledRDD[T: ClassManifest]( - prev: RDD[T], withReplacement: Boolean, frac: Double, seed: Int) -extends RDD[T](prev.sparkContext) { - - @transient val splits_ = { val rg = new Random(seed); prev.splits.map(x => new SeededSplit(x, rg.nextInt)) } - - override def splits = splits_.asInstanceOf[Array[Split]] - - override def preferredLocations(split: Split) = prev.preferredLocations(split.asInstanceOf[SeededSplit].prev) - - override def iterator(splitIn: Split) = { - val split = splitIn.asInstanceOf[SeededSplit] - val rg = new Random(split.seed); - // Sampling with replacement (TODO: use reservoir sampling to make this more efficient?) 
- if (withReplacement) { - val oldData = prev.iterator(split.prev).toArray - val sampleSize = (oldData.size * frac).ceil.toInt - val sampledData = for (i <- 1 to sampleSize) yield oldData(rg.nextInt(oldData.size)) // all of oldData's indices are candidates, even if sampleSize < oldData.size - sampledData.iterator - } - // Sampling without replacement - else { - prev.iterator(split.prev).filter(x => (rg.nextDouble <= frac)) - } - } - - override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split.asInstanceOf[SeededSplit].prev, slot) + override val dependencies = List(new OneToOneDependency(prev)) + override def compute(split: Split) = prev.iterator(split).filter(f) } - -class CachedRDD[T]( - prev: RDD[T])(implicit m: ClassManifest[T]) -extends RDD[T](prev.sparkContext) with Logging { - val id = CachedRDD.newId() - @transient val cacheLocs = Map[Split, List[String]]() - +class SplitRDD[T: ClassManifest](prev: RDD[T]) +extends RDD[Array[T]](prev.context) { override def splits = prev.splits - - override def preferredLocations(split: Split) = { - if (cacheLocs.contains(split)) - cacheLocs(split) - else - prev.preferredLocations(split) - } - - override def iterator(split: Split): Iterator[T] = { - val key = id + "::" + split.getId() - logInfo("CachedRDD split key is " + key) - val cache = CachedRDD.cache - val loading = CachedRDD.loading - val cachedVal = cache.get(key) - if (cachedVal != null) { - // Split is in cache, so just return its values - return Iterator.fromArray(cachedVal.asInstanceOf[Array[T]]) - } else { - // Mark the split as loading (unless someone else marks it first) - loading.synchronized { - if (loading.contains(key)) { - while (loading.contains(key)) { - try {loading.wait()} catch {case _ =>} - } - return Iterator.fromArray(cache.get(key).asInstanceOf[Array[T]]) - } else { - loading.add(key) - } - } - // If we got here, we have to load the split - logInfo("Loading and caching " + split) - val array = prev.iterator(split).toArray(m) - cache.put(key, array) - loading.synchronized { - loading.remove(key) - loading.notifyAll() - } - return Iterator.fromArray(array) - } - } - - override def taskStarted(split: Split, slot: SlaveOffer) { - val oldList = cacheLocs.getOrElse(split, Nil) - val host = slot.getHost - if (!oldList.contains(host)) - cacheLocs(split) = host :: oldList - } -} - -private object CachedRDD { - val nextId = new AtomicLong(0) // Generates IDs for cached RDDs (on master) - def newId() = nextId.getAndIncrement() - - // Stores map results for various splits locally (on workers) - val cache = Cache.newKeySpace() - - // Remembers which splits are currently being loaded (on workers) - val loading = new HashSet[String] -} - -@serializable -class UnionSplit[T: ClassManifest](rdd: RDD[T], split: Split) -extends Split { - def iterator() = rdd.iterator(split) - def preferredLocations() = rdd.preferredLocations(split) - override def getId() = "UnionSplit(" + split.getId() + ")" -} - -@serializable -class UnionRDD[T: ClassManifest](sc: SparkContext, rdds: Seq[RDD[T]]) -extends RDD[T](sc) { - @transient val splits_ : Array[Split] = { - val splits: Seq[Split] = - for (rdd <- rdds; split <- rdd.splits) - yield new UnionSplit(rdd, split) - splits.toArray - } - - override def splits = splits_ - - override def iterator(s: Split): Iterator[T] = - s.asInstanceOf[UnionSplit[T]].iterator() - - override def preferredLocations(s: Split): Seq[String] = - s.asInstanceOf[UnionSplit[T]].preferredLocations() -} - -@serializable class CartesianSplit(val s1: Split, val s2: 
Split) extends Split { - override def getId() = - "CartesianSplit(" + s1.getId() + ", " + s2.getId() + ")" + override def preferredLocations(split: Split) = prev.preferredLocations(split) + override val dependencies = List(new OneToOneDependency(prev)) + override def compute(split: Split) = Iterator.fromArray(Array(prev.iterator(split).toArray)) } -@serializable -class CartesianRDD[T: ClassManifest, U:ClassManifest]( - sc: SparkContext, rdd1: RDD[T], rdd2: RDD[U]) -extends RDD[Pair[T, U]](sc) { - @transient val splits_ = { - // create the cross product split - rdd2.splits.map(y => rdd1.splits.map(x => new CartesianSplit(x, y))).flatten - } - - override def splits = splits_.asInstanceOf[Array[Split]] - - override def preferredLocations(split: Split) = { - val currSplit = split.asInstanceOf[CartesianSplit] - rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2) - } - - override def iterator(split: Split) = { - val currSplit = split.asInstanceOf[CartesianSplit] - for (x <- rdd1.iterator(currSplit.s1); y <- rdd2.iterator(currSplit.s2)) yield (x, y) - } - - override def taskStarted(split: Split, slot: SlaveOffer) = { - val currSplit = split.asInstanceOf[CartesianSplit] - rdd1.taskStarted(currSplit.s1, slot) - rdd2.taskStarted(currSplit.s2, slot) - } -} @serializable class PairRDDExtras[K, V](self: RDD[(K, V)]) { def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = { @@ -358,10 +190,9 @@ extends RDD[Pair[T, U]](sc) { numSplits: Int) : RDD[(K, C)] = { - val shufClass = Class.forName(System.getProperty( - "spark.shuffle.class", "spark.LocalFileShuffle")) - val shuf = shufClass.newInstance().asInstanceOf[Shuffle[K, V, C]] - shuf.compute(self, numSplits, createCombiner, mergeValue, mergeCombiners) + val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) + val partitioner = new HashPartitioner[K](numSplits) + new ShuffledRDD(self, aggregator, partitioner) } def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = { @@ -412,7 +243,7 @@ extends RDD[Pair[T, U]](sc) { join(other, numCores) } - def numCores = self.sparkContext.numCores + def numCores = self.context.numCores def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*) } diff --git a/core/src/main/scala/spark/RDDCache.scala b/core/src/main/scala/spark/RDDCache.scala new file mode 100644 index 0000000000..2f2ec9d237 --- /dev/null +++ b/core/src/main/scala/spark/RDDCache.scala @@ -0,0 +1,92 @@ +package spark + +import scala.actors._ +import scala.actors.Actor._ +import scala.actors.remote._ + +sealed trait CacheMessage +case class CacheEntryAdded(rddId: Int, partition: Int, host: String) +case class CacheEntryRemoved(rddId: Int, partition: Int, host: String) + +class RDDCacheTracker extends DaemonActor with Logging { + def act() { + val port = System.getProperty("spark.master.port", "50501").toInt + RemoteActor.alive(port) + RemoteActor.register('RDDCacheTracker, self) + logInfo("Started on port " + port) + + loop { + react { + case CacheEntryAdded(rddId, partition, host) => + logInfo("Cache entry added: %s, %s, %s".format(rddId, partition, host)) + + case CacheEntryRemoved(rddId, partition, host) => + logInfo("Cache entry removed: %s, %s, %s".format(rddId, partition, host)) + } + } + } +} + +import scala.collection.mutable.HashSet +private object RDDCache extends Logging { + // Stores map results for various splits locally + val cache = Cache.newKeySpace() + + // Remembers which splits are currently being loaded + val loading = new HashSet[(Int, Int)] + + // Tracker actor on 
the master, or remote reference to it on workers + var trackerActor: AbstractActor = null + + def initialize(isMaster: Boolean) { + if (isMaster) { + val tracker = new RDDCacheTracker + tracker.start + trackerActor = tracker + } else { + val host = System.getProperty("spark.master.host") + val port = System.getProperty("spark.master.port").toInt + trackerActor = RemoteActor.select(Node(host, port), 'RDDCacheTracker) + } + } + + // Gets or computes an RDD split + def getOrCompute[T](rdd: RDD[T], split: Split)(implicit m: ClassManifest[T]) + : Iterator[T] = { + val key = (rdd.id, split.index) + logInfo("CachedRDD split key is " + key) + val cache = RDDCache.cache + val loading = RDDCache.loading + val cachedVal = cache.get(key) + if (cachedVal != null) { + // Split is in cache, so just return its values + return Iterator.fromArray(cachedVal.asInstanceOf[Array[T]]) + } else { + // Mark the split as loading (unless someone else marks it first) + loading.synchronized { + if (loading.contains(key)) { + while (loading.contains(key)) { + try {loading.wait()} catch {case _ =>} + } + return Iterator.fromArray(cache.get(key).asInstanceOf[Array[T]]) + } else { + loading.add(key) + } + } + val host = System.getProperty("spark.hostname", Utils.localHostName) + trackerActor ! CacheEntryAdded(rdd.id, split.index, host) + // If we got here, we have to load the split + // TODO: fetch any remote copy of the split that may be available + // TODO: also notify the master that we're loading it + // TODO: also register a listener for when it unloads + logInfo("Computing and caching " + split) + val array = rdd.compute(split).toArray(m) + cache.put(key, array) + loading.synchronized { + loading.remove(key) + loading.notifyAll() + } + return Iterator.fromArray(array) + } + } +}
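getOrCompute is what RDD.iterator (see the RDD.scala changes above) calls for splits of an RDD marked with cache(): the first action computes and stores each split's array, and later actions replay it from the Cache key space. A usage sketch, where expensive stands in for any costly Int => Int function:

    val data = sc.parallelize(1 to 100000, 10).map(expensive).cache()
    data.count()        // computes every split once and stores it via RDDCache.getOrCompute
    data.reduce(_ + _)  // served from the cache instead of recomputing expensive()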
\ No newline at end of file diff --git a/core/src/main/scala/spark/ResultTask.scala b/core/src/main/scala/spark/ResultTask.scala new file mode 100644 index 0000000000..3b63896175 --- /dev/null +++ b/core/src/main/scala/spark/ResultTask.scala @@ -0,0 +1,14 @@ +package spark + +class ResultTask[T, U](val stageId: Int, rdd: RDD[T], func: Iterator[T] => U, val partition: Int, locs: Seq[String]) +extends Task[U] { + val split = rdd.splits(partition) + + override def run: U = { + func(rdd.iterator(split)) + } + + override def preferredLocations: Seq[String] = locs + + override def toString = "ResultTask(" + stageId + ", " + partition + ")" +}
\ No newline at end of file diff --git a/core/src/main/scala/spark/SampledRDD.scala b/core/src/main/scala/spark/SampledRDD.scala new file mode 100644 index 0000000000..2eeafedcdd --- /dev/null +++ b/core/src/main/scala/spark/SampledRDD.scala @@ -0,0 +1,36 @@ +package spark + +import java.util.Random + +@serializable class SampledRDDSplit(val prev: Split, val seed: Int) extends Split { + override val index = prev.index +} + +class SampledRDD[T: ClassManifest]( + prev: RDD[T], withReplacement: Boolean, frac: Double, seed: Int) +extends RDD[T](prev.context) { + + @transient val splits_ = { val rg = new Random(seed); prev.splits.map(x => new SampledRDDSplit(x, rg.nextInt)) } + + override def splits = splits_.asInstanceOf[Array[Split]] + + override val dependencies = List(new OneToOneDependency(prev)) + + override def preferredLocations(split: Split) = prev.preferredLocations(split.asInstanceOf[SampledRDDSplit].prev) + + override def compute(splitIn: Split) = { + val split = splitIn.asInstanceOf[SampledRDDSplit] + val rg = new Random(split.seed); + // Sampling with replacement (TODO: use reservoir sampling to make this more efficient?) + if (withReplacement) { + val oldData = prev.iterator(split.prev).toArray + val sampleSize = (oldData.size * frac).ceil.toInt + val sampledData = for (i <- 1 to sampleSize) yield oldData(rg.nextInt(oldData.size)) // all of oldData's indices are candidates, even if sampleSize < oldData.size + sampledData.iterator + } + // Sampling without replacement + else { + prev.iterator(split.prev).filter(x => (rg.nextDouble <= frac)) + } + } +} diff --git a/core/src/main/scala/spark/Scheduler.scala b/core/src/main/scala/spark/Scheduler.scala index b9f3128c82..fbcbb3e935 100644 --- a/core/src/main/scala/spark/Scheduler.scala +++ b/core/src/main/scala/spark/Scheduler.scala @@ -4,7 +4,8 @@ package spark private trait Scheduler { def start() def waitForRegister() - def runTasks[T](tasks: Array[Task[T]])(implicit m: ClassManifest[T]): Array[T] + //def runTasks[T](tasks: Array[Task[T]])(implicit m: ClassManifest[T]): Array[T] + def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U]): Array[U] def stop() def numCores(): Int } diff --git a/core/src/main/scala/spark/ShuffleMapTask.scala b/core/src/main/scala/spark/ShuffleMapTask.scala new file mode 100644 index 0000000000..287c64a9cc --- /dev/null +++ b/core/src/main/scala/spark/ShuffleMapTask.scala @@ -0,0 +1,38 @@ +package spark + +import java.io.FileOutputStream +import java.io.ObjectOutputStream +import scala.collection.mutable.HashMap + + +class ShuffleMapTask(val stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_], val partition: Int, locs: Seq[String]) +extends Task[String] { + val split = rdd.splits(partition) + + override def run: String = { + val numOutputSplits = dep.partitioner.numPartitions + val aggregator = dep.aggregator.asInstanceOf[Aggregator[Any, Any, Any]] + val partitioner = dep.partitioner.asInstanceOf[Partitioner[Any]] + val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[Any, Any]) + for (elem <- rdd.iterator(split)) { + val (k, v) = elem.asInstanceOf[(Any, Any)] + var bucketId = partitioner.getPartition(k) + val bucket = buckets(bucketId) + bucket(k) = bucket.get(k) match { + case Some(c) => aggregator.mergeValue(c, v) + case None => aggregator.createCombiner(v) + } + } + for (i <- 0 until numOutputSplits) { + val file = LocalFileShuffle.getOutputFile(dep.shuffleId, partition, i) + val out = new ObjectOutputStream(new FileOutputStream(file)) + 
buckets(i).foreach(pair => out.writeObject(pair)) + out.close() + } + return LocalFileShuffle.getServerUri + } + + override def preferredLocations: Seq[String] = locs + + override def toString = "ShuffleMapTask(%d, %d)".format(stageId, partition) +}
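Each map task therefore leaves behind one file per reduce split and returns the URI of the local server that can serve them; the reduce side (ShuffledRDD below) rebuilds the path with the same shuffleId/mapId/reduceId scheme. A sketch for a hypothetical shuffle 2, map partition 1, and three reducers:

    // Map side (shuffle 2, map partition 1, three reduce splits):
    val mapSideFiles = (0 until 3).map(r => LocalFileShuffle.getOutputFile(2, 1, r))
    val serverUri = LocalFileShuffle.getServerUri
    // Reduce side: reducer 2 pulls this map task's bucket from that server:
    val url = "%s/shuffle/%d/%d/%d".format(serverUri, 2, 1, 2)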
\ No newline at end of file diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala new file mode 100644 index 0000000000..826957a469 --- /dev/null +++ b/core/src/main/scala/spark/ShuffledRDD.scala @@ -0,0 +1,60 @@ +package spark + +import java.net.URL +import java.io.EOFException +import java.io.ObjectInputStream +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashMap + +class ShuffledRDDSplit(val idx: Int) extends Split { + override val index = idx + override def hashCode(): Int = idx +} + +class ShuffledRDD[K, V, C]( + parent: RDD[(K, V)], + aggregator: Aggregator[K, V, C], + part : Partitioner[K]) +extends RDD[(K, C)](parent.context) { + override val partitioner = Some(part) + + @transient val splits_ = + Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i)) + + override def splits = splits_ + + override def preferredLocations(split: Split) = Nil + + val dep = new ShuffleDependency(context.newShuffleId, parent, aggregator, part) + override val dependencies = List(dep) + + override def compute(split: Split): Iterator[(K, C)] = { + val shuffleId = dep.shuffleId + val splitId = split.index + val splitsByUri = new HashMap[String, ArrayBuffer[Int]] + val serverUris = MapOutputTracker.getServerUris(shuffleId) + for ((serverUri, index) <- serverUris.zipWithIndex) { + splitsByUri.getOrElseUpdate(serverUri, ArrayBuffer()) += index + } + val combiners = new HashMap[K, C] + for ((serverUri, inputIds) <- Utils.shuffle(splitsByUri)) { + for (i <- inputIds) { + val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, splitId) + val inputStream = new ObjectInputStream(new URL(url).openStream()) + try { + while (true) { + val (k, c) = inputStream.readObject().asInstanceOf[(K, C)] + combiners(k) = combiners.get(k) match { + case Some(oldC) => aggregator.mergeCombiners(oldC, c) + case None => c + } + } + } catch { + case e: EOFException => {} + } + inputStream.close() + } + } + combiners.iterator + } +}
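Taken together with the combineByKey change in RDD.scala above, a pair-RDD reduction now flows through these new classes rather than the old Shuffle implementations. An end-to-end sketch (the input path and split count are placeholders):

    val pairs = sc.textFile("file:///tmp/events.txt").map(line => (line.length, 1))
    val counts = pairs.reduceByKey(_ + _, 8)
    // combineByKey builds an Aggregator[Int, Int, Int] and a HashPartitioner(8),
    // wraps them in a ShuffledRDD; the DAGScheduler then runs a ShuffleMapTask
    // stage for `pairs` followed by a result stage that reads the merged buckets.
    counts.collect()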
\ No newline at end of file diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index bf70b5fcb1..fda2ee3be7 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -1,6 +1,7 @@ package spark import java.io._ +import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.ArrayBuffer @@ -34,6 +35,8 @@ extends Logging { scheduler.start() Cache.initialize() Broadcast.initialize(true) + MapOutputTracker.initialize(true) + RDDCache.initialize(true) // Methods for creating RDDs @@ -42,6 +45,12 @@ extends Logging { def parallelize[T: ClassManifest](seq: Seq[T]): RDD[T] = parallelize(seq, numCores) + + def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int): RDD[T] = + parallelize(seq, numSlices) + + def makeRDD[T: ClassManifest](seq: Seq[T]): RDD[T] = + parallelize(seq, numCores) def textFile(path: String): RDD[String] = new HadoopTextFile(this, path) @@ -89,8 +98,8 @@ extends Logging { } /** Build the union of a list of RDDs. */ - def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] = - new UnionRDD(this, rdds) + //def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] = + // new UnionRDD(this, rdds) // Methods for creating shared variables @@ -126,19 +135,26 @@ extends Logging { None } - // Submit an array of tasks (passed as functions) to the scheduler - def runTasks[T: ClassManifest](tasks: Array[() => T]): Array[T] = { - runTaskObjects(tasks.map(f => new FunctionTask(f))) - } - // Run an array of spark.Task objects private[spark] def runTaskObjects[T: ClassManifest](tasks: Seq[Task[T]]) : Array[T] = { + return null; + /* logInfo("Running " + tasks.length + " tasks in parallel") val start = System.nanoTime val result = scheduler.runTasks(tasks.toArray) logInfo("Tasks finished in " + (System.nanoTime - start) / 1e9 + " s") return result + */ + } + + private[spark] def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U]) + : Array[U] = { + logInfo("Starting job...") + val start = System.nanoTime + val result = scheduler.runJob(rdd, func) + logInfo("Job finished in " + (System.nanoTime - start) / 1e9 + " s") + result } // Clean a closure to make it ready to serialized and send to tasks @@ -150,6 +166,18 @@ extends Logging { // Get the number of cores available to run tasks (as reported by Scheduler) def numCores = scheduler.numCores + + private var nextShuffleId = new AtomicInteger(0) + + private[spark] def newShuffleId(): Int = { + nextShuffleId.getAndIncrement() + } + + private var nextRddId = new AtomicInteger(0) + + private[spark] def newRddId(): Int = { + nextRddId.getAndIncrement() + } } diff --git a/core/src/main/scala/spark/Split.scala b/core/src/main/scala/spark/Split.scala index 116cd16370..62bb5f82c5 100644 --- a/core/src/main/scala/spark/Split.scala +++ b/core/src/main/scala/spark/Split.scala @@ -5,9 +5,10 @@ package spark */ @serializable trait Split { /** - * Get a unique ID for this split which can be used, for example, to - * set up caches based on it. The ID should stay the same if we serialize - * and then deserialize the split. 
+ * Get the split's index within its parent RDD */ - def getId(): String + val index: Int + + // A better default implementation of HashCode + override def hashCode(): Int = index } diff --git a/core/src/main/scala/spark/Stage.scala b/core/src/main/scala/spark/Stage.scala new file mode 100644 index 0000000000..82b70ce60d --- /dev/null +++ b/core/src/main/scala/spark/Stage.scala @@ -0,0 +1,34 @@ +package spark + +class Stage(val id: Int, val rdd: RDD[_], val shuffleDep: Option[ShuffleDependency[_,_,_]], val parents: List[Stage]) { + val isShuffleMap = shuffleDep != None + val numPartitions = rdd.splits.size + val outputLocs = Array.fill[List[String]](numPartitions)(Nil) + var numAvailableOutputs = 0 + + def isAvailable: Boolean = { + if (parents.size == 0 && !isShuffleMap) + true + else + numAvailableOutputs == numPartitions + } + + def addOutputLoc(partition: Int, host: String) { + val prevList = outputLocs(partition) + outputLocs(partition) = host :: prevList + if (prevList == Nil) + numAvailableOutputs += 1 + } + + def removeOutputLoc(partition: Int, host: String) { + val prevList = outputLocs(partition) + val newList = prevList - host + outputLocs(partition) = newList + if (prevList != Nil && newList == Nil) + numAvailableOutputs -= 1 + } + + override def toString = "Stage " + id + + override def hashCode(): Int = id +} diff --git a/core/src/main/scala/spark/UnionRDD.scala b/core/src/main/scala/spark/UnionRDD.scala new file mode 100644 index 0000000000..78297be4f3 --- /dev/null +++ b/core/src/main/scala/spark/UnionRDD.scala @@ -0,0 +1,43 @@ +package spark + +import scala.collection.mutable.ArrayBuffer + +@serializable +class UnionSplit[T: ClassManifest](idx: Int, rdd: RDD[T], split: Split) +extends Split { + def iterator() = rdd.iterator(split) + def preferredLocations() = rdd.preferredLocations(split) + override val index = idx +} + +@serializable +class UnionRDD[T: ClassManifest](sc: SparkContext, rdds: Seq[RDD[T]]) +extends RDD[T](sc) { + @transient val splits_ : Array[Split] = { + val array = new Array[Split](rdds.map(_.splits.size).sum) + var pos = 0 + for (rdd <- rdds; split <- rdd.splits) { + array(pos) = new UnionSplit(pos, rdd, split) + pos += 1 + } + array + } + + override def splits = splits_ + + override val dependencies = { + val deps = new ArrayBuffer[Dependency[_]] + var pos = 0 + for ((rdd, index) <- rdds.zipWithIndex) { + deps += new RangeDependency(rdd, 0, pos, rdd.splits.size) + pos += rdd.splits.size + } + deps.toList + } + + override def compute(s: Split): Iterator[T] = + s.asInstanceOf[UnionSplit[T]].iterator() + + override def preferredLocations(s: Split): Seq[String] = + s.asInstanceOf[UnionSplit[T]].preferredLocations() +}
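UnionRDD numbers its splits sequentially across the parents and records one RangeDependency per parent, so the scheduler can map an output split back to the parent split it wraps. A worked example with placeholder RDDs and made-up sizes:

    // Suppose rddA has 3 splits and rddB has 5.
    val both = rddA.union(rddB)   // equivalently: rddA ++ rddB
    // Dependencies: RangeDependency(rddA, inStart = 0, outStart = 0, length = 3)
    //               RangeDependency(rddB, inStart = 0, outStart = 3, length = 5)
    // So union split 6 belongs to rddB and maps to parent split 6 - 3 + 0 = 3,
    // while splits 0 through 2 map one-to-one onto rddA's splits.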
\ No newline at end of file diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index e333dd9c91..00cbbfd616 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -124,4 +124,11 @@ object Utils { // and join them into a string return bytes.map(b => (b.toInt + 256) % 256).mkString(".") } + + /** + * Get the local machine's hostname + */ + def localHostName(): String = { + return InetAddress.getLocalHost().getHostName + } } diff --git a/examples/src/main/scala/spark/examples/CpuHog.scala b/examples/src/main/scala/spark/examples/CpuHog.scala deleted file mode 100644 index 94b3709850..0000000000 --- a/examples/src/main/scala/spark/examples/CpuHog.scala +++ /dev/null @@ -1,26 +0,0 @@ -package spark.examples - -import spark._ - -object CpuHog { - def main(args: Array[String]) { - if (args.length != 3) { - System.err.println("Usage: CpuHog <master> <tasks> <threads_per_task>"); - System.exit(1) - } - val sc = new SparkContext(args(0), "CPU hog") - val tasks = args(1).toInt - val threads = args(2).toInt - def task { - for (i <- 0 until threads-1) { - new Thread() { - override def run { - while(true) {} - } - }.start() - } - while(true) {} - } - sc.runTasks(Array.make(tasks, () => task)) - } -} diff --git a/examples/src/main/scala/spark/examples/SleepJob.scala b/examples/src/main/scala/spark/examples/SleepJob.scala deleted file mode 100644 index 02673a5f88..0000000000 --- a/examples/src/main/scala/spark/examples/SleepJob.scala +++ /dev/null @@ -1,21 +0,0 @@ -package spark.examples - -import spark._ - -object SleepJob { - def main(args: Array[String]) { - if (args.length != 3) { - System.err.println("Usage: SleepJob <master> <tasks> <task_duration>"); - System.exit(1) - } - val sc = new SparkContext(args(0), "Sleep job") - val tasks = args(1).toInt - val duration = args(2).toInt - def task { - val start = System.currentTimeMillis - while (System.currentTimeMillis - start < duration * 1000L) - Thread.sleep(200) - } - sc.runTasks(Array.make(tasks, () => task)) - } -} |