author     Reynold Xin <rxin@cs.berkeley.edu>  2013-05-15 00:31:52 -0700
committer  Reynold Xin <rxin@cs.berkeley.edu>  2013-05-15 00:31:52 -0700
commit     f3491cb89bcbd8748bedbd2fe6a200ba09d9e3c2 (patch)
tree       42dd270599a9445998b064a942d5092a62f702d7
parent     9cafacf32ddb9a3f6c5cb774e4fe527225273f16 (diff)
parent     f9d40a5848a2e1eef31ac63cd9221d5b77c1c5a7 (diff)
Merge branch 'master' of github.com:mesos/spark into shufflemerge
Conflicts:
	core/src/main/scala/spark/storage/BlockManager.scala
	core/src/test/scala/spark/DistributedSuite.scala
	project/SparkBuild.scala
-rw-r--r--  .gitignore  1
-rw-r--r--  core/pom.xml  5
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala  19
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala  1
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala  1
-rw-r--r--  core/src/main/scala/spark/RDD.scala  2
-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala  22
-rw-r--r--  core/src/main/scala/spark/TaskEndReason.scala  14
-rw-r--r--  core/src/main/scala/spark/Utils.scala  21
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala  5
-rw-r--r--  core/src/main/scala/spark/executor/Executor.scala  2
-rw-r--r--  core/src/main/scala/spark/rdd/BlockRDD.scala  9
-rw-r--r--  core/src/main/scala/spark/rdd/EmptyRDD.scala  16
-rw-r--r--  core/src/main/scala/spark/rdd/JdbcRDD.scala  103
-rw-r--r--  core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala  28
-rw-r--r--  core/src/main/scala/spark/rdd/ZippedRDD.scala  23
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala  7
-rw-r--r--  core/src/main/scala/spark/scheduler/ResultTask.scala  5
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala  11
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala  108
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala  182
-rw-r--r--  core/src/main/scala/spark/scheduler/local/LocalScheduler.scala  8
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala  57
-rw-r--r--  core/src/test/scala/spark/DistributedSuite.scala  23
-rw-r--r--  core/src/test/scala/spark/MapOutputTrackerSuite.scala  7
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala  22
-rw-r--r--  core/src/test/scala/spark/rdd/JdbcRDDSuite.scala  56
-rw-r--r--  core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala  4
-rw-r--r--  core/src/test/scala/spark/storage/BlockManagerSuite.scala  2
-rw-r--r--  docs/running-on-yarn.md  3
-rwxr-xr-x  ec2/spark_ec2.py  2
-rw-r--r--  pom.xml  8
-rw-r--r--  project/SparkBuild.scala  3
-rwxr-xr-x  run  65
-rw-r--r--  run2.cmd  13
35 files changed, 650 insertions, 208 deletions
diff --git a/.gitignore b/.gitignore
index 155e785b01..b87fc1ee79 100644
--- a/.gitignore
+++ b/.gitignore
@@ -36,3 +36,4 @@ streaming-tests.log
dependency-reduced-pom.xml
.ensime
.ensime_lucene
+derby.log
diff --git a/core/pom.xml b/core/pom.xml
index 9a019b5a42..57a95328c3 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -93,6 +93,11 @@
</dependency>
<dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version}</artifactId>
<scope>test</scope>
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
index ae719267e8..aa72c1e5fe 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -148,22 +148,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
.getMethod("main", classOf[Array[String]])
val t = new Thread {
override def run() {
- var mainArgs: Array[String] = null
- var startIndex = 0
-
- // I am sure there is a better 'scala' way to do this .... but I am just trying to get things to work right now !
- if (args.userArgs.isEmpty || args.userArgs.get(0) != "yarn-standalone") {
- // ensure that first param is ALWAYS "yarn-standalone"
- mainArgs = new Array[String](args.userArgs.size() + 1)
- mainArgs.update(0, "yarn-standalone")
- startIndex = 1
- }
- else {
- mainArgs = new Array[String](args.userArgs.size())
- }
-
- args.userArgs.copyToArray(mainArgs, startIndex, args.userArgs.size())
-
+ // Copy the user args as-is; the first argument is no longer forced to be "yarn-standalone".
+ var mainArgs: Array[String] = new Array[String](args.userArgs.size())
+ args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
mainMethod.invoke(null, mainArgs)
}
}
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala
index dc89125d81..1b00208511 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -69,7 +69,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
" --class CLASS_NAME Name of your application's main class (required)\n" +
" --args ARGS Arguments to be passed to your application's main class.\n" +
" Mutliple invocations are possible, each will be passed in order.\n" +
- " Note that first argument will ALWAYS be yarn-standalone : will be added if missing.\n" +
" --num-workers NUM Number of workers to start (Default: 2)\n" +
" --worker-cores NUM Number of cores for the workers (Default: 1)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n")
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
index 2e69fe3fb0..24110558e7 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
@@ -92,7 +92,6 @@ class ClientArguments(val args: Array[String]) {
" --class CLASS_NAME Name of your application's main class (required)\n" +
" --args ARGS Arguments to be passed to your application's main class.\n" +
" Mutliple invocations are possible, each will be passed in order.\n" +
- " Note that first argument will ALWAYS be yarn-standalone : will be added if missing.\n" +
" --num-workers NUM Number of workers to start (Default: 2)\n" +
" --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" +
" --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index fd14ef17f1..dde131696f 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -489,7 +489,7 @@ abstract class RDD[T: ClassManifest](
*/
def foreachPartition(f: Iterator[T] => Unit) {
val cleanF = sc.clean(f)
- sc.runJob(this, (iter: Iterator[T]) => f(iter))
+ sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}
/**
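
For context on the fix above (running the cleaned closure rather than the raw one): foreachPartition is typically used to amortize per-partition setup such as opening a connection, and the closure must be serialized and shipped to executors. A minimal, hypothetical sketch follows; the Derby URL is a placeholder, not part of this commit:

    import java.sql.DriverManager
    import spark.SparkContext

    // Illustration only: one connection per partition instead of one per element.
    val sc  = new SparkContext("local", "foreachPartitionDemo")
    val rdd = sc.parallelize(1 to 100, 4)
    rdd.foreachPartition { iter =>
      val conn = DriverManager.getConnection("jdbc:derby:memory:demo;create=true") // placeholder URL
      try {
        // write each element using the per-partition connection here
        iter.foreach(x => ())
      } finally {
        conn.close()
      }
    }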
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 2fa97cd829..be1a04d619 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -31,8 +31,11 @@ class SparkEnv (
val blockManager: BlockManager,
val connectionManager: ConnectionManager,
val httpFileServer: HttpFileServer,
- val sparkFilesDir: String
- ) {
+ val sparkFilesDir: String,
+ // To be set only as part of initialization of SparkContext.
+ // (executorId, defaultHostPort) => executorHostPort
+ // If executorId is NOT found, return defaultHostPort
+ var executorIdToHostPort: Option[(String, String) => String]) {
def stop() {
httpFileServer.stop()
@@ -46,6 +49,17 @@ class SparkEnv (
// down, but let's call it anyway in case it gets fixed in a later release
actorSystem.awaitTermination()
}
+
+
+ def resolveExecutorIdToHostPort(executorId: String, defaultHostPort: String): String = {
+ val env = SparkEnv.get
+ if (env.executorIdToHostPort.isEmpty) {
+ // default to using host, not host port. Relevant to non cluster modes.
+ return defaultHostPort
+ }
+
+ env.executorIdToHostPort.get(executorId, defaultHostPort)
+ }
}
object SparkEnv extends Logging {
@@ -168,7 +182,7 @@ object SparkEnv extends Logging {
blockManager,
connectionManager,
httpFileServer,
- sparkFilesDir)
+ sparkFilesDir,
+ None)
}
-
}
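
A hedged sketch of how the new executorIdToHostPort hook is meant to be used (ClusterScheduler.initialize, further down in this diff, registers it the same way); the executor id and hostPort values are made up:

    // Illustration only: register a resolver on the driver's SparkEnv, then resolve an executor id.
    val executorIdToHostPort = scala.collection.mutable.HashMap("exec-1" -> "host-a:45123") // made-up data

    SparkEnv.get.executorIdToHostPort =
      Some((execId, defaultHostPort) => executorIdToHostPort.getOrElse(execId, defaultHostPort))

    SparkEnv.get.resolveExecutorIdToHostPort("exec-1", "host-a")     // returns "host-a:45123"
    SparkEnv.get.resolveExecutorIdToHostPort("unknown", "host-b:1")  // falls back to the supplied default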
diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala
index 420c54bc9a..ca793eb402 100644
--- a/core/src/main/scala/spark/TaskEndReason.scala
+++ b/core/src/main/scala/spark/TaskEndReason.scala
@@ -14,9 +14,17 @@ private[spark] case object Success extends TaskEndReason
private[spark]
case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
-private[spark]
-case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason
+private[spark] case class FetchFailed(
+ bmAddress: BlockManagerId,
+ shuffleId: Int,
+ mapId: Int,
+ reduceId: Int)
+ extends TaskEndReason
-private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason
+private[spark] case class ExceptionFailure(
+ className: String,
+ description: String,
+ stackTrace: Array[StackTraceElement])
+ extends TaskEndReason
private[spark] case class OtherFailure(message: String) extends TaskEndReason
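
One motivation for carrying (className, description, stackTrace) rather than the Throwable itself is that the original exception may not be serializable (see the new DistributedSuite test later in this diff), while its name, message, and stack trace always are. A small illustrative sketch; ExceptionFailure is private[spark], so this only compiles inside the spark package:

    // Illustration only: capture the serializable pieces of an arbitrary Throwable.
    class NotSerializableClass
    class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable

    val t: Throwable = new NotSerializableExn(new NotSerializableClass)
    val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace) // safe to ship to the driver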
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 9f48cbe490..c1495d5317 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -335,7 +335,7 @@ private object Utils extends Logging {
retval
}
- /*
+/*
// Used by DEBUG code : remove when all testing done
private val ipPattern = Pattern.compile("^[0-9]+(\\.[0-9]+)*$")
def checkHost(host: String, message: String = "") {
@@ -357,21 +357,26 @@ private object Utils extends Logging {
Utils.logErrorWithStack("Unexpected to have port " + port + " which is not valid in " + hostPort + ". Message " + message)
}
}
- */
+
+ // Used by DEBUG code : remove when all testing done
+ def logErrorWithStack(msg: String) {
+ try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
+ // temp code for debug
+ System.exit(-1)
+ }
+*/
// Once testing is complete in various modes, replace with this ?
def checkHost(host: String, message: String = "") {}
def checkHostPort(hostPort: String, message: String = "") {}
- def getUserNameFromEnvironment(): String = {
- SparkHadoopUtil.getUserNameFromEnvironment
- }
-
// Used by DEBUG code : remove when all testing done
def logErrorWithStack(msg: String) {
try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
- // temp code for debug
- System.exit(-1)
+ }
+
+ def getUserNameFromEnvironment(): String = {
+ SparkHadoopUtil.getUserNameFromEnvironment
}
// Typically, this will be of order of number of nodes in cluster
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 1a7da0f7bf..3dc2207170 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -54,7 +54,10 @@ private[spark] class Worker(
def createWorkDir() {
workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
try {
- if ( (workDir.exists() && !workDir.isDirectory) || (!workDir.exists() && !workDir.mkdirs()) ) {
+ // This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
+ // So attempt to create the directory first, and then check whether it exists.
+ workDir.mkdirs()
+ if ( !workDir.exists() || !workDir.isDirectory) {
logError("Failed to create work directory " + workDir)
System.exit(1)
}
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 344face5e6..da20b84544 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -122,7 +122,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
}
case t: Throwable => {
- val reason = ExceptionFailure(t)
+ val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace)
context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
// TODO: Should we exit the whole executor here? On the one hand, the failed task may
diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala
index 7348c4f15b..719d4bf03e 100644
--- a/core/src/main/scala/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/spark/rdd/BlockRDD.scala
@@ -1,7 +1,7 @@
package spark.rdd
-import scala.collection.mutable.HashMap
import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
+import spark.storage.BlockManager
private[spark] class BlockRDDPartition(val blockId: String, idx: Int) extends Partition {
val index = idx
@@ -11,12 +11,7 @@ private[spark]
class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
extends RDD[T](sc, Nil) {
- @transient lazy val locations_ = {
- val blockManager = SparkEnv.get.blockManager
- /*val locations = blockIds.map(id => blockManager.getLocations(id))*/
- val locations = blockManager.getLocations(blockIds)
- HashMap(blockIds.zip(locations):_*)
- }
+ @transient lazy val locations_ = BlockManager.blockIdsToExecutorLocations(blockIds, SparkEnv.get)
override def getPartitions: Array[Partition] = (0 until blockIds.size).map(i => {
new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
diff --git a/core/src/main/scala/spark/rdd/EmptyRDD.scala b/core/src/main/scala/spark/rdd/EmptyRDD.scala
new file mode 100644
index 0000000000..e4dd3a7fa7
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/EmptyRDD.scala
@@ -0,0 +1,16 @@
+package spark.rdd
+
+import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
+
+
+/**
+ * An RDD that is empty, i.e. has no element in it.
+ */
+class EmptyRDD[T: ClassManifest](sc: SparkContext) extends RDD[T](sc, Nil) {
+
+ override def getPartitions: Array[Partition] = Array.empty
+
+ override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+ throw new UnsupportedOperationException("empty RDD")
+ }
+}
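
A brief usage sketch (not part of this commit): an EmptyRDD has zero partitions, so actions complete immediately and compute() is never invoked:

    import spark.SparkContext
    import spark.rdd.EmptyRDD

    // Illustration only.
    val sc    = new SparkContext("local", "emptyRDDDemo")
    val empty = new EmptyRDD[Int](sc)
    assert(empty.partitions.length == 0)
    assert(empty.count() == 0)
    assert(empty.collect().isEmpty)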
diff --git a/core/src/main/scala/spark/rdd/JdbcRDD.scala b/core/src/main/scala/spark/rdd/JdbcRDD.scala
new file mode 100644
index 0000000000..a50f407737
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/JdbcRDD.scala
@@ -0,0 +1,103 @@
+package spark.rdd
+
+import java.sql.{Connection, ResultSet}
+
+import spark.{Logging, Partition, RDD, SparkContext, TaskContext}
+import spark.util.NextIterator
+
+private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
+ override def index = idx
+}
+
+/**
+ * An RDD that executes an SQL query on a JDBC connection and reads results.
+ * For usage example, see test case JdbcRDDSuite.
+ *
+ * @param getConnection a function that returns an open Connection.
+ * The RDD takes care of closing the connection.
+ * @param sql the text of the query.
+ * The query must contain two ? placeholders for parameters used to partition the results.
+ * E.g. "select title, author from books where ? <= id and id <= ?"
+ * @param lowerBound the minimum value of the first placeholder
+ * @param upperBound the maximum value of the second placeholder
+ * The lower and upper bounds are inclusive.
+ * @param numPartitions the number of partitions.
+ * Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
+ * the query would be executed twice, once with (1, 10) and once with (11, 20)
+ * @param mapRow a function from a ResultSet to a single row of the desired result type(s).
+ * This should only call getInt, getString, etc; the RDD takes care of calling next.
+ * The default maps a ResultSet to an array of Object.
+ */
+class JdbcRDD[T: ClassManifest](
+ sc: SparkContext,
+ getConnection: () => Connection,
+ sql: String,
+ lowerBound: Long,
+ upperBound: Long,
+ numPartitions: Int,
+ mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
+ extends RDD[T](sc, Nil) with Logging {
+
+ override def getPartitions: Array[Partition] = {
+ // bounds are inclusive, hence the + 1 here and - 1 on end
+ val length = 1 + upperBound - lowerBound
+ (0 until numPartitions).map(i => {
+ val start = lowerBound + ((i * length) / numPartitions).toLong
+ val end = lowerBound + (((i + 1) * length) / numPartitions).toLong - 1
+ new JdbcPartition(i, start, end)
+ }).toArray
+ }
+
+ override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
+ context.addOnCompleteCallback{ () => closeIfNeeded() }
+ val part = thePart.asInstanceOf[JdbcPartition]
+ val conn = getConnection()
+ val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
+
+ // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results,
+ // rather than pulling entire resultset into memory.
+ // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html
+ if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
+ stmt.setFetchSize(Integer.MIN_VALUE)
+ logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
+ }
+
+ stmt.setLong(1, part.lower)
+ stmt.setLong(2, part.upper)
+ val rs = stmt.executeQuery()
+
+ override def getNext: T = {
+ if (rs.next()) {
+ mapRow(rs)
+ } else {
+ finished = true
+ null.asInstanceOf[T]
+ }
+ }
+
+ override def close() {
+ try {
+ if (null != rs && ! rs.isClosed()) rs.close()
+ } catch {
+ case e: Exception => logWarning("Exception closing resultset", e)
+ }
+ try {
+ if (null != stmt && ! stmt.isClosed()) stmt.close()
+ } catch {
+ case e: Exception => logWarning("Exception closing statement", e)
+ }
+ try {
+ if (null != conn && ! conn.isClosed()) conn.close()
+ logInfo("closed connection")
+ } catch {
+ case e: Exception => logWarning("Exception closing connection", e)
+ }
+ }
+ }
+}
+
+object JdbcRDD {
+ def resultSetToObjectArray(rs: ResultSet) = {
+ Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
+ }
+}
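
A usage sketch along the lines of the JdbcRDDSuite test referenced in the scaladoc; the Derby URL, table, and column names below are placeholders, not part of this commit:

    import java.sql.{DriverManager, ResultSet}
    import spark.SparkContext
    import spark.rdd.JdbcRDD

    // Illustration only: read (id, name) pairs, partitioned on the id range 1..100.
    val sc = new SparkContext("local", "jdbcRddDemo")
    val rdd = new JdbcRDD(
      sc,
      () => DriverManager.getConnection("jdbc:derby:memory:demo"), // placeholder connection factory
      "SELECT id, name FROM people WHERE ? <= id AND id <= ?",     // the two '?' bound each partition's id range
      1, 100, 3,                                                   // lowerBound, upperBound, numPartitions
      (rs: ResultSet) => (rs.getInt(1), rs.getString(2)))          // mapRow
    // With bounds 1..100 and 3 partitions, getPartitions above yields the ranges (1,33), (34,66), (67,100).
    val pairs: Array[(Int, String)] = rdd.collect()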
diff --git a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
index fc3f29ffcd..dd9f3c2680 100644
--- a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
@@ -1,6 +1,6 @@
package spark.rdd
-import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
+import spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
import java.io.{ObjectOutputStream, IOException}
private[spark] class ZippedPartitionsPartition(
@@ -38,9 +38,31 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
}
override def getPreferredLocations(s: Partition): Seq[String] = {
+ // Note that as the number of RDDs and/or the number of slaves in the cluster increases, the computed preferredLocations below
+ // become diminishingly small : so we might need to look at alternate strategies to alleviate this.
+ // If there are no (or only a very small number of) preferred locations, we will end up transferring the blocks to 'any' node in the
+ // cluster - paying the network and cache cost.
+ // Maybe pick a node which figures max amount of time ?
+ // Choose node which is hosting 'larger' of some subset of blocks ?
+ // Look at rack locality to ensure chosen host is at least rack local to both hosting nodes ?, etc (would be good to defer this if possible)
val splits = s.asInstanceOf[ZippedPartitionsPartition].partitions
- val preferredLocations = rdds.zip(splits).map(x => x._1.preferredLocations(x._2))
- preferredLocations.reduce((x, y) => x.intersect(y))
+ val rddSplitZip = rdds.zip(splits)
+
+ // exact match.
+ val exactMatchPreferredLocations = rddSplitZip.map(x => x._1.preferredLocations(x._2))
+ val exactMatchLocations = exactMatchPreferredLocations.reduce((x, y) => x.intersect(y))
+
+ // Remove exact match and then do host local match.
+ val otherNodePreferredLocations = rddSplitZip.map(x => {
+ x._1.preferredLocations(x._2).map(hostPort => {
+ val host = Utils.parseHostPort(hostPort)._1
+
+ if (exactMatchLocations.contains(host)) null else host
+ }).filter(_ != null)
+ })
+ val otherNodeLocalLocations = otherNodePreferredLocations.reduce((x, y) => x.intersect(y))
+
+ otherNodeLocalLocations ++ exactMatchLocations
}
override def clearDependencies() {
diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala
index 35b0e06785..f728e93d24 100644
--- a/core/src/main/scala/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala
@@ -1,6 +1,6 @@
package spark.rdd
-import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
+import spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
import java.io.{ObjectOutputStream, IOException}
@@ -48,8 +48,27 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
}
override def getPreferredLocations(s: Partition): Seq[String] = {
+ // Note that as the number of slaves in the cluster increases, the computed preferredLocations can become small : so we might need
+ // to look at alternate strategies to alleviate this. (If there are no (or only a very small number of) preferred locations, we
+ // will end up transferring the blocks to 'any' node in the cluster - paying the network and cache cost.)
+ // Maybe pick one or the other ? (so that at least one block is local ?).
+ // Choose node which is hosting the 'larger' of the blocks ?
+ // Look at rack locality to ensure chosen host is at least rack local to both hosting nodes ?, etc (would be good to defer this if possible)
val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
- rdd1.preferredLocations(partition1).intersect(rdd2.preferredLocations(partition2))
+ val pref1 = rdd1.preferredLocations(partition1)
+ val pref2 = rdd2.preferredLocations(partition2)
+
+ // exact match - instance local and host local.
+ val exactMatchLocations = pref1.intersect(pref2)
+
+ // remove locations which are already handled via exactMatchLocations, and intersect where both partitions are node local.
+ val otherNodeLocalPref1 = pref1.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
+ val otherNodeLocalPref2 = pref2.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
+ val otherNodeLocalLocations = otherNodeLocalPref1.intersect(otherNodeLocalPref2)
+
+
+ // Can have mix of instance local (hostPort) and node local (host) locations as preference !
+ exactMatchLocations ++ otherNodeLocalLocations
}
override def clearDependencies() {
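
To make the preference computation above concrete, a standalone sketch with made-up hostPort values; host(...) stands in for Utils.parseHostPort(...)._1:

    // Illustration only: exact hostPort matches first, then fall back to matching on host alone.
    def host(hostPort: String): String = hostPort.split(":")(0)

    val pref1 = Seq("host-a:4040", "host-b:4040")
    val pref2 = Seq("host-a:4040", "host-b:5050")

    val exactMatch = pref1.intersect(pref2)                          // Seq("host-a:4040")
    val nodeLocal1 = pref1.filterNot(exactMatch.contains).map(host)  // Seq("host-b")
    val nodeLocal2 = pref2.filterNot(exactMatch.contains).map(host)  // Seq("host-b")
    val preferred  = exactMatch ++ nodeLocal1.intersect(nodeLocal2)  // Seq("host-a:4040", "host-b")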
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 1440b93e65..b18248d2b5 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -12,7 +12,7 @@ import spark.executor.TaskMetrics
import spark.partial.ApproximateActionListener
import spark.partial.ApproximateEvaluator
import spark.partial.PartialResult
-import spark.storage.BlockManagerMaster
+import spark.storage.{BlockManager, BlockManagerMaster}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
/**
@@ -117,9 +117,8 @@ class DAGScheduler(
private def getCacheLocs(rdd: RDD[_]): Array[List[String]] = {
if (!cacheLocs.contains(rdd.id)) {
val blockIds = rdd.partitions.indices.map(index=> "rdd_%d_%d".format(rdd.id, index)).toArray
- cacheLocs(rdd.id) = blockManagerMaster.getLocations(blockIds).map {
- locations => locations.map(_.hostPort).toList
- }.toArray
+ val locs = BlockManager.blockIdsToExecutorLocations(blockIds, env, blockManagerMaster)
+ cacheLocs(rdd.id) = blockIds.map(locs.getOrElse(_, Nil))
}
cacheLocs(rdd.id)
}
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index 89dc6640b2..83166bce22 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -70,12 +70,11 @@ private[spark] class ResultTask[T, U](
rdd.partitions(partition)
}
- // data locality is on a per host basis, not hyper specific to container (host:port). Unique on set of hosts.
- val preferredLocs: Seq[String] = if (locs == null) Nil else locs.map(loc => Utils.parseHostPort(loc)._1).toSet.toSeq
+ private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq
{
// DEBUG code
- preferredLocs.foreach (host => Utils.checkHost(host, "preferredLocs : " + preferredLocs))
+ preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs))
}
override def run(attemptId: Long): U = {
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index f097213ab5..95647389c3 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -85,18 +85,11 @@ private[spark] class ShuffleMapTask(
protected def this() = this(0, null, null, 0, null)
- // Data locality is on a per host basis, not hyper specific to container (host:port).
- // Unique on set of hosts.
- // TODO(rxin): The above statement seems problematic. Even if partitions are on the same host,
- // the worker would still need to serialize / deserialize those data when they are in
- // different jvm processes. Often that is very costly ...
- @transient
- private val preferredLocs: Seq[String] =
- if (locs == null) Nil else locs.map(loc => Utils.parseHostPort(loc)._1).toSet.toSeq
+ @transient private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq
{
// DEBUG code
- preferredLocs.foreach (host => Utils.checkHost(host, "preferredLocs : " + preferredLocs))
+ preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs))
}
var split = if (rdd == null) {
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index a9d9c5e44c..cf4483f144 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -32,28 +32,28 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val TASK_REVIVAL_INTERVAL = System.getProperty("spark.tasks.revive.interval", "0").toLong
/*
- This property controls how aggressive we should be to modulate waiting for host local task scheduling.
- To elaborate, currently there is a time limit (3 sec def) to ensure that spark attempts to wait for host locality of tasks before
+ This property controls how aggressive we should be to modulate waiting for node local task scheduling.
+ To elaborate, currently there is a time limit (3 sec def) to ensure that spark attempts to wait for node locality of tasks before
scheduling on other nodes. We have modified this in yarn branch such that offers to task set happen in prioritized order :
- host-local, rack-local and then others
- But once all available host local (and no pref) tasks are scheduled, instead of waiting for 3 sec before
+ node-local, rack-local and then others
+ But once all available node local (and no pref) tasks are scheduled, instead of waiting for 3 sec before
scheduling to other nodes (which degrades performance for time sensitive tasks and on larger clusters), we can
modulate that : to also allow rack local nodes or any node. The default is still set to HOST - so that previous behavior is
maintained. This is to allow tuning the tension between pulling rdd data off node and scheduling computation asap.
TODO: rename property ? The value is one of
- - HOST_LOCAL (default, no change w.r.t current behavior),
+ - NODE_LOCAL (default, no change w.r.t current behavior),
- RACK_LOCAL and
- ANY
    Note that this property makes more sense when used in conjunction with spark.tasks.revive.interval > 0 : else it is not very effective.
Additional Note: For non trivial clusters, there is a 4x - 5x reduction in running time (in some of our experiments) based on whether
- it is left at default HOST_LOCAL, RACK_LOCAL (if cluster is configured to be rack aware) or ANY.
+ it is left at default NODE_LOCAL, RACK_LOCAL (if cluster is configured to be rack aware) or ANY.
If cluster is rack aware, then setting it to RACK_LOCAL gives best tradeoff and a 3x - 4x performance improvement while minimizing IO impact.
Also, it brings down the variance in running time drastically.
*/
- val TASK_SCHEDULING_AGGRESSION = TaskLocality.parse(System.getProperty("spark.tasks.schedule.aggression", "HOST_LOCAL"))
+ val TASK_SCHEDULING_AGGRESSION = TaskLocality.parse(System.getProperty("spark.tasks.schedule.aggression", "NODE_LOCAL"))
val activeTaskSets = new HashMap[String, TaskSetManager]
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
@@ -73,15 +73,15 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val activeExecutorIds = new HashSet[String]
// TODO: We might want to remove this and merge it with execId datastructures - but later.
- // Which hosts in the cluster are alive (contains hostPort's) - used for hyper local and local task locality.
+ // Which hosts in the cluster are alive (contains hostPort's) - used for process local and node local task locality.
private val hostPortsAlive = new HashSet[String]
private val hostToAliveHostPorts = new HashMap[String, HashSet[String]]
// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
- val executorsByHostPort = new HashMap[String, HashSet[String]]
+ private val executorsByHostPort = new HashMap[String, HashSet[String]]
- val executorIdToHostPort = new HashMap[String, String]
+ private val executorIdToHostPort = new HashMap[String, String]
// JAR server, if any JARs were added by the user to the SparkContext
var jarServer: HttpServer = null
@@ -102,6 +102,14 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def initialize(context: SchedulerBackend) {
backend = context
+ // resolve executorId to hostPort mapping.
+ def executorToHostPort(executorId: String, defaultHostPort: String): String = {
+ executorIdToHostPort.getOrElse(executorId, defaultHostPort)
+ }
+
+ // Unfortunately, this means that SparkEnv is indirectly referencing ClusterScheduler
+ // Will that be a design violation ?
+ SparkEnv.get.executorIdToHostPort = Some(executorToHostPort)
}
def newTaskId(): Long = nextTaskId.getAndIncrement()
@@ -209,14 +217,31 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
// Build a list of tasks to assign to each slave
val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
+ // merge availableCpus into nodeToAvailableCpus block ?
val availableCpus = offers.map(o => o.cores).toArray
+ val nodeToAvailableCpus = {
+ val map = new HashMap[String, Int]()
+ for (offer <- offers) {
+ val hostPort = offer.hostPort
+ val cores = offer.cores
+ // DEBUG code
+ Utils.checkHostPort(hostPort)
+
+ val host = Utils.parseHostPort(hostPort)._1
+
+ map.put(host, map.getOrElse(host, 0) + cores)
+ }
+
+ map
+ }
var launchedTask = false
for (manager <- activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))) {
- // Split offers based on host local, rack local and off-rack tasks.
- val hostLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
+ // Split offers based on node local, rack local and off-rack tasks.
+ val processLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
+ val nodeLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
val rackLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
val otherOffers = new HashMap[String, ArrayBuffer[Int]]()
@@ -224,21 +249,30 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val hostPort = offers(i).hostPort
// DEBUG code
Utils.checkHostPort(hostPort)
+
+ val numProcessLocalTasks = math.max(0, math.min(manager.numPendingTasksForHostPort(hostPort), availableCpus(i)))
+ if (numProcessLocalTasks > 0){
+ val list = processLocalOffers.getOrElseUpdate(hostPort, new ArrayBuffer[Int])
+ for (j <- 0 until numProcessLocalTasks) list += i
+ }
+
val host = Utils.parseHostPort(hostPort)._1
- val numHostLocalTasks = math.max(0, math.min(manager.numPendingTasksForHost(hostPort), availableCpus(i)))
- if (numHostLocalTasks > 0){
- val list = hostLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
- for (j <- 0 until numHostLocalTasks) list += i
+ val numNodeLocalTasks = math.max(0,
+ // Remove process local tasks (which are also host local btw !) from this
+ math.min(manager.numPendingTasksForHost(hostPort) - numProcessLocalTasks, nodeToAvailableCpus(host)))
+ if (numNodeLocalTasks > 0){
+ val list = nodeLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
+ for (j <- 0 until numNodeLocalTasks) list += i
}
val numRackLocalTasks = math.max(0,
- // Remove host local tasks (which are also rack local btw !) from this
- math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numHostLocalTasks, availableCpus(i)))
+ // Remove node local tasks (which are also rack local btw !) from this
+ math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numProcessLocalTasks - numNodeLocalTasks, nodeToAvailableCpus(host)))
if (numRackLocalTasks > 0){
val list = rackLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
for (j <- 0 until numRackLocalTasks) list += i
}
- if (numHostLocalTasks <= 0 && numRackLocalTasks <= 0){
+ if (numNodeLocalTasks <= 0 && numRackLocalTasks <= 0){
// add to others list - spread even this across cluster.
val list = otherOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
list += i
@@ -246,12 +280,19 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
val offersPriorityList = new ArrayBuffer[Int](
- hostLocalOffers.size + rackLocalOffers.size + otherOffers.size)
- // First host local, then rack, then others
- val numHostLocalOffers = {
- val hostLocalPriorityList = ClusterScheduler.prioritizeContainers(hostLocalOffers)
- offersPriorityList ++= hostLocalPriorityList
- hostLocalPriorityList.size
+ processLocalOffers.size + nodeLocalOffers.size + rackLocalOffers.size + otherOffers.size)
+
+ // First process local, then host local, then rack, then others
+
+ // numNodeLocalOffers contains count of both process local and host offers.
+ val numNodeLocalOffers = {
+ val processLocalPriorityList = ClusterScheduler.prioritizeContainers(processLocalOffers)
+ offersPriorityList ++= processLocalPriorityList
+
+ val nodeLocalPriorityList = ClusterScheduler.prioritizeContainers(nodeLocalOffers)
+ offersPriorityList ++= nodeLocalPriorityList
+
+ processLocalPriorityList.size + nodeLocalPriorityList.size
}
val numRackLocalOffers = {
val rackLocalPriorityList = ClusterScheduler.prioritizeContainers(rackLocalOffers)
@@ -262,8 +303,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
var lastLoop = false
val lastLoopIndex = TASK_SCHEDULING_AGGRESSION match {
- case TaskLocality.HOST_LOCAL => numHostLocalOffers
- case TaskLocality.RACK_LOCAL => numRackLocalOffers + numHostLocalOffers
+ case TaskLocality.NODE_LOCAL => numNodeLocalOffers
+ case TaskLocality.RACK_LOCAL => numRackLocalOffers + numNodeLocalOffers
case TaskLocality.ANY => offersPriorityList.size
}
@@ -302,8 +343,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// prevent more looping
launchedTask = false
} else if (!lastLoop && !launchedTask) {
- // Do this only if TASK_SCHEDULING_AGGRESSION != HOST_LOCAL
- if (TASK_SCHEDULING_AGGRESSION != TaskLocality.HOST_LOCAL) {
+ // Do this only if TASK_SCHEDULING_AGGRESSION != NODE_LOCAL
+ if (TASK_SCHEDULING_AGGRESSION != TaskLocality.NODE_LOCAL) {
// fudge launchedTask to ensure we loop once more
launchedTask = true
// dont loop anymore
@@ -477,6 +518,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
def getExecutorsAliveOnHost(host: String): Option[Set[String]] = {
+ Utils.checkHost(host)
+
val retval = hostToAliveHostPorts.get(host)
if (retval.isDefined) {
return Some(retval.get.toSet)
@@ -485,6 +528,13 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
None
}
+ def isExecutorAliveOnHostPort(hostPort: String): Boolean = {
+ // Even if hostPort is a host, it does not matter - it is just a specific check.
+ // But we do have to ensure that only hostPort get into hostPortsAlive !
+ // So no check against Utils.checkHostPort
+ hostPortsAlive.contains(hostPort)
+ }
+
// By default, rack is unknown
def getRackForHost(value: String): Option[String] = None
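
A configuration sketch for the locality knobs described in the comment block near the top of this file (set before the SparkContext is constructed); the values are illustrative, not recommendations:

    // Illustration only: allow the scheduler to fall back to rack-local offers,
    // and revive offers every second so the relaxed locality actually takes effect.
    System.setProperty("spark.tasks.schedule.aggression", "RACK_LOCAL") // NODE_LOCAL (default) | RACK_LOCAL | ANY
    System.setProperty("spark.tasks.revive.interval", "1000")           // in milliseconds; the default is 0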
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 27e713e2c4..c69f3bdb7f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -13,17 +13,21 @@ import spark.scheduler._
import spark.TaskState.TaskState
import java.nio.ByteBuffer
-private[spark] object TaskLocality extends Enumeration("HOST_LOCAL", "RACK_LOCAL", "ANY") with Logging {
+private[spark] object TaskLocality extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY") with Logging {
- val HOST_LOCAL, RACK_LOCAL, ANY = Value
+ // process local is expected to be used ONLY within tasksetmanager for now.
+ val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
type TaskLocality = Value
def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
+ // Must not be the constraint.
+ assert (constraint != TaskLocality.PROCESS_LOCAL)
+
constraint match {
- case TaskLocality.HOST_LOCAL => condition == TaskLocality.HOST_LOCAL
- case TaskLocality.RACK_LOCAL => condition == TaskLocality.HOST_LOCAL || condition == TaskLocality.RACK_LOCAL
+ case TaskLocality.NODE_LOCAL => condition == TaskLocality.NODE_LOCAL
+ case TaskLocality.RACK_LOCAL => condition == TaskLocality.NODE_LOCAL || condition == TaskLocality.RACK_LOCAL
// For anything else, allow
case _ => true
}
@@ -32,12 +36,16 @@ private[spark] object TaskLocality extends Enumeration("HOST_LOCAL", "RACK_LOCAL
def parse(str: String): TaskLocality = {
// better way to do this ?
try {
- TaskLocality.withName(str)
+ val retval = TaskLocality.withName(str)
+ // Must not specify PROCESS_LOCAL !
+ assert (retval != TaskLocality.PROCESS_LOCAL)
+
+ retval
} catch {
case nEx: NoSuchElementException => {
- logWarning("Invalid task locality specified '" + str + "', defaulting to HOST_LOCAL");
+ logWarning("Invalid task locality specified '" + str + "', defaulting to NODE_LOCAL");
// default to preserve earlier behavior
- HOST_LOCAL
+ NODE_LOCAL
}
}
}
@@ -76,7 +84,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
// Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis
- // List of pending tasks for each node (hyper local to container). These collections are actually
+ // List of pending tasks for each node (process local to container). These collections are actually
// treated as stacks, in which new tasks are added to the end of the
// ArrayBuffer and removed from the end. This makes it faster to detect
// tasks that repeatedly fail because whenever a task failed, it is put
@@ -133,35 +141,55 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
addPendingTask(i)
}
- private def findPreferredLocations(_taskPreferredLocations: Seq[String], scheduler: ClusterScheduler, rackLocal: Boolean = false): ArrayBuffer[String] = {
- // DEBUG code
- _taskPreferredLocations.foreach(h => Utils.checkHost(h, "taskPreferredLocation " + _taskPreferredLocations))
-
- val taskPreferredLocations = if (! rackLocal) _taskPreferredLocations else {
- // Expand set to include all 'seen' rack local hosts.
- // This works since container allocation/management happens within master - so any rack locality information is updated in msater.
- // Best case effort, and maybe sort of kludge for now ... rework it later ?
- val hosts = new HashSet[String]
- _taskPreferredLocations.foreach(h => {
- val rackOpt = scheduler.getRackForHost(h)
- if (rackOpt.isDefined) {
- val hostsOpt = scheduler.getCachedHostsForRack(rackOpt.get)
- if (hostsOpt.isDefined) {
- hosts ++= hostsOpt.get
+ // Note that it follows the hierarchy.
+ // if we search for NODE_LOCAL, the output will include PROCESS_LOCAL and
+ // if we search for RACK_LOCAL, it will include PROCESS_LOCAL & NODE_LOCAL
+ private def findPreferredLocations(_taskPreferredLocations: Seq[String], scheduler: ClusterScheduler,
+ taskLocality: TaskLocality.TaskLocality): HashSet[String] = {
+
+ if (TaskLocality.PROCESS_LOCAL == taskLocality) {
+ // straight forward comparison ! Special case it.
+ val retval = new HashSet[String]()
+ scheduler.synchronized {
+ for (location <- _taskPreferredLocations) {
+ if (scheduler.isExecutorAliveOnHostPort(location)) {
+ retval += location
}
}
+ }
- // Ensure that irrespective of what scheduler says, host is always added !
- hosts += h
- })
-
- hosts
+ return retval
}
- val retval = new ArrayBuffer[String]
+ val taskPreferredLocations =
+ if (TaskLocality.NODE_LOCAL == taskLocality) {
+ _taskPreferredLocations
+ } else {
+ assert (TaskLocality.RACK_LOCAL == taskLocality)
+ // Expand set to include all 'seen' rack local hosts.
+ // This works since container allocation/management happens within master - so any rack locality information is updated in master.
+ // Best case effort, and maybe sort of kludge for now ... rework it later ?
+ val hosts = new HashSet[String]
+ _taskPreferredLocations.foreach(h => {
+ val rackOpt = scheduler.getRackForHost(h)
+ if (rackOpt.isDefined) {
+ val hostsOpt = scheduler.getCachedHostsForRack(rackOpt.get)
+ if (hostsOpt.isDefined) {
+ hosts ++= hostsOpt.get
+ }
+ }
+
+ // Ensure that irrespective of what scheduler says, host is always added !
+ hosts += h
+ })
+
+ hosts
+ }
+
+ val retval = new HashSet[String]
scheduler.synchronized {
for (prefLocation <- taskPreferredLocations) {
- val aliveLocationsOpt = scheduler.getExecutorsAliveOnHost(prefLocation)
+ val aliveLocationsOpt = scheduler.getExecutorsAliveOnHost(Utils.parseHostPort(prefLocation)._1)
if (aliveLocationsOpt.isDefined) {
retval ++= aliveLocationsOpt.get
}
@@ -175,29 +203,37 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
private def addPendingTask(index: Int) {
// We can infer hostLocalLocations from rackLocalLocations by joining it against tasks(index).preferredLocations (with appropriate
// hostPort <-> host conversion). But not doing it for simplicity sake. If this becomes a performance issue, modify it.
- val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched)
- val rackLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, true)
+ val processLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.PROCESS_LOCAL)
+ val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
+ val rackLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
if (rackLocalLocations.size == 0) {
// Current impl ensures this.
+ assert (processLocalLocations.size == 0)
assert (hostLocalLocations.size == 0)
pendingTasksWithNoPrefs += index
} else {
- // host locality
- for (hostPort <- hostLocalLocations) {
+ // process local locality
+ for (hostPort <- processLocalLocations) {
// DEBUG Code
Utils.checkHostPort(hostPort)
val hostPortList = pendingTasksForHostPort.getOrElseUpdate(hostPort, ArrayBuffer())
hostPortList += index
+ }
+
+ // host locality (includes process local)
+ for (hostPort <- hostLocalLocations) {
+ // DEBUG Code
+ Utils.checkHostPort(hostPort)
val host = Utils.parseHostPort(hostPort)._1
val hostList = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer())
hostList += index
}
- // rack locality
+ // rack locality (includes process local and host local)
for (rackLocalHostPort <- rackLocalLocations) {
// DEBUG Code
Utils.checkHostPort(rackLocalHostPort)
@@ -211,7 +247,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
allPendingTasks += index
}
- // Return the pending tasks list for a given host port (hyper local), or an empty list if
+ // Return the pending tasks list for a given host port (process local), or an empty list if
// there is no map entry for that host
private def getPendingTasksForHostPort(hostPort: String): ArrayBuffer[Int] = {
// DEBUG Code
@@ -233,6 +269,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
pendingRackLocalTasksForHost.getOrElse(host, ArrayBuffer())
}
+ // Number of pending tasks for a given host Port (which would be process local)
+ def numPendingTasksForHostPort(hostPort: String): Int = {
+ getPendingTasksForHostPort(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
+ }
+
// Number of pending tasks for a given host (which would be data local)
def numPendingTasksForHost(hostPort: String): Int = {
getPendingTasksForHost(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
@@ -264,13 +305,13 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
// task must have a preference for this host/rack/no preferred locations at all.
private def findSpeculativeTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = {
- assert (TaskLocality.isAllowed(locality, TaskLocality.HOST_LOCAL))
+ assert (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL))
speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set
if (speculatableTasks.size > 0) {
val localTask = speculatableTasks.find {
index =>
- val locations = findPreferredLocations(tasks(index).preferredLocations, sched)
+ val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
val attemptLocs = taskAttempts(index).map(_.hostPort)
(locations.size == 0 || locations.contains(hostPort)) && !attemptLocs.contains(hostPort)
}
@@ -284,7 +325,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
val rackTask = speculatableTasks.find {
index =>
- val locations = findPreferredLocations(tasks(index).preferredLocations, sched, true)
+ val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
val attemptLocs = taskAttempts(index).map(_.hostPort)
locations.contains(hostPort) && !attemptLocs.contains(hostPort)
}
@@ -311,6 +352,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
// Dequeue a pending task for a given node and return its index.
// If localOnly is set to false, allow non-local tasks as well.
private def findTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = {
+ val processLocalTask = findTaskFromList(getPendingTasksForHostPort(hostPort))
+ if (processLocalTask != None) {
+ return processLocalTask
+ }
+
val localTask = findTaskFromList(getPendingTasksForHost(hostPort))
if (localTask != None) {
return localTask
@@ -341,30 +387,31 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
return findSpeculativeTask(hostPort, locality)
}
- // Does a host count as a preferred location for a task? This is true if
- // either the task has preferred locations and this host is one, or it has
- // no preferred locations (in which we still count the launch as preferred).
- private def isPreferredLocation(task: Task[_], hostPort: String): Boolean = {
+ private def isProcessLocalLocation(task: Task[_], hostPort: String): Boolean = {
+ Utils.checkHostPort(hostPort)
+
val locs = task.preferredLocations
- // DEBUG code
- locs.foreach(h => Utils.checkHost(h, "preferredLocation " + locs))
- if (locs.contains(hostPort) || locs.isEmpty) return true
+ locs.contains(hostPort)
+ }
+
+ private def isHostLocalLocation(task: Task[_], hostPort: String): Boolean = {
+ val locs = task.preferredLocations
+
+ // If no preference, consider it as host local
+ if (locs.isEmpty) return true
val host = Utils.parseHostPort(hostPort)._1
- locs.contains(host)
+ locs.find(h => Utils.parseHostPort(h)._1 == host).isDefined
}
// Does a host count as a rack local preferred location for a task? (assumes host is NOT preferred location).
// This is true if either the task has preferred locations and this host is one, or it has
// no preferred locations (in which we still count the launch as preferred).
- def isRackLocalLocation(task: Task[_], hostPort: String): Boolean = {
+ private def isRackLocalLocation(task: Task[_], hostPort: String): Boolean = {
val locs = task.preferredLocations
- // DEBUG code
- locs.foreach(h => Utils.checkHost(h, "preferredLocation " + locs))
-
val preferredRacks = new HashSet[String]()
for (preferredHost <- locs) {
val rack = sched.getRackForHost(preferredHost)
@@ -386,7 +433,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
val locality = if (overrideLocality != null) overrideLocality else {
// expand only if we have waited for more than LOCALITY_WAIT for a host local task ...
val time = System.currentTimeMillis
- if (time - lastPreferredLaunchTime < LOCALITY_WAIT) TaskLocality.HOST_LOCAL else TaskLocality.ANY
+ if (time - lastPreferredLaunchTime < LOCALITY_WAIT) TaskLocality.NODE_LOCAL else TaskLocality.ANY
}
findTask(hostPort, locality) match {
@@ -395,8 +442,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
val task = tasks(index)
val taskId = sched.newTaskId()
// Figure out whether this should count as a preferred launch
- val taskLocality = if (isPreferredLocation(task, hostPort)) TaskLocality.HOST_LOCAL else
- if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL else TaskLocality.ANY
+ val taskLocality =
+ if (isProcessLocalLocation(task, hostPort)) TaskLocality.PROCESS_LOCAL else
+ if (isHostLocalLocation(task, hostPort)) TaskLocality.NODE_LOCAL else
+ if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL else
+ TaskLocality.ANY
val prefStr = taskLocality.toString
logInfo("Starting task %s:%d as TID %s on slave %s: %s (%s)".format(
taskSet.id, index, taskId, execId, hostPort, prefStr))
@@ -406,7 +456,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
val info = new TaskInfo(taskId, index, time, execId, hostPort, taskLocality)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
- if (TaskLocality.HOST_LOCAL == taskLocality) {
+ if (TaskLocality.NODE_LOCAL == taskLocality) {
lastPreferredLaunchTime = time
}
// Serialize and return the task
@@ -493,7 +543,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
return
case ef: ExceptionFailure =>
- val key = ef.exception.toString
+ val key = ef.description
val now = System.currentTimeMillis
val (printFull, dupCount) = {
if (recentExceptions.contains(key)) {
@@ -511,10 +561,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
}
}
if (printFull) {
- val locs = ef.exception.getStackTrace.map(loc => "\tat %s".format(loc.toString))
- logInfo("Loss was due to %s\n%s".format(ef.exception.toString, locs.mkString("\n")))
+ val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
+ logInfo("Loss was due to %s\n%s\n%s".format(
+ ef.className, ef.description, locs.mkString("\n")))
} else {
- logInfo("Loss was due to %s [duplicate %d]".format(ef.exception.toString, dupCount))
+ logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
}
case _ => {}
@@ -552,15 +603,22 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
def executorLost(execId: String, hostPort: String) {
logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
+
// If some task has preferred locations only on hostname, and there are no more executors there,
// put it in the no-prefs list to avoid the wait from delay scheduling
- for (index <- getPendingTasksForHostPort(hostPort)) {
- val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, true)
+
+ // host local tasks - should we push this to rack local or no pref list ? For now, preserving behavior and moving to
+ // no prefs list. Note, this was done due to implications related to 'waiting' for data local tasks, etc.
+ // Note: NOT checking process local list - since host local list is a superset of that. We need to add to no prefs only if
+ // there is no host local node for the task (not if there is no process local node for the task)
+ for (index <- getPendingTasksForHost(Utils.parseHostPort(hostPort)._1)) {
+ // val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
+ val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
if (newLocs.isEmpty) {
- assert (findPreferredLocations(tasks(index).preferredLocations, sched).isEmpty)
pendingTasksWithNoPrefs += index
}
}
+
// Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
if (tasks(0).isInstanceOf[ShuffleMapTask]) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
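
For reference, a small sketch of how the renamed locality levels relate under TaskLocality.isAllowed; the behaviour is read off the match expression above, and TaskLocality is private[spark], so this only compiles inside the spark package:

    // Illustration only (PROCESS_LOCAL is used inside the TaskSetManager and is never passed as a constraint).
    assert(TaskLocality.isAllowed(TaskLocality.NODE_LOCAL, TaskLocality.NODE_LOCAL))  // node admits node
    assert(TaskLocality.isAllowed(TaskLocality.RACK_LOCAL, TaskLocality.NODE_LOCAL))  // rack admits node-local
    assert(TaskLocality.isAllowed(TaskLocality.ANY, TaskLocality.RACK_LOCAL))         // ANY admits everything
    assert(!TaskLocality.isAllowed(TaskLocality.NODE_LOCAL, TaskLocality.RACK_LOCAL)) // node does not admit rack-local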
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index f060a940a9..37a67f9b1b 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -53,7 +53,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
def runTask(task: Task[_], idInJob: Int, attemptId: Int) {
logInfo("Running " + task)
- val info = new TaskInfo(attemptId, idInJob, System.currentTimeMillis(), "local", "local:1", TaskLocality.HOST_LOCAL)
+ val info = new TaskInfo(attemptId, idInJob, System.currentTimeMillis(), "local", "local:1", TaskLocality.NODE_LOCAL)
// Set the Spark execution environment for the worker thread
SparkEnv.set(env)
try {
@@ -101,8 +101,10 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
submitTask(task, idInJob)
} else {
// TODO: Do something nicer here to return all the way to the user
- if (!Thread.currentThread().isInterrupted)
- listener.taskEnded(task, new ExceptionFailure(t), null, null, info, null)
+ if (!Thread.currentThread().isInterrupted) {
+ val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace)
+ listener.taskEnded(task, failure, null, null, info, null)
+ }
}
}
}
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index e0dec3a8bb..40d608628e 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -3,7 +3,7 @@ package spark.storage
import java.io.{InputStream, OutputStream}
import java.nio.{ByteBuffer, MappedByteBuffer}
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet, Queue}
import scala.collection.JavaConversions._
import akka.actor.{ActorSystem, Cancellable, Props}
@@ -268,22 +268,11 @@ private[spark] class BlockManager(
/**
- * Get locations of the block.
- */
- def getLocations(blockId: String): Seq[String] = {
- val startTimeMs = System.currentTimeMillis
- var managers = master.getLocations(blockId)
- val locations = managers.map(_.hostPort)
- logDebug("Got block locations in " + Utils.getUsedTimeMs(startTimeMs))
- return locations
- }
-
- /**
* Get locations of an array of blocks.
*/
- def getLocations(blockIds: Array[String]): Array[Seq[String]] = {
+ def getLocationBlockIds(blockIds: Array[String]): Array[Seq[BlockManagerId]] = {
val startTimeMs = System.currentTimeMillis
- val locations = master.getLocations(blockIds).map(_.map(_.hostPort).toSeq).toArray
+ val locations = master.getLocations(blockIds).toArray
logDebug("Got multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
return locations
}
@@ -976,5 +965,45 @@ private[spark] object BlockManager extends Logging {
}
}
}
+
+ def blockIdsToExecutorLocations(blockIds: Array[String], env: SparkEnv, blockManagerMaster: BlockManagerMaster = null): HashMap[String, List[String]] = {
+ // env == null and blockManagerMaster != null is used in tests
+ assert (env != null || blockManagerMaster != null)
+ val locationBlockIds: Seq[Seq[BlockManagerId]] =
+ if (env != null) {
+ val blockManager = env.blockManager
+ blockManager.getLocationBlockIds(blockIds)
+ } else {
+ blockManagerMaster.getLocations(blockIds)
+ }
+
+ // Convert from block master locations to executor locations (we need that for task scheduling)
+ val executorLocations = new HashMap[String, List[String]]()
+ for (i <- 0 until blockIds.length) {
+ val blockId = blockIds(i)
+ val blockLocations = locationBlockIds(i)
+
+ val executors = new HashSet[String]()
+
+ if (env != null) {
+ for (bkLocation <- blockLocations) {
+ val executorHostPort = env.resolveExecutorIdToHostPort(bkLocation.executorId, bkLocation.host)
+ executors += executorHostPort
+ // logInfo("bkLocation = " + bkLocation + ", executorHostPort = " + executorHostPort)
+ }
+ } else {
+ // Typically only in tests: fall back to using just the host.
+ for (bkLocation <- blockLocations) {
+ executors += bkLocation.host
+ }
+ }
+
+ executorLocations.put(blockId, executors.toSeq.toList)
+ }
+
+ executorLocations
+ }
+
}
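
The new blockIdsToExecutorLocations helper above maps each block id to the executor host:port strings the task scheduler works with, falling back to plain hostnames when no SparkEnv is given (the test path). A rough usage sketch under that assumption; the caller name is hypothetical and not part of this patch:

    import spark.SparkEnv
    import spark.storage.BlockManager

    object BlockLocationExample {
      // Preferred executor locations for a set of blocks, keyed by block id.
      def preferredExecutorsFor(blockIds: Array[String]): Map[String, List[String]] = {
        // SparkEnv.get is the environment of the running job; the optional
        // BlockManagerMaster argument is only needed when no SparkEnv exists,
        // as in the unit tests.
        BlockManager.blockIdsToExecutorLocations(blockIds, SparkEnv.get).toMap
      }
    }
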
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 488c70c414..06a94ed24c 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -18,6 +18,11 @@ import scala.collection.mutable.ArrayBuffer
import SparkContext._
import storage.{GetBlock, BlockManagerWorker, StorageLevel}
+
+class NotSerializableClass
+class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}
+
+
class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
with LocalSparkContext {
@@ -28,6 +33,24 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
System.clearProperty("spark.storage.memoryFraction")
}
+ test("task throws not serializable exception") {
+ // Ensures that executors do not crash when an exception is not serializable. If executors crash,
+ // this test will hang. Correct behavior is that executors don't crash but fail tasks,
+ // and the scheduler throws a SparkException.
+
+ // numSlaves must be less than numPartitions
+ val numSlaves = 3
+ val numPartitions = 10
+
+ sc = new SparkContext("local-cluster[%s,1,512]".format(numSlaves), "test")
+ val data = sc.parallelize(1 to 100, numPartitions).
+ map(x => throw new NotSerializableExn(new NotSerializableClass))
+ intercept[SparkException] {
+ data.count()
+ }
+ resetSparkContext()
+ }
+
test("local-cluster format") {
sc = new SparkContext("local-cluster[2,1,512]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2)
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index e95818db61..b5cedc0b68 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -80,15 +80,16 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
}
test("remote fetch") {
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", "localhost", 0)
+ val hostname = "localhost"
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0)
System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
- System.setProperty("spark.hostPort", "localhost:" + boundPort)
+ System.setProperty("spark.hostPort", hostname + ":" + boundPort)
val masterTracker = new MapOutputTracker()
masterTracker.trackerActor = actorSystem.actorOf(
Props(new MapOutputTrackerActor(masterTracker)), "MapOutputTracker")
- val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", "localhost", 0)
+ val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0)
val slaveTracker = new MapOutputTracker()
slaveTracker.trackerActor = slaveSystem.actorFor(
"akka://spark@localhost:" + boundPort + "/user/MapOutputTracker")
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index cee6312572..a761dd77c5 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -5,7 +5,7 @@ import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts._
import org.scalatest.time.{Span, Millis}
import spark.SparkContext._
-import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD}
+import spark.rdd.{CoalescedRDD, CoGroupedRDD, EmptyRDD, PartitionPruningRDD, ShuffledRDD}
class RDDSuite extends FunSuite with LocalSparkContext {
@@ -147,6 +147,26 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(rdd.collect().toList === List(1, 2, 3, 4))
}
+ test("empty RDD") {
+ sc = new SparkContext("local", "test")
+ val empty = new EmptyRDD[Int](sc)
+ assert(empty.count === 0)
+ assert(empty.collect().size === 0)
+
+ val thrown = intercept[UnsupportedOperationException]{
+ empty.reduce(_+_)
+ }
+ assert(thrown.getMessage.contains("empty"))
+
+ val emptyKv = new EmptyRDD[(Int, Int)](sc)
+ val rdd = sc.parallelize(1 to 2, 2).map(x => (x, x))
+ assert(rdd.join(emptyKv).collect().size === 0)
+ assert(rdd.rightOuterJoin(emptyKv).collect().size === 0)
+ assert(rdd.leftOuterJoin(emptyKv).collect().size === 2)
+ assert(rdd.cogroup(emptyKv).collect().size === 2)
+ assert(rdd.union(emptyKv).collect().size === 2)
+ }
+
test("cogrouped RDDs") {
sc = new SparkContext("local", "test")
val rdd1 = sc.makeRDD(Array((1, "one"), (1, "another one"), (2, "two"), (3, "three")), 2)
diff --git a/core/src/test/scala/spark/rdd/JdbcRDDSuite.scala b/core/src/test/scala/spark/rdd/JdbcRDDSuite.scala
new file mode 100644
index 0000000000..6afb0fa9bc
--- /dev/null
+++ b/core/src/test/scala/spark/rdd/JdbcRDDSuite.scala
@@ -0,0 +1,56 @@
+package spark
+
+import org.scalatest.{ BeforeAndAfter, FunSuite }
+import spark.SparkContext._
+import spark.rdd.JdbcRDD
+import java.sql._
+
+class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
+
+ before {
+ Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
+ val conn = DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb;create=true")
+ try {
+ val create = conn.createStatement
+ create.execute("""
+ CREATE TABLE FOO(
+ ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
+ DATA INTEGER
+ )""")
+ create.close
+ val insert = conn.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)")
+ (1 to 100).foreach { i =>
+ insert.setInt(1, i * 2)
+ insert.executeUpdate
+ }
+ insert.close
+ } catch {
+ case e: SQLException if e.getSQLState == "X0Y32" =>
+ // table exists
+ } finally {
+ conn.close
+ }
+ }
+
+ test("basic functionality") {
+ sc = new SparkContext("local", "test")
+ val rdd = new JdbcRDD(
+ sc,
+ () => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
+ "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
+ 1, 100, 3,
+ (r: ResultSet) => { r.getInt(1) } ).cache
+
+ assert(rdd.count === 100)
+ assert(rdd.reduce(_+_) === 10100)
+ }
+
+ after {
+ try {
+ DriverManager.getConnection("jdbc:derby:;shutdown=true")
+ } catch {
+ case se: SQLException if se.getSQLState == "XJ015" =>
+ // normal shutdown
+ }
+ }
+}
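
In the constructor call above, JdbcRDD is given a connection factory, a query whose two '?' placeholders bound each partition's key range, the overall inclusive bounds (1 and 100), the number of partitions (3), and a function mapping each ResultSet row to a value. A sketch of one plausible way to carve that inclusive range into per-partition bounds follows; the arithmetic is illustrative and not necessarily the exact code in JdbcRDD.scala:

    // Split the inclusive range [lowerBound, upperBound] into numPartitions
    // contiguous sub-ranges, e.g. (1, 100, 3) gives (1,33), (34,66), (67,100).
    def partitionBounds(lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[(Long, Long)] = {
      val length = upperBound - lowerBound + 1
      (0 until numPartitions).map { i =>
        val start = lowerBound + (i * length) / numPartitions
        val end = lowerBound + ((i + 1) * length) / numPartitions - 1
        (start, end)
      }
    }
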
diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
index c0f8986de8..16554eac6e 100644
--- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
@@ -385,12 +385,12 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
assert(results === Map(0 -> 42))
}
- /** Assert that the supplied TaskSet has exactly the given preferredLocations. */
+ /** Assert that the supplied TaskSet has exactly the given preferredLocations. Note: the taskSet's locations are reduced to hosts only before comparison. */
private def assertLocations(taskSet: TaskSet, locations: Seq[Seq[String]]) {
assert(locations.size === taskSet.tasks.size)
for ((expectLocs, taskLocs) <-
taskSet.tasks.map(_.preferredLocations).zip(locations)) {
- assert(expectLocs === taskLocs)
+ assert(expectLocs.map(loc => spark.Utils.parseHostPort(loc)._1) === taskLocs)
}
}
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 9fe0de665c..71d1f0bcc8 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -47,6 +47,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
+ // Set spark.hostPort to a dummy value; code exercised by these tests expects it to be set
+ System.setProperty("spark.hostPort", spark.Utils.localHostName() + ":" + 1111)
}
after {
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 26424bbe52..c8cf8ffc35 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -30,6 +30,9 @@ If you want to test out the YARN deployment mode, you can use the current Spark
# Launching Spark on YARN
+Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory containing the (client-side) configuration files for the Hadoop cluster.
+These files are used to connect to the cluster, write to HDFS, and submit jobs to the ResourceManager.
+
The command to launch the YARN Client is as follows:
SPARK_JAR=<SPARK_YAR_FILE> ./run spark.deploy.yarn.Client \
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 9f2daad2b6..7affe6fffc 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -103,7 +103,7 @@ def parse_args():
parser.print_help()
sys.exit(1)
(action, cluster_name) = args
- if opts.identity_file == None and action in ['launch', 'login']:
+ if opts.identity_file == None and action in ['launch', 'login', 'start']:
print >> stderr, ("ERROR: The -i or --identity-file argument is " +
"required for " + action)
sys.exit(1)
diff --git a/pom.xml b/pom.xml
index 3936165d78..d7cdc591cf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -256,6 +256,12 @@
<artifactId>mesos</artifactId>
<version>${mesos.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>10.4.2.0</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.scala-lang</groupId>
@@ -565,7 +571,7 @@
<hadoop.major.version>2</hadoop.major.version>
<!-- 0.23.* is same as 2.0.* - except hardened to run production jobs -->
<!-- <yarn.version>0.23.7</yarn.version> -->
- <yarn.version>2.0.2-alpha</yarn.version>
+ <yarn.version>2.0.2-alpha</yarn.version>
</properties>
<repositories>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index dbfe5b0aa6..234b021c93 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -161,7 +161,8 @@ object SparkBuild extends Build {
"cc.spray" % "spray-server" % "1.0-M2.1" excludeAll(excludeNetty),
"cc.spray" % "spray-json_2.9.2" % "1.1.1" excludeAll(excludeNetty),
"org.apache.mesos" % "mesos" % "0.9.0-incubating",
- "io.netty" % "netty-all" % "4.0.0.Beta2"
+ "io.netty" % "netty-all" % "4.0.0.Beta2",
+ "org.apache.derby" % "derby" % "10.4.2.0" % "test"
) ++ (
if (HADOOP_MAJOR_VERSION == "2") {
if (HADOOP_YARN) {
diff --git a/run b/run
index 0a58ac4a36..c744bbd3dc 100755
--- a/run
+++ b/run
@@ -22,7 +22,7 @@ fi
# values for that; it doesn't need a lot
if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then
SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m}
- SPARK_DAEMON_JAVA_OPTS+=" -Dspark.akka.logLifecycleEvents=true"
+ SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
SPARK_JAVA_OPTS=$SPARK_DAEMON_JAVA_OPTS # Empty by default
fi
@@ -30,19 +30,19 @@ fi
# Add java opts for master, worker, executor. The opts maybe null
case "$1" in
'spark.deploy.master.Master')
- SPARK_JAVA_OPTS+=" $SPARK_MASTER_OPTS"
+ SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_MASTER_OPTS"
;;
'spark.deploy.worker.Worker')
- SPARK_JAVA_OPTS+=" $SPARK_WORKER_OPTS"
+ SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_WORKER_OPTS"
;;
'spark.executor.StandaloneExecutorBackend')
- SPARK_JAVA_OPTS+=" $SPARK_EXECUTOR_OPTS"
+ SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
;;
'spark.executor.MesosExecutorBackend')
- SPARK_JAVA_OPTS+=" $SPARK_EXECUTOR_OPTS"
+ SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
;;
'spark.repl.Main')
- SPARK_JAVA_OPTS+=" $SPARK_REPL_OPTS"
+ SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_REPL_OPTS"
;;
esac
@@ -85,11 +85,11 @@ export SPARK_MEM
# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$SPARK_JAVA_OPTS"
-JAVA_OPTS+=" -Djava.library.path=$SPARK_LIBRARY_PATH"
-JAVA_OPTS+=" -Xms$SPARK_MEM -Xmx$SPARK_MEM"
+JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
+JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e $FWDIR/conf/java-opts ] ; then
- JAVA_OPTS+=" `cat $FWDIR/conf/java-opts`"
+ JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
fi
export JAVA_OPTS
@@ -110,30 +110,30 @@ fi
# Build up classpath
CLASSPATH="$SPARK_CLASSPATH"
-CLASSPATH+=":$FWDIR/conf"
-CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/classes"
+CLASSPATH="$CLASSPATH:$FWDIR/conf"
+CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/classes"
if [ -n "$SPARK_TESTING" ] ; then
- CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/test-classes"
- CLASSPATH+=":$STREAMING_DIR/target/scala-$SCALA_VERSION/test-classes"
+ CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/test-classes"
+ CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/test-classes"
fi
-CLASSPATH+=":$CORE_DIR/src/main/resources"
-CLASSPATH+=":$REPL_DIR/target/scala-$SCALA_VERSION/classes"
-CLASSPATH+=":$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
-CLASSPATH+=":$STREAMING_DIR/target/scala-$SCALA_VERSION/classes"
-CLASSPATH+=":$STREAMING_DIR/lib/org/apache/kafka/kafka/0.7.2-spark/*" # <-- our in-project Kafka Jar
+CLASSPATH="$CLASSPATH:$CORE_DIR/src/main/resources"
+CLASSPATH="$CLASSPATH:$REPL_DIR/target/scala-$SCALA_VERSION/classes"
+CLASSPATH="$CLASSPATH:$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
+CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/classes"
+CLASSPATH="$CLASSPATH:$STREAMING_DIR/lib/org/apache/kafka/kafka/0.7.2-spark/*" # <-- our in-project Kafka Jar
if [ -e "$FWDIR/lib_managed" ]; then
- CLASSPATH+=":$FWDIR/lib_managed/jars/*"
- CLASSPATH+=":$FWDIR/lib_managed/bundles/*"
+ CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/jars/*"
+ CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/bundles/*"
fi
-CLASSPATH+=":$REPL_DIR/lib/*"
+CLASSPATH="$CLASSPATH:$REPL_DIR/lib/*"
if [ -e $REPL_BIN_DIR/target ]; then
for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
- CLASSPATH+=":$jar"
+ CLASSPATH="$CLASSPATH:$jar"
done
fi
-CLASSPATH+=":$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
+CLASSPATH="$CLASSPATH:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do
- CLASSPATH+=":$jar"
+ CLASSPATH="$CLASSPATH:$jar"
done
# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
@@ -147,6 +147,17 @@ if [ -e "$EXAMPLES_DIR/target/spark-examples-"*hadoop[12].jar ]; then
export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples-"*hadoop[12].jar`
fi
+# Add the hadoop conf dir to the classpath - otherwise FileSystem.*, etc. fail!
+# Note: this assumes that either HADOOP_CONF_DIR or YARN_CONF_DIR hosts
+# the configuration files.
+if [ "x" != "x$HADOOP_CONF_DIR" ]; then
+ CLASSPATH="$CLASSPATH:$HADOOP_CONF_DIR"
+fi
+if [ "x" != "x$YARN_CONF_DIR" ]; then
+ CLASSPATH="$CLASSPATH:$YARN_CONF_DIR"
+fi
+
+
# Figure out whether to run our class with java or with the scala launcher.
# In most cases, we'd prefer to execute our process with java because scala
# creates a shell script as the parent of its Java process, which makes it
@@ -156,9 +167,9 @@ fi
if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
EXTRA_ARGS="" # Java options will be passed to scala as JAVA_OPTS
else
- CLASSPATH+=":$SCALA_LIBRARY_PATH/scala-library.jar"
- CLASSPATH+=":$SCALA_LIBRARY_PATH/scala-compiler.jar"
- CLASSPATH+=":$SCALA_LIBRARY_PATH/jline.jar"
+ CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-library.jar"
+ CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-compiler.jar"
+ CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/jline.jar"
# The JVM doesn't read JAVA_OPTS by default so we need to pass it in
EXTRA_ARGS="$JAVA_OPTS"
fi
diff --git a/run2.cmd b/run2.cmd
index d2d4807971..c6f43dde5b 100644
--- a/run2.cmd
+++ b/run2.cmd
@@ -63,6 +63,19 @@ set CLASSPATH=%CLASSPATH%;%FWDIR%repl\lib\*
set CLASSPATH=%CLASSPATH%;%FWDIR%python\lib\*
set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes
+rem Add the hadoop conf dir to the classpath - otherwise FileSystem.*, etc. fail
+rem Note: this assumes that either HADOOP_CONF_DIR or YARN_CONF_DIR hosts
+rem the configuration files.
+if "x%HADOOP_CONF_DIR%"=="x" goto no_hadoop_conf_dir
+ set CLASSPATH=%CLASSPATH%;%HADOOP_CONF_DIR%
+:no_hadoop_conf_dir
+
+if "x%YARN_CONF_DIR%"=="x" goto no_yarn_conf_dir
+ set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR%
+:no_yarn_conf_dir
+
+
+
rem Figure out the JAR file that our examples were packaged into.
rem First search in the build path from SBT:
for %%d in ("examples/target/scala-%SCALA_VERSION%/spark-examples*.jar") do (