From b2a3f24dde7a69587a5fea50d3e1e4e8f02a2dc3 Mon Sep 17 00:00:00 2001 From: koeninger Date: Sun, 21 Apr 2013 00:29:37 -0500 Subject: first attempt at an RDD to pull data from JDBC sources --- core/src/main/scala/spark/rdd/JdbcRDD.scala | 79 +++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 core/src/main/scala/spark/rdd/JdbcRDD.scala diff --git a/core/src/main/scala/spark/rdd/JdbcRDD.scala b/core/src/main/scala/spark/rdd/JdbcRDD.scala new file mode 100644 index 0000000000..c8a5d76012 --- /dev/null +++ b/core/src/main/scala/spark/rdd/JdbcRDD.scala @@ -0,0 +1,79 @@ +package spark.rdd + +import java.sql.{Connection, ResultSet} + +import spark.{Logging, Partition, RDD, SparkContext, TaskContext} +import spark.util.NextIterator + +/** + An RDD that executes an SQL query on a JDBC connection and reads results. + @param getConnection a function that returns an open Connection. + The RDD takes care of closing the connection. + @param sql the text of the query. + The query must contain two ? placeholders for parameters used to partition the results. + E.g. "select title, author from books where ? <= id and id <= ?" + @param lowerBound the minimum value of the first placeholder + @param upperBound the maximum value of the second placeholder + The lower and upper bounds are inclusive. + @param numPartitions the amount of parallelism. + Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2, + the query would be executed twice, once with (1, 10) and once with (11, 20) + @param mapRow a function from a ResultSet to a single row of the desired result type(s). + This should only call getInt, getString, etc; the RDD takes care of calling next. + The default maps a ResultSet to an array of Object. +*/ +class JdbcRDD[T: ClassManifest]( + sc: SparkContext, + getConnection: () => Connection, + sql: String, + lowerBound: Long, + upperBound: Long, + numPartitions: Int, + mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray) + extends RDD[T](sc, Nil) with Logging { + + override def getPartitions: Array[Partition] = + ParallelCollectionRDD.slice(lowerBound to upperBound, numPartitions). + filter(! _.isEmpty). + zipWithIndex. + map(x => new JdbcPartition(x._2, x._1.head, x._1.last)). + toArray + + override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] { + val part = thePart.asInstanceOf[JdbcPartition] + val conn = getConnection() + context.addOnCompleteCallback{ () => closeIfNeeded() } + val stmt = conn.prepareStatement(sql) + stmt.setLong(1, part.lower) + stmt.setLong(2, part.upper) + val rs = stmt.executeQuery() + + override def getNext: T = { + if (rs.next()) { + mapRow(rs) + } else { + finished = true + null.asInstanceOf[T] + } + } + + override def close() { + try { + logInfo("closing connection") + conn.close() + } catch { + case e: Exception => logWarning("Exception closing connection", e) + } + } + } + +} + +private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition { + override def index = idx +} + +object JdbcRDD { + val resultSetToObjectArray = (rs: ResultSet) => + Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1)) +} -- cgit v1.2.3 From dfac0aa5c2e5f46955b008b1e8d9ee5d8069efa5 Mon Sep 17 00:00:00 2001 From: koeninger Date: Mon, 22 Apr 2013 21:12:52 -0500 Subject: prevent mysql driver from pulling entire resultset into memory. explicitly close resultset and statement. 
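A minimal usage sketch for the JdbcRDD introduced above (not part of either patch). It assumes a hypothetical MySQL database at jdbc:mysql://dbhost/bookdb with a books(id, title, author) table and placeholder credentials; any JDBC driver works the same way. Per the scaladoc, with lowerBound 1, upperBound 20 and 2 partitions the query runs once with (1, 10) and once with (11, 20).

    import java.sql.{Connection, DriverManager, ResultSet}
    import spark.SparkContext
    import spark.rdd.JdbcRDD

    object JdbcRDDExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "JdbcRDDExample")

        // getConnection must return an open Connection; the RDD takes care of closing it.
        def createConnection(): Connection = {
          Class.forName("com.mysql.jdbc.Driver")                                    // placeholder driver
          DriverManager.getConnection("jdbc:mysql://dbhost/bookdb", "user", "pass") // hypothetical URL/credentials
        }

        // The query needs exactly two '?' placeholders; each partition fills them with
        // its inclusive slice of [lowerBound, upperBound].
        val rows = new JdbcRDD(
          sc,
          createConnection _,
          "select title, author from books where ? <= id and id <= ?",
          1, 20, 2,
          (rs: ResultSet) => (rs.getString("title"), rs.getString("author")))

        println("count = " + rows.count())
        sc.stop()
      }
    }
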
--- core/src/main/scala/spark/rdd/JdbcRDD.scala | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/spark/rdd/JdbcRDD.scala b/core/src/main/scala/spark/rdd/JdbcRDD.scala index c8a5d76012..4c3054465c 100644 --- a/core/src/main/scala/spark/rdd/JdbcRDD.scala +++ b/core/src/main/scala/spark/rdd/JdbcRDD.scala @@ -15,7 +15,7 @@ import spark.util.NextIterator @param lowerBound the minimum value of the first placeholder @param upperBound the maximum value of the second placeholder The lower and upper bounds are inclusive. - @param numPartitions the amount of parallelism. + @param numPartitions the number of partitions. Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2, the query would be executed twice, once with (1, 10) and once with (11, 20) @param mapRow a function from a ResultSet to a single row of the desired result type(s). @@ -40,10 +40,15 @@ class JdbcRDD[T: ClassManifest]( toArray override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] { + context.addOnCompleteCallback{ () => closeIfNeeded() } val part = thePart.asInstanceOf[JdbcPartition] val conn = getConnection() - context.addOnCompleteCallback{ () => closeIfNeeded() } - val stmt = conn.prepareStatement(sql) + val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) + // force mysql driver to stream rather than pull entire resultset into memory + if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) { + stmt.setFetchSize(Integer.MIN_VALUE) + logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ") + } stmt.setLong(1, part.lower) stmt.setLong(2, part.upper) val rs = stmt.executeQuery() @@ -59,8 +64,18 @@ class JdbcRDD[T: ClassManifest]( override def close() { try { - logInfo("closing connection") - conn.close() + if (null != rs && ! rs.isClosed()) rs.close() + } catch { + case e: Exception => logWarning("Exception closing resultset", e) + } + try { + if (null != stmt && ! stmt.isClosed()) stmt.close() + } catch { + case e: Exception => logWarning("Exception closing statement", e) + } + try { + if (null != conn && ! stmt.isClosed()) conn.close() + logInfo("closed connection") } catch { case e: Exception => logWarning("Exception closing connection", e) } -- cgit v1.2.3 From d960e7e0f83385d8f43129d53c189b3036936daf Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Wed, 1 May 2013 20:24:00 +0530 Subject: a) Add support for hyper local scheduling - specific to a host + port - before trying host local scheduling. b) Add some fixes to test code to ensure it passes (and fixes some other issues). c) Fix bug in task scheduling which incorrectly used availableCores instead of all cores on the node. 
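Before the scheduler patch whose diff follows, a standalone sketch of the MySQL streaming and close-handling idiom from the JdbcRDD fix above, with a hypothetical helper name and URL. Connector/J only streams rows (rather than buffering the whole result set in driver memory) when the statement is TYPE_FORWARD_ONLY / CONCUR_READ_ONLY and the fetch size is the sentinel Integer.MIN_VALUE; for other drivers the fetch size is left alone. Each handle is closed in its own try block so one failure does not leak the others (note the patch's last guard reads stmt.isClosed() where conn.isClosed() was presumably intended).

    import java.sql.{DriverManager, ResultSet}
    import scala.collection.mutable.ArrayBuffer

    object JdbcStreamingSketch {
      // Run a query and map each row, forcing MySQL to stream results.
      def streamQuery[T](url: String, sql: String)(mapRow: ResultSet => T): Seq[T] = {
        val conn = DriverManager.getConnection(url)
        val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
        if (url.startsWith("jdbc:mysql:")) {
          // MySQL-specific sentinel: stream rows instead of pulling the entire result set.
          stmt.setFetchSize(Integer.MIN_VALUE)
        }
        val rs = stmt.executeQuery()
        try {
          val buf = new ArrayBuffer[T]
          while (rs.next()) buf += mapRow(rs)
          buf
        } finally {
          // Close result set, statement and connection independently.
          try { rs.close() } catch { case e: Exception => () }
          try { stmt.close() } catch { case e: Exception => () }
          try { conn.close() } catch { case e: Exception => () }
        }
      }
    }
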
--- core/src/main/scala/spark/SparkEnv.scala | 21 ++- core/src/main/scala/spark/Utils.scala | 17 ++- .../main/scala/spark/deploy/worker/Worker.scala | 5 +- core/src/main/scala/spark/rdd/BlockRDD.scala | 9 +- core/src/main/scala/spark/rdd/ZippedRDD.scala | 2 + .../main/scala/spark/scheduler/DAGScheduler.scala | 7 +- .../main/scala/spark/scheduler/ResultTask.scala | 4 +- .../scala/spark/scheduler/ShuffleMapTask.scala | 4 +- .../spark/scheduler/cluster/ClusterScheduler.scala | 64 ++++++++- .../spark/scheduler/cluster/TaskSetManager.scala | 155 ++++++++++++++------- .../main/scala/spark/storage/BlockManager.scala | 43 ++++-- .../test/scala/spark/MapOutputTrackerSuite.scala | 6 +- .../scala/spark/scheduler/DAGSchedulerSuite.scala | 4 +- .../scala/spark/storage/BlockManagerSuite.scala | 2 + 14 files changed, 244 insertions(+), 99 deletions(-) diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index ffb40bab3a..5b4a464010 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -29,7 +29,11 @@ class SparkEnv ( val blockManager: BlockManager, val connectionManager: ConnectionManager, val httpFileServer: HttpFileServer, - val sparkFilesDir: String + val sparkFilesDir: String, + // To be set only as part of initialization of SparkContext. + // (executorId, defaultHostPort) => executorHostPort + // If executorId is NOT found, return defaultHostPort + var executorIdToHostPort: (String, String) => String ) { def stop() { @@ -44,6 +48,17 @@ class SparkEnv ( // down, but let's call it anyway in case it gets fixed in a later release actorSystem.awaitTermination() } + + + def resolveExecutorIdToHostPort(executorId: String, defaultHostPort: String): String = { + val env = SparkEnv.get + if (env.executorIdToHostPort == null) { + // default to using host, not host port. Relevant to non cluster modes. + return defaultHostPort + } + + env.executorIdToHostPort(executorId, defaultHostPort) + } } object SparkEnv extends Logging { @@ -162,7 +177,7 @@ object SparkEnv extends Logging { blockManager, connectionManager, httpFileServer, - sparkFilesDir) + sparkFilesDir, + null) } - } diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 9f48cbe490..279daf04ed 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -357,21 +357,26 @@ private object Utils extends Logging { Utils.logErrorWithStack("Unexpected to have port " + port + " which is not valid in " + hostPort + ". Message " + message) } } + + // Used by DEBUG code : remove when all testing done + def logErrorWithStack(msg: String) { + try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } } + // temp code for debug + System.exit(-1) + } */ // Once testing is complete in various modes, replace with this ? 
def checkHost(host: String, message: String = "") {} def checkHostPort(hostPort: String, message: String = "") {} - def getUserNameFromEnvironment(): String = { - SparkHadoopUtil.getUserNameFromEnvironment - } - // Used by DEBUG code : remove when all testing done def logErrorWithStack(msg: String) { try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } } - // temp code for debug - System.exit(-1) + } + + def getUserNameFromEnvironment(): String = { + SparkHadoopUtil.getUserNameFromEnvironment } // Typically, this will be of order of number of nodes in cluster diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 1a7da0f7bf..3dc2207170 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -54,7 +54,10 @@ private[spark] class Worker( def createWorkDir() { workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work")) try { - if ( (workDir.exists() && !workDir.isDirectory) || (!workDir.exists() && !workDir.mkdirs()) ) { + // This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs() + // So attempting to create and then check if directory was created or not. + workDir.mkdirs() + if ( !workDir.exists() || !workDir.isDirectory) { logError("Failed to create work directory " + workDir) System.exit(1) } diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala index 7348c4f15b..719d4bf03e 100644 --- a/core/src/main/scala/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/spark/rdd/BlockRDD.scala @@ -1,7 +1,7 @@ package spark.rdd -import scala.collection.mutable.HashMap import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext} +import spark.storage.BlockManager private[spark] class BlockRDDPartition(val blockId: String, idx: Int) extends Partition { val index = idx @@ -11,12 +11,7 @@ private[spark] class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String]) extends RDD[T](sc, Nil) { - @transient lazy val locations_ = { - val blockManager = SparkEnv.get.blockManager - /*val locations = blockIds.map(id => blockManager.getLocations(id))*/ - val locations = blockManager.getLocations(blockIds) - HashMap(blockIds.zip(locations):_*) - } + @transient lazy val locations_ = BlockManager.blockIdsToExecutorLocations(blockIds, SparkEnv.get) override def getPartitions: Array[Partition] = (0 until blockIds.size).map(i => { new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition] diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala index 35b0e06785..e80250a99b 100644 --- a/core/src/main/scala/spark/rdd/ZippedRDD.scala +++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala @@ -49,6 +49,8 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest]( override def getPreferredLocations(s: Partition): Seq[String] = { val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions + // TODO: becomes complicated - intersect on hostPort if available, else fallback to host (removing intersected hostPort's). + // Since I am not very sure about this RDD, leaving it to others to comment better ! 
rdd1.preferredLocations(partition1).intersect(rdd2.preferredLocations(partition2)) } diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 1440b93e65..8072c60bb7 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -12,7 +12,7 @@ import spark.executor.TaskMetrics import spark.partial.ApproximateActionListener import spark.partial.ApproximateEvaluator import spark.partial.PartialResult -import spark.storage.BlockManagerMaster +import spark.storage.{BlockManager, BlockManagerMaster} import spark.util.{MetadataCleaner, TimeStampedHashMap} /** @@ -117,9 +117,8 @@ class DAGScheduler( private def getCacheLocs(rdd: RDD[_]): Array[List[String]] = { if (!cacheLocs.contains(rdd.id)) { val blockIds = rdd.partitions.indices.map(index=> "rdd_%d_%d".format(rdd.id, index)).toArray - cacheLocs(rdd.id) = blockManagerMaster.getLocations(blockIds).map { - locations => locations.map(_.hostPort).toList - }.toArray + val locs = BlockManager.blockIdsToExecutorLocations(blockIds, env) + cacheLocs(rdd.id) = blockIds.map(locs.getOrElse(_, Nil)) } cacheLocs(rdd.id) } diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala index 89dc6640b2..c43cbe5ed4 100644 --- a/core/src/main/scala/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/spark/scheduler/ResultTask.scala @@ -71,11 +71,11 @@ private[spark] class ResultTask[T, U]( } // data locality is on a per host basis, not hyper specific to container (host:port). Unique on set of hosts. - val preferredLocs: Seq[String] = if (locs == null) Nil else locs.map(loc => Utils.parseHostPort(loc)._1).toSet.toSeq + private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq { // DEBUG code - preferredLocs.foreach (host => Utils.checkHost(host, "preferredLocs : " + preferredLocs)) + preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs)) } override def run(attemptId: Long): U = { diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index 7dc6da4573..0b848af2f3 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -85,11 +85,11 @@ private[spark] class ShuffleMapTask( protected def this() = this(0, null, null, 0, null) // data locality is on a per host basis, not hyper specific to container (host:port). Unique on set of hosts. 
- private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.map(loc => Utils.parseHostPort(loc)._1).toSet.toSeq + private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq { // DEBUG code - preferredLocs.foreach (host => Utils.checkHost(host, "preferredLocs : " + preferredLocs)) + preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs)) } var split = if (rdd == null) { diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala index a9d9c5e44c..3c72ce4206 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala @@ -79,9 +79,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) // The set of executors we have on each host; this is used to compute hostsAlive, which // in turn is used to decide when we can attain data locality on a given host - val executorsByHostPort = new HashMap[String, HashSet[String]] + private val executorsByHostPort = new HashMap[String, HashSet[String]] - val executorIdToHostPort = new HashMap[String, String] + private val executorIdToHostPort = new HashMap[String, String] // JAR server, if any JARs were added by the user to the SparkContext var jarServer: HttpServer = null @@ -102,6 +102,14 @@ private[spark] class ClusterScheduler(val sc: SparkContext) def initialize(context: SchedulerBackend) { backend = context + // resolve executorId to hostPort mapping. + def executorToHostPort(executorId: String, defaultHostPort: String): String = { + executorIdToHostPort.getOrElse(executorId, defaultHostPort) + } + + // Unfortunately, this means that SparkEnv is indirectly referencing ClusterScheduler + // Will that be a design violation ? + SparkEnv.get.executorIdToHostPort = executorToHostPort } def newTaskId(): Long = nextTaskId.getAndIncrement() @@ -209,13 +217,30 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } // Build a list of tasks to assign to each slave val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores)) + // merge availableCpus into hostToAvailableCpus block ? val availableCpus = offers.map(o => o.cores).toArray + val hostToAvailableCpus = { + val map = new HashMap[String, Int]() + for (offer <- offers) { + val hostPort = offer.hostPort + val cores = offer.cores + // DEBUG code + Utils.checkHostPort(hostPort) + + val host = Utils.parseHostPort(hostPort)._1 + + map.put(host, map.getOrElse(host, 0) + cores) + } + + map + } var launchedTask = false for (manager <- activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))) { // Split offers based on host local, rack local and off-rack tasks. 
+ val hyperLocalOffers = new HashMap[String, ArrayBuffer[Int]]() val hostLocalOffers = new HashMap[String, ArrayBuffer[Int]]() val rackLocalOffers = new HashMap[String, ArrayBuffer[Int]]() val otherOffers = new HashMap[String, ArrayBuffer[Int]]() @@ -224,8 +249,17 @@ private[spark] class ClusterScheduler(val sc: SparkContext) val hostPort = offers(i).hostPort // DEBUG code Utils.checkHostPort(hostPort) + + val numHyperLocalTasks = math.max(0, math.min(manager.numPendingTasksForHostPort(hostPort), availableCpus(i))) + if (numHyperLocalTasks > 0){ + val list = hyperLocalOffers.getOrElseUpdate(hostPort, new ArrayBuffer[Int]) + for (j <- 0 until numHyperLocalTasks) list += i + } + val host = Utils.parseHostPort(hostPort)._1 - val numHostLocalTasks = math.max(0, math.min(manager.numPendingTasksForHost(hostPort), availableCpus(i))) + val numHostLocalTasks = math.max(0, + // Remove hyper local tasks (which are also host local btw !) from this + math.min(manager.numPendingTasksForHost(hostPort) - numHyperLocalTasks, hostToAvailableCpus(host))) if (numHostLocalTasks > 0){ val list = hostLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int]) for (j <- 0 until numHostLocalTasks) list += i @@ -233,7 +267,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) val numRackLocalTasks = math.max(0, // Remove host local tasks (which are also rack local btw !) from this - math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numHostLocalTasks, availableCpus(i))) + math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numHyperLocalTasks - numHostLocalTasks, hostToAvailableCpus(host))) if (numRackLocalTasks > 0){ val list = rackLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int]) for (j <- 0 until numRackLocalTasks) list += i @@ -246,12 +280,19 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } val offersPriorityList = new ArrayBuffer[Int]( - hostLocalOffers.size + rackLocalOffers.size + otherOffers.size) - // First host local, then rack, then others + hyperLocalOffers.size + hostLocalOffers.size + rackLocalOffers.size + otherOffers.size) + + // First hyper local, then host local, then rack, then others + + // numHostLocalOffers contains count of both hyper local and host offers. val numHostLocalOffers = { + val hyperLocalPriorityList = ClusterScheduler.prioritizeContainers(hyperLocalOffers) + offersPriorityList ++= hyperLocalPriorityList + val hostLocalPriorityList = ClusterScheduler.prioritizeContainers(hostLocalOffers) offersPriorityList ++= hostLocalPriorityList - hostLocalPriorityList.size + + hyperLocalPriorityList.size + hostLocalPriorityList.size } val numRackLocalOffers = { val rackLocalPriorityList = ClusterScheduler.prioritizeContainers(rackLocalOffers) @@ -477,6 +518,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } def getExecutorsAliveOnHost(host: String): Option[Set[String]] = { + Utils.checkHost(host) + val retval = hostToAliveHostPorts.get(host) if (retval.isDefined) { return Some(retval.get.toSet) @@ -485,6 +528,13 @@ private[spark] class ClusterScheduler(val sc: SparkContext) None } + def isExecutorAliveOnHostPort(hostPort: String): Boolean = { + // Even if hostPort is a host, it does not matter - it is just a specific check. + // But we do have to ensure that only hostPort get into hostPortsAlive ! 
+ // So no check against Utils.checkHostPort + hostPortsAlive.contains(hostPort) + } + // By default, rack is unknown def getRackForHost(value: String): Option[String] = None diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala index 27e713e2c4..f5c0058554 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala @@ -13,14 +13,18 @@ import spark.scheduler._ import spark.TaskState.TaskState import java.nio.ByteBuffer -private[spark] object TaskLocality extends Enumeration("HOST_LOCAL", "RACK_LOCAL", "ANY") with Logging { +private[spark] object TaskLocality extends Enumeration("HYPER_LOCAL", "HOST_LOCAL", "RACK_LOCAL", "ANY") with Logging { - val HOST_LOCAL, RACK_LOCAL, ANY = Value + // hyper local is expected to be used ONLY within tasksetmanager for now. + val HYPER_LOCAL, HOST_LOCAL, RACK_LOCAL, ANY = Value type TaskLocality = Value def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = { + // Must not be the constraint. + assert (constraint != TaskLocality.HYPER_LOCAL) + constraint match { case TaskLocality.HOST_LOCAL => condition == TaskLocality.HOST_LOCAL case TaskLocality.RACK_LOCAL => condition == TaskLocality.HOST_LOCAL || condition == TaskLocality.RACK_LOCAL @@ -32,7 +36,11 @@ private[spark] object TaskLocality extends Enumeration("HOST_LOCAL", "RACK_LOCAL def parse(str: String): TaskLocality = { // better way to do this ? try { - TaskLocality.withName(str) + val retval = TaskLocality.withName(str) + // Must not specify HYPER_LOCAL ! + assert (retval != TaskLocality.HYPER_LOCAL) + + retval } catch { case nEx: NoSuchElementException => { logWarning("Invalid task locality specified '" + str + "', defaulting to HOST_LOCAL"); @@ -133,35 +141,55 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe addPendingTask(i) } - private def findPreferredLocations(_taskPreferredLocations: Seq[String], scheduler: ClusterScheduler, rackLocal: Boolean = false): ArrayBuffer[String] = { - // DEBUG code - _taskPreferredLocations.foreach(h => Utils.checkHost(h, "taskPreferredLocation " + _taskPreferredLocations)) - - val taskPreferredLocations = if (! rackLocal) _taskPreferredLocations else { - // Expand set to include all 'seen' rack local hosts. - // This works since container allocation/management happens within master - so any rack locality information is updated in msater. - // Best case effort, and maybe sort of kludge for now ... rework it later ? - val hosts = new HashSet[String] - _taskPreferredLocations.foreach(h => { - val rackOpt = scheduler.getRackForHost(h) - if (rackOpt.isDefined) { - val hostsOpt = scheduler.getCachedHostsForRack(rackOpt.get) - if (hostsOpt.isDefined) { - hosts ++= hostsOpt.get + // Note that it follows the hierarchy. + // if we search for HOST_LOCAL, the output will include HYPER_LOCAL and + // if we search for RACK_LOCAL, it will include HYPER_LOCAL & HOST_LOCAL + private def findPreferredLocations(_taskPreferredLocations: Seq[String], scheduler: ClusterScheduler, + taskLocality: TaskLocality.TaskLocality): HashSet[String] = { + + if (TaskLocality.HYPER_LOCAL == taskLocality) { + // straight forward comparison ! Special case it. 
+ val retval = new HashSet[String]() + scheduler.synchronized { + for (location <- _taskPreferredLocations) { + if (scheduler.isExecutorAliveOnHostPort(location)) { + retval += location } } + } - // Ensure that irrespective of what scheduler says, host is always added ! - hosts += h - }) - - hosts + return retval } - val retval = new ArrayBuffer[String] + val taskPreferredLocations = + if (TaskLocality.HOST_LOCAL == taskLocality) { + _taskPreferredLocations + } else { + assert (TaskLocality.RACK_LOCAL == taskLocality) + // Expand set to include all 'seen' rack local hosts. + // This works since container allocation/management happens within master - so any rack locality information is updated in msater. + // Best case effort, and maybe sort of kludge for now ... rework it later ? + val hosts = new HashSet[String] + _taskPreferredLocations.foreach(h => { + val rackOpt = scheduler.getRackForHost(h) + if (rackOpt.isDefined) { + val hostsOpt = scheduler.getCachedHostsForRack(rackOpt.get) + if (hostsOpt.isDefined) { + hosts ++= hostsOpt.get + } + } + + // Ensure that irrespective of what scheduler says, host is always added ! + hosts += h + }) + + hosts + } + + val retval = new HashSet[String] scheduler.synchronized { for (prefLocation <- taskPreferredLocations) { - val aliveLocationsOpt = scheduler.getExecutorsAliveOnHost(prefLocation) + val aliveLocationsOpt = scheduler.getExecutorsAliveOnHost(Utils.parseHostPort(prefLocation)._1) if (aliveLocationsOpt.isDefined) { retval ++= aliveLocationsOpt.get } @@ -175,29 +203,37 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe private def addPendingTask(index: Int) { // We can infer hostLocalLocations from rackLocalLocations by joining it against tasks(index).preferredLocations (with appropriate // hostPort <-> host conversion). But not doing it for simplicity sake. If this becomes a performance issue, modify it. - val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched) - val rackLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, true) + val hyperLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HYPER_LOCAL) + val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HOST_LOCAL) + val rackLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL) if (rackLocalLocations.size == 0) { // Current impl ensures this. 
+ assert (hyperLocalLocations.size == 0) assert (hostLocalLocations.size == 0) pendingTasksWithNoPrefs += index } else { - // host locality - for (hostPort <- hostLocalLocations) { + // hyper local locality + for (hostPort <- hyperLocalLocations) { // DEBUG Code Utils.checkHostPort(hostPort) val hostPortList = pendingTasksForHostPort.getOrElseUpdate(hostPort, ArrayBuffer()) hostPortList += index + } + + // host locality (includes hyper local) + for (hostPort <- hostLocalLocations) { + // DEBUG Code + Utils.checkHostPort(hostPort) val host = Utils.parseHostPort(hostPort)._1 val hostList = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer()) hostList += index } - // rack locality + // rack locality (includes hyper local and host local) for (rackLocalHostPort <- rackLocalLocations) { // DEBUG Code Utils.checkHostPort(rackLocalHostPort) @@ -233,6 +269,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe pendingRackLocalTasksForHost.getOrElse(host, ArrayBuffer()) } + // Number of pending tasks for a given host Port (which would be hyper local) + def numPendingTasksForHostPort(hostPort: String): Int = { + getPendingTasksForHostPort(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) ) + } + // Number of pending tasks for a given host (which would be data local) def numPendingTasksForHost(hostPort: String): Int = { getPendingTasksForHost(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) ) @@ -270,7 +311,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe if (speculatableTasks.size > 0) { val localTask = speculatableTasks.find { index => - val locations = findPreferredLocations(tasks(index).preferredLocations, sched) + val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HOST_LOCAL) val attemptLocs = taskAttempts(index).map(_.hostPort) (locations.size == 0 || locations.contains(hostPort)) && !attemptLocs.contains(hostPort) } @@ -284,7 +325,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) { val rackTask = speculatableTasks.find { index => - val locations = findPreferredLocations(tasks(index).preferredLocations, sched, true) + val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL) val attemptLocs = taskAttempts(index).map(_.hostPort) locations.contains(hostPort) && !attemptLocs.contains(hostPort) } @@ -311,6 +352,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe // Dequeue a pending task for a given node and return its index. // If localOnly is set to false, allow non-local tasks as well. private def findTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = { + val hyperLocalTask = findTaskFromList(getPendingTasksForHostPort(hostPort)) + if (hyperLocalTask != None) { + return hyperLocalTask + } + val localTask = findTaskFromList(getPendingTasksForHost(hostPort)) if (localTask != None) { return localTask @@ -341,30 +387,31 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe return findSpeculativeTask(hostPort, locality) } - // Does a host count as a preferred location for a task? This is true if - // either the task has preferred locations and this host is one, or it has - // no preferred locations (in which we still count the launch as preferred). 
- private def isPreferredLocation(task: Task[_], hostPort: String): Boolean = { + private def isHyperLocalLocation(task: Task[_], hostPort: String): Boolean = { + Utils.checkHostPort(hostPort) + val locs = task.preferredLocations - // DEBUG code - locs.foreach(h => Utils.checkHost(h, "preferredLocation " + locs)) - if (locs.contains(hostPort) || locs.isEmpty) return true + locs.contains(hostPort) + } + + private def isHostLocalLocation(task: Task[_], hostPort: String): Boolean = { + val locs = task.preferredLocations + + // If no preference, consider it as host local + if (locs.isEmpty) return true val host = Utils.parseHostPort(hostPort)._1 - locs.contains(host) + locs.find(h => Utils.parseHostPort(h)._1 == host).isDefined } // Does a host count as a rack local preferred location for a task? (assumes host is NOT preferred location). // This is true if either the task has preferred locations and this host is one, or it has // no preferred locations (in which we still count the launch as preferred). - def isRackLocalLocation(task: Task[_], hostPort: String): Boolean = { + private def isRackLocalLocation(task: Task[_], hostPort: String): Boolean = { val locs = task.preferredLocations - // DEBUG code - locs.foreach(h => Utils.checkHost(h, "preferredLocation " + locs)) - val preferredRacks = new HashSet[String]() for (preferredHost <- locs) { val rack = sched.getRackForHost(preferredHost) @@ -395,8 +442,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe val task = tasks(index) val taskId = sched.newTaskId() // Figure out whether this should count as a preferred launch - val taskLocality = if (isPreferredLocation(task, hostPort)) TaskLocality.HOST_LOCAL else - if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL else TaskLocality.ANY + val taskLocality = + if (isHyperLocalLocation(task, hostPort)) TaskLocality.HYPER_LOCAL else + if (isHostLocalLocation(task, hostPort)) TaskLocality.HOST_LOCAL else + if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL else + TaskLocality.ANY val prefStr = taskLocality.toString logInfo("Starting task %s:%d as TID %s on slave %s: %s (%s)".format( taskSet.id, index, taskId, execId, hostPort, prefStr)) @@ -552,15 +602,22 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe def executorLost(execId: String, hostPort: String) { logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id) + // If some task has preferred locations only on hostname, and there are no more executors there, // put it in the no-prefs list to avoid the wait from delay scheduling - for (index <- getPendingTasksForHostPort(hostPort)) { - val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, true) + + // host local tasks - should we push this to rack local or no pref list ? For now, preserving behavior and moving to + // no prefs list. Note, this was done due to impliations related to 'waiting' for data local tasks, etc. + // Note: NOT checking hyper local list - since host local list is super set of that. 
We need to ad to no prefs only if + // there is no host local node for the task (not if there is no hyper local node for the task) + for (index <- getPendingTasksForHost(Utils.parseHostPort(hostPort)._1)) { + // val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL) + val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HOST_LOCAL) if (newLocs.isEmpty) { - assert (findPreferredLocations(tasks(index).preferredLocations, sched).isEmpty) pendingTasksWithNoPrefs += index } } + // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage if (tasks(0).isInstanceOf[ShuffleMapTask]) { for ((tid, info) <- taskInfos if info.executorId == execId) { diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 6e861ac734..7a0d6ced3e 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -4,7 +4,7 @@ import java.io.{InputStream, OutputStream} import java.nio.{ByteBuffer, MappedByteBuffer} import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue} +import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet, Queue} import scala.collection.JavaConversions._ import akka.actor.{ActorSystem, Cancellable, Props} @@ -271,23 +271,12 @@ class BlockManager( } - /** - * Get locations of the block. - */ - def getLocations(blockId: String): Seq[String] = { - val startTimeMs = System.currentTimeMillis - var managers = master.getLocations(blockId) - val locations = managers.map(_.hostPort) - logDebug("Got block locations in " + Utils.getUsedTimeMs(startTimeMs)) - return locations - } - /** * Get locations of an array of blocks. 
*/ - def getLocations(blockIds: Array[String]): Array[Seq[String]] = { + def getLocationBlockIds(blockIds: Array[String]): Array[Seq[BlockManagerId]] = { val startTimeMs = System.currentTimeMillis - val locations = master.getLocations(blockIds).map(_.map(_.hostPort).toSeq).toArray + val locations = master.getLocations(blockIds).toArray logDebug("Got multiple block location in " + Utils.getUsedTimeMs(startTimeMs)) return locations } @@ -947,6 +936,32 @@ object BlockManager extends Logging { } } } + + def blockIdsToExecutorLocations(blockIds: Array[String], env: SparkEnv): HashMap[String, List[String]] = { + val blockManager = env.blockManager + /*val locations = blockIds.map(id => blockManager.getLocations(id))*/ + val locationBlockIds = blockManager.getLocationBlockIds(blockIds) + + // Convert from block master locations to executor locations (we need that for task scheduling) + val executorLocations = new HashMap[String, List[String]]() + for (i <- 0 until blockIds.length) { + val blockId = blockIds(i) + val blockLocations = locationBlockIds(i) + + val executors = new HashSet[String]() + + for (bkLocation <- blockLocations) { + val executorHostPort = env.resolveExecutorIdToHostPort(bkLocation.executorId, bkLocation.host) + executors += executorHostPort + // logInfo("bkLocation = " + bkLocation + ", executorHostPort = " + executorHostPort) + } + + executorLocations.put(blockId, executors.toSeq.toList) + } + + executorLocations + } + } class BlockFetcherIterator( diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala index 3abc584b6a..875975ca43 100644 --- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala @@ -80,12 +80,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { } test("remote fetch") { - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", "localhost", 0) + val hostname = "localhost" + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0) + System.setProperty("spark.hostPort", hostname + ":" + boundPort) val masterTracker = new MapOutputTracker() masterTracker.trackerActor = actorSystem.actorOf( Props(new MapOutputTrackerActor(masterTracker)), "MapOutputTracker") - val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", "localhost", 0) + val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0) val slaveTracker = new MapOutputTracker() slaveTracker.trackerActor = slaveSystem.actorFor( "akka://spark@localhost:" + boundPort + "/user/MapOutputTracker") diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala index c0f8986de8..16554eac6e 100644 --- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala @@ -385,12 +385,12 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont assert(results === Map(0 -> 42)) } - /** Assert that the supplied TaskSet has exactly the given preferredLocations. */ + /** Assert that the supplied TaskSet has exactly the given preferredLocations. Note, converts taskSet's locations to host only. 
*/ private def assertLocations(taskSet: TaskSet, locations: Seq[Seq[String]]) { assert(locations.size === taskSet.tasks.size) for ((expectLocs, taskLocs) <- taskSet.tasks.map(_.preferredLocations).zip(locations)) { - assert(expectLocs === taskLocs) + assert(expectLocs.map(loc => spark.Utils.parseHostPort(loc)._1) === taskLocs) } } diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index b8c0f6fb76..3fc2825255 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -41,6 +41,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true") val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() + // Set some value ... + System.setProperty("spark.hostPort", spark.Utils.localHostName() + ":" + 1111) } after { -- cgit v1.2.3 From 27764a00f40391b94fa05abb11484c442607f6f7 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Wed, 1 May 2013 20:56:05 +0530 Subject: Fix some npe introduced accidentally --- .../main/scala/spark/scheduler/DAGScheduler.scala | 2 +- .../main/scala/spark/storage/BlockManager.scala | 30 ++++++++++++++++------ 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 8072c60bb7..b18248d2b5 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -117,7 +117,7 @@ class DAGScheduler( private def getCacheLocs(rdd: RDD[_]): Array[List[String]] = { if (!cacheLocs.contains(rdd.id)) { val blockIds = rdd.partitions.indices.map(index=> "rdd_%d_%d".format(rdd.id, index)).toArray - val locs = BlockManager.blockIdsToExecutorLocations(blockIds, env) + val locs = BlockManager.blockIdsToExecutorLocations(blockIds, env, blockManagerMaster) cacheLocs(rdd.id) = blockIds.map(locs.getOrElse(_, Nil)) } cacheLocs(rdd.id) diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 7a0d6ced3e..040082e600 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -937,10 +937,16 @@ object BlockManager extends Logging { } } - def blockIdsToExecutorLocations(blockIds: Array[String], env: SparkEnv): HashMap[String, List[String]] = { - val blockManager = env.blockManager - /*val locations = blockIds.map(id => blockManager.getLocations(id))*/ - val locationBlockIds = blockManager.getLocationBlockIds(blockIds) + def blockIdsToExecutorLocations(blockIds: Array[String], env: SparkEnv, blockManagerMaster: BlockManagerMaster = null): HashMap[String, List[String]] = { + // env == null and blockManagerMaster != null is used in tests + assert (env != null || blockManagerMaster != null) + val locationBlockIds: Seq[Seq[BlockManagerId]] = + if (env != null) { + val blockManager = env.blockManager + blockManager.getLocationBlockIds(blockIds) + } else { + blockManagerMaster.getLocations(blockIds) + } // Convert from block master locations to executor locations (we need that for task scheduling) val executorLocations = new HashMap[String, List[String]]() @@ -950,10 +956,18 @@ object BlockManager extends Logging { val executors = new HashSet[String]() - for (bkLocation <- 
blockLocations) { - val executorHostPort = env.resolveExecutorIdToHostPort(bkLocation.executorId, bkLocation.host) - executors += executorHostPort - // logInfo("bkLocation = " + bkLocation + ", executorHostPort = " + executorHostPort) + if (env != null) { + for (bkLocation <- blockLocations) { + val executorHostPort = env.resolveExecutorIdToHostPort(bkLocation.executorId, bkLocation.host) + executors += executorHostPort + // logInfo("bkLocation = " + bkLocation + ", executorHostPort = " + executorHostPort) + } + } else { + // Typically while testing, etc - revert to simply using host. + for (bkLocation <- blockLocations) { + executors += bkLocation.host + // logInfo("bkLocation = " + bkLocation + ", executorHostPort = " + executorHostPort) + } } executorLocations.put(blockId, executors.toSeq.toList) -- cgit v1.2.3 From 609a817f52d8db05711c0d4529dd1448ed8c4fe0 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Thu, 2 May 2013 06:44:33 +0530 Subject: Integrate review comments on pull request --- core/src/main/scala/spark/SparkEnv.scala | 9 ++-- core/src/main/scala/spark/Utils.scala | 4 +- .../main/scala/spark/scheduler/ResultTask.scala | 1 - .../scala/spark/scheduler/ShuffleMapTask.scala | 1 - .../spark/scheduler/cluster/ClusterScheduler.scala | 32 +++++++------- .../spark/scheduler/cluster/TaskSetManager.scala | 50 +++++++++++----------- 6 files changed, 47 insertions(+), 50 deletions(-) diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index 5b4a464010..2ee25e547d 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -33,8 +33,7 @@ class SparkEnv ( // To be set only as part of initialization of SparkContext. // (executorId, defaultHostPort) => executorHostPort // If executorId is NOT found, return defaultHostPort - var executorIdToHostPort: (String, String) => String - ) { + var executorIdToHostPort: Option[(String, String) => String]) { def stop() { httpFileServer.stop() @@ -52,12 +51,12 @@ class SparkEnv ( def resolveExecutorIdToHostPort(executorId: String, defaultHostPort: String): String = { val env = SparkEnv.get - if (env.executorIdToHostPort == null) { + if (env.executorIdToHostPort.isEmpty) { // default to using host, not host port. Relevant to non cluster modes. return defaultHostPort } - env.executorIdToHostPort(executorId, defaultHostPort) + env.executorIdToHostPort.get(executorId, defaultHostPort) } } @@ -178,6 +177,6 @@ object SparkEnv extends Logging { connectionManager, httpFileServer, sparkFilesDir, - null) + None) } } diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 279daf04ed..0e348f8189 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -335,7 +335,6 @@ private object Utils extends Logging { retval } - /* // Used by DEBUG code : remove when all testing done private val ipPattern = Pattern.compile("^[0-9]+(\\.[0-9]+)*$") def checkHost(host: String, message: String = "") { @@ -364,8 +363,8 @@ private object Utils extends Logging { // temp code for debug System.exit(-1) } - */ +/* // Once testing is complete in various modes, replace with this ? 
def checkHost(host: String, message: String = "") {} def checkHostPort(hostPort: String, message: String = "") {} @@ -374,6 +373,7 @@ private object Utils extends Logging { def logErrorWithStack(msg: String) { try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } } } +*/ def getUserNameFromEnvironment(): String = { SparkHadoopUtil.getUserNameFromEnvironment diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala index c43cbe5ed4..83166bce22 100644 --- a/core/src/main/scala/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/spark/scheduler/ResultTask.scala @@ -70,7 +70,6 @@ private[spark] class ResultTask[T, U]( rdd.partitions(partition) } - // data locality is on a per host basis, not hyper specific to container (host:port). Unique on set of hosts. private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq { diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index 0b848af2f3..4b36e71c32 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -84,7 +84,6 @@ private[spark] class ShuffleMapTask( protected def this() = this(0, null, null, 0, null) - // data locality is on a per host basis, not hyper specific to container (host:port). Unique on set of hosts. private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq { diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala index 3c72ce4206..49fc449e86 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala @@ -73,7 +73,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) val activeExecutorIds = new HashSet[String] // TODO: We might want to remove this and merge it with execId datastructures - but later. - // Which hosts in the cluster are alive (contains hostPort's) - used for hyper local and local task locality. + // Which hosts in the cluster are alive (contains hostPort's) - used for instance local and node local task locality. private val hostPortsAlive = new HashSet[String] private val hostToAliveHostPorts = new HashMap[String, HashSet[String]] @@ -109,7 +109,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) // Unfortunately, this means that SparkEnv is indirectly referencing ClusterScheduler // Will that be a design violation ? - SparkEnv.get.executorIdToHostPort = executorToHostPort + SparkEnv.get.executorIdToHostPort = Some(executorToHostPort) } def newTaskId(): Long = nextTaskId.getAndIncrement() @@ -240,7 +240,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) for (manager <- activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))) { // Split offers based on host local, rack local and off-rack tasks. 
- val hyperLocalOffers = new HashMap[String, ArrayBuffer[Int]]() + val instanceLocalOffers = new HashMap[String, ArrayBuffer[Int]]() val hostLocalOffers = new HashMap[String, ArrayBuffer[Int]]() val rackLocalOffers = new HashMap[String, ArrayBuffer[Int]]() val otherOffers = new HashMap[String, ArrayBuffer[Int]]() @@ -250,16 +250,16 @@ private[spark] class ClusterScheduler(val sc: SparkContext) // DEBUG code Utils.checkHostPort(hostPort) - val numHyperLocalTasks = math.max(0, math.min(manager.numPendingTasksForHostPort(hostPort), availableCpus(i))) - if (numHyperLocalTasks > 0){ - val list = hyperLocalOffers.getOrElseUpdate(hostPort, new ArrayBuffer[Int]) - for (j <- 0 until numHyperLocalTasks) list += i + val numInstanceLocalTasks = math.max(0, math.min(manager.numPendingTasksForHostPort(hostPort), availableCpus(i))) + if (numInstanceLocalTasks > 0){ + val list = instanceLocalOffers.getOrElseUpdate(hostPort, new ArrayBuffer[Int]) + for (j <- 0 until numInstanceLocalTasks) list += i } val host = Utils.parseHostPort(hostPort)._1 val numHostLocalTasks = math.max(0, - // Remove hyper local tasks (which are also host local btw !) from this - math.min(manager.numPendingTasksForHost(hostPort) - numHyperLocalTasks, hostToAvailableCpus(host))) + // Remove instance local tasks (which are also host local btw !) from this + math.min(manager.numPendingTasksForHost(hostPort) - numInstanceLocalTasks, hostToAvailableCpus(host))) if (numHostLocalTasks > 0){ val list = hostLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int]) for (j <- 0 until numHostLocalTasks) list += i @@ -267,7 +267,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) val numRackLocalTasks = math.max(0, // Remove host local tasks (which are also rack local btw !) from this - math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numHyperLocalTasks - numHostLocalTasks, hostToAvailableCpus(host))) + math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numInstanceLocalTasks - numHostLocalTasks, hostToAvailableCpus(host))) if (numRackLocalTasks > 0){ val list = rackLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int]) for (j <- 0 until numRackLocalTasks) list += i @@ -280,19 +280,19 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } val offersPriorityList = new ArrayBuffer[Int]( - hyperLocalOffers.size + hostLocalOffers.size + rackLocalOffers.size + otherOffers.size) + instanceLocalOffers.size + hostLocalOffers.size + rackLocalOffers.size + otherOffers.size) - // First hyper local, then host local, then rack, then others + // First instance local, then host local, then rack, then others - // numHostLocalOffers contains count of both hyper local and host offers. + // numHostLocalOffers contains count of both instance local and host offers. 
val numHostLocalOffers = { - val hyperLocalPriorityList = ClusterScheduler.prioritizeContainers(hyperLocalOffers) - offersPriorityList ++= hyperLocalPriorityList + val instanceLocalPriorityList = ClusterScheduler.prioritizeContainers(instanceLocalOffers) + offersPriorityList ++= instanceLocalPriorityList val hostLocalPriorityList = ClusterScheduler.prioritizeContainers(hostLocalOffers) offersPriorityList ++= hostLocalPriorityList - hyperLocalPriorityList.size + hostLocalPriorityList.size + instanceLocalPriorityList.size + hostLocalPriorityList.size } val numRackLocalOffers = { val rackLocalPriorityList = ClusterScheduler.prioritizeContainers(rackLocalOffers) diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala index f5c0058554..5f3faaa5c3 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala @@ -13,17 +13,17 @@ import spark.scheduler._ import spark.TaskState.TaskState import java.nio.ByteBuffer -private[spark] object TaskLocality extends Enumeration("HYPER_LOCAL", "HOST_LOCAL", "RACK_LOCAL", "ANY") with Logging { +private[spark] object TaskLocality extends Enumeration("INSTANCE_LOCAL", "HOST_LOCAL", "RACK_LOCAL", "ANY") with Logging { - // hyper local is expected to be used ONLY within tasksetmanager for now. - val HYPER_LOCAL, HOST_LOCAL, RACK_LOCAL, ANY = Value + // instance local is expected to be used ONLY within tasksetmanager for now. + val INSTANCE_LOCAL, HOST_LOCAL, RACK_LOCAL, ANY = Value type TaskLocality = Value def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = { // Must not be the constraint. - assert (constraint != TaskLocality.HYPER_LOCAL) + assert (constraint != TaskLocality.INSTANCE_LOCAL) constraint match { case TaskLocality.HOST_LOCAL => condition == TaskLocality.HOST_LOCAL @@ -37,8 +37,8 @@ private[spark] object TaskLocality extends Enumeration("HYPER_LOCAL", "HOST_LOCA // better way to do this ? try { val retval = TaskLocality.withName(str) - // Must not specify HYPER_LOCAL ! - assert (retval != TaskLocality.HYPER_LOCAL) + // Must not specify INSTANCE_LOCAL ! + assert (retval != TaskLocality.INSTANCE_LOCAL) retval } catch { @@ -84,7 +84,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe // Last time when we launched a preferred task (for delay scheduling) var lastPreferredLaunchTime = System.currentTimeMillis - // List of pending tasks for each node (hyper local to container). These collections are actually + // List of pending tasks for each node (instance local to container). These collections are actually // treated as stacks, in which new tasks are added to the end of the // ArrayBuffer and removed from the end. This makes it faster to detect // tasks that repeatedly fail because whenever a task failed, it is put @@ -142,12 +142,12 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe } // Note that it follows the hierarchy. 
- // if we search for HOST_LOCAL, the output will include HYPER_LOCAL and - // if we search for RACK_LOCAL, it will include HYPER_LOCAL & HOST_LOCAL + // if we search for HOST_LOCAL, the output will include INSTANCE_LOCAL and + // if we search for RACK_LOCAL, it will include INSTANCE_LOCAL & HOST_LOCAL private def findPreferredLocations(_taskPreferredLocations: Seq[String], scheduler: ClusterScheduler, taskLocality: TaskLocality.TaskLocality): HashSet[String] = { - if (TaskLocality.HYPER_LOCAL == taskLocality) { + if (TaskLocality.INSTANCE_LOCAL == taskLocality) { // straight forward comparison ! Special case it. val retval = new HashSet[String]() scheduler.synchronized { @@ -203,19 +203,19 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe private def addPendingTask(index: Int) { // We can infer hostLocalLocations from rackLocalLocations by joining it against tasks(index).preferredLocations (with appropriate // hostPort <-> host conversion). But not doing it for simplicity sake. If this becomes a performance issue, modify it. - val hyperLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HYPER_LOCAL) + val instanceLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.INSTANCE_LOCAL) val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HOST_LOCAL) val rackLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL) if (rackLocalLocations.size == 0) { // Current impl ensures this. - assert (hyperLocalLocations.size == 0) + assert (instanceLocalLocations.size == 0) assert (hostLocalLocations.size == 0) pendingTasksWithNoPrefs += index } else { - // hyper local locality - for (hostPort <- hyperLocalLocations) { + // instance local locality + for (hostPort <- instanceLocalLocations) { // DEBUG Code Utils.checkHostPort(hostPort) @@ -223,7 +223,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe hostPortList += index } - // host locality (includes hyper local) + // host locality (includes instance local) for (hostPort <- hostLocalLocations) { // DEBUG Code Utils.checkHostPort(hostPort) @@ -233,7 +233,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe hostList += index } - // rack locality (includes hyper local and host local) + // rack locality (includes instance local and host local) for (rackLocalHostPort <- rackLocalLocations) { // DEBUG Code Utils.checkHostPort(rackLocalHostPort) @@ -247,7 +247,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe allPendingTasks += index } - // Return the pending tasks list for a given host port (hyper local), or an empty list if + // Return the pending tasks list for a given host port (instance local), or an empty list if // there is no map entry for that host private def getPendingTasksForHostPort(hostPort: String): ArrayBuffer[Int] = { // DEBUG Code @@ -269,7 +269,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe pendingRackLocalTasksForHost.getOrElse(host, ArrayBuffer()) } - // Number of pending tasks for a given host Port (which would be hyper local) + // Number of pending tasks for a given host Port (which would be instance local) def numPendingTasksForHostPort(hostPort: String): Int = { getPendingTasksForHostPort(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) ) } @@ -352,9 +352,9 @@ 
private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe // Dequeue a pending task for a given node and return its index. // If localOnly is set to false, allow non-local tasks as well. private def findTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = { - val hyperLocalTask = findTaskFromList(getPendingTasksForHostPort(hostPort)) - if (hyperLocalTask != None) { - return hyperLocalTask + val instanceLocalTask = findTaskFromList(getPendingTasksForHostPort(hostPort)) + if (instanceLocalTask != None) { + return instanceLocalTask } val localTask = findTaskFromList(getPendingTasksForHost(hostPort)) @@ -387,7 +387,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe return findSpeculativeTask(hostPort, locality) } - private def isHyperLocalLocation(task: Task[_], hostPort: String): Boolean = { + private def isInstanceLocalLocation(task: Task[_], hostPort: String): Boolean = { Utils.checkHostPort(hostPort) val locs = task.preferredLocations @@ -443,7 +443,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe val taskId = sched.newTaskId() // Figure out whether this should count as a preferred launch val taskLocality = - if (isHyperLocalLocation(task, hostPort)) TaskLocality.HYPER_LOCAL else + if (isInstanceLocalLocation(task, hostPort)) TaskLocality.INSTANCE_LOCAL else if (isHostLocalLocation(task, hostPort)) TaskLocality.HOST_LOCAL else if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL else TaskLocality.ANY @@ -608,8 +608,8 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe // host local tasks - should we push this to rack local or no pref list ? For now, preserving behavior and moving to // no prefs list. Note, this was done due to impliations related to 'waiting' for data local tasks, etc. - // Note: NOT checking hyper local list - since host local list is super set of that. We need to ad to no prefs only if - // there is no host local node for the task (not if there is no hyper local node for the task) + // Note: NOT checking instance local list - since host local list is super set of that. 
We need to ad to no prefs only if + // there is no host local node for the task (not if there is no instance local node for the task) for (index <- getPendingTasksForHost(Utils.parseHostPort(hostPort)._1)) { // val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL) val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HOST_LOCAL) -- cgit v1.2.3 From 1b5aaeadc72ad5197c00897c41f670ea241d0235 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Thu, 2 May 2013 07:30:06 +0530 Subject: Integrate review comments 2 --- .../spark/scheduler/cluster/ClusterScheduler.scala | 78 +++++++++++----------- .../spark/scheduler/cluster/TaskSetManager.scala | 74 ++++++++++---------- .../spark/scheduler/local/LocalScheduler.scala | 2 +- 3 files changed, 77 insertions(+), 77 deletions(-) diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala index 49fc449e86..cf4483f144 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala @@ -32,28 +32,28 @@ private[spark] class ClusterScheduler(val sc: SparkContext) val TASK_REVIVAL_INTERVAL = System.getProperty("spark.tasks.revive.interval", "0").toLong /* - This property controls how aggressive we should be to modulate waiting for host local task scheduling. - To elaborate, currently there is a time limit (3 sec def) to ensure that spark attempts to wait for host locality of tasks before + This property controls how aggressive we should be to modulate waiting for node local task scheduling. + To elaborate, currently there is a time limit (3 sec def) to ensure that spark attempts to wait for node locality of tasks before scheduling on other nodes. We have modified this in yarn branch such that offers to task set happen in prioritized order : - host-local, rack-local and then others - But once all available host local (and no pref) tasks are scheduled, instead of waiting for 3 sec before + node-local, rack-local and then others + But once all available node local (and no pref) tasks are scheduled, instead of waiting for 3 sec before scheduling to other nodes (which degrades performance for time sensitive tasks and on larger clusters), we can modulate that : to also allow rack local nodes or any node. The default is still set to HOST - so that previous behavior is maintained. This is to allow tuning the tension between pulling rdd data off node and scheduling computation asap. TODO: rename property ? The value is one of - - HOST_LOCAL (default, no change w.r.t current behavior), + - NODE_LOCAL (default, no change w.r.t current behavior), - RACK_LOCAL and - ANY Note that this property makes more sense when used in conjugation with spark.tasks.revive.interval > 0 : else it is not very effective. Additional Note: For non trivial clusters, there is a 4x - 5x reduction in running time (in some of our experiments) based on whether - it is left at default HOST_LOCAL, RACK_LOCAL (if cluster is configured to be rack aware) or ANY. + it is left at default NODE_LOCAL, RACK_LOCAL (if cluster is configured to be rack aware) or ANY. If cluster is rack aware, then setting it to RACK_LOCAL gives best tradeoff and a 3x - 4x performance improvement while minimizing IO impact. Also, it brings down the variance in running time drastically. 
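    For illustration, a minimal sketch of how these two knobs could be used together, assuming both are set as
    JVM system properties on the driver before the ClusterScheduler is constructed (the property names and the
    NODE_LOCAL default are taken from the surrounding code; the values shown are examples only):

        System.setProperty("spark.tasks.revive.interval", "1000")            // example: revive offers every second
        System.setProperty("spark.tasks.schedule.aggression", "RACK_LOCAL")  // example: allow fallback to rack-local
        // the scheduler then resolves the configured level, defaulting to NODE_LOCAL:
        val aggression = TaskLocality.parse(
          System.getProperty("spark.tasks.schedule.aggression", "NODE_LOCAL"))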
*/ - val TASK_SCHEDULING_AGGRESSION = TaskLocality.parse(System.getProperty("spark.tasks.schedule.aggression", "HOST_LOCAL")) + val TASK_SCHEDULING_AGGRESSION = TaskLocality.parse(System.getProperty("spark.tasks.schedule.aggression", "NODE_LOCAL")) val activeTaskSets = new HashMap[String, TaskSetManager] var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager] @@ -73,7 +73,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) val activeExecutorIds = new HashSet[String] // TODO: We might want to remove this and merge it with execId datastructures - but later. - // Which hosts in the cluster are alive (contains hostPort's) - used for instance local and node local task locality. + // Which hosts in the cluster are alive (contains hostPort's) - used for process local and node local task locality. private val hostPortsAlive = new HashSet[String] private val hostToAliveHostPorts = new HashMap[String, HashSet[String]] @@ -217,9 +217,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } // Build a list of tasks to assign to each slave val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores)) - // merge availableCpus into hostToAvailableCpus block ? + // merge availableCpus into nodeToAvailableCpus block ? val availableCpus = offers.map(o => o.cores).toArray - val hostToAvailableCpus = { + val nodeToAvailableCpus = { val map = new HashMap[String, Int]() for (offer <- offers) { val hostPort = offer.hostPort @@ -239,9 +239,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) for (manager <- activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))) { - // Split offers based on host local, rack local and off-rack tasks. - val instanceLocalOffers = new HashMap[String, ArrayBuffer[Int]]() - val hostLocalOffers = new HashMap[String, ArrayBuffer[Int]]() + // Split offers based on node local, rack local and off-rack tasks. + val processLocalOffers = new HashMap[String, ArrayBuffer[Int]]() + val nodeLocalOffers = new HashMap[String, ArrayBuffer[Int]]() val rackLocalOffers = new HashMap[String, ArrayBuffer[Int]]() val otherOffers = new HashMap[String, ArrayBuffer[Int]]() @@ -250,29 +250,29 @@ private[spark] class ClusterScheduler(val sc: SparkContext) // DEBUG code Utils.checkHostPort(hostPort) - val numInstanceLocalTasks = math.max(0, math.min(manager.numPendingTasksForHostPort(hostPort), availableCpus(i))) - if (numInstanceLocalTasks > 0){ - val list = instanceLocalOffers.getOrElseUpdate(hostPort, new ArrayBuffer[Int]) - for (j <- 0 until numInstanceLocalTasks) list += i + val numProcessLocalTasks = math.max(0, math.min(manager.numPendingTasksForHostPort(hostPort), availableCpus(i))) + if (numProcessLocalTasks > 0){ + val list = processLocalOffers.getOrElseUpdate(hostPort, new ArrayBuffer[Int]) + for (j <- 0 until numProcessLocalTasks) list += i } val host = Utils.parseHostPort(hostPort)._1 - val numHostLocalTasks = math.max(0, - // Remove instance local tasks (which are also host local btw !) from this - math.min(manager.numPendingTasksForHost(hostPort) - numInstanceLocalTasks, hostToAvailableCpus(host))) - if (numHostLocalTasks > 0){ - val list = hostLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int]) - for (j <- 0 until numHostLocalTasks) list += i + val numNodeLocalTasks = math.max(0, + // Remove process local tasks (which are also host local btw !) 
from this + math.min(manager.numPendingTasksForHost(hostPort) - numProcessLocalTasks, nodeToAvailableCpus(host))) + if (numNodeLocalTasks > 0){ + val list = nodeLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int]) + for (j <- 0 until numNodeLocalTasks) list += i } val numRackLocalTasks = math.max(0, - // Remove host local tasks (which are also rack local btw !) from this - math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numInstanceLocalTasks - numHostLocalTasks, hostToAvailableCpus(host))) + // Remove node local tasks (which are also rack local btw !) from this + math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numProcessLocalTasks - numNodeLocalTasks, nodeToAvailableCpus(host))) if (numRackLocalTasks > 0){ val list = rackLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int]) for (j <- 0 until numRackLocalTasks) list += i } - if (numHostLocalTasks <= 0 && numRackLocalTasks <= 0){ + if (numNodeLocalTasks <= 0 && numRackLocalTasks <= 0){ // add to others list - spread even this across cluster. val list = otherOffers.getOrElseUpdate(host, new ArrayBuffer[Int]) list += i @@ -280,19 +280,19 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } val offersPriorityList = new ArrayBuffer[Int]( - instanceLocalOffers.size + hostLocalOffers.size + rackLocalOffers.size + otherOffers.size) + processLocalOffers.size + nodeLocalOffers.size + rackLocalOffers.size + otherOffers.size) - // First instance local, then host local, then rack, then others + // First process local, then host local, then rack, then others - // numHostLocalOffers contains count of both instance local and host offers. - val numHostLocalOffers = { - val instanceLocalPriorityList = ClusterScheduler.prioritizeContainers(instanceLocalOffers) - offersPriorityList ++= instanceLocalPriorityList + // numNodeLocalOffers contains count of both process local and host offers. 
+ val numNodeLocalOffers = { + val processLocalPriorityList = ClusterScheduler.prioritizeContainers(processLocalOffers) + offersPriorityList ++= processLocalPriorityList - val hostLocalPriorityList = ClusterScheduler.prioritizeContainers(hostLocalOffers) - offersPriorityList ++= hostLocalPriorityList + val nodeLocalPriorityList = ClusterScheduler.prioritizeContainers(nodeLocalOffers) + offersPriorityList ++= nodeLocalPriorityList - instanceLocalPriorityList.size + hostLocalPriorityList.size + processLocalPriorityList.size + nodeLocalPriorityList.size } val numRackLocalOffers = { val rackLocalPriorityList = ClusterScheduler.prioritizeContainers(rackLocalOffers) @@ -303,8 +303,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext) var lastLoop = false val lastLoopIndex = TASK_SCHEDULING_AGGRESSION match { - case TaskLocality.HOST_LOCAL => numHostLocalOffers - case TaskLocality.RACK_LOCAL => numRackLocalOffers + numHostLocalOffers + case TaskLocality.NODE_LOCAL => numNodeLocalOffers + case TaskLocality.RACK_LOCAL => numRackLocalOffers + numNodeLocalOffers case TaskLocality.ANY => offersPriorityList.size } @@ -343,8 +343,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext) // prevent more looping launchedTask = false } else if (!lastLoop && !launchedTask) { - // Do this only if TASK_SCHEDULING_AGGRESSION != HOST_LOCAL - if (TASK_SCHEDULING_AGGRESSION != TaskLocality.HOST_LOCAL) { + // Do this only if TASK_SCHEDULING_AGGRESSION != NODE_LOCAL + if (TASK_SCHEDULING_AGGRESSION != TaskLocality.NODE_LOCAL) { // fudge launchedTask to ensure we loop once more launchedTask = true // dont loop anymore diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala index 5f3faaa5c3..ff4790e4cb 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala @@ -13,21 +13,21 @@ import spark.scheduler._ import spark.TaskState.TaskState import java.nio.ByteBuffer -private[spark] object TaskLocality extends Enumeration("INSTANCE_LOCAL", "HOST_LOCAL", "RACK_LOCAL", "ANY") with Logging { +private[spark] object TaskLocality extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY") with Logging { - // instance local is expected to be used ONLY within tasksetmanager for now. - val INSTANCE_LOCAL, HOST_LOCAL, RACK_LOCAL, ANY = Value + // process local is expected to be used ONLY within tasksetmanager for now. + val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value type TaskLocality = Value def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = { // Must not be the constraint. - assert (constraint != TaskLocality.INSTANCE_LOCAL) + assert (constraint != TaskLocality.PROCESS_LOCAL) constraint match { - case TaskLocality.HOST_LOCAL => condition == TaskLocality.HOST_LOCAL - case TaskLocality.RACK_LOCAL => condition == TaskLocality.HOST_LOCAL || condition == TaskLocality.RACK_LOCAL + case TaskLocality.NODE_LOCAL => condition == TaskLocality.NODE_LOCAL + case TaskLocality.RACK_LOCAL => condition == TaskLocality.NODE_LOCAL || condition == TaskLocality.RACK_LOCAL // For anything else, allow case _ => true } @@ -37,15 +37,15 @@ private[spark] object TaskLocality extends Enumeration("INSTANCE_LOCAL", "HOST_L // better way to do this ? try { val retval = TaskLocality.withName(str) - // Must not specify INSTANCE_LOCAL ! - assert (retval != TaskLocality.INSTANCE_LOCAL) + // Must not specify PROCESS_LOCAL ! 
+ assert (retval != TaskLocality.PROCESS_LOCAL) retval } catch { case nEx: NoSuchElementException => { - logWarning("Invalid task locality specified '" + str + "', defaulting to HOST_LOCAL"); + logWarning("Invalid task locality specified '" + str + "', defaulting to NODE_LOCAL"); // default to preserve earlier behavior - HOST_LOCAL + NODE_LOCAL } } } @@ -84,7 +84,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe // Last time when we launched a preferred task (for delay scheduling) var lastPreferredLaunchTime = System.currentTimeMillis - // List of pending tasks for each node (instance local to container). These collections are actually + // List of pending tasks for each node (process local to container). These collections are actually // treated as stacks, in which new tasks are added to the end of the // ArrayBuffer and removed from the end. This makes it faster to detect // tasks that repeatedly fail because whenever a task failed, it is put @@ -142,12 +142,12 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe } // Note that it follows the hierarchy. - // if we search for HOST_LOCAL, the output will include INSTANCE_LOCAL and - // if we search for RACK_LOCAL, it will include INSTANCE_LOCAL & HOST_LOCAL + // if we search for NODE_LOCAL, the output will include PROCESS_LOCAL and + // if we search for RACK_LOCAL, it will include PROCESS_LOCAL & NODE_LOCAL private def findPreferredLocations(_taskPreferredLocations: Seq[String], scheduler: ClusterScheduler, taskLocality: TaskLocality.TaskLocality): HashSet[String] = { - if (TaskLocality.INSTANCE_LOCAL == taskLocality) { + if (TaskLocality.PROCESS_LOCAL == taskLocality) { // straight forward comparison ! Special case it. val retval = new HashSet[String]() scheduler.synchronized { @@ -162,7 +162,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe } val taskPreferredLocations = - if (TaskLocality.HOST_LOCAL == taskLocality) { + if (TaskLocality.NODE_LOCAL == taskLocality) { _taskPreferredLocations } else { assert (TaskLocality.RACK_LOCAL == taskLocality) @@ -203,19 +203,19 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe private def addPendingTask(index: Int) { // We can infer hostLocalLocations from rackLocalLocations by joining it against tasks(index).preferredLocations (with appropriate // hostPort <-> host conversion). But not doing it for simplicity sake. If this becomes a performance issue, modify it. - val instanceLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.INSTANCE_LOCAL) - val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HOST_LOCAL) + val processLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.PROCESS_LOCAL) + val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL) val rackLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL) if (rackLocalLocations.size == 0) { // Current impl ensures this. 
- assert (instanceLocalLocations.size == 0) + assert (processLocalLocations.size == 0) assert (hostLocalLocations.size == 0) pendingTasksWithNoPrefs += index } else { - // instance local locality - for (hostPort <- instanceLocalLocations) { + // process local locality + for (hostPort <- processLocalLocations) { // DEBUG Code Utils.checkHostPort(hostPort) @@ -223,7 +223,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe hostPortList += index } - // host locality (includes instance local) + // host locality (includes process local) for (hostPort <- hostLocalLocations) { // DEBUG Code Utils.checkHostPort(hostPort) @@ -233,7 +233,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe hostList += index } - // rack locality (includes instance local and host local) + // rack locality (includes process local and host local) for (rackLocalHostPort <- rackLocalLocations) { // DEBUG Code Utils.checkHostPort(rackLocalHostPort) @@ -247,7 +247,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe allPendingTasks += index } - // Return the pending tasks list for a given host port (instance local), or an empty list if + // Return the pending tasks list for a given host port (process local), or an empty list if // there is no map entry for that host private def getPendingTasksForHostPort(hostPort: String): ArrayBuffer[Int] = { // DEBUG Code @@ -269,7 +269,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe pendingRackLocalTasksForHost.getOrElse(host, ArrayBuffer()) } - // Number of pending tasks for a given host Port (which would be instance local) + // Number of pending tasks for a given host Port (which would be process local) def numPendingTasksForHostPort(hostPort: String): Int = { getPendingTasksForHostPort(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) ) } @@ -305,13 +305,13 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe // task must have a preference for this host/rack/no preferred locations at all. private def findSpeculativeTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = { - assert (TaskLocality.isAllowed(locality, TaskLocality.HOST_LOCAL)) + assert (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set if (speculatableTasks.size > 0) { val localTask = speculatableTasks.find { index => - val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HOST_LOCAL) + val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL) val attemptLocs = taskAttempts(index).map(_.hostPort) (locations.size == 0 || locations.contains(hostPort)) && !attemptLocs.contains(hostPort) } @@ -352,9 +352,9 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe // Dequeue a pending task for a given node and return its index. // If localOnly is set to false, allow non-local tasks as well. 
private def findTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = { - val instanceLocalTask = findTaskFromList(getPendingTasksForHostPort(hostPort)) - if (instanceLocalTask != None) { - return instanceLocalTask + val processLocalTask = findTaskFromList(getPendingTasksForHostPort(hostPort)) + if (processLocalTask != None) { + return processLocalTask } val localTask = findTaskFromList(getPendingTasksForHost(hostPort)) @@ -387,7 +387,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe return findSpeculativeTask(hostPort, locality) } - private def isInstanceLocalLocation(task: Task[_], hostPort: String): Boolean = { + private def isProcessLocalLocation(task: Task[_], hostPort: String): Boolean = { Utils.checkHostPort(hostPort) val locs = task.preferredLocations @@ -433,7 +433,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe val locality = if (overrideLocality != null) overrideLocality else { // expand only if we have waited for more than LOCALITY_WAIT for a host local task ... val time = System.currentTimeMillis - if (time - lastPreferredLaunchTime < LOCALITY_WAIT) TaskLocality.HOST_LOCAL else TaskLocality.ANY + if (time - lastPreferredLaunchTime < LOCALITY_WAIT) TaskLocality.NODE_LOCAL else TaskLocality.ANY } findTask(hostPort, locality) match { @@ -443,8 +443,8 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe val taskId = sched.newTaskId() // Figure out whether this should count as a preferred launch val taskLocality = - if (isInstanceLocalLocation(task, hostPort)) TaskLocality.INSTANCE_LOCAL else - if (isHostLocalLocation(task, hostPort)) TaskLocality.HOST_LOCAL else + if (isProcessLocalLocation(task, hostPort)) TaskLocality.PROCESS_LOCAL else + if (isHostLocalLocation(task, hostPort)) TaskLocality.NODE_LOCAL else if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL else TaskLocality.ANY val prefStr = taskLocality.toString @@ -456,7 +456,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe val info = new TaskInfo(taskId, index, time, execId, hostPort, taskLocality) taskInfos(taskId) = info taskAttempts(index) = info :: taskAttempts(index) - if (TaskLocality.HOST_LOCAL == taskLocality) { + if (TaskLocality.NODE_LOCAL == taskLocality) { lastPreferredLaunchTime = time } // Serialize and return the task @@ -608,11 +608,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe // host local tasks - should we push this to rack local or no pref list ? For now, preserving behavior and moving to // no prefs list. Note, this was done due to impliations related to 'waiting' for data local tasks, etc. - // Note: NOT checking instance local list - since host local list is super set of that. We need to ad to no prefs only if - // there is no host local node for the task (not if there is no instance local node for the task) + // Note: NOT checking process local list - since host local list is super set of that. 
We need to ad to no prefs only if + // there is no host local node for the task (not if there is no process local node for the task) for (index <- getPendingTasksForHost(Utils.parseHostPort(hostPort)._1)) { // val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL) - val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.HOST_LOCAL) + val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL) if (newLocs.isEmpty) { pendingTasksWithNoPrefs += index } diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala index f060a940a9..53dd6fbe13 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala @@ -53,7 +53,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon def runTask(task: Task[_], idInJob: Int, attemptId: Int) { logInfo("Running " + task) - val info = new TaskInfo(attemptId, idInJob, System.currentTimeMillis(), "local", "local:1", TaskLocality.HOST_LOCAL) + val info = new TaskInfo(attemptId, idInJob, System.currentTimeMillis(), "local", "local:1", TaskLocality.NODE_LOCAL) // Set the Spark execution environment for the worker thread SparkEnv.set(env) try { -- cgit v1.2.3 From dfde9ce9dde0a151d42f7aecb826b40a4c98b459 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Thu, 2 May 2013 07:41:33 +0530 Subject: comment out debug versions of checkHost, etc from Utils - which were used to test --- core/src/main/scala/spark/Utils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 0e348f8189..c1495d5317 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -335,6 +335,7 @@ private object Utils extends Logging { retval } +/* // Used by DEBUG code : remove when all testing done private val ipPattern = Pattern.compile("^[0-9]+(\\.[0-9]+)*$") def checkHost(host: String, message: String = "") { @@ -363,8 +364,8 @@ private object Utils extends Logging { // temp code for debug System.exit(-1) } +*/ -/* // Once testing is complete in various modes, replace with this ? 
def checkHost(host: String, message: String = "") {} def checkHostPort(hostPort: String, message: String = "") {} @@ -373,7 +374,6 @@ private object Utils extends Logging { def logErrorWithStack(msg: String) { try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } } } -*/ def getUserNameFromEnvironment(): String = { SparkHadoopUtil.getUserNameFromEnvironment -- cgit v1.2.3 From 11589c39d9f75e9757ba1717c5202f77d30031b2 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Fri, 3 May 2013 12:23:30 +0530 Subject: Fix ZippedRDD as part Matei's suggestion --- core/src/main/scala/spark/rdd/ZippedRDD.scala | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala index e80250a99b..51573fe68a 100644 --- a/core/src/main/scala/spark/rdd/ZippedRDD.scala +++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala @@ -1,6 +1,6 @@ package spark.rdd -import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext} +import spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext} import java.io.{ObjectOutputStream, IOException} @@ -49,9 +49,20 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest]( override def getPreferredLocations(s: Partition): Seq[String] = { val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions - // TODO: becomes complicated - intersect on hostPort if available, else fallback to host (removing intersected hostPort's). - // Since I am not very sure about this RDD, leaving it to others to comment better ! - rdd1.preferredLocations(partition1).intersect(rdd2.preferredLocations(partition2)) + val pref1 = rdd1.preferredLocations(partition1) + val pref2 = rdd2.preferredLocations(partition2) + + // both partitions are instance local. + val instanceLocalLocations = pref1.intersect(pref2) + + // remove locations which are already handled via instanceLocalLocations, and intersect where both partitions are node local. + val nodeLocalPref1 = pref1.filter(loc => ! instanceLocalLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1) + val nodeLocalPref2 = pref2.filter(loc => ! instanceLocalLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1) + val nodeLocalLocations = nodeLocalPref1.intersect(nodeLocalPref2) + + + // Can have mix of instance local (hostPort) and node local (host) locations as preference ! + instanceLocalLocations ++ nodeLocalLocations } override def clearDependencies() { -- cgit v1.2.3 From edb57c8331738403d66c15ed99996e8bfb0488f7 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Sat, 4 May 2013 19:47:45 +0530 Subject: Add support for instance local in getPreferredLocations of ZippedPartitionsBaseRDD. 
Add comments to both ZippedPartitionsBaseRDD and ZippedRDD to better describe the potential problem with the approach --- .../main/scala/spark/rdd/ZippedPartitionsRDD.scala | 28 +++++++++++++++++++--- core/src/main/scala/spark/rdd/ZippedRDD.scala | 20 ++++++++++------ 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala index fc3f29ffcd..dd9f3c2680 100644 --- a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala @@ -1,6 +1,6 @@ package spark.rdd -import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext} +import spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext} import java.io.{ObjectOutputStream, IOException} private[spark] class ZippedPartitionsPartition( @@ -38,9 +38,31 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest]( } override def getPreferredLocations(s: Partition): Seq[String] = { + // Note that as number of rdd's increase and/or number of slaves in cluster increase, the computed preferredLocations below + // become diminishingly small : so we might need to look at alternate strategies to alleviate this. + // If there are no (or very small number of preferred locations), we will end up transferred the blocks to 'any' node in the + // cluster - paying with n/w and cache cost. + // Maybe pick a node which figures max amount of time ? + // Choose node which is hosting 'larger' of some subset of blocks ? + // Look at rack locality to ensure chosen host is atleast rack local to both hosting node ?, etc (would be good to defer this if possible) val splits = s.asInstanceOf[ZippedPartitionsPartition].partitions - val preferredLocations = rdds.zip(splits).map(x => x._1.preferredLocations(x._2)) - preferredLocations.reduce((x, y) => x.intersect(y)) + val rddSplitZip = rdds.zip(splits) + + // exact match. + val exactMatchPreferredLocations = rddSplitZip.map(x => x._1.preferredLocations(x._2)) + val exactMatchLocations = exactMatchPreferredLocations.reduce((x, y) => x.intersect(y)) + + // Remove exact match and then do host local match. + val otherNodePreferredLocations = rddSplitZip.map(x => { + x._1.preferredLocations(x._2).map(hostPort => { + val host = Utils.parseHostPort(hostPort)._1 + + if (exactMatchLocations.contains(host)) null else host + }).filter(_ != null) + }) + val otherNodeLocalLocations = otherNodePreferredLocations.reduce((x, y) => x.intersect(y)) + + otherNodeLocalLocations ++ exactMatchLocations } override def clearDependencies() { diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala index 51573fe68a..f728e93d24 100644 --- a/core/src/main/scala/spark/rdd/ZippedRDD.scala +++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala @@ -48,21 +48,27 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest]( } override def getPreferredLocations(s: Partition): Seq[String] = { + // Note that as number of slaves in cluster increase, the computed preferredLocations can become small : so we might need + // to look at alternate strategies to alleviate this. (If there are no (or very small number of preferred locations), we + // will end up transferred the blocks to 'any' node in the cluster - paying with n/w and cache cost. + // Maybe pick one or the other ? (so that atleast one block is local ?). + // Choose node which is hosting 'larger' of the blocks ? 
+ // Look at rack locality to ensure chosen host is atleast rack local to both hosting node ?, etc (would be good to defer this if possible) val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions val pref1 = rdd1.preferredLocations(partition1) val pref2 = rdd2.preferredLocations(partition2) - // both partitions are instance local. - val instanceLocalLocations = pref1.intersect(pref2) + // exact match - instance local and host local. + val exactMatchLocations = pref1.intersect(pref2) - // remove locations which are already handled via instanceLocalLocations, and intersect where both partitions are node local. - val nodeLocalPref1 = pref1.filter(loc => ! instanceLocalLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1) - val nodeLocalPref2 = pref2.filter(loc => ! instanceLocalLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1) - val nodeLocalLocations = nodeLocalPref1.intersect(nodeLocalPref2) + // remove locations which are already handled via exactMatchLocations, and intersect where both partitions are node local. + val otherNodeLocalPref1 = pref1.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1) + val otherNodeLocalPref2 = pref2.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1) + val otherNodeLocalLocations = otherNodeLocalPref1.intersect(otherNodeLocalPref2) // Can have mix of instance local (hostPort) and node local (host) locations as preference ! - instanceLocalLocations ++ nodeLocalLocations + exactMatchLocations ++ otherNodeLocalLocations } override def clearDependencies() { -- cgit v1.2.3 From 02e8cfa61792f296555c7ed16613a91d895181a1 Mon Sep 17 00:00:00 2001 From: Ethan Jewett Date: Sat, 4 May 2013 12:31:30 -0500 Subject: HBase example --- .../src/main/scala/spark/examples/HBaseTest.scala | 34 ++++++++++++++++++++++ project/SparkBuild.scala | 6 +++- 2 files changed, 39 insertions(+), 1 deletion(-) create mode 100644 examples/src/main/scala/spark/examples/HBaseTest.scala diff --git a/examples/src/main/scala/spark/examples/HBaseTest.scala b/examples/src/main/scala/spark/examples/HBaseTest.scala new file mode 100644 index 0000000000..90ff64b483 --- /dev/null +++ b/examples/src/main/scala/spark/examples/HBaseTest.scala @@ -0,0 +1,34 @@ +package spark.examples + +import spark._ +import spark.rdd.NewHadoopRDD +import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, HColumnDescriptor} +import org.apache.hadoop.hbase.client.HBaseAdmin +import org.apache.hadoop.hbase.mapreduce.TableInputFormat + +object HBaseTest { + def main(args: Array[String]) { + val sc = new SparkContext(args(0), "HBaseTest", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + + val conf = HBaseConfiguration.create() + conf.set(TableInputFormat.INPUT_TABLE, args(1)) + + // Initialize hBase tables if necessary + val admin = new HBaseAdmin(conf) + if(!admin.isTableAvailable(args(1))) { + val colDesc = new HColumnDescriptor(args(2)) + val tableDesc = new HTableDescriptor(args(1)) + tableDesc.addFamily(colDesc) + admin.createTable(tableDesc) + } + + val hBaseRDD = new NewHadoopRDD(sc, classOf[TableInputFormat], + classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], + classOf[org.apache.hadoop.hbase.client.Result], conf) + + hBaseRDD.count() + + System.exit(0) + } +} diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 190d723435..6f5607d31c 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -200,7 +200,11 @@ 
object SparkBuild extends Build { def examplesSettings = sharedSettings ++ Seq( name := "spark-examples", - libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.11") + resolvers ++= Seq("Apache HBase" at "https://repository.apache.org/content/repositories/releases"), + libraryDependencies ++= Seq( + "com.twitter" % "algebird-core_2.9.2" % "0.1.11", + "org.apache.hbase" % "hbase" % "0.94.6" + ) ) def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel") -- cgit v1.2.3 From 9290f16430f92c66d4ec3b1ec76e491ae7cf26dc Mon Sep 17 00:00:00 2001 From: Ethan Jewett Date: Sat, 4 May 2013 12:39:14 -0500 Subject: Remove unnecessary column family config --- examples/src/main/scala/spark/examples/HBaseTest.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/examples/src/main/scala/spark/examples/HBaseTest.scala b/examples/src/main/scala/spark/examples/HBaseTest.scala index 90ff64b483..37aedde302 100644 --- a/examples/src/main/scala/spark/examples/HBaseTest.scala +++ b/examples/src/main/scala/spark/examples/HBaseTest.scala @@ -2,7 +2,7 @@ package spark.examples import spark._ import spark.rdd.NewHadoopRDD -import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, HColumnDescriptor} +import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.client.HBaseAdmin import org.apache.hadoop.hbase.mapreduce.TableInputFormat @@ -14,12 +14,10 @@ object HBaseTest { val conf = HBaseConfiguration.create() conf.set(TableInputFormat.INPUT_TABLE, args(1)) - // Initialize hBase tables if necessary + // Initialize hBase table if necessary val admin = new HBaseAdmin(conf) if(!admin.isTableAvailable(args(1))) { - val colDesc = new HColumnDescriptor(args(2)) val tableDesc = new HTableDescriptor(args(1)) - tableDesc.addFamily(colDesc) admin.createTable(tableDesc) } -- cgit v1.2.3 From 7cff7e789723b646d1692fc71ef99f89a862bdc6 Mon Sep 17 00:00:00 2001 From: Ethan Jewett Date: Sat, 4 May 2013 14:56:55 -0500 Subject: Fix indents and mention other configuration options --- examples/src/main/scala/spark/examples/HBaseTest.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/examples/src/main/scala/spark/examples/HBaseTest.scala b/examples/src/main/scala/spark/examples/HBaseTest.scala index 37aedde302..d94b25828d 100644 --- a/examples/src/main/scala/spark/examples/HBaseTest.scala +++ b/examples/src/main/scala/spark/examples/HBaseTest.scala @@ -12,6 +12,9 @@ object HBaseTest { System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val conf = HBaseConfiguration.create() + + // Other options for configuring scan behavior are available. More information available at + // http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html conf.set(TableInputFormat.INPUT_TABLE, args(1)) // Initialize hBase table if necessary @@ -22,8 +25,8 @@ object HBaseTest { } val hBaseRDD = new NewHadoopRDD(sc, classOf[TableInputFormat], - classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], - classOf[org.apache.hadoop.hbase.client.Result], conf) + classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], + classOf[org.apache.hadoop.hbase.client.Result], conf) hBaseRDD.count() -- cgit v1.2.3 From e014c1d1cb4bd1037dc674ef474d0197267b399b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 5 May 2013 11:30:36 -0700 Subject: Fix SPARK-670: EC2 start command should require -i option. 
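With this fix, start is treated like launch and login, which already required a key, presumably because those
code paths SSH into the instances. A minimal invocation sketch (the key path and cluster name are placeholders,
and the script is assumed to be run directly from the ec2/ directory):

    ./spark_ec2.py -i ~/.ssh/mykey.pem start my-cluster

The long form --identity-file behaves the same way; without either flag the script now prints the error below
and exits for start as well.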
--- ec2/spark_ec2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 9f2daad2b6..7affe6fffc 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -103,7 +103,7 @@ def parse_args(): parser.print_help() sys.exit(1) (action, cluster_name) = args - if opts.identity_file == None and action in ['launch', 'login']: + if opts.identity_file == None and action in ['launch', 'login', 'start']: print >> stderr, ("ERROR: The -i or --identity-file argument is " + "required for " + action) sys.exit(1) -- cgit v1.2.3 From 0fd84965f66aa37d2ae14da799b86a5c8ed1cb32 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 6 May 2013 15:35:18 -0700 Subject: Added EmptyRDD. --- core/src/main/scala/spark/rdd/EmptyRDD.scala | 16 ++++++++++++++++ core/src/test/scala/spark/RDDSuite.scala | 14 +++++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) create mode 100644 core/src/main/scala/spark/rdd/EmptyRDD.scala diff --git a/core/src/main/scala/spark/rdd/EmptyRDD.scala b/core/src/main/scala/spark/rdd/EmptyRDD.scala new file mode 100644 index 0000000000..e4dd3a7fa7 --- /dev/null +++ b/core/src/main/scala/spark/rdd/EmptyRDD.scala @@ -0,0 +1,16 @@ +package spark.rdd + +import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext} + + +/** + * An RDD that is empty, i.e. has no element in it. + */ +class EmptyRDD[T: ClassManifest](sc: SparkContext) extends RDD[T](sc, Nil) { + + override def getPartitions: Array[Partition] = Array.empty + + override def compute(split: Partition, context: TaskContext): Iterator[T] = { + throw new UnsupportedOperationException("empty RDD") + } +} diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index cee6312572..2ce757b13c 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -5,7 +5,7 @@ import org.scalatest.FunSuite import org.scalatest.concurrent.Timeouts._ import org.scalatest.time.{Span, Millis} import spark.SparkContext._ -import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD} +import spark.rdd.{CoalescedRDD, CoGroupedRDD, EmptyRDD, PartitionPruningRDD, ShuffledRDD} class RDDSuite extends FunSuite with LocalSparkContext { @@ -147,6 +147,18 @@ class RDDSuite extends FunSuite with LocalSparkContext { assert(rdd.collect().toList === List(1, 2, 3, 4)) } + test("empty RDD") { + sc = new SparkContext("local", "test") + val empty = new EmptyRDD[Int](sc) + assert(empty.count === 0) + assert(empty.collect().size === 0) + + val thrown = intercept[UnsupportedOperationException]{ + empty.reduce(_+_) + } + assert(thrown.getMessage.contains("empty")) + } + test("cogrouped RDDs") { sc = new SparkContext("local", "test") val rdd1 = sc.makeRDD(Array((1, "one"), (1, "another one"), (2, "two"), (3, "three")), 2) -- cgit v1.2.3 From 64d4d2b036447f42bfcd3bac5687c79a3b0661ca Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 6 May 2013 16:30:46 -0700 Subject: Added tests for joins, cogroups, and unions for EmptyRDD. 
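These assertions pin down how an EmptyRDD interacts with the pair-RDD operations: inner and right outer joins
against it yield nothing, while left outer join, cogroup and union keep the non-empty side. A small usage sketch
of the same behaviour (assumes an existing SparkContext named sc; variable names are illustrative):

    import spark.rdd.EmptyRDD

    val empty = new EmptyRDD[(Int, Int)](sc)
    val pairs = sc.parallelize(1 to 2, 2).map(x => (x, x))
    pairs.join(empty).count()             // 0 - no key can ever match
    pairs.rightOuterJoin(empty).count()   // 0 - the empty right side drives the result
    pairs.leftOuterJoin(empty).collect()  // two rows such as (1,(1,None)) - left rows kept, paired with None
    pairs.cogroup(empty).count()          // 2 - one group per left key, with an empty right-side sequence
    pairs.union(empty).count()            // 2 - union just appends zero partitions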
--- core/src/test/scala/spark/RDDSuite.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 2ce757b13c..a761dd77c5 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -157,6 +157,14 @@ class RDDSuite extends FunSuite with LocalSparkContext { empty.reduce(_+_) } assert(thrown.getMessage.contains("empty")) + + val emptyKv = new EmptyRDD[(Int, Int)](sc) + val rdd = sc.parallelize(1 to 2, 2).map(x => (x, x)) + assert(rdd.join(emptyKv).collect().size === 0) + assert(rdd.rightOuterJoin(emptyKv).collect().size === 0) + assert(rdd.leftOuterJoin(emptyKv).collect().size === 2) + assert(rdd.cogroup(emptyKv).collect().size === 2) + assert(rdd.union(emptyKv).collect().size === 2) } test("cogrouped RDDs") { -- cgit v1.2.3 From 4d8919d33056a006ebf6b1ddb0509aeccaa828d7 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Thu, 18 Apr 2013 14:58:38 -0700 Subject: Update Maven build to Scala 2.9.3 --- core/pom.xml | 4 ++-- pom.xml | 11 +++++------ 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index da26d674ec..9a019b5a42 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -73,7 +73,7 @@ cc.spray - spray-json_${scala.version} + spray-json_2.9.2 org.tomdz.twirl @@ -81,7 +81,7 @@ com.github.scala-incubator.io - scala-io-file_${scala.version} + scala-io-file_2.9.2 org.apache.mesos diff --git a/pom.xml b/pom.xml index c3323ffad0..3936165d78 100644 --- a/pom.xml +++ b/pom.xml @@ -51,7 +51,7 @@ UTF-8 1.5 - 2.9.2 + 2.9.3 0.9.0-incubating 2.0.3 1.0-M2.1 @@ -238,7 +238,7 @@ cc.spray - spray-json_${scala.version} + spray-json_2.9.2 ${spray.json.version} @@ -248,7 +248,7 @@ com.github.scala-incubator.io - scala-io-file_${scala.version} + scala-io-file_2.9.2 0.4.1 @@ -277,7 +277,7 @@ org.scalatest scalatest_${scala.version} - 1.8 + 1.9.1 test @@ -289,7 +289,7 @@ org.scalacheck scalacheck_${scala.version} - 1.9 + 1.10.0 test @@ -513,7 +513,6 @@ hadoop1 - 1 -- cgit v1.2.3 From a3d5f922109caa878f8350fe0634514b8af55cbc Mon Sep 17 00:00:00 2001 From: Ethan Jewett Date: Tue, 7 May 2013 11:43:06 -0500 Subject: Switch to using SparkContext method to create RDD --- examples/src/main/scala/spark/examples/HBaseTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/src/main/scala/spark/examples/HBaseTest.scala b/examples/src/main/scala/spark/examples/HBaseTest.scala index d94b25828d..9bad876860 100644 --- a/examples/src/main/scala/spark/examples/HBaseTest.scala +++ b/examples/src/main/scala/spark/examples/HBaseTest.scala @@ -24,9 +24,9 @@ object HBaseTest { admin.createTable(tableDesc) } - val hBaseRDD = new NewHadoopRDD(sc, classOf[TableInputFormat], + val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], - classOf[org.apache.hadoop.hbase.client.Result], conf) + classOf[org.apache.hadoop.hbase.client.Result]) hBaseRDD.count() -- cgit v1.2.3 From b05c9d22d70333924b988b2dfa359ce3e11f7c9d Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Thu, 9 May 2013 18:49:12 +0530 Subject: Remove explicit hardcoding of yarn-standalone as args(0) if it is missing. 
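With the hardcoding gone, the ApplicationMaster copies the user arguments through untouched, so an application
whose main class expects the master string as its first argument now has to be given "yarn-standalone"
explicitly via --args. A sketch of a client invocation under that assumption (the SPARK_JAR path, class name and
worker count are placeholders; options not shown in this patch, such as the flag for the application jar, are
left out; the basic command form follows docs/running-on-yarn.md):

    SPARK_JAR=<path-to-spark-yarn-jar> ./run spark.deploy.yarn.Client \
      --class my.app.Main \
      --args yarn-standalone \
      --args <further-application-arguments> \
      --num-workers 2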
--- .../scala/spark/deploy/yarn/ApplicationMaster.scala | 19 +++---------------- .../deploy/yarn/ApplicationMasterArguments.scala | 1 - .../scala/spark/deploy/yarn/ClientArguments.scala | 1 - 3 files changed, 3 insertions(+), 18 deletions(-) diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala index ae719267e8..aa72c1e5fe 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala @@ -148,22 +148,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e .getMethod("main", classOf[Array[String]]) val t = new Thread { override def run() { - var mainArgs: Array[String] = null - var startIndex = 0 - - // I am sure there is a better 'scala' way to do this .... but I am just trying to get things to work right now ! - if (args.userArgs.isEmpty || args.userArgs.get(0) != "yarn-standalone") { - // ensure that first param is ALWAYS "yarn-standalone" - mainArgs = new Array[String](args.userArgs.size() + 1) - mainArgs.update(0, "yarn-standalone") - startIndex = 1 - } - else { - mainArgs = new Array[String](args.userArgs.size()) - } - - args.userArgs.copyToArray(mainArgs, startIndex, args.userArgs.size()) - + // Copy + var mainArgs: Array[String] = new Array[String](args.userArgs.size()) + args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size()) mainMethod.invoke(null, mainArgs) } } diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala index dc89125d81..1b00208511 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -69,7 +69,6 @@ class ApplicationMasterArguments(val args: Array[String]) { " --class CLASS_NAME Name of your application's main class (required)\n" + " --args ARGS Arguments to be passed to your application's main class.\n" + " Mutliple invocations are possible, each will be passed in order.\n" + - " Note that first argument will ALWAYS be yarn-standalone : will be added if missing.\n" + " --num-workers NUM Number of workers to start (Default: 2)\n" + " --worker-cores NUM Number of cores for the workers (Default: 1)\n" + " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n") diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala index 2e69fe3fb0..24110558e7 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala @@ -92,7 +92,6 @@ class ClientArguments(val args: Array[String]) { " --class CLASS_NAME Name of your application's main class (required)\n" + " --args ARGS Arguments to be passed to your application's main class.\n" + " Mutliple invocations are possible, each will be passed in order.\n" + - " Note that first argument will ALWAYS be yarn-standalone : will be added if missing.\n" + " --num-workers NUM Number of workers to start (Default: 2)\n" + " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" + " --master-memory MEM Memory for Master (e.g. 
1000M, 2G) (Default: 512 Mb)\n" + -- cgit v1.2.3 From 012c9e5ab072239e07202abe4775b434be6e32b9 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 9 May 2013 14:20:01 -0700 Subject: Revert "Merge pull request #596 from esjewett/master" because the dependency on hbase introduces netty-3.2.2 which conflicts with netty-3.5.3 already in Spark. This caused multiple test failures. This reverts commit 0f1b7a06e1f6782711170234f105f1b277e3b04c, reversing changes made to aacca1b8a85bd073ce185a06d6470b070761b2f4. --- .../src/main/scala/spark/examples/HBaseTest.scala | 35 ---------------------- project/SparkBuild.scala | 6 +--- 2 files changed, 1 insertion(+), 40 deletions(-) delete mode 100644 examples/src/main/scala/spark/examples/HBaseTest.scala diff --git a/examples/src/main/scala/spark/examples/HBaseTest.scala b/examples/src/main/scala/spark/examples/HBaseTest.scala deleted file mode 100644 index 9bad876860..0000000000 --- a/examples/src/main/scala/spark/examples/HBaseTest.scala +++ /dev/null @@ -1,35 +0,0 @@ -package spark.examples - -import spark._ -import spark.rdd.NewHadoopRDD -import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} -import org.apache.hadoop.hbase.client.HBaseAdmin -import org.apache.hadoop.hbase.mapreduce.TableInputFormat - -object HBaseTest { - def main(args: Array[String]) { - val sc = new SparkContext(args(0), "HBaseTest", - System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) - - val conf = HBaseConfiguration.create() - - // Other options for configuring scan behavior are available. More information available at - // http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html - conf.set(TableInputFormat.INPUT_TABLE, args(1)) - - // Initialize hBase table if necessary - val admin = new HBaseAdmin(conf) - if(!admin.isTableAvailable(args(1))) { - val tableDesc = new HTableDescriptor(args(1)) - admin.createTable(tableDesc) - } - - val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], - classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], - classOf[org.apache.hadoop.hbase.client.Result]) - - hBaseRDD.count() - - System.exit(0) - } -} diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 6f5607d31c..190d723435 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -200,11 +200,7 @@ object SparkBuild extends Build { def examplesSettings = sharedSettings ++ Seq( name := "spark-examples", - resolvers ++= Seq("Apache HBase" at "https://repository.apache.org/content/repositories/releases"), - libraryDependencies ++= Seq( - "com.twitter" % "algebird-core_2.9.2" % "0.1.11", - "org.apache.hbase" % "hbase" % "0.94.6" - ) + libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.11") ) def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel") -- cgit v1.2.3 From 6e6b3e0d7eadab97d45e975452c7e0c18246686e Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Fri, 10 May 2013 13:02:34 -0700 Subject: Actually use the cleaned closure in foreachPartition --- core/src/main/scala/spark/RDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index fd14ef17f1..dde131696f 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -489,7 +489,7 @@ abstract class RDD[T: ClassManifest]( */ def foreachPartition(f: Iterator[T] => Unit) { val cleanF = sc.clean(f) - sc.runJob(this, (iter: Iterator[T]) => f(iter)) + sc.runJob(this, (iter: 
Iterator[T]) => cleanF(iter)) } /** -- cgit v1.2.3 From ee37612bc95e8486fa328908005293585912db71 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Sat, 11 May 2013 11:12:22 +0530 Subject: 1) Add support for HADOOP_CONF_DIR (and/or YARN_CONF_DIR - use either) : which is used to specify the client side configuration directory : which needs to be part of the CLASSPATH. 2) Move from var+=".." to var="$var.." : the former does not work on older bash shells unfortunately. --- docs/running-on-yarn.md | 3 +++ run | 65 +++++++++++++++++++++++++++++-------------------- run2.cmd | 13 ++++++++++ 3 files changed, 54 insertions(+), 27 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 26424bbe52..c8cf8ffc35 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -30,6 +30,9 @@ If you want to test out the YARN deployment mode, you can use the current Spark # Launching Spark on YARN +Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the hadoop cluster. +This would be used to connect to the cluster, write to the dfs and submit jobs to the resource manager. + The command to launch the YARN Client is as follows: SPARK_JAR= ./run spark.deploy.yarn.Client \ diff --git a/run b/run index 0a58ac4a36..c744bbd3dc 100755 --- a/run +++ b/run @@ -22,7 +22,7 @@ fi # values for that; it doesn't need a lot if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m} - SPARK_DAEMON_JAVA_OPTS+=" -Dspark.akka.logLifecycleEvents=true" + SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true" SPARK_JAVA_OPTS=$SPARK_DAEMON_JAVA_OPTS # Empty by default fi @@ -30,19 +30,19 @@ fi # Add java opts for master, worker, executor. 
The opts maybe null case "$1" in 'spark.deploy.master.Master') - SPARK_JAVA_OPTS+=" $SPARK_MASTER_OPTS" + SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_MASTER_OPTS" ;; 'spark.deploy.worker.Worker') - SPARK_JAVA_OPTS+=" $SPARK_WORKER_OPTS" + SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_WORKER_OPTS" ;; 'spark.executor.StandaloneExecutorBackend') - SPARK_JAVA_OPTS+=" $SPARK_EXECUTOR_OPTS" + SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS" ;; 'spark.executor.MesosExecutorBackend') - SPARK_JAVA_OPTS+=" $SPARK_EXECUTOR_OPTS" + SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS" ;; 'spark.repl.Main') - SPARK_JAVA_OPTS+=" $SPARK_REPL_OPTS" + SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_REPL_OPTS" ;; esac @@ -85,11 +85,11 @@ export SPARK_MEM # Set JAVA_OPTS to be able to load native libraries and to set heap size JAVA_OPTS="$SPARK_JAVA_OPTS" -JAVA_OPTS+=" -Djava.library.path=$SPARK_LIBRARY_PATH" -JAVA_OPTS+=" -Xms$SPARK_MEM -Xmx$SPARK_MEM" +JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH" +JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM" # Load extra JAVA_OPTS from conf/java-opts, if it exists if [ -e $FWDIR/conf/java-opts ] ; then - JAVA_OPTS+=" `cat $FWDIR/conf/java-opts`" + JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`" fi export JAVA_OPTS @@ -110,30 +110,30 @@ fi # Build up classpath CLASSPATH="$SPARK_CLASSPATH" -CLASSPATH+=":$FWDIR/conf" -CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/classes" +CLASSPATH="$CLASSPATH:$FWDIR/conf" +CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/classes" if [ -n "$SPARK_TESTING" ] ; then - CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/test-classes" - CLASSPATH+=":$STREAMING_DIR/target/scala-$SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/test-classes" fi -CLASSPATH+=":$CORE_DIR/src/main/resources" -CLASSPATH+=":$REPL_DIR/target/scala-$SCALA_VERSION/classes" -CLASSPATH+=":$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes" -CLASSPATH+=":$STREAMING_DIR/target/scala-$SCALA_VERSION/classes" -CLASSPATH+=":$STREAMING_DIR/lib/org/apache/kafka/kafka/0.7.2-spark/*" # <-- our in-project Kafka Jar +CLASSPATH="$CLASSPATH:$CORE_DIR/src/main/resources" +CLASSPATH="$CLASSPATH:$REPL_DIR/target/scala-$SCALA_VERSION/classes" +CLASSPATH="$CLASSPATH:$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes" +CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/classes" +CLASSPATH="$CLASSPATH:$STREAMING_DIR/lib/org/apache/kafka/kafka/0.7.2-spark/*" # <-- our in-project Kafka Jar if [ -e "$FWDIR/lib_managed" ]; then - CLASSPATH+=":$FWDIR/lib_managed/jars/*" - CLASSPATH+=":$FWDIR/lib_managed/bundles/*" + CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/jars/*" + CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/bundles/*" fi -CLASSPATH+=":$REPL_DIR/lib/*" +CLASSPATH="$CLASSPATH:$REPL_DIR/lib/*" if [ -e $REPL_BIN_DIR/target ]; then for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do - CLASSPATH+=":$jar" + CLASSPATH="$CLASSPATH:$jar" done fi -CLASSPATH+=":$BAGEL_DIR/target/scala-$SCALA_VERSION/classes" +CLASSPATH="$CLASSPATH:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes" for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do - CLASSPATH+=":$jar" + CLASSPATH="$CLASSPATH:$jar" done # Figure out the JAR file that our examples were packaged into. 
This includes a bit of a hack @@ -147,6 +147,17 @@ if [ -e "$EXAMPLES_DIR/target/spark-examples-"*hadoop[12].jar ]; then export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples-"*hadoop[12].jar` fi +# Add hadoop conf dir - else FileSystem.*, etc fail ! +# Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts +# the configuration files. +if [ "x" != "x$HADOOP_CONF_DIR" ]; then + CLASSPATH="$CLASSPATH:$HADOOP_CONF_DIR" +fi +if [ "x" != "x$YARN_CONF_DIR" ]; then + CLASSPATH="$CLASSPATH:$YARN_CONF_DIR" +fi + + # Figure out whether to run our class with java or with the scala launcher. # In most cases, we'd prefer to execute our process with java because scala # creates a shell script as the parent of its Java process, which makes it @@ -156,9 +167,9 @@ fi if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then EXTRA_ARGS="" # Java options will be passed to scala as JAVA_OPTS else - CLASSPATH+=":$SCALA_LIBRARY_PATH/scala-library.jar" - CLASSPATH+=":$SCALA_LIBRARY_PATH/scala-compiler.jar" - CLASSPATH+=":$SCALA_LIBRARY_PATH/jline.jar" + CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-library.jar" + CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-compiler.jar" + CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/jline.jar" # The JVM doesn't read JAVA_OPTS by default so we need to pass it in EXTRA_ARGS="$JAVA_OPTS" fi diff --git a/run2.cmd b/run2.cmd index d2d4807971..c6f43dde5b 100644 --- a/run2.cmd +++ b/run2.cmd @@ -63,6 +63,19 @@ set CLASSPATH=%CLASSPATH%;%FWDIR%repl\lib\* set CLASSPATH=%CLASSPATH%;%FWDIR%python\lib\* set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes +rem Add hadoop conf dir - else FileSystem.*, etc fail +rem Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts +rem the configuration files. +if "x%HADOOP_CONF_DIR%"=="x" goto no_hadoop_conf_dir + set CLASSPATH=%CLASSPATH%;%HADOOP_CONF_DIR% +:no_hadoop_conf_dir + +if "x%YARN_CONF_DIR%"=="x" goto no_yarn_conf_dir + set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR% +:no_yarn_conf_dir + + + rem Figure out the JAR file that our examples were packaged into.
rem First search in the build path from SBT: for %%d in ("examples/target/scala-%SCALA_VERSION%/spark-examples*.jar") do ( -- cgit v1.2.3 From 0345954530a445b275595962c9f949cad55a01f6 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sat, 11 May 2013 14:17:09 -0700 Subject: SPARK-738: Spark should detect and squash nonserializable exceptions --- core/src/main/scala/spark/executor/Executor.scala | 16 ++++++++++++++-- core/src/test/scala/spark/DistributedSuite.scala | 21 +++++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 344face5e6..f9061b1c71 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -1,6 +1,6 @@ package spark.executor -import java.io.{File, FileOutputStream} +import java.io.{NotSerializableException, File, FileOutputStream} import java.net.{URI, URL, URLClassLoader} import java.util.concurrent._ @@ -123,7 +123,19 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert case t: Throwable => { val reason = ExceptionFailure(t) - context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) + val serReason = + try { + ser.serialize(reason) + } + catch { + case e: NotSerializableException => { + val message = "Spark caught unserializable exn: " + t.toString + val throwable = new Exception(message) + throwable.setStackTrace(t.getStackTrace) + ser.serialize(new ExceptionFailure(throwable)) + } + } + context.statusUpdate(taskId, TaskState.FAILED, serReason) // TODO: Should we exit the whole executor here? On the one hand, the failed task may // have left some weird state around depending on when the exception was thrown, but on diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala index 4df3bb5b67..8ab0f2cfa2 100644 --- a/core/src/test/scala/spark/DistributedSuite.scala +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -18,6 +18,9 @@ import scala.collection.mutable.ArrayBuffer import SparkContext._ import storage.{GetBlock, BlockManagerWorker, StorageLevel} +class NotSerializableClass +class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {} + class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext { val clusterUrl = "local-cluster[2,1,512]" @@ -27,6 +30,24 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter System.clearProperty("spark.storage.memoryFraction") } + test("task throws not serializable exception") { + // Ensures that executors do not crash when an exn is not serializable. If executors crash, + // this test will hang. Correct behavior is that executors don't crash but fail tasks + // and the scheduler throws a SparkException. + + // numSlaves must be less than numPartitions + val numSlaves = 3 + val numPartitions = 10 + + sc = new SparkContext("local-cluster[%s,1,512]".format(numSlaves), "test") + val data = sc.parallelize(1 to 100, numPartitions).map(x => (x, x)). 
+ map(x => throw new NotSerializableExn(new NotSerializableClass)) + intercept[SparkException] { + data.count() + } + resetSparkContext() + } + test("local-cluster format") { sc = new SparkContext("local-cluster[2,1,512]", "test") assert(sc.parallelize(1 to 2, 2).count() == 2) -- cgit v1.2.3 From a5c28bb888f74d27893c198865f588ca0334a8a6 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sat, 11 May 2013 14:20:39 -0700 Subject: Removing unnecessary map --- core/src/test/scala/spark/DistributedSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala index 8ab0f2cfa2..33c99471c6 100644 --- a/core/src/test/scala/spark/DistributedSuite.scala +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -40,7 +40,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter val numPartitions = 10 sc = new SparkContext("local-cluster[%s,1,512]".format(numSlaves), "test") - val data = sc.parallelize(1 to 100, numPartitions).map(x => (x, x)). + val data = sc.parallelize(1 to 100, numPartitions). map(x => throw new NotSerializableExn(new NotSerializableClass)) intercept[SparkException] { data.count() -- cgit v1.2.3 From 3da2305ed0d4add7127953e5240632f86053b4aa Mon Sep 17 00:00:00 2001 From: Cody Koeninger Date: Sat, 11 May 2013 23:59:07 -0500 Subject: code cleanup per rxin comments --- core/src/main/scala/spark/rdd/JdbcRDD.scala | 67 ++++++++++++++++------------- 1 file changed, 37 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/spark/rdd/JdbcRDD.scala b/core/src/main/scala/spark/rdd/JdbcRDD.scala index 4c3054465c..b0f7054233 100644 --- a/core/src/main/scala/spark/rdd/JdbcRDD.scala +++ b/core/src/main/scala/spark/rdd/JdbcRDD.scala @@ -5,23 +5,27 @@ import java.sql.{Connection, ResultSet} import spark.{Logging, Partition, RDD, SparkContext, TaskContext} import spark.util.NextIterator +private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition { + override def index = idx +} + /** - An RDD that executes an SQL query on a JDBC connection and reads results. - @param getConnection a function that returns an open Connection. - The RDD takes care of closing the connection. - @param sql the text of the query. - The query must contain two ? placeholders for parameters used to partition the results. - E.g. "select title, author from books where ? <= id and id <= ?" - @param lowerBound the minimum value of the first placeholder - @param upperBound the maximum value of the second placeholder - The lower and upper bounds are inclusive. - @param numPartitions the number of partitions. - Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2, - the query would be executed twice, once with (1, 10) and once with (11, 20) - @param mapRow a function from a ResultSet to a single row of the desired result type(s). - This should only call getInt, getString, etc; the RDD takes care of calling next. - The default maps a ResultSet to an array of Object. -*/ + * An RDD that executes an SQL query on a JDBC connection and reads results. + * @param getConnection a function that returns an open Connection. + * The RDD takes care of closing the connection. + * @param sql the text of the query. + * The query must contain two ? placeholders for parameters used to partition the results. + * E.g. "select title, author from books where ? <= id and id <= ?" 
+ * @param lowerBound the minimum value of the first placeholder + * @param upperBound the maximum value of the second placeholder + * The lower and upper bounds are inclusive. + * @param numPartitions the number of partitions. + * Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2, + * the query would be executed twice, once with (1, 10) and once with (11, 20) + * @param mapRow a function from a ResultSet to a single row of the desired result type(s). + * This should only call getInt, getString, etc; the RDD takes care of calling next. + * The default maps a ResultSet to an array of Object. + */ class JdbcRDD[T: ClassManifest]( sc: SparkContext, getConnection: () => Connection, @@ -29,26 +33,33 @@ class JdbcRDD[T: ClassManifest]( lowerBound: Long, upperBound: Long, numPartitions: Int, - mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray) + mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _) extends RDD[T](sc, Nil) with Logging { - override def getPartitions: Array[Partition] = - ParallelCollectionRDD.slice(lowerBound to upperBound, numPartitions). - filter(! _.isEmpty). - zipWithIndex. - map(x => new JdbcPartition(x._2, x._1.head, x._1.last)). - toArray + override def getPartitions: Array[Partition] = { + // bounds are inclusive, hence the + 1 here and - 1 on end + val length = 1 + upperBound - lowerBound + (0 until numPartitions).map(i => { + val start = lowerBound + ((i * length) / numPartitions).toLong + val end = lowerBound + (((i + 1) * length) / numPartitions).toLong - 1 + new JdbcPartition(i, start, end) + }).toArray + } override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] { context.addOnCompleteCallback{ () => closeIfNeeded() } val part = thePart.asInstanceOf[JdbcPartition] val conn = getConnection() val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) - // force mysql driver to stream rather than pull entire resultset into memory + + // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results, + // rather than pulling entire resultset into memory. 
+ // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) { stmt.setFetchSize(Integer.MIN_VALUE) logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ") } + stmt.setLong(1, part.lower) stmt.setLong(2, part.upper) val rs = stmt.executeQuery() @@ -81,14 +92,10 @@ class JdbcRDD[T: ClassManifest]( } } } - -} - -private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition { - override def index = idx } object JdbcRDD { - val resultSetToObjectArray = (rs: ResultSet) => + def resultSetToObjectArray(rs: ResultSet) = { Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1)) + } } -- cgit v1.2.3 From 059ab8875463ab22fe329fb6a627cac0a7d8158c Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sat, 11 May 2013 23:39:14 -0700 Subject: Changing technique to use same code path in all cases --- core/src/main/scala/spark/TaskEndReason.scala | 13 ++++++++++--- core/src/main/scala/spark/executor/Executor.scala | 16 ++-------------- .../scala/spark/scheduler/cluster/TaskSetManager.scala | 8 ++++---- .../scala/spark/scheduler/local/LocalScheduler.scala | 3 ++- 4 files changed, 18 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala index 420c54bc9a..ce9bb49897 100644 --- a/core/src/main/scala/spark/TaskEndReason.scala +++ b/core/src/main/scala/spark/TaskEndReason.scala @@ -14,9 +14,16 @@ private[spark] case object Success extends TaskEndReason private[spark] case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it -private[spark] -case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason +private[spark] case class FetchFailed( + bmAddress: BlockManagerId, + shuffleId: Int, + mapId: Int, + reduceId: Int) + extends TaskEndReason -private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason +private[spark] case class ExceptionFailure( + description: String, + stackTrace: Array[StackTraceElement]) + extends TaskEndReason private[spark] case class OtherFailure(message: String) extends TaskEndReason diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index f9061b1c71..9084def9b2 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -122,20 +122,8 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert } case t: Throwable => { - val reason = ExceptionFailure(t) - val serReason = - try { - ser.serialize(reason) - } - catch { - case e: NotSerializableException => { - val message = "Spark caught unserializable exn: " + t.toString - val throwable = new Exception(message) - throwable.setStackTrace(t.getStackTrace) - ser.serialize(new ExceptionFailure(throwable)) - } - } - context.statusUpdate(taskId, TaskState.FAILED, serReason) + val reason = ExceptionFailure(t.toString, t.getStackTrace) + context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) // TODO: Should we exit the whole executor here? 
On the one hand, the failed task may // have left some weird state around depending on when the exception was thrown, but on diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala index 27e713e2c4..6d663de2f8 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala @@ -493,7 +493,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe return case ef: ExceptionFailure => - val key = ef.exception.toString + val key = ef.description val now = System.currentTimeMillis val (printFull, dupCount) = { if (recentExceptions.contains(key)) { @@ -511,10 +511,10 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe } } if (printFull) { - val locs = ef.exception.getStackTrace.map(loc => "\tat %s".format(loc.toString)) - logInfo("Loss was due to %s\n%s".format(ef.exception.toString, locs.mkString("\n"))) + val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString)) + logInfo("Loss was due to %s\n%s".format(ef.description, locs.mkString("\n"))) } else { - logInfo("Loss was due to %s [duplicate %d]".format(ef.exception.toString, dupCount)) + logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount)) } case _ => {} diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala index f060a940a9..42d5bc4813 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala @@ -102,7 +102,8 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon } else { // TODO: Do something nicer here to return all the way to the user if (!Thread.currentThread().isInterrupted) - listener.taskEnded(task, new ExceptionFailure(t), null, null, info, null) + listener.taskEnded( + task, new ExceptionFailure(t.getMessage, t.getStackTrace), null, null, info, null) } } } -- cgit v1.2.3 From 1c15b8505124c157449b6d41e1127f3eb4081a23 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sat, 11 May 2013 23:52:53 -0700 Subject: Removing import --- core/src/main/scala/spark/executor/Executor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 9084def9b2..1d5516966d 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -1,6 +1,6 @@ package spark.executor -import java.io.{NotSerializableException, File, FileOutputStream} +import java.io.{File, FileOutputStream} import java.net.{URI, URL, URLClassLoader} import java.util.concurrent._ -- cgit v1.2.3 From 72b9c4cb6ec4080eb8751e5e040f180272ac82a6 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sat, 11 May 2013 23:53:50 -0700 Subject: Small fix --- core/src/main/scala/spark/scheduler/local/LocalScheduler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala index 42d5bc4813..a357422466 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala @@ -103,7 +103,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: 
Int, sc: SparkCon // TODO: Do something nicer here to return all the way to the user if (!Thread.currentThread().isInterrupted) listener.taskEnded( - task, new ExceptionFailure(t.getMessage, t.getStackTrace), null, null, info, null) + task, new ExceptionFailure(t.toString, t.getStackTrace), null, null, info, null) } } } -- cgit v1.2.3 From 7f0833647b784c4ec7cd2f2e8e4fcd5ed6f673cd Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sun, 12 May 2013 07:54:03 -0700 Subject: Capturing class name --- core/src/main/scala/spark/TaskEndReason.scala | 1 + core/src/main/scala/spark/executor/Executor.scala | 2 +- core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala | 3 ++- core/src/main/scala/spark/scheduler/local/LocalScheduler.scala | 7 ++++--- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala index ce9bb49897..ca793eb402 100644 --- a/core/src/main/scala/spark/TaskEndReason.scala +++ b/core/src/main/scala/spark/TaskEndReason.scala @@ -22,6 +22,7 @@ private[spark] case class FetchFailed( extends TaskEndReason private[spark] case class ExceptionFailure( + className: String, description: String, stackTrace: Array[StackTraceElement]) extends TaskEndReason diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 1d5516966d..da20b84544 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -122,7 +122,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert } case t: Throwable => { - val reason = ExceptionFailure(t.toString, t.getStackTrace) + val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace) context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) // TODO: Should we exit the whole executor here? 
On the one hand, the failed task may diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala index 6d663de2f8..06de3c755e 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala @@ -512,7 +512,8 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe } if (printFull) { val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString)) - logInfo("Loss was due to %s\n%s".format(ef.description, locs.mkString("\n"))) + logInfo("Loss was due to %s\n%s\n%s".format( + ef.className, ef.description, locs.mkString("\n"))) } else { logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount)) } diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala index a357422466..ebe42685ad 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala @@ -101,9 +101,10 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon submitTask(task, idInJob) } else { // TODO: Do something nicer here to return all the way to the user - if (!Thread.currentThread().isInterrupted) - listener.taskEnded( - task, new ExceptionFailure(t.toString, t.getStackTrace), null, null, info, null) + if (!Thread.currentThread().isInterrupted) { + val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace) + listener.taskEnded(task, failure, null, null, info, null) + } } } } -- cgit v1.2.3 From b16c4896f617f352bb230908b7c08c7c5b028434 Mon Sep 17 00:00:00 2001 From: Cody Koeninger Date: Tue, 14 May 2013 23:44:04 -0500 Subject: add test for JdbcRDD using embedded derby, per rxin suggestion --- .gitignore | 1 + core/src/test/scala/spark/rdd/JdbcRDDSuite.scala | 56 ++++++++++++++++++++++++ project/SparkBuild.scala | 1 + 3 files changed, 58 insertions(+) create mode 100644 core/src/test/scala/spark/rdd/JdbcRDDSuite.scala diff --git a/.gitignore b/.gitignore index 155e785b01..b87fc1ee79 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,4 @@ streaming-tests.log dependency-reduced-pom.xml .ensime .ensime_lucene +derby.log diff --git a/core/src/test/scala/spark/rdd/JdbcRDDSuite.scala b/core/src/test/scala/spark/rdd/JdbcRDDSuite.scala new file mode 100644 index 0000000000..6afb0fa9bc --- /dev/null +++ b/core/src/test/scala/spark/rdd/JdbcRDDSuite.scala @@ -0,0 +1,56 @@ +package spark + +import org.scalatest.{ BeforeAndAfter, FunSuite } +import spark.SparkContext._ +import spark.rdd.JdbcRDD +import java.sql._ + +class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext { + + before { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver") + val conn = DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb;create=true") + try { + val create = conn.createStatement + create.execute(""" + CREATE TABLE FOO( + ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), + DATA INTEGER + )""") + create.close + val insert = conn.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)") + (1 to 100).foreach { i => + insert.setInt(1, i * 2) + insert.executeUpdate + } + insert.close + } catch { + case e: SQLException if e.getSQLState == "X0Y32" => + // table exists + } finally { + conn.close + } + } + + test("basic functionality") { + sc = new SparkContext("local", "test") + val 
rdd = new JdbcRDD( + sc, + () => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") }, + "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?", + 1, 100, 3, + (r: ResultSet) => { r.getInt(1) } ).cache + + assert(rdd.count === 100) + assert(rdd.reduce(_+_) === 10100) + } + + after { + try { + DriverManager.getConnection("jdbc:derby:;shutdown=true") + } catch { + case se: SQLException if se.getSQLState == "XJ015" => + // normal shutdown + } + } +} diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index f0b371b2cf..b11893590e 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -147,6 +147,7 @@ object SparkBuild extends Build { "cc.spray" % "spray-can" % "1.0-M2.1", "cc.spray" % "spray-server" % "1.0-M2.1", "cc.spray" % "spray-json_2.9.2" % "1.1.1", + "org.apache.derby" % "derby" % "10.4.2.0" % "test", "org.apache.mesos" % "mesos" % "0.9.0-incubating" ) ++ (if (HADOOP_MAJOR_VERSION == "2") Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq, unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ("src/hadoop" + HADOOP_MAJOR_VERSION + "/scala") } -- cgit v1.2.3 From 404f9ff617401a2f8d12845861ce8f02cfe6442c Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 14 May 2013 23:28:34 -0700 Subject: Added derby dependency to Maven pom files for the JDBC Java test. --- core/pom.xml | 5 +++++ pom.xml | 8 +++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/core/pom.xml b/core/pom.xml index 9a019b5a42..57a95328c3 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -92,6 +92,11 @@ log4j + + org.apache.derby + derby + test + org.scalatest scalatest_${scala.version} diff --git a/pom.xml b/pom.xml index 3936165d78..d7cdc591cf 100644 --- a/pom.xml +++ b/pom.xml @@ -256,6 +256,12 @@ mesos ${mesos.version} + + org.apache.derby + derby + 10.4.2.0 + test + org.scala-lang @@ -565,7 +571,7 @@ 2 - 2.0.2-alpha + 2.0.2-alpha -- cgit v1.2.3 From f9d40a5848a2e1eef31ac63cd9221d5b77c1c5a7 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 14 May 2013 23:29:57 -0700 Subject: Added a comment in JdbcRDD for example usage. --- core/src/main/scala/spark/rdd/JdbcRDD.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/rdd/JdbcRDD.scala b/core/src/main/scala/spark/rdd/JdbcRDD.scala index b0f7054233..a50f407737 100644 --- a/core/src/main/scala/spark/rdd/JdbcRDD.scala +++ b/core/src/main/scala/spark/rdd/JdbcRDD.scala @@ -11,11 +11,13 @@ private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) e /** * An RDD that executes an SQL query on a JDBC connection and reads results. + * For usage example, see test case JdbcRDDSuite. + * * @param getConnection a function that returns an open Connection. * The RDD takes care of closing the connection. * @param sql the text of the query. * The query must contain two ? placeholders for parameters used to partition the results. - * E.g. "select title, author from books where ? <= id and id <= ?" + * E.g. "select title, author from books where ? <= id and id <= ?" * @param lowerBound the minimum value of the first placeholder * @param upperBound the maximum value of the second placeholder * The lower and upper bounds are inclusive. -- cgit v1.2.3
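
The final patch points readers at JdbcRDDSuite for a usage example. A minimal standalone sketch of the same usage follows, assuming the embedded Derby database and FOO table created by JdbcRDDSuite's setup already exist; the object name and the "local" master URL are illustrative, not part of the patches.

```scala
import java.sql.{DriverManager, ResultSet}

import spark.SparkContext
import spark.rdd.JdbcRDD

object JdbcRDDExample {
  def main(args: Array[String]) {
    // Assumes the Derby database and FOO table from JdbcRDDSuite already exist.
    Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
    val sc = new SparkContext("local", "JdbcRDDExample")

    val rdd = new JdbcRDD(
      sc,
      // Called once per partition; the RDD closes the connection, statement
      // and result set when the task completes.
      () => DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb"),
      // The two ? placeholders receive each partition's inclusive bounds.
      "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
      1, 100, 3,                      // lowerBound, upperBound, numPartitions
      (r: ResultSet) => r.getInt(1))  // mapRow reads columns only; next() is handled by the RDD

    println(rdd.count())              // 100 rows with the suite's test data
    println(rdd.reduce(_ + _))        // 2 + 4 + ... + 200 = 10100

    sc.stop()
  }
}
```

With three partitions over the inclusive range 1 to 100, the query is executed three times, with bounds (1, 33), (34, 66) and (67, 100).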
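
The "code cleanup per rxin comments" patch also replaces the ParallelCollectionRDD.slice-based getPartitions with explicit arithmetic over the inclusive bounds. A standalone transcription of that arithmetic (the helper and object names are made up for illustration) makes the scaladoc example easy to check:

```scala
// Mirrors JdbcRDD.getPartitions after the cleanup patch: inclusive bounds are
// split into numPartitions contiguous, non-overlapping ranges.
object JdbcPartitionMath extends App {
  def partitionBounds(lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[(Long, Long)] = {
    // bounds are inclusive, hence the + 1 here and the - 1 on end
    val length = 1 + upperBound - lowerBound
    (0 until numPartitions).map { i =>
      val start = lowerBound + ((i * length) / numPartitions)
      val end = lowerBound + (((i + 1) * length) / numPartitions) - 1
      (start, end)
    }
  }

  // Matches the scaladoc example: lowerBound = 1, upperBound = 20, numPartitions = 2
  println(partitionBounds(1, 20, 2))   // Vector((1,10), (11,20))
  // The bounds used by the Derby test above: 1 to 100 in 3 partitions
  println(partitionBounds(1, 100, 3))  // Vector((1,33), (34,66), (67,100))
}
```

Because length is 1 + upperBound - lowerBound and each range's end subtracts 1, the ranges are contiguous and cover both endpoints exactly once.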
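
The SPARK-738 commits settle on reporting a failed task as an ExceptionFailure built from the exception's class name, description and stack trace, rather than shipping the Throwable itself, so the status update can always be serialized even when user code throws a non-serializable exception. The sketch below illustrates that design choice using plain Java serialization; the case class and helper objects are illustrative stand-ins, not Spark's actual classes or serializer.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for spark.ExceptionFailure after the final patch: only strings and
// stack-trace elements, all of which serialize cleanly.
case class ExceptionFailure(
    className: String,
    description: String,
    stackTrace: Array[StackTraceElement]) extends Serializable

// A user exception that drags along a non-serializable field,
// mirroring the classes added to DistributedSuite.
class NotSerializableClass
class NotSerializableExn(val notSer: NotSerializableClass) extends RuntimeException

object FailureReportingDemo extends App {
  def serialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    bytes.toByteArray
  }

  val exn = new NotSerializableExn(new NotSerializableClass)

  // Shipping the raw Throwable fails because it pulls in NotSerializableClass.
  try {
    serialize(exn)
  } catch {
    case e: NotSerializableException => println("raw Throwable: " + e)
  }

  // Reporting only metadata about the Throwable always serializes.
  val report = ExceptionFailure(exn.getClass.getName, exn.toString, exn.getStackTrace)
  println("serialized report: " + serialize(report).length + " bytes")
}
```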