diff options
Diffstat (limited to 'core/src/main/scala/org')
48 files changed, 632 insertions, 270 deletions
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 68b99ca125..4cf7eb96da 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -26,28 +26,29 @@ import org.apache.spark.rdd.RDD sure a node doesn't load two copies of an RDD at once. */ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { - private val loading = new HashSet[String] + + /** Keys of RDD splits that are being computed/loaded. */ + private val loading = new HashSet[String]() /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */ def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel) : Iterator[T] = { val key = "rdd_%d_%d".format(rdd.id, split.index) - logInfo("Cache key is " + key) + logDebug("Looking for partition " + key) blockManager.get(key) match { - case Some(cachedValues) => - // Partition is in cache, so just return its values - logInfo("Found partition in cache!") - return cachedValues.asInstanceOf[Iterator[T]] + case Some(values) => + // Partition is already materialized, so just return its values + return values.asInstanceOf[Iterator[T]] case None => // Mark the split as loading (unless someone else marks it first) loading.synchronized { if (loading.contains(key)) { - logInfo("Loading contains " + key + ", waiting...") + logInfo("Another thread is loading %s, waiting for it to finish...".format(key)) while (loading.contains(key)) { try {loading.wait()} catch {case _ : Throwable =>} } - logInfo("Loading no longer contains " + key + ", so returning cached result") + logInfo("Finished waiting for %s".format(key)) // See whether someone else has successfully loaded it. The main way this would fail // is for the RDD-level cache eviction policy if someone else has loaded the same RDD // partition but we didn't want to make space for it. However, that case is unlikely @@ -57,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { case Some(values) => return values.asInstanceOf[Iterator[T]] case None => - logInfo("Whoever was loading " + key + " failed; we'll try it ourselves") + logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key)) loading.add(key) } } else { @@ -66,7 +67,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { } try { // If we got here, we have to load the split - logInfo("Computing partition " + split) + logInfo("Partition %s not found, computing it".format(key)) val computedValues = rdd.computeOrReadCheckpoint(split, context) // Persist the result, so long as the task is not running locally if (context.runningLocally) { return computedValues } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ef97fa85fa..efcc92e8e7 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -53,14 +53,15 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor import org.apache.mesos.MesosNativeLibrary +import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.LocalSparkCluster import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, - ClusterScheduler, Schedulable, SchedulingMode} + ClusterScheduler} import org.apache.spark.scheduler.local.LocalScheduler -import org.apache.spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} +import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import org.apache.spark.storage.{StorageUtils, BlockManagerSource} import org.apache.spark.ui.SparkUI import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, TimeStampedHashMap} @@ -85,9 +86,11 @@ class SparkContext( val sparkHome: String = null, val jars: Seq[String] = Nil, val environment: Map[String, String] = Map(), - // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) too. - // This is typically generated from InputFormatInfo.computePreferredLocations .. host, set of data-local splits on host - val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map()) + // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) + // too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set + // of data-local splits on host + val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = + scala.collection.immutable.Map()) extends Logging { // Ensure logging is initialized before we spawn any threads @@ -147,7 +150,7 @@ class SparkContext( } // Create and start the scheduler - private var taskScheduler: TaskScheduler = { + private[spark] var taskScheduler: TaskScheduler = { // Regular expression used for local[N] master format val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r // Regular expression for local[N, maxRetries], used in tests with failing tasks @@ -240,7 +243,8 @@ class SparkContext( val env = SparkEnv.get val conf = env.hadoop.newConfiguration() // Explicitly check for S3 environment variables - if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) { + if (System.getenv("AWS_ACCESS_KEY_ID") != null && + System.getenv("AWS_SECRET_ACCESS_KEY") != null) { conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) @@ -339,6 +343,8 @@ class SparkContext( valueClass: Class[V], minSplits: Int = defaultMinSplits ): RDD[(K, V)] = { + // Add necessary security credentials to the JobConf before broadcasting it. + SparkEnv.get.hadoop.addCredentials(conf) new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) } @@ -349,10 +355,27 @@ class SparkContext( keyClass: Class[K], valueClass: Class[V], minSplits: Int = defaultMinSplits - ) : RDD[(K, V)] = { - val conf = new JobConf(hadoopConfiguration) - FileInputFormat.setInputPaths(conf, path) - new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) + ): RDD[(K, V)] = { + // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. + val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration)) + hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits) + } + + /** + * Get an RDD for a Hadoop file with an arbitray InputFormat. Accept a Hadoop Configuration + * that has already been broadcast, assuming that it's safe to use it to construct a + * HadoopFileRDD (i.e., except for file 'path', all other configuration properties can be resued). + */ + def hadoopFile[K, V]( + path: String, + confBroadcast: Broadcast[SerializableWritable[Configuration]], + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int + ): RDD[(K, V)] = { + new HadoopFileRDD( + this, path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits) } /** diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 03bf268863..8466c2a004 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -46,6 +46,10 @@ private[spark] case class ExceptionFailure( metrics: Option[TaskMetrics]) extends TaskEndReason -private[spark] case class OtherFailure(message: String) extends TaskEndReason +/** + * The task finished successfully, but the result was lost from the executor's block manager before + * it was fetched. + */ +private[spark] case object TaskResultLost extends TaskEndReason -private[spark] case class TaskResultTooBigFailure() extends TaskEndReason +private[spark] case class OtherFailure(message: String) extends TaskEndReason diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index feb2cab578..d7b45d4caa 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -69,6 +69,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { new JavaRDD(rdd.map(f)(f.returnType()))(f.returnType()) /** + * Return a new RDD by applying a function to each partition of this RDD, while tracking the index + * of the original partition. + */ + def mapPartitionsWithIndex[R: ClassManifest]( + f: JFunction2[Int, java.util.Iterator[T], java.util.Iterator[R]], + preservesPartitioning: Boolean = false): JavaRDD[R] = + new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))), + preservesPartitioning)) + + /** * Return a new RDD by applying a function to all elements of this RDD. */ def map[R](f: DoubleFunction[T]): JavaDoubleRDD = diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala index b090c6edf3..2be4e323be 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala @@ -17,12 +17,13 @@ package org.apache.spark.api.python -import org.apache.spark.Partitioner import java.util.Arrays + +import org.apache.spark.Partitioner import org.apache.spark.util.Utils /** - * A [[org.apache.spark.Partitioner]] that performs handling of byte arrays, for use by the Python API. + * A [[org.apache.spark.Partitioner]] that performs handling of long-valued keys, for use by the Python API. * * Stores the unique id() of the Python-side partitioning function so that it is incorporated into * equality comparisons. Correctness requires that the id is a unique identifier for the @@ -30,6 +31,7 @@ import org.apache.spark.util.Utils * function). This can be ensured by using the Python id() function and maintaining a reference * to the Python partitioning function so that its id() is not reused. */ + private[spark] class PythonPartitioner( override val numPartitions: Int, val pyPartitionFunctionId: Long) @@ -37,7 +39,9 @@ private[spark] class PythonPartitioner( override def getPartition(key: Any): Int = key match { case null => 0 - case key: Array[Byte] => Utils.nonNegativeMod(Arrays.hashCode(key), numPartitions) + // we don't trust the Python partition function to return valid partition ID's so + // let's do a modulo numPartitions in any case + case key: Long => Utils.nonNegativeMod(key.toInt, numPartitions) case _ => Utils.nonNegativeMod(key.hashCode(), numPartitions) } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index cb2db77f39..4d887cf195 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -187,14 +187,14 @@ private class PythonException(msg: String) extends Exception(msg) * This is used by PySpark's shuffle operations. */ private class PairwiseRDD(prev: RDD[Array[Byte]]) extends - RDD[(Array[Byte], Array[Byte])](prev) { + RDD[(Long, Array[Byte])](prev) { override def getPartitions = prev.partitions override def compute(split: Partition, context: TaskContext) = prev.iterator(split, context).grouped(2).map { - case Seq(a, b) => (a, b) + case Seq(a, b) => (Utils.deserializeLongValue(a), b) case x => throw new SparkException("PairwiseRDD: unexpected value: " + x) } - val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this) + val asJavaPairRDD : JavaPairRDD[Long, Array[Byte]] = JavaPairRDD.fromRDD(this) } private[spark] object PythonRDD { diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index 87a703427c..04d01c169d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -41,6 +41,7 @@ private[spark] object JsonProtocol { ("starttime" -> obj.startTime) ~ ("id" -> obj.id) ~ ("name" -> obj.desc.name) ~ + ("appuiurl" -> obj.appUiUrl) ~ ("cores" -> obj.desc.maxCores) ~ ("user" -> obj.desc.user) ~ ("memoryperslave" -> obj.desc.memoryPerSlave) ~ @@ -64,7 +65,7 @@ private[spark] object JsonProtocol { } def writeMasterState(obj: MasterStateResponse) = { - ("url" -> ("spark://" + obj.uri)) ~ + ("url" -> obj.uri) ~ ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~ ("cores" -> obj.workers.map(_.cores).sum) ~ ("coresused" -> obj.workers.map(_.coresUsed).sum) ~ diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 0a5f4c368f..993ba6bd3d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -16,6 +16,9 @@ */ package org.apache.spark.deploy + +import com.google.common.collect.MapMaker + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf @@ -24,11 +27,16 @@ import org.apache.hadoop.mapred.JobConf * Contains util methods to interact with Hadoop from spark. */ class SparkHadoopUtil { + // A general, soft-reference map for metadata needed during HadoopRDD split computation + // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). + private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]() - // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems + // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop + // subsystems def newConfiguration(): Configuration = new Configuration() - // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster + // Add any user credentials to the job conf which are necessary for running on a secure Hadoop + // cluster def addCredentials(conf: JobConf) {} def isYarnMode(): Boolean = { false } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 99a4a95e82..b4153f3533 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import java.io.{File} +import java.io.File import java.lang.management.ManagementFactory import java.nio.ByteBuffer import java.util.concurrent._ @@ -27,11 +27,11 @@ import scala.collection.mutable.HashMap import org.apache.spark.scheduler._ import org.apache.spark._ +import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils - /** - * The Mesos executor for Spark. + * Spark executor used with Mesos and the standalone scheduler. */ private[spark] class Executor( executorId: String, @@ -167,12 +167,20 @@ private[spark] class Executor( // we need to serialize the task metrics first. If TaskMetrics had a custom serialized format, we could // just change the relevants bytes in the byte buffer val accumUpdates = Accumulators.values - val result = new TaskResult(value, accumUpdates, task.metrics.getOrElse(null)) - val serializedResult = ser.serialize(result) - logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit) - if (serializedResult.limit >= (akkaFrameSize - 1024)) { - context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(TaskResultTooBigFailure())) - return + val directResult = new DirectTaskResult(value, accumUpdates, task.metrics.getOrElse(null)) + val serializedDirectResult = ser.serialize(directResult) + logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit) + val serializedResult = { + if (serializedDirectResult.limit >= akkaFrameSize - 1024) { + logInfo("Storing result for " + taskId + " in local BlockManager") + val blockId = "taskresult_" + taskId + env.blockManager.putBytes( + blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER) + ser.serialize(new IndirectTaskResult[Any](blockId)) + } else { + logInfo("Sending result for " + taskId + " directly to driver") + serializedDirectResult + } } context.statusUpdate(taskId, TaskState.FINISHED, serializedResult) logInfo("Finished task ID " + taskId) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 2cb6734e41..d3b3fffd40 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -19,6 +19,7 @@ package org.apache.spark.rdd import java.io.EOFException +import org.apache.hadoop.mapred.FileInputFormat import org.apache.hadoop.mapred.InputFormat import org.apache.hadoop.mapred.InputSplit import org.apache.hadoop.mapred.JobConf @@ -26,10 +27,47 @@ import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.util.ReflectionUtils -import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext} +import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, + TaskContext} +import org.apache.spark.broadcast.Broadcast import org.apache.spark.util.NextIterator import org.apache.hadoop.conf.{Configuration, Configurable} +/** + * An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file + * system, or S3). + * This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same + * across multiple reads; the 'path' is the only variable that is different across new JobConfs + * created from the Configuration. + */ +class HadoopFileRDD[K, V]( + sc: SparkContext, + path: String, + broadcastedConf: Broadcast[SerializableWritable[Configuration]], + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int) + extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, valueClass, minSplits) { + + override def getJobConf(): JobConf = { + if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { + // getJobConf() has been called previously, so there is already a local cache of the JobConf + // needed by this RDD. + return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] + } else { + // Create a new JobConf, set the input file/directory paths to read from, and cache the + // JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through + // HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple + // getJobConf() calls for this RDD in the local process. + // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. + val newJobConf = new JobConf(broadcastedConf.value.value) + FileInputFormat.setInputPaths(newJobConf, path) + HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) + return newJobConf + } + } +} /** * A Spark split class that wraps around a Hadoop InputSplit. @@ -45,29 +83,80 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp } /** - * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local file - * system, or S3, tables in HBase, etc). + * A base class that provides core functionality for reading data partitions stored in Hadoop. */ class HadoopRDD[K, V]( sc: SparkContext, - @transient conf: JobConf, + broadcastedConf: Broadcast[SerializableWritable[Configuration]], inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minSplits: Int) extends RDD[(K, V)](sc, Nil) with Logging { - // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it - private val confBroadcast = sc.broadcast(new SerializableWritable(conf)) + def this( + sc: SparkContext, + conf: JobConf, + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int) = { + this( + sc, + sc.broadcast(new SerializableWritable(conf)) + .asInstanceOf[Broadcast[SerializableWritable[Configuration]]], + inputFormatClass, + keyClass, + valueClass, + minSplits) + } + + protected val jobConfCacheKey = "rdd_%d_job_conf".format(id) + + protected val inputFormatCacheKey = "rdd_%d_input_format".format(id) + + // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. + protected def getJobConf(): JobConf = { + val conf: Configuration = broadcastedConf.value.value + if (conf.isInstanceOf[JobConf]) { + // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it. + return conf.asInstanceOf[JobConf] + } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { + // getJobConf() has been called previously, so there is already a local cache of the JobConf + // needed by this RDD. + return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] + } else { + // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the + // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). + // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. + val newJobConf = new JobConf(broadcastedConf.value.value) + HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) + return newJobConf + } + } + + protected def getInputFormat(conf: JobConf): InputFormat[K, V] = { + if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) { + return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]] + } + // Once an InputFormat for this RDD is created, cache it so that only one reflection call is + // done in each local process. + val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf) + .asInstanceOf[InputFormat[K, V]] + if (newInputFormat.isInstanceOf[Configurable]) { + newInputFormat.asInstanceOf[Configurable].setConf(conf) + } + HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat) + return newInputFormat + } override def getPartitions: Array[Partition] = { - val env = SparkEnv.get - env.hadoop.addCredentials(conf) - val inputFormat = createInputFormat(conf) + val jobConf = getJobConf() + val inputFormat = getInputFormat(jobConf) if (inputFormat.isInstanceOf[Configurable]) { - inputFormat.asInstanceOf[Configurable].setConf(conf) + inputFormat.asInstanceOf[Configurable].setConf(jobConf) } - val inputSplits = inputFormat.getSplits(conf, minSplits) + val inputSplits = inputFormat.getSplits(jobConf, minSplits) val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) @@ -75,22 +164,14 @@ class HadoopRDD[K, V]( array } - def createInputFormat(conf: JobConf): InputFormat[K, V] = { - ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf) - .asInstanceOf[InputFormat[K, V]] - } - override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] { val split = theSplit.asInstanceOf[HadoopPartition] logInfo("Input split: " + split.inputSplit) var reader: RecordReader[K, V] = null - val conf = confBroadcast.value.value - val fmt = createInputFormat(conf) - if (fmt.isInstanceOf[Configurable]) { - fmt.asInstanceOf[Configurable].setConf(conf) - } - reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL) + val jobConf = getJobConf() + val inputFormat = getInputFormat(jobConf) + reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) // Register an on-task-completion callback to close the input stream. context.addOnCompleteCallback{ () => closeIfNeeded() } @@ -127,5 +208,18 @@ class HadoopRDD[K, V]( // Do nothing. Hadoop RDD should not be checkpointed. } - def getConf: Configuration = confBroadcast.value.value + def getConf: Configuration = getJobConf() +} + +private[spark] object HadoopRDD { + /** + * The three methods below are helpers for accessing the local map, a property of the SparkEnv of + * the local process. + */ + def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key) + + def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key) + + def putCachedMetadata(key: String, value: Any) = + SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value) } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index ea4eeb7dbf..731ef90c90 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -756,24 +756,42 @@ abstract class RDD[T: ClassTag]( } /** - * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so - * it will be slow if a lot of partitions are required. In that case, use collect() to get the - * whole RDD instead. + * Take the first num elements of the RDD. It works by first scanning one partition, and use the + * results from that partition to estimate the number of additional partitions needed to satisfy + * the limit. */ def take(num: Int): Array[T] = { if (num == 0) { return new Array[T](0) } + val buf = new ArrayBuffer[T] - var p = 0 - while (buf.size < num && p < partitions.size) { + val totalParts = this.partitions.length + var partsScanned = 0 + while (buf.size < num && partsScanned < totalParts) { + // The number of partitions to try in this iteration. It is ok for this number to be + // greater than totalParts because we actually cap it at totalParts in runJob. + var numPartsToTry = 1 + if (partsScanned > 0) { + // If we didn't find any rows after the first iteration, just try all partitions next. + // Otherwise, interpolate the number of partitions we need to try, but overestimate it + // by 50%. + if (buf.size == 0) { + numPartsToTry = totalParts - 1 + } else { + numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt + } + } + numPartsToTry = math.max(0, numPartsToTry) // guard against negative num of partitions + val left = num - buf.size - val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p), true) - buf ++= res(0) - if (buf.size == num) - return buf.toArray - p += 1 + val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts) + val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true) + + res.foreach(buf ++= _.take(num - buf.size)) + partsScanned += numPartsToTry } + return buf.toArray } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index dca84db597..e79b67579e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -29,7 +29,6 @@ import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.executor.TaskMetrics import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} -import org.apache.spark.scheduler.cluster.TaskInfo import org.apache.spark.storage.{BlockManager, BlockManagerMaster} import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap} @@ -554,7 +553,7 @@ class DAGScheduler( SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head) } catch { case e: NotSerializableException => - abortStage(stage, e.toString) + abortStage(stage, "Task not serializable: " + e.toString) running -= stage return } @@ -706,6 +705,9 @@ class DAGScheduler( case ExceptionFailure(className, description, stackTrace, metrics) => // Do nothing here, left up to the TaskScheduler to decide how to handle user failures + case TaskResultLost => + // Do nothing here; the TaskScheduler handles these failures and resubmits the task. + case other => // Unrecognized failure - abort all jobs depending on this stage abortStage(stageIdToStage(task.stageId), task + " failed: " + other) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 0d99670648..10ff1b4376 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -19,7 +19,6 @@ package org.apache.spark.scheduler import java.util.Properties -import org.apache.spark.scheduler.cluster.TaskInfo import scala.collection.mutable.Map import org.apache.spark._ diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index c8b78bf00a..3628b1b078 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -30,7 +30,6 @@ import scala.io.Source import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.scheduler.cluster.TaskInfo
// Used to record runtime information for each job, including RDD graph
// tasks' start/stop shuffle information and information from outside
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index 35b32600da..9eb8d48501 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import org.apache.spark.Logging -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode /** * An Schedulable entity that represent collection of Pools or TaskSetManagers @@ -45,7 +45,7 @@ private[spark] class Pool( var priority = 0 var stageId = 0 var name = poolName - var parent:Schedulable = null + var parent: Pool = null var taskSetSchedulingAlgorithm: SchedulingAlgorithm = { schedulingMode match { @@ -101,14 +101,14 @@ private[spark] class Pool( return sortedTaskSetQueue } - override def increaseRunningTasks(taskNum: Int) { + def increaseRunningTasks(taskNum: Int) { runningTasks += taskNum if (parent != null) { parent.increaseRunningTasks(taskNum) } } - override def decreaseRunningTasks(taskNum: Int) { + def decreaseRunningTasks(taskNum: Int) { runningTasks -= taskNum if (parent != null) { parent.decreaseRunningTasks(taskNum) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala index f4726450ec..1c7ea2dccc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/Schedulable.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala @@ -15,9 +15,9 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import scala.collection.mutable.ArrayBuffer /** @@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer * there are two type of Schedulable entities(Pools and TaskSetManagers) */ private[spark] trait Schedulable { - var parent: Schedulable + var parent: Pool // child queues def schedulableQueue: ArrayBuffer[Schedulable] def schedulingMode: SchedulingMode @@ -36,8 +36,6 @@ private[spark] trait Schedulable { def stageId: Int def name: String - def increaseRunningTasks(taskNum: Int): Unit - def decreaseRunningTasks(taskNum: Int): Unit def addSchedulable(schedulable: Schedulable): Unit def removeSchedulable(schedulable: Schedulable): Unit def getSchedulableByName(name: String): Schedulable diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index 114617c51a..4e25086ec9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import java.io.{FileInputStream, InputStream} import java.util.{NoSuchElementException, Properties} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala index cbeed4731a..3418640b8c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingAlgorithm.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler /** * An interface for sort algorithm diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala index 16013b3208..3832ee7ff6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler /** * "FAIR" and "FIFO" determines which policy is used diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index c3cf4b8907..62b521ad45 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -18,7 +18,6 @@ package org.apache.spark.scheduler import java.util.Properties -import org.apache.spark.scheduler.cluster.TaskInfo import org.apache.spark.util.{Utils, Distribution} import org.apache.spark.{Logging, SparkContext, TaskEndReason} import org.apache.spark.executor.TaskMetrics diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 72cb1c9ce8..b6f11969e5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -17,8 +17,8 @@ package org.apache.spark.scheduler -import org.apache.spark.scheduler.cluster.TaskInfo import scala.collection._ + import org.apache.spark.executor.TaskMetrics case class StageInfo( diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index 309ac2f6c9..5190d234d4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskDescription.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import java.nio.ByteBuffer import org.apache.spark.util.SerializableBuffer diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index 9685fb1a67..7c2a422aff 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala index 8d8d708612..d31a5d5177 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler private[spark] object TaskLocality diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index 5c7e5bb977..db3954a9d3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -26,12 +26,17 @@ import java.nio.ByteBuffer import org.apache.spark.util.Utils // Task result. Also contains updates to accumulator variables. -// TODO: Use of distributed cache to return result is a hack to get around -// what seems to be a bug with messages over 60KB in libprocess; fix it +private[spark] sealed trait TaskResult[T] + +/** A reference to a DirectTaskResult that has been stored in the worker's BlockManager. */ +private[spark] +case class IndirectTaskResult[T](val blockId: String) extends TaskResult[T] with Serializable + +/** A TaskResult that contains the task's return value and accumulator updates. */ private[spark] -class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics) - extends Externalizable -{ +class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics) + extends TaskResult[T] with Externalizable { + def this() = this(null.asInstanceOf[T], null, null) override def writeExternal(out: ObjectOutput) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 63be8ba3f5..7c2a9f03d7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -17,10 +17,11 @@ package org.apache.spark.scheduler -import org.apache.spark.scheduler.cluster.Pool -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode + /** * Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler. + * Each TaskScheduler schedulers task for a single SparkContext. * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage, * and are responsible for sending the tasks to the cluster, running them, retrying if there * are failures, and mitigating stragglers. They return events to the DAGScheduler through diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala index 83be051c1a..593fa9fb93 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala @@ -17,7 +17,6 @@ package org.apache.spark.scheduler -import org.apache.spark.scheduler.cluster.TaskInfo import scala.collection.mutable.Map import org.apache.spark.TaskEndReason diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 648a3ef922..90f6bcefac 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -15,12 +15,11 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import java.nio.ByteBuffer import org.apache.spark.TaskState.TaskState -import org.apache.spark.scheduler.TaskSet /** * Tracks and schedules the tasks within a single TaskSet. This class keeps track of the status of @@ -45,7 +44,5 @@ private[spark] trait TaskSetManager extends Schedulable { maxLocality: TaskLocality.TaskLocality) : Option[TaskDescription] - def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) - def error(message: String) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala index 919acce828..1a844b7e7e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala @@ -18,6 +18,9 @@ package org.apache.spark.scheduler.cluster import java.lang.{Boolean => JBoolean} +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicLong +import java.util.{TimerTask, Timer} import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap @@ -26,10 +29,7 @@ import scala.collection.mutable.HashSet import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode -import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicLong -import java.util.{TimerTask, Timer} +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode /** * The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call @@ -55,7 +55,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) // Threshold above which we warn user initial TaskSet may be starved val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong - val activeTaskSets = new HashMap[String, TaskSetManager] + // ClusterTaskSetManagers are not thread safe, so any access to one should be synchronized + // on this class. + val activeTaskSets = new HashMap[String, ClusterTaskSetManager] val taskIdToTaskSetId = new HashMap[Long, String] val taskIdToExecutorId = new HashMap[Long, String] @@ -65,7 +67,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) @volatile private var hasLaunchedTask = false private val starvationTimer = new Timer(true) - // Incrementing Mesos task IDs + // Incrementing task IDs val nextTaskId = new AtomicLong(0) // Which executor IDs we have executors on @@ -96,6 +98,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) val schedulingMode: SchedulingMode = SchedulingMode.withName( System.getProperty("spark.scheduler.mode", "FIFO")) + // This is a var so that we can reset it for testing purposes. + private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this) + override def setListener(listener: TaskSchedulerListener) { this.listener = listener } @@ -234,7 +239,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { - var taskSetToUpdate: Option[TaskSetManager] = None var failedExecutor: Option[String] = None var taskFailed = false synchronized { @@ -249,9 +253,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } taskIdToTaskSetId.get(tid) match { case Some(taskSetId) => - if (activeTaskSets.contains(taskSetId)) { - taskSetToUpdate = Some(activeTaskSets(taskSetId)) - } if (TaskState.isFinished(state)) { taskIdToTaskSetId.remove(tid) if (taskSetTaskIds.contains(taskSetId)) { @@ -262,6 +263,15 @@ private[spark] class ClusterScheduler(val sc: SparkContext) if (state == TaskState.FAILED) { taskFailed = true } + activeTaskSets.get(taskSetId).foreach { taskSet => + if (state == TaskState.FINISHED) { + taskSet.removeRunningTask(tid) + taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) + } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { + taskSet.removeRunningTask(tid) + taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) + } + } case None => logInfo("Ignoring update from TID " + tid + " because its task set is gone") } @@ -269,10 +279,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) case e: Exception => logError("Exception in statusUpdate", e) } } - // Update the task set and DAGScheduler without holding a lock on this, since that can deadlock - if (taskSetToUpdate != None) { - taskSetToUpdate.get.statusUpdate(tid, state, serializedData) - } + // Update the DAGScheduler without holding a lock on this, since that can deadlock if (failedExecutor != None) { listener.executorLost(failedExecutor.get) backend.reviveOffers() @@ -283,6 +290,25 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } } + def handleSuccessfulTask( + taskSetManager: ClusterTaskSetManager, + tid: Long, + taskResult: DirectTaskResult[_]) = synchronized { + taskSetManager.handleSuccessfulTask(tid, taskResult) + } + + def handleFailedTask( + taskSetManager: ClusterTaskSetManager, + tid: Long, + taskState: TaskState, + reason: Option[TaskEndReason]) = synchronized { + taskSetManager.handleFailedTask(tid, taskState, reason) + if (taskState == TaskState.FINISHED) { + // The task finished successfully but the result was lost, so we should revive offers. + backend.reviveOffers() + } + } + def error(message: String) { synchronized { if (activeTaskSets.size > 0) { @@ -311,6 +337,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) if (jarServer != null) { jarServer.stop() } + if (taskResultGetter != null) { + taskResultGetter.stop() + } // sleeping for an arbitrary 5 seconds : to ensure that messages are sent out. // TODO: Do something better ! diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala index f61fde6957..194ab55102 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala @@ -25,15 +25,12 @@ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet import scala.math.max import scala.math.min +import scala.Some -import org.apache.spark.{FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskState} -import org.apache.spark.{ExceptionFailure, SparkException, TaskResultTooBigFailure} +import org.apache.spark.{ExceptionFailure, FetchFailed, Logging, Resubmitted, SparkEnv, + SparkException, Success, TaskEndReason, TaskResultLost, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler._ -import scala.Some -import org.apache.spark.FetchFailed -import org.apache.spark.ExceptionFailure -import org.apache.spark.TaskResultTooBigFailure import org.apache.spark.util.{SystemClock, Clock} @@ -71,18 +68,20 @@ private[spark] class ClusterTaskSetManager( val tasks = taskSet.tasks val numTasks = tasks.length val copiesRunning = new Array[Int](numTasks) - val finished = new Array[Boolean](numTasks) + val successful = new Array[Boolean](numTasks) val numFailures = new Array[Int](numTasks) val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil) - var tasksFinished = 0 + var tasksSuccessful = 0 var weight = 1 var minShare = 0 - var runningTasks = 0 var priority = taskSet.priority var stageId = taskSet.stageId var name = "TaskSet_"+taskSet.stageId.toString - var parent: Schedulable = null + var parent: Pool = null + + var runningTasks = 0 + private val runningTasksSet = new HashSet[Long] // Set of pending tasks for each executor. These collections are actually // treated as stacks, in which new tasks are added to the end of the @@ -223,7 +222,7 @@ private[spark] class ClusterTaskSetManager( while (!list.isEmpty) { val index = list.last list.trimEnd(1) - if (copiesRunning(index) == 0 && !finished(index)) { + if (copiesRunning(index) == 0 && !successful(index)) { return Some(index) } } @@ -243,7 +242,7 @@ private[spark] class ClusterTaskSetManager( private def findSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value) : Option[(Int, TaskLocality.Value)] = { - speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set + speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set if (!speculatableTasks.isEmpty) { // Check for process-local or preference-less tasks; note that tasks can be process-local @@ -344,7 +343,7 @@ private[spark] class ClusterTaskSetManager( maxLocality: TaskLocality.TaskLocality) : Option[TaskDescription] = { - if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) { + if (tasksSuccessful < numTasks && availableCpus >= CPUS_PER_TASK) { val curTime = clock.getTime() var allowedLocality = getAllowedLocalityLevel(curTime) @@ -375,7 +374,7 @@ private[spark] class ClusterTaskSetManager( val serializedTask = Task.serializeWithDependencies( task, sched.sc.addedFiles, sched.sc.addedJars, ser) val timeTaken = clock.getTime() - startTime - increaseRunningTasks(1) + addRunningTask(taskId) logInfo("Serialized task %s:%d as %d bytes in %d ms".format( taskSet.id, index, serializedTask.limit, timeTaken)) val taskName = "task %s:%d".format(taskSet.id, index) @@ -417,94 +416,61 @@ private[spark] class ClusterTaskSetManager( index } - /** Called by cluster scheduler when one of our tasks changes state */ - override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { - SparkEnv.set(env) - state match { - case TaskState.FINISHED => - taskFinished(tid, state, serializedData) - case TaskState.LOST => - taskLost(tid, state, serializedData) - case TaskState.FAILED => - taskLost(tid, state, serializedData) - case TaskState.KILLED => - taskLost(tid, state, serializedData) - case _ => - } - } - - def taskStarted(task: Task[_], info: TaskInfo) { + private def taskStarted(task: Task[_], info: TaskInfo) { sched.listener.taskStarted(task, info) } - def taskFinished(tid: Long, state: TaskState, serializedData: ByteBuffer) { + /** + * Marks the task as successful and notifies the listener that a task has ended. + */ + def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = { val info = taskInfos(tid) - if (info.failed) { - // We might get two task-lost messages for the same task in coarse-grained Mesos mode, - // or even from Mesos itself when acks get delayed. - return - } val index = info.index info.markSuccessful() - decreaseRunningTasks(1) - if (!finished(index)) { - tasksFinished += 1 + removeRunningTask(tid) + if (!successful(index)) { logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format( - tid, info.duration, info.host, tasksFinished, numTasks)) - // Deserialize task result and pass it to the scheduler - try { - val result = ser.deserialize[TaskResult[_]](serializedData) - result.metrics.resultSize = serializedData.limit() - sched.listener.taskEnded( - tasks(index), Success, result.value, result.accumUpdates, info, result.metrics) - } catch { - case cnf: ClassNotFoundException => - val loader = Thread.currentThread().getContextClassLoader - throw new SparkException("ClassNotFound with classloader: " + loader, cnf) - case ex: Throwable => throw ex - } - // Mark finished and stop if we've finished all the tasks - finished(index) = true - if (tasksFinished == numTasks) { + tid, info.duration, info.host, tasksSuccessful, numTasks)) + sched.listener.taskEnded( + tasks(index), Success, result.value, result.accumUpdates, info, result.metrics) + + // Mark successful and stop if all the tasks have succeeded. + tasksSuccessful += 1 + successful(index) = true + if (tasksSuccessful == numTasks) { sched.taskSetFinished(this) } } else { - logInfo("Ignoring task-finished event for TID " + tid + - " because task " + index + " is already finished") + logInfo("Ignorning task-finished event for TID " + tid + " because task " + + index + " has already completed successfully") } } - def taskLost(tid: Long, state: TaskState, serializedData: ByteBuffer) { + /** + * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the listener. + */ + def handleFailedTask(tid: Long, state: TaskState, reason: Option[TaskEndReason]) { val info = taskInfos(tid) if (info.failed) { - // We might get two task-lost messages for the same task in coarse-grained Mesos mode, - // or even from Mesos itself when acks get delayed. return } + removeRunningTask(tid) val index = info.index info.markFailed() - decreaseRunningTasks(1) - if (!finished(index)) { + if (!successful(index)) { logInfo("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index)) copiesRunning(index) -= 1 // Check if the problem is a map output fetch failure. In that case, this // task will never succeed on any node, so tell the scheduler about it. - if (serializedData != null && serializedData.limit() > 0) { - val reason = ser.deserialize[TaskEndReason](serializedData, getClass.getClassLoader) - reason match { + reason.foreach { + _ match { case fetchFailed: FetchFailed => logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress) sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null) - finished(index) = true - tasksFinished += 1 + successful(index) = true + tasksSuccessful += 1 sched.taskSetFinished(this) - decreaseRunningTasks(runningTasks) - return - - case taskResultTooBig: TaskResultTooBigFailure => - logInfo("Loss was due to task %s result exceeding Akka frame size; aborting job".format( - tid)) - abort("Task %s result exceeded Akka frame size".format(tid)) + removeAllRunningTasks() return case ef: ExceptionFailure => @@ -534,13 +500,16 @@ private[spark] class ClusterTaskSetManager( logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount)) } + case TaskResultLost => + logInfo("Lost result for TID %s on host %s".format(tid, info.host)) + sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null) + case _ => {} } } // On non-fetch failures, re-enqueue the task as pending for a max number of retries addPendingTask(index) - // Count failed attempts only on FAILED and LOST state (not on KILLED) - if (state == TaskState.FAILED || state == TaskState.LOST) { + if (state != TaskState.KILLED) { numFailures(index) += 1 if (numFailures(index) > MAX_TASK_FAILURES) { logError("Task %s:%d failed more than %d times; aborting job".format( @@ -564,22 +533,36 @@ private[spark] class ClusterTaskSetManager( causeOfFailure = message // TODO: Kill running tasks if we were not terminated due to a Mesos error sched.listener.taskSetFailed(taskSet, message) - decreaseRunningTasks(runningTasks) + removeAllRunningTasks() sched.taskSetFinished(this) } - override def increaseRunningTasks(taskNum: Int) { - runningTasks += taskNum - if (parent != null) { - parent.increaseRunningTasks(taskNum) + /** If the given task ID is not in the set of running tasks, adds it. + * + * Used to keep track of the number of running tasks, for enforcing scheduling policies. + */ + def addRunningTask(tid: Long) { + if (runningTasksSet.add(tid) && parent != null) { + parent.increaseRunningTasks(1) + } + runningTasks = runningTasksSet.size + } + + /** If the given task ID is in the set of running tasks, removes it. */ + def removeRunningTask(tid: Long) { + if (runningTasksSet.remove(tid) && parent != null) { + parent.decreaseRunningTasks(1) } + runningTasks = runningTasksSet.size } - override def decreaseRunningTasks(taskNum: Int) { - runningTasks -= taskNum + private def removeAllRunningTasks() { + val numRunningTasks = runningTasksSet.size + runningTasksSet.clear() if (parent != null) { - parent.decreaseRunningTasks(taskNum) + parent.decreaseRunningTasks(numRunningTasks) } + runningTasks = 0 } override def getSchedulableByName(name: String): Schedulable = { @@ -615,10 +598,10 @@ private[spark] class ClusterTaskSetManager( if (tasks(0).isInstanceOf[ShuffleMapTask]) { for ((tid, info) <- taskInfos if info.executorId == execId) { val index = taskInfos(tid).index - if (finished(index)) { - finished(index) = false + if (successful(index)) { + successful(index) = false copiesRunning(index) -= 1 - tasksFinished -= 1 + tasksSuccessful -= 1 addPendingTask(index) // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our // stage finishes when a total of tasks.size tasks finish. @@ -628,7 +611,7 @@ private[spark] class ClusterTaskSetManager( } // Also re-enqueue any tasks that were running on the node for ((tid, info) <- taskInfos if info.running && info.executorId == execId) { - taskLost(tid, TaskState.KILLED, null) + handleFailedTask(tid, TaskState.KILLED, None) } } @@ -641,13 +624,13 @@ private[spark] class ClusterTaskSetManager( */ override def checkSpeculatableTasks(): Boolean = { // Can't speculate if we only have one task, or if all tasks have finished. - if (numTasks == 1 || tasksFinished == numTasks) { + if (numTasks == 1 || tasksSuccessful == numTasks) { return false } var foundTasks = false val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation) - if (tasksFinished >= minFinishedForSpeculation) { + if (tasksSuccessful >= minFinishedForSpeculation) { val time = clock.getTime() val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray Arrays.sort(durations) @@ -658,7 +641,7 @@ private[spark] class ClusterTaskSetManager( logDebug("Task length threshold for speculation: " + threshold) for ((tid, info) <- taskInfos) { val index = info.index - if (!finished(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold && + if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold && !speculatableTasks.contains(index)) { logInfo( "Marking task %s:%d (on %s) as speculatable because it ran more than %.0f ms".format( @@ -672,7 +655,7 @@ private[spark] class ClusterTaskSetManager( } override def hasPendingTasks(): Boolean = { - numTasks > 0 && tasksFinished < numTasks + numTasks > 0 && tasksSuccessful < numTasks } private def getLocalityWait(level: TaskLocality.TaskLocality): Long = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala index 9c36d221f6..c0b836bf1a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala @@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster import java.nio.ByteBuffer import org.apache.spark.TaskState.TaskState +import org.apache.spark.scheduler.TaskDescription import org.apache.spark.util.{Utils, SerializableBuffer} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 49f668eb32..b6f0ec961a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -28,6 +28,7 @@ import akka.pattern.ask import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent} import org.apache.spark.{SparkException, Logging, TaskState} +import org.apache.spark.scheduler.TaskDescription import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._ import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala new file mode 100644 index 0000000000..feec8ecfe4 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import java.nio.ByteBuffer +import java.util.concurrent.{LinkedBlockingDeque, ThreadFactory, ThreadPoolExecutor, TimeUnit} + +import org.apache.spark._ +import org.apache.spark.TaskState.TaskState +import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult} +import org.apache.spark.serializer.SerializerInstance + +/** + * Runs a thread pool that deserializes and remotely fetches (if necessary) task results. + */ +private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler) + extends Logging { + private val MIN_THREADS = System.getProperty("spark.resultGetter.minThreads", "4").toInt + private val MAX_THREADS = System.getProperty("spark.resultGetter.maxThreads", "4").toInt + private val getTaskResultExecutor = new ThreadPoolExecutor( + MIN_THREADS, + MAX_THREADS, + 0L, + TimeUnit.SECONDS, + new LinkedBlockingDeque[Runnable], + new ResultResolverThreadFactory) + + class ResultResolverThreadFactory extends ThreadFactory { + private var counter = 0 + private var PREFIX = "Result resolver thread" + + override def newThread(r: Runnable): Thread = { + val thread = new Thread(r, "%s-%s".format(PREFIX, counter)) + counter += 1 + thread.setDaemon(true) + return thread + } + } + + protected val serializer = new ThreadLocal[SerializerInstance] { + override def initialValue(): SerializerInstance = { + return sparkEnv.closureSerializer.newInstance() + } + } + + def enqueueSuccessfulTask( + taskSetManager: ClusterTaskSetManager, tid: Long, serializedData: ByteBuffer) { + getTaskResultExecutor.execute(new Runnable { + override def run() { + try { + val result = serializer.get().deserialize[TaskResult[_]](serializedData) match { + case directResult: DirectTaskResult[_] => directResult + case IndirectTaskResult(blockId) => + logDebug("Fetching indirect task result for TID %s".format(tid)) + val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId) + if (!serializedTaskResult.isDefined) { + /* We won't be able to get the task result if the machine that ran the task failed + * between when the task ended and when we tried to fetch the result, or if the + * block manager had to flush the result. */ + scheduler.handleFailedTask( + taskSetManager, tid, TaskState.FINISHED, Some(TaskResultLost)) + return + } + val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]]( + serializedTaskResult.get) + sparkEnv.blockManager.master.removeBlock(blockId) + deserializedResult + } + result.metrics.resultSize = serializedData.limit() + scheduler.handleSuccessfulTask(taskSetManager, tid, result) + } catch { + case cnf: ClassNotFoundException => + val loader = Thread.currentThread.getContextClassLoader + taskSetManager.abort("ClassNotFound with classloader: " + loader) + case ex => + taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex)) + } + } + }) + } + + def enqueueFailedTask(taskSetManager: ClusterTaskSetManager, tid: Long, taskState: TaskState, + serializedData: ByteBuffer) { + var reason: Option[TaskEndReason] = None + getTaskResultExecutor.execute(new Runnable { + override def run() { + try { + if (serializedData != null && serializedData.limit() > 0) { + reason = Some(serializer.get().deserialize[TaskEndReason]( + serializedData, getClass.getClassLoader)) + } + } catch { + case cnd: ClassNotFoundException => + // Log an error but keep going here -- the task failed, so not catastropic if we can't + // deserialize the reason. + val loader = Thread.currentThread.getContextClassLoader + logError( + "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader) + case ex => {} + } + scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) + } + }) + } + + def stop() { + getTaskResultExecutor.shutdownNow() + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index babe875fa1..bf4040fafc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -15,22 +15,22 @@ * limitations under the License. */ -package org.apache.spark.scheduler.mesos +package org.apache.spark.scheduler.cluster.mesos -import com.google.protobuf.ByteString +import java.io.File +import java.util.{ArrayList => JArrayList, List => JList} +import java.util.Collections + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.JavaConversions._ +import com.google.protobuf.ByteString import org.apache.mesos.{Scheduler => MScheduler} import org.apache.mesos._ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} -import org.apache.spark.{SparkException, Logging, SparkContext} -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import scala.collection.JavaConversions._ -import java.io.File -import org.apache.spark.scheduler.cluster._ -import java.util.{ArrayList => JArrayList, List => JList} -import java.util.Collections -import org.apache.spark.TaskState +import org.apache.spark.{SparkException, Logging, SparkContext, TaskState} +import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend} /** * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 541f86e338..50cbc2ca92 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -15,22 +15,24 @@ * limitations under the License. */ -package org.apache.spark.scheduler.mesos +package org.apache.spark.scheduler.cluster.mesos -import com.google.protobuf.ByteString +import java.io.File +import java.util.{ArrayList => JArrayList, List => JList} +import java.util.Collections + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.JavaConversions._ +import com.google.protobuf.ByteString import org.apache.mesos.{Scheduler => MScheduler} import org.apache.mesos._ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} -import org.apache.spark.{SparkException, Logging, SparkContext} -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import scala.collection.JavaConversions._ -import java.io.File -import org.apache.spark.scheduler.cluster._ -import java.util.{ArrayList => JArrayList, List => JList} -import java.util.Collections -import org.apache.spark.TaskState +import org.apache.spark.{Logging, SparkException, SparkContext, TaskState} +import org.apache.spark.scheduler.TaskDescription +import org.apache.spark.scheduler.cluster.{ClusterScheduler, ExecutorExited, ExecutorLossReason} +import org.apache.spark.scheduler.cluster.{SchedulerBackend, SlaveLost, WorkerOffer} import org.apache.spark.util.Utils /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala index 8cb4d1396f..4d1bb1c639 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala @@ -31,8 +31,7 @@ import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.ExecutorURLClassLoader import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster._ -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import akka.actor._ import org.apache.spark.util.Utils @@ -92,7 +91,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: var rootPool: Pool = null val schedulingMode: SchedulingMode = SchedulingMode.withName( System.getProperty("spark.scheduler.mode", "FIFO")) - val activeTaskSets = new HashMap[String, TaskSetManager] + val activeTaskSets = new HashMap[String, LocalTaskSetManager] val taskIdToTaskSetId = new HashMap[Long, String] val taskSetTaskIds = new HashMap[String, HashSet[Long]] @@ -211,7 +210,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: deserializedTask.metrics.get.executorRunTime = serviceTime.toInt deserializedTask.metrics.get.jvmGCTime = getTotalGCTime - startGCTime deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt - val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null)) + val taskResult = new DirectTaskResult( + result, accumUpdates, deserializedTask.metrics.getOrElse(null)) val serializedResult = ser.serialize(taskResult) localActor ! LocalStatusUpdate(taskId, TaskState.FINISHED, serializedResult) } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala index e52cb998bd..c2e2399ccb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala @@ -21,16 +21,16 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap -import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, Success, TaskState} +import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, SparkException, Success, TaskState} import org.apache.spark.TaskState.TaskState -import org.apache.spark.scheduler.{Task, TaskResult, TaskSet} -import org.apache.spark.scheduler.cluster.{Schedulable, TaskDescription, TaskInfo, TaskLocality, TaskSetManager} +import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Pool, Schedulable, Task, + TaskDescription, TaskInfo, TaskLocality, TaskResult, TaskSet, TaskSetManager} private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet) extends TaskSetManager with Logging { - var parent: Schedulable = null + var parent: Pool = null var weight: Int = 1 var minShare: Int = 0 var runningTasks: Int = 0 @@ -49,14 +49,14 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas val numFailures = new Array[Int](numTasks) val MAX_TASK_FAILURES = sched.maxFailures - override def increaseRunningTasks(taskNum: Int): Unit = { + def increaseRunningTasks(taskNum: Int): Unit = { runningTasks += taskNum if (parent != null) { parent.increaseRunningTasks(taskNum) } } - override def decreaseRunningTasks(taskNum: Int): Unit = { + def decreaseRunningTasks(taskNum: Int): Unit = { runningTasks -= taskNum if (parent != null) { parent.decreaseRunningTasks(taskNum) @@ -132,7 +132,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas return None } - override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { + def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { SparkEnv.set(env) state match { case TaskState.FINISHED => @@ -152,7 +152,12 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas val index = info.index val task = taskSet.tasks(index) info.markSuccessful() - val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader) + val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader) match { + case directResult: DirectTaskResult[_] => directResult + case IndirectTaskResult(blockId) => { + throw new SparkException("Expect only DirectTaskResults when using LocalScheduler") + } + } result.metrics.resultSize = serializedData.limit() sched.listener.taskEnded(task, Success, result.value, result.accumUpdates, info, result.metrics) numFinished += 1 diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 13b98a51a1..7852849ce5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -484,7 +484,7 @@ private[spark] class BlockManager( for (loc <- locations) { logDebug("Getting remote block " + blockId + " from " + loc) val data = BlockManagerWorker.syncGetBlock( - GetBlock(blockId), ConnectionManagerId(loc.host, loc.port)) + GetBlock(blockId), ConnectionManagerId(loc.host, loc.port)) if (data != null) { return Some(dataDeserialize(blockId, data)) } @@ -495,10 +495,45 @@ private[spark] class BlockManager( } /** + * Get block from remote block managers as serialized bytes. + */ + def getRemoteBytes(blockId: String): Option[ByteBuffer] = { + // TODO: As with getLocalBytes, this is very similar to getRemote and perhaps should be + // refactored. + if (blockId == null) { + throw new IllegalArgumentException("Block Id is null") + } + logDebug("Getting remote block " + blockId + " as bytes") + + val locations = master.getLocations(blockId) + for (loc <- locations) { + logDebug("Getting remote block " + blockId + " from " + loc) + val data = BlockManagerWorker.syncGetBlock( + GetBlock(blockId), ConnectionManagerId(loc.host, loc.port)) + if (data != null) { + return Some(data) + } + logDebug("The value of block " + blockId + " is null") + } + logDebug("Block " + blockId + " not found") + return None + } + + /** * Get a block from the block manager (either local or remote). */ def get(blockId: String): Option[Iterator[Any]] = { - getLocal(blockId).orElse(getRemote(blockId)) + val local = getLocal(blockId) + if (local.isDefined) { + logInfo("Found block %s locally".format(blockId)) + return local + } + val remote = getRemote(blockId) + if (remote.isDefined) { + logInfo("Found block %s remotely".format(blockId)) + return remote + } + None } /** diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 3b3b2342fa..77a39c71ed 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -30,10 +30,10 @@ import org.apache.spark.util.{SizeEstimator, Utils} private class MemoryStore(blockManager: BlockManager, maxMemory: Long) extends BlockStore(blockManager) { - case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false) + case class Entry(value: Any, size: Long, deserialized: Boolean) private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true) - private var currentMemory = 0L + @volatile private var currentMemory = 0L // Object used to ensure that only one thread is putting blocks and if necessary, dropping // blocks from the memory store. private val putLock = new Object() @@ -110,9 +110,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def remove(blockId: String): Boolean = { entries.synchronized { - val entry = entries.get(blockId) + val entry = entries.remove(blockId) if (entry != null) { - entries.remove(blockId) currentMemory -= entry.size logInfo("Block %s of size %d dropped from memory (free %d)".format( blockId, entry.size, freeMemory)) @@ -126,6 +125,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def clear() { entries.synchronized { entries.clear() + currentMemory = 0 } logInfo("MemoryStore cleared") } @@ -160,8 +160,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) putLock.synchronized { if (ensureFreeSpace(blockId, size)) { val entry = new Entry(value, size, deserialized) - entries.synchronized { entries.put(blockId, entry) } - currentMemory += size + entries.synchronized { + entries.put(blockId, entry) + currentMemory += size + } if (deserialized) { logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format( blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory))) diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala index 3ec9760ed0..453394dfda 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala @@ -21,7 +21,7 @@ import scala.util.Random import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ -import org.apache.spark.scheduler.cluster.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode /** diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index d1868dcf78..42e9be6e19 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -26,8 +26,8 @@ import org.eclipse.jetty.server.Handler import org.apache.spark.{ExceptionFailure, Logging, SparkContext} import org.apache.spark.executor.TaskMetrics -import org.apache.spark.scheduler.cluster.TaskInfo import org.apache.spark.scheduler.{SparkListenerTaskStart, SparkListenerTaskEnd, SparkListener} +import org.apache.spark.scheduler.TaskInfo import org.apache.spark.ui.JettyUtils._ import org.apache.spark.ui.Page.Executors import org.apache.spark.ui.UIUtils diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index 3b428effaf..b39c0e9769 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest import scala.xml.{NodeSeq, Node} -import org.apache.spark.scheduler.cluster.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode import org.apache.spark.ui.Page._ import org.apache.spark.ui.UIUtils._ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 5d46f38a2a..eb3b4e8522 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -21,10 +21,8 @@ import scala.Seq import scala.collection.mutable.{ListBuffer, HashMap, HashSet} import org.apache.spark.{ExceptionFailure, SparkContext, Success} -import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.TaskInfo import org.apache.spark.executor.TaskMetrics -import collection.mutable +import org.apache.spark.scheduler._ /** * Tracks task-level information to be displayed in the UI. diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala index 54e273fd8b..c1ee2f3d00 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala @@ -31,8 +31,9 @@ import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.{ExceptionFailure, SparkContext, Success} import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.SchedulingMode -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import collection.mutable +import org.apache.spark.scheduler.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.util.Utils /** Web UI showing progress status of all jobs in the given SparkContext. */ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala index b3d3666944..06810d8dbc 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala @@ -21,8 +21,7 @@ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet import scala.xml.Node -import org.apache.spark.scheduler.Stage -import org.apache.spark.scheduler.cluster.Schedulable +import org.apache.spark.scheduler.{Schedulable, Stage} import org.apache.spark.ui.UIUtils /** Table showing list of pools */ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index a9969ab1c0..163a3746ea 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -23,12 +23,12 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node +import org.apache.spark.{ExceptionFailure} +import org.apache.spark.executor.TaskMetrics import org.apache.spark.ui.UIUtils._ import org.apache.spark.ui.Page._ import org.apache.spark.util.{Utils, Distribution} -import org.apache.spark.{ExceptionFailure} -import org.apache.spark.scheduler.cluster.TaskInfo -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.scheduler.TaskInfo /** Page showing statistics and task list for a given stage */ private[spark] class StagePage(parent: JobProgressUI) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 32776eaa25..07db8622da 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -22,8 +22,7 @@ import java.util.Date import scala.xml.Node import scala.collection.mutable.HashSet -import org.apache.spark.scheduler.cluster.{SchedulingMode, TaskInfo} -import org.apache.spark.scheduler.Stage +import org.apache.spark.scheduler.{SchedulingMode, Stage, TaskInfo} import org.apache.spark.ui.UIUtils import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index be215fc127..94ce50e964 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -77,6 +77,19 @@ private[spark] object Utils extends Logging { return ois.readObject.asInstanceOf[T] } + /** Deserialize a Long value (used for {@link org.apache.spark.api.python.PythonPartitioner}) */ + def deserializeLongValue(bytes: Array[Byte]) : Long = { + // Note: we assume that we are given a Long value encoded in network (big-endian) byte order + var result = bytes(7) & 0xFFL + result = result + ((bytes(6) & 0xFFL) << 8) + result = result + ((bytes(5) & 0xFFL) << 16) + result = result + ((bytes(4) & 0xFFL) << 24) + result = result + ((bytes(3) & 0xFFL) << 32) + result = result + ((bytes(2) & 0xFFL) << 40) + result = result + ((bytes(1) & 0xFFL) << 48) + result + ((bytes(0) & 0xFFL) << 56) + } + /** Serialize via nested stream using specific serializer */ def serializeViaNestedStream(os: OutputStream, ser: SerializerInstance)(f: SerializationStream => Unit) = { val osWrapper = ser.serializeStream(new OutputStream { |