diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2013-10-10 09:42:23 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-10-10 09:42:23 +0530 |
commit | 26860639c5fee7fc23db1e686f8eb202921e4314 (patch) | |
tree | e05e555fcd713a7eb15680ae078994d70f396135 | |
parent | 7d50f9f87baeb1f4b8d77d669d25649b97dd1d57 (diff) | |
parent | 7be75682b931dd52014f3cfdc6887e54583ad0af (diff) | |
download | spark-26860639c5fee7fc23db1e686f8eb202921e4314.tar.gz spark-26860639c5fee7fc23db1e686f8eb202921e4314.tar.bz2 spark-26860639c5fee7fc23db1e686f8eb202921e4314.zip |
Merge branch 'scala-2.10' of github.com:ScrapCodes/spark into scala-2.10
Conflicts:
core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
project/SparkBuild.scala
83 files changed, 1071 insertions, 403 deletions
@@ -108,3 +108,4 @@ project's open source license. Whether or not you state this explicitly, by submitting any copyrighted material via pull request, email, or other means you agree to license the material under the project's open source license and warrant that you have the legal authority to do so. + diff --git a/assembly/pom.xml b/assembly/pom.xml index 47a110ca6c..28b0692dff 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,12 +21,12 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.spark</groupId> - <artifactId>spark-assembly</artifactId> + <artifactId>spark-assembly_${scala-short.version}</artifactId> <name>Spark Project Assembly</name> <url>http://spark.incubator.apache.org/</url> @@ -41,27 +41,27 @@ <dependencies> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core</artifactId> + <artifactId>spark-core_${scala-short.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-bagel</artifactId> + <artifactId>spark-bagel_${scala-short.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-mllib</artifactId> + <artifactId>spark-mllib_${scala-short.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-repl</artifactId> + <artifactId>spark-repl_${scala-short.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming</artifactId> + <artifactId>spark-streaming_${scala-short.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> @@ -104,13 +104,13 @@ </goals> 
<configuration> <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/services/org.apache.hadoop.fs.FileSystem</resource> </transformer> </transformers> <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> @@ -128,7 +128,7 @@ <dependencies> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-yarn</artifactId> + <artifactId>spark-yarn_${scala-short.version}</artifactId> <version>${project.version}</version> </dependency> </dependencies> diff --git a/bagel/pom.xml b/bagel/pom.xml index feaed6d2b0..c8b9c4f4cd 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,12 +21,12 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.spark</groupId> - <artifactId>spark-bagel</artifactId> + <artifactId>spark-bagel_${scala-short.version}</artifactId> <packaging>jar</packaging> <name>Spark Project Bagel</name> <url>http://spark.incubator.apache.org/</url> @@ -34,7 +34,7 @@ <dependencies> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core</artifactId> + <artifactId>spark-core_${scala-short.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> diff --git a/core/pom.xml b/core/pom.xml index 8d9f0e386f..595240b5e5 100644 --- a/core/pom.xml +++ 
b/core/pom.xml @@ -21,12 +21,12 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.spark</groupId> - <artifactId>spark-core</artifactId> + <artifactId>spark-core_${scala-short.version}</artifactId> <packaging>jar</packaging> <name>Spark Project Core</name> <url>http://spark.incubator.apache.org/</url> @@ -39,7 +39,6 @@ <dependency> <groupId>net.java.dev.jets3t</groupId> <artifactId>jets3t</artifactId> - <version>0.7.1</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> @@ -199,14 +198,14 @@ <configuration> <exportAntProperties>true</exportAntProperties> <tasks> - <property name="spark.classpath" refid="maven.test.classpath"/> - <property environment="env"/> + <property name="spark.classpath" refid="maven.test.classpath" /> + <property environment="env" /> <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry."> <condition> <not> <or> - <isset property="env.SCALA_HOME"/> - <isset property="env.SCALA_LIBRARY_PATH"/> + <isset property="env.SCALA_HOME" /> + <isset property="env.SCALA_LIBRARY_PATH" /> </or> </not> </condition> diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 68b99ca125..4cf7eb96da 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -26,28 +26,29 @@ import org.apache.spark.rdd.RDD sure a node doesn't load two copies of an RDD at once. */ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { - private val loading = new HashSet[String] + + /** Keys of RDD splits that are being computed/loaded. */ + private val loading = new HashSet[String]() /** Gets or computes an RDD split. 
Used by RDD.iterator() when an RDD is cached. */ def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel) : Iterator[T] = { val key = "rdd_%d_%d".format(rdd.id, split.index) - logInfo("Cache key is " + key) + logDebug("Looking for partition " + key) blockManager.get(key) match { - case Some(cachedValues) => - // Partition is in cache, so just return its values - logInfo("Found partition in cache!") - return cachedValues.asInstanceOf[Iterator[T]] + case Some(values) => + // Partition is already materialized, so just return its values + return values.asInstanceOf[Iterator[T]] case None => // Mark the split as loading (unless someone else marks it first) loading.synchronized { if (loading.contains(key)) { - logInfo("Loading contains " + key + ", waiting...") + logInfo("Another thread is loading %s, waiting for it to finish...".format(key)) while (loading.contains(key)) { try {loading.wait()} catch {case _ : Throwable =>} } - logInfo("Loading no longer contains " + key + ", so returning cached result") + logInfo("Finished waiting for %s".format(key)) // See whether someone else has successfully loaded it. The main way this would fail // is for the RDD-level cache eviction policy if someone else has loaded the same RDD // partition but we didn't want to make space for it. 
However, that case is unlikely @@ -57,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { case Some(values) => return values.asInstanceOf[Iterator[T]] case None => - logInfo("Whoever was loading " + key + " failed; we'll try it ourselves") + logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key)) loading.add(key) } } else { @@ -66,7 +67,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { } try { // If we got here, we have to load the split - logInfo("Computing partition " + split) + logInfo("Partition %s not found, computing it".format(key)) val computedValues = rdd.computeOrReadCheckpoint(split, context) // Persist the result, so long as the task is not running locally if (context.runningLocally) { return computedValues } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ef97fa85fa..efcc92e8e7 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -53,14 +53,15 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor import org.apache.mesos.MesosNativeLibrary +import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.LocalSparkCluster import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, - ClusterScheduler, Schedulable, SchedulingMode} + ClusterScheduler} import org.apache.spark.scheduler.local.LocalScheduler -import org.apache.spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} +import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import org.apache.spark.storage.{StorageUtils, BlockManagerSource} import 
org.apache.spark.ui.SparkUI import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, TimeStampedHashMap} @@ -85,9 +86,11 @@ class SparkContext( val sparkHome: String = null, val jars: Seq[String] = Nil, val environment: Map[String, String] = Map(), - // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) too. - // This is typically generated from InputFormatInfo.computePreferredLocations .. host, set of data-local splits on host - val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map()) + // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) + // too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set + // of data-local splits on host + val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = + scala.collection.immutable.Map()) extends Logging { // Ensure logging is initialized before we spawn any threads @@ -147,7 +150,7 @@ class SparkContext( } // Create and start the scheduler - private var taskScheduler: TaskScheduler = { + private[spark] var taskScheduler: TaskScheduler = { // Regular expression used for local[N] master format val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r // Regular expression for local[N, maxRetries], used in tests with failing tasks @@ -240,7 +243,8 @@ class SparkContext( val env = SparkEnv.get val conf = env.hadoop.newConfiguration() // Explicitly check for S3 environment variables - if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) { + if (System.getenv("AWS_ACCESS_KEY_ID") != null && + System.getenv("AWS_SECRET_ACCESS_KEY") != null) { conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) @@ 
-339,6 +343,8 @@ class SparkContext( valueClass: Class[V], minSplits: Int = defaultMinSplits ): RDD[(K, V)] = { + // Add necessary security credentials to the JobConf before broadcasting it. + SparkEnv.get.hadoop.addCredentials(conf) new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) } @@ -349,10 +355,27 @@ class SparkContext( keyClass: Class[K], valueClass: Class[V], minSplits: Int = defaultMinSplits - ) : RDD[(K, V)] = { - val conf = new JobConf(hadoopConfiguration) - FileInputFormat.setInputPaths(conf, path) - new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) + ): RDD[(K, V)] = { + // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. + val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration)) + hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits) + } + + /** + * Get an RDD for a Hadoop file with an arbitray InputFormat. Accept a Hadoop Configuration + * that has already been broadcast, assuming that it's safe to use it to construct a + * HadoopFileRDD (i.e., except for file 'path', all other configuration properties can be resued). 
+ */ + def hadoopFile[K, V]( + path: String, + confBroadcast: Broadcast[SerializableWritable[Configuration]], + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int + ): RDD[(K, V)] = { + new HadoopFileRDD( + this, path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits) } /** diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 03bf268863..8466c2a004 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -46,6 +46,10 @@ private[spark] case class ExceptionFailure( metrics: Option[TaskMetrics]) extends TaskEndReason -private[spark] case class OtherFailure(message: String) extends TaskEndReason +/** + * The task finished successfully, but the result was lost from the executor's block manager before + * it was fetched. + */ +private[spark] case object TaskResultLost extends TaskEndReason -private[spark] case class TaskResultTooBigFailure() extends TaskEndReason +private[spark] case class OtherFailure(message: String) extends TaskEndReason diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index feb2cab578..d7b45d4caa 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -69,6 +69,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { new JavaRDD(rdd.map(f)(f.returnType()))(f.returnType()) /** + * Return a new RDD by applying a function to each partition of this RDD, while tracking the index + * of the original partition. 
+ */ + def mapPartitionsWithIndex[R: ClassManifest]( + f: JFunction2[Int, java.util.Iterator[T], java.util.Iterator[R]], + preservesPartitioning: Boolean = false): JavaRDD[R] = + new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))), + preservesPartitioning)) + + /** * Return a new RDD by applying a function to all elements of this RDD. */ def map[R](f: DoubleFunction[T]): JavaDoubleRDD = diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala index b090c6edf3..2be4e323be 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala @@ -17,12 +17,13 @@ package org.apache.spark.api.python -import org.apache.spark.Partitioner import java.util.Arrays + +import org.apache.spark.Partitioner import org.apache.spark.util.Utils /** - * A [[org.apache.spark.Partitioner]] that performs handling of byte arrays, for use by the Python API. + * A [[org.apache.spark.Partitioner]] that performs handling of long-valued keys, for use by the Python API. * * Stores the unique id() of the Python-side partitioning function so that it is incorporated into * equality comparisons. Correctness requires that the id is a unique identifier for the @@ -30,6 +31,7 @@ import org.apache.spark.util.Utils * function). This can be ensured by using the Python id() function and maintaining a reference * to the Python partitioning function so that its id() is not reused. 
*/ + private[spark] class PythonPartitioner( override val numPartitions: Int, val pyPartitionFunctionId: Long) @@ -37,7 +39,9 @@ private[spark] class PythonPartitioner( override def getPartition(key: Any): Int = key match { case null => 0 - case key: Array[Byte] => Utils.nonNegativeMod(Arrays.hashCode(key), numPartitions) + // we don't trust the Python partition function to return valid partition ID's so + // let's do a modulo numPartitions in any case + case key: Long => Utils.nonNegativeMod(key.toInt, numPartitions) case _ => Utils.nonNegativeMod(key.hashCode(), numPartitions) } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index cb2db77f39..4d887cf195 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -187,14 +187,14 @@ private class PythonException(msg: String) extends Exception(msg) * This is used by PySpark's shuffle operations. 
*/ private class PairwiseRDD(prev: RDD[Array[Byte]]) extends - RDD[(Array[Byte], Array[Byte])](prev) { + RDD[(Long, Array[Byte])](prev) { override def getPartitions = prev.partitions override def compute(split: Partition, context: TaskContext) = prev.iterator(split, context).grouped(2).map { - case Seq(a, b) => (a, b) + case Seq(a, b) => (Utils.deserializeLongValue(a), b) case x => throw new SparkException("PairwiseRDD: unexpected value: " + x) } - val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this) + val asJavaPairRDD : JavaPairRDD[Long, Array[Byte]] = JavaPairRDD.fromRDD(this) } private[spark] object PythonRDD { diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index 87a703427c..04d01c169d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -41,6 +41,7 @@ private[spark] object JsonProtocol { ("starttime" -> obj.startTime) ~ ("id" -> obj.id) ~ ("name" -> obj.desc.name) ~ + ("appuiurl" -> obj.appUiUrl) ~ ("cores" -> obj.desc.maxCores) ~ ("user" -> obj.desc.user) ~ ("memoryperslave" -> obj.desc.memoryPerSlave) ~ @@ -64,7 +65,7 @@ private[spark] object JsonProtocol { } def writeMasterState(obj: MasterStateResponse) = { - ("url" -> ("spark://" + obj.uri)) ~ + ("url" -> obj.uri) ~ ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~ ("cores" -> obj.workers.map(_.cores).sum) ~ ("coresused" -> obj.workers.map(_.coresUsed).sum) ~ diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 0a5f4c368f..993ba6bd3d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -16,6 +16,9 @@ */ package org.apache.spark.deploy + +import com.google.common.collect.MapMaker + 
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf @@ -24,11 +27,16 @@ import org.apache.hadoop.mapred.JobConf * Contains util methods to interact with Hadoop from spark. */ class SparkHadoopUtil { + // A general, soft-reference map for metadata needed during HadoopRDD split computation + // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). + private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]() - // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems + // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop + // subsystems def newConfiguration(): Configuration = new Configuration() - // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster + // Add any user credentials to the job conf which are necessary for running on a secure Hadoop + // cluster def addCredentials(conf: JobConf) {} def isYarnMode(): Boolean = { false } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 99a4a95e82..b4153f3533 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import java.io.{File} +import java.io.File import java.lang.management.ManagementFactory import java.nio.ByteBuffer import java.util.concurrent._ @@ -27,11 +27,11 @@ import scala.collection.mutable.HashMap import org.apache.spark.scheduler._ import org.apache.spark._ +import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils - /** - * The Mesos executor for Spark. + * Spark executor used with Mesos and the standalone scheduler. 
*/ private[spark] class Executor( executorId: String, @@ -167,12 +167,20 @@ private[spark] class Executor( // we need to serialize the task metrics first. If TaskMetrics had a custom serialized format, we could // just change the relevants bytes in the byte buffer val accumUpdates = Accumulators.values - val result = new TaskResult(value, accumUpdates, task.metrics.getOrElse(null)) - val serializedResult = ser.serialize(result) - logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit) - if (serializedResult.limit >= (akkaFrameSize - 1024)) { - context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(TaskResultTooBigFailure())) - return + val directResult = new DirectTaskResult(value, accumUpdates, task.metrics.getOrElse(null)) + val serializedDirectResult = ser.serialize(directResult) + logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit) + val serializedResult = { + if (serializedDirectResult.limit >= akkaFrameSize - 1024) { + logInfo("Storing result for " + taskId + " in local BlockManager") + val blockId = "taskresult_" + taskId + env.blockManager.putBytes( + blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER) + ser.serialize(new IndirectTaskResult[Any](blockId)) + } else { + logInfo("Sending result for " + taskId + " directly to driver") + serializedDirectResult + } } context.statusUpdate(taskId, TaskState.FINISHED, serializedResult) logInfo("Finished task ID " + taskId) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 2cb6734e41..d3b3fffd40 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -19,6 +19,7 @@ package org.apache.spark.rdd import java.io.EOFException +import org.apache.hadoop.mapred.FileInputFormat import org.apache.hadoop.mapred.InputFormat import org.apache.hadoop.mapred.InputSplit import 
org.apache.hadoop.mapred.JobConf @@ -26,10 +27,47 @@ import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.util.ReflectionUtils -import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext} +import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, + TaskContext} +import org.apache.spark.broadcast.Broadcast import org.apache.spark.util.NextIterator import org.apache.hadoop.conf.{Configuration, Configurable} +/** + * An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file + * system, or S3). + * This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same + * across multiple reads; the 'path' is the only variable that is different across new JobConfs + * created from the Configuration. + */ +class HadoopFileRDD[K, V]( + sc: SparkContext, + path: String, + broadcastedConf: Broadcast[SerializableWritable[Configuration]], + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int) + extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, valueClass, minSplits) { + + override def getJobConf(): JobConf = { + if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { + // getJobConf() has been called previously, so there is already a local cache of the JobConf + // needed by this RDD. + return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] + } else { + // Create a new JobConf, set the input file/directory paths to read from, and cache the + // JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through + // HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple + // getJobConf() calls for this RDD in the local process. + // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. 
+ val newJobConf = new JobConf(broadcastedConf.value.value) + FileInputFormat.setInputPaths(newJobConf, path) + HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) + return newJobConf + } + } +} /** * A Spark split class that wraps around a Hadoop InputSplit. @@ -45,29 +83,80 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp } /** - * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local file - * system, or S3, tables in HBase, etc). + * A base class that provides core functionality for reading data partitions stored in Hadoop. */ class HadoopRDD[K, V]( sc: SparkContext, - @transient conf: JobConf, + broadcastedConf: Broadcast[SerializableWritable[Configuration]], inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minSplits: Int) extends RDD[(K, V)](sc, Nil) with Logging { - // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it - private val confBroadcast = sc.broadcast(new SerializableWritable(conf)) + def this( + sc: SparkContext, + conf: JobConf, + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int) = { + this( + sc, + sc.broadcast(new SerializableWritable(conf)) + .asInstanceOf[Broadcast[SerializableWritable[Configuration]]], + inputFormatClass, + keyClass, + valueClass, + minSplits) + } + + protected val jobConfCacheKey = "rdd_%d_job_conf".format(id) + + protected val inputFormatCacheKey = "rdd_%d_input_format".format(id) + + // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. + protected def getJobConf(): JobConf = { + val conf: Configuration = broadcastedConf.value.value + if (conf.isInstanceOf[JobConf]) { + // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it. 
+ return conf.asInstanceOf[JobConf] + } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { + // getJobConf() has been called previously, so there is already a local cache of the JobConf + // needed by this RDD. + return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] + } else { + // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the + // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). + // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. + val newJobConf = new JobConf(broadcastedConf.value.value) + HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) + return newJobConf + } + } + + protected def getInputFormat(conf: JobConf): InputFormat[K, V] = { + if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) { + return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]] + } + // Once an InputFormat for this RDD is created, cache it so that only one reflection call is + // done in each local process. 
+ val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf) + .asInstanceOf[InputFormat[K, V]] + if (newInputFormat.isInstanceOf[Configurable]) { + newInputFormat.asInstanceOf[Configurable].setConf(conf) + } + HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat) + return newInputFormat + } override def getPartitions: Array[Partition] = { - val env = SparkEnv.get - env.hadoop.addCredentials(conf) - val inputFormat = createInputFormat(conf) + val jobConf = getJobConf() + val inputFormat = getInputFormat(jobConf) if (inputFormat.isInstanceOf[Configurable]) { - inputFormat.asInstanceOf[Configurable].setConf(conf) + inputFormat.asInstanceOf[Configurable].setConf(jobConf) } - val inputSplits = inputFormat.getSplits(conf, minSplits) + val inputSplits = inputFormat.getSplits(jobConf, minSplits) val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) @@ -75,22 +164,14 @@ class HadoopRDD[K, V]( array } - def createInputFormat(conf: JobConf): InputFormat[K, V] = { - ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf) - .asInstanceOf[InputFormat[K, V]] - } - override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] { val split = theSplit.asInstanceOf[HadoopPartition] logInfo("Input split: " + split.inputSplit) var reader: RecordReader[K, V] = null - val conf = confBroadcast.value.value - val fmt = createInputFormat(conf) - if (fmt.isInstanceOf[Configurable]) { - fmt.asInstanceOf[Configurable].setConf(conf) - } - reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL) + val jobConf = getJobConf() + val inputFormat = getInputFormat(jobConf) + reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) // Register an on-task-completion callback to close the input stream. 
context.addOnCompleteCallback{ () => closeIfNeeded() } @@ -127,5 +208,18 @@ class HadoopRDD[K, V]( // Do nothing. Hadoop RDD should not be checkpointed. } - def getConf: Configuration = confBroadcast.value.value + def getConf: Configuration = getJobConf() +} + +private[spark] object HadoopRDD { + /** + * The three methods below are helpers for accessing the local map, a property of the SparkEnv of + * the local process. + */ + def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key) + + def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key) + + def putCachedMetadata(key: String, value: Any) = + SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value) } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index ea4eeb7dbf..731ef90c90 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -756,24 +756,42 @@ abstract class RDD[T: ClassTag]( } /** - * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so - * it will be slow if a lot of partitions are required. In that case, use collect() to get the - * whole RDD instead. + * Take the first num elements of the RDD. It works by first scanning one partition, and use the + * results from that partition to estimate the number of additional partitions needed to satisfy + * the limit. */ def take(num: Int): Array[T] = { if (num == 0) { return new Array[T](0) } + val buf = new ArrayBuffer[T] - var p = 0 - while (buf.size < num && p < partitions.size) { + val totalParts = this.partitions.length + var partsScanned = 0 + while (buf.size < num && partsScanned < totalParts) { + // The number of partitions to try in this iteration. It is ok for this number to be + // greater than totalParts because we actually cap it at totalParts in runJob. 
+ var numPartsToTry = 1 + if (partsScanned > 0) { + // If we didn't find any rows after the first iteration, just try all partitions next. + // Otherwise, interpolate the number of partitions we need to try, but overestimate it + // by 50%. + if (buf.size == 0) { + numPartsToTry = totalParts - 1 + } else { + numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt + } + } + numPartsToTry = math.max(0, numPartsToTry) // guard against negative num of partitions + val left = num - buf.size - val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p), true) - buf ++= res(0) - if (buf.size == num) - return buf.toArray - p += 1 + val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts) + val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true) + + res.foreach(buf ++= _.take(num - buf.size)) + partsScanned += numPartsToTry } + return buf.toArray } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index dca84db597..e79b67579e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -29,7 +29,6 @@ import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.executor.TaskMetrics import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} -import org.apache.spark.scheduler.cluster.TaskInfo import org.apache.spark.storage.{BlockManager, BlockManagerMaster} import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap} @@ -554,7 +553,7 @@ class DAGScheduler( SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head) } catch { case e: NotSerializableException => - abortStage(stage, e.toString) + abortStage(stage, "Task not serializable: " + e.toString) running -= stage return } @@ -706,6 +705,9 @@ class DAGScheduler( case 
ExceptionFailure(className, description, stackTrace, metrics) => // Do nothing here, left up to the TaskScheduler to decide how to handle user failures + case TaskResultLost => + // Do nothing here; the TaskScheduler handles these failures and resubmits the task. + case other => // Unrecognized failure - abort all jobs depending on this stage abortStage(stageIdToStage(task.stageId), task + " failed: " + other) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 0d99670648..10ff1b4376 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -19,7 +19,6 @@ package org.apache.spark.scheduler import java.util.Properties -import org.apache.spark.scheduler.cluster.TaskInfo import scala.collection.mutable.Map import org.apache.spark._ diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index c8b78bf00a..3628b1b078 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -30,7 +30,6 @@ import scala.io.Source import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.scheduler.cluster.TaskInfo
// Used to record runtime information for each job, including RDD graph
// tasks' start/stop shuffle information and information from outside
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index 35b32600da..9eb8d48501 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import org.apache.spark.Logging -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode /** * An Schedulable entity that represent collection of Pools or TaskSetManagers @@ -45,7 +45,7 @@ private[spark] class Pool( var priority = 0 var stageId = 0 var name = poolName - var parent:Schedulable = null + var parent: Pool = null var taskSetSchedulingAlgorithm: SchedulingAlgorithm = { schedulingMode match { @@ -101,14 +101,14 @@ private[spark] class Pool( return sortedTaskSetQueue } - override def increaseRunningTasks(taskNum: Int) { + def increaseRunningTasks(taskNum: Int) { runningTasks += taskNum if (parent != null) { parent.increaseRunningTasks(taskNum) } } - override def decreaseRunningTasks(taskNum: Int) { + def decreaseRunningTasks(taskNum: Int) { runningTasks -= taskNum if (parent != null) { parent.decreaseRunningTasks(taskNum) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala index f4726450ec..1c7ea2dccc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/Schedulable.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala @@ -15,9 +15,9 @@ * limitations under the License. 
*/ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import scala.collection.mutable.ArrayBuffer /** @@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer * there are two type of Schedulable entities(Pools and TaskSetManagers) */ private[spark] trait Schedulable { - var parent: Schedulable + var parent: Pool // child queues def schedulableQueue: ArrayBuffer[Schedulable] def schedulingMode: SchedulingMode @@ -36,8 +36,6 @@ private[spark] trait Schedulable { def stageId: Int def name: String - def increaseRunningTasks(taskNum: Int): Unit - def decreaseRunningTasks(taskNum: Int): Unit def addSchedulable(schedulable: Schedulable): Unit def removeSchedulable(schedulable: Schedulable): Unit def getSchedulableByName(name: String): Schedulable diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index 114617c51a..4e25086ec9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import java.io.{FileInputStream, InputStream} import java.util.{NoSuchElementException, Properties} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala index cbeed4731a..3418640b8c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingAlgorithm.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler /** * An interface for sort algorithm diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala index 16013b3208..3832ee7ff6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler /** * "FAIR" and "FIFO" determines which policy is used diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index c3cf4b8907..62b521ad45 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -18,7 +18,6 @@ package org.apache.spark.scheduler import java.util.Properties -import org.apache.spark.scheduler.cluster.TaskInfo import org.apache.spark.util.{Utils, Distribution} import org.apache.spark.{Logging, SparkContext, TaskEndReason} import org.apache.spark.executor.TaskMetrics diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 72cb1c9ce8..b6f11969e5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -17,8 +17,8 @@ package org.apache.spark.scheduler -import org.apache.spark.scheduler.cluster.TaskInfo import scala.collection._ + import org.apache.spark.executor.TaskMetrics case class StageInfo( diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index 309ac2f6c9..5190d234d4 100644 
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskDescription.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import java.nio.ByteBuffer import org.apache.spark.util.SerializableBuffer diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index 9685fb1a67..7c2a422aff 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala index 8d8d708612..d31a5d5177 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler private[spark] object TaskLocality diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index 5c7e5bb977..db3954a9d3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -26,12 +26,17 @@ import java.nio.ByteBuffer import org.apache.spark.util.Utils // Task result. Also contains updates to accumulator variables. 
-// TODO: Use of distributed cache to return result is a hack to get around -// what seems to be a bug with messages over 60KB in libprocess; fix it +private[spark] sealed trait TaskResult[T] + +/** A reference to a DirectTaskResult that has been stored in the worker's BlockManager. */ +private[spark] +case class IndirectTaskResult[T](val blockId: String) extends TaskResult[T] with Serializable + +/** A TaskResult that contains the task's return value and accumulator updates. */ private[spark] -class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics) - extends Externalizable -{ +class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics) + extends TaskResult[T] with Externalizable { + def this() = this(null.asInstanceOf[T], null, null) override def writeExternal(out: ObjectOutput) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 63be8ba3f5..7c2a9f03d7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -17,10 +17,11 @@ package org.apache.spark.scheduler -import org.apache.spark.scheduler.cluster.Pool -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode + /** * Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler. + * Each TaskScheduler schedulers task for a single SparkContext. * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage, * and are responsible for sending the tasks to the cluster, running them, retrying if there * are failures, and mitigating stragglers. 
They return events to the DAGScheduler through diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala index 83be051c1a..593fa9fb93 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala @@ -17,7 +17,6 @@ package org.apache.spark.scheduler -import org.apache.spark.scheduler.cluster.TaskInfo import scala.collection.mutable.Map import org.apache.spark.TaskEndReason diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 648a3ef922..90f6bcefac 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -15,12 +15,11 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import java.nio.ByteBuffer import org.apache.spark.TaskState.TaskState -import org.apache.spark.scheduler.TaskSet /** * Tracks and schedules the tasks within a single TaskSet. 
This class keeps track of the status of @@ -45,7 +44,5 @@ private[spark] trait TaskSetManager extends Schedulable { maxLocality: TaskLocality.TaskLocality) : Option[TaskDescription] - def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) - def error(message: String) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala index 919acce828..1a844b7e7e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala @@ -18,6 +18,9 @@ package org.apache.spark.scheduler.cluster import java.lang.{Boolean => JBoolean} +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicLong +import java.util.{TimerTask, Timer} import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap @@ -26,10 +29,7 @@ import scala.collection.mutable.HashSet import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode -import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicLong -import java.util.{TimerTask, Timer} +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode /** * The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call @@ -55,7 +55,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) // Threshold above which we warn user initial TaskSet may be starved val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong - val activeTaskSets = new HashMap[String, TaskSetManager] + // ClusterTaskSetManagers are not thread safe, so any access to one should be synchronized + // on this class. 
+ val activeTaskSets = new HashMap[String, ClusterTaskSetManager] val taskIdToTaskSetId = new HashMap[Long, String] val taskIdToExecutorId = new HashMap[Long, String] @@ -65,7 +67,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) @volatile private var hasLaunchedTask = false private val starvationTimer = new Timer(true) - // Incrementing Mesos task IDs + // Incrementing task IDs val nextTaskId = new AtomicLong(0) // Which executor IDs we have executors on @@ -96,6 +98,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) val schedulingMode: SchedulingMode = SchedulingMode.withName( System.getProperty("spark.scheduler.mode", "FIFO")) + // This is a var so that we can reset it for testing purposes. + private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this) + override def setListener(listener: TaskSchedulerListener) { this.listener = listener } @@ -234,7 +239,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { - var taskSetToUpdate: Option[TaskSetManager] = None var failedExecutor: Option[String] = None var taskFailed = false synchronized { @@ -249,9 +253,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } taskIdToTaskSetId.get(tid) match { case Some(taskSetId) => - if (activeTaskSets.contains(taskSetId)) { - taskSetToUpdate = Some(activeTaskSets(taskSetId)) - } if (TaskState.isFinished(state)) { taskIdToTaskSetId.remove(tid) if (taskSetTaskIds.contains(taskSetId)) { @@ -262,6 +263,15 @@ private[spark] class ClusterScheduler(val sc: SparkContext) if (state == TaskState.FAILED) { taskFailed = true } + activeTaskSets.get(taskSetId).foreach { taskSet => + if (state == TaskState.FINISHED) { + taskSet.removeRunningTask(tid) + taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) + } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { + taskSet.removeRunningTask(tid) + 
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) + } + } case None => logInfo("Ignoring update from TID " + tid + " because its task set is gone") } @@ -269,10 +279,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) case e: Exception => logError("Exception in statusUpdate", e) } } - // Update the task set and DAGScheduler without holding a lock on this, since that can deadlock - if (taskSetToUpdate != None) { - taskSetToUpdate.get.statusUpdate(tid, state, serializedData) - } + // Update the DAGScheduler without holding a lock on this, since that can deadlock if (failedExecutor != None) { listener.executorLost(failedExecutor.get) backend.reviveOffers() @@ -283,6 +290,25 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } } + def handleSuccessfulTask( + taskSetManager: ClusterTaskSetManager, + tid: Long, + taskResult: DirectTaskResult[_]) = synchronized { + taskSetManager.handleSuccessfulTask(tid, taskResult) + } + + def handleFailedTask( + taskSetManager: ClusterTaskSetManager, + tid: Long, + taskState: TaskState, + reason: Option[TaskEndReason]) = synchronized { + taskSetManager.handleFailedTask(tid, taskState, reason) + if (taskState == TaskState.FINISHED) { + // The task finished successfully but the result was lost, so we should revive offers. + backend.reviveOffers() + } + } + def error(message: String) { synchronized { if (activeTaskSets.size > 0) { @@ -311,6 +337,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) if (jarServer != null) { jarServer.stop() } + if (taskResultGetter != null) { + taskResultGetter.stop() + } // sleeping for an arbitrary 5 seconds : to ensure that messages are sent out. // TODO: Do something better ! 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala index f61fde6957..194ab55102 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala @@ -25,15 +25,12 @@ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet import scala.math.max import scala.math.min +import scala.Some -import org.apache.spark.{FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskState} -import org.apache.spark.{ExceptionFailure, SparkException, TaskResultTooBigFailure} +import org.apache.spark.{ExceptionFailure, FetchFailed, Logging, Resubmitted, SparkEnv, + SparkException, Success, TaskEndReason, TaskResultLost, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler._ -import scala.Some -import org.apache.spark.FetchFailed -import org.apache.spark.ExceptionFailure -import org.apache.spark.TaskResultTooBigFailure import org.apache.spark.util.{SystemClock, Clock} @@ -71,18 +68,20 @@ private[spark] class ClusterTaskSetManager( val tasks = taskSet.tasks val numTasks = tasks.length val copiesRunning = new Array[Int](numTasks) - val finished = new Array[Boolean](numTasks) + val successful = new Array[Boolean](numTasks) val numFailures = new Array[Int](numTasks) val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil) - var tasksFinished = 0 + var tasksSuccessful = 0 var weight = 1 var minShare = 0 - var runningTasks = 0 var priority = taskSet.priority var stageId = taskSet.stageId var name = "TaskSet_"+taskSet.stageId.toString - var parent: Schedulable = null + var parent: Pool = null + + var runningTasks = 0 + private val runningTasksSet = new HashSet[Long] // Set of pending tasks for each executor. 
These collections are actually // treated as stacks, in which new tasks are added to the end of the @@ -223,7 +222,7 @@ private[spark] class ClusterTaskSetManager( while (!list.isEmpty) { val index = list.last list.trimEnd(1) - if (copiesRunning(index) == 0 && !finished(index)) { + if (copiesRunning(index) == 0 && !successful(index)) { return Some(index) } } @@ -243,7 +242,7 @@ private[spark] class ClusterTaskSetManager( private def findSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value) : Option[(Int, TaskLocality.Value)] = { - speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set + speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set if (!speculatableTasks.isEmpty) { // Check for process-local or preference-less tasks; note that tasks can be process-local @@ -344,7 +343,7 @@ private[spark] class ClusterTaskSetManager( maxLocality: TaskLocality.TaskLocality) : Option[TaskDescription] = { - if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) { + if (tasksSuccessful < numTasks && availableCpus >= CPUS_PER_TASK) { val curTime = clock.getTime() var allowedLocality = getAllowedLocalityLevel(curTime) @@ -375,7 +374,7 @@ private[spark] class ClusterTaskSetManager( val serializedTask = Task.serializeWithDependencies( task, sched.sc.addedFiles, sched.sc.addedJars, ser) val timeTaken = clock.getTime() - startTime - increaseRunningTasks(1) + addRunningTask(taskId) logInfo("Serialized task %s:%d as %d bytes in %d ms".format( taskSet.id, index, serializedTask.limit, timeTaken)) val taskName = "task %s:%d".format(taskSet.id, index) @@ -417,94 +416,61 @@ private[spark] class ClusterTaskSetManager( index } - /** Called by cluster scheduler when one of our tasks changes state */ - override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { - SparkEnv.set(env) - state match { - case TaskState.FINISHED => - taskFinished(tid, state, serializedData) - case 
TaskState.LOST => - taskLost(tid, state, serializedData) - case TaskState.FAILED => - taskLost(tid, state, serializedData) - case TaskState.KILLED => - taskLost(tid, state, serializedData) - case _ => - } - } - - def taskStarted(task: Task[_], info: TaskInfo) { + private def taskStarted(task: Task[_], info: TaskInfo) { sched.listener.taskStarted(task, info) } - def taskFinished(tid: Long, state: TaskState, serializedData: ByteBuffer) { + /** + * Marks the task as successful and notifies the listener that a task has ended. + */ + def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = { val info = taskInfos(tid) - if (info.failed) { - // We might get two task-lost messages for the same task in coarse-grained Mesos mode, - // or even from Mesos itself when acks get delayed. - return - } val index = info.index info.markSuccessful() - decreaseRunningTasks(1) - if (!finished(index)) { - tasksFinished += 1 + removeRunningTask(tid) + if (!successful(index)) { logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format( - tid, info.duration, info.host, tasksFinished, numTasks)) - // Deserialize task result and pass it to the scheduler - try { - val result = ser.deserialize[TaskResult[_]](serializedData) - result.metrics.resultSize = serializedData.limit() - sched.listener.taskEnded( - tasks(index), Success, result.value, result.accumUpdates, info, result.metrics) - } catch { - case cnf: ClassNotFoundException => - val loader = Thread.currentThread().getContextClassLoader - throw new SparkException("ClassNotFound with classloader: " + loader, cnf) - case ex: Throwable => throw ex - } - // Mark finished and stop if we've finished all the tasks - finished(index) = true - if (tasksFinished == numTasks) { + tid, info.duration, info.host, tasksSuccessful, numTasks)) + sched.listener.taskEnded( + tasks(index), Success, result.value, result.accumUpdates, info, result.metrics) + + // Mark successful and stop if all the tasks have succeeded. 
+ tasksSuccessful += 1 + successful(index) = true + if (tasksSuccessful == numTasks) { sched.taskSetFinished(this) } } else { - logInfo("Ignoring task-finished event for TID " + tid + - " because task " + index + " is already finished") + logInfo("Ignorning task-finished event for TID " + tid + " because task " + + index + " has already completed successfully") } } - def taskLost(tid: Long, state: TaskState, serializedData: ByteBuffer) { + /** + * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the listener. + */ + def handleFailedTask(tid: Long, state: TaskState, reason: Option[TaskEndReason]) { val info = taskInfos(tid) if (info.failed) { - // We might get two task-lost messages for the same task in coarse-grained Mesos mode, - // or even from Mesos itself when acks get delayed. return } + removeRunningTask(tid) val index = info.index info.markFailed() - decreaseRunningTasks(1) - if (!finished(index)) { + if (!successful(index)) { logInfo("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index)) copiesRunning(index) -= 1 // Check if the problem is a map output fetch failure. In that case, this // task will never succeed on any node, so tell the scheduler about it. 
- if (serializedData != null && serializedData.limit() > 0) { - val reason = ser.deserialize[TaskEndReason](serializedData, getClass.getClassLoader) - reason match { + reason.foreach { + _ match { case fetchFailed: FetchFailed => logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress) sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null) - finished(index) = true - tasksFinished += 1 + successful(index) = true + tasksSuccessful += 1 sched.taskSetFinished(this) - decreaseRunningTasks(runningTasks) - return - - case taskResultTooBig: TaskResultTooBigFailure => - logInfo("Loss was due to task %s result exceeding Akka frame size; aborting job".format( - tid)) - abort("Task %s result exceeded Akka frame size".format(tid)) + removeAllRunningTasks() return case ef: ExceptionFailure => @@ -534,13 +500,16 @@ private[spark] class ClusterTaskSetManager( logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount)) } + case TaskResultLost => + logInfo("Lost result for TID %s on host %s".format(tid, info.host)) + sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null) + case _ => {} } } // On non-fetch failures, re-enqueue the task as pending for a max number of retries addPendingTask(index) - // Count failed attempts only on FAILED and LOST state (not on KILLED) - if (state == TaskState.FAILED || state == TaskState.LOST) { + if (state != TaskState.KILLED) { numFailures(index) += 1 if (numFailures(index) > MAX_TASK_FAILURES) { logError("Task %s:%d failed more than %d times; aborting job".format( @@ -564,22 +533,36 @@ private[spark] class ClusterTaskSetManager( causeOfFailure = message // TODO: Kill running tasks if we were not terminated due to a Mesos error sched.listener.taskSetFailed(taskSet, message) - decreaseRunningTasks(runningTasks) + removeAllRunningTasks() sched.taskSetFinished(this) } - override def increaseRunningTasks(taskNum: Int) { - runningTasks += taskNum - if (parent != null) { - 
parent.increaseRunningTasks(taskNum) + /** If the given task ID is not in the set of running tasks, adds it. + * + * Used to keep track of the number of running tasks, for enforcing scheduling policies. + */ + def addRunningTask(tid: Long) { + if (runningTasksSet.add(tid) && parent != null) { + parent.increaseRunningTasks(1) + } + runningTasks = runningTasksSet.size + } + + /** If the given task ID is in the set of running tasks, removes it. */ + def removeRunningTask(tid: Long) { + if (runningTasksSet.remove(tid) && parent != null) { + parent.decreaseRunningTasks(1) } + runningTasks = runningTasksSet.size } - override def decreaseRunningTasks(taskNum: Int) { - runningTasks -= taskNum + private def removeAllRunningTasks() { + val numRunningTasks = runningTasksSet.size + runningTasksSet.clear() if (parent != null) { - parent.decreaseRunningTasks(taskNum) + parent.decreaseRunningTasks(numRunningTasks) } + runningTasks = 0 } override def getSchedulableByName(name: String): Schedulable = { @@ -615,10 +598,10 @@ private[spark] class ClusterTaskSetManager( if (tasks(0).isInstanceOf[ShuffleMapTask]) { for ((tid, info) <- taskInfos if info.executorId == execId) { val index = taskInfos(tid).index - if (finished(index)) { - finished(index) = false + if (successful(index)) { + successful(index) = false copiesRunning(index) -= 1 - tasksFinished -= 1 + tasksSuccessful -= 1 addPendingTask(index) // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our // stage finishes when a total of tasks.size tasks finish. 
@@ -628,7 +611,7 @@ private[spark] class ClusterTaskSetManager( } // Also re-enqueue any tasks that were running on the node for ((tid, info) <- taskInfos if info.running && info.executorId == execId) { - taskLost(tid, TaskState.KILLED, null) + handleFailedTask(tid, TaskState.KILLED, None) } } @@ -641,13 +624,13 @@ private[spark] class ClusterTaskSetManager( */ override def checkSpeculatableTasks(): Boolean = { // Can't speculate if we only have one task, or if all tasks have finished. - if (numTasks == 1 || tasksFinished == numTasks) { + if (numTasks == 1 || tasksSuccessful == numTasks) { return false } var foundTasks = false val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation) - if (tasksFinished >= minFinishedForSpeculation) { + if (tasksSuccessful >= minFinishedForSpeculation) { val time = clock.getTime() val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray Arrays.sort(durations) @@ -658,7 +641,7 @@ private[spark] class ClusterTaskSetManager( logDebug("Task length threshold for speculation: " + threshold) for ((tid, info) <- taskInfos) { val index = info.index - if (!finished(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold && + if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold && !speculatableTasks.contains(index)) { logInfo( "Marking task %s:%d (on %s) as speculatable because it ran more than %.0f ms".format( @@ -672,7 +655,7 @@ private[spark] class ClusterTaskSetManager( } override def hasPendingTasks(): Boolean = { - numTasks > 0 && tasksFinished < numTasks + numTasks > 0 && tasksSuccessful < numTasks } private def getLocalityWait(level: TaskLocality.TaskLocality): Long = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala 
index 9c36d221f6..c0b836bf1a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala @@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster import java.nio.ByteBuffer import org.apache.spark.TaskState.TaskState +import org.apache.spark.scheduler.TaskDescription import org.apache.spark.util.{Utils, SerializableBuffer} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 49f668eb32..b6f0ec961a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -28,6 +28,7 @@ import akka.pattern.ask import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent} import org.apache.spark.{SparkException, Logging, TaskState} +import org.apache.spark.scheduler.TaskDescription import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._ import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala new file mode 100644 index 0000000000..feec8ecfe4 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import java.nio.ByteBuffer +import java.util.concurrent.{LinkedBlockingDeque, ThreadFactory, ThreadPoolExecutor, TimeUnit} + +import org.apache.spark._ +import org.apache.spark.TaskState.TaskState +import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult} +import org.apache.spark.serializer.SerializerInstance + +/** + * Runs a thread pool that deserializes and remotely fetches (if necessary) task results. + */ +private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler) + extends Logging { + private val MIN_THREADS = System.getProperty("spark.resultGetter.minThreads", "4").toInt + private val MAX_THREADS = System.getProperty("spark.resultGetter.maxThreads", "4").toInt + private val getTaskResultExecutor = new ThreadPoolExecutor( + MIN_THREADS, + MAX_THREADS, + 0L, + TimeUnit.SECONDS, + new LinkedBlockingDeque[Runnable], + new ResultResolverThreadFactory) + + class ResultResolverThreadFactory extends ThreadFactory { + private var counter = 0 + private var PREFIX = "Result resolver thread" + + override def newThread(r: Runnable): Thread = { + val thread = new Thread(r, "%s-%s".format(PREFIX, counter)) + counter += 1 + thread.setDaemon(true) + return thread + } + } + + protected val serializer = new ThreadLocal[SerializerInstance] { + override def initialValue(): SerializerInstance = { + return sparkEnv.closureSerializer.newInstance() + } + } + + def enqueueSuccessfulTask( + taskSetManager: ClusterTaskSetManager, tid: Long, serializedData: 
ByteBuffer) { + getTaskResultExecutor.execute(new Runnable { + override def run() { + try { + val result = serializer.get().deserialize[TaskResult[_]](serializedData) match { + case directResult: DirectTaskResult[_] => directResult + case IndirectTaskResult(blockId) => + logDebug("Fetching indirect task result for TID %s".format(tid)) + val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId) + if (!serializedTaskResult.isDefined) { + /* We won't be able to get the task result if the machine that ran the task failed + * between when the task ended and when we tried to fetch the result, or if the + * block manager had to flush the result. */ + scheduler.handleFailedTask( + taskSetManager, tid, TaskState.FINISHED, Some(TaskResultLost)) + return + } + val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]]( + serializedTaskResult.get) + sparkEnv.blockManager.master.removeBlock(blockId) + deserializedResult + } + result.metrics.resultSize = serializedData.limit() + scheduler.handleSuccessfulTask(taskSetManager, tid, result) + } catch { + case cnf: ClassNotFoundException => + val loader = Thread.currentThread.getContextClassLoader + taskSetManager.abort("ClassNotFound with classloader: " + loader) + case ex => + taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex)) + } + } + }) + } + + def enqueueFailedTask(taskSetManager: ClusterTaskSetManager, tid: Long, taskState: TaskState, + serializedData: ByteBuffer) { + var reason: Option[TaskEndReason] = None + getTaskResultExecutor.execute(new Runnable { + override def run() { + try { + if (serializedData != null && serializedData.limit() > 0) { + reason = Some(serializer.get().deserialize[TaskEndReason]( + serializedData, getClass.getClassLoader)) + } + } catch { + case cnd: ClassNotFoundException => + // Log an error but keep going here -- the task failed, so not catastrophic if we can't + // deserialize the reason.
+ val loader = Thread.currentThread.getContextClassLoader + logError( + "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader) + case ex => {} + } + scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) + } + }) + } + + def stop() { + getTaskResultExecutor.shutdownNow() + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index babe875fa1..bf4040fafc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -15,22 +15,22 @@ * limitations under the License. */ -package org.apache.spark.scheduler.mesos +package org.apache.spark.scheduler.cluster.mesos -import com.google.protobuf.ByteString +import java.io.File +import java.util.{ArrayList => JArrayList, List => JList} +import java.util.Collections + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.JavaConversions._ +import com.google.protobuf.ByteString import org.apache.mesos.{Scheduler => MScheduler} import org.apache.mesos._ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} -import org.apache.spark.{SparkException, Logging, SparkContext} -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import scala.collection.JavaConversions._ -import java.io.File -import org.apache.spark.scheduler.cluster._ -import java.util.{ArrayList => JArrayList, List => JList} -import java.util.Collections -import org.apache.spark.TaskState +import org.apache.spark.{SparkException, Logging, SparkContext, TaskState} +import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend} /** * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 541f86e338..50cbc2ca92 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -15,22 +15,24 @@ * limitations under the License. */ -package org.apache.spark.scheduler.mesos +package org.apache.spark.scheduler.cluster.mesos -import com.google.protobuf.ByteString +import java.io.File +import java.util.{ArrayList => JArrayList, List => JList} +import java.util.Collections + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.JavaConversions._ +import com.google.protobuf.ByteString import org.apache.mesos.{Scheduler => MScheduler} import org.apache.mesos._ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} -import org.apache.spark.{SparkException, Logging, SparkContext} -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import scala.collection.JavaConversions._ -import java.io.File -import org.apache.spark.scheduler.cluster._ -import java.util.{ArrayList => JArrayList, List => JList} -import java.util.Collections -import org.apache.spark.TaskState +import org.apache.spark.{Logging, SparkException, SparkContext, TaskState} +import org.apache.spark.scheduler.TaskDescription +import org.apache.spark.scheduler.cluster.{ClusterScheduler, ExecutorExited, ExecutorLossReason} +import org.apache.spark.scheduler.cluster.{SchedulerBackend, SlaveLost, WorkerOffer} import org.apache.spark.util.Utils /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala index 8cb4d1396f..4d1bb1c639 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala 
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala @@ -31,8 +31,7 @@ import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.ExecutorURLClassLoader import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster._ -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import akka.actor._ import org.apache.spark.util.Utils @@ -92,7 +91,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: var rootPool: Pool = null val schedulingMode: SchedulingMode = SchedulingMode.withName( System.getProperty("spark.scheduler.mode", "FIFO")) - val activeTaskSets = new HashMap[String, TaskSetManager] + val activeTaskSets = new HashMap[String, LocalTaskSetManager] val taskIdToTaskSetId = new HashMap[Long, String] val taskSetTaskIds = new HashMap[String, HashSet[Long]] @@ -211,7 +210,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: deserializedTask.metrics.get.executorRunTime = serviceTime.toInt deserializedTask.metrics.get.jvmGCTime = getTotalGCTime - startGCTime deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt - val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null)) + val taskResult = new DirectTaskResult( + result, accumUpdates, deserializedTask.metrics.getOrElse(null)) val serializedResult = ser.serialize(taskResult) localActor ! 
LocalStatusUpdate(taskId, TaskState.FINISHED, serializedResult) } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala index e52cb998bd..c2e2399ccb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala @@ -21,16 +21,16 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap -import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, Success, TaskState} +import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, SparkException, Success, TaskState} import org.apache.spark.TaskState.TaskState -import org.apache.spark.scheduler.{Task, TaskResult, TaskSet} -import org.apache.spark.scheduler.cluster.{Schedulable, TaskDescription, TaskInfo, TaskLocality, TaskSetManager} +import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Pool, Schedulable, Task, + TaskDescription, TaskInfo, TaskLocality, TaskResult, TaskSet, TaskSetManager} private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet) extends TaskSetManager with Logging { - var parent: Schedulable = null + var parent: Pool = null var weight: Int = 1 var minShare: Int = 0 var runningTasks: Int = 0 @@ -49,14 +49,14 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas val numFailures = new Array[Int](numTasks) val MAX_TASK_FAILURES = sched.maxFailures - override def increaseRunningTasks(taskNum: Int): Unit = { + def increaseRunningTasks(taskNum: Int): Unit = { runningTasks += taskNum if (parent != null) { parent.increaseRunningTasks(taskNum) } } - override def decreaseRunningTasks(taskNum: Int): Unit = { + def decreaseRunningTasks(taskNum: Int): Unit = { runningTasks -= taskNum if (parent != null) { parent.decreaseRunningTasks(taskNum) @@ 
-132,7 +132,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas return None } - override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { + def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { SparkEnv.set(env) state match { case TaskState.FINISHED => @@ -152,7 +152,12 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas val index = info.index val task = taskSet.tasks(index) info.markSuccessful() - val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader) + val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader) match { + case directResult: DirectTaskResult[_] => directResult + case IndirectTaskResult(blockId) => { + throw new SparkException("Expect only DirectTaskResults when using LocalScheduler") + } + } result.metrics.resultSize = serializedData.limit() sched.listener.taskEnded(task, Success, result.value, result.accumUpdates, info, result.metrics) numFinished += 1 diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 13b98a51a1..7852849ce5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -484,7 +484,7 @@ private[spark] class BlockManager( for (loc <- locations) { logDebug("Getting remote block " + blockId + " from " + loc) val data = BlockManagerWorker.syncGetBlock( - GetBlock(blockId), ConnectionManagerId(loc.host, loc.port)) + GetBlock(blockId), ConnectionManagerId(loc.host, loc.port)) if (data != null) { return Some(dataDeserialize(blockId, data)) } @@ -495,10 +495,45 @@ private[spark] class BlockManager( } /** + * Get block from remote block managers as serialized bytes. 
+ */ + def getRemoteBytes(blockId: String): Option[ByteBuffer] = { + // TODO: As with getLocalBytes, this is very similar to getRemote and perhaps should be + // refactored. + if (blockId == null) { + throw new IllegalArgumentException("Block Id is null") + } + logDebug("Getting remote block " + blockId + " as bytes") + + val locations = master.getLocations(blockId) + for (loc <- locations) { + logDebug("Getting remote block " + blockId + " from " + loc) + val data = BlockManagerWorker.syncGetBlock( + GetBlock(blockId), ConnectionManagerId(loc.host, loc.port)) + if (data != null) { + return Some(data) + } + logDebug("The value of block " + blockId + " is null") + } + logDebug("Block " + blockId + " not found") + return None + } + + /** * Get a block from the block manager (either local or remote). */ def get(blockId: String): Option[Iterator[Any]] = { - getLocal(blockId).orElse(getRemote(blockId)) + val local = getLocal(blockId) + if (local.isDefined) { + logInfo("Found block %s locally".format(blockId)) + return local + } + val remote = getRemote(blockId) + if (remote.isDefined) { + logInfo("Found block %s remotely".format(blockId)) + return remote + } + None } /** diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 3b3b2342fa..77a39c71ed 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -30,10 +30,10 @@ import org.apache.spark.util.{SizeEstimator, Utils} private class MemoryStore(blockManager: BlockManager, maxMemory: Long) extends BlockStore(blockManager) { - case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false) + case class Entry(value: Any, size: Long, deserialized: Boolean) private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true) - private var currentMemory = 0L + @volatile private var currentMemory = 0L // 
Object used to ensure that only one thread is putting blocks and if necessary, dropping // blocks from the memory store. private val putLock = new Object() @@ -110,9 +110,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def remove(blockId: String): Boolean = { entries.synchronized { - val entry = entries.get(blockId) + val entry = entries.remove(blockId) if (entry != null) { - entries.remove(blockId) currentMemory -= entry.size logInfo("Block %s of size %d dropped from memory (free %d)".format( blockId, entry.size, freeMemory)) @@ -126,6 +125,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def clear() { entries.synchronized { entries.clear() + currentMemory = 0 } logInfo("MemoryStore cleared") } @@ -160,8 +160,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) putLock.synchronized { if (ensureFreeSpace(blockId, size)) { val entry = new Entry(value, size, deserialized) - entries.synchronized { entries.put(blockId, entry) } - currentMemory += size + entries.synchronized { + entries.put(blockId, entry) + currentMemory += size + } if (deserialized) { logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format( blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory))) diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala index 3ec9760ed0..453394dfda 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala @@ -21,7 +21,7 @@ import scala.util.Random import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ -import org.apache.spark.scheduler.cluster.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode /** diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala 
b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index d1868dcf78..42e9be6e19 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -26,8 +26,8 @@ import org.eclipse.jetty.server.Handler import org.apache.spark.{ExceptionFailure, Logging, SparkContext} import org.apache.spark.executor.TaskMetrics -import org.apache.spark.scheduler.cluster.TaskInfo import org.apache.spark.scheduler.{SparkListenerTaskStart, SparkListenerTaskEnd, SparkListener} +import org.apache.spark.scheduler.TaskInfo import org.apache.spark.ui.JettyUtils._ import org.apache.spark.ui.Page.Executors import org.apache.spark.ui.UIUtils diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index 3b428effaf..b39c0e9769 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest import scala.xml.{NodeSeq, Node} -import org.apache.spark.scheduler.cluster.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode import org.apache.spark.ui.Page._ import org.apache.spark.ui.UIUtils._ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 5d46f38a2a..eb3b4e8522 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -21,10 +21,8 @@ import scala.Seq import scala.collection.mutable.{ListBuffer, HashMap, HashSet} import org.apache.spark.{ExceptionFailure, SparkContext, Success} -import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.TaskInfo import org.apache.spark.executor.TaskMetrics -import collection.mutable +import 
org.apache.spark.scheduler._ /** * Tracks task-level information to be displayed in the UI. diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala index 54e273fd8b..c1ee2f3d00 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala @@ -31,8 +31,9 @@ import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.{ExceptionFailure, SparkContext, Success} import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.SchedulingMode -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import collection.mutable +import org.apache.spark.scheduler.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.util.Utils /** Web UI showing progress status of all jobs in the given SparkContext. 
*/ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala index b3d3666944..06810d8dbc 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala @@ -21,8 +21,7 @@ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet import scala.xml.Node -import org.apache.spark.scheduler.Stage -import org.apache.spark.scheduler.cluster.Schedulable +import org.apache.spark.scheduler.{Schedulable, Stage} import org.apache.spark.ui.UIUtils /** Table showing list of pools */ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index a9969ab1c0..163a3746ea 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -23,12 +23,12 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node +import org.apache.spark.{ExceptionFailure} +import org.apache.spark.executor.TaskMetrics import org.apache.spark.ui.UIUtils._ import org.apache.spark.ui.Page._ import org.apache.spark.util.{Utils, Distribution} -import org.apache.spark.{ExceptionFailure} -import org.apache.spark.scheduler.cluster.TaskInfo -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.scheduler.TaskInfo /** Page showing statistics and task list for a given stage */ private[spark] class StagePage(parent: JobProgressUI) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 32776eaa25..07db8622da 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -22,8 +22,7 @@ import java.util.Date import scala.xml.Node import scala.collection.mutable.HashSet -import 
org.apache.spark.scheduler.cluster.{SchedulingMode, TaskInfo} -import org.apache.spark.scheduler.Stage +import org.apache.spark.scheduler.{SchedulingMode, Stage, TaskInfo} import org.apache.spark.ui.UIUtils import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index be215fc127..94ce50e964 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -77,6 +77,19 @@ private[spark] object Utils extends Logging { return ois.readObject.asInstanceOf[T] } + /** Deserialize a Long value (used for {@link org.apache.spark.api.python.PythonPartitioner}) */ + def deserializeLongValue(bytes: Array[Byte]) : Long = { + // Note: we assume that we are given a Long value encoded in network (big-endian) byte order + var result = bytes(7) & 0xFFL + result = result + ((bytes(6) & 0xFFL) << 8) + result = result + ((bytes(5) & 0xFFL) << 16) + result = result + ((bytes(4) & 0xFFL) << 24) + result = result + ((bytes(3) & 0xFFL) << 32) + result = result + ((bytes(2) & 0xFFL) << 40) + result = result + ((bytes(1) & 0xFFL) << 48) + result + ((bytes(0) & 0xFFL) << 56) + } + /** Serialize via nested stream using specific serializer */ def serializeViaNestedStream(os: OutputStream, ser: SerializerInstance)(f: SerializationStream => Unit) = { val osWrapper = ser.serializeStream(new OutputStream { diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index adc6ca94ff..25b9c3eb78 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -332,6 +332,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter } exception.getMessage should endWith("result exceeded Akka frame size") } + } object DistributedSuite { diff --git 
a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 096023f476..8fd1115207 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -25,7 +25,6 @@ import org.apache.spark.SparkContext._ import org.apache.spark.rdd._ import scala.collection.parallel.mutable import org.apache.spark._ -import org.apache.spark.rdd.CoalescedRDDPartition class RDDSuite extends FunSuite with SharedSparkContext { @@ -321,6 +320,44 @@ class RDDSuite extends FunSuite with SharedSparkContext { for (i <- 0 until sample.size) assert(sample(i) === checkSample(i)) } + test("take") { + var nums = sc.makeRDD(Range(1, 1000), 1) + assert(nums.take(0).size === 0) + assert(nums.take(1) === Array(1)) + assert(nums.take(3) === Array(1, 2, 3)) + assert(nums.take(500) === (1 to 500).toArray) + assert(nums.take(501) === (1 to 501).toArray) + assert(nums.take(999) === (1 to 999).toArray) + assert(nums.take(1000) === (1 to 999).toArray) + + nums = sc.makeRDD(Range(1, 1000), 2) + assert(nums.take(0).size === 0) + assert(nums.take(1) === Array(1)) + assert(nums.take(3) === Array(1, 2, 3)) + assert(nums.take(500) === (1 to 500).toArray) + assert(nums.take(501) === (1 to 501).toArray) + assert(nums.take(999) === (1 to 999).toArray) + assert(nums.take(1000) === (1 to 999).toArray) + + nums = sc.makeRDD(Range(1, 1000), 100) + assert(nums.take(0).size === 0) + assert(nums.take(1) === Array(1)) + assert(nums.take(3) === Array(1, 2, 3)) + assert(nums.take(500) === (1 to 500).toArray) + assert(nums.take(501) === (1 to 501).toArray) + assert(nums.take(999) === (1 to 999).toArray) + assert(nums.take(1000) === (1 to 999).toArray) + + nums = sc.makeRDD(Range(1, 1000), 1000) + assert(nums.take(0).size === 0) + assert(nums.take(1) === Array(1)) + assert(nums.take(3) === Array(1, 2, 3)) + assert(nums.take(500) === (1 to 500).toArray) + assert(nums.take(501) === (1 to 501).toArray) 
+ assert(nums.take(999) === (1 to 999).toArray) + assert(nums.take(1000) === (1 to 999).toArray) + } + test("top with predefined ordering") { val nums = Array.range(1, 100000) val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 94f66c94c6..2f933246b0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -32,9 +32,7 @@ import org.apache.spark.{Dependency, ShuffleDependency, OneToOneDependency} import org.apache.spark.{FetchFailed, Success, TaskEndReason} import org.apache.spark.storage.{BlockManagerId, BlockManagerMaster} -import org.apache.spark.scheduler.cluster.Pool -import org.apache.spark.scheduler.cluster.SchedulingMode -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode /** * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index aac7c207cb..41a161e08a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -29,7 +29,9 @@ import org.apache.spark.SparkContext._ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers { - test("local metrics") { + // TODO: This test has a race condition since the DAGScheduler now reports results + // asynchronously. It needs to be updated for that patch. 
+ ignore("local metrics") { sc = new SparkContext("local[4]", "test") val listener = new SaveStageInfo sc.addSparkListener(listener) @@ -43,6 +45,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)} d.count + Thread.sleep(1000) listener.stageInfos.size should be (1) val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1") @@ -54,6 +57,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc d4.collectAsMap + Thread.sleep(1000) listener.stageInfos.size should be (4) listener.stageInfos.foreach {stageInfo => //small test, so some tasks might take less than 1 millisecond, but average should be greater than 1 ms diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala index 1b50ce06b3..95d3553d91 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala @@ -43,16 +43,16 @@ class FakeTaskSetManager( stageId = initStageId name = "TaskSet_"+stageId override val numTasks = initNumTasks - tasksFinished = 0 + tasksSuccessful = 0 - override def increaseRunningTasks(taskNum: Int) { + def increaseRunningTasks(taskNum: Int) { runningTasks += taskNum if (parent != null) { parent.increaseRunningTasks(taskNum) } } - override def decreaseRunningTasks(taskNum: Int) { + def decreaseRunningTasks(taskNum: Int) { runningTasks -= taskNum if (parent != null) { parent.decreaseRunningTasks(taskNum) @@ -79,7 +79,7 @@ class FakeTaskSetManager( maxLocality: TaskLocality.TaskLocality) : Option[TaskDescription] = { - if (tasksFinished + runningTasks < numTasks) { + if (tasksSuccessful + runningTasks < numTasks) { increaseRunningTasks(1) return Some(new TaskDescription(0, execId, "task 0:0", 0, null)) } @@ -92,8 +92,8 @@ class 
FakeTaskSetManager( def taskFinished() { decreaseRunningTasks(1) - tasksFinished +=1 - if (tasksFinished == numTasks) { + tasksSuccessful +=1 + if (tasksSuccessful == numTasks) { parent.removeSchedulable(this) } } @@ -114,7 +114,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging val taskSetQueue = rootPool.getSortedTaskSetQueue() /* Just for Test*/ for (manager <- taskSetQueue) { - logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks)) + logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format( + manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks)) } for (taskSet <- taskSetQueue) { taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match { diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala index ff70a2cdf0..80d0c5a5e9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala @@ -40,6 +40,7 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* val startedTasks = new ArrayBuffer[Long] val endedTasks = new mutable.HashMap[Long, TaskEndReason] val finishedManagers = new ArrayBuffer[TaskSetManager] + val taskSetsFailed = new ArrayBuffer[String] val executors = new mutable.HashMap[String, String] ++ liveExecutors @@ -63,7 +64,9 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* def executorLost(execId: String) {} - def taskSetFailed(taskSet: TaskSet, reason: String) {} + def taskSetFailed(taskSet: TaskSet, reason: String) { + taskSetsFailed += taskSet.id + } } def removeExecutor(execId: String): Unit = executors -= 
execId @@ -101,7 +104,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo assert(manager.resourceOffer("exec1", "host1", 2, PROCESS_LOCAL) === None) // Tell it the task has finished - manager.statusUpdate(0, TaskState.FINISHED, createTaskResult(0)) + manager.handleSuccessfulTask(0, createTaskResult(0)) assert(sched.endedTasks(0) === Success) assert(sched.finishedManagers.contains(manager)) } @@ -125,14 +128,14 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None) // Finish the first two tasks - manager.statusUpdate(0, TaskState.FINISHED, createTaskResult(0)) - manager.statusUpdate(1, TaskState.FINISHED, createTaskResult(1)) + manager.handleSuccessfulTask(0, createTaskResult(0)) + manager.handleSuccessfulTask(1, createTaskResult(1)) assert(sched.endedTasks(0) === Success) assert(sched.endedTasks(1) === Success) assert(!sched.finishedManagers.contains(manager)) // Finish the last task - manager.statusUpdate(2, TaskState.FINISHED, createTaskResult(2)) + manager.handleSuccessfulTask(2, createTaskResult(2)) assert(sched.endedTasks(2) === Success) assert(sched.finishedManagers.contains(manager)) } @@ -253,6 +256,47 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None) } + test("task result lost") { + sc = new SparkContext("local", "test") + val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) + val taskSet = createTaskSet(1) + val clock = new FakeClock + val manager = new ClusterTaskSetManager(sched, taskSet, clock) + + assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0) + + // Tell it the task has finished but the result was lost. + manager.handleFailedTask(0, TaskState.FINISHED, Some(TaskResultLost)) + assert(sched.endedTasks(0) === TaskResultLost) + + // Re-offer the host -- now we should get task 0 again. 
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0) + } + + test("repeated failures lead to task set abortion") { + sc = new SparkContext("local", "test") + val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) + val taskSet = createTaskSet(1) + val clock = new FakeClock + val manager = new ClusterTaskSetManager(sched, taskSet, clock) + + // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted + // after the last failure. + (0 until manager.MAX_TASK_FAILURES).foreach { index => + val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY) + assert(offerResult != None, + "Expect resource offer on iteration %s to return a task".format(index)) + assert(offerResult.get.index === 0) + manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, Some(TaskResultLost)) + if (index < manager.MAX_TASK_FAILURES) { + assert(!sched.taskSetsFailed.contains(taskSet.id)) + } else { + assert(sched.taskSetsFailed.contains(taskSet.id)) + } + } + } + + /** * Utility method to create a TaskSet, potentially setting a particular sequence of preferred * locations for each task (given as varargs) if this sequence is not empty. 
@@ -267,7 +311,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo new TaskSet(tasks, 0, 0, 0, null) } - def createTaskResult(id: Int): ByteBuffer = { - ByteBuffer.wrap(Utils.serialize(new TaskResult[Int](id, mutable.Map.empty, new TaskMetrics))) + def createTaskResult(id: Int): DirectTaskResult[Int] = { + new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala new file mode 100644 index 0000000000..370a3eb0eb --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import java.nio.ByteBuffer + +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} + +import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv} +import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult} + +/** + * Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter. 
+ * + * Used to test the case where a BlockManager evicts the task result (or dies) before the + * TaskResult is retrieved. + */ +class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler) + extends TaskResultGetter(sparkEnv, scheduler) { + var removedResult = false + + override def enqueueSuccessfulTask( + taskSetManager: ClusterTaskSetManager, tid: Long, serializedData: ByteBuffer) { + if (!removedResult) { + // Only remove the result once, since we'd like to test the case where the task eventually + // succeeds. + serializer.get().deserialize[TaskResult[_]](serializedData) match { + case IndirectTaskResult(blockId) => + sparkEnv.blockManager.master.removeBlock(blockId) + case directResult: DirectTaskResult[_] => + taskSetManager.abort("Internal error: expect only indirect results") + } + serializedData.rewind() + removedResult = true + } + super.enqueueSuccessfulTask(taskSetManager, tid, serializedData) + } +} + +/** + * Tests related to handling task results (both direct and indirect). + */ +class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll + with LocalSparkContext { + + override def beforeAll { + // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small + // as we can make it) so the tests don't take too long. + System.setProperty("spark.akka.frameSize", "1") + } + + before { + // Use local-cluster mode because results are returned differently when running with the + // LocalScheduler. 
+ sc = new SparkContext("local-cluster[1,1,512]", "test") + } + + override def afterAll { + System.clearProperty("spark.akka.frameSize") + } + + test("handling results smaller than Akka frame size") { + val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x) + assert(result === 2) + } + + test("handling results larger than Akka frame size") { + val akkaFrameSize = + sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.message-frame-size").toInt + val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x) + assert(result === 1.to(akkaFrameSize).toArray) + + val RESULT_BLOCK_ID = "taskresult_0" + assert(sc.env.blockManager.master.getLocations(RESULT_BLOCK_ID).size === 0, + "Expect result to be removed from the block manager.") + } + + test("task retried if result missing from block manager") { + // If this test hangs, it's probably because no resource offers were made after the task + // failed. + val scheduler: ClusterScheduler = sc.taskScheduler match { + case clusterScheduler: ClusterScheduler => + clusterScheduler + case _ => + assert(false, "Expect local cluster to use ClusterScheduler") + throw new ClassCastException + } + scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler) + val akkaFrameSize = + sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.message-frame-size").toInt + val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x) + assert(result === 1.to(akkaFrameSize).toArray) + + // Make sure two tasks were run (one failed one, and a second retried one). 
+ assert(scheduler.nextTaskId.get() === 2) + } +} + diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index f4e1d4e802..3764f4d1a0 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -25,6 +25,13 @@ import org.eclipse.jetty.server.Server class UISuite extends FunSuite { test("jetty port increases under contention") { val startPort = 4040 + val server = new Server(startPort) + + Try { server.start() } match { + case Success(s) => + case Failure(e) => + // Either case server port is busy hence setup for test complete + } val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq()) val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("localhost", startPort, Seq()) // Allow some wiggle room in case ports on the machine are under contention diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index e2859caf58..4684c8c972 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.util import com.google.common.base.Charsets import com.google.common.io.Files import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream, File} +import java.nio.{ByteBuffer, ByteOrder} import org.scalatest.FunSuite import org.apache.commons.io.FileUtils import scala.util.Random @@ -135,5 +136,15 @@ class UtilsSuite extends FunSuite { FileUtils.deleteDirectory(tmpDir2) } + + test("deserialize long value") { + val testval : Long = 9730889947L + val bbuf = ByteBuffer.allocate(8) + assert(bbuf.hasArray) + bbuf.order(ByteOrder.BIG_ENDIAN) + bbuf.putLong(testval) + assert(bbuf.array.length === 8) + assert(Utils.deserializeLongValue(bbuf.array) === testval) + } } diff --git a/docs/_config.yml 
b/docs/_config.yml index ad851673a5..02067f9750 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -3,8 +3,8 @@ markdown: kramdown # These allow the documentation to be updated with nerw releases # of Spark, Scala, and Mesos. -SPARK_VERSION: 0.8.0-SNAPSHOT -SPARK_VERSION_SHORT: 0.8.0 +SPARK_VERSION: 0.9.0-incubating-SNAPSHOT +SPARK_VERSION_SHORT: 0.9.0-SNAPSHOT SCALA_VERSION: 2.10 MESOS_VERSION: 0.13.0 SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html index 238ad26de0..0c1d657cde 100755 --- a/docs/_layouts/global.html +++ b/docs/_layouts/global.html @@ -6,7 +6,7 @@ <head> <meta charset="utf-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"> - <title>{{ page.title }} - Spark {{site.SPARK_VERSION}} Documentation</title> + <title>{{ page.title }} - Spark {{site.SPARK_VERSION_SHORT}} Documentation</title> <meta name="description" content=""> <link rel="stylesheet" href="css/bootstrap.min.css"> @@ -109,7 +109,7 @@ </ul> </li> </ul> - <!--<p class="navbar-text pull-right"><span class="version-text">v{{site.SPARK_VERSION}}</span></p>--> + <!--<p class="navbar-text pull-right"><span class="version-text">v{{site.SPARK_VERSION_SHORT}}</span></p>--> </div> </div> </div> diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index c611db0af4..30128ec45d 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -50,6 +50,7 @@ The command to launch the YARN Client is as follows: --master-memory <MEMORY_FOR_MASTER> \ --worker-memory <MEMORY_PER_WORKER> \ --worker-cores <CORES_PER_WORKER> \ + --name <application_name> \ --queue <queue_name> For example: diff --git a/ec2/README b/ec2/README index 0add81312c..433da37b4c 100644 --- a/ec2/README +++ b/ec2/README @@ -1,4 +1,4 @@ This folder contains a script, spark-ec2, for launching Spark clusters on Amazon EC2. 
Usage instructions are available online at: -http://spark-project.org/docs/latest/ec2-scripts.html +http://spark.incubator.apache.org/docs/latest/ec2-scripts.html diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 6b7d202a88..65868b76b9 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -23,6 +23,7 @@ from __future__ import with_statement import logging import os +import pipes import random import shutil import subprocess @@ -36,6 +37,9 @@ import boto from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType from boto import ec2 +class UsageError(Exception): + pass + # A URL prefix from which to fetch AMI information AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list" @@ -66,7 +70,7 @@ def parse_args(): "slaves across multiple (an additional $0.01/Gb for bandwidth" + "between zones applies)") parser.add_option("-a", "--ami", help="Amazon Machine Image ID to use") - parser.add_option("-v", "--spark-version", default="0.7.3", + parser.add_option("-v", "--spark-version", default="0.8.0", help="Version of Spark to use: 'X.Y.Z' or a specific git hash") parser.add_option("--spark-git-repo", default="https://github.com/mesos/spark", @@ -103,11 +107,7 @@ def parse_args(): parser.print_help() sys.exit(1) (action, cluster_name) = args - if opts.identity_file == None and action in ['launch', 'login', 'start']: - print >> stderr, ("ERROR: The -i or --identity-file argument is " + - "required for " + action) - sys.exit(1) - + # Boto config check # http://boto.cloudhackers.com/en/latest/boto_config_tut.html home_dir = os.getenv('HOME') @@ -155,7 +155,7 @@ def is_active(instance): # Return correct versions of Spark and Shark, given the supplied Spark version def get_spark_shark_version(opts): - spark_shark_map = {"0.7.3": "0.7.0"} + spark_shark_map = {"0.7.3": "0.7.1", "0.8.0": "0.8.0"} version = opts.spark_version.replace("v", "") if version not in spark_shark_map: print >> stderr, "Don't know about Spark version: %s" % version @@ 
-390,10 +390,18 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): master = master_nodes[0].public_dns_name if deploy_ssh_key: - print "Copying SSH key %s to master..." % opts.identity_file - ssh(master, opts, 'mkdir -p ~/.ssh') - scp(master, opts, opts.identity_file, '~/.ssh/id_rsa') - ssh(master, opts, 'chmod 600 ~/.ssh/id_rsa') + print "Generating cluster's SSH key on master..." + key_setup = """ + [ -f ~/.ssh/id_rsa ] || + (ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa && + cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys) + """ + ssh(master, opts, key_setup) + dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh']) + print "Transferring cluster's SSH key to slaves..." + for slave in slave_nodes: + print slave.public_dns_name + ssh_write(slave.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar) modules = ['spark', 'shark', 'ephemeral-hdfs', 'persistent-hdfs', 'mapreduce', 'spark-standalone'] @@ -535,18 +543,33 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): dest.write(text) dest.close() # rsync the whole directory over to the master machine - command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " + - "'%s/' '%s@%s:/'") % (opts.identity_file, tmp_dir, opts.user, active_master)) - subprocess.check_call(command, shell=True) + command = [ + 'rsync', '-rv', + '-e', stringify_command(ssh_command(opts)), + "%s/" % tmp_dir, + "%s@%s:/" % (opts.user, active_master) + ] + subprocess.check_call(command) # Remove the temp directory we created above shutil.rmtree(tmp_dir) -# Copy a file to a given host through scp, throwing an exception if scp fails -def scp(host, opts, local_file, dest_file): - subprocess.check_call( - "scp -q -o StrictHostKeyChecking=no -i %s '%s' '%s@%s:%s'" % - (opts.identity_file, local_file, opts.user, host, dest_file), shell=True) +def stringify_command(parts): + if isinstance(parts, str): + return parts + else: + 
return ' '.join(map(pipes.quote, parts)) + + +def ssh_args(opts): + parts = ['-o', 'StrictHostKeyChecking=no'] + if opts.identity_file is not None: + parts += ['-i', opts.identity_file] + return parts + + +def ssh_command(opts): + return ['ssh'] + ssh_args(opts) # Run a command on a host through ssh, retrying up to two times @@ -556,18 +579,42 @@ def ssh(host, opts, command): while True: try: return subprocess.check_call( - "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" % - (opts.identity_file, opts.user, host, command), shell=True) + ssh_command(opts) + ['-t', '%s@%s' % (opts.user, host), stringify_command(command)]) except subprocess.CalledProcessError as e: if (tries > 2): - raise e - print "Couldn't connect to host {0}, waiting 30 seconds".format(e) + # If this was an ssh failure, provide the user with hints. + if e.returncode == 255: + raise UsageError("Failed to SSH to remote host {0}.\nPlease check that you have provided the correct --identity-file and --key-pair parameters and try again.".format(host)) + else: + raise e + print >> stderr, "Error executing remote command, retrying after 30 seconds: {0}".format(e) time.sleep(30) tries = tries + 1 +def ssh_read(host, opts, command): + return subprocess.check_output( + ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)]) +def ssh_write(host, opts, command, input): + tries = 0 + while True: + proc = subprocess.Popen( + ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)], + stdin=subprocess.PIPE) + proc.stdin.write(input) + proc.stdin.close() + status = proc.wait() + if status == 0: + break + elif (tries > 2): + raise RuntimeError("ssh_write failed with error %s" % proc.returncode) + else: + print >> stderr, "Error {0} while executing remote command, retrying after 30 seconds".format(status) + time.sleep(30) + tries = tries + 1 + # Gets a list of zones to launch instances in def get_zones(conn, opts): @@ -586,7 +633,7 @@ def get_partition(total, 
num_partitions, current_partitions): return num_slaves_this_zone -def main(): +def real_main(): (opts, action, cluster_name) = parse_args() try: conn = ec2.connect_to_region(opts.region) @@ -669,11 +716,11 @@ def main(): conn, opts, cluster_name) master = master_nodes[0].public_dns_name print "Logging into master " + master + "..." - proxy_opt = "" + proxy_opt = [] if opts.proxy_port != None: - proxy_opt = "-D " + opts.proxy_port - subprocess.check_call("ssh -o StrictHostKeyChecking=no -i %s %s %s@%s" % - (opts.identity_file, proxy_opt, opts.user, master), shell=True) + proxy_opt = ['-D', opts.proxy_port] + subprocess.check_call( + ssh_command(opts) + proxy_opt + ['-t', "%s@%s" % (opts.user, master)]) elif action == "get-master": (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) @@ -715,6 +762,13 @@ def main(): sys.exit(1) +def main(): + try: + real_main() + except UsageError, e: + print >> stderr, "\nError:\n", e + + if __name__ == "__main__": logging.basicConfig() main() diff --git a/examples/pom.xml b/examples/pom.xml index ca06a9ad8d..c6c9def5be 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,38 +21,46 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.spark</groupId> - <artifactId>spark-examples</artifactId> + <artifactId>spark-examples_${scala-short.version}</artifactId> <packaging>jar</packaging> <name>Spark Project Examples</name> <url>http://spark.incubator.apache.org/</url> + <repositories> + <!-- A repository in the local filesystem for the Kafka JAR, which we modified for Scala 2.9 --> + <repository> + <id>lib</id> + <url>file://${project.basedir}/lib</url> + </repository> + </repositories> + <dependencies> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core</artifactId> + 
<artifactId>spark-core_${scala-short.version}</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming</artifactId> + <artifactId>spark-streaming_${scala-short.version}</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-mllib</artifactId> + <artifactId>spark-mllib_${scala-short.version}</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-bagel</artifactId> + <artifactId>spark-bagel_${scala-short.version}</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> @@ -72,6 +80,12 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka</artifactId> + <version>0.7.2-spark</version> <!-- Comes from our in-project repository --> + <scope>provided</scope> + </dependency> + <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> </dependency> @@ -161,7 +175,7 @@ </goals> <configuration> <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> diff --git a/make-distribution.sh b/make-distribution.sh index bffb19843c..32bbdb90a5 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -95,7 +95,7 @@ cp $FWDIR/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/jars/" # Copy other things mkdir "$DISTDIR"/conf -cp "$FWDIR/conf/*.template" "$DISTDIR"/conf +cp "$FWDIR"/conf/*.template "$DISTDIR"/conf cp -r "$FWDIR/bin" "$DISTDIR" cp -r 
"$FWDIR/python" "$DISTDIR" cp "$FWDIR/spark-class" "$DISTDIR" diff --git a/mllib/pom.xml b/mllib/pom.xml index f4190148b1..a57bddeff3 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -21,12 +21,12 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.spark</groupId> - <artifactId>spark-mllib</artifactId> + <artifactId>spark-mllib_${scala-short.version}</artifactId> <packaging>jar</packaging> <name>Spark Project ML Library</name> <url>http://spark.incubator.apache.org/</url> @@ -34,7 +34,7 @@ <dependencies> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core</artifactId> + <artifactId>spark-core_${scala-short.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> @@ -48,12 +48,12 @@ </dependency> <dependency> <groupId>org.scalatest</groupId> - <artifactId>scalatest_2.10</artifactId> + <artifactId>scalatest_${scala-short.version}</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.scalacheck</groupId> - <artifactId>scalacheck_2.10</artifactId> + <artifactId>scalacheck_${scala-short.version}</artifactId> <scope>test</scope> </dependency> <dependency> @@ -25,7 +25,7 @@ </parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <packaging>pom</packaging> <name>Spark Project Parent POM</name> <url>http://spark.incubator.apache.org/</url> @@ -40,6 +40,7 @@ <connection>scm:git:git@github.com:apache/incubator-spark.git</connection> <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-spark.git</developerConnection> <url>scm:git:git@github.com:apache/incubator-spark.git</url> + <tag>HEAD</tag> </scm> <developers> <developer> @@ -558,7 +559,6 @@ <useZincServer>true</useZincServer> <args> 
<arg>-unchecked</arg> - <arg>-optimise</arg> <arg>-deprecation</arg> </args> <jvmArgs> @@ -605,7 +605,7 @@ <junitxml>.</junitxml> <filereports>${project.build.directory}/SparkTestSuite.txt</filereports> <argLine>-Xms64m -Xmx3g</argLine> - <stderr/> + <stderr /> </configuration> <executions> <execution> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 5d4250a53b..a734558142 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -79,9 +79,10 @@ object SparkBuild extends Build { def sharedSettings = Defaults.defaultSettings ++ Seq( organization := "org.apache.spark", - version := "0.8.0-SNAPSHOT", + version := "0.9.0-incubating-SNAPSHOT", scalaVersion := "2.10.3", - scalacOptions := Seq("-unchecked", "-optimize", "-deprecation", "-target:" + SCALAC_JVM_VERSION), + scalacOptions := Seq("-Xmax-classfile-name", "120", "-unchecked", "-deprecation", + "-target:" + SCALAC_JVM_VERSION), javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION), unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath }, retrieveManaged := true, @@ -96,6 +97,9 @@ object SparkBuild extends Build { // Only allow one test at a time, even across projects, since they run in the same JVM concurrentRestrictions in Global += Tags.limit(Tags.Test, 1), + // also check the local Maven repository ~/.m2 + resolvers ++= Seq(Resolver.file("Local Maven Repo", file(Path.userHome + "/.m2/repository"))), + // For Sonatype publishing resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", "sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"), @@ -149,6 +153,7 @@ object SparkBuild extends Build { */ + libraryDependencies ++= Seq( "io.netty" % "netty-all" % "4.0.0.CR1", "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106", @@ -175,6 +180,7 @@ object SparkBuild extends Build { val slf4jVersion = "1.7.2" + val excludeCglib = 
ExclusionRule(organization = "org.sonatype.sisu.inject") val excludeJackson = ExclusionRule(organization = "org.codehaus.jackson") val excludeNetty = ExclusionRule(organization = "org.jboss.netty") val excludeAsm = ExclusionRule(organization = "asm") @@ -198,7 +204,6 @@ object SparkBuild extends Build { "commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407 "org.ow2.asm" % "asm" % "4.0", "com.google.protobuf" % "protobuf-java" % "2.4.1", - "de.javakaffee" % "kryo-serializers" % "0.22", "com.typesafe.akka" %% "akka-remote" % "2.2.1" excludeAll(excludeNetty), "com.typesafe.akka" %% "akka-slf4j" % "2.2.1" excludeAll(excludeNetty), "net.liftweb" %% "lift-json" % "2.5.1" excludeAll(excludeNetty), @@ -216,7 +221,7 @@ object SparkBuild extends Build { "com.codahale.metrics" % "metrics-ganglia" % "3.0.0", "com.twitter" %% "chill" % "0.3.1", "com.twitter" % "chill-java" % "0.3.1" - ) + ) ) def rootSettings = sharedSettings ++ Seq( @@ -246,6 +251,7 @@ object SparkBuild extends Build { exclude("log4j","log4j") exclude("org.apache.cassandra.deps", "avro") excludeAll(excludeSnappy) + excludeAll(excludeCglib) ) ) ++ assemblySettings ++ extraAssemblySettings @@ -285,10 +291,10 @@ object SparkBuild extends Build { def yarnEnabledSettings = Seq( libraryDependencies ++= Seq( // Exclude rule required for all ? 
- "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm) + "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), + "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), + "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), + "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib) ) ) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 7611b13e82..33dc865256 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -29,7 +29,7 @@ from threading import Thread from pyspark import cloudpickle from pyspark.serializers import batched, Batch, dump_pickle, load_pickle, \ - read_from_pickle_file + read_from_pickle_file, pack_long from pyspark.join import python_join, python_left_outer_join, \ python_right_outer_join, python_cogroup from pyspark.statcounter import StatCounter @@ -690,11 +690,13 @@ class RDD(object): # form the hash buckets in Python, transferring O(numPartitions) objects # to Java. Each object is a (splitNumber, [objects]) pair. 
def add_shuffle_key(split, iterator): + buckets = defaultdict(list) + for (k, v) in iterator: buckets[partitionFunc(k) % numPartitions].append((k, v)) for (split, items) in buckets.iteritems(): - yield str(split) + yield pack_long(split) yield dump_pickle(Batch(items)) keyed = PipelinedRDD(self, add_shuffle_key) keyed._bypass_serializer = True @@ -831,8 +833,8 @@ class RDD(object): >>> sorted(x.subtractByKey(y).collect()) [('b', 4), ('b', 5)] """ - filter_func = lambda tpl: len(tpl[1][0]) > 0 and len(tpl[1][1]) == 0 - map_func = lambda tpl: [(tpl[0], val) for val in tpl[1][0]] + filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0 + map_func = lambda (key, vals): [(key, val) for val in vals[0]] return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func) def subtract(self, other, numPartitions=None): diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index fecacd1241..54fed1c9c7 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -67,6 +67,10 @@ def write_long(value, stream): stream.write(struct.pack("!q", value)) +def pack_long(value): + return struct.pack("!q", value) + + def read_int(stream): length = stream.read(4) if length == "": diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index dc205b306f..a475959090 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -35,7 +35,7 @@ print """Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ - /__ / .__/\_,_/_/ /_/\_\ version 0.8.0 + /__ / .__/\_,_/_/ /_/\_\ version 0.9.0-SNAPSHOT /_/ """ print "Using Python version %s (%s, %s)" % ( diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml index 3685561501..c983ea5dfb 100644 --- a/repl-bin/pom.xml +++ b/repl-bin/pom.xml @@ -21,12 +21,12 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> 
</parent> <groupId>org.apache.spark</groupId> - <artifactId>spark-repl-bin</artifactId> + <artifactId>spark-repl-bin_${scala-short.version}</artifactId> <packaging>pom</packaging> <name>Spark Project REPL binary packaging</name> <url>http://spark.incubator.apache.org/</url> @@ -40,18 +40,18 @@ <dependencies> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core</artifactId> + <artifactId>spark-core_${scala-short.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-bagel</artifactId> + <artifactId>spark-bagel_${scala-short.version}</artifactId> <version>${project.version}</version> <scope>runtime</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-repl</artifactId> + <artifactId>spark-repl_${scala-short.version}</artifactId> <version>${project.version}</version> <scope>runtime</scope> </dependency> @@ -89,7 +89,7 @@ </goals> <configuration> <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> diff --git a/repl/pom.xml b/repl/pom.xml index a7b5e1f3c7..ff66493229 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -21,12 +21,12 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.spark</groupId> - <artifactId>spark-repl</artifactId> + <artifactId>spark-repl_${scala-short.version}</artifactId> <packaging>jar</packaging> <name>Spark Project REPL</name> <url>http://spark.incubator.apache.org/</url> @@ -39,18 +39,18 @@ <dependencies> 
<dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core</artifactId> + <artifactId>spark-core_${scala-short.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-bagel</artifactId> + <artifactId>spark-bagel_${scala-short.version}</artifactId> <version>${project.version}</version> <scope>runtime</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-mllib</artifactId> + <artifactId>spark-mllib_${scala-short.version}</artifactId> <version>${project.version}</version> <scope>runtime</scope> </dependency> @@ -103,14 +103,14 @@ <configuration> <exportAntProperties>true</exportAntProperties> <tasks> - <property name="spark.classpath" refid="maven.test.classpath"/> - <property environment="env"/> + <property name="spark.classpath" refid="maven.test.classpath" /> + <property environment="env" /> <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry."> <condition> <not> <or> - <isset property="env.SCALA_HOME"/> - <isset property="env.SCALA_LIBRARY_PATH"/> + <isset property="env.SCALA_HOME" /> + <isset property="env.SCALA_LIBRARY_PATH" /> </or> </not> </condition> diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala index 31596cc02f..21b1ba305d 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala @@ -24,7 +24,7 @@ trait SparkILoopInit { ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ - /___/ .__/\_,_/_/ /_/\_\ version 0.8.0 + /___/ .__/\_,_/_/ /_/\_\ version 0.9.0-SNAPSHOT /_/ """) import Properties._ diff --git a/streaming/pom.xml b/streaming/pom.xml index 8367256004..3f2033f34a 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -21,12 +21,12 @@ <parent> 
<groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming</artifactId> + <artifactId>spark-streaming_${scala-short.version}</artifactId> <packaging>jar</packaging> <name>Spark Project Streaming</name> <url>http://spark.incubator.apache.org/</url> @@ -42,7 +42,7 @@ <dependencies> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core</artifactId> + <artifactId>spark-core_${scala-short.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> @@ -58,6 +58,7 @@ <groupId>org.apache.kafka</groupId> <artifactId>kafka</artifactId> <version>0.7.2-spark</version> <!-- Comes from our in-project repository --> + <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flume</groupId> diff --git a/tools/pom.xml b/tools/pom.xml index 0933c75a7f..db87b54dec 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -20,12 +20,12 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.spark</groupId> - <artifactId>spark-tools</artifactId> + <artifactId>spark-tools_${scala-short.version}</artifactId> <packaging>jar</packaging> <name>Spark Project Tools</name> <url>http://spark.incubator.apache.org/</url> @@ -33,12 +33,12 @@ <dependencies> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core</artifactId> + <artifactId>spark-core_${scala-short.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming</artifactId> + <artifactId>spark-streaming_${scala-short.version}</artifactId> <version>${project.version}</version> 
</dependency> <dependency> diff --git a/yarn/pom.xml b/yarn/pom.xml index 47e27ee41c..7770cbb0cc 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -20,12 +20,12 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.spark</groupId> - <artifactId>spark-yarn</artifactId> + <artifactId>spark-yarn_${scala-short.version}</artifactId> <packaging>jar</packaging> <name>Spark Project YARN Support</name> <url>http://spark.incubator.apache.org/</url> @@ -33,7 +33,7 @@ <dependencies> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core</artifactId> + <artifactId>spark-core_${scala-short.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> @@ -97,7 +97,7 @@ </goals> <configuration> <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 844c707834..076dd3c9b0 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -106,7 +106,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl logInfo("Setting up application submission context for ASM") val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) appContext.setApplicationId(appId) - appContext.setApplicationName("Spark") + appContext.setApplicationName(args.appName) return appContext } @@ -224,8 +224,8 @@ class 
Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl // Add Xmx for am memory JAVA_OPTS += "-Xmx" + amMemory + "m " - JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(), - YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + JAVA_OPTS += " -Djava.io.tmpdir=" + + new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " " // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out. @@ -241,6 +241,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 " JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 " } + if (env.isDefinedAt("SPARK_JAVA_OPTS")) { JAVA_OPTS += env("SPARK_JAVA_OPTS") + " " } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index cd651904d2..c56dbd99ba 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -32,6 +32,7 @@ class ClientArguments(val args: Array[String]) { var numWorkers = 2 var amQueue = System.getProperty("QUEUE", "default") var amMemory: Int = 512 + var appName: String = "Spark" // TODO var inputFormatInfo: List[InputFormatInfo] = null @@ -78,6 +79,10 @@ class ClientArguments(val args: Array[String]) { amQueue = value args = tail + case ("--name") :: value :: tail => + appName = value + args = tail + case Nil => if (userJar == null || userClass == null) { printUsageAndExit(1) @@ -108,6 +113,7 @@ class ClientArguments(val args: Array[String]) { " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" + " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" + " --worker-memory MEM Memory per Worker (e.g. 
1000M, 2G) (Default: 1G)\n" + + " --name NAME The name of your application (Default: Spark)\n" + " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')" ) System.exit(exitCode) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala index 6229167cb4..a60e8a3007 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala @@ -77,8 +77,9 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S JAVA_OPTS += env("SPARK_JAVA_OPTS") + " " } - JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(), - YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + JAVA_OPTS += " -Djava.io.tmpdir=" + + new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " " + // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out. // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same |